##// END OF EJS Templates
change to pass whole env + temporary IPYTHONDIR
Jörgen Stenarson -
Show More
@@ -1,196 +1,193
1 """test IPython.embed_kernel()"""
1 """test IPython.embed_kernel()"""
2
2
3 #-------------------------------------------------------------------------------
3 #-------------------------------------------------------------------------------
4 # Copyright (C) 2012 The IPython Development Team
4 # Copyright (C) 2012 The IPython Development Team
5 #
5 #
6 # Distributed under the terms of the BSD License. The full license is in
6 # Distributed under the terms of the BSD License. The full license is in
7 # the file COPYING, distributed as part of this software.
7 # the file COPYING, distributed as part of this software.
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9
9
10 #-------------------------------------------------------------------------------
10 #-------------------------------------------------------------------------------
11 # Imports
11 # Imports
12 #-------------------------------------------------------------------------------
12 #-------------------------------------------------------------------------------
13
13
14 import os
14 import os
15 import shutil
15 import shutil
16 import sys
16 import sys
17 import tempfile
17 import tempfile
18 import time
18 import time
19
19
20 from contextlib import contextmanager
20 from contextlib import contextmanager
21 from subprocess import Popen, PIPE
21 from subprocess import Popen, PIPE
22
22
23 import nose.tools as nt
23 import nose.tools as nt
24
24
25 from IPython.zmq.blockingkernelmanager import BlockingKernelManager
25 from IPython.zmq.blockingkernelmanager import BlockingKernelManager
26 from IPython.utils import path, py3compat
26 from IPython.utils import path, py3compat
27
27
28 #-------------------------------------------------------------------------------
28 #-------------------------------------------------------------------------------
29 # Tests
29 # Tests
30 #-------------------------------------------------------------------------------
30 #-------------------------------------------------------------------------------
31
31
32 def setup():
32 def setup():
33 """setup temporary IPYTHONDIR for tests"""
33 """setup temporary IPYTHONDIR for tests"""
34 global IPYTHONDIR
34 global IPYTHONDIR
35 global env
35 global env
36 global save_get_ipython_dir
36 global save_get_ipython_dir
37
37
38 IPYTHONDIR = tempfile.mkdtemp()
38 IPYTHONDIR = tempfile.mkdtemp()
39
39
40 env = dict(IPYTHONDIR=IPYTHONDIR)
40 env = os.environ.copy()
41 if 'PYTHONPATH' in os.environ:
41 env["IPYTHONDIR"] = IPYTHONDIR
42 env['PYTHONPATH'] = os.environ['PYTHONPATH']
43 if sys.platform == 'win32':
44 env["SYSTEMROOT"] = os.environ["SYSTEMROOT"]
45
42
46 save_get_ipython_dir = path.get_ipython_dir
43 save_get_ipython_dir = path.get_ipython_dir
47 path.get_ipython_dir = lambda : IPYTHONDIR
44 path.get_ipython_dir = lambda : IPYTHONDIR
48
45
49
46
50 def teardown():
47 def teardown():
51 path.get_ipython_dir = save_get_ipython_dir
48 path.get_ipython_dir = save_get_ipython_dir
52
49
53 try:
50 try:
54 shutil.rmtree(IPYTHONDIR)
51 shutil.rmtree(IPYTHONDIR)
55 except (OSError, IOError):
52 except (OSError, IOError):
56 # no such file
53 # no such file
57 pass
54 pass
58
55
59
56
60 @contextmanager
57 @contextmanager
61 def setup_kernel(cmd):
58 def setup_kernel(cmd):
62 """start an embedded kernel in a subprocess, and wait for it to be ready
59 """start an embedded kernel in a subprocess, and wait for it to be ready
63
60
64 Returns
61 Returns
65 -------
62 -------
66 kernel_manager: connected KernelManager instance
63 kernel_manager: connected KernelManager instance
67 """
64 """
68 kernel = Popen([sys.executable, '-c', cmd], stdout=PIPE, stderr=PIPE, env=env)
65 kernel = Popen([sys.executable, '-c', cmd], stdout=PIPE, stderr=PIPE, env=env)
69 connection_file = os.path.join(IPYTHONDIR,
66 connection_file = os.path.join(IPYTHONDIR,
70 'profile_default',
67 'profile_default',
71 'security',
68 'security',
72 'kernel-%i.json' % kernel.pid
69 'kernel-%i.json' % kernel.pid
73 )
70 )
74 # wait for connection file to exist, timeout after 5s
71 # wait for connection file to exist, timeout after 5s
75 tic = time.time()
72 tic = time.time()
76 while not os.path.exists(connection_file) and kernel.poll() is None and time.time() < tic + 10:
73 while not os.path.exists(connection_file) and kernel.poll() is None and time.time() < tic + 10:
77 time.sleep(0.1)
74 time.sleep(0.1)
78
75
79 if kernel.poll() is not None:
76 if kernel.poll() is not None:
80 o,e = kernel.communicate()
77 o,e = kernel.communicate()
81 e = py3compat.cast_unicode(e)
78 e = py3compat.cast_unicode(e)
82 raise IOError("Kernel failed to start:\n%s" % e)
79 raise IOError("Kernel failed to start:\n%s" % e)
83
80
84 if not os.path.exists(connection_file):
81 if not os.path.exists(connection_file):
85 if kernel.poll() is None:
82 if kernel.poll() is None:
86 kernel.terminate()
83 kernel.terminate()
87 raise IOError("Connection file %r never arrived" % connection_file)
84 raise IOError("Connection file %r never arrived" % connection_file)
88
85
89 km = BlockingKernelManager(connection_file=connection_file)
86 km = BlockingKernelManager(connection_file=connection_file)
90 km.load_connection_file()
87 km.load_connection_file()
91 km.start_channels()
88 km.start_channels()
92
89
93 try:
90 try:
94 yield km
91 yield km
95 finally:
92 finally:
96 km.stop_channels()
93 km.stop_channels()
97 kernel.terminate()
94 kernel.terminate()
98
95
99 def test_embed_kernel_basic():
96 def test_embed_kernel_basic():
100 """IPython.embed_kernel() is basically functional"""
97 """IPython.embed_kernel() is basically functional"""
101 cmd = '\n'.join([
98 cmd = '\n'.join([
102 'from IPython import embed_kernel',
99 'from IPython import embed_kernel',
103 'def go():',
100 'def go():',
104 ' a=5',
101 ' a=5',
105 ' b="hi there"',
102 ' b="hi there"',
106 ' embed_kernel()',
103 ' embed_kernel()',
107 'go()',
104 'go()',
108 '',
105 '',
109 ])
106 ])
110
107
111 with setup_kernel(cmd) as km:
108 with setup_kernel(cmd) as km:
112 shell = km.shell_channel
109 shell = km.shell_channel
113
110
114 # oinfo a (int)
111 # oinfo a (int)
115 msg_id = shell.object_info('a')
112 msg_id = shell.object_info('a')
116 msg = shell.get_msg(block=True, timeout=2)
113 msg = shell.get_msg(block=True, timeout=2)
117 content = msg['content']
114 content = msg['content']
118 nt.assert_true(content['found'])
115 nt.assert_true(content['found'])
119
116
120 msg_id = shell.execute("c=a*2")
117 msg_id = shell.execute("c=a*2")
121 msg = shell.get_msg(block=True, timeout=2)
118 msg = shell.get_msg(block=True, timeout=2)
122 content = msg['content']
119 content = msg['content']
123 nt.assert_equals(content['status'], u'ok')
120 nt.assert_equals(content['status'], u'ok')
124
121
125 # oinfo c (should be 10)
122 # oinfo c (should be 10)
126 msg_id = shell.object_info('c')
123 msg_id = shell.object_info('c')
127 msg = shell.get_msg(block=True, timeout=2)
124 msg = shell.get_msg(block=True, timeout=2)
128 content = msg['content']
125 content = msg['content']
129 nt.assert_true(content['found'])
126 nt.assert_true(content['found'])
130 nt.assert_equals(content['string_form'], u'10')
127 nt.assert_equals(content['string_form'], u'10')
131
128
132 def test_embed_kernel_namespace():
129 def test_embed_kernel_namespace():
133 """IPython.embed_kernel() inherits calling namespace"""
130 """IPython.embed_kernel() inherits calling namespace"""
134 cmd = '\n'.join([
131 cmd = '\n'.join([
135 'from IPython import embed_kernel',
132 'from IPython import embed_kernel',
136 'def go():',
133 'def go():',
137 ' a=5',
134 ' a=5',
138 ' b="hi there"',
135 ' b="hi there"',
139 ' embed_kernel()',
136 ' embed_kernel()',
140 'go()',
137 'go()',
141 '',
138 '',
142 ])
139 ])
143
140
144 with setup_kernel(cmd) as km:
141 with setup_kernel(cmd) as km:
145 shell = km.shell_channel
142 shell = km.shell_channel
146
143
147 # oinfo a (int)
144 # oinfo a (int)
148 msg_id = shell.object_info('a')
145 msg_id = shell.object_info('a')
149 msg = shell.get_msg(block=True, timeout=2)
146 msg = shell.get_msg(block=True, timeout=2)
150 content = msg['content']
147 content = msg['content']
151 nt.assert_true(content['found'])
148 nt.assert_true(content['found'])
152 nt.assert_equals(content['string_form'], u'5')
149 nt.assert_equals(content['string_form'], u'5')
153
150
154 # oinfo b (str)
151 # oinfo b (str)
155 msg_id = shell.object_info('b')
152 msg_id = shell.object_info('b')
156 msg = shell.get_msg(block=True, timeout=2)
153 msg = shell.get_msg(block=True, timeout=2)
157 content = msg['content']
154 content = msg['content']
158 nt.assert_true(content['found'])
155 nt.assert_true(content['found'])
159 nt.assert_equals(content['string_form'], u'hi there')
156 nt.assert_equals(content['string_form'], u'hi there')
160
157
161 # oinfo c (undefined)
158 # oinfo c (undefined)
162 msg_id = shell.object_info('c')
159 msg_id = shell.object_info('c')
163 msg = shell.get_msg(block=True, timeout=2)
160 msg = shell.get_msg(block=True, timeout=2)
164 content = msg['content']
161 content = msg['content']
165 nt.assert_false(content['found'])
162 nt.assert_false(content['found'])
166
163
167 def test_embed_kernel_reentrant():
164 def test_embed_kernel_reentrant():
168 """IPython.embed_kernel() can be called multiple times"""
165 """IPython.embed_kernel() can be called multiple times"""
169 cmd = '\n'.join([
166 cmd = '\n'.join([
170 'from IPython import embed_kernel',
167 'from IPython import embed_kernel',
171 'count = 0',
168 'count = 0',
172 'def go():',
169 'def go():',
173 ' global count',
170 ' global count',
174 ' embed_kernel()',
171 ' embed_kernel()',
175 ' count = count + 1',
172 ' count = count + 1',
176 '',
173 '',
177 'while True:'
174 'while True:'
178 ' go()',
175 ' go()',
179 '',
176 '',
180 ])
177 ])
181
178
182 with setup_kernel(cmd) as km:
179 with setup_kernel(cmd) as km:
183 shell = km.shell_channel
180 shell = km.shell_channel
184 for i in range(5):
181 for i in range(5):
185 msg_id = shell.object_info('count')
182 msg_id = shell.object_info('count')
186 msg = shell.get_msg(block=True, timeout=2)
183 msg = shell.get_msg(block=True, timeout=2)
187 content = msg['content']
184 content = msg['content']
188 nt.assert_true(content['found'])
185 nt.assert_true(content['found'])
189 nt.assert_equals(content['string_form'], unicode(i))
186 nt.assert_equals(content['string_form'], unicode(i))
190
187
191 # exit from embed_kernel
188 # exit from embed_kernel
192 shell.execute("get_ipython().exit_now = True")
189 shell.execute("get_ipython().exit_now = True")
193 msg = shell.get_msg(block=True, timeout=2)
190 msg = shell.get_msg(block=True, timeout=2)
194 time.sleep(0.2)
191 time.sleep(0.2)
195
192
196
193
General Comments 0
You need to be logged in to leave comments. Login now