##// END OF EJS Templates
ensure kernels are cleaned up in embed_kernel tests
MinRK -
Show More
@@ -1,153 +1,158 b''
1 1 """test IPython.embed_kernel()"""
2 2
3 3 #-------------------------------------------------------------------------------
4 4 # Copyright (C) 2012 The IPython Development Team
5 5 #
6 6 # Distributed under the terms of the BSD License. The full license is in
7 7 # the file COPYING, distributed as part of this software.
8 8 #-------------------------------------------------------------------------------
9 9
10 10 #-------------------------------------------------------------------------------
11 11 # Imports
12 12 #-------------------------------------------------------------------------------
13 13
14 14 import os
15 15 import shutil
16 16 import sys
17 17 import tempfile
18 18 import time
19 19
20 from contextlib import contextmanager
20 21 from subprocess import Popen, PIPE
21 22
22 23 import nose.tools as nt
23 24
24 25 from IPython.zmq.blockingkernelmanager import BlockingKernelManager
25 26 from IPython.utils import path
26 27
27 28
28 29 #-------------------------------------------------------------------------------
29 30 # Tests
30 31 #-------------------------------------------------------------------------------
31 32
32 33 def setup():
33 34 """setup temporary IPYTHONDIR for tests"""
34 35 global IPYTHONDIR
35 36 global env
36 37 global save_get_ipython_dir
37 38
38 39 IPYTHONDIR = tempfile.mkdtemp()
39 40 env = dict(IPYTHONDIR=IPYTHONDIR)
40 41 save_get_ipython_dir = path.get_ipython_dir
41 42 path.get_ipython_dir = lambda : IPYTHONDIR
42 43
43 44
44 45 def teardown():
45 46 path.get_ipython_dir = save_get_ipython_dir
46 47
47 48 try:
48 49 shutil.rmtree(IPYTHONDIR)
49 50 except (OSError, IOError):
50 51 # no such file
51 52 pass
52 53
53 54
54 def _launch_kernel(cmd):
55 @contextmanager
56 def setup_kernel(cmd):
55 57 """start an embedded kernel in a subprocess, and wait for it to be ready
56 58
57 59 Returns
58 60 -------
59 kernel, kernel_manager: Popen instance and connected KernelManager
61 kernel_manager: connected KernelManager instance
60 62 """
61 63 kernel = Popen([sys.executable, '-c', cmd], stdout=PIPE, stderr=PIPE, env=env)
62 64 connection_file = os.path.join(IPYTHONDIR,
63 65 'profile_default',
64 66 'security',
65 67 'kernel-%i.json' % kernel.pid
66 68 )
67 69 # wait for connection file to exist, timeout after 5s
68 70 tic = time.time()
69 71 while not os.path.exists(connection_file) and kernel.poll() is None and time.time() < tic + 5:
70 72 time.sleep(0.1)
71 73
72 74 if not os.path.exists(connection_file):
73 75 if kernel.poll() is None:
74 76 kernel.terminate()
75 77 raise IOError("Connection file %r never arrived" % connection_file)
76 78
77 79 if kernel.poll() is not None:
78 80 raise IOError("Kernel failed to start")
79 81
80 82 km = BlockingKernelManager(connection_file=connection_file)
81 83 km.load_connection_file()
82 84 km.start_channels()
83 85
84 return kernel, km
86 try:
87 yield km
88 finally:
89 km.stop_channels()
85 90
86 91 def test_embed_kernel_basic():
87 92 """IPython.embed_kernel() is basically functional"""
88 93 cmd = '\n'.join([
89 94 'from IPython import embed_kernel',
90 95 'def go():',
91 96 ' a=5',
92 97 ' b="hi there"',
93 98 ' embed_kernel()',
94 99 'go()',
95 100 '',
96 101 ])
97 102
98 kernel, km = _launch_kernel(cmd)
99 shell = km.shell_channel
103 with setup_kernel(cmd) as km:
104 shell = km.shell_channel
100 105
101 # oinfo a (int)
102 msg_id = shell.object_info('a')
103 msg = shell.get_msg(block=True, timeout=2)
104 content = msg['content']
105 nt.assert_true(content['found'])
106 # oinfo a (int)
107 msg_id = shell.object_info('a')
108 msg = shell.get_msg(block=True, timeout=2)
109 content = msg['content']
110 nt.assert_true(content['found'])
106 111
107 msg_id = shell.execute("c=a*2")
108 msg = shell.get_msg(block=True, timeout=2)
109 content = msg['content']
110 nt.assert_equals(content['status'], u'ok')
111
112 # oinfo c (should be 10)
113 msg_id = shell.object_info('c')
114 msg = shell.get_msg(block=True, timeout=2)
115 content = msg['content']
116 nt.assert_true(content['found'])
117 nt.assert_equals(content['string_form'], u'10')
112 msg_id = shell.execute("c=a*2")
113 msg = shell.get_msg(block=True, timeout=2)
114 content = msg['content']
115 nt.assert_equals(content['status'], u'ok')
116
117 # oinfo c (should be 10)
118 msg_id = shell.object_info('c')
119 msg = shell.get_msg(block=True, timeout=2)
120 content = msg['content']
121 nt.assert_true(content['found'])
122 nt.assert_equals(content['string_form'], u'10')
118 123
119 124 def test_embed_kernel_namespace():
120 125 """IPython.embed_kernel() inherits calling namespace"""
121 126 cmd = '\n'.join([
122 127 'from IPython import embed_kernel',
123 128 'def go():',
124 129 ' a=5',
125 130 ' b="hi there"',
126 131 ' embed_kernel()',
127 132 'go()',
128 133 '',
129 134 ])
130 135
131 kernel, km = _launch_kernel(cmd)
132 shell = km.shell_channel
136 with setup_kernel(cmd) as km:
137 shell = km.shell_channel
133 138
134 # oinfo a (int)
135 msg_id = shell.object_info('a')
136 msg = shell.get_msg(block=True, timeout=2)
137 content = msg['content']
138 nt.assert_true(content['found'])
139 nt.assert_equals(content['string_form'], u'5')
140
141 # oinfo b (str)
142 msg_id = shell.object_info('b')
143 msg = shell.get_msg(block=True, timeout=2)
144 content = msg['content']
145 nt.assert_true(content['found'])
146 nt.assert_equals(content['string_form'], u'hi there')
147
148 # oinfo c (undefined)
149 msg_id = shell.object_info('c')
150 msg = shell.get_msg(block=True, timeout=2)
151 content = msg['content']
152 nt.assert_false(content['found'])
139 # oinfo a (int)
140 msg_id = shell.object_info('a')
141 msg = shell.get_msg(block=True, timeout=2)
142 content = msg['content']
143 nt.assert_true(content['found'])
144 nt.assert_equals(content['string_form'], u'5')
145
146 # oinfo b (str)
147 msg_id = shell.object_info('b')
148 msg = shell.get_msg(block=True, timeout=2)
149 content = msg['content']
150 nt.assert_true(content['found'])
151 nt.assert_equals(content['string_form'], u'hi there')
152
153 # oinfo c (undefined)
154 msg_id = shell.object_info('c')
155 msg = shell.get_msg(block=True, timeout=2)
156 content = msg['content']
157 nt.assert_false(content['found'])
153 158
General Comments 0
You need to be logged in to leave comments. Login now