##// END OF EJS Templates
add basic print tests for kernel...
MinRK -
Show More
@@ -0,0 +1,183 b''
1 """test the IPython Kernel"""
2
3 #-------------------------------------------------------------------------------
4 # Copyright (C) 2013 The IPython Development Team
5 #
6 # Distributed under the terms of the BSD License. The full license is in
7 # the file COPYING, distributed as part of this software.
8 #-------------------------------------------------------------------------------
9
10 #-------------------------------------------------------------------------------
11 # Imports
12 #-------------------------------------------------------------------------------
13
14 import os
15 import shutil
16 import tempfile
17
18 from Queue import Empty
19 from contextlib import contextmanager
20 from subprocess import PIPE
21
22 import nose.tools as nt
23
24 from IPython.zmq.blockingkernelmanager import BlockingKernelManager
25 from IPython.zmq.tests.test_message_spec import execute, flush_channels
26 from IPython.testing import decorators as dec
27 from IPython.utils import path, py3compat
28
29 #-------------------------------------------------------------------------------
30 # Tests
31 #-------------------------------------------------------------------------------
32
33 def setup():
34 """setup temporary IPYTHONDIR for tests"""
35 global IPYTHONDIR
36 global save_env
37 global save_get_ipython_dir
38
39 IPYTHONDIR = tempfile.mkdtemp()
40
41 save_env = os.environ.copy()
42 os.environ["IPYTHONDIR"] = IPYTHONDIR
43
44 save_get_ipython_dir = path.get_ipython_dir
45 path.get_ipython_dir = lambda : IPYTHONDIR
46
47
48 def teardown():
49 path.get_ipython_dir = save_get_ipython_dir
50 os.environ = save_env
51
52 try:
53 shutil.rmtree(IPYTHONDIR)
54 except (OSError, IOError):
55 # no such file
56 pass
57
58
59 @contextmanager
60 def new_kernel():
61 """start a kernel in a subprocess, and wait for it to be ready
62
63 Returns
64 -------
65 kernel_manager: connected KernelManager instance
66 """
67 KM = BlockingKernelManager()
68
69 KM.start_kernel(stdout=PIPE, stderr=PIPE)
70 KM.start_channels()
71
72 # wait for kernel to be ready
73 KM.shell_channel.execute("import sys")
74 KM.shell_channel.get_msg(block=True, timeout=5)
75 flush_channels(KM)
76 try:
77 yield KM
78 finally:
79 KM.stop_channels()
80 KM.shutdown_kernel()
81
82
83 def assemble_output(iopub):
84 """assemble stdout/err from an execution"""
85 stdout = ''
86 stderr = ''
87 while True:
88 msg = iopub.get_msg(block=True, timeout=1)
89 msg_type = msg['msg_type']
90 content = msg['content']
91 if msg_type == 'status' and content['execution_state'] == 'idle':
92 # idle message signals end of output
93 break
94 elif msg['msg_type'] == 'stream':
95 if content['name'] == 'stdout':
96 stdout = stdout + content['data']
97 elif content['name'] == 'stderr':
98 stderr = stderr + content['data']
99 else:
100 raise KeyError("bad stream: %r" % content['name'])
101 else:
102 # other output, ignored
103 pass
104 return stdout, stderr
105
106
107 def _check_mp_mode(km, expected=False, stream="stdout"):
108 execute(km=km, code="import sys")
109 flush_channels(km)
110 msg_id, content = execute(km=km, code="print (sys.stdout._check_mp_mode())")
111 stdout, stderr = assemble_output(km.sub_channel)
112 nt.assert_equal(eval(stdout.strip()), expected)
113
114
115 @dec.parametric
116 def test_simple_print():
117 """simple print statement in kernel"""
118 with new_kernel() as km:
119 iopub = km.sub_channel
120
121 msg_id, content = execute(km=km, code="print ('hi')")
122 stdout, stderr = assemble_output(iopub)
123 yield nt.assert_equal(stdout, 'hi\n')
124 yield nt.assert_equal(stderr, '')
125 yield _check_mp_mode(km, expected=False)
126
127
128 @dec.parametric
129 def test_subprocess_print():
130 """printing from forked mp.Process"""
131 with new_kernel() as km:
132 iopub = km.sub_channel
133
134 yield _check_mp_mode(km, expected=False)
135 flush_channels(km)
136 np = 5
137 code = '\n'.join([
138 "import multiprocessing as mp",
139 "def f(x):",
140 " print 'hello',x",
141 "pool = [mp.Process(target=f,args=(i,)) for i in range(%i)]" % np,
142 "for p in pool: p.start()",
143 "for p in pool: p.join()"
144 ])
145
146 expected = '\n'.join([
147 "hello %s" % i for i in range(np)
148 ]) + '\n'
149
150 msg_id, content = execute(km=km, code=code)
151 stdout, stderr = assemble_output(iopub)
152 yield nt.assert_equal(stdout.count("hello"), np, stdout)
153 for n in range(np):
154 yield nt.assert_equal(stdout.count(str(n)), 1, stdout)
155 yield nt.assert_equal(stderr, '')
156 yield _check_mp_mode(km, expected=True)
157 yield _check_mp_mode(km, expected=True, stream="stderr")
158
159
160 @dec.parametric
161 def test_subprocess_noprint():
162 """mp.Process without print doesn't trigger iostream mp_mode"""
163 with new_kernel() as km:
164 iopub = km.sub_channel
165
166 np = 5
167 code = '\n'.join([
168 "import multiprocessing as mp",
169 "def f(x):",
170 " return x",
171 "pool = [mp.Process(target=f,args=(i,)) for i in range(%i)]" % np,
172 "for p in pool: p.start()",
173 "for p in pool: p.join()"
174 ])
175
176 msg_id, content = execute(km=km, code=code)
177 stdout, stderr = assemble_output(iopub)
178 yield nt.assert_equal(stdout, '')
179 yield nt.assert_equal(stderr, '')
180
181 yield _check_mp_mode(km, expected=False)
182 yield _check_mp_mode(km, expected=False, stream="stderr")
183
General Comments 0
You need to be logged in to leave comments. Login now