##// END OF EJS Templates
switch order of failed-to-start errors in test_embed_kernel...
MinRK -
Show More
@@ -1,188 +1,190 b''
1 1 """test IPython.embed_kernel()"""
2 2
3 3 #-------------------------------------------------------------------------------
4 4 # Copyright (C) 2012 The IPython Development Team
5 5 #
6 6 # Distributed under the terms of the BSD License. The full license is in
7 7 # the file COPYING, distributed as part of this software.
8 8 #-------------------------------------------------------------------------------
9 9
10 10 #-------------------------------------------------------------------------------
11 11 # Imports
12 12 #-------------------------------------------------------------------------------
13 13
14 14 import os
15 15 import shutil
16 16 import sys
17 17 import tempfile
18 18 import time
19 19
20 20 from contextlib import contextmanager
21 21 from subprocess import Popen, PIPE
22 22
23 23 import nose.tools as nt
24 24
25 25 from IPython.zmq.blockingkernelmanager import BlockingKernelManager
26 from IPython.utils import path
26 from IPython.utils import path, py3compat
27 27
28 28
29 29 #-------------------------------------------------------------------------------
30 30 # Tests
31 31 #-------------------------------------------------------------------------------
32 32
33 33 def setup():
34 34 """setup temporary IPYTHONDIR for tests"""
35 35 global IPYTHONDIR
36 36 global env
37 37 global save_get_ipython_dir
38 38
39 39 IPYTHONDIR = tempfile.mkdtemp()
40 40 env = dict(IPYTHONDIR=IPYTHONDIR)
41 41 save_get_ipython_dir = path.get_ipython_dir
42 42 path.get_ipython_dir = lambda : IPYTHONDIR
43 43
44 44
45 45 def teardown():
46 46 path.get_ipython_dir = save_get_ipython_dir
47 47
48 48 try:
49 49 shutil.rmtree(IPYTHONDIR)
50 50 except (OSError, IOError):
51 51 # no such file
52 52 pass
53 53
54 54
55 55 @contextmanager
56 56 def setup_kernel(cmd):
57 57 """start an embedded kernel in a subprocess, and wait for it to be ready
58 58
59 59 Returns
60 60 -------
61 61 kernel_manager: connected KernelManager instance
62 62 """
63 63 kernel = Popen([sys.executable, '-c', cmd], stdout=PIPE, stderr=PIPE, env=env)
64 64 connection_file = os.path.join(IPYTHONDIR,
65 65 'profile_default',
66 66 'security',
67 67 'kernel-%i.json' % kernel.pid
68 68 )
69 69 # wait for connection file to exist, timeout after 5s
70 70 tic = time.time()
71 while not os.path.exists(connection_file) and kernel.poll() is None and time.time() < tic + 5:
71 while not os.path.exists(connection_file) and kernel.poll() is None and time.time() < tic + 10:
72 72 time.sleep(0.1)
73 73
74 if kernel.poll() is not None:
75 o,e = kernel.communicate()
76 e = py3compat.cast_unicode(e)
77 raise IOError("Kernel failed to start:\n%s" % e)
78
74 79 if not os.path.exists(connection_file):
75 80 if kernel.poll() is None:
76 81 kernel.terminate()
77 82 raise IOError("Connection file %r never arrived" % connection_file)
78 83
79 if kernel.poll() is not None:
80 raise IOError("Kernel failed to start")
81
82 84 km = BlockingKernelManager(connection_file=connection_file)
83 85 km.load_connection_file()
84 86 km.start_channels()
85 87
86 88 try:
87 89 yield km
88 90 finally:
89 91 km.stop_channels()
90 92
91 93 def test_embed_kernel_basic():
92 94 """IPython.embed_kernel() is basically functional"""
93 95 cmd = '\n'.join([
94 96 'from IPython import embed_kernel',
95 97 'def go():',
96 98 ' a=5',
97 99 ' b="hi there"',
98 100 ' embed_kernel()',
99 101 'go()',
100 102 '',
101 103 ])
102 104
103 105 with setup_kernel(cmd) as km:
104 106 shell = km.shell_channel
105 107
106 108 # oinfo a (int)
107 109 msg_id = shell.object_info('a')
108 110 msg = shell.get_msg(block=True, timeout=2)
109 111 content = msg['content']
110 112 nt.assert_true(content['found'])
111 113
112 114 msg_id = shell.execute("c=a*2")
113 115 msg = shell.get_msg(block=True, timeout=2)
114 116 content = msg['content']
115 117 nt.assert_equals(content['status'], u'ok')
116 118
117 119 # oinfo c (should be 10)
118 120 msg_id = shell.object_info('c')
119 121 msg = shell.get_msg(block=True, timeout=2)
120 122 content = msg['content']
121 123 nt.assert_true(content['found'])
122 124 nt.assert_equals(content['string_form'], u'10')
123 125
124 126 def test_embed_kernel_namespace():
125 127 """IPython.embed_kernel() inherits calling namespace"""
126 128 cmd = '\n'.join([
127 129 'from IPython import embed_kernel',
128 130 'def go():',
129 131 ' a=5',
130 132 ' b="hi there"',
131 133 ' embed_kernel()',
132 134 'go()',
133 135 '',
134 136 ])
135 137
136 138 with setup_kernel(cmd) as km:
137 139 shell = km.shell_channel
138 140
139 141 # oinfo a (int)
140 142 msg_id = shell.object_info('a')
141 143 msg = shell.get_msg(block=True, timeout=2)
142 144 content = msg['content']
143 145 nt.assert_true(content['found'])
144 146 nt.assert_equals(content['string_form'], u'5')
145 147
146 148 # oinfo b (str)
147 149 msg_id = shell.object_info('b')
148 150 msg = shell.get_msg(block=True, timeout=2)
149 151 content = msg['content']
150 152 nt.assert_true(content['found'])
151 153 nt.assert_equals(content['string_form'], u'hi there')
152 154
153 155 # oinfo c (undefined)
154 156 msg_id = shell.object_info('c')
155 157 msg = shell.get_msg(block=True, timeout=2)
156 158 content = msg['content']
157 159 nt.assert_false(content['found'])
158 160
159 161 def test_embed_kernel_reentrant():
160 162 """IPython.embed_kernel() can be called multiple times"""
161 163 cmd = '\n'.join([
162 164 'from IPython import embed_kernel',
163 165 'count = 0',
164 166 'def go():',
165 167 ' global count',
166 168 ' embed_kernel()',
167 169 ' count = count + 1',
168 170 '',
169 171 'while True:'
170 172 ' go()',
171 173 '',
172 174 ])
173 175
174 176 with setup_kernel(cmd) as km:
175 177 shell = km.shell_channel
176 178 for i in range(5):
177 179 msg_id = shell.object_info('count')
178 180 msg = shell.get_msg(block=True, timeout=2)
179 181 content = msg['content']
180 182 nt.assert_true(content['found'])
181 183 nt.assert_equals(content['string_form'], unicode(i))
182 184
183 185 # exit from embed_kernel
184 186 shell.execute("get_ipython().exit_now = True")
185 187 msg = shell.get_msg(block=True, timeout=2)
186 188 time.sleep(0.2)
187 189
188 190
General Comments 0
You need to be logged in to leave comments. Login now