##// END OF EJS Templates
update embed_kernel test to Client API
MinRK -
Show More
@@ -1,193 +1,193 b''
1 1 """test IPython.embed_kernel()"""
2 2
3 3 #-------------------------------------------------------------------------------
4 4 # Copyright (C) 2012 The IPython Development Team
5 5 #
6 6 # Distributed under the terms of the BSD License. The full license is in
7 7 # the file COPYING, distributed as part of this software.
8 8 #-------------------------------------------------------------------------------
9 9
10 10 #-------------------------------------------------------------------------------
11 11 # Imports
12 12 #-------------------------------------------------------------------------------
13 13
14 14 import os
15 15 import shutil
16 16 import sys
17 17 import tempfile
18 18 import time
19 19
20 20 from contextlib import contextmanager
21 21 from subprocess import Popen, PIPE
22 22
23 23 import nose.tools as nt
24 24
25 from IPython.kernel.blocking import BlockingKernelManager
25 from IPython.kernel import BlockingKernelClient
26 26 from IPython.utils import path, py3compat
27 27
28 28 #-------------------------------------------------------------------------------
29 29 # Tests
30 30 #-------------------------------------------------------------------------------
31 31
32 32 def setup():
33 33 """setup temporary IPYTHONDIR for tests"""
34 34 global IPYTHONDIR
35 35 global env
36 36 global save_get_ipython_dir
37 37
38 38 IPYTHONDIR = tempfile.mkdtemp()
39 39
40 40 env = os.environ.copy()
41 41 env["IPYTHONDIR"] = IPYTHONDIR
42 42
43 43 save_get_ipython_dir = path.get_ipython_dir
44 44 path.get_ipython_dir = lambda : IPYTHONDIR
45 45
46 46
47 47 def teardown():
48 48 path.get_ipython_dir = save_get_ipython_dir
49 49
50 50 try:
51 51 shutil.rmtree(IPYTHONDIR)
52 52 except (OSError, IOError):
53 53 # no such file
54 54 pass
55 55
56 56
57 57 @contextmanager
58 58 def setup_kernel(cmd):
59 59 """start an embedded kernel in a subprocess, and wait for it to be ready
60 60
61 61 Returns
62 62 -------
63 63 kernel_manager: connected KernelManager instance
64 64 """
65 65 kernel = Popen([sys.executable, '-c', cmd], stdout=PIPE, stderr=PIPE, env=env)
66 66 connection_file = os.path.join(IPYTHONDIR,
67 67 'profile_default',
68 68 'security',
69 69 'kernel-%i.json' % kernel.pid
70 70 )
71 71 # wait for connection file to exist, timeout after 5s
72 72 tic = time.time()
73 73 while not os.path.exists(connection_file) and kernel.poll() is None and time.time() < tic + 10:
74 74 time.sleep(0.1)
75 75
76 76 if kernel.poll() is not None:
77 77 o,e = kernel.communicate()
78 78 e = py3compat.cast_unicode(e)
79 79 raise IOError("Kernel failed to start:\n%s" % e)
80 80
81 81 if not os.path.exists(connection_file):
82 82 if kernel.poll() is None:
83 83 kernel.terminate()
84 84 raise IOError("Connection file %r never arrived" % connection_file)
85 85
86 km = BlockingKernelManager(connection_file=connection_file)
87 km.load_connection_file()
88 km.start_channels()
86 client = BlockingKernelClient(connection_file=connection_file)
87 client.load_connection_file()
88 client.start_channels()
89 89
90 90 try:
91 yield km
91 yield client
92 92 finally:
93 km.stop_channels()
93 client.stop_channels()
94 94 kernel.terminate()
95 95
96 96 def test_embed_kernel_basic():
97 97 """IPython.embed_kernel() is basically functional"""
98 98 cmd = '\n'.join([
99 99 'from IPython import embed_kernel',
100 100 'def go():',
101 101 ' a=5',
102 102 ' b="hi there"',
103 103 ' embed_kernel()',
104 104 'go()',
105 105 '',
106 106 ])
107 107
108 with setup_kernel(cmd) as km:
109 shell = km.shell_channel
108 with setup_kernel(cmd) as client:
109 shell = client.shell_channel
110 110
111 111 # oinfo a (int)
112 112 msg_id = shell.object_info('a')
113 113 msg = shell.get_msg(block=True, timeout=2)
114 114 content = msg['content']
115 115 nt.assert_true(content['found'])
116 116
117 117 msg_id = shell.execute("c=a*2")
118 118 msg = shell.get_msg(block=True, timeout=2)
119 119 content = msg['content']
120 120 nt.assert_equal(content['status'], u'ok')
121 121
122 122 # oinfo c (should be 10)
123 123 msg_id = shell.object_info('c')
124 124 msg = shell.get_msg(block=True, timeout=2)
125 125 content = msg['content']
126 126 nt.assert_true(content['found'])
127 127 nt.assert_equal(content['string_form'], u'10')
128 128
129 129 def test_embed_kernel_namespace():
130 130 """IPython.embed_kernel() inherits calling namespace"""
131 131 cmd = '\n'.join([
132 132 'from IPython import embed_kernel',
133 133 'def go():',
134 134 ' a=5',
135 135 ' b="hi there"',
136 136 ' embed_kernel()',
137 137 'go()',
138 138 '',
139 139 ])
140 140
141 with setup_kernel(cmd) as km:
142 shell = km.shell_channel
141 with setup_kernel(cmd) as client:
142 shell = client.shell_channel
143 143
144 144 # oinfo a (int)
145 145 msg_id = shell.object_info('a')
146 146 msg = shell.get_msg(block=True, timeout=2)
147 147 content = msg['content']
148 148 nt.assert_true(content['found'])
149 149 nt.assert_equal(content['string_form'], u'5')
150 150
151 151 # oinfo b (str)
152 152 msg_id = shell.object_info('b')
153 153 msg = shell.get_msg(block=True, timeout=2)
154 154 content = msg['content']
155 155 nt.assert_true(content['found'])
156 156 nt.assert_equal(content['string_form'], u'hi there')
157 157
158 158 # oinfo c (undefined)
159 159 msg_id = shell.object_info('c')
160 160 msg = shell.get_msg(block=True, timeout=2)
161 161 content = msg['content']
162 162 nt.assert_false(content['found'])
163 163
164 164 def test_embed_kernel_reentrant():
165 165 """IPython.embed_kernel() can be called multiple times"""
166 166 cmd = '\n'.join([
167 167 'from IPython import embed_kernel',
168 168 'count = 0',
169 169 'def go():',
170 170 ' global count',
171 171 ' embed_kernel()',
172 172 ' count = count + 1',
173 173 '',
174 174 'while True:'
175 175 ' go()',
176 176 '',
177 177 ])
178 178
179 with setup_kernel(cmd) as km:
180 shell = km.shell_channel
179 with setup_kernel(cmd) as client:
180 shell = client.shell_channel
181 181 for i in range(5):
182 182 msg_id = shell.object_info('count')
183 183 msg = shell.get_msg(block=True, timeout=2)
184 184 content = msg['content']
185 185 nt.assert_true(content['found'])
186 186 nt.assert_equal(content['string_form'], unicode(i))
187 187
188 188 # exit from embed_kernel
189 189 shell.execute("get_ipython().exit_now = True")
190 190 msg = shell.get_msg(block=True, timeout=2)
191 191 time.sleep(0.2)
192 192
193 193
General Comments 0
You need to be logged in to leave comments. Login now