##// END OF EJS Templates
test re-entrant embed_kernel
MinRK -
Show More
@@ -1,158 +1,188 b''
1 1 """test IPython.embed_kernel()"""
2 2
3 3 #-------------------------------------------------------------------------------
4 4 # Copyright (C) 2012 The IPython Development Team
5 5 #
6 6 # Distributed under the terms of the BSD License. The full license is in
7 7 # the file COPYING, distributed as part of this software.
8 8 #-------------------------------------------------------------------------------
9 9
10 10 #-------------------------------------------------------------------------------
11 11 # Imports
12 12 #-------------------------------------------------------------------------------
13 13
14 14 import os
15 15 import shutil
16 16 import sys
17 17 import tempfile
18 18 import time
19 19
20 20 from contextlib import contextmanager
21 21 from subprocess import Popen, PIPE
22 22
23 23 import nose.tools as nt
24 24
25 25 from IPython.zmq.blockingkernelmanager import BlockingKernelManager
26 26 from IPython.utils import path
27 27
28 28
29 29 #-------------------------------------------------------------------------------
30 30 # Tests
31 31 #-------------------------------------------------------------------------------
32 32
33 33 def setup():
34 34 """setup temporary IPYTHONDIR for tests"""
35 35 global IPYTHONDIR
36 36 global env
37 37 global save_get_ipython_dir
38 38
39 39 IPYTHONDIR = tempfile.mkdtemp()
40 40 env = dict(IPYTHONDIR=IPYTHONDIR)
41 41 save_get_ipython_dir = path.get_ipython_dir
42 42 path.get_ipython_dir = lambda : IPYTHONDIR
43 43
44 44
45 45 def teardown():
46 46 path.get_ipython_dir = save_get_ipython_dir
47 47
48 48 try:
49 49 shutil.rmtree(IPYTHONDIR)
50 50 except (OSError, IOError):
51 51 # no such file
52 52 pass
53 53
54 54
55 55 @contextmanager
56 56 def setup_kernel(cmd):
57 57 """start an embedded kernel in a subprocess, and wait for it to be ready
58 58
59 59 Returns
60 60 -------
61 61 kernel_manager: connected KernelManager instance
62 62 """
63 63 kernel = Popen([sys.executable, '-c', cmd], stdout=PIPE, stderr=PIPE, env=env)
64 64 connection_file = os.path.join(IPYTHONDIR,
65 65 'profile_default',
66 66 'security',
67 67 'kernel-%i.json' % kernel.pid
68 68 )
69 69 # wait for connection file to exist, timeout after 5s
70 70 tic = time.time()
71 71 while not os.path.exists(connection_file) and kernel.poll() is None and time.time() < tic + 5:
72 72 time.sleep(0.1)
73 73
74 74 if not os.path.exists(connection_file):
75 75 if kernel.poll() is None:
76 76 kernel.terminate()
77 77 raise IOError("Connection file %r never arrived" % connection_file)
78 78
79 79 if kernel.poll() is not None:
80 80 raise IOError("Kernel failed to start")
81 81
82 82 km = BlockingKernelManager(connection_file=connection_file)
83 83 km.load_connection_file()
84 84 km.start_channels()
85 85
86 86 try:
87 87 yield km
88 88 finally:
89 89 km.stop_channels()
90 90
91 91 def test_embed_kernel_basic():
92 92 """IPython.embed_kernel() is basically functional"""
93 93 cmd = '\n'.join([
94 94 'from IPython import embed_kernel',
95 95 'def go():',
96 96 ' a=5',
97 97 ' b="hi there"',
98 98 ' embed_kernel()',
99 99 'go()',
100 100 '',
101 101 ])
102 102
103 103 with setup_kernel(cmd) as km:
104 104 shell = km.shell_channel
105 105
106 106 # oinfo a (int)
107 107 msg_id = shell.object_info('a')
108 108 msg = shell.get_msg(block=True, timeout=2)
109 109 content = msg['content']
110 110 nt.assert_true(content['found'])
111 111
112 112 msg_id = shell.execute("c=a*2")
113 113 msg = shell.get_msg(block=True, timeout=2)
114 114 content = msg['content']
115 115 nt.assert_equals(content['status'], u'ok')
116 116
117 117 # oinfo c (should be 10)
118 118 msg_id = shell.object_info('c')
119 119 msg = shell.get_msg(block=True, timeout=2)
120 120 content = msg['content']
121 121 nt.assert_true(content['found'])
122 122 nt.assert_equals(content['string_form'], u'10')
123 123
124 124 def test_embed_kernel_namespace():
125 125 """IPython.embed_kernel() inherits calling namespace"""
126 126 cmd = '\n'.join([
127 127 'from IPython import embed_kernel',
128 128 'def go():',
129 129 ' a=5',
130 130 ' b="hi there"',
131 131 ' embed_kernel()',
132 132 'go()',
133 133 '',
134 134 ])
135 135
136 136 with setup_kernel(cmd) as km:
137 137 shell = km.shell_channel
138 138
139 139 # oinfo a (int)
140 140 msg_id = shell.object_info('a')
141 141 msg = shell.get_msg(block=True, timeout=2)
142 142 content = msg['content']
143 143 nt.assert_true(content['found'])
144 144 nt.assert_equals(content['string_form'], u'5')
145 145
146 146 # oinfo b (str)
147 147 msg_id = shell.object_info('b')
148 148 msg = shell.get_msg(block=True, timeout=2)
149 149 content = msg['content']
150 150 nt.assert_true(content['found'])
151 151 nt.assert_equals(content['string_form'], u'hi there')
152 152
153 153 # oinfo c (undefined)
154 154 msg_id = shell.object_info('c')
155 155 msg = shell.get_msg(block=True, timeout=2)
156 156 content = msg['content']
157 157 nt.assert_false(content['found'])
158 158
159 def test_embed_kernel_reentrant():
160 """IPython.embed_kernel() can be called multiple times"""
161 cmd = '\n'.join([
162 'from IPython import embed_kernel',
163 'count = 0',
164 'def go():',
165 ' global count',
166 ' embed_kernel()',
167 ' count = count + 1',
168 '',
169 'while True:'
170 ' go()',
171 '',
172 ])
173
174 with setup_kernel(cmd) as km:
175 shell = km.shell_channel
176 for i in range(5):
177 msg_id = shell.object_info('count')
178 msg = shell.get_msg(block=True, timeout=2)
179 content = msg['content']
180 nt.assert_true(content['found'])
181 nt.assert_equals(content['string_form'], unicode(i))
182
183 # exit from embed_kernel
184 shell.execute("get_ipython().exit_now = True")
185 msg = shell.get_msg(block=True, timeout=2)
186 time.sleep(0.2)
187
188
General Comments 0
You need to be logged in to leave comments. Login now