##// END OF EJS Templates
Merge pull request #7765 from JanSchulz/Outstream_closed...
Min RK -
r20421:2a7d3585 merge
parent child Browse files
Show More
@@ -1,245 +1,249
1 1 """Wrappers for forwarding stdout/stderr over zmq"""
2 2
3 3 # Copyright (c) IPython Development Team.
4 4 # Distributed under the terms of the Modified BSD License.
5 5
6 6 import os
7 7 import threading
8 8 import time
9 9 import uuid
10 10 from io import StringIO, UnsupportedOperation
11 11
12 12 import zmq
13 13 from zmq.eventloop.ioloop import IOLoop
14 14
15 15 from .session import extract_header
16 16
17 17 from IPython.utils import py3compat
18 18 from IPython.utils.py3compat import unicode_type
19 19 from IPython.utils.warn import warn
20 20
21 21 #-----------------------------------------------------------------------------
22 22 # Globals
23 23 #-----------------------------------------------------------------------------
24 24
25 25 MASTER = 0
26 26 CHILD = 1
27 27
28 28 #-----------------------------------------------------------------------------
29 29 # Stream classes
30 30 #-----------------------------------------------------------------------------
31 31
32 32 class OutStream(object):
33 33 """A file like object that publishes the stream to a 0MQ PUB socket."""
34 34
35 35 # The time interval between automatic flushes, in seconds.
36 36 _subprocess_flush_limit = 256
37 37 flush_interval = 0.05
38 38 topic=None
39 39
40 40 def __init__(self, session, pub_socket, name, pipe=True):
41 41 self.encoding = 'UTF-8'
42 42 self.session = session
43 43 self.pub_socket = pub_socket
44 44 self.name = name
45 45 self.topic = b'stream.' + py3compat.cast_bytes(name)
46 46 self.parent_header = {}
47 47 self._new_buffer()
48 48 self._buffer_lock = threading.Lock()
49 49 self._master_pid = os.getpid()
50 50 self._master_thread = threading.current_thread().ident
51 51 self._pipe_pid = os.getpid()
52 52 self._pipe_flag = pipe
53 53 if pipe:
54 54 self._setup_pipe_in()
55 55
56 56 def _setup_pipe_in(self):
57 57 """setup listening pipe for subprocesses"""
58 58 ctx = self.pub_socket.context
59 59
60 60 # use UUID to authenticate pipe messages
61 61 self._pipe_uuid = uuid.uuid4().bytes
62 62
63 63 self._pipe_in = ctx.socket(zmq.PULL)
64 64 self._pipe_in.linger = 0
65 65 try:
66 66 self._pipe_port = self._pipe_in.bind_to_random_port("tcp://127.0.0.1")
67 67 except zmq.ZMQError as e:
68 68 warn("Couldn't bind IOStream to 127.0.0.1: %s" % e +
69 69 "\nsubprocess output will be unavailable."
70 70 )
71 71 self._pipe_flag = False
72 72 self._pipe_in.close()
73 73 del self._pipe_in
74 74 return
75 75 self._pipe_poller = zmq.Poller()
76 76 self._pipe_poller.register(self._pipe_in, zmq.POLLIN)
77 77 if IOLoop.initialized():
78 78 # subprocess flush should trigger flush
79 79 # if kernel is idle
80 80 IOLoop.instance().add_handler(self._pipe_in,
81 81 lambda s, event: self.flush(),
82 82 IOLoop.READ,
83 83 )
84 84
85 85 def _setup_pipe_out(self):
86 86 # must be new context after fork
87 87 ctx = zmq.Context()
88 88 self._pipe_pid = os.getpid()
89 89 self._pipe_out = ctx.socket(zmq.PUSH)
90 90 self._pipe_out_lock = threading.Lock()
91 91 self._pipe_out.connect("tcp://127.0.0.1:%i" % self._pipe_port)
92 92
93 93 def _is_master_process(self):
94 94 return os.getpid() == self._master_pid
95 95
96 96 def _is_master_thread(self):
97 97 return threading.current_thread().ident == self._master_thread
98 98
99 99 def _have_pipe_out(self):
100 100 return os.getpid() == self._pipe_pid
101 101
102 102 def _check_mp_mode(self):
103 103 """check for forks, and switch to zmq pipeline if necessary"""
104 104 if not self._pipe_flag or self._is_master_process():
105 105 return MASTER
106 106 else:
107 107 if not self._have_pipe_out():
108 108 self._flush_buffer()
109 109 # setup a new out pipe
110 110 self._setup_pipe_out()
111 111 return CHILD
112 112
113 113 def set_parent(self, parent):
114 114 self.parent_header = extract_header(parent)
115 115
116 116 def close(self):
117 117 self.pub_socket = None
118 118
119 @property
120 def closed(self):
121 return self.pub_socket is None
122
119 123 def _flush_from_subprocesses(self):
120 124 """flush possible pub data from subprocesses into my buffer"""
121 125 if not self._pipe_flag or not self._is_master_process():
122 126 return
123 127 for i in range(self._subprocess_flush_limit):
124 128 if self._pipe_poller.poll(0):
125 129 msg = self._pipe_in.recv_multipart()
126 130 if msg[0] != self._pipe_uuid:
127 131 continue
128 132 else:
129 133 self._buffer.write(msg[1].decode(self.encoding, 'replace'))
130 134 # this always means a flush,
131 135 # so reset our timer
132 136 self._start = 0
133 137 else:
134 138 break
135 139
136 140 def _schedule_flush(self):
137 141 """schedule a flush in the main thread
138 142
139 143 only works with a tornado/pyzmq eventloop running
140 144 """
141 145 if IOLoop.initialized():
142 146 IOLoop.instance().add_callback(self.flush)
143 147 else:
144 148 # no async loop, at least force the timer
145 149 self._start = 0
146 150
147 151 def flush(self):
148 152 """trigger actual zmq send"""
149 153 if self.pub_socket is None:
150 154 raise ValueError(u'I/O operation on closed file')
151 155
152 156 mp_mode = self._check_mp_mode()
153 157
154 158 if mp_mode != CHILD:
155 159 # we are master
156 160 if not self._is_master_thread():
157 161 # sub-threads must not trigger flush directly,
158 162 # but at least they can schedule an async flush, or force the timer.
159 163 self._schedule_flush()
160 164 return
161 165
162 166 self._flush_from_subprocesses()
163 167 data = self._flush_buffer()
164 168
165 169 if data:
166 170 content = {u'name':self.name, u'text':data}
167 171 msg = self.session.send(self.pub_socket, u'stream', content=content,
168 172 parent=self.parent_header, ident=self.topic)
169 173
170 174 if hasattr(self.pub_socket, 'flush'):
171 175 # socket itself has flush (presumably ZMQStream)
172 176 self.pub_socket.flush()
173 177 else:
174 178 with self._pipe_out_lock:
175 179 string = self._flush_buffer()
176 180 tracker = self._pipe_out.send_multipart([
177 181 self._pipe_uuid,
178 182 string.encode(self.encoding, 'replace'),
179 183 ], copy=False, track=True)
180 184 try:
181 185 tracker.wait(1)
182 186 except:
183 187 pass
184 188
185 189 def isatty(self):
186 190 return False
187 191
188 192 def __next__(self):
189 193 raise IOError('Read not supported on a write only stream.')
190 194
191 195 if not py3compat.PY3:
192 196 next = __next__
193 197
194 198 def read(self, size=-1):
195 199 raise IOError('Read not supported on a write only stream.')
196 200
197 201 def readline(self, size=-1):
198 202 raise IOError('Read not supported on a write only stream.')
199 203
200 204 def fileno(self):
201 205 raise UnsupportedOperation("IOStream has no fileno.")
202 206
203 207 def write(self, string):
204 208 if self.pub_socket is None:
205 209 raise ValueError('I/O operation on closed file')
206 210 else:
207 211 # Make sure that we're handling unicode
208 212 if not isinstance(string, unicode_type):
209 213 string = string.decode(self.encoding, 'replace')
210 214
211 215 is_child = (self._check_mp_mode() == CHILD)
212 216 self._buffer.write(string)
213 217 if is_child:
214 218 # newlines imply flush in subprocesses
215 219 # mp.Pool cannot be trusted to flush promptly (or ever),
216 220 # and this helps.
217 221 if '\n' in string:
218 222 self.flush()
219 223 # do we want to check subprocess flushes on write?
220 224 # self._flush_from_subprocesses()
221 225 current_time = time.time()
222 226 if self._start < 0:
223 227 self._start = current_time
224 228 elif current_time - self._start > self.flush_interval:
225 229 self.flush()
226 230
227 231 def writelines(self, sequence):
228 232 if self.pub_socket is None:
229 233 raise ValueError('I/O operation on closed file')
230 234 else:
231 235 for string in sequence:
232 236 self.write(string)
233 237
234 238 def _flush_buffer(self):
235 239 """clear the current buffer and return the current buffer data"""
236 240 data = u''
237 241 if self._buffer is not None:
238 242 data = self._buffer.getvalue()
239 243 self._buffer.close()
240 244 self._new_buffer()
241 245 return data
242 246
243 247 def _new_buffer(self):
244 248 self._buffer = StringIO()
245 249 self._start = -1
General Comments 0
You need to be logged in to leave comments. Login now