##// END OF EJS Templates
switch order of failed-to-start errors in test_embed_kernel...
MinRK -
Show More
@@ -1,188 +1,190 b''
1 """test IPython.embed_kernel()"""
1 """test IPython.embed_kernel()"""
2
2
3 #-------------------------------------------------------------------------------
3 #-------------------------------------------------------------------------------
4 # Copyright (C) 2012 The IPython Development Team
4 # Copyright (C) 2012 The IPython Development Team
5 #
5 #
6 # Distributed under the terms of the BSD License. The full license is in
6 # Distributed under the terms of the BSD License. The full license is in
7 # the file COPYING, distributed as part of this software.
7 # the file COPYING, distributed as part of this software.
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9
9
10 #-------------------------------------------------------------------------------
10 #-------------------------------------------------------------------------------
11 # Imports
11 # Imports
12 #-------------------------------------------------------------------------------
12 #-------------------------------------------------------------------------------
13
13
14 import os
14 import os
15 import shutil
15 import shutil
16 import sys
16 import sys
17 import tempfile
17 import tempfile
18 import time
18 import time
19
19
20 from contextlib import contextmanager
20 from contextlib import contextmanager
21 from subprocess import Popen, PIPE
21 from subprocess import Popen, PIPE
22
22
23 import nose.tools as nt
23 import nose.tools as nt
24
24
25 from IPython.zmq.blockingkernelmanager import BlockingKernelManager
25 from IPython.zmq.blockingkernelmanager import BlockingKernelManager
26 from IPython.utils import path
26 from IPython.utils import path, py3compat
27
27
28
28
29 #-------------------------------------------------------------------------------
29 #-------------------------------------------------------------------------------
30 # Tests
30 # Tests
31 #-------------------------------------------------------------------------------
31 #-------------------------------------------------------------------------------
32
32
33 def setup():
33 def setup():
34 """setup temporary IPYTHONDIR for tests"""
34 """setup temporary IPYTHONDIR for tests"""
35 global IPYTHONDIR
35 global IPYTHONDIR
36 global env
36 global env
37 global save_get_ipython_dir
37 global save_get_ipython_dir
38
38
39 IPYTHONDIR = tempfile.mkdtemp()
39 IPYTHONDIR = tempfile.mkdtemp()
40 env = dict(IPYTHONDIR=IPYTHONDIR)
40 env = dict(IPYTHONDIR=IPYTHONDIR)
41 save_get_ipython_dir = path.get_ipython_dir
41 save_get_ipython_dir = path.get_ipython_dir
42 path.get_ipython_dir = lambda : IPYTHONDIR
42 path.get_ipython_dir = lambda : IPYTHONDIR
43
43
44
44
45 def teardown():
45 def teardown():
46 path.get_ipython_dir = save_get_ipython_dir
46 path.get_ipython_dir = save_get_ipython_dir
47
47
48 try:
48 try:
49 shutil.rmtree(IPYTHONDIR)
49 shutil.rmtree(IPYTHONDIR)
50 except (OSError, IOError):
50 except (OSError, IOError):
51 # no such file
51 # no such file
52 pass
52 pass
53
53
54
54
55 @contextmanager
55 @contextmanager
56 def setup_kernel(cmd):
56 def setup_kernel(cmd):
57 """start an embedded kernel in a subprocess, and wait for it to be ready
57 """start an embedded kernel in a subprocess, and wait for it to be ready
58
58
59 Returns
59 Returns
60 -------
60 -------
61 kernel_manager: connected KernelManager instance
61 kernel_manager: connected KernelManager instance
62 """
62 """
63 kernel = Popen([sys.executable, '-c', cmd], stdout=PIPE, stderr=PIPE, env=env)
63 kernel = Popen([sys.executable, '-c', cmd], stdout=PIPE, stderr=PIPE, env=env)
64 connection_file = os.path.join(IPYTHONDIR,
64 connection_file = os.path.join(IPYTHONDIR,
65 'profile_default',
65 'profile_default',
66 'security',
66 'security',
67 'kernel-%i.json' % kernel.pid
67 'kernel-%i.json' % kernel.pid
68 )
68 )
69 # wait for connection file to exist, timeout after 5s
69 # wait for connection file to exist, timeout after 5s
70 tic = time.time()
70 tic = time.time()
71 while not os.path.exists(connection_file) and kernel.poll() is None and time.time() < tic + 5:
71 while not os.path.exists(connection_file) and kernel.poll() is None and time.time() < tic + 10:
72 time.sleep(0.1)
72 time.sleep(0.1)
73
73
74 if kernel.poll() is not None:
75 o,e = kernel.communicate()
76 e = py3compat.cast_unicode(e)
77 raise IOError("Kernel failed to start:\n%s" % e)
78
74 if not os.path.exists(connection_file):
79 if not os.path.exists(connection_file):
75 if kernel.poll() is None:
80 if kernel.poll() is None:
76 kernel.terminate()
81 kernel.terminate()
77 raise IOError("Connection file %r never arrived" % connection_file)
82 raise IOError("Connection file %r never arrived" % connection_file)
78
83
79 if kernel.poll() is not None:
80 raise IOError("Kernel failed to start")
81
82 km = BlockingKernelManager(connection_file=connection_file)
84 km = BlockingKernelManager(connection_file=connection_file)
83 km.load_connection_file()
85 km.load_connection_file()
84 km.start_channels()
86 km.start_channels()
85
87
86 try:
88 try:
87 yield km
89 yield km
88 finally:
90 finally:
89 km.stop_channels()
91 km.stop_channels()
90
92
91 def test_embed_kernel_basic():
93 def test_embed_kernel_basic():
92 """IPython.embed_kernel() is basically functional"""
94 """IPython.embed_kernel() is basically functional"""
93 cmd = '\n'.join([
95 cmd = '\n'.join([
94 'from IPython import embed_kernel',
96 'from IPython import embed_kernel',
95 'def go():',
97 'def go():',
96 ' a=5',
98 ' a=5',
97 ' b="hi there"',
99 ' b="hi there"',
98 ' embed_kernel()',
100 ' embed_kernel()',
99 'go()',
101 'go()',
100 '',
102 '',
101 ])
103 ])
102
104
103 with setup_kernel(cmd) as km:
105 with setup_kernel(cmd) as km:
104 shell = km.shell_channel
106 shell = km.shell_channel
105
107
106 # oinfo a (int)
108 # oinfo a (int)
107 msg_id = shell.object_info('a')
109 msg_id = shell.object_info('a')
108 msg = shell.get_msg(block=True, timeout=2)
110 msg = shell.get_msg(block=True, timeout=2)
109 content = msg['content']
111 content = msg['content']
110 nt.assert_true(content['found'])
112 nt.assert_true(content['found'])
111
113
112 msg_id = shell.execute("c=a*2")
114 msg_id = shell.execute("c=a*2")
113 msg = shell.get_msg(block=True, timeout=2)
115 msg = shell.get_msg(block=True, timeout=2)
114 content = msg['content']
116 content = msg['content']
115 nt.assert_equals(content['status'], u'ok')
117 nt.assert_equals(content['status'], u'ok')
116
118
117 # oinfo c (should be 10)
119 # oinfo c (should be 10)
118 msg_id = shell.object_info('c')
120 msg_id = shell.object_info('c')
119 msg = shell.get_msg(block=True, timeout=2)
121 msg = shell.get_msg(block=True, timeout=2)
120 content = msg['content']
122 content = msg['content']
121 nt.assert_true(content['found'])
123 nt.assert_true(content['found'])
122 nt.assert_equals(content['string_form'], u'10')
124 nt.assert_equals(content['string_form'], u'10')
123
125
124 def test_embed_kernel_namespace():
126 def test_embed_kernel_namespace():
125 """IPython.embed_kernel() inherits calling namespace"""
127 """IPython.embed_kernel() inherits calling namespace"""
126 cmd = '\n'.join([
128 cmd = '\n'.join([
127 'from IPython import embed_kernel',
129 'from IPython import embed_kernel',
128 'def go():',
130 'def go():',
129 ' a=5',
131 ' a=5',
130 ' b="hi there"',
132 ' b="hi there"',
131 ' embed_kernel()',
133 ' embed_kernel()',
132 'go()',
134 'go()',
133 '',
135 '',
134 ])
136 ])
135
137
136 with setup_kernel(cmd) as km:
138 with setup_kernel(cmd) as km:
137 shell = km.shell_channel
139 shell = km.shell_channel
138
140
139 # oinfo a (int)
141 # oinfo a (int)
140 msg_id = shell.object_info('a')
142 msg_id = shell.object_info('a')
141 msg = shell.get_msg(block=True, timeout=2)
143 msg = shell.get_msg(block=True, timeout=2)
142 content = msg['content']
144 content = msg['content']
143 nt.assert_true(content['found'])
145 nt.assert_true(content['found'])
144 nt.assert_equals(content['string_form'], u'5')
146 nt.assert_equals(content['string_form'], u'5')
145
147
146 # oinfo b (str)
148 # oinfo b (str)
147 msg_id = shell.object_info('b')
149 msg_id = shell.object_info('b')
148 msg = shell.get_msg(block=True, timeout=2)
150 msg = shell.get_msg(block=True, timeout=2)
149 content = msg['content']
151 content = msg['content']
150 nt.assert_true(content['found'])
152 nt.assert_true(content['found'])
151 nt.assert_equals(content['string_form'], u'hi there')
153 nt.assert_equals(content['string_form'], u'hi there')
152
154
153 # oinfo c (undefined)
155 # oinfo c (undefined)
154 msg_id = shell.object_info('c')
156 msg_id = shell.object_info('c')
155 msg = shell.get_msg(block=True, timeout=2)
157 msg = shell.get_msg(block=True, timeout=2)
156 content = msg['content']
158 content = msg['content']
157 nt.assert_false(content['found'])
159 nt.assert_false(content['found'])
158
160
159 def test_embed_kernel_reentrant():
161 def test_embed_kernel_reentrant():
160 """IPython.embed_kernel() can be called multiple times"""
162 """IPython.embed_kernel() can be called multiple times"""
161 cmd = '\n'.join([
163 cmd = '\n'.join([
162 'from IPython import embed_kernel',
164 'from IPython import embed_kernel',
163 'count = 0',
165 'count = 0',
164 'def go():',
166 'def go():',
165 ' global count',
167 ' global count',
166 ' embed_kernel()',
168 ' embed_kernel()',
167 ' count = count + 1',
169 ' count = count + 1',
168 '',
170 '',
169 'while True:'
171 'while True:'
170 ' go()',
172 ' go()',
171 '',
173 '',
172 ])
174 ])
173
175
174 with setup_kernel(cmd) as km:
176 with setup_kernel(cmd) as km:
175 shell = km.shell_channel
177 shell = km.shell_channel
176 for i in range(5):
178 for i in range(5):
177 msg_id = shell.object_info('count')
179 msg_id = shell.object_info('count')
178 msg = shell.get_msg(block=True, timeout=2)
180 msg = shell.get_msg(block=True, timeout=2)
179 content = msg['content']
181 content = msg['content']
180 nt.assert_true(content['found'])
182 nt.assert_true(content['found'])
181 nt.assert_equals(content['string_form'], unicode(i))
183 nt.assert_equals(content['string_form'], unicode(i))
182
184
183 # exit from embed_kernel
185 # exit from embed_kernel
184 shell.execute("get_ipython().exit_now = True")
186 shell.execute("get_ipython().exit_now = True")
185 msg = shell.get_msg(block=True, timeout=2)
187 msg = shell.get_msg(block=True, timeout=2)
186 time.sleep(0.2)
188 time.sleep(0.2)
187
189
188
190
General Comments 0
You need to be logged in to leave comments. Login now