##// END OF EJS Templates
add test for raising error in forked process
MinRK -
Show More
@@ -1,183 +1,205 b''
1 """test the IPython Kernel"""
1 """test the IPython Kernel"""
2
2
3 #-------------------------------------------------------------------------------
3 #-------------------------------------------------------------------------------
4 # Copyright (C) 2013 The IPython Development Team
4 # Copyright (C) 2013 The IPython Development Team
5 #
5 #
6 # Distributed under the terms of the BSD License. The full license is in
6 # Distributed under the terms of the BSD License. The full license is in
7 # the file COPYING, distributed as part of this software.
7 # the file COPYING, distributed as part of this software.
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9
9
10 #-------------------------------------------------------------------------------
10 #-------------------------------------------------------------------------------
11 # Imports
11 # Imports
12 #-------------------------------------------------------------------------------
12 #-------------------------------------------------------------------------------
13
13
14 import os
14 import os
15 import shutil
15 import shutil
16 import tempfile
16 import tempfile
17
17
18 from Queue import Empty
18 from Queue import Empty
19 from contextlib import contextmanager
19 from contextlib import contextmanager
20 from subprocess import PIPE
20 from subprocess import PIPE
21
21
22 import nose.tools as nt
22 import nose.tools as nt
23
23
24 from IPython.zmq.blockingkernelmanager import BlockingKernelManager
24 from IPython.zmq.blockingkernelmanager import BlockingKernelManager
25 from IPython.zmq.tests.test_message_spec import execute, flush_channels
25 from IPython.zmq.tests.test_message_spec import execute, flush_channels
26 from IPython.testing import decorators as dec
26 from IPython.testing import decorators as dec
27 from IPython.utils import path, py3compat
27 from IPython.utils import path, py3compat
28
28
29 #-------------------------------------------------------------------------------
29 #-------------------------------------------------------------------------------
30 # Tests
30 # Tests
31 #-------------------------------------------------------------------------------
31 #-------------------------------------------------------------------------------
32
32
33 def setup():
33 def setup():
34 """setup temporary IPYTHONDIR for tests"""
34 """setup temporary IPYTHONDIR for tests"""
35 global IPYTHONDIR
35 global IPYTHONDIR
36 global save_env
36 global save_env
37 global save_get_ipython_dir
37 global save_get_ipython_dir
38
38
39 IPYTHONDIR = tempfile.mkdtemp()
39 IPYTHONDIR = tempfile.mkdtemp()
40
40
41 save_env = os.environ.copy()
41 save_env = os.environ.copy()
42 os.environ["IPYTHONDIR"] = IPYTHONDIR
42 os.environ["IPYTHONDIR"] = IPYTHONDIR
43
43
44 save_get_ipython_dir = path.get_ipython_dir
44 save_get_ipython_dir = path.get_ipython_dir
45 path.get_ipython_dir = lambda : IPYTHONDIR
45 path.get_ipython_dir = lambda : IPYTHONDIR
46
46
47
47
48 def teardown():
48 def teardown():
49 path.get_ipython_dir = save_get_ipython_dir
49 path.get_ipython_dir = save_get_ipython_dir
50 os.environ = save_env
50 os.environ = save_env
51
51
52 try:
52 try:
53 shutil.rmtree(IPYTHONDIR)
53 shutil.rmtree(IPYTHONDIR)
54 except (OSError, IOError):
54 except (OSError, IOError):
55 # no such file
55 # no such file
56 pass
56 pass
57
57
58
58
59 @contextmanager
59 @contextmanager
60 def new_kernel():
60 def new_kernel():
61 """start a kernel in a subprocess, and wait for it to be ready
61 """start a kernel in a subprocess, and wait for it to be ready
62
62
63 Returns
63 Returns
64 -------
64 -------
65 kernel_manager: connected KernelManager instance
65 kernel_manager: connected KernelManager instance
66 """
66 """
67 KM = BlockingKernelManager()
67 KM = BlockingKernelManager()
68
68
69 KM.start_kernel(stdout=PIPE, stderr=PIPE)
69 KM.start_kernel(stdout=PIPE, stderr=PIPE)
70 KM.start_channels()
70 KM.start_channels()
71
71
72 # wait for kernel to be ready
72 # wait for kernel to be ready
73 KM.shell_channel.execute("import sys")
73 KM.shell_channel.execute("import sys")
74 KM.shell_channel.get_msg(block=True, timeout=5)
74 KM.shell_channel.get_msg(block=True, timeout=5)
75 flush_channels(KM)
75 flush_channels(KM)
76 try:
76 try:
77 yield KM
77 yield KM
78 finally:
78 finally:
79 KM.stop_channels()
79 KM.stop_channels()
80 KM.shutdown_kernel()
80 KM.shutdown_kernel()
81
81
82
82
83 def assemble_output(iopub):
83 def assemble_output(iopub):
84 """assemble stdout/err from an execution"""
84 """assemble stdout/err from an execution"""
85 stdout = ''
85 stdout = ''
86 stderr = ''
86 stderr = ''
87 while True:
87 while True:
88 msg = iopub.get_msg(block=True, timeout=1)
88 msg = iopub.get_msg(block=True, timeout=1)
89 msg_type = msg['msg_type']
89 msg_type = msg['msg_type']
90 content = msg['content']
90 content = msg['content']
91 if msg_type == 'status' and content['execution_state'] == 'idle':
91 if msg_type == 'status' and content['execution_state'] == 'idle':
92 # idle message signals end of output
92 # idle message signals end of output
93 break
93 break
94 elif msg['msg_type'] == 'stream':
94 elif msg['msg_type'] == 'stream':
95 if content['name'] == 'stdout':
95 if content['name'] == 'stdout':
96 stdout = stdout + content['data']
96 stdout = stdout + content['data']
97 elif content['name'] == 'stderr':
97 elif content['name'] == 'stderr':
98 stderr = stderr + content['data']
98 stderr = stderr + content['data']
99 else:
99 else:
100 raise KeyError("bad stream: %r" % content['name'])
100 raise KeyError("bad stream: %r" % content['name'])
101 else:
101 else:
102 # other output, ignored
102 # other output, ignored
103 pass
103 pass
104 return stdout, stderr
104 return stdout, stderr
105
105
106
106
107 def _check_mp_mode(km, expected=False, stream="stdout"):
107 def _check_mp_mode(km, expected=False, stream="stdout"):
108 execute(km=km, code="import sys")
108 execute(km=km, code="import sys")
109 flush_channels(km)
109 flush_channels(km)
110 msg_id, content = execute(km=km, code="print (sys.stdout._check_mp_mode())")
110 msg_id, content = execute(km=km, code="print (sys.%s._check_mp_mode())" % stream)
111 stdout, stderr = assemble_output(km.sub_channel)
111 stdout, stderr = assemble_output(km.sub_channel)
112 nt.assert_equal(eval(stdout.strip()), expected)
112 nt.assert_equal(eval(stdout.strip()), expected)
113
113
114
114
115 @dec.parametric
115 @dec.parametric
116 def test_simple_print():
116 def test_simple_print():
117 """simple print statement in kernel"""
117 """simple print statement in kernel"""
118 with new_kernel() as km:
118 with new_kernel() as km:
119 iopub = km.sub_channel
119 iopub = km.sub_channel
120
121 msg_id, content = execute(km=km, code="print ('hi')")
120 msg_id, content = execute(km=km, code="print ('hi')")
122 stdout, stderr = assemble_output(iopub)
121 stdout, stderr = assemble_output(iopub)
123 yield nt.assert_equal(stdout, 'hi\n')
122 yield nt.assert_equal(stdout, 'hi\n')
124 yield nt.assert_equal(stderr, '')
123 yield nt.assert_equal(stderr, '')
125 yield _check_mp_mode(km, expected=False)
124 yield _check_mp_mode(km, expected=False)
126
125
127
126
128 @dec.parametric
127 @dec.parametric
129 def test_subprocess_print():
128 def test_subprocess_print():
130 """printing from forked mp.Process"""
129 """printing from forked mp.Process"""
131 with new_kernel() as km:
130 with new_kernel() as km:
132 iopub = km.sub_channel
131 iopub = km.sub_channel
133
132
134 yield _check_mp_mode(km, expected=False)
133 yield _check_mp_mode(km, expected=False)
135 flush_channels(km)
134 flush_channels(km)
136 np = 5
135 np = 5
137 code = '\n'.join([
136 code = '\n'.join([
138 "import multiprocessing as mp",
137 "import multiprocessing as mp",
139 "def f(x):",
138 "def f(x):",
140 " print 'hello',x",
139 " print 'hello',x",
141 "pool = [mp.Process(target=f,args=(i,)) for i in range(%i)]" % np,
140 "pool = [mp.Process(target=f,args=(i,)) for i in range(%i)]" % np,
142 "for p in pool: p.start()",
141 "for p in pool: p.start()",
143 "for p in pool: p.join()"
142 "for p in pool: p.join()"
144 ])
143 ])
145
144
146 expected = '\n'.join([
145 expected = '\n'.join([
147 "hello %s" % i for i in range(np)
146 "hello %s" % i for i in range(np)
148 ]) + '\n'
147 ]) + '\n'
149
148
150 msg_id, content = execute(km=km, code=code)
149 msg_id, content = execute(km=km, code=code)
151 stdout, stderr = assemble_output(iopub)
150 stdout, stderr = assemble_output(iopub)
152 yield nt.assert_equal(stdout.count("hello"), np, stdout)
151 yield nt.assert_equal(stdout.count("hello"), np, stdout)
153 for n in range(np):
152 for n in range(np):
154 yield nt.assert_equal(stdout.count(str(n)), 1, stdout)
153 yield nt.assert_equal(stdout.count(str(n)), 1, stdout)
155 yield nt.assert_equal(stderr, '')
154 yield nt.assert_equal(stderr, '')
156 yield _check_mp_mode(km, expected=True)
155 yield _check_mp_mode(km, expected=True)
157 yield _check_mp_mode(km, expected=True, stream="stderr")
156 yield _check_mp_mode(km, expected=False, stream="stderr")
158
157
159
158
160 @dec.parametric
159 @dec.parametric
161 def test_subprocess_noprint():
160 def test_subprocess_noprint():
162 """mp.Process without print doesn't trigger iostream mp_mode"""
161 """mp.Process without print doesn't trigger iostream mp_mode"""
163 with new_kernel() as km:
162 with new_kernel() as km:
164 iopub = km.sub_channel
163 iopub = km.sub_channel
165
164
166 np = 5
165 np = 5
167 code = '\n'.join([
166 code = '\n'.join([
168 "import multiprocessing as mp",
167 "import multiprocessing as mp",
169 "def f(x):",
168 "def f(x):",
170 " return x",
169 " return x",
171 "pool = [mp.Process(target=f,args=(i,)) for i in range(%i)]" % np,
170 "pool = [mp.Process(target=f,args=(i,)) for i in range(%i)]" % np,
172 "for p in pool: p.start()",
171 "for p in pool: p.start()",
173 "for p in pool: p.join()"
172 "for p in pool: p.join()"
174 ])
173 ])
175
174
176 msg_id, content = execute(km=km, code=code)
175 msg_id, content = execute(km=km, code=code)
177 stdout, stderr = assemble_output(iopub)
176 stdout, stderr = assemble_output(iopub)
178 yield nt.assert_equal(stdout, '')
177 yield nt.assert_equal(stdout, '')
179 yield nt.assert_equal(stderr, '')
178 yield nt.assert_equal(stderr, '')
180
179
181 yield _check_mp_mode(km, expected=False)
180 yield _check_mp_mode(km, expected=False)
182 yield _check_mp_mode(km, expected=False, stream="stderr")
181 yield _check_mp_mode(km, expected=False, stream="stderr")
183
182
183 @dec.parametric
184 def test_subprocess_error():
185 """error in mp.Process doesn't crash"""
186 with new_kernel() as km:
187 iopub = km.sub_channel
188
189 code = '\n'.join([
190 "import multiprocessing as mp",
191 "def f():",
192 " return 1/0",
193 "p = mp.Process(target=f)",
194 "p.start()",
195 "p.join()",
196 ])
197
198 msg_id, content = execute(km=km, code=code)
199 stdout, stderr = assemble_output(iopub)
200 yield nt.assert_equal(stdout, '')
201 nt.assert_true("ZeroDivisionError" in stderr, stderr)
202
203 yield _check_mp_mode(km, expected=False)
204 yield _check_mp_mode(km, expected=True, stream="stderr")
205
General Comments 0
You need to be logged in to leave comments. Login now