##// END OF EJS Templates
Ensure pythonpath is applied to test env only if present.
Fernando Perez -
Show More
@@ -1,191 +1,192 b''
1 """test IPython.embed_kernel()"""
1 """test IPython.embed_kernel()"""
2
2
3 #-------------------------------------------------------------------------------
3 #-------------------------------------------------------------------------------
4 # Copyright (C) 2012 The IPython Development Team
4 # Copyright (C) 2012 The IPython Development Team
5 #
5 #
6 # Distributed under the terms of the BSD License. The full license is in
6 # Distributed under the terms of the BSD License. The full license is in
7 # the file COPYING, distributed as part of this software.
7 # the file COPYING, distributed as part of this software.
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9
9
10 #-------------------------------------------------------------------------------
10 #-------------------------------------------------------------------------------
11 # Imports
11 # Imports
12 #-------------------------------------------------------------------------------
12 #-------------------------------------------------------------------------------
13
13
14 import os
14 import os
15 import shutil
15 import shutil
16 import sys
16 import sys
17 import tempfile
17 import tempfile
18 import time
18 import time
19
19
20 from contextlib import contextmanager
20 from contextlib import contextmanager
21 from subprocess import Popen, PIPE
21 from subprocess import Popen, PIPE
22
22
23 import nose.tools as nt
23 import nose.tools as nt
24
24
25 from IPython.zmq.blockingkernelmanager import BlockingKernelManager
25 from IPython.zmq.blockingkernelmanager import BlockingKernelManager
26 from IPython.utils import path, py3compat
26 from IPython.utils import path, py3compat
27
27
28 #-------------------------------------------------------------------------------
28 #-------------------------------------------------------------------------------
29 # Tests
29 # Tests
30 #-------------------------------------------------------------------------------
30 #-------------------------------------------------------------------------------
31
31
32 def setup():
32 def setup():
33 """setup temporary IPYTHONDIR for tests"""
33 """setup temporary IPYTHONDIR for tests"""
34 global IPYTHONDIR
34 global IPYTHONDIR
35 global env
35 global env
36 global save_get_ipython_dir
36 global save_get_ipython_dir
37
37
38 IPYTHONDIR = tempfile.mkdtemp()
38 IPYTHONDIR = tempfile.mkdtemp()
39 env = dict(IPYTHONDIR=IPYTHONDIR,
39 env = dict(IPYTHONDIR=IPYTHONDIR)
40 PYTHONPATH=os.environ['PYTHONPATH'])
40 if 'PYTHONPATH' in os.environ:
41 env['PYTHONPATH'] = os.environ['PYTHONPATH']
41 save_get_ipython_dir = path.get_ipython_dir
42 save_get_ipython_dir = path.get_ipython_dir
42 path.get_ipython_dir = lambda : IPYTHONDIR
43 path.get_ipython_dir = lambda : IPYTHONDIR
43
44
44
45
45 def teardown():
46 def teardown():
46 path.get_ipython_dir = save_get_ipython_dir
47 path.get_ipython_dir = save_get_ipython_dir
47
48
48 try:
49 try:
49 shutil.rmtree(IPYTHONDIR)
50 shutil.rmtree(IPYTHONDIR)
50 except (OSError, IOError):
51 except (OSError, IOError):
51 # no such file
52 # no such file
52 pass
53 pass
53
54
54
55
55 @contextmanager
56 @contextmanager
56 def setup_kernel(cmd):
57 def setup_kernel(cmd):
57 """start an embedded kernel in a subprocess, and wait for it to be ready
58 """start an embedded kernel in a subprocess, and wait for it to be ready
58
59
59 Returns
60 Returns
60 -------
61 -------
61 kernel_manager: connected KernelManager instance
62 kernel_manager: connected KernelManager instance
62 """
63 """
63 kernel = Popen([sys.executable, '-c', cmd], stdout=PIPE, stderr=PIPE, env=env)
64 kernel = Popen([sys.executable, '-c', cmd], stdout=PIPE, stderr=PIPE, env=env)
64 connection_file = os.path.join(IPYTHONDIR,
65 connection_file = os.path.join(IPYTHONDIR,
65 'profile_default',
66 'profile_default',
66 'security',
67 'security',
67 'kernel-%i.json' % kernel.pid
68 'kernel-%i.json' % kernel.pid
68 )
69 )
69 # wait for connection file to exist, timeout after 5s
70 # wait for connection file to exist, timeout after 5s
70 tic = time.time()
71 tic = time.time()
71 while not os.path.exists(connection_file) and kernel.poll() is None and time.time() < tic + 10:
72 while not os.path.exists(connection_file) and kernel.poll() is None and time.time() < tic + 10:
72 time.sleep(0.1)
73 time.sleep(0.1)
73
74
74 if kernel.poll() is not None:
75 if kernel.poll() is not None:
75 o,e = kernel.communicate()
76 o,e = kernel.communicate()
76 e = py3compat.cast_unicode(e)
77 e = py3compat.cast_unicode(e)
77 raise IOError("Kernel failed to start:\n%s" % e)
78 raise IOError("Kernel failed to start:\n%s" % e)
78
79
79 if not os.path.exists(connection_file):
80 if not os.path.exists(connection_file):
80 if kernel.poll() is None:
81 if kernel.poll() is None:
81 kernel.terminate()
82 kernel.terminate()
82 raise IOError("Connection file %r never arrived" % connection_file)
83 raise IOError("Connection file %r never arrived" % connection_file)
83
84
84 km = BlockingKernelManager(connection_file=connection_file)
85 km = BlockingKernelManager(connection_file=connection_file)
85 km.load_connection_file()
86 km.load_connection_file()
86 km.start_channels()
87 km.start_channels()
87
88
88 try:
89 try:
89 yield km
90 yield km
90 finally:
91 finally:
91 km.stop_channels()
92 km.stop_channels()
92 kernel.terminate()
93 kernel.terminate()
93
94
94 def test_embed_kernel_basic():
95 def test_embed_kernel_basic():
95 """IPython.embed_kernel() is basically functional"""
96 """IPython.embed_kernel() is basically functional"""
96 cmd = '\n'.join([
97 cmd = '\n'.join([
97 'from IPython import embed_kernel',
98 'from IPython import embed_kernel',
98 'def go():',
99 'def go():',
99 ' a=5',
100 ' a=5',
100 ' b="hi there"',
101 ' b="hi there"',
101 ' embed_kernel()',
102 ' embed_kernel()',
102 'go()',
103 'go()',
103 '',
104 '',
104 ])
105 ])
105
106
106 with setup_kernel(cmd) as km:
107 with setup_kernel(cmd) as km:
107 shell = km.shell_channel
108 shell = km.shell_channel
108
109
109 # oinfo a (int)
110 # oinfo a (int)
110 msg_id = shell.object_info('a')
111 msg_id = shell.object_info('a')
111 msg = shell.get_msg(block=True, timeout=2)
112 msg = shell.get_msg(block=True, timeout=2)
112 content = msg['content']
113 content = msg['content']
113 nt.assert_true(content['found'])
114 nt.assert_true(content['found'])
114
115
115 msg_id = shell.execute("c=a*2")
116 msg_id = shell.execute("c=a*2")
116 msg = shell.get_msg(block=True, timeout=2)
117 msg = shell.get_msg(block=True, timeout=2)
117 content = msg['content']
118 content = msg['content']
118 nt.assert_equals(content['status'], u'ok')
119 nt.assert_equals(content['status'], u'ok')
119
120
120 # oinfo c (should be 10)
121 # oinfo c (should be 10)
121 msg_id = shell.object_info('c')
122 msg_id = shell.object_info('c')
122 msg = shell.get_msg(block=True, timeout=2)
123 msg = shell.get_msg(block=True, timeout=2)
123 content = msg['content']
124 content = msg['content']
124 nt.assert_true(content['found'])
125 nt.assert_true(content['found'])
125 nt.assert_equals(content['string_form'], u'10')
126 nt.assert_equals(content['string_form'], u'10')
126
127
127 def test_embed_kernel_namespace():
128 def test_embed_kernel_namespace():
128 """IPython.embed_kernel() inherits calling namespace"""
129 """IPython.embed_kernel() inherits calling namespace"""
129 cmd = '\n'.join([
130 cmd = '\n'.join([
130 'from IPython import embed_kernel',
131 'from IPython import embed_kernel',
131 'def go():',
132 'def go():',
132 ' a=5',
133 ' a=5',
133 ' b="hi there"',
134 ' b="hi there"',
134 ' embed_kernel()',
135 ' embed_kernel()',
135 'go()',
136 'go()',
136 '',
137 '',
137 ])
138 ])
138
139
139 with setup_kernel(cmd) as km:
140 with setup_kernel(cmd) as km:
140 shell = km.shell_channel
141 shell = km.shell_channel
141
142
142 # oinfo a (int)
143 # oinfo a (int)
143 msg_id = shell.object_info('a')
144 msg_id = shell.object_info('a')
144 msg = shell.get_msg(block=True, timeout=2)
145 msg = shell.get_msg(block=True, timeout=2)
145 content = msg['content']
146 content = msg['content']
146 nt.assert_true(content['found'])
147 nt.assert_true(content['found'])
147 nt.assert_equals(content['string_form'], u'5')
148 nt.assert_equals(content['string_form'], u'5')
148
149
149 # oinfo b (str)
150 # oinfo b (str)
150 msg_id = shell.object_info('b')
151 msg_id = shell.object_info('b')
151 msg = shell.get_msg(block=True, timeout=2)
152 msg = shell.get_msg(block=True, timeout=2)
152 content = msg['content']
153 content = msg['content']
153 nt.assert_true(content['found'])
154 nt.assert_true(content['found'])
154 nt.assert_equals(content['string_form'], u'hi there')
155 nt.assert_equals(content['string_form'], u'hi there')
155
156
156 # oinfo c (undefined)
157 # oinfo c (undefined)
157 msg_id = shell.object_info('c')
158 msg_id = shell.object_info('c')
158 msg = shell.get_msg(block=True, timeout=2)
159 msg = shell.get_msg(block=True, timeout=2)
159 content = msg['content']
160 content = msg['content']
160 nt.assert_false(content['found'])
161 nt.assert_false(content['found'])
161
162
162 def test_embed_kernel_reentrant():
163 def test_embed_kernel_reentrant():
163 """IPython.embed_kernel() can be called multiple times"""
164 """IPython.embed_kernel() can be called multiple times"""
164 cmd = '\n'.join([
165 cmd = '\n'.join([
165 'from IPython import embed_kernel',
166 'from IPython import embed_kernel',
166 'count = 0',
167 'count = 0',
167 'def go():',
168 'def go():',
168 ' global count',
169 ' global count',
169 ' embed_kernel()',
170 ' embed_kernel()',
170 ' count = count + 1',
171 ' count = count + 1',
171 '',
172 '',
172 'while True:'
173 'while True:'
173 ' go()',
174 ' go()',
174 '',
175 '',
175 ])
176 ])
176
177
177 with setup_kernel(cmd) as km:
178 with setup_kernel(cmd) as km:
178 shell = km.shell_channel
179 shell = km.shell_channel
179 for i in range(5):
180 for i in range(5):
180 msg_id = shell.object_info('count')
181 msg_id = shell.object_info('count')
181 msg = shell.get_msg(block=True, timeout=2)
182 msg = shell.get_msg(block=True, timeout=2)
182 content = msg['content']
183 content = msg['content']
183 nt.assert_true(content['found'])
184 nt.assert_true(content['found'])
184 nt.assert_equals(content['string_form'], unicode(i))
185 nt.assert_equals(content['string_form'], unicode(i))
185
186
186 # exit from embed_kernel
187 # exit from embed_kernel
187 shell.execute("get_ipython().exit_now = True")
188 shell.execute("get_ipython().exit_now = True")
188 msg = shell.get_msg(block=True, timeout=2)
189 msg = shell.get_msg(block=True, timeout=2)
189 time.sleep(0.2)
190 time.sleep(0.2)
190
191
191
192
General Comments 0
You need to be logged in to leave comments. Login now