##// END OF EJS Templates
subprocess iostreams only send on flush
MinRK -
Show More
@@ -41,7 +41,7 b' class OutStream(object):'
41 self.name = name
41 self.name = name
42 self.parent_header = {}
42 self.parent_header = {}
43 self._new_buffer()
43 self._new_buffer()
44 self._found_newprocess = threading.Event()
44 self._found_newprocess = 0
45 self._buffer_lock = threading.Lock()
45 self._buffer_lock = threading.Lock()
46 self._master_pid = os.getpid()
46 self._master_pid = os.getpid()
47 self._master_thread = threading.current_thread().ident
47 self._master_thread = threading.current_thread().ident
@@ -50,21 +50,16 b' class OutStream(object):'
50
50
51 def _setup_pipe_in(self):
51 def _setup_pipe_in(self):
52 """setup listening pipe for subprocesses"""
52 """setup listening pipe for subprocesses"""
53 ctx = self._pipe_ctx = zmq.Context()
53 ctx = self.pub_socket.context
54
55 # signal pair for terminating background thread
56 self._pipe_signaler = ctx.socket(zmq.PAIR)
57 self._pipe_signalee = ctx.socket(zmq.PAIR)
58 self._pipe_signaler.bind("inproc://ostream_pipe")
59 self._pipe_signalee.connect("inproc://ostream_pipe")
60 # thread event to signal cleanup is done
61 self._pipe_done = threading.Event()
62
54
63 # use UUID to authenticate pipe messages
55 # use UUID to authenticate pipe messages
64 self._pipe_uuid = uuid.uuid4().bytes
56 self._pipe_uuid = uuid.uuid4().bytes
65
57
66 self._pipe_thread = threading.Thread(target=self._pipe_main)
58 self._pipe_in = ctx.socket(zmq.PULL)
67 self._pipe_thread.start()
59 self._pipe_in.linger = 0
60 self._pipe_port = self._pipe_in.bind_to_random_port("tcp://127.0.0.1")
61 self._pipe_poller = zmq.Poller()
62 self._pipe_poller.register(self._pipe_in, zmq.POLLIN)
68
63
69 def _setup_pipe_out(self):
64 def _setup_pipe_out(self):
70 # must be new context after fork
65 # must be new context after fork
@@ -74,51 +69,6 b' class OutStream(object):'
74 self._pipe_out_lock = threading.Lock()
69 self._pipe_out_lock = threading.Lock()
75 self._pipe_out.connect("tcp://127.0.0.1:%i" % self._pipe_port)
70 self._pipe_out.connect("tcp://127.0.0.1:%i" % self._pipe_port)
76
71
77 def _pipe_main(self):
78 """eventloop for receiving"""
79 ctx = self._pipe_ctx
80 self._pipe_in = ctx.socket(zmq.PULL)
81 self._pipe_port = self._pipe_in.bind_to_random_port("tcp://127.0.0.1")
82 poller = zmq.Poller()
83 poller.register(self._pipe_signalee, zmq.POLLIN)
84 poller.register(self._pipe_in, zmq.POLLIN)
85 while True:
86 if not self._is_master_process():
87 return
88 try:
89 events = dict(poller.poll(1000))
90 except zmq.ZMQError:
91 # should only be triggered by process-ending cleanup
92 return
93
94 if self._pipe_signalee in events:
95 break
96 if self._pipe_in in events:
97 msg = self._pipe_in.recv_multipart()
98 if msg[0] != self._pipe_uuid:
99 # message not authenticated
100 continue
101 self._found_newprocess.set()
102 text = msg[1].decode(self.encoding, 'replace')
103 with self._buffer_lock:
104 self._buffer.write(text)
105 if self._start < 0:
106 self._start = time.time()
107
108 # wrap it up
109 self._pipe_signaler.close()
110 self._pipe_signalee.close()
111 self._pipe_in.close()
112 self._pipe_ctx.term()
113 self._pipe_done.set()
114
115
116 def __del__(self):
117 if not self._is_master_process():
118 return
119 self._pipe_signaler.send(b'die')
120 self._pipe_done.wait(10)
121
122 def _is_master_process(self):
72 def _is_master_process(self):
123 return os.getpid() == self._master_pid
73 return os.getpid() == self._master_pid
124
74
@@ -131,7 +81,7 b' class OutStream(object):'
131 def _check_mp_mode(self):
81 def _check_mp_mode(self):
132 """check for forks, and switch to zmq pipeline if necessary"""
82 """check for forks, and switch to zmq pipeline if necessary"""
133 if self._is_master_process():
83 if self._is_master_process():
134 if self._found_newprocess.is_set():
84 if self._found_newprocess:
135 return MASTER_WITH_CHILDREN
85 return MASTER_WITH_CHILDREN
136 else:
86 else:
137 return MASTER_NO_CHILDREN
87 return MASTER_NO_CHILDREN
@@ -147,42 +97,57 b' class OutStream(object):'
147 def close(self):
97 def close(self):
148 self.pub_socket = None
98 self.pub_socket = None
149
99
100 def _flush_from_subprocesses(self):
101 """flush possible pub data from subprocesses into my buffer"""
102 if not self._is_master_process():
103 return
104 for i in range(100):
105 if self._pipe_poller.poll(0):
106 msg = self._pipe_in.recv_multipart()
107 if msg[0] != self._pipe_uuid:
108 continue
109 else:
110 self._buffer.write(msg[1].decode(self.encoding, 'replace'))
111 # this always means a flush,
112 # so reset our timer
113 self._start = 0
114 else:
115 break
116
150 def flush(self):
117 def flush(self):
151 """trigger actual zmq send"""
118 """trigger actual zmq send"""
152 if self.pub_socket is None:
119 if self.pub_socket is None:
153 raise ValueError(u'I/O operation on closed file')
120 raise ValueError(u'I/O operation on closed file')
121
122 mp_mode = self._check_mp_mode()
123
124 if mp_mode != CHILD:
125 # we are master
126 if not self._is_master_thread():
127 # sub-threads must not trigger flush,
128 # but at least they can force the timer.
129 self._start = 0
130 return
131
132 self._flush_from_subprocesses()
133 data = self._flush_buffer()
134
135 if data:
136 content = {u'name':self.name, u'data':data}
137 msg = self.session.send(self.pub_socket, u'stream', content=content,
138 parent=self.parent_header, ident=self.topic)
139
140 if hasattr(self.pub_socket, 'flush'):
141 # socket itself has flush (presumably ZMQStream)
142 self.pub_socket.flush()
154 else:
143 else:
155 if self._is_master_process():
144 with self._pipe_out_lock:
156 if not self._is_master_thread():
145 string = self._flush_buffer()
157 # sub-threads mustn't trigger flush,
146 tracker = self._pipe_out.send_multipart([
158 # but at least they can force the timer.
147 self._pipe_uuid,
159 self._start = 0
148 string.encode(self.encoding, 'replace'),
160 data = u''
149 ], copy=False, track=True)
161 # obtain data
150 tracker.wait(1)
162 if self._check_mp_mode(): # multiprocess, needs a lock
163 with self._buffer_lock:
164 data = self._buffer.getvalue()
165 self._buffer.close()
166 self._new_buffer()
167 else: # single process mode
168 data = self._buffer.getvalue()
169 self._buffer.close()
170 self._new_buffer()
171
172 if data:
173 content = {u'name':self.name, u'data':data}
174 msg = self.session.send(self.pub_socket, u'stream', content=content,
175 parent=self.parent_header, ident=self.topic)
176
177 if hasattr(self.pub_socket, 'flush'):
178 # socket itself has flush (presumably ZMQStream)
179 self.pub_socket.flush()
180 else:
181 self._check_mp_mode()
182 with self._pipe_out_lock:
183 tracker = self._pipe_out.send(b'', copy=False, track=True)
184 tracker.wait(1)
185
186
151
187 def isatty(self):
152 def isatty(self):
188 return False
153 return False
@@ -206,21 +171,10 b' class OutStream(object):'
206 # Make sure that we're handling unicode
171 # Make sure that we're handling unicode
207 if not isinstance(string, unicode):
172 if not isinstance(string, unicode):
208 string = string.decode(self.encoding, 'replace')
173 string = string.decode(self.encoding, 'replace')
209
174 self._buffer.write(string)
210 mp_mode = self._check_mp_mode()
175 self._check_mp_mode()
211 if mp_mode == CHILD:
176 # do we want to check subprocess flushes on write?
212 with self._pipe_out_lock:
177 # self._flush_from_subprocesses()
213 self._pipe_out.send_multipart([
214 self._pipe_uuid,
215 string.encode(self.encoding, 'replace'),
216 ])
217 return
218 elif mp_mode == MASTER_NO_CHILDREN:
219 self._buffer.write(string)
220 elif mp_mode == MASTER_WITH_CHILDREN:
221 with self._buffer_lock:
222 self._buffer.write(string)
223
224 current_time = time.time()
178 current_time = time.time()
225 if self._start < 0:
179 if self._start < 0:
226 self._start = current_time
180 self._start = current_time
@@ -234,6 +188,16 b' class OutStream(object):'
234 for string in sequence:
188 for string in sequence:
235 self.write(string)
189 self.write(string)
236
190
191 def _flush_buffer(self):
192 """clear the current buffer and return the current buffer data"""
193 data = u''
194 if self._buffer is not None:
195 data = self._buffer.getvalue()
196 self._buffer.close()
197 self._new_buffer()
198 return data
199
237 def _new_buffer(self):
200 def _new_buffer(self):
238 self._buffer = StringIO()
201 self._buffer = StringIO()
239 self._start = -1
202 self._start = -1
203
@@ -151,7 +151,7 b' def test_subprocess_print():'
151 for n in range(np):
151 for n in range(np):
152 nt.assert_equal(stdout.count(str(n)), 1, stdout)
152 nt.assert_equal(stdout.count(str(n)), 1, stdout)
153 nt.assert_equal(stderr, '')
153 nt.assert_equal(stderr, '')
154 _check_mp_mode(km, expected=True)
154 _check_mp_mode(km, expected=False)
155 _check_mp_mode(km, expected=False, stream="stderr")
155 _check_mp_mode(km, expected=False, stream="stderr")
156
156
157
157
@@ -199,5 +199,5 b' def test_subprocess_error():'
199 nt.assert_true("ZeroDivisionError" in stderr, stderr)
199 nt.assert_true("ZeroDivisionError" in stderr, stderr)
200
200
201 _check_mp_mode(km, expected=False)
201 _check_mp_mode(km, expected=False)
202 _check_mp_mode(km, expected=True, stream="stderr")
202 _check_mp_mode(km, expected=False, stream="stderr")
203
203
General Comments 0
You need to be logged in to leave comments. Login now