##// END OF EJS Templates
survive failure to bind to localhost in zmq.iostream...
MinRK -
Show More
@@ -1,221 +1,231 b''
1 1 """wrappers for stdout/stderr forwarding over zmq
2 2 """
3 3
4 4 #-----------------------------------------------------------------------------
5 5 # Copyright (C) 2013 The IPython Development Team
6 6 #
7 7 # Distributed under the terms of the BSD License. The full license is in
8 8 # the file COPYING, distributed as part of this software.
9 9 #-----------------------------------------------------------------------------
10 10
11 11 import os
12 12 import threading
13 13 import time
14 14 import uuid
15 15 from io import StringIO, UnsupportedOperation
16 16
17 17 import zmq
18 18
19 19 from .session import extract_header
20 20
21 21 from IPython.utils import py3compat
22 22 from IPython.utils.py3compat import unicode_type
23 from IPython.utils.warn import warn
23 24
24 25 #-----------------------------------------------------------------------------
25 26 # Globals
26 27 #-----------------------------------------------------------------------------
27 28
28 29 MASTER = 0
29 30 CHILD = 1
30 31
31 32 #-----------------------------------------------------------------------------
32 33 # Stream classes
33 34 #-----------------------------------------------------------------------------
34 35
35 36 class OutStream(object):
36 37 """A file like object that publishes the stream to a 0MQ PUB socket."""
37 38
38 39 # The time interval between automatic flushes, in seconds.
39 40 _subprocess_flush_limit = 256
40 41 flush_interval = 0.05
41 42 topic=None
42 43
43 44 def __init__(self, session, pub_socket, name, pipe=True):
44 45 self.encoding = 'UTF-8'
45 46 self.session = session
46 47 self.pub_socket = pub_socket
47 48 self.name = name
48 49 self.topic = b'stream.' + py3compat.cast_bytes(name)
49 50 self.parent_header = {}
50 51 self._new_buffer()
51 52 self._buffer_lock = threading.Lock()
52 53 self._master_pid = os.getpid()
53 54 self._master_thread = threading.current_thread().ident
54 55 self._pipe_pid = os.getpid()
55 56 self._pipe_flag = pipe
56 57 if pipe:
57 58 self._setup_pipe_in()
58 59
59 60 def _setup_pipe_in(self):
60 61 """setup listening pipe for subprocesses"""
61 62 ctx = self.pub_socket.context
62 63
63 64 # use UUID to authenticate pipe messages
64 65 self._pipe_uuid = uuid.uuid4().bytes
65 66
66 67 self._pipe_in = ctx.socket(zmq.PULL)
67 68 self._pipe_in.linger = 0
69 try:
68 70 self._pipe_port = self._pipe_in.bind_to_random_port("tcp://127.0.0.1")
71 except zmq.ZMQError as e:
72 warn("Couldn't bind IOStream to 127.0.0.1: %s" % e +
73 "\nsubprocess output will be unavailable."
74 )
75 self._pipe_flag = False
76 self._pipe_in.close()
77 del self._pipe_in
78 return
69 79 self._pipe_poller = zmq.Poller()
70 80 self._pipe_poller.register(self._pipe_in, zmq.POLLIN)
71 81
72 82 def _setup_pipe_out(self):
73 83 # must be new context after fork
74 84 ctx = zmq.Context()
75 85 self._pipe_pid = os.getpid()
76 86 self._pipe_out = ctx.socket(zmq.PUSH)
77 87 self._pipe_out_lock = threading.Lock()
78 88 self._pipe_out.connect("tcp://127.0.0.1:%i" % self._pipe_port)
79 89
80 90 def _is_master_process(self):
81 91 return os.getpid() == self._master_pid
82 92
83 93 def _is_master_thread(self):
84 94 return threading.current_thread().ident == self._master_thread
85 95
86 96 def _have_pipe_out(self):
87 97 return os.getpid() == self._pipe_pid
88 98
89 99 def _check_mp_mode(self):
90 100 """check for forks, and switch to zmq pipeline if necessary"""
91 101 if not self._pipe_flag or self._is_master_process():
92 102 return MASTER
93 103 else:
94 104 if not self._have_pipe_out():
95 105 self._flush_buffer()
96 106 # setup a new out pipe
97 107 self._setup_pipe_out()
98 108 return CHILD
99 109
100 110 def set_parent(self, parent):
101 111 self.parent_header = extract_header(parent)
102 112
103 113 def close(self):
104 114 self.pub_socket = None
105 115
106 116 def _flush_from_subprocesses(self):
107 117 """flush possible pub data from subprocesses into my buffer"""
108 118 if not self._pipe_flag or not self._is_master_process():
109 119 return
110 120 for i in range(self._subprocess_flush_limit):
111 121 if self._pipe_poller.poll(0):
112 122 msg = self._pipe_in.recv_multipart()
113 123 if msg[0] != self._pipe_uuid:
114 124 continue
115 125 else:
116 126 self._buffer.write(msg[1].decode(self.encoding, 'replace'))
117 127 # this always means a flush,
118 128 # so reset our timer
119 129 self._start = 0
120 130 else:
121 131 break
122 132
123 133 def flush(self):
124 134 """trigger actual zmq send"""
125 135 if self.pub_socket is None:
126 136 raise ValueError(u'I/O operation on closed file')
127 137
128 138 mp_mode = self._check_mp_mode()
129 139
130 140 if mp_mode != CHILD:
131 141 # we are master
132 142 if not self._is_master_thread():
133 143 # sub-threads must not trigger flush,
134 144 # but at least they can force the timer.
135 145 self._start = 0
136 146 return
137 147
138 148 self._flush_from_subprocesses()
139 149 data = self._flush_buffer()
140 150
141 151 if data:
142 152 content = {u'name':self.name, u'data':data}
143 153 msg = self.session.send(self.pub_socket, u'stream', content=content,
144 154 parent=self.parent_header, ident=self.topic)
145 155
146 156 if hasattr(self.pub_socket, 'flush'):
147 157 # socket itself has flush (presumably ZMQStream)
148 158 self.pub_socket.flush()
149 159 else:
150 160 with self._pipe_out_lock:
151 161 string = self._flush_buffer()
152 162 tracker = self._pipe_out.send_multipart([
153 163 self._pipe_uuid,
154 164 string.encode(self.encoding, 'replace'),
155 165 ], copy=False, track=True)
156 166 try:
157 167 tracker.wait(1)
158 168 except:
159 169 pass
160 170
161 171 def isatty(self):
162 172 return False
163 173
164 174 def __next__(self):
165 175 raise IOError('Read not supported on a write only stream.')
166 176
167 177 if not py3compat.PY3:
168 178 next = __next__
169 179
170 180 def read(self, size=-1):
171 181 raise IOError('Read not supported on a write only stream.')
172 182
173 183 def readline(self, size=-1):
174 184 raise IOError('Read not supported on a write only stream.')
175 185
176 186 def fileno(self):
177 187 raise UnsupportedOperation("IOStream has no fileno.")
178 188
179 189 def write(self, string):
180 190 if self.pub_socket is None:
181 191 raise ValueError('I/O operation on closed file')
182 192 else:
183 193 # Make sure that we're handling unicode
184 194 if not isinstance(string, unicode_type):
185 195 string = string.decode(self.encoding, 'replace')
186 196
187 197 is_child = (self._check_mp_mode() == CHILD)
188 198 self._buffer.write(string)
189 199 if is_child:
190 200 # newlines imply flush in subprocesses
191 201 # mp.Pool cannot be trusted to flush promptly (or ever),
192 202 # and this helps.
193 203 if '\n' in string:
194 204 self.flush()
195 205 # do we want to check subprocess flushes on write?
196 206 # self._flush_from_subprocesses()
197 207 current_time = time.time()
198 208 if self._start < 0:
199 209 self._start = current_time
200 210 elif current_time - self._start > self.flush_interval:
201 211 self.flush()
202 212
203 213 def writelines(self, sequence):
204 214 if self.pub_socket is None:
205 215 raise ValueError('I/O operation on closed file')
206 216 else:
207 217 for string in sequence:
208 218 self.write(string)
209 219
210 220 def _flush_buffer(self):
211 221 """clear the current buffer and return the current buffer data"""
212 222 data = u''
213 223 if self._buffer is not None:
214 224 data = self._buffer.getvalue()
215 225 self._buffer.close()
216 226 self._new_buffer()
217 227 return data
218 228
219 229 def _new_buffer(self):
220 230 self._buffer = StringIO()
221 231 self._start = -1
General Comments 0
You need to be logged in to leave comments. Login now