##// END OF EJS Templates
Backport PR #5669: allows threadsafe sys.stdout.flush from background threads...
MinRK -
Show More
@@ -1,231 +1,238 b''
1 """wrappers for stdout/stderr forwarding over zmq
1 """Wrappers for forwarding stdout/stderr over zmq"""
2 """
3
2
4 #-----------------------------------------------------------------------------
3 # Copyright (c) IPython Development Team.
5 # Copyright (C) 2013 The IPython Development Team
4 # Distributed under the terms of the Modified BSD License.
6 #
7 # Distributed under the terms of the BSD License. The full license is in
8 # the file COPYING, distributed as part of this software.
9 #-----------------------------------------------------------------------------
10
5
11 import os
6 import os
12 import threading
7 import threading
13 import time
8 import time
14 import uuid
9 import uuid
15 from io import StringIO, UnsupportedOperation
10 from io import StringIO, UnsupportedOperation
16
11
17 import zmq
12 import zmq
13 from zmq.eventloop.ioloop import IOLoop
18
14
19 from .session import extract_header
15 from .session import extract_header
20
16
21 from IPython.utils import py3compat
17 from IPython.utils import py3compat
22 from IPython.utils.py3compat import unicode_type
18 from IPython.utils.py3compat import unicode_type
23 from IPython.utils.warn import warn
19 from IPython.utils.warn import warn
24
20
25 #-----------------------------------------------------------------------------
21 #-----------------------------------------------------------------------------
26 # Globals
22 # Globals
27 #-----------------------------------------------------------------------------
23 #-----------------------------------------------------------------------------
28
24
29 MASTER = 0
25 MASTER = 0
30 CHILD = 1
26 CHILD = 1
31
27
32 #-----------------------------------------------------------------------------
28 #-----------------------------------------------------------------------------
33 # Stream classes
29 # Stream classes
34 #-----------------------------------------------------------------------------
30 #-----------------------------------------------------------------------------
35
31
36 class OutStream(object):
32 class OutStream(object):
37 """A file like object that publishes the stream to a 0MQ PUB socket."""
33 """A file like object that publishes the stream to a 0MQ PUB socket."""
38
34
39 # The time interval between automatic flushes, in seconds.
35 # The time interval between automatic flushes, in seconds.
40 _subprocess_flush_limit = 256
36 _subprocess_flush_limit = 256
41 flush_interval = 0.05
37 flush_interval = 0.05
42 topic=None
38 topic=None
43
39
44 def __init__(self, session, pub_socket, name, pipe=True):
40 def __init__(self, session, pub_socket, name, pipe=True):
45 self.encoding = 'UTF-8'
41 self.encoding = 'UTF-8'
46 self.session = session
42 self.session = session
47 self.pub_socket = pub_socket
43 self.pub_socket = pub_socket
48 self.name = name
44 self.name = name
49 self.topic = b'stream.' + py3compat.cast_bytes(name)
45 self.topic = b'stream.' + py3compat.cast_bytes(name)
50 self.parent_header = {}
46 self.parent_header = {}
51 self._new_buffer()
47 self._new_buffer()
52 self._buffer_lock = threading.Lock()
48 self._buffer_lock = threading.Lock()
53 self._master_pid = os.getpid()
49 self._master_pid = os.getpid()
54 self._master_thread = threading.current_thread().ident
50 self._master_thread = threading.current_thread().ident
55 self._pipe_pid = os.getpid()
51 self._pipe_pid = os.getpid()
56 self._pipe_flag = pipe
52 self._pipe_flag = pipe
57 if pipe:
53 if pipe:
58 self._setup_pipe_in()
54 self._setup_pipe_in()
59
55
60 def _setup_pipe_in(self):
56 def _setup_pipe_in(self):
61 """setup listening pipe for subprocesses"""
57 """setup listening pipe for subprocesses"""
62 ctx = self.pub_socket.context
58 ctx = self.pub_socket.context
63
59
64 # use UUID to authenticate pipe messages
60 # use UUID to authenticate pipe messages
65 self._pipe_uuid = uuid.uuid4().bytes
61 self._pipe_uuid = uuid.uuid4().bytes
66
62
67 self._pipe_in = ctx.socket(zmq.PULL)
63 self._pipe_in = ctx.socket(zmq.PULL)
68 self._pipe_in.linger = 0
64 self._pipe_in.linger = 0
69 try:
65 try:
70 self._pipe_port = self._pipe_in.bind_to_random_port("tcp://127.0.0.1")
66 self._pipe_port = self._pipe_in.bind_to_random_port("tcp://127.0.0.1")
71 except zmq.ZMQError as e:
67 except zmq.ZMQError as e:
72 warn("Couldn't bind IOStream to 127.0.0.1: %s" % e +
68 warn("Couldn't bind IOStream to 127.0.0.1: %s" % e +
73 "\nsubprocess output will be unavailable."
69 "\nsubprocess output will be unavailable."
74 )
70 )
75 self._pipe_flag = False
71 self._pipe_flag = False
76 self._pipe_in.close()
72 self._pipe_in.close()
77 del self._pipe_in
73 del self._pipe_in
78 return
74 return
79 self._pipe_poller = zmq.Poller()
75 self._pipe_poller = zmq.Poller()
80 self._pipe_poller.register(self._pipe_in, zmq.POLLIN)
76 self._pipe_poller.register(self._pipe_in, zmq.POLLIN)
81
77
82 def _setup_pipe_out(self):
78 def _setup_pipe_out(self):
83 # must be new context after fork
79 # must be new context after fork
84 ctx = zmq.Context()
80 ctx = zmq.Context()
85 self._pipe_pid = os.getpid()
81 self._pipe_pid = os.getpid()
86 self._pipe_out = ctx.socket(zmq.PUSH)
82 self._pipe_out = ctx.socket(zmq.PUSH)
87 self._pipe_out_lock = threading.Lock()
83 self._pipe_out_lock = threading.Lock()
88 self._pipe_out.connect("tcp://127.0.0.1:%i" % self._pipe_port)
84 self._pipe_out.connect("tcp://127.0.0.1:%i" % self._pipe_port)
89
85
90 def _is_master_process(self):
86 def _is_master_process(self):
91 return os.getpid() == self._master_pid
87 return os.getpid() == self._master_pid
92
88
93 def _is_master_thread(self):
89 def _is_master_thread(self):
94 return threading.current_thread().ident == self._master_thread
90 return threading.current_thread().ident == self._master_thread
95
91
96 def _have_pipe_out(self):
92 def _have_pipe_out(self):
97 return os.getpid() == self._pipe_pid
93 return os.getpid() == self._pipe_pid
98
94
99 def _check_mp_mode(self):
95 def _check_mp_mode(self):
100 """check for forks, and switch to zmq pipeline if necessary"""
96 """check for forks, and switch to zmq pipeline if necessary"""
101 if not self._pipe_flag or self._is_master_process():
97 if not self._pipe_flag or self._is_master_process():
102 return MASTER
98 return MASTER
103 else:
99 else:
104 if not self._have_pipe_out():
100 if not self._have_pipe_out():
105 self._flush_buffer()
101 self._flush_buffer()
106 # setup a new out pipe
102 # setup a new out pipe
107 self._setup_pipe_out()
103 self._setup_pipe_out()
108 return CHILD
104 return CHILD
109
105
110 def set_parent(self, parent):
106 def set_parent(self, parent):
111 self.parent_header = extract_header(parent)
107 self.parent_header = extract_header(parent)
112
108
113 def close(self):
109 def close(self):
114 self.pub_socket = None
110 self.pub_socket = None
115
111
116 def _flush_from_subprocesses(self):
112 def _flush_from_subprocesses(self):
117 """flush possible pub data from subprocesses into my buffer"""
113 """flush possible pub data from subprocesses into my buffer"""
118 if not self._pipe_flag or not self._is_master_process():
114 if not self._pipe_flag or not self._is_master_process():
119 return
115 return
120 for i in range(self._subprocess_flush_limit):
116 for i in range(self._subprocess_flush_limit):
121 if self._pipe_poller.poll(0):
117 if self._pipe_poller.poll(0):
122 msg = self._pipe_in.recv_multipart()
118 msg = self._pipe_in.recv_multipart()
123 if msg[0] != self._pipe_uuid:
119 if msg[0] != self._pipe_uuid:
124 continue
120 continue
125 else:
121 else:
126 self._buffer.write(msg[1].decode(self.encoding, 'replace'))
122 self._buffer.write(msg[1].decode(self.encoding, 'replace'))
127 # this always means a flush,
123 # this always means a flush,
128 # so reset our timer
124 # so reset our timer
129 self._start = 0
125 self._start = 0
130 else:
126 else:
131 break
127 break
132
128
129 def _schedule_flush(self):
130 """schedule a flush in the main thread
131
132 only works with a tornado/pyzmq eventloop running
133 """
134 if IOLoop.initialized():
135 IOLoop.instance().add_callback(self.flush)
136 else:
137 # no async loop, at least force the timer
138 self._start = 0
139
133 def flush(self):
140 def flush(self):
134 """trigger actual zmq send"""
141 """trigger actual zmq send"""
135 if self.pub_socket is None:
142 if self.pub_socket is None:
136 raise ValueError(u'I/O operation on closed file')
143 raise ValueError(u'I/O operation on closed file')
137
144
138 mp_mode = self._check_mp_mode()
145 mp_mode = self._check_mp_mode()
139
146
140 if mp_mode != CHILD:
147 if mp_mode != CHILD:
141 # we are master
148 # we are master
142 if not self._is_master_thread():
149 if not self._is_master_thread():
143 # sub-threads must not trigger flush,
150 # sub-threads must not trigger flush directly,
144 # but at least they can force the timer.
151 # but at least they can schedule an async flush, or force the timer.
145 self._start = 0
152 self._schedule_flush()
146 return
153 return
147
154
148 self._flush_from_subprocesses()
155 self._flush_from_subprocesses()
149 data = self._flush_buffer()
156 data = self._flush_buffer()
150
157
151 if data:
158 if data:
152 content = {u'name':self.name, u'data':data}
159 content = {u'name':self.name, u'data':data}
153 msg = self.session.send(self.pub_socket, u'stream', content=content,
160 msg = self.session.send(self.pub_socket, u'stream', content=content,
154 parent=self.parent_header, ident=self.topic)
161 parent=self.parent_header, ident=self.topic)
155
162
156 if hasattr(self.pub_socket, 'flush'):
163 if hasattr(self.pub_socket, 'flush'):
157 # socket itself has flush (presumably ZMQStream)
164 # socket itself has flush (presumably ZMQStream)
158 self.pub_socket.flush()
165 self.pub_socket.flush()
159 else:
166 else:
160 with self._pipe_out_lock:
167 with self._pipe_out_lock:
161 string = self._flush_buffer()
168 string = self._flush_buffer()
162 tracker = self._pipe_out.send_multipart([
169 tracker = self._pipe_out.send_multipart([
163 self._pipe_uuid,
170 self._pipe_uuid,
164 string.encode(self.encoding, 'replace'),
171 string.encode(self.encoding, 'replace'),
165 ], copy=False, track=True)
172 ], copy=False, track=True)
166 try:
173 try:
167 tracker.wait(1)
174 tracker.wait(1)
168 except:
175 except:
169 pass
176 pass
170
177
171 def isatty(self):
178 def isatty(self):
172 return False
179 return False
173
180
174 def __next__(self):
181 def __next__(self):
175 raise IOError('Read not supported on a write only stream.')
182 raise IOError('Read not supported on a write only stream.')
176
183
177 if not py3compat.PY3:
184 if not py3compat.PY3:
178 next = __next__
185 next = __next__
179
186
180 def read(self, size=-1):
187 def read(self, size=-1):
181 raise IOError('Read not supported on a write only stream.')
188 raise IOError('Read not supported on a write only stream.')
182
189
183 def readline(self, size=-1):
190 def readline(self, size=-1):
184 raise IOError('Read not supported on a write only stream.')
191 raise IOError('Read not supported on a write only stream.')
185
192
186 def fileno(self):
193 def fileno(self):
187 raise UnsupportedOperation("IOStream has no fileno.")
194 raise UnsupportedOperation("IOStream has no fileno.")
188
195
189 def write(self, string):
196 def write(self, string):
190 if self.pub_socket is None:
197 if self.pub_socket is None:
191 raise ValueError('I/O operation on closed file')
198 raise ValueError('I/O operation on closed file')
192 else:
199 else:
193 # Make sure that we're handling unicode
200 # Make sure that we're handling unicode
194 if not isinstance(string, unicode_type):
201 if not isinstance(string, unicode_type):
195 string = string.decode(self.encoding, 'replace')
202 string = string.decode(self.encoding, 'replace')
196
203
197 is_child = (self._check_mp_mode() == CHILD)
204 is_child = (self._check_mp_mode() == CHILD)
198 self._buffer.write(string)
205 self._buffer.write(string)
199 if is_child:
206 if is_child:
200 # newlines imply flush in subprocesses
207 # newlines imply flush in subprocesses
201 # mp.Pool cannot be trusted to flush promptly (or ever),
208 # mp.Pool cannot be trusted to flush promptly (or ever),
202 # and this helps.
209 # and this helps.
203 if '\n' in string:
210 if '\n' in string:
204 self.flush()
211 self.flush()
205 # do we want to check subprocess flushes on write?
212 # do we want to check subprocess flushes on write?
206 # self._flush_from_subprocesses()
213 # self._flush_from_subprocesses()
207 current_time = time.time()
214 current_time = time.time()
208 if self._start < 0:
215 if self._start < 0:
209 self._start = current_time
216 self._start = current_time
210 elif current_time - self._start > self.flush_interval:
217 elif current_time - self._start > self.flush_interval:
211 self.flush()
218 self.flush()
212
219
213 def writelines(self, sequence):
220 def writelines(self, sequence):
214 if self.pub_socket is None:
221 if self.pub_socket is None:
215 raise ValueError('I/O operation on closed file')
222 raise ValueError('I/O operation on closed file')
216 else:
223 else:
217 for string in sequence:
224 for string in sequence:
218 self.write(string)
225 self.write(string)
219
226
220 def _flush_buffer(self):
227 def _flush_buffer(self):
221 """clear the current buffer and return the current buffer data"""
228 """clear the current buffer and return the current buffer data"""
222 data = u''
229 data = u''
223 if self._buffer is not None:
230 if self._buffer is not None:
224 data = self._buffer.getvalue()
231 data = self._buffer.getvalue()
225 self._buffer.close()
232 self._buffer.close()
226 self._new_buffer()
233 self._new_buffer()
227 return data
234 return data
228
235
229 def _new_buffer(self):
236 def _new_buffer(self):
230 self._buffer = StringIO()
237 self._buffer = StringIO()
231 self._start = -1
238 self._start = -1
General Comments 0
You need to be logged in to leave comments. Login now