##// END OF EJS Templates
survive failure to bind to localhost in zmq.iostream...
MinRK -
Show More
@@ -1,221 +1,231 b''
1 """wrappers for stdout/stderr forwarding over zmq
1 """wrappers for stdout/stderr forwarding over zmq
2 """
2 """
3
3
4 #-----------------------------------------------------------------------------
4 #-----------------------------------------------------------------------------
5 # Copyright (C) 2013 The IPython Development Team
5 # Copyright (C) 2013 The IPython Development Team
6 #
6 #
7 # Distributed under the terms of the BSD License. The full license is in
7 # Distributed under the terms of the BSD License. The full license is in
8 # the file COPYING, distributed as part of this software.
8 # the file COPYING, distributed as part of this software.
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10
10
11 import os
11 import os
12 import threading
12 import threading
13 import time
13 import time
14 import uuid
14 import uuid
15 from io import StringIO, UnsupportedOperation
15 from io import StringIO, UnsupportedOperation
16
16
17 import zmq
17 import zmq
18
18
19 from .session import extract_header
19 from .session import extract_header
20
20
21 from IPython.utils import py3compat
21 from IPython.utils import py3compat
22 from IPython.utils.py3compat import unicode_type
22 from IPython.utils.py3compat import unicode_type
23 from IPython.utils.warn import warn
23
24
24 #-----------------------------------------------------------------------------
25 #-----------------------------------------------------------------------------
25 # Globals
26 # Globals
26 #-----------------------------------------------------------------------------
27 #-----------------------------------------------------------------------------
27
28
28 MASTER = 0
29 MASTER = 0
29 CHILD = 1
30 CHILD = 1
30
31
31 #-----------------------------------------------------------------------------
32 #-----------------------------------------------------------------------------
32 # Stream classes
33 # Stream classes
33 #-----------------------------------------------------------------------------
34 #-----------------------------------------------------------------------------
34
35
35 class OutStream(object):
36 class OutStream(object):
36 """A file like object that publishes the stream to a 0MQ PUB socket."""
37 """A file like object that publishes the stream to a 0MQ PUB socket."""
37
38
38 # The time interval between automatic flushes, in seconds.
39 # The time interval between automatic flushes, in seconds.
39 _subprocess_flush_limit = 256
40 _subprocess_flush_limit = 256
40 flush_interval = 0.05
41 flush_interval = 0.05
41 topic=None
42 topic=None
42
43
43 def __init__(self, session, pub_socket, name, pipe=True):
44 def __init__(self, session, pub_socket, name, pipe=True):
44 self.encoding = 'UTF-8'
45 self.encoding = 'UTF-8'
45 self.session = session
46 self.session = session
46 self.pub_socket = pub_socket
47 self.pub_socket = pub_socket
47 self.name = name
48 self.name = name
48 self.topic = b'stream.' + py3compat.cast_bytes(name)
49 self.topic = b'stream.' + py3compat.cast_bytes(name)
49 self.parent_header = {}
50 self.parent_header = {}
50 self._new_buffer()
51 self._new_buffer()
51 self._buffer_lock = threading.Lock()
52 self._buffer_lock = threading.Lock()
52 self._master_pid = os.getpid()
53 self._master_pid = os.getpid()
53 self._master_thread = threading.current_thread().ident
54 self._master_thread = threading.current_thread().ident
54 self._pipe_pid = os.getpid()
55 self._pipe_pid = os.getpid()
55 self._pipe_flag = pipe
56 self._pipe_flag = pipe
56 if pipe:
57 if pipe:
57 self._setup_pipe_in()
58 self._setup_pipe_in()
58
59
59 def _setup_pipe_in(self):
60 def _setup_pipe_in(self):
60 """setup listening pipe for subprocesses"""
61 """setup listening pipe for subprocesses"""
61 ctx = self.pub_socket.context
62 ctx = self.pub_socket.context
62
63
63 # use UUID to authenticate pipe messages
64 # use UUID to authenticate pipe messages
64 self._pipe_uuid = uuid.uuid4().bytes
65 self._pipe_uuid = uuid.uuid4().bytes
65
66
66 self._pipe_in = ctx.socket(zmq.PULL)
67 self._pipe_in = ctx.socket(zmq.PULL)
67 self._pipe_in.linger = 0
68 self._pipe_in.linger = 0
68 self._pipe_port = self._pipe_in.bind_to_random_port("tcp://127.0.0.1")
69 try:
70 self._pipe_port = self._pipe_in.bind_to_random_port("tcp://127.0.0.1")
71 except zmq.ZMQError as e:
72 warn("Couldn't bind IOStream to 127.0.0.1: %s" % e +
73 "\nsubprocess output will be unavailable."
74 )
75 self._pipe_flag = False
76 self._pipe_in.close()
77 del self._pipe_in
78 return
69 self._pipe_poller = zmq.Poller()
79 self._pipe_poller = zmq.Poller()
70 self._pipe_poller.register(self._pipe_in, zmq.POLLIN)
80 self._pipe_poller.register(self._pipe_in, zmq.POLLIN)
71
81
72 def _setup_pipe_out(self):
82 def _setup_pipe_out(self):
73 # must be new context after fork
83 # must be new context after fork
74 ctx = zmq.Context()
84 ctx = zmq.Context()
75 self._pipe_pid = os.getpid()
85 self._pipe_pid = os.getpid()
76 self._pipe_out = ctx.socket(zmq.PUSH)
86 self._pipe_out = ctx.socket(zmq.PUSH)
77 self._pipe_out_lock = threading.Lock()
87 self._pipe_out_lock = threading.Lock()
78 self._pipe_out.connect("tcp://127.0.0.1:%i" % self._pipe_port)
88 self._pipe_out.connect("tcp://127.0.0.1:%i" % self._pipe_port)
79
89
80 def _is_master_process(self):
90 def _is_master_process(self):
81 return os.getpid() == self._master_pid
91 return os.getpid() == self._master_pid
82
92
83 def _is_master_thread(self):
93 def _is_master_thread(self):
84 return threading.current_thread().ident == self._master_thread
94 return threading.current_thread().ident == self._master_thread
85
95
86 def _have_pipe_out(self):
96 def _have_pipe_out(self):
87 return os.getpid() == self._pipe_pid
97 return os.getpid() == self._pipe_pid
88
98
89 def _check_mp_mode(self):
99 def _check_mp_mode(self):
90 """check for forks, and switch to zmq pipeline if necessary"""
100 """check for forks, and switch to zmq pipeline if necessary"""
91 if not self._pipe_flag or self._is_master_process():
101 if not self._pipe_flag or self._is_master_process():
92 return MASTER
102 return MASTER
93 else:
103 else:
94 if not self._have_pipe_out():
104 if not self._have_pipe_out():
95 self._flush_buffer()
105 self._flush_buffer()
96 # setup a new out pipe
106 # setup a new out pipe
97 self._setup_pipe_out()
107 self._setup_pipe_out()
98 return CHILD
108 return CHILD
99
109
100 def set_parent(self, parent):
110 def set_parent(self, parent):
101 self.parent_header = extract_header(parent)
111 self.parent_header = extract_header(parent)
102
112
103 def close(self):
113 def close(self):
104 self.pub_socket = None
114 self.pub_socket = None
105
115
106 def _flush_from_subprocesses(self):
116 def _flush_from_subprocesses(self):
107 """flush possible pub data from subprocesses into my buffer"""
117 """flush possible pub data from subprocesses into my buffer"""
108 if not self._pipe_flag or not self._is_master_process():
118 if not self._pipe_flag or not self._is_master_process():
109 return
119 return
110 for i in range(self._subprocess_flush_limit):
120 for i in range(self._subprocess_flush_limit):
111 if self._pipe_poller.poll(0):
121 if self._pipe_poller.poll(0):
112 msg = self._pipe_in.recv_multipart()
122 msg = self._pipe_in.recv_multipart()
113 if msg[0] != self._pipe_uuid:
123 if msg[0] != self._pipe_uuid:
114 continue
124 continue
115 else:
125 else:
116 self._buffer.write(msg[1].decode(self.encoding, 'replace'))
126 self._buffer.write(msg[1].decode(self.encoding, 'replace'))
117 # this always means a flush,
127 # this always means a flush,
118 # so reset our timer
128 # so reset our timer
119 self._start = 0
129 self._start = 0
120 else:
130 else:
121 break
131 break
122
132
123 def flush(self):
133 def flush(self):
124 """trigger actual zmq send"""
134 """trigger actual zmq send"""
125 if self.pub_socket is None:
135 if self.pub_socket is None:
126 raise ValueError(u'I/O operation on closed file')
136 raise ValueError(u'I/O operation on closed file')
127
137
128 mp_mode = self._check_mp_mode()
138 mp_mode = self._check_mp_mode()
129
139
130 if mp_mode != CHILD:
140 if mp_mode != CHILD:
131 # we are master
141 # we are master
132 if not self._is_master_thread():
142 if not self._is_master_thread():
133 # sub-threads must not trigger flush,
143 # sub-threads must not trigger flush,
134 # but at least they can force the timer.
144 # but at least they can force the timer.
135 self._start = 0
145 self._start = 0
136 return
146 return
137
147
138 self._flush_from_subprocesses()
148 self._flush_from_subprocesses()
139 data = self._flush_buffer()
149 data = self._flush_buffer()
140
150
141 if data:
151 if data:
142 content = {u'name':self.name, u'data':data}
152 content = {u'name':self.name, u'data':data}
143 msg = self.session.send(self.pub_socket, u'stream', content=content,
153 msg = self.session.send(self.pub_socket, u'stream', content=content,
144 parent=self.parent_header, ident=self.topic)
154 parent=self.parent_header, ident=self.topic)
145
155
146 if hasattr(self.pub_socket, 'flush'):
156 if hasattr(self.pub_socket, 'flush'):
147 # socket itself has flush (presumably ZMQStream)
157 # socket itself has flush (presumably ZMQStream)
148 self.pub_socket.flush()
158 self.pub_socket.flush()
149 else:
159 else:
150 with self._pipe_out_lock:
160 with self._pipe_out_lock:
151 string = self._flush_buffer()
161 string = self._flush_buffer()
152 tracker = self._pipe_out.send_multipart([
162 tracker = self._pipe_out.send_multipart([
153 self._pipe_uuid,
163 self._pipe_uuid,
154 string.encode(self.encoding, 'replace'),
164 string.encode(self.encoding, 'replace'),
155 ], copy=False, track=True)
165 ], copy=False, track=True)
156 try:
166 try:
157 tracker.wait(1)
167 tracker.wait(1)
158 except:
168 except:
159 pass
169 pass
160
170
161 def isatty(self):
171 def isatty(self):
162 return False
172 return False
163
173
164 def __next__(self):
174 def __next__(self):
165 raise IOError('Read not supported on a write only stream.')
175 raise IOError('Read not supported on a write only stream.')
166
176
167 if not py3compat.PY3:
177 if not py3compat.PY3:
168 next = __next__
178 next = __next__
169
179
170 def read(self, size=-1):
180 def read(self, size=-1):
171 raise IOError('Read not supported on a write only stream.')
181 raise IOError('Read not supported on a write only stream.')
172
182
173 def readline(self, size=-1):
183 def readline(self, size=-1):
174 raise IOError('Read not supported on a write only stream.')
184 raise IOError('Read not supported on a write only stream.')
175
185
176 def fileno(self):
186 def fileno(self):
177 raise UnsupportedOperation("IOStream has no fileno.")
187 raise UnsupportedOperation("IOStream has no fileno.")
178
188
179 def write(self, string):
189 def write(self, string):
180 if self.pub_socket is None:
190 if self.pub_socket is None:
181 raise ValueError('I/O operation on closed file')
191 raise ValueError('I/O operation on closed file')
182 else:
192 else:
183 # Make sure that we're handling unicode
193 # Make sure that we're handling unicode
184 if not isinstance(string, unicode_type):
194 if not isinstance(string, unicode_type):
185 string = string.decode(self.encoding, 'replace')
195 string = string.decode(self.encoding, 'replace')
186
196
187 is_child = (self._check_mp_mode() == CHILD)
197 is_child = (self._check_mp_mode() == CHILD)
188 self._buffer.write(string)
198 self._buffer.write(string)
189 if is_child:
199 if is_child:
190 # newlines imply flush in subprocesses
200 # newlines imply flush in subprocesses
191 # mp.Pool cannot be trusted to flush promptly (or ever),
201 # mp.Pool cannot be trusted to flush promptly (or ever),
192 # and this helps.
202 # and this helps.
193 if '\n' in string:
203 if '\n' in string:
194 self.flush()
204 self.flush()
195 # do we want to check subprocess flushes on write?
205 # do we want to check subprocess flushes on write?
196 # self._flush_from_subprocesses()
206 # self._flush_from_subprocesses()
197 current_time = time.time()
207 current_time = time.time()
198 if self._start < 0:
208 if self._start < 0:
199 self._start = current_time
209 self._start = current_time
200 elif current_time - self._start > self.flush_interval:
210 elif current_time - self._start > self.flush_interval:
201 self.flush()
211 self.flush()
202
212
203 def writelines(self, sequence):
213 def writelines(self, sequence):
204 if self.pub_socket is None:
214 if self.pub_socket is None:
205 raise ValueError('I/O operation on closed file')
215 raise ValueError('I/O operation on closed file')
206 else:
216 else:
207 for string in sequence:
217 for string in sequence:
208 self.write(string)
218 self.write(string)
209
219
210 def _flush_buffer(self):
220 def _flush_buffer(self):
211 """clear the current buffer and return the current buffer data"""
221 """clear the current buffer and return the current buffer data"""
212 data = u''
222 data = u''
213 if self._buffer is not None:
223 if self._buffer is not None:
214 data = self._buffer.getvalue()
224 data = self._buffer.getvalue()
215 self._buffer.close()
225 self._buffer.close()
216 self._new_buffer()
226 self._new_buffer()
217 return data
227 return data
218
228
219 def _new_buffer(self):
229 def _new_buffer(self):
220 self._buffer = StringIO()
230 self._buffer = StringIO()
221 self._start = -1
231 self._start = -1
General Comments 0
You need to be logged in to leave comments. Login now