##// END OF EJS Templates
Merge pull request #7765 from JanSchulz/Outstream_closed...
Min RK -
r20421:2a7d3585 merge
parent child Browse files
Show More
@@ -1,245 +1,249
1 """Wrappers for forwarding stdout/stderr over zmq"""
1 """Wrappers for forwarding stdout/stderr over zmq"""
2
2
3 # Copyright (c) IPython Development Team.
3 # Copyright (c) IPython Development Team.
4 # Distributed under the terms of the Modified BSD License.
4 # Distributed under the terms of the Modified BSD License.
5
5
6 import os
6 import os
7 import threading
7 import threading
8 import time
8 import time
9 import uuid
9 import uuid
10 from io import StringIO, UnsupportedOperation
10 from io import StringIO, UnsupportedOperation
11
11
12 import zmq
12 import zmq
13 from zmq.eventloop.ioloop import IOLoop
13 from zmq.eventloop.ioloop import IOLoop
14
14
15 from .session import extract_header
15 from .session import extract_header
16
16
17 from IPython.utils import py3compat
17 from IPython.utils import py3compat
18 from IPython.utils.py3compat import unicode_type
18 from IPython.utils.py3compat import unicode_type
19 from IPython.utils.warn import warn
19 from IPython.utils.warn import warn
20
20
21 #-----------------------------------------------------------------------------
21 #-----------------------------------------------------------------------------
22 # Globals
22 # Globals
23 #-----------------------------------------------------------------------------
23 #-----------------------------------------------------------------------------
24
24
25 MASTER = 0
25 MASTER = 0
26 CHILD = 1
26 CHILD = 1
27
27
28 #-----------------------------------------------------------------------------
28 #-----------------------------------------------------------------------------
29 # Stream classes
29 # Stream classes
30 #-----------------------------------------------------------------------------
30 #-----------------------------------------------------------------------------
31
31
32 class OutStream(object):
32 class OutStream(object):
33 """A file like object that publishes the stream to a 0MQ PUB socket."""
33 """A file like object that publishes the stream to a 0MQ PUB socket."""
34
34
35 # The time interval between automatic flushes, in seconds.
35 # The time interval between automatic flushes, in seconds.
36 _subprocess_flush_limit = 256
36 _subprocess_flush_limit = 256
37 flush_interval = 0.05
37 flush_interval = 0.05
38 topic=None
38 topic=None
39
39
40 def __init__(self, session, pub_socket, name, pipe=True):
40 def __init__(self, session, pub_socket, name, pipe=True):
41 self.encoding = 'UTF-8'
41 self.encoding = 'UTF-8'
42 self.session = session
42 self.session = session
43 self.pub_socket = pub_socket
43 self.pub_socket = pub_socket
44 self.name = name
44 self.name = name
45 self.topic = b'stream.' + py3compat.cast_bytes(name)
45 self.topic = b'stream.' + py3compat.cast_bytes(name)
46 self.parent_header = {}
46 self.parent_header = {}
47 self._new_buffer()
47 self._new_buffer()
48 self._buffer_lock = threading.Lock()
48 self._buffer_lock = threading.Lock()
49 self._master_pid = os.getpid()
49 self._master_pid = os.getpid()
50 self._master_thread = threading.current_thread().ident
50 self._master_thread = threading.current_thread().ident
51 self._pipe_pid = os.getpid()
51 self._pipe_pid = os.getpid()
52 self._pipe_flag = pipe
52 self._pipe_flag = pipe
53 if pipe:
53 if pipe:
54 self._setup_pipe_in()
54 self._setup_pipe_in()
55
55
56 def _setup_pipe_in(self):
56 def _setup_pipe_in(self):
57 """setup listening pipe for subprocesses"""
57 """setup listening pipe for subprocesses"""
58 ctx = self.pub_socket.context
58 ctx = self.pub_socket.context
59
59
60 # use UUID to authenticate pipe messages
60 # use UUID to authenticate pipe messages
61 self._pipe_uuid = uuid.uuid4().bytes
61 self._pipe_uuid = uuid.uuid4().bytes
62
62
63 self._pipe_in = ctx.socket(zmq.PULL)
63 self._pipe_in = ctx.socket(zmq.PULL)
64 self._pipe_in.linger = 0
64 self._pipe_in.linger = 0
65 try:
65 try:
66 self._pipe_port = self._pipe_in.bind_to_random_port("tcp://127.0.0.1")
66 self._pipe_port = self._pipe_in.bind_to_random_port("tcp://127.0.0.1")
67 except zmq.ZMQError as e:
67 except zmq.ZMQError as e:
68 warn("Couldn't bind IOStream to 127.0.0.1: %s" % e +
68 warn("Couldn't bind IOStream to 127.0.0.1: %s" % e +
69 "\nsubprocess output will be unavailable."
69 "\nsubprocess output will be unavailable."
70 )
70 )
71 self._pipe_flag = False
71 self._pipe_flag = False
72 self._pipe_in.close()
72 self._pipe_in.close()
73 del self._pipe_in
73 del self._pipe_in
74 return
74 return
75 self._pipe_poller = zmq.Poller()
75 self._pipe_poller = zmq.Poller()
76 self._pipe_poller.register(self._pipe_in, zmq.POLLIN)
76 self._pipe_poller.register(self._pipe_in, zmq.POLLIN)
77 if IOLoop.initialized():
77 if IOLoop.initialized():
78 # subprocess flush should trigger flush
78 # subprocess flush should trigger flush
79 # if kernel is idle
79 # if kernel is idle
80 IOLoop.instance().add_handler(self._pipe_in,
80 IOLoop.instance().add_handler(self._pipe_in,
81 lambda s, event: self.flush(),
81 lambda s, event: self.flush(),
82 IOLoop.READ,
82 IOLoop.READ,
83 )
83 )
84
84
85 def _setup_pipe_out(self):
85 def _setup_pipe_out(self):
86 # must be new context after fork
86 # must be new context after fork
87 ctx = zmq.Context()
87 ctx = zmq.Context()
88 self._pipe_pid = os.getpid()
88 self._pipe_pid = os.getpid()
89 self._pipe_out = ctx.socket(zmq.PUSH)
89 self._pipe_out = ctx.socket(zmq.PUSH)
90 self._pipe_out_lock = threading.Lock()
90 self._pipe_out_lock = threading.Lock()
91 self._pipe_out.connect("tcp://127.0.0.1:%i" % self._pipe_port)
91 self._pipe_out.connect("tcp://127.0.0.1:%i" % self._pipe_port)
92
92
93 def _is_master_process(self):
93 def _is_master_process(self):
94 return os.getpid() == self._master_pid
94 return os.getpid() == self._master_pid
95
95
96 def _is_master_thread(self):
96 def _is_master_thread(self):
97 return threading.current_thread().ident == self._master_thread
97 return threading.current_thread().ident == self._master_thread
98
98
99 def _have_pipe_out(self):
99 def _have_pipe_out(self):
100 return os.getpid() == self._pipe_pid
100 return os.getpid() == self._pipe_pid
101
101
102 def _check_mp_mode(self):
102 def _check_mp_mode(self):
103 """check for forks, and switch to zmq pipeline if necessary"""
103 """check for forks, and switch to zmq pipeline if necessary"""
104 if not self._pipe_flag or self._is_master_process():
104 if not self._pipe_flag or self._is_master_process():
105 return MASTER
105 return MASTER
106 else:
106 else:
107 if not self._have_pipe_out():
107 if not self._have_pipe_out():
108 self._flush_buffer()
108 self._flush_buffer()
109 # setup a new out pipe
109 # setup a new out pipe
110 self._setup_pipe_out()
110 self._setup_pipe_out()
111 return CHILD
111 return CHILD
112
112
113 def set_parent(self, parent):
113 def set_parent(self, parent):
114 self.parent_header = extract_header(parent)
114 self.parent_header = extract_header(parent)
115
115
116 def close(self):
116 def close(self):
117 self.pub_socket = None
117 self.pub_socket = None
118
118
119 @property
120 def closed(self):
121 return self.pub_socket is None
122
119 def _flush_from_subprocesses(self):
123 def _flush_from_subprocesses(self):
120 """flush possible pub data from subprocesses into my buffer"""
124 """flush possible pub data from subprocesses into my buffer"""
121 if not self._pipe_flag or not self._is_master_process():
125 if not self._pipe_flag or not self._is_master_process():
122 return
126 return
123 for i in range(self._subprocess_flush_limit):
127 for i in range(self._subprocess_flush_limit):
124 if self._pipe_poller.poll(0):
128 if self._pipe_poller.poll(0):
125 msg = self._pipe_in.recv_multipart()
129 msg = self._pipe_in.recv_multipart()
126 if msg[0] != self._pipe_uuid:
130 if msg[0] != self._pipe_uuid:
127 continue
131 continue
128 else:
132 else:
129 self._buffer.write(msg[1].decode(self.encoding, 'replace'))
133 self._buffer.write(msg[1].decode(self.encoding, 'replace'))
130 # this always means a flush,
134 # this always means a flush,
131 # so reset our timer
135 # so reset our timer
132 self._start = 0
136 self._start = 0
133 else:
137 else:
134 break
138 break
135
139
136 def _schedule_flush(self):
140 def _schedule_flush(self):
137 """schedule a flush in the main thread
141 """schedule a flush in the main thread
138
142
139 only works with a tornado/pyzmq eventloop running
143 only works with a tornado/pyzmq eventloop running
140 """
144 """
141 if IOLoop.initialized():
145 if IOLoop.initialized():
142 IOLoop.instance().add_callback(self.flush)
146 IOLoop.instance().add_callback(self.flush)
143 else:
147 else:
144 # no async loop, at least force the timer
148 # no async loop, at least force the timer
145 self._start = 0
149 self._start = 0
146
150
147 def flush(self):
151 def flush(self):
148 """trigger actual zmq send"""
152 """trigger actual zmq send"""
149 if self.pub_socket is None:
153 if self.pub_socket is None:
150 raise ValueError(u'I/O operation on closed file')
154 raise ValueError(u'I/O operation on closed file')
151
155
152 mp_mode = self._check_mp_mode()
156 mp_mode = self._check_mp_mode()
153
157
154 if mp_mode != CHILD:
158 if mp_mode != CHILD:
155 # we are master
159 # we are master
156 if not self._is_master_thread():
160 if not self._is_master_thread():
157 # sub-threads must not trigger flush directly,
161 # sub-threads must not trigger flush directly,
158 # but at least they can schedule an async flush, or force the timer.
162 # but at least they can schedule an async flush, or force the timer.
159 self._schedule_flush()
163 self._schedule_flush()
160 return
164 return
161
165
162 self._flush_from_subprocesses()
166 self._flush_from_subprocesses()
163 data = self._flush_buffer()
167 data = self._flush_buffer()
164
168
165 if data:
169 if data:
166 content = {u'name':self.name, u'text':data}
170 content = {u'name':self.name, u'text':data}
167 msg = self.session.send(self.pub_socket, u'stream', content=content,
171 msg = self.session.send(self.pub_socket, u'stream', content=content,
168 parent=self.parent_header, ident=self.topic)
172 parent=self.parent_header, ident=self.topic)
169
173
170 if hasattr(self.pub_socket, 'flush'):
174 if hasattr(self.pub_socket, 'flush'):
171 # socket itself has flush (presumably ZMQStream)
175 # socket itself has flush (presumably ZMQStream)
172 self.pub_socket.flush()
176 self.pub_socket.flush()
173 else:
177 else:
174 with self._pipe_out_lock:
178 with self._pipe_out_lock:
175 string = self._flush_buffer()
179 string = self._flush_buffer()
176 tracker = self._pipe_out.send_multipart([
180 tracker = self._pipe_out.send_multipart([
177 self._pipe_uuid,
181 self._pipe_uuid,
178 string.encode(self.encoding, 'replace'),
182 string.encode(self.encoding, 'replace'),
179 ], copy=False, track=True)
183 ], copy=False, track=True)
180 try:
184 try:
181 tracker.wait(1)
185 tracker.wait(1)
182 except:
186 except:
183 pass
187 pass
184
188
185 def isatty(self):
189 def isatty(self):
186 return False
190 return False
187
191
188 def __next__(self):
192 def __next__(self):
189 raise IOError('Read not supported on a write only stream.')
193 raise IOError('Read not supported on a write only stream.')
190
194
191 if not py3compat.PY3:
195 if not py3compat.PY3:
192 next = __next__
196 next = __next__
193
197
194 def read(self, size=-1):
198 def read(self, size=-1):
195 raise IOError('Read not supported on a write only stream.')
199 raise IOError('Read not supported on a write only stream.')
196
200
197 def readline(self, size=-1):
201 def readline(self, size=-1):
198 raise IOError('Read not supported on a write only stream.')
202 raise IOError('Read not supported on a write only stream.')
199
203
200 def fileno(self):
204 def fileno(self):
201 raise UnsupportedOperation("IOStream has no fileno.")
205 raise UnsupportedOperation("IOStream has no fileno.")
202
206
203 def write(self, string):
207 def write(self, string):
204 if self.pub_socket is None:
208 if self.pub_socket is None:
205 raise ValueError('I/O operation on closed file')
209 raise ValueError('I/O operation on closed file')
206 else:
210 else:
207 # Make sure that we're handling unicode
211 # Make sure that we're handling unicode
208 if not isinstance(string, unicode_type):
212 if not isinstance(string, unicode_type):
209 string = string.decode(self.encoding, 'replace')
213 string = string.decode(self.encoding, 'replace')
210
214
211 is_child = (self._check_mp_mode() == CHILD)
215 is_child = (self._check_mp_mode() == CHILD)
212 self._buffer.write(string)
216 self._buffer.write(string)
213 if is_child:
217 if is_child:
214 # newlines imply flush in subprocesses
218 # newlines imply flush in subprocesses
215 # mp.Pool cannot be trusted to flush promptly (or ever),
219 # mp.Pool cannot be trusted to flush promptly (or ever),
216 # and this helps.
220 # and this helps.
217 if '\n' in string:
221 if '\n' in string:
218 self.flush()
222 self.flush()
219 # do we want to check subprocess flushes on write?
223 # do we want to check subprocess flushes on write?
220 # self._flush_from_subprocesses()
224 # self._flush_from_subprocesses()
221 current_time = time.time()
225 current_time = time.time()
222 if self._start < 0:
226 if self._start < 0:
223 self._start = current_time
227 self._start = current_time
224 elif current_time - self._start > self.flush_interval:
228 elif current_time - self._start > self.flush_interval:
225 self.flush()
229 self.flush()
226
230
227 def writelines(self, sequence):
231 def writelines(self, sequence):
228 if self.pub_socket is None:
232 if self.pub_socket is None:
229 raise ValueError('I/O operation on closed file')
233 raise ValueError('I/O operation on closed file')
230 else:
234 else:
231 for string in sequence:
235 for string in sequence:
232 self.write(string)
236 self.write(string)
233
237
234 def _flush_buffer(self):
238 def _flush_buffer(self):
235 """clear the current buffer and return the current buffer data"""
239 """clear the current buffer and return the current buffer data"""
236 data = u''
240 data = u''
237 if self._buffer is not None:
241 if self._buffer is not None:
238 data = self._buffer.getvalue()
242 data = self._buffer.getvalue()
239 self._buffer.close()
243 self._buffer.close()
240 self._new_buffer()
244 self._new_buffer()
241 return data
245 return data
242
246
243 def _new_buffer(self):
247 def _new_buffer(self):
244 self._buffer = StringIO()
248 self._buffer = StringIO()
245 self._start = -1
249 self._start = -1
General Comments 0
You need to be logged in to leave comments. Login now