##// END OF EJS Templates
subprocess outstream forwarding doesn't work on Windows
MinRK -
Show More
@@ -1,198 +1,201 b''
1 """test the IPython Kernel"""
1 """test the IPython Kernel"""
2
2
3 #-------------------------------------------------------------------------------
3 #-------------------------------------------------------------------------------
4 # Copyright (C) 2013 The IPython Development Team
4 # Copyright (C) 2013 The IPython Development Team
5 #
5 #
6 # Distributed under the terms of the BSD License. The full license is in
6 # Distributed under the terms of the BSD License. The full license is in
7 # the file COPYING, distributed as part of this software.
7 # the file COPYING, distributed as part of this software.
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9
9
10 #-------------------------------------------------------------------------------
10 #-------------------------------------------------------------------------------
11 # Imports
11 # Imports
12 #-------------------------------------------------------------------------------
12 #-------------------------------------------------------------------------------
13
13
14 import os
14 import os
15 import shutil
15 import shutil
16 import sys
16 import tempfile
17 import tempfile
17
18
18 from Queue import Empty
19 from Queue import Empty
19 from contextlib import contextmanager
20 from contextlib import contextmanager
20 from subprocess import PIPE
21 from subprocess import PIPE
21
22
22 import nose.tools as nt
23 import nose.tools as nt
23
24
24 from IPython.zmq.blockingkernelmanager import BlockingKernelManager
25 from IPython.zmq.blockingkernelmanager import BlockingKernelManager
25 from IPython.zmq.tests.test_message_spec import execute, flush_channels
26 from IPython.zmq.tests.test_message_spec import execute, flush_channels
26 from IPython.testing import decorators as dec
27 from IPython.testing import decorators as dec
27 from IPython.utils import path, py3compat
28 from IPython.utils import path, py3compat
28
29
29 #-------------------------------------------------------------------------------
30 #-------------------------------------------------------------------------------
30 # Tests
31 # Tests
31 #-------------------------------------------------------------------------------
32 #-------------------------------------------------------------------------------
32
33
33 def setup():
34 def setup():
34 """setup temporary IPYTHONDIR for tests"""
35 """setup temporary IPYTHONDIR for tests"""
35 global IPYTHONDIR
36 global IPYTHONDIR
36 global save_env
37 global save_env
37 global save_get_ipython_dir
38 global save_get_ipython_dir
38
39
39 IPYTHONDIR = tempfile.mkdtemp()
40 IPYTHONDIR = tempfile.mkdtemp()
40
41
41 save_env = os.environ.copy()
42 save_env = os.environ.copy()
42 os.environ["IPYTHONDIR"] = IPYTHONDIR
43 os.environ["IPYTHONDIR"] = IPYTHONDIR
43
44
44 save_get_ipython_dir = path.get_ipython_dir
45 save_get_ipython_dir = path.get_ipython_dir
45 path.get_ipython_dir = lambda : IPYTHONDIR
46 path.get_ipython_dir = lambda : IPYTHONDIR
46
47
47
48
48 def teardown():
49 def teardown():
49 path.get_ipython_dir = save_get_ipython_dir
50 path.get_ipython_dir = save_get_ipython_dir
50 os.environ = save_env
51 os.environ = save_env
51
52
52 try:
53 try:
53 shutil.rmtree(IPYTHONDIR)
54 shutil.rmtree(IPYTHONDIR)
54 except (OSError, IOError):
55 except (OSError, IOError):
55 # no such file
56 # no such file
56 pass
57 pass
57
58
58
59
59 @contextmanager
60 @contextmanager
60 def new_kernel():
61 def new_kernel():
61 """start a kernel in a subprocess, and wait for it to be ready
62 """start a kernel in a subprocess, and wait for it to be ready
62
63
63 Returns
64 Returns
64 -------
65 -------
65 kernel_manager: connected KernelManager instance
66 kernel_manager: connected KernelManager instance
66 """
67 """
67 KM = BlockingKernelManager()
68 KM = BlockingKernelManager()
68
69
69 KM.start_kernel(stdout=PIPE, stderr=PIPE)
70 KM.start_kernel(stdout=PIPE, stderr=PIPE)
70 KM.start_channels()
71 KM.start_channels()
71
72
72 # wait for kernel to be ready
73 # wait for kernel to be ready
73 KM.shell_channel.execute("import sys")
74 KM.shell_channel.execute("import sys")
74 KM.shell_channel.get_msg(block=True, timeout=5)
75 KM.shell_channel.get_msg(block=True, timeout=5)
75 flush_channels(KM)
76 flush_channels(KM)
76 try:
77 try:
77 yield KM
78 yield KM
78 finally:
79 finally:
79 KM.stop_channels()
80 KM.stop_channels()
80 KM.shutdown_kernel()
81 KM.shutdown_kernel()
81
82
82
83
83 def assemble_output(iopub):
84 def assemble_output(iopub):
84 """assemble stdout/err from an execution"""
85 """assemble stdout/err from an execution"""
85 stdout = ''
86 stdout = ''
86 stderr = ''
87 stderr = ''
87 while True:
88 while True:
88 msg = iopub.get_msg(block=True, timeout=1)
89 msg = iopub.get_msg(block=True, timeout=1)
89 msg_type = msg['msg_type']
90 msg_type = msg['msg_type']
90 content = msg['content']
91 content = msg['content']
91 if msg_type == 'status' and content['execution_state'] == 'idle':
92 if msg_type == 'status' and content['execution_state'] == 'idle':
92 # idle message signals end of output
93 # idle message signals end of output
93 break
94 break
94 elif msg['msg_type'] == 'stream':
95 elif msg['msg_type'] == 'stream':
95 if content['name'] == 'stdout':
96 if content['name'] == 'stdout':
96 stdout = stdout + content['data']
97 stdout = stdout + content['data']
97 elif content['name'] == 'stderr':
98 elif content['name'] == 'stderr':
98 stderr = stderr + content['data']
99 stderr = stderr + content['data']
99 else:
100 else:
100 raise KeyError("bad stream: %r" % content['name'])
101 raise KeyError("bad stream: %r" % content['name'])
101 else:
102 else:
102 # other output, ignored
103 # other output, ignored
103 pass
104 pass
104 return stdout, stderr
105 return stdout, stderr
105
106
106
107
107 def _check_mp_mode(km, expected=False, stream="stdout"):
108 def _check_mp_mode(km, expected=False, stream="stdout"):
108 execute(km=km, code="import sys")
109 execute(km=km, code="import sys")
109 flush_channels(km)
110 flush_channels(km)
110 msg_id, content = execute(km=km, code="print (sys.%s._check_mp_mode())" % stream)
111 msg_id, content = execute(km=km, code="print (sys.%s._check_mp_mode())" % stream)
111 stdout, stderr = assemble_output(km.iopub_channel)
112 stdout, stderr = assemble_output(km.iopub_channel)
112 nt.assert_equal(eval(stdout.strip()), expected)
113 nt.assert_equal(eval(stdout.strip()), expected)
113
114
114
115
115 def test_simple_print():
116 def test_simple_print():
116 """simple print statement in kernel"""
117 """simple print statement in kernel"""
117 with new_kernel() as km:
118 with new_kernel() as km:
118 iopub = km.iopub_channel
119 iopub = km.iopub_channel
119 msg_id, content = execute(km=km, code="print ('hi')")
120 msg_id, content = execute(km=km, code="print ('hi')")
120 stdout, stderr = assemble_output(iopub)
121 stdout, stderr = assemble_output(iopub)
121 nt.assert_equal(stdout, 'hi\n')
122 nt.assert_equal(stdout, 'hi\n')
122 nt.assert_equal(stderr, '')
123 nt.assert_equal(stderr, '')
123 _check_mp_mode(km, expected=False)
124 _check_mp_mode(km, expected=False)
124 print ('hello')
125 print ('hello')
125
126
126
127
128 @dec.knownfailureif(sys.platform == 'win32', "subprocess prints fail on Windows")
127 def test_subprocess_print():
129 def test_subprocess_print():
128 """printing from forked mp.Process"""
130 """printing from forked mp.Process"""
129 with new_kernel() as km:
131 with new_kernel() as km:
130 iopub = km.iopub_channel
132 iopub = km.iopub_channel
131
133
132 _check_mp_mode(km, expected=False)
134 _check_mp_mode(km, expected=False)
133 flush_channels(km)
135 flush_channels(km)
134 np = 5
136 np = 5
135 code = '\n'.join([
137 code = '\n'.join([
136 "from __future__ import print_function",
138 "from __future__ import print_function",
137 "import multiprocessing as mp",
139 "import multiprocessing as mp",
138 "pool = [mp.Process(target=print, args=('hello', i,)) for i in range(%i)]" % np,
140 "pool = [mp.Process(target=print, args=('hello', i,)) for i in range(%i)]" % np,
139 "for p in pool: p.start()",
141 "for p in pool: p.start()",
140 "for p in pool: p.join()"
142 "for p in pool: p.join()"
141 ])
143 ])
142
144
143 expected = '\n'.join([
145 expected = '\n'.join([
144 "hello %s" % i for i in range(np)
146 "hello %s" % i for i in range(np)
145 ]) + '\n'
147 ]) + '\n'
146
148
147 msg_id, content = execute(km=km, code=code)
149 msg_id, content = execute(km=km, code=code)
148 stdout, stderr = assemble_output(iopub)
150 stdout, stderr = assemble_output(iopub)
149 nt.assert_equal(stdout.count("hello"), np, stdout)
151 nt.assert_equal(stdout.count("hello"), np, stdout)
150 for n in range(np):
152 for n in range(np):
151 nt.assert_equal(stdout.count(str(n)), 1, stdout)
153 nt.assert_equal(stdout.count(str(n)), 1, stdout)
152 nt.assert_equal(stderr, '')
154 nt.assert_equal(stderr, '')
153 _check_mp_mode(km, expected=False)
155 _check_mp_mode(km, expected=False)
154 _check_mp_mode(km, expected=False, stream="stderr")
156 _check_mp_mode(km, expected=False, stream="stderr")
155
157
156
158
157 def test_subprocess_noprint():
159 def test_subprocess_noprint():
158 """mp.Process without print doesn't trigger iostream mp_mode"""
160 """mp.Process without print doesn't trigger iostream mp_mode"""
159 with new_kernel() as km:
161 with new_kernel() as km:
160 iopub = km.iopub_channel
162 iopub = km.iopub_channel
161
163
162 np = 5
164 np = 5
163 code = '\n'.join([
165 code = '\n'.join([
164 "import multiprocessing as mp",
166 "import multiprocessing as mp",
165 "pool = [mp.Process(target=range,args=(i,)) for i in range(%i)]" % np,
167 "pool = [mp.Process(target=range, args=(i,)) for i in range(%i)]" % np,
166 "for p in pool: p.start()",
168 "for p in pool: p.start()",
167 "for p in pool: p.join()"
169 "for p in pool: p.join()"
168 ])
170 ])
169
171
170 msg_id, content = execute(km=km, code=code)
172 msg_id, content = execute(km=km, code=code)
171 stdout, stderr = assemble_output(iopub)
173 stdout, stderr = assemble_output(iopub)
172 nt.assert_equal(stdout, '')
174 nt.assert_equal(stdout, '')
173 nt.assert_equal(stderr, '')
175 nt.assert_equal(stderr, '')
174
176
175 _check_mp_mode(km, expected=False)
177 _check_mp_mode(km, expected=False)
176 _check_mp_mode(km, expected=False, stream="stderr")
178 _check_mp_mode(km, expected=False, stream="stderr")
177
179
178
180
181 @dec.knownfailureif(sys.platform == 'win32', "subprocess prints fail on Windows")
179 def test_subprocess_error():
182 def test_subprocess_error():
180 """error in mp.Process doesn't crash"""
183 """error in mp.Process doesn't crash"""
181 with new_kernel() as km:
184 with new_kernel() as km:
182 iopub = km.iopub_channel
185 iopub = km.iopub_channel
183
186
184 code = '\n'.join([
187 code = '\n'.join([
185 "import multiprocessing as mp",
188 "import multiprocessing as mp",
186 "p = mp.Process(target=int, args=('hi',))",
189 "p = mp.Process(target=int, args=('hi',))",
187 "p.start()",
190 "p.start()",
188 "p.join()",
191 "p.join()",
189 ])
192 ])
190
193
191 msg_id, content = execute(km=km, code=code)
194 msg_id, content = execute(km=km, code=code)
192 stdout, stderr = assemble_output(iopub)
195 stdout, stderr = assemble_output(iopub)
193 nt.assert_equal(stdout, '')
196 nt.assert_equal(stdout, '')
194 nt.assert_true("ValueError" in stderr, stderr)
197 nt.assert_true("ValueError" in stderr, stderr)
195
198
196 _check_mp_mode(km, expected=False)
199 _check_mp_mode(km, expected=False)
197 _check_mp_mode(km, expected=False, stream="stderr")
200 _check_mp_mode(km, expected=False, stream="stderr")
198
201
General Comments 0
You need to be logged in to leave comments. Login now