##// END OF EJS Templates
subprocess iostreams only send on flush
MinRK -
Show More
@@ -1,239 +1,203 b''
1 import sys
1 import sys
2 import time
2 import time
3 import os
3 import os
4 import threading
4 import threading
5 import uuid
5 import uuid
6 from io import StringIO
6 from io import StringIO
7
7
8 import zmq
8 import zmq
9
9
10 from session import extract_header, Message
10 from session import extract_header, Message
11
11
12 from IPython.utils import io, text
12 from IPython.utils import io, text
13 from IPython.utils import py3compat
13 from IPython.utils import py3compat
14
14
15 import multiprocessing as mp
15 import multiprocessing as mp
16 # import multiprocessing.sharedctypes as mpshc
16 # import multiprocessing.sharedctypes as mpshc
17 from ctypes import c_bool
17 from ctypes import c_bool
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19 # Globals
19 # Globals
20 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
21
21
22 MASTER_NO_CHILDREN = 0
22 MASTER_NO_CHILDREN = 0
23 MASTER_WITH_CHILDREN = 1
23 MASTER_WITH_CHILDREN = 1
24 CHILD = 2
24 CHILD = 2
25
25
26 #-----------------------------------------------------------------------------
26 #-----------------------------------------------------------------------------
27 # Stream classes
27 # Stream classes
28 #-----------------------------------------------------------------------------
28 #-----------------------------------------------------------------------------
29
29
30 class OutStream(object):
30 class OutStream(object):
31 """A file like object that publishes the stream to a 0MQ PUB socket."""
31 """A file like object that publishes the stream to a 0MQ PUB socket."""
32
32
33 # The time interval between automatic flushes, in seconds.
33 # The time interval between automatic flushes, in seconds.
34 flush_interval = 0.05
34 flush_interval = 0.05
35 topic=None
35 topic=None
36
36
37 def __init__(self, session, pub_socket, name):
37 def __init__(self, session, pub_socket, name):
38 self.encoding = 'UTF-8'
38 self.encoding = 'UTF-8'
39 self.session = session
39 self.session = session
40 self.pub_socket = pub_socket
40 self.pub_socket = pub_socket
41 self.name = name
41 self.name = name
42 self.parent_header = {}
42 self.parent_header = {}
43 self._new_buffer()
43 self._new_buffer()
44 self._found_newprocess = threading.Event()
44 self._found_newprocess = 0
45 self._buffer_lock = threading.Lock()
45 self._buffer_lock = threading.Lock()
46 self._master_pid = os.getpid()
46 self._master_pid = os.getpid()
47 self._master_thread = threading.current_thread().ident
47 self._master_thread = threading.current_thread().ident
48 self._pipe_pid = os.getpid()
48 self._pipe_pid = os.getpid()
49 self._setup_pipe_in()
49 self._setup_pipe_in()
50
50
51 def _setup_pipe_in(self):
51 def _setup_pipe_in(self):
52 """setup listening pipe for subprocesses"""
52 """setup listening pipe for subprocesses"""
53 ctx = self._pipe_ctx = zmq.Context()
53 ctx = self.pub_socket.context
54
55 # signal pair for terminating background thread
56 self._pipe_signaler = ctx.socket(zmq.PAIR)
57 self._pipe_signalee = ctx.socket(zmq.PAIR)
58 self._pipe_signaler.bind("inproc://ostream_pipe")
59 self._pipe_signalee.connect("inproc://ostream_pipe")
60 # thread event to signal cleanup is done
61 self._pipe_done = threading.Event()
62
54
63 # use UUID to authenticate pipe messages
55 # use UUID to authenticate pipe messages
64 self._pipe_uuid = uuid.uuid4().bytes
56 self._pipe_uuid = uuid.uuid4().bytes
65
57
66 self._pipe_thread = threading.Thread(target=self._pipe_main)
58 self._pipe_in = ctx.socket(zmq.PULL)
67 self._pipe_thread.start()
59 self._pipe_in.linger = 0
60 self._pipe_port = self._pipe_in.bind_to_random_port("tcp://127.0.0.1")
61 self._pipe_poller = zmq.Poller()
62 self._pipe_poller.register(self._pipe_in, zmq.POLLIN)
68
63
69 def _setup_pipe_out(self):
64 def _setup_pipe_out(self):
70 # must be new context after fork
65 # must be new context after fork
71 ctx = zmq.Context()
66 ctx = zmq.Context()
72 self._pipe_pid = os.getpid()
67 self._pipe_pid = os.getpid()
73 self._pipe_out = ctx.socket(zmq.PUSH)
68 self._pipe_out = ctx.socket(zmq.PUSH)
74 self._pipe_out_lock = threading.Lock()
69 self._pipe_out_lock = threading.Lock()
75 self._pipe_out.connect("tcp://127.0.0.1:%i" % self._pipe_port)
70 self._pipe_out.connect("tcp://127.0.0.1:%i" % self._pipe_port)
76
71
77 def _pipe_main(self):
78 """eventloop for receiving"""
79 ctx = self._pipe_ctx
80 self._pipe_in = ctx.socket(zmq.PULL)
81 self._pipe_port = self._pipe_in.bind_to_random_port("tcp://127.0.0.1")
82 poller = zmq.Poller()
83 poller.register(self._pipe_signalee, zmq.POLLIN)
84 poller.register(self._pipe_in, zmq.POLLIN)
85 while True:
86 if not self._is_master_process():
87 return
88 try:
89 events = dict(poller.poll(1000))
90 except zmq.ZMQError:
91 # should only be triggered by process-ending cleanup
92 return
93
94 if self._pipe_signalee in events:
95 break
96 if self._pipe_in in events:
97 msg = self._pipe_in.recv_multipart()
98 if msg[0] != self._pipe_uuid:
99 # message not authenticated
100 continue
101 self._found_newprocess.set()
102 text = msg[1].decode(self.encoding, 'replace')
103 with self._buffer_lock:
104 self._buffer.write(text)
105 if self._start < 0:
106 self._start = time.time()
107
108 # wrap it up
109 self._pipe_signaler.close()
110 self._pipe_signalee.close()
111 self._pipe_in.close()
112 self._pipe_ctx.term()
113 self._pipe_done.set()
114
115
116 def __del__(self):
117 if not self._is_master_process():
118 return
119 self._pipe_signaler.send(b'die')
120 self._pipe_done.wait(10)
121
122 def _is_master_process(self):
72 def _is_master_process(self):
123 return os.getpid() == self._master_pid
73 return os.getpid() == self._master_pid
124
74
125 def _is_master_thread(self):
75 def _is_master_thread(self):
126 return threading.current_thread().ident == self._master_thread
76 return threading.current_thread().ident == self._master_thread
127
77
128 def _have_pipe_out(self):
78 def _have_pipe_out(self):
129 return os.getpid() == self._pipe_pid
79 return os.getpid() == self._pipe_pid
130
80
131 def _check_mp_mode(self):
81 def _check_mp_mode(self):
132 """check for forks, and switch to zmq pipeline if necessary"""
82 """check for forks, and switch to zmq pipeline if necessary"""
133 if self._is_master_process():
83 if self._is_master_process():
134 if self._found_newprocess.is_set():
84 if self._found_newprocess:
135 return MASTER_WITH_CHILDREN
85 return MASTER_WITH_CHILDREN
136 else:
86 else:
137 return MASTER_NO_CHILDREN
87 return MASTER_NO_CHILDREN
138 else:
88 else:
139 if not self._have_pipe_out():
89 if not self._have_pipe_out():
140 # setup a new out pipe
90 # setup a new out pipe
141 self._setup_pipe_out()
91 self._setup_pipe_out()
142 return CHILD
92 return CHILD
143
93
144 def set_parent(self, parent):
94 def set_parent(self, parent):
145 self.parent_header = extract_header(parent)
95 self.parent_header = extract_header(parent)
146
96
147 def close(self):
97 def close(self):
148 self.pub_socket = None
98 self.pub_socket = None
149
99
100 def _flush_from_subprocesses(self):
101 """flush possible pub data from subprocesses into my buffer"""
102 if not self._is_master_process():
103 return
104 for i in range(100):
105 if self._pipe_poller.poll(0):
106 msg = self._pipe_in.recv_multipart()
107 if msg[0] != self._pipe_uuid:
108 continue
109 else:
110 self._buffer.write(msg[1].decode(self.encoding, 'replace'))
111 # this always means a flush,
112 # so reset our timer
113 self._start = 0
114 else:
115 break
116
150 def flush(self):
117 def flush(self):
151 """trigger actual zmq send"""
118 """trigger actual zmq send"""
152 if self.pub_socket is None:
119 if self.pub_socket is None:
153 raise ValueError(u'I/O operation on closed file')
120 raise ValueError(u'I/O operation on closed file')
121
122 mp_mode = self._check_mp_mode()
123
124 if mp_mode != CHILD:
125 # we are master
126 if not self._is_master_thread():
127 # sub-threads must not trigger flush,
128 # but at least they can force the timer.
129 self._start = 0
130 return
131
132 self._flush_from_subprocesses()
133 data = self._flush_buffer()
134
135 if data:
136 content = {u'name':self.name, u'data':data}
137 msg = self.session.send(self.pub_socket, u'stream', content=content,
138 parent=self.parent_header, ident=self.topic)
139
140 if hasattr(self.pub_socket, 'flush'):
141 # socket itself has flush (presumably ZMQStream)
142 self.pub_socket.flush()
154 else:
143 else:
155 if self._is_master_process():
144 with self._pipe_out_lock:
156 if not self._is_master_thread():
145 string = self._flush_buffer()
157 # sub-threads mustn't trigger flush,
146 tracker = self._pipe_out.send_multipart([
158 # but at least they can force the timer.
147 self._pipe_uuid,
159 self._start = 0
148 string.encode(self.encoding, 'replace'),
160 data = u''
149 ], copy=False, track=True)
161 # obtain data
150 tracker.wait(1)
162 if self._check_mp_mode(): # multiprocess, needs a lock
163 with self._buffer_lock:
164 data = self._buffer.getvalue()
165 self._buffer.close()
166 self._new_buffer()
167 else: # single process mode
168 data = self._buffer.getvalue()
169 self._buffer.close()
170 self._new_buffer()
171
172 if data:
173 content = {u'name':self.name, u'data':data}
174 msg = self.session.send(self.pub_socket, u'stream', content=content,
175 parent=self.parent_header, ident=self.topic)
176
177 if hasattr(self.pub_socket, 'flush'):
178 # socket itself has flush (presumably ZMQStream)
179 self.pub_socket.flush()
180 else:
181 self._check_mp_mode()
182 with self._pipe_out_lock:
183 tracker = self._pipe_out.send(b'', copy=False, track=True)
184 tracker.wait(1)
185
186
151
187 def isatty(self):
152 def isatty(self):
188 return False
153 return False
189
154
190 def __next__(self):
155 def __next__(self):
191 raise IOError('Read not supported on a write only stream.')
156 raise IOError('Read not supported on a write only stream.')
192
157
193 if not py3compat.PY3:
158 if not py3compat.PY3:
194 next = __next__
159 next = __next__
195
160
196 def read(self, size=-1):
161 def read(self, size=-1):
197 raise IOError('Read not supported on a write only stream.')
162 raise IOError('Read not supported on a write only stream.')
198
163
199 def readline(self, size=-1):
164 def readline(self, size=-1):
200 raise IOError('Read not supported on a write only stream.')
165 raise IOError('Read not supported on a write only stream.')
201
166
202 def write(self, string):
167 def write(self, string):
203 if self.pub_socket is None:
168 if self.pub_socket is None:
204 raise ValueError('I/O operation on closed file')
169 raise ValueError('I/O operation on closed file')
205 else:
170 else:
206 # Make sure that we're handling unicode
171 # Make sure that we're handling unicode
207 if not isinstance(string, unicode):
172 if not isinstance(string, unicode):
208 string = string.decode(self.encoding, 'replace')
173 string = string.decode(self.encoding, 'replace')
209
174 self._buffer.write(string)
210 mp_mode = self._check_mp_mode()
175 self._check_mp_mode()
211 if mp_mode == CHILD:
176 # do we want to check subprocess flushes on write?
212 with self._pipe_out_lock:
177 # self._flush_from_subprocesses()
213 self._pipe_out.send_multipart([
214 self._pipe_uuid,
215 string.encode(self.encoding, 'replace'),
216 ])
217 return
218 elif mp_mode == MASTER_NO_CHILDREN:
219 self._buffer.write(string)
220 elif mp_mode == MASTER_WITH_CHILDREN:
221 with self._buffer_lock:
222 self._buffer.write(string)
223
224 current_time = time.time()
178 current_time = time.time()
225 if self._start < 0:
179 if self._start < 0:
226 self._start = current_time
180 self._start = current_time
227 elif current_time - self._start > self.flush_interval:
181 elif current_time - self._start > self.flush_interval:
228 self.flush()
182 self.flush()
229
183
230 def writelines(self, sequence):
184 def writelines(self, sequence):
231 if self.pub_socket is None:
185 if self.pub_socket is None:
232 raise ValueError('I/O operation on closed file')
186 raise ValueError('I/O operation on closed file')
233 else:
187 else:
234 for string in sequence:
188 for string in sequence:
235 self.write(string)
189 self.write(string)
236
190
191 def _flush_buffer(self):
192 """clear the current buffer and return the current buffer data"""
193 data = u''
194 if self._buffer is not None:
195 data = self._buffer.getvalue()
196 self._buffer.close()
197 self._new_buffer()
198 return data
199
237 def _new_buffer(self):
200 def _new_buffer(self):
238 self._buffer = StringIO()
201 self._buffer = StringIO()
239 self._start = -1
202 self._start = -1
203
@@ -1,203 +1,203 b''
1 """test the IPython Kernel"""
1 """test the IPython Kernel"""
2
2
3 #-------------------------------------------------------------------------------
3 #-------------------------------------------------------------------------------
4 # Copyright (C) 2013 The IPython Development Team
4 # Copyright (C) 2013 The IPython Development Team
5 #
5 #
6 # Distributed under the terms of the BSD License. The full license is in
6 # Distributed under the terms of the BSD License. The full license is in
7 # the file COPYING, distributed as part of this software.
7 # the file COPYING, distributed as part of this software.
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9
9
10 #-------------------------------------------------------------------------------
10 #-------------------------------------------------------------------------------
11 # Imports
11 # Imports
12 #-------------------------------------------------------------------------------
12 #-------------------------------------------------------------------------------
13
13
14 import os
14 import os
15 import shutil
15 import shutil
16 import tempfile
16 import tempfile
17
17
18 from Queue import Empty
18 from Queue import Empty
19 from contextlib import contextmanager
19 from contextlib import contextmanager
20 from subprocess import PIPE
20 from subprocess import PIPE
21
21
22 import nose.tools as nt
22 import nose.tools as nt
23
23
24 from IPython.zmq.blockingkernelmanager import BlockingKernelManager
24 from IPython.zmq.blockingkernelmanager import BlockingKernelManager
25 from IPython.zmq.tests.test_message_spec import execute, flush_channels
25 from IPython.zmq.tests.test_message_spec import execute, flush_channels
26 from IPython.testing import decorators as dec
26 from IPython.testing import decorators as dec
27 from IPython.utils import path, py3compat
27 from IPython.utils import path, py3compat
28
28
29 #-------------------------------------------------------------------------------
29 #-------------------------------------------------------------------------------
30 # Tests
30 # Tests
31 #-------------------------------------------------------------------------------
31 #-------------------------------------------------------------------------------
32
32
33 def setup():
33 def setup():
34 """setup temporary IPYTHONDIR for tests"""
34 """setup temporary IPYTHONDIR for tests"""
35 global IPYTHONDIR
35 global IPYTHONDIR
36 global save_env
36 global save_env
37 global save_get_ipython_dir
37 global save_get_ipython_dir
38
38
39 IPYTHONDIR = tempfile.mkdtemp()
39 IPYTHONDIR = tempfile.mkdtemp()
40
40
41 save_env = os.environ.copy()
41 save_env = os.environ.copy()
42 os.environ["IPYTHONDIR"] = IPYTHONDIR
42 os.environ["IPYTHONDIR"] = IPYTHONDIR
43
43
44 save_get_ipython_dir = path.get_ipython_dir
44 save_get_ipython_dir = path.get_ipython_dir
45 path.get_ipython_dir = lambda : IPYTHONDIR
45 path.get_ipython_dir = lambda : IPYTHONDIR
46
46
47
47
48 def teardown():
48 def teardown():
49 path.get_ipython_dir = save_get_ipython_dir
49 path.get_ipython_dir = save_get_ipython_dir
50 os.environ = save_env
50 os.environ = save_env
51
51
52 try:
52 try:
53 shutil.rmtree(IPYTHONDIR)
53 shutil.rmtree(IPYTHONDIR)
54 except (OSError, IOError):
54 except (OSError, IOError):
55 # no such file
55 # no such file
56 pass
56 pass
57
57
58
58
59 @contextmanager
59 @contextmanager
60 def new_kernel():
60 def new_kernel():
61 """start a kernel in a subprocess, and wait for it to be ready
61 """start a kernel in a subprocess, and wait for it to be ready
62
62
63 Returns
63 Returns
64 -------
64 -------
65 kernel_manager: connected KernelManager instance
65 kernel_manager: connected KernelManager instance
66 """
66 """
67 KM = BlockingKernelManager()
67 KM = BlockingKernelManager()
68
68
69 KM.start_kernel(stdout=PIPE, stderr=PIPE)
69 KM.start_kernel(stdout=PIPE, stderr=PIPE)
70 KM.start_channels()
70 KM.start_channels()
71
71
72 # wait for kernel to be ready
72 # wait for kernel to be ready
73 KM.shell_channel.execute("import sys")
73 KM.shell_channel.execute("import sys")
74 KM.shell_channel.get_msg(block=True, timeout=5)
74 KM.shell_channel.get_msg(block=True, timeout=5)
75 flush_channels(KM)
75 flush_channels(KM)
76 try:
76 try:
77 yield KM
77 yield KM
78 finally:
78 finally:
79 KM.stop_channels()
79 KM.stop_channels()
80 KM.shutdown_kernel()
80 KM.shutdown_kernel()
81
81
82
82
83 def assemble_output(iopub):
83 def assemble_output(iopub):
84 """assemble stdout/err from an execution"""
84 """assemble stdout/err from an execution"""
85 stdout = ''
85 stdout = ''
86 stderr = ''
86 stderr = ''
87 while True:
87 while True:
88 msg = iopub.get_msg(block=True, timeout=1)
88 msg = iopub.get_msg(block=True, timeout=1)
89 msg_type = msg['msg_type']
89 msg_type = msg['msg_type']
90 content = msg['content']
90 content = msg['content']
91 if msg_type == 'status' and content['execution_state'] == 'idle':
91 if msg_type == 'status' and content['execution_state'] == 'idle':
92 # idle message signals end of output
92 # idle message signals end of output
93 break
93 break
94 elif msg['msg_type'] == 'stream':
94 elif msg['msg_type'] == 'stream':
95 if content['name'] == 'stdout':
95 if content['name'] == 'stdout':
96 stdout = stdout + content['data']
96 stdout = stdout + content['data']
97 elif content['name'] == 'stderr':
97 elif content['name'] == 'stderr':
98 stderr = stderr + content['data']
98 stderr = stderr + content['data']
99 else:
99 else:
100 raise KeyError("bad stream: %r" % content['name'])
100 raise KeyError("bad stream: %r" % content['name'])
101 else:
101 else:
102 # other output, ignored
102 # other output, ignored
103 pass
103 pass
104 return stdout, stderr
104 return stdout, stderr
105
105
106
106
107 def _check_mp_mode(km, expected=False, stream="stdout"):
107 def _check_mp_mode(km, expected=False, stream="stdout"):
108 execute(km=km, code="import sys")
108 execute(km=km, code="import sys")
109 flush_channels(km)
109 flush_channels(km)
110 msg_id, content = execute(km=km, code="print (sys.%s._check_mp_mode())" % stream)
110 msg_id, content = execute(km=km, code="print (sys.%s._check_mp_mode())" % stream)
111 stdout, stderr = assemble_output(km.sub_channel)
111 stdout, stderr = assemble_output(km.sub_channel)
112 nt.assert_equal(eval(stdout.strip()), expected)
112 nt.assert_equal(eval(stdout.strip()), expected)
113
113
114
114
115 def test_simple_print():
115 def test_simple_print():
116 """simple print statement in kernel"""
116 """simple print statement in kernel"""
117 with new_kernel() as km:
117 with new_kernel() as km:
118 iopub = km.sub_channel
118 iopub = km.sub_channel
119 msg_id, content = execute(km=km, code="print ('hi')")
119 msg_id, content = execute(km=km, code="print ('hi')")
120 stdout, stderr = assemble_output(iopub)
120 stdout, stderr = assemble_output(iopub)
121 nt.assert_equal(stdout, 'hi\n')
121 nt.assert_equal(stdout, 'hi\n')
122 nt.assert_equal(stderr, '')
122 nt.assert_equal(stderr, '')
123 _check_mp_mode(km, expected=False)
123 _check_mp_mode(km, expected=False)
124 print ('hello')
124 print ('hello')
125
125
126
126
127 def test_subprocess_print():
127 def test_subprocess_print():
128 """printing from forked mp.Process"""
128 """printing from forked mp.Process"""
129 with new_kernel() as km:
129 with new_kernel() as km:
130 iopub = km.sub_channel
130 iopub = km.sub_channel
131
131
132 _check_mp_mode(km, expected=False)
132 _check_mp_mode(km, expected=False)
133 flush_channels(km)
133 flush_channels(km)
134 np = 5
134 np = 5
135 code = '\n'.join([
135 code = '\n'.join([
136 "import multiprocessing as mp",
136 "import multiprocessing as mp",
137 "def f(x):",
137 "def f(x):",
138 " print('hello',x)",
138 " print('hello',x)",
139 "pool = [mp.Process(target=f,args=(i,)) for i in range(%i)]" % np,
139 "pool = [mp.Process(target=f,args=(i,)) for i in range(%i)]" % np,
140 "for p in pool: p.start()",
140 "for p in pool: p.start()",
141 "for p in pool: p.join()"
141 "for p in pool: p.join()"
142 ])
142 ])
143
143
144 expected = '\n'.join([
144 expected = '\n'.join([
145 "hello %s" % i for i in range(np)
145 "hello %s" % i for i in range(np)
146 ]) + '\n'
146 ]) + '\n'
147
147
148 msg_id, content = execute(km=km, code=code)
148 msg_id, content = execute(km=km, code=code)
149 stdout, stderr = assemble_output(iopub)
149 stdout, stderr = assemble_output(iopub)
150 nt.assert_equal(stdout.count("hello"), np, stdout)
150 nt.assert_equal(stdout.count("hello"), np, stdout)
151 for n in range(np):
151 for n in range(np):
152 nt.assert_equal(stdout.count(str(n)), 1, stdout)
152 nt.assert_equal(stdout.count(str(n)), 1, stdout)
153 nt.assert_equal(stderr, '')
153 nt.assert_equal(stderr, '')
154 _check_mp_mode(km, expected=True)
154 _check_mp_mode(km, expected=False)
155 _check_mp_mode(km, expected=False, stream="stderr")
155 _check_mp_mode(km, expected=False, stream="stderr")
156
156
157
157
158 def test_subprocess_noprint():
158 def test_subprocess_noprint():
159 """mp.Process without print doesn't trigger iostream mp_mode"""
159 """mp.Process without print doesn't trigger iostream mp_mode"""
160 with new_kernel() as km:
160 with new_kernel() as km:
161 iopub = km.sub_channel
161 iopub = km.sub_channel
162
162
163 np = 5
163 np = 5
164 code = '\n'.join([
164 code = '\n'.join([
165 "import multiprocessing as mp",
165 "import multiprocessing as mp",
166 "def f(x):",
166 "def f(x):",
167 " return x",
167 " return x",
168 "pool = [mp.Process(target=f,args=(i,)) for i in range(%i)]" % np,
168 "pool = [mp.Process(target=f,args=(i,)) for i in range(%i)]" % np,
169 "for p in pool: p.start()",
169 "for p in pool: p.start()",
170 "for p in pool: p.join()"
170 "for p in pool: p.join()"
171 ])
171 ])
172
172
173 msg_id, content = execute(km=km, code=code)
173 msg_id, content = execute(km=km, code=code)
174 stdout, stderr = assemble_output(iopub)
174 stdout, stderr = assemble_output(iopub)
175 nt.assert_equal(stdout, '')
175 nt.assert_equal(stdout, '')
176 nt.assert_equal(stderr, '')
176 nt.assert_equal(stderr, '')
177
177
178 _check_mp_mode(km, expected=False)
178 _check_mp_mode(km, expected=False)
179 _check_mp_mode(km, expected=False, stream="stderr")
179 _check_mp_mode(km, expected=False, stream="stderr")
180
180
181
181
182 def test_subprocess_error():
182 def test_subprocess_error():
183 """error in mp.Process doesn't crash"""
183 """error in mp.Process doesn't crash"""
184 with new_kernel() as km:
184 with new_kernel() as km:
185 iopub = km.sub_channel
185 iopub = km.sub_channel
186
186
187 code = '\n'.join([
187 code = '\n'.join([
188 "import multiprocessing as mp",
188 "import multiprocessing as mp",
189 "def f():",
189 "def f():",
190 " return 1/0",
190 " return 1/0",
191 "p = mp.Process(target=f)",
191 "p = mp.Process(target=f)",
192 "p.start()",
192 "p.start()",
193 "p.join()",
193 "p.join()",
194 ])
194 ])
195
195
196 msg_id, content = execute(km=km, code=code)
196 msg_id, content = execute(km=km, code=code)
197 stdout, stderr = assemble_output(iopub)
197 stdout, stderr = assemble_output(iopub)
198 nt.assert_equal(stdout, '')
198 nt.assert_equal(stdout, '')
199 nt.assert_true("ZeroDivisionError" in stderr, stderr)
199 nt.assert_true("ZeroDivisionError" in stderr, stderr)
200
200
201 _check_mp_mode(km, expected=False)
201 _check_mp_mode(km, expected=False)
202 _check_mp_mode(km, expected=True, stream="stderr")
202 _check_mp_mode(km, expected=False, stream="stderr")
203
203
General Comments 0
You need to be logged in to leave comments. Login now