##// END OF EJS Templates
add basic embed_kernel tests
MinRK -
Show More
@@ -0,0 +1,153 b''
1 """test IPython.embed_kernel()"""
2
3 #-------------------------------------------------------------------------------
4 # Copyright (C) 2012 The IPython Development Team
5 #
6 # Distributed under the terms of the BSD License. The full license is in
7 # the file COPYING, distributed as part of this software.
8 #-------------------------------------------------------------------------------
9
10 #-------------------------------------------------------------------------------
11 # Imports
12 #-------------------------------------------------------------------------------
13
14 import os
15 import shutil
16 import sys
17 import tempfile
18 import time
19
20 from subprocess import Popen, PIPE
21
22 import nose.tools as nt
23
24 from IPython.zmq.blockingkernelmanager import BlockingKernelManager
25 from IPython.utils import path
26
27
28 #-------------------------------------------------------------------------------
29 # Tests
30 #-------------------------------------------------------------------------------
31
32 def setup():
33 """setup temporary IPYTHONDIR for tests"""
34 global IPYTHONDIR
35 global env
36 global save_get_ipython_dir
37
38 IPYTHONDIR = tempfile.mkdtemp()
39 env = dict(IPYTHONDIR=IPYTHONDIR)
40 save_get_ipython_dir = path.get_ipython_dir
41 path.get_ipython_dir = lambda : IPYTHONDIR
42
43
44 def teardown():
45 path.get_ipython_dir = save_get_ipython_dir
46
47 try:
48 shutil.rmtree(IPYTHONDIR)
49 except (OSError, IOError):
50 # no such file
51 pass
52
53
54 def _launch_kernel(cmd):
55 """start an embedded kernel in a subprocess, and wait for it to be ready
56
57 Returns
58 -------
59 kernel, kernel_manager: Popen instance and connected KernelManager
60 """
61 kernel = Popen([sys.executable, '-c', cmd], stdout=PIPE, stderr=PIPE, env=env)
62 connection_file = os.path.join(IPYTHONDIR,
63 'profile_default',
64 'security',
65 'kernel-%i.json' % kernel.pid
66 )
67 # wait for connection file to exist, timeout after 5s
68 tic = time.time()
69 while not os.path.exists(connection_file) and kernel.poll() is None and time.time() < tic + 5:
70 time.sleep(0.1)
71
72 if not os.path.exists(connection_file):
73 if kernel.poll() is None:
74 kernel.terminate()
75 raise IOError("Connection file %r never arrived" % connection_file)
76
77 if kernel.poll() is not None:
78 raise IOError("Kernel failed to start")
79
80 km = BlockingKernelManager(connection_file=connection_file)
81 km.load_connection_file()
82 km.start_channels()
83
84 return kernel, km
85
86 def test_embed_kernel_basic():
87 """IPython.embed_kernel() is basically functional"""
88 cmd = '\n'.join([
89 'from IPython import embed_kernel',
90 'def go():',
91 ' a=5',
92 ' b="hi there"',
93 ' embed_kernel()',
94 'go()',
95 '',
96 ])
97
98 kernel, km = _launch_kernel(cmd)
99 shell = km.shell_channel
100
101 # oinfo a (int)
102 msg_id = shell.object_info('a')
103 msg = shell.get_msg(block=True, timeout=2)
104 content = msg['content']
105 nt.assert_true(content['found'])
106
107 msg_id = shell.execute("c=a*2")
108 msg = shell.get_msg(block=True, timeout=2)
109 content = msg['content']
110 nt.assert_equals(content['status'], u'ok')
111
112 # oinfo c (should be 10)
113 msg_id = shell.object_info('c')
114 msg = shell.get_msg(block=True, timeout=2)
115 content = msg['content']
116 nt.assert_true(content['found'])
117 nt.assert_equals(content['string_form'], u'10')
118
119 def test_embed_kernel_namespace():
120 """IPython.embed_kernel() inherits calling namespace"""
121 cmd = '\n'.join([
122 'from IPython import embed_kernel',
123 'def go():',
124 ' a=5',
125 ' b="hi there"',
126 ' embed_kernel()',
127 'go()',
128 '',
129 ])
130
131 kernel, km = _launch_kernel(cmd)
132 shell = km.shell_channel
133
134 # oinfo a (int)
135 msg_id = shell.object_info('a')
136 msg = shell.get_msg(block=True, timeout=2)
137 content = msg['content']
138 nt.assert_true(content['found'])
139 nt.assert_equals(content['string_form'], u'5')
140
141 # oinfo b (str)
142 msg_id = shell.object_info('b')
143 msg = shell.get_msg(block=True, timeout=2)
144 content = msg['content']
145 nt.assert_true(content['found'])
146 nt.assert_equals(content['string_form'], u'hi there')
147
148 # oinfo c (undefined)
149 msg_id = shell.object_info('c')
150 msg = shell.get_msg(block=True, timeout=2)
151 content = msg['content']
152 nt.assert_false(content['found'])
153
General Comments 0
You need to be logged in to leave comments. Login now