##// END OF EJS Templates
change to pass whole env + temporary IPYTHONDIR
Jörgen Stenarson -
Show More
@@ -1,196 +1,193 b''
1 1 """test IPython.embed_kernel()"""
2 2
3 3 #-------------------------------------------------------------------------------
4 4 # Copyright (C) 2012 The IPython Development Team
5 5 #
6 6 # Distributed under the terms of the BSD License. The full license is in
7 7 # the file COPYING, distributed as part of this software.
8 8 #-------------------------------------------------------------------------------
9 9
10 10 #-------------------------------------------------------------------------------
11 11 # Imports
12 12 #-------------------------------------------------------------------------------
13 13
14 14 import os
15 15 import shutil
16 16 import sys
17 17 import tempfile
18 18 import time
19 19
20 20 from contextlib import contextmanager
21 21 from subprocess import Popen, PIPE
22 22
23 23 import nose.tools as nt
24 24
25 25 from IPython.zmq.blockingkernelmanager import BlockingKernelManager
26 26 from IPython.utils import path, py3compat
27 27
28 28 #-------------------------------------------------------------------------------
29 29 # Tests
30 30 #-------------------------------------------------------------------------------
31 31
32 32 def setup():
33 33 """setup temporary IPYTHONDIR for tests"""
34 34 global IPYTHONDIR
35 35 global env
36 36 global save_get_ipython_dir
37 37
38 38 IPYTHONDIR = tempfile.mkdtemp()
39 39
40 env = dict(IPYTHONDIR=IPYTHONDIR)
41 if 'PYTHONPATH' in os.environ:
42 env['PYTHONPATH'] = os.environ['PYTHONPATH']
43 if sys.platform == 'win32':
44 env["SYSTEMROOT"] = os.environ["SYSTEMROOT"]
40 env = os.environ.copy()
41 env["IPYTHONDIR"] = IPYTHONDIR
45 42
46 43 save_get_ipython_dir = path.get_ipython_dir
47 44 path.get_ipython_dir = lambda : IPYTHONDIR
48 45
49 46
50 47 def teardown():
51 48 path.get_ipython_dir = save_get_ipython_dir
52 49
53 50 try:
54 51 shutil.rmtree(IPYTHONDIR)
55 52 except (OSError, IOError):
56 53 # no such file
57 54 pass
58 55
59 56
60 57 @contextmanager
61 58 def setup_kernel(cmd):
62 59 """start an embedded kernel in a subprocess, and wait for it to be ready
63 60
64 61 Returns
65 62 -------
66 63 kernel_manager: connected KernelManager instance
67 64 """
68 65 kernel = Popen([sys.executable, '-c', cmd], stdout=PIPE, stderr=PIPE, env=env)
69 66 connection_file = os.path.join(IPYTHONDIR,
70 67 'profile_default',
71 68 'security',
72 69 'kernel-%i.json' % kernel.pid
73 70 )
74 71 # wait for connection file to exist, timeout after 5s
75 72 tic = time.time()
76 73 while not os.path.exists(connection_file) and kernel.poll() is None and time.time() < tic + 10:
77 74 time.sleep(0.1)
78 75
79 76 if kernel.poll() is not None:
80 77 o,e = kernel.communicate()
81 78 e = py3compat.cast_unicode(e)
82 79 raise IOError("Kernel failed to start:\n%s" % e)
83 80
84 81 if not os.path.exists(connection_file):
85 82 if kernel.poll() is None:
86 83 kernel.terminate()
87 84 raise IOError("Connection file %r never arrived" % connection_file)
88 85
89 86 km = BlockingKernelManager(connection_file=connection_file)
90 87 km.load_connection_file()
91 88 km.start_channels()
92 89
93 90 try:
94 91 yield km
95 92 finally:
96 93 km.stop_channels()
97 94 kernel.terminate()
98 95
99 96 def test_embed_kernel_basic():
100 97 """IPython.embed_kernel() is basically functional"""
101 98 cmd = '\n'.join([
102 99 'from IPython import embed_kernel',
103 100 'def go():',
104 101 ' a=5',
105 102 ' b="hi there"',
106 103 ' embed_kernel()',
107 104 'go()',
108 105 '',
109 106 ])
110 107
111 108 with setup_kernel(cmd) as km:
112 109 shell = km.shell_channel
113 110
114 111 # oinfo a (int)
115 112 msg_id = shell.object_info('a')
116 113 msg = shell.get_msg(block=True, timeout=2)
117 114 content = msg['content']
118 115 nt.assert_true(content['found'])
119 116
120 117 msg_id = shell.execute("c=a*2")
121 118 msg = shell.get_msg(block=True, timeout=2)
122 119 content = msg['content']
123 120 nt.assert_equals(content['status'], u'ok')
124 121
125 122 # oinfo c (should be 10)
126 123 msg_id = shell.object_info('c')
127 124 msg = shell.get_msg(block=True, timeout=2)
128 125 content = msg['content']
129 126 nt.assert_true(content['found'])
130 127 nt.assert_equals(content['string_form'], u'10')
131 128
132 129 def test_embed_kernel_namespace():
133 130 """IPython.embed_kernel() inherits calling namespace"""
134 131 cmd = '\n'.join([
135 132 'from IPython import embed_kernel',
136 133 'def go():',
137 134 ' a=5',
138 135 ' b="hi there"',
139 136 ' embed_kernel()',
140 137 'go()',
141 138 '',
142 139 ])
143 140
144 141 with setup_kernel(cmd) as km:
145 142 shell = km.shell_channel
146 143
147 144 # oinfo a (int)
148 145 msg_id = shell.object_info('a')
149 146 msg = shell.get_msg(block=True, timeout=2)
150 147 content = msg['content']
151 148 nt.assert_true(content['found'])
152 149 nt.assert_equals(content['string_form'], u'5')
153 150
154 151 # oinfo b (str)
155 152 msg_id = shell.object_info('b')
156 153 msg = shell.get_msg(block=True, timeout=2)
157 154 content = msg['content']
158 155 nt.assert_true(content['found'])
159 156 nt.assert_equals(content['string_form'], u'hi there')
160 157
161 158 # oinfo c (undefined)
162 159 msg_id = shell.object_info('c')
163 160 msg = shell.get_msg(block=True, timeout=2)
164 161 content = msg['content']
165 162 nt.assert_false(content['found'])
166 163
167 164 def test_embed_kernel_reentrant():
168 165 """IPython.embed_kernel() can be called multiple times"""
169 166 cmd = '\n'.join([
170 167 'from IPython import embed_kernel',
171 168 'count = 0',
172 169 'def go():',
173 170 ' global count',
174 171 ' embed_kernel()',
175 172 ' count = count + 1',
176 173 '',
177 174 'while True:'
178 175 ' go()',
179 176 '',
180 177 ])
181 178
182 179 with setup_kernel(cmd) as km:
183 180 shell = km.shell_channel
184 181 for i in range(5):
185 182 msg_id = shell.object_info('count')
186 183 msg = shell.get_msg(block=True, timeout=2)
187 184 content = msg['content']
188 185 nt.assert_true(content['found'])
189 186 nt.assert_equals(content['string_form'], unicode(i))
190 187
191 188 # exit from embed_kernel
192 189 shell.execute("get_ipython().exit_now = True")
193 190 msg = shell.get_msg(block=True, timeout=2)
194 191 time.sleep(0.2)
195 192
196 193
General Comments 0
You need to be logged in to leave comments. Login now