##// END OF EJS Templates
subprocess iostreams only send on flush
MinRK -
Show More
@@ -1,239 +1,203 b''
1 1 import sys
2 2 import time
3 3 import os
4 4 import threading
5 5 import uuid
6 6 from io import StringIO
7 7
8 8 import zmq
9 9
10 10 from session import extract_header, Message
11 11
12 12 from IPython.utils import io, text
13 13 from IPython.utils import py3compat
14 14
15 15 import multiprocessing as mp
16 16 # import multiprocessing.sharedctypes as mpshc
17 17 from ctypes import c_bool
18 18 #-----------------------------------------------------------------------------
19 19 # Globals
20 20 #-----------------------------------------------------------------------------
21 21
22 22 MASTER_NO_CHILDREN = 0
23 23 MASTER_WITH_CHILDREN = 1
24 24 CHILD = 2
25 25
26 26 #-----------------------------------------------------------------------------
27 27 # Stream classes
28 28 #-----------------------------------------------------------------------------
29 29
30 30 class OutStream(object):
31 31 """A file like object that publishes the stream to a 0MQ PUB socket."""
32 32
33 33 # The time interval between automatic flushes, in seconds.
34 34 flush_interval = 0.05
35 35 topic=None
36 36
37 37 def __init__(self, session, pub_socket, name):
38 38 self.encoding = 'UTF-8'
39 39 self.session = session
40 40 self.pub_socket = pub_socket
41 41 self.name = name
42 42 self.parent_header = {}
43 43 self._new_buffer()
44 self._found_newprocess = threading.Event()
44 self._found_newprocess = 0
45 45 self._buffer_lock = threading.Lock()
46 46 self._master_pid = os.getpid()
47 47 self._master_thread = threading.current_thread().ident
48 48 self._pipe_pid = os.getpid()
49 49 self._setup_pipe_in()
50 50
51 51 def _setup_pipe_in(self):
52 52 """setup listening pipe for subprocesses"""
53 ctx = self._pipe_ctx = zmq.Context()
54
55 # signal pair for terminating background thread
56 self._pipe_signaler = ctx.socket(zmq.PAIR)
57 self._pipe_signalee = ctx.socket(zmq.PAIR)
58 self._pipe_signaler.bind("inproc://ostream_pipe")
59 self._pipe_signalee.connect("inproc://ostream_pipe")
60 # thread event to signal cleanup is done
61 self._pipe_done = threading.Event()
53 ctx = self.pub_socket.context
62 54
63 55 # use UUID to authenticate pipe messages
64 56 self._pipe_uuid = uuid.uuid4().bytes
65 57
66 self._pipe_thread = threading.Thread(target=self._pipe_main)
67 self._pipe_thread.start()
58 self._pipe_in = ctx.socket(zmq.PULL)
59 self._pipe_in.linger = 0
60 self._pipe_port = self._pipe_in.bind_to_random_port("tcp://127.0.0.1")
61 self._pipe_poller = zmq.Poller()
62 self._pipe_poller.register(self._pipe_in, zmq.POLLIN)
68 63
69 64 def _setup_pipe_out(self):
70 65 # must be new context after fork
71 66 ctx = zmq.Context()
72 67 self._pipe_pid = os.getpid()
73 68 self._pipe_out = ctx.socket(zmq.PUSH)
74 69 self._pipe_out_lock = threading.Lock()
75 70 self._pipe_out.connect("tcp://127.0.0.1:%i" % self._pipe_port)
76 71
77 def _pipe_main(self):
78 """eventloop for receiving"""
79 ctx = self._pipe_ctx
80 self._pipe_in = ctx.socket(zmq.PULL)
81 self._pipe_port = self._pipe_in.bind_to_random_port("tcp://127.0.0.1")
82 poller = zmq.Poller()
83 poller.register(self._pipe_signalee, zmq.POLLIN)
84 poller.register(self._pipe_in, zmq.POLLIN)
85 while True:
86 if not self._is_master_process():
87 return
88 try:
89 events = dict(poller.poll(1000))
90 except zmq.ZMQError:
91 # should only be triggered by process-ending cleanup
92 return
93
94 if self._pipe_signalee in events:
95 break
96 if self._pipe_in in events:
97 msg = self._pipe_in.recv_multipart()
98 if msg[0] != self._pipe_uuid:
99 # message not authenticated
100 continue
101 self._found_newprocess.set()
102 text = msg[1].decode(self.encoding, 'replace')
103 with self._buffer_lock:
104 self._buffer.write(text)
105 if self._start < 0:
106 self._start = time.time()
107
108 # wrap it up
109 self._pipe_signaler.close()
110 self._pipe_signalee.close()
111 self._pipe_in.close()
112 self._pipe_ctx.term()
113 self._pipe_done.set()
114
115
116 def __del__(self):
117 if not self._is_master_process():
118 return
119 self._pipe_signaler.send(b'die')
120 self._pipe_done.wait(10)
121
122 72 def _is_master_process(self):
123 73 return os.getpid() == self._master_pid
124 74
125 75 def _is_master_thread(self):
126 76 return threading.current_thread().ident == self._master_thread
127 77
128 78 def _have_pipe_out(self):
129 79 return os.getpid() == self._pipe_pid
130 80
131 81 def _check_mp_mode(self):
132 82 """check for forks, and switch to zmq pipeline if necessary"""
133 83 if self._is_master_process():
134 if self._found_newprocess.is_set():
84 if self._found_newprocess:
135 85 return MASTER_WITH_CHILDREN
136 86 else:
137 87 return MASTER_NO_CHILDREN
138 88 else:
139 89 if not self._have_pipe_out():
140 90 # setup a new out pipe
141 91 self._setup_pipe_out()
142 92 return CHILD
143 93
144 94 def set_parent(self, parent):
145 95 self.parent_header = extract_header(parent)
146 96
147 97 def close(self):
148 98 self.pub_socket = None
149 99
100 def _flush_from_subprocesses(self):
101 """flush possible pub data from subprocesses into my buffer"""
102 if not self._is_master_process():
103 return
104 for i in range(100):
105 if self._pipe_poller.poll(0):
106 msg = self._pipe_in.recv_multipart()
107 if msg[0] != self._pipe_uuid:
108 continue
109 else:
110 self._buffer.write(msg[1].decode(self.encoding, 'replace'))
111 # this always means a flush,
112 # so reset our timer
113 self._start = 0
114 else:
115 break
116
150 117 def flush(self):
151 118 """trigger actual zmq send"""
152 119 if self.pub_socket is None:
153 120 raise ValueError(u'I/O operation on closed file')
121
122 mp_mode = self._check_mp_mode()
123
124 if mp_mode != CHILD:
125 # we are master
126 if not self._is_master_thread():
127 # sub-threads must not trigger flush,
128 # but at least they can force the timer.
129 self._start = 0
130 return
131
132 self._flush_from_subprocesses()
133 data = self._flush_buffer()
134
135 if data:
136 content = {u'name':self.name, u'data':data}
137 msg = self.session.send(self.pub_socket, u'stream', content=content,
138 parent=self.parent_header, ident=self.topic)
139
140 if hasattr(self.pub_socket, 'flush'):
141 # socket itself has flush (presumably ZMQStream)
142 self.pub_socket.flush()
154 143 else:
155 if self._is_master_process():
156 if not self._is_master_thread():
157 # sub-threads mustn't trigger flush,
158 # but at least they can force the timer.
159 self._start = 0
160 data = u''
161 # obtain data
162 if self._check_mp_mode(): # multiprocess, needs a lock
163 with self._buffer_lock:
164 data = self._buffer.getvalue()
165 self._buffer.close()
166 self._new_buffer()
167 else: # single process mode
168 data = self._buffer.getvalue()
169 self._buffer.close()
170 self._new_buffer()
171
172 if data:
173 content = {u'name':self.name, u'data':data}
174 msg = self.session.send(self.pub_socket, u'stream', content=content,
175 parent=self.parent_header, ident=self.topic)
176
177 if hasattr(self.pub_socket, 'flush'):
178 # socket itself has flush (presumably ZMQStream)
179 self.pub_socket.flush()
180 else:
181 self._check_mp_mode()
182 with self._pipe_out_lock:
183 tracker = self._pipe_out.send(b'', copy=False, track=True)
184 tracker.wait(1)
185
144 with self._pipe_out_lock:
145 string = self._flush_buffer()
146 tracker = self._pipe_out.send_multipart([
147 self._pipe_uuid,
148 string.encode(self.encoding, 'replace'),
149 ], copy=False, track=True)
150 tracker.wait(1)
186 151
187 152 def isatty(self):
188 153 return False
189 154
190 155 def __next__(self):
191 156 raise IOError('Read not supported on a write only stream.')
192 157
193 158 if not py3compat.PY3:
194 159 next = __next__
195 160
196 161 def read(self, size=-1):
197 162 raise IOError('Read not supported on a write only stream.')
198 163
199 164 def readline(self, size=-1):
200 165 raise IOError('Read not supported on a write only stream.')
201 166
202 167 def write(self, string):
203 168 if self.pub_socket is None:
204 169 raise ValueError('I/O operation on closed file')
205 170 else:
206 171 # Make sure that we're handling unicode
207 172 if not isinstance(string, unicode):
208 173 string = string.decode(self.encoding, 'replace')
209
210 mp_mode = self._check_mp_mode()
211 if mp_mode == CHILD:
212 with self._pipe_out_lock:
213 self._pipe_out.send_multipart([
214 self._pipe_uuid,
215 string.encode(self.encoding, 'replace'),
216 ])
217 return
218 elif mp_mode == MASTER_NO_CHILDREN:
219 self._buffer.write(string)
220 elif mp_mode == MASTER_WITH_CHILDREN:
221 with self._buffer_lock:
222 self._buffer.write(string)
223
174 self._buffer.write(string)
175 self._check_mp_mode()
176 # do we want to check subprocess flushes on write?
177 # self._flush_from_subprocesses()
224 178 current_time = time.time()
225 179 if self._start < 0:
226 180 self._start = current_time
227 181 elif current_time - self._start > self.flush_interval:
228 182 self.flush()
229 183
230 184 def writelines(self, sequence):
231 185 if self.pub_socket is None:
232 186 raise ValueError('I/O operation on closed file')
233 187 else:
234 188 for string in sequence:
235 189 self.write(string)
236 190
191 def _flush_buffer(self):
192 """clear the current buffer and return the current buffer data"""
193 data = u''
194 if self._buffer is not None:
195 data = self._buffer.getvalue()
196 self._buffer.close()
197 self._new_buffer()
198 return data
199
237 200 def _new_buffer(self):
238 201 self._buffer = StringIO()
239 202 self._start = -1
203
@@ -1,203 +1,203 b''
1 1 """test the IPython Kernel"""
2 2
3 3 #-------------------------------------------------------------------------------
4 4 # Copyright (C) 2013 The IPython Development Team
5 5 #
6 6 # Distributed under the terms of the BSD License. The full license is in
7 7 # the file COPYING, distributed as part of this software.
8 8 #-------------------------------------------------------------------------------
9 9
10 10 #-------------------------------------------------------------------------------
11 11 # Imports
12 12 #-------------------------------------------------------------------------------
13 13
14 14 import os
15 15 import shutil
16 16 import tempfile
17 17
18 18 from Queue import Empty
19 19 from contextlib import contextmanager
20 20 from subprocess import PIPE
21 21
22 22 import nose.tools as nt
23 23
24 24 from IPython.zmq.blockingkernelmanager import BlockingKernelManager
25 25 from IPython.zmq.tests.test_message_spec import execute, flush_channels
26 26 from IPython.testing import decorators as dec
27 27 from IPython.utils import path, py3compat
28 28
29 29 #-------------------------------------------------------------------------------
30 30 # Tests
31 31 #-------------------------------------------------------------------------------
32 32
33 33 def setup():
34 34 """setup temporary IPYTHONDIR for tests"""
35 35 global IPYTHONDIR
36 36 global save_env
37 37 global save_get_ipython_dir
38 38
39 39 IPYTHONDIR = tempfile.mkdtemp()
40 40
41 41 save_env = os.environ.copy()
42 42 os.environ["IPYTHONDIR"] = IPYTHONDIR
43 43
44 44 save_get_ipython_dir = path.get_ipython_dir
45 45 path.get_ipython_dir = lambda : IPYTHONDIR
46 46
47 47
48 48 def teardown():
49 49 path.get_ipython_dir = save_get_ipython_dir
50 50 os.environ = save_env
51 51
52 52 try:
53 53 shutil.rmtree(IPYTHONDIR)
54 54 except (OSError, IOError):
55 55 # no such file
56 56 pass
57 57
58 58
59 59 @contextmanager
60 60 def new_kernel():
61 61 """start a kernel in a subprocess, and wait for it to be ready
62 62
63 63 Returns
64 64 -------
65 65 kernel_manager: connected KernelManager instance
66 66 """
67 67 KM = BlockingKernelManager()
68 68
69 69 KM.start_kernel(stdout=PIPE, stderr=PIPE)
70 70 KM.start_channels()
71 71
72 72 # wait for kernel to be ready
73 73 KM.shell_channel.execute("import sys")
74 74 KM.shell_channel.get_msg(block=True, timeout=5)
75 75 flush_channels(KM)
76 76 try:
77 77 yield KM
78 78 finally:
79 79 KM.stop_channels()
80 80 KM.shutdown_kernel()
81 81
82 82
83 83 def assemble_output(iopub):
84 84 """assemble stdout/err from an execution"""
85 85 stdout = ''
86 86 stderr = ''
87 87 while True:
88 88 msg = iopub.get_msg(block=True, timeout=1)
89 89 msg_type = msg['msg_type']
90 90 content = msg['content']
91 91 if msg_type == 'status' and content['execution_state'] == 'idle':
92 92 # idle message signals end of output
93 93 break
94 94 elif msg['msg_type'] == 'stream':
95 95 if content['name'] == 'stdout':
96 96 stdout = stdout + content['data']
97 97 elif content['name'] == 'stderr':
98 98 stderr = stderr + content['data']
99 99 else:
100 100 raise KeyError("bad stream: %r" % content['name'])
101 101 else:
102 102 # other output, ignored
103 103 pass
104 104 return stdout, stderr
105 105
106 106
107 107 def _check_mp_mode(km, expected=False, stream="stdout"):
108 108 execute(km=km, code="import sys")
109 109 flush_channels(km)
110 110 msg_id, content = execute(km=km, code="print (sys.%s._check_mp_mode())" % stream)
111 111 stdout, stderr = assemble_output(km.sub_channel)
112 112 nt.assert_equal(eval(stdout.strip()), expected)
113 113
114 114
115 115 def test_simple_print():
116 116 """simple print statement in kernel"""
117 117 with new_kernel() as km:
118 118 iopub = km.sub_channel
119 119 msg_id, content = execute(km=km, code="print ('hi')")
120 120 stdout, stderr = assemble_output(iopub)
121 121 nt.assert_equal(stdout, 'hi\n')
122 122 nt.assert_equal(stderr, '')
123 123 _check_mp_mode(km, expected=False)
124 124 print ('hello')
125 125
126 126
127 127 def test_subprocess_print():
128 128 """printing from forked mp.Process"""
129 129 with new_kernel() as km:
130 130 iopub = km.sub_channel
131 131
132 132 _check_mp_mode(km, expected=False)
133 133 flush_channels(km)
134 134 np = 5
135 135 code = '\n'.join([
136 136 "import multiprocessing as mp",
137 137 "def f(x):",
138 138 " print('hello',x)",
139 139 "pool = [mp.Process(target=f,args=(i,)) for i in range(%i)]" % np,
140 140 "for p in pool: p.start()",
141 141 "for p in pool: p.join()"
142 142 ])
143 143
144 144 expected = '\n'.join([
145 145 "hello %s" % i for i in range(np)
146 146 ]) + '\n'
147 147
148 148 msg_id, content = execute(km=km, code=code)
149 149 stdout, stderr = assemble_output(iopub)
150 150 nt.assert_equal(stdout.count("hello"), np, stdout)
151 151 for n in range(np):
152 152 nt.assert_equal(stdout.count(str(n)), 1, stdout)
153 153 nt.assert_equal(stderr, '')
154 _check_mp_mode(km, expected=True)
154 _check_mp_mode(km, expected=False)
155 155 _check_mp_mode(km, expected=False, stream="stderr")
156 156
157 157
158 158 def test_subprocess_noprint():
159 159 """mp.Process without print doesn't trigger iostream mp_mode"""
160 160 with new_kernel() as km:
161 161 iopub = km.sub_channel
162 162
163 163 np = 5
164 164 code = '\n'.join([
165 165 "import multiprocessing as mp",
166 166 "def f(x):",
167 167 " return x",
168 168 "pool = [mp.Process(target=f,args=(i,)) for i in range(%i)]" % np,
169 169 "for p in pool: p.start()",
170 170 "for p in pool: p.join()"
171 171 ])
172 172
173 173 msg_id, content = execute(km=km, code=code)
174 174 stdout, stderr = assemble_output(iopub)
175 175 nt.assert_equal(stdout, '')
176 176 nt.assert_equal(stderr, '')
177 177
178 178 _check_mp_mode(km, expected=False)
179 179 _check_mp_mode(km, expected=False, stream="stderr")
180 180
181 181
182 182 def test_subprocess_error():
183 183 """error in mp.Process doesn't crash"""
184 184 with new_kernel() as km:
185 185 iopub = km.sub_channel
186 186
187 187 code = '\n'.join([
188 188 "import multiprocessing as mp",
189 189 "def f():",
190 190 " return 1/0",
191 191 "p = mp.Process(target=f)",
192 192 "p.start()",
193 193 "p.join()",
194 194 ])
195 195
196 196 msg_id, content = execute(km=km, code=code)
197 197 stdout, stderr = assemble_output(iopub)
198 198 nt.assert_equal(stdout, '')
199 199 nt.assert_true("ZeroDivisionError" in stderr, stderr)
200 200
201 201 _check_mp_mode(km, expected=False)
202 _check_mp_mode(km, expected=True, stream="stderr")
202 _check_mp_mode(km, expected=False, stream="stderr")
203 203
General Comments 0
You need to be logged in to leave comments. Login now