##// END OF EJS Templates
fix for #1809, failing tests in IPython.zmq...
Jörgen Stenarson -
Show More
@@ -1,192 +1,196 b''
1 """test IPython.embed_kernel()"""
1 """test IPython.embed_kernel()"""
2
2
3 #-------------------------------------------------------------------------------
3 #-------------------------------------------------------------------------------
4 # Copyright (C) 2012 The IPython Development Team
4 # Copyright (C) 2012 The IPython Development Team
5 #
5 #
6 # Distributed under the terms of the BSD License. The full license is in
6 # Distributed under the terms of the BSD License. The full license is in
7 # the file COPYING, distributed as part of this software.
7 # the file COPYING, distributed as part of this software.
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9
9
10 #-------------------------------------------------------------------------------
10 #-------------------------------------------------------------------------------
11 # Imports
11 # Imports
12 #-------------------------------------------------------------------------------
12 #-------------------------------------------------------------------------------
13
13
14 import os
14 import os
15 import shutil
15 import shutil
16 import sys
16 import sys
17 import tempfile
17 import tempfile
18 import time
18 import time
19
19
20 from contextlib import contextmanager
20 from contextlib import contextmanager
21 from subprocess import Popen, PIPE
21 from subprocess import Popen, PIPE
22
22
23 import nose.tools as nt
23 import nose.tools as nt
24
24
25 from IPython.zmq.blockingkernelmanager import BlockingKernelManager
25 from IPython.zmq.blockingkernelmanager import BlockingKernelManager
26 from IPython.utils import path, py3compat
26 from IPython.utils import path, py3compat
27
27
28 #-------------------------------------------------------------------------------
28 #-------------------------------------------------------------------------------
29 # Tests
29 # Tests
30 #-------------------------------------------------------------------------------
30 #-------------------------------------------------------------------------------
31
31
32 def setup():
32 def setup():
33 """setup temporary IPYTHONDIR for tests"""
33 """setup temporary IPYTHONDIR for tests"""
34 global IPYTHONDIR
34 global IPYTHONDIR
35 global env
35 global env
36 global save_get_ipython_dir
36 global save_get_ipython_dir
37
37
38 IPYTHONDIR = tempfile.mkdtemp()
38 IPYTHONDIR = tempfile.mkdtemp()
39
39 env = dict(IPYTHONDIR=IPYTHONDIR)
40 env = dict(IPYTHONDIR=IPYTHONDIR)
40 if 'PYTHONPATH' in os.environ:
41 if 'PYTHONPATH' in os.environ:
41 env['PYTHONPATH'] = os.environ['PYTHONPATH']
42 env['PYTHONPATH'] = os.environ['PYTHONPATH']
43 if sys.platform == 'win32':
44 env["SYSTEMROOT"] = os.environ["SYSTEMROOT"]
45
42 save_get_ipython_dir = path.get_ipython_dir
46 save_get_ipython_dir = path.get_ipython_dir
43 path.get_ipython_dir = lambda : IPYTHONDIR
47 path.get_ipython_dir = lambda : IPYTHONDIR
44
48
45
49
46 def teardown():
50 def teardown():
47 path.get_ipython_dir = save_get_ipython_dir
51 path.get_ipython_dir = save_get_ipython_dir
48
52
49 try:
53 try:
50 shutil.rmtree(IPYTHONDIR)
54 shutil.rmtree(IPYTHONDIR)
51 except (OSError, IOError):
55 except (OSError, IOError):
52 # no such file
56 # no such file
53 pass
57 pass
54
58
55
59
56 @contextmanager
60 @contextmanager
57 def setup_kernel(cmd):
61 def setup_kernel(cmd):
58 """start an embedded kernel in a subprocess, and wait for it to be ready
62 """start an embedded kernel in a subprocess, and wait for it to be ready
59
63
60 Returns
64 Returns
61 -------
65 -------
62 kernel_manager: connected KernelManager instance
66 kernel_manager: connected KernelManager instance
63 """
67 """
64 kernel = Popen([sys.executable, '-c', cmd], stdout=PIPE, stderr=PIPE, env=env)
68 kernel = Popen([sys.executable, '-c', cmd], stdout=PIPE, stderr=PIPE, env=env)
65 connection_file = os.path.join(IPYTHONDIR,
69 connection_file = os.path.join(IPYTHONDIR,
66 'profile_default',
70 'profile_default',
67 'security',
71 'security',
68 'kernel-%i.json' % kernel.pid
72 'kernel-%i.json' % kernel.pid
69 )
73 )
70 # wait for connection file to exist, timeout after 5s
74 # wait for connection file to exist, timeout after 5s
71 tic = time.time()
75 tic = time.time()
72 while not os.path.exists(connection_file) and kernel.poll() is None and time.time() < tic + 10:
76 while not os.path.exists(connection_file) and kernel.poll() is None and time.time() < tic + 10:
73 time.sleep(0.1)
77 time.sleep(0.1)
74
78
75 if kernel.poll() is not None:
79 if kernel.poll() is not None:
76 o,e = kernel.communicate()
80 o,e = kernel.communicate()
77 e = py3compat.cast_unicode(e)
81 e = py3compat.cast_unicode(e)
78 raise IOError("Kernel failed to start:\n%s" % e)
82 raise IOError("Kernel failed to start:\n%s" % e)
79
83
80 if not os.path.exists(connection_file):
84 if not os.path.exists(connection_file):
81 if kernel.poll() is None:
85 if kernel.poll() is None:
82 kernel.terminate()
86 kernel.terminate()
83 raise IOError("Connection file %r never arrived" % connection_file)
87 raise IOError("Connection file %r never arrived" % connection_file)
84
88
85 km = BlockingKernelManager(connection_file=connection_file)
89 km = BlockingKernelManager(connection_file=connection_file)
86 km.load_connection_file()
90 km.load_connection_file()
87 km.start_channels()
91 km.start_channels()
88
92
89 try:
93 try:
90 yield km
94 yield km
91 finally:
95 finally:
92 km.stop_channels()
96 km.stop_channels()
93 kernel.terminate()
97 kernel.terminate()
94
98
95 def test_embed_kernel_basic():
99 def test_embed_kernel_basic():
96 """IPython.embed_kernel() is basically functional"""
100 """IPython.embed_kernel() is basically functional"""
97 cmd = '\n'.join([
101 cmd = '\n'.join([
98 'from IPython import embed_kernel',
102 'from IPython import embed_kernel',
99 'def go():',
103 'def go():',
100 ' a=5',
104 ' a=5',
101 ' b="hi there"',
105 ' b="hi there"',
102 ' embed_kernel()',
106 ' embed_kernel()',
103 'go()',
107 'go()',
104 '',
108 '',
105 ])
109 ])
106
110
107 with setup_kernel(cmd) as km:
111 with setup_kernel(cmd) as km:
108 shell = km.shell_channel
112 shell = km.shell_channel
109
113
110 # oinfo a (int)
114 # oinfo a (int)
111 msg_id = shell.object_info('a')
115 msg_id = shell.object_info('a')
112 msg = shell.get_msg(block=True, timeout=2)
116 msg = shell.get_msg(block=True, timeout=2)
113 content = msg['content']
117 content = msg['content']
114 nt.assert_true(content['found'])
118 nt.assert_true(content['found'])
115
119
116 msg_id = shell.execute("c=a*2")
120 msg_id = shell.execute("c=a*2")
117 msg = shell.get_msg(block=True, timeout=2)
121 msg = shell.get_msg(block=True, timeout=2)
118 content = msg['content']
122 content = msg['content']
119 nt.assert_equals(content['status'], u'ok')
123 nt.assert_equals(content['status'], u'ok')
120
124
121 # oinfo c (should be 10)
125 # oinfo c (should be 10)
122 msg_id = shell.object_info('c')
126 msg_id = shell.object_info('c')
123 msg = shell.get_msg(block=True, timeout=2)
127 msg = shell.get_msg(block=True, timeout=2)
124 content = msg['content']
128 content = msg['content']
125 nt.assert_true(content['found'])
129 nt.assert_true(content['found'])
126 nt.assert_equals(content['string_form'], u'10')
130 nt.assert_equals(content['string_form'], u'10')
127
131
128 def test_embed_kernel_namespace():
132 def test_embed_kernel_namespace():
129 """IPython.embed_kernel() inherits calling namespace"""
133 """IPython.embed_kernel() inherits calling namespace"""
130 cmd = '\n'.join([
134 cmd = '\n'.join([
131 'from IPython import embed_kernel',
135 'from IPython import embed_kernel',
132 'def go():',
136 'def go():',
133 ' a=5',
137 ' a=5',
134 ' b="hi there"',
138 ' b="hi there"',
135 ' embed_kernel()',
139 ' embed_kernel()',
136 'go()',
140 'go()',
137 '',
141 '',
138 ])
142 ])
139
143
140 with setup_kernel(cmd) as km:
144 with setup_kernel(cmd) as km:
141 shell = km.shell_channel
145 shell = km.shell_channel
142
146
143 # oinfo a (int)
147 # oinfo a (int)
144 msg_id = shell.object_info('a')
148 msg_id = shell.object_info('a')
145 msg = shell.get_msg(block=True, timeout=2)
149 msg = shell.get_msg(block=True, timeout=2)
146 content = msg['content']
150 content = msg['content']
147 nt.assert_true(content['found'])
151 nt.assert_true(content['found'])
148 nt.assert_equals(content['string_form'], u'5')
152 nt.assert_equals(content['string_form'], u'5')
149
153
150 # oinfo b (str)
154 # oinfo b (str)
151 msg_id = shell.object_info('b')
155 msg_id = shell.object_info('b')
152 msg = shell.get_msg(block=True, timeout=2)
156 msg = shell.get_msg(block=True, timeout=2)
153 content = msg['content']
157 content = msg['content']
154 nt.assert_true(content['found'])
158 nt.assert_true(content['found'])
155 nt.assert_equals(content['string_form'], u'hi there')
159 nt.assert_equals(content['string_form'], u'hi there')
156
160
157 # oinfo c (undefined)
161 # oinfo c (undefined)
158 msg_id = shell.object_info('c')
162 msg_id = shell.object_info('c')
159 msg = shell.get_msg(block=True, timeout=2)
163 msg = shell.get_msg(block=True, timeout=2)
160 content = msg['content']
164 content = msg['content']
161 nt.assert_false(content['found'])
165 nt.assert_false(content['found'])
162
166
163 def test_embed_kernel_reentrant():
167 def test_embed_kernel_reentrant():
164 """IPython.embed_kernel() can be called multiple times"""
168 """IPython.embed_kernel() can be called multiple times"""
165 cmd = '\n'.join([
169 cmd = '\n'.join([
166 'from IPython import embed_kernel',
170 'from IPython import embed_kernel',
167 'count = 0',
171 'count = 0',
168 'def go():',
172 'def go():',
169 ' global count',
173 ' global count',
170 ' embed_kernel()',
174 ' embed_kernel()',
171 ' count = count + 1',
175 ' count = count + 1',
172 '',
176 '',
173 'while True:'
177 'while True:'
174 ' go()',
178 ' go()',
175 '',
179 '',
176 ])
180 ])
177
181
178 with setup_kernel(cmd) as km:
182 with setup_kernel(cmd) as km:
179 shell = km.shell_channel
183 shell = km.shell_channel
180 for i in range(5):
184 for i in range(5):
181 msg_id = shell.object_info('count')
185 msg_id = shell.object_info('count')
182 msg = shell.get_msg(block=True, timeout=2)
186 msg = shell.get_msg(block=True, timeout=2)
183 content = msg['content']
187 content = msg['content']
184 nt.assert_true(content['found'])
188 nt.assert_true(content['found'])
185 nt.assert_equals(content['string_form'], unicode(i))
189 nt.assert_equals(content['string_form'], unicode(i))
186
190
187 # exit from embed_kernel
191 # exit from embed_kernel
188 shell.execute("get_ipython().exit_now = True")
192 shell.execute("get_ipython().exit_now = True")
189 msg = shell.get_msg(block=True, timeout=2)
193 msg = shell.get_msg(block=True, timeout=2)
190 time.sleep(0.2)
194 time.sleep(0.2)
191
195
192
196
General Comments 0
You need to be logged in to leave comments. Login now