##// END OF EJS Templates
fix for #1809, failing tests in IPython.zmq...
Jörgen Stenarson -
Show More
@@ -1,192 +1,196
1 1 """test IPython.embed_kernel()"""
2 2
3 3 #-------------------------------------------------------------------------------
4 4 # Copyright (C) 2012 The IPython Development Team
5 5 #
6 6 # Distributed under the terms of the BSD License. The full license is in
7 7 # the file COPYING, distributed as part of this software.
8 8 #-------------------------------------------------------------------------------
9 9
10 10 #-------------------------------------------------------------------------------
11 11 # Imports
12 12 #-------------------------------------------------------------------------------
13 13
14 14 import os
15 15 import shutil
16 16 import sys
17 17 import tempfile
18 18 import time
19 19
20 20 from contextlib import contextmanager
21 21 from subprocess import Popen, PIPE
22 22
23 23 import nose.tools as nt
24 24
25 25 from IPython.zmq.blockingkernelmanager import BlockingKernelManager
26 26 from IPython.utils import path, py3compat
27 27
28 28 #-------------------------------------------------------------------------------
29 29 # Tests
30 30 #-------------------------------------------------------------------------------
31 31
32 32 def setup():
33 33 """setup temporary IPYTHONDIR for tests"""
34 34 global IPYTHONDIR
35 35 global env
36 36 global save_get_ipython_dir
37 37
38 38 IPYTHONDIR = tempfile.mkdtemp()
39
39 40 env = dict(IPYTHONDIR=IPYTHONDIR)
40 41 if 'PYTHONPATH' in os.environ:
41 42 env['PYTHONPATH'] = os.environ['PYTHONPATH']
43 if sys.platform == 'win32':
44 env["SYSTEMROOT"] = os.environ["SYSTEMROOT"]
45
42 46 save_get_ipython_dir = path.get_ipython_dir
43 47 path.get_ipython_dir = lambda : IPYTHONDIR
44 48
45 49
46 50 def teardown():
47 51 path.get_ipython_dir = save_get_ipython_dir
48 52
49 53 try:
50 54 shutil.rmtree(IPYTHONDIR)
51 55 except (OSError, IOError):
52 56 # no such file
53 57 pass
54 58
55 59
56 60 @contextmanager
57 61 def setup_kernel(cmd):
58 62 """start an embedded kernel in a subprocess, and wait for it to be ready
59 63
60 64 Returns
61 65 -------
62 66 kernel_manager: connected KernelManager instance
63 67 """
64 68 kernel = Popen([sys.executable, '-c', cmd], stdout=PIPE, stderr=PIPE, env=env)
65 69 connection_file = os.path.join(IPYTHONDIR,
66 70 'profile_default',
67 71 'security',
68 72 'kernel-%i.json' % kernel.pid
69 73 )
70 74 # wait for connection file to exist, timeout after 5s
71 75 tic = time.time()
72 76 while not os.path.exists(connection_file) and kernel.poll() is None and time.time() < tic + 10:
73 77 time.sleep(0.1)
74 78
75 79 if kernel.poll() is not None:
76 80 o,e = kernel.communicate()
77 81 e = py3compat.cast_unicode(e)
78 82 raise IOError("Kernel failed to start:\n%s" % e)
79 83
80 84 if not os.path.exists(connection_file):
81 85 if kernel.poll() is None:
82 86 kernel.terminate()
83 87 raise IOError("Connection file %r never arrived" % connection_file)
84 88
85 89 km = BlockingKernelManager(connection_file=connection_file)
86 90 km.load_connection_file()
87 91 km.start_channels()
88 92
89 93 try:
90 94 yield km
91 95 finally:
92 96 km.stop_channels()
93 97 kernel.terminate()
94 98
95 99 def test_embed_kernel_basic():
96 100 """IPython.embed_kernel() is basically functional"""
97 101 cmd = '\n'.join([
98 102 'from IPython import embed_kernel',
99 103 'def go():',
100 104 ' a=5',
101 105 ' b="hi there"',
102 106 ' embed_kernel()',
103 107 'go()',
104 108 '',
105 109 ])
106 110
107 111 with setup_kernel(cmd) as km:
108 112 shell = km.shell_channel
109 113
110 114 # oinfo a (int)
111 115 msg_id = shell.object_info('a')
112 116 msg = shell.get_msg(block=True, timeout=2)
113 117 content = msg['content']
114 118 nt.assert_true(content['found'])
115 119
116 120 msg_id = shell.execute("c=a*2")
117 121 msg = shell.get_msg(block=True, timeout=2)
118 122 content = msg['content']
119 123 nt.assert_equals(content['status'], u'ok')
120 124
121 125 # oinfo c (should be 10)
122 126 msg_id = shell.object_info('c')
123 127 msg = shell.get_msg(block=True, timeout=2)
124 128 content = msg['content']
125 129 nt.assert_true(content['found'])
126 130 nt.assert_equals(content['string_form'], u'10')
127 131
128 132 def test_embed_kernel_namespace():
129 133 """IPython.embed_kernel() inherits calling namespace"""
130 134 cmd = '\n'.join([
131 135 'from IPython import embed_kernel',
132 136 'def go():',
133 137 ' a=5',
134 138 ' b="hi there"',
135 139 ' embed_kernel()',
136 140 'go()',
137 141 '',
138 142 ])
139 143
140 144 with setup_kernel(cmd) as km:
141 145 shell = km.shell_channel
142 146
143 147 # oinfo a (int)
144 148 msg_id = shell.object_info('a')
145 149 msg = shell.get_msg(block=True, timeout=2)
146 150 content = msg['content']
147 151 nt.assert_true(content['found'])
148 152 nt.assert_equals(content['string_form'], u'5')
149 153
150 154 # oinfo b (str)
151 155 msg_id = shell.object_info('b')
152 156 msg = shell.get_msg(block=True, timeout=2)
153 157 content = msg['content']
154 158 nt.assert_true(content['found'])
155 159 nt.assert_equals(content['string_form'], u'hi there')
156 160
157 161 # oinfo c (undefined)
158 162 msg_id = shell.object_info('c')
159 163 msg = shell.get_msg(block=True, timeout=2)
160 164 content = msg['content']
161 165 nt.assert_false(content['found'])
162 166
163 167 def test_embed_kernel_reentrant():
164 168 """IPython.embed_kernel() can be called multiple times"""
165 169 cmd = '\n'.join([
166 170 'from IPython import embed_kernel',
167 171 'count = 0',
168 172 'def go():',
169 173 ' global count',
170 174 ' embed_kernel()',
171 175 ' count = count + 1',
172 176 '',
173 177 'while True:'
174 178 ' go()',
175 179 '',
176 180 ])
177 181
178 182 with setup_kernel(cmd) as km:
179 183 shell = km.shell_channel
180 184 for i in range(5):
181 185 msg_id = shell.object_info('count')
182 186 msg = shell.get_msg(block=True, timeout=2)
183 187 content = msg['content']
184 188 nt.assert_true(content['found'])
185 189 nt.assert_equals(content['string_form'], unicode(i))
186 190
187 191 # exit from embed_kernel
188 192 shell.execute("get_ipython().exit_now = True")
189 193 msg = shell.get_msg(block=True, timeout=2)
190 194 time.sleep(0.2)
191 195
192 196
General Comments 0
You need to be logged in to leave comments. Login now