##// END OF EJS Templates
ensure kernels are cleaned up in embed_kernel tests
MinRK -
Show More
@@ -1,153 +1,158 b''
1 """test IPython.embed_kernel()"""
1 """test IPython.embed_kernel()"""
2
2
3 #-------------------------------------------------------------------------------
3 #-------------------------------------------------------------------------------
4 # Copyright (C) 2012 The IPython Development Team
4 # Copyright (C) 2012 The IPython Development Team
5 #
5 #
6 # Distributed under the terms of the BSD License. The full license is in
6 # Distributed under the terms of the BSD License. The full license is in
7 # the file COPYING, distributed as part of this software.
7 # the file COPYING, distributed as part of this software.
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9
9
10 #-------------------------------------------------------------------------------
10 #-------------------------------------------------------------------------------
11 # Imports
11 # Imports
12 #-------------------------------------------------------------------------------
12 #-------------------------------------------------------------------------------
13
13
14 import os
14 import os
15 import shutil
15 import shutil
16 import sys
16 import sys
17 import tempfile
17 import tempfile
18 import time
18 import time
19
19
20 from contextlib import contextmanager
20 from subprocess import Popen, PIPE
21 from subprocess import Popen, PIPE
21
22
22 import nose.tools as nt
23 import nose.tools as nt
23
24
24 from IPython.zmq.blockingkernelmanager import BlockingKernelManager
25 from IPython.zmq.blockingkernelmanager import BlockingKernelManager
25 from IPython.utils import path
26 from IPython.utils import path
26
27
27
28
28 #-------------------------------------------------------------------------------
29 #-------------------------------------------------------------------------------
29 # Tests
30 # Tests
30 #-------------------------------------------------------------------------------
31 #-------------------------------------------------------------------------------
31
32
32 def setup():
33 def setup():
33 """setup temporary IPYTHONDIR for tests"""
34 """setup temporary IPYTHONDIR for tests"""
34 global IPYTHONDIR
35 global IPYTHONDIR
35 global env
36 global env
36 global save_get_ipython_dir
37 global save_get_ipython_dir
37
38
38 IPYTHONDIR = tempfile.mkdtemp()
39 IPYTHONDIR = tempfile.mkdtemp()
39 env = dict(IPYTHONDIR=IPYTHONDIR)
40 env = dict(IPYTHONDIR=IPYTHONDIR)
40 save_get_ipython_dir = path.get_ipython_dir
41 save_get_ipython_dir = path.get_ipython_dir
41 path.get_ipython_dir = lambda : IPYTHONDIR
42 path.get_ipython_dir = lambda : IPYTHONDIR
42
43
43
44
44 def teardown():
45 def teardown():
45 path.get_ipython_dir = save_get_ipython_dir
46 path.get_ipython_dir = save_get_ipython_dir
46
47
47 try:
48 try:
48 shutil.rmtree(IPYTHONDIR)
49 shutil.rmtree(IPYTHONDIR)
49 except (OSError, IOError):
50 except (OSError, IOError):
50 # no such file
51 # no such file
51 pass
52 pass
52
53
53
54
54 def _launch_kernel(cmd):
55 @contextmanager
56 def setup_kernel(cmd):
55 """start an embedded kernel in a subprocess, and wait for it to be ready
57 """start an embedded kernel in a subprocess, and wait for it to be ready
56
58
57 Returns
59 Returns
58 -------
60 -------
59 kernel, kernel_manager: Popen instance and connected KernelManager
61 kernel_manager: connected KernelManager instance
60 """
62 """
61 kernel = Popen([sys.executable, '-c', cmd], stdout=PIPE, stderr=PIPE, env=env)
63 kernel = Popen([sys.executable, '-c', cmd], stdout=PIPE, stderr=PIPE, env=env)
62 connection_file = os.path.join(IPYTHONDIR,
64 connection_file = os.path.join(IPYTHONDIR,
63 'profile_default',
65 'profile_default',
64 'security',
66 'security',
65 'kernel-%i.json' % kernel.pid
67 'kernel-%i.json' % kernel.pid
66 )
68 )
67 # wait for connection file to exist, timeout after 5s
69 # wait for connection file to exist, timeout after 5s
68 tic = time.time()
70 tic = time.time()
69 while not os.path.exists(connection_file) and kernel.poll() is None and time.time() < tic + 5:
71 while not os.path.exists(connection_file) and kernel.poll() is None and time.time() < tic + 5:
70 time.sleep(0.1)
72 time.sleep(0.1)
71
73
72 if not os.path.exists(connection_file):
74 if not os.path.exists(connection_file):
73 if kernel.poll() is None:
75 if kernel.poll() is None:
74 kernel.terminate()
76 kernel.terminate()
75 raise IOError("Connection file %r never arrived" % connection_file)
77 raise IOError("Connection file %r never arrived" % connection_file)
76
78
77 if kernel.poll() is not None:
79 if kernel.poll() is not None:
78 raise IOError("Kernel failed to start")
80 raise IOError("Kernel failed to start")
79
81
80 km = BlockingKernelManager(connection_file=connection_file)
82 km = BlockingKernelManager(connection_file=connection_file)
81 km.load_connection_file()
83 km.load_connection_file()
82 km.start_channels()
84 km.start_channels()
83
85
84 return kernel, km
86 try:
87 yield km
88 finally:
89 km.stop_channels()
85
90
86 def test_embed_kernel_basic():
91 def test_embed_kernel_basic():
87 """IPython.embed_kernel() is basically functional"""
92 """IPython.embed_kernel() is basically functional"""
88 cmd = '\n'.join([
93 cmd = '\n'.join([
89 'from IPython import embed_kernel',
94 'from IPython import embed_kernel',
90 'def go():',
95 'def go():',
91 ' a=5',
96 ' a=5',
92 ' b="hi there"',
97 ' b="hi there"',
93 ' embed_kernel()',
98 ' embed_kernel()',
94 'go()',
99 'go()',
95 '',
100 '',
96 ])
101 ])
97
102
98 kernel, km = _launch_kernel(cmd)
103 with setup_kernel(cmd) as km:
99 shell = km.shell_channel
104 shell = km.shell_channel
100
105
101 # oinfo a (int)
106 # oinfo a (int)
102 msg_id = shell.object_info('a')
107 msg_id = shell.object_info('a')
103 msg = shell.get_msg(block=True, timeout=2)
108 msg = shell.get_msg(block=True, timeout=2)
104 content = msg['content']
109 content = msg['content']
105 nt.assert_true(content['found'])
110 nt.assert_true(content['found'])
106
111
107 msg_id = shell.execute("c=a*2")
112 msg_id = shell.execute("c=a*2")
108 msg = shell.get_msg(block=True, timeout=2)
113 msg = shell.get_msg(block=True, timeout=2)
109 content = msg['content']
114 content = msg['content']
110 nt.assert_equals(content['status'], u'ok')
115 nt.assert_equals(content['status'], u'ok')
111
116
112 # oinfo c (should be 10)
117 # oinfo c (should be 10)
113 msg_id = shell.object_info('c')
118 msg_id = shell.object_info('c')
114 msg = shell.get_msg(block=True, timeout=2)
119 msg = shell.get_msg(block=True, timeout=2)
115 content = msg['content']
120 content = msg['content']
116 nt.assert_true(content['found'])
121 nt.assert_true(content['found'])
117 nt.assert_equals(content['string_form'], u'10')
122 nt.assert_equals(content['string_form'], u'10')
118
123
119 def test_embed_kernel_namespace():
124 def test_embed_kernel_namespace():
120 """IPython.embed_kernel() inherits calling namespace"""
125 """IPython.embed_kernel() inherits calling namespace"""
121 cmd = '\n'.join([
126 cmd = '\n'.join([
122 'from IPython import embed_kernel',
127 'from IPython import embed_kernel',
123 'def go():',
128 'def go():',
124 ' a=5',
129 ' a=5',
125 ' b="hi there"',
130 ' b="hi there"',
126 ' embed_kernel()',
131 ' embed_kernel()',
127 'go()',
132 'go()',
128 '',
133 '',
129 ])
134 ])
130
135
131 kernel, km = _launch_kernel(cmd)
136 with setup_kernel(cmd) as km:
132 shell = km.shell_channel
137 shell = km.shell_channel
133
138
134 # oinfo a (int)
139 # oinfo a (int)
135 msg_id = shell.object_info('a')
140 msg_id = shell.object_info('a')
136 msg = shell.get_msg(block=True, timeout=2)
141 msg = shell.get_msg(block=True, timeout=2)
137 content = msg['content']
142 content = msg['content']
138 nt.assert_true(content['found'])
143 nt.assert_true(content['found'])
139 nt.assert_equals(content['string_form'], u'5')
144 nt.assert_equals(content['string_form'], u'5')
140
145
141 # oinfo b (str)
146 # oinfo b (str)
142 msg_id = shell.object_info('b')
147 msg_id = shell.object_info('b')
143 msg = shell.get_msg(block=True, timeout=2)
148 msg = shell.get_msg(block=True, timeout=2)
144 content = msg['content']
149 content = msg['content']
145 nt.assert_true(content['found'])
150 nt.assert_true(content['found'])
146 nt.assert_equals(content['string_form'], u'hi there')
151 nt.assert_equals(content['string_form'], u'hi there')
147
152
148 # oinfo c (undefined)
153 # oinfo c (undefined)
149 msg_id = shell.object_info('c')
154 msg_id = shell.object_info('c')
150 msg = shell.get_msg(block=True, timeout=2)
155 msg = shell.get_msg(block=True, timeout=2)
151 content = msg['content']
156 content = msg['content']
152 nt.assert_false(content['found'])
157 nt.assert_false(content['found'])
153
158
General Comments 0
You need to be logged in to leave comments. Login now