##// END OF EJS Templates
flush on newline in subprocesses...
MinRK -
Show More
@@ -1,205 +1,213 b''
1 """wrappers for stdout/stderr forwarding over zmq
1 """wrappers for stdout/stderr forwarding over zmq
2 """
2 """
3
3
4 #-----------------------------------------------------------------------------
4 #-----------------------------------------------------------------------------
5 # Copyright (C) 2013 The IPython Development Team
5 # Copyright (C) 2013 The IPython Development Team
6 #
6 #
7 # Distributed under the terms of the BSD License. The full license is in
7 # Distributed under the terms of the BSD License. The full license is in
8 # the file COPYING, distributed as part of this software.
8 # the file COPYING, distributed as part of this software.
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10
10
11 import sys
11 import sys
12 import time
12 import time
13 import os
13 import os
14 import threading
14 import threading
15 import uuid
15 import uuid
16 from io import StringIO
16 from io import StringIO
17
17
18 import zmq
18 import zmq
19
19
20 from session import extract_header
20 from session import extract_header
21
21
22 from IPython.utils import py3compat
22 from IPython.utils import py3compat
23
23
24 #-----------------------------------------------------------------------------
24 #-----------------------------------------------------------------------------
25 # Globals
25 # Globals
26 #-----------------------------------------------------------------------------
26 #-----------------------------------------------------------------------------
27
27
28 MASTER = 0
28 MASTER = 0
29 CHILD = 1
29 CHILD = 1
30
30
31 #-----------------------------------------------------------------------------
31 #-----------------------------------------------------------------------------
32 # Stream classes
32 # Stream classes
33 #-----------------------------------------------------------------------------
33 #-----------------------------------------------------------------------------
34
34
35 class OutStream(object):
35 class OutStream(object):
36 """A file like object that publishes the stream to a 0MQ PUB socket."""
36 """A file like object that publishes the stream to a 0MQ PUB socket."""
37
37
38 # The time interval between automatic flushes, in seconds.
38 # The time interval between automatic flushes, in seconds.
39 flush_interval = 0.05
39 flush_interval = 0.05
40 topic=None
40 topic=None
41
41
42 def __init__(self, session, pub_socket, name, pipe=True):
42 def __init__(self, session, pub_socket, name, pipe=True):
43 self.encoding = 'UTF-8'
43 self.encoding = 'UTF-8'
44 self.session = session
44 self.session = session
45 self.pub_socket = pub_socket
45 self.pub_socket = pub_socket
46 self.name = name
46 self.name = name
47 self.parent_header = {}
47 self.parent_header = {}
48 self._new_buffer()
48 self._new_buffer()
49 self._buffer_lock = threading.Lock()
49 self._buffer_lock = threading.Lock()
50 self._master_pid = os.getpid()
50 self._master_pid = os.getpid()
51 self._master_thread = threading.current_thread().ident
51 self._master_thread = threading.current_thread().ident
52 self._pipe_pid = os.getpid()
52 self._pipe_pid = os.getpid()
53 self._pipe_flag = pipe
53 self._pipe_flag = pipe
54 if pipe:
54 if pipe:
55 self._setup_pipe_in()
55 self._setup_pipe_in()
56
56
57 def _setup_pipe_in(self):
57 def _setup_pipe_in(self):
58 """setup listening pipe for subprocesses"""
58 """setup listening pipe for subprocesses"""
59 ctx = self.pub_socket.context
59 ctx = self.pub_socket.context
60
60
61 # use UUID to authenticate pipe messages
61 # use UUID to authenticate pipe messages
62 self._pipe_uuid = uuid.uuid4().bytes
62 self._pipe_uuid = uuid.uuid4().bytes
63
63
64 self._pipe_in = ctx.socket(zmq.PULL)
64 self._pipe_in = ctx.socket(zmq.PULL)
65 self._pipe_in.linger = 0
65 self._pipe_in.linger = 0
66 self._pipe_port = self._pipe_in.bind_to_random_port("tcp://127.0.0.1")
66 self._pipe_port = self._pipe_in.bind_to_random_port("tcp://127.0.0.1")
67 self._pipe_poller = zmq.Poller()
67 self._pipe_poller = zmq.Poller()
68 self._pipe_poller.register(self._pipe_in, zmq.POLLIN)
68 self._pipe_poller.register(self._pipe_in, zmq.POLLIN)
69
69
70 def _setup_pipe_out(self):
70 def _setup_pipe_out(self):
71 # must be new context after fork
71 # must be new context after fork
72 ctx = zmq.Context()
72 ctx = zmq.Context()
73 self._pipe_pid = os.getpid()
73 self._pipe_pid = os.getpid()
74 self._pipe_out = ctx.socket(zmq.PUSH)
74 self._pipe_out = ctx.socket(zmq.PUSH)
75 self._pipe_out_lock = threading.Lock()
75 self._pipe_out_lock = threading.Lock()
76 self._pipe_out.connect("tcp://127.0.0.1:%i" % self._pipe_port)
76 self._pipe_out.connect("tcp://127.0.0.1:%i" % self._pipe_port)
77
77
78 def _is_master_process(self):
78 def _is_master_process(self):
79 return os.getpid() == self._master_pid
79 return os.getpid() == self._master_pid
80
80
81 def _is_master_thread(self):
81 def _is_master_thread(self):
82 return threading.current_thread().ident == self._master_thread
82 return threading.current_thread().ident == self._master_thread
83
83
84 def _have_pipe_out(self):
84 def _have_pipe_out(self):
85 return os.getpid() == self._pipe_pid
85 return os.getpid() == self._pipe_pid
86
86
87 def _check_mp_mode(self):
87 def _check_mp_mode(self):
88 """check for forks, and switch to zmq pipeline if necessary"""
88 """check for forks, and switch to zmq pipeline if necessary"""
89 if not self._pipe_flag or self._is_master_process():
89 if not self._pipe_flag or self._is_master_process():
90 return MASTER
90 return MASTER
91 else:
91 else:
92 if not self._have_pipe_out():
92 if not self._have_pipe_out():
93 self._flush_buffer()
93 # setup a new out pipe
94 # setup a new out pipe
94 self._setup_pipe_out()
95 self._setup_pipe_out()
95 return CHILD
96 return CHILD
96
97
97 def set_parent(self, parent):
98 def set_parent(self, parent):
98 self.parent_header = extract_header(parent)
99 self.parent_header = extract_header(parent)
99
100
100 def close(self):
101 def close(self):
101 self.pub_socket = None
102 self.pub_socket = None
102
103
103 def _flush_from_subprocesses(self):
104 def _flush_from_subprocesses(self):
104 """flush possible pub data from subprocesses into my buffer"""
105 """flush possible pub data from subprocesses into my buffer"""
105 if not self._pipe_flag or not self._is_master_process():
106 if not self._pipe_flag or not self._is_master_process():
106 return
107 return
107 for i in range(100):
108 for i in range(100):
108 if self._pipe_poller.poll(0):
109 if self._pipe_poller.poll(0):
109 msg = self._pipe_in.recv_multipart()
110 msg = self._pipe_in.recv_multipart()
110 if msg[0] != self._pipe_uuid:
111 if msg[0] != self._pipe_uuid:
111 continue
112 continue
112 else:
113 else:
113 self._buffer.write(msg[1].decode(self.encoding, 'replace'))
114 self._buffer.write(msg[1].decode(self.encoding, 'replace'))
114 # this always means a flush,
115 # this always means a flush,
115 # so reset our timer
116 # so reset our timer
116 self._start = 0
117 self._start = 0
117 else:
118 else:
118 break
119 break
119
120
120 def flush(self):
121 def flush(self):
121 """trigger actual zmq send"""
122 """trigger actual zmq send"""
122 if self.pub_socket is None:
123 if self.pub_socket is None:
123 raise ValueError(u'I/O operation on closed file')
124 raise ValueError(u'I/O operation on closed file')
124
125
125 mp_mode = self._check_mp_mode()
126 mp_mode = self._check_mp_mode()
126
127
127 if mp_mode != CHILD:
128 if mp_mode != CHILD:
128 # we are master
129 # we are master
129 if not self._is_master_thread():
130 if not self._is_master_thread():
130 # sub-threads must not trigger flush,
131 # sub-threads must not trigger flush,
131 # but at least they can force the timer.
132 # but at least they can force the timer.
132 self._start = 0
133 self._start = 0
133 return
134 return
134
135
135 self._flush_from_subprocesses()
136 self._flush_from_subprocesses()
136 data = self._flush_buffer()
137 data = self._flush_buffer()
137
138
138 if data:
139 if data:
139 content = {u'name':self.name, u'data':data}
140 content = {u'name':self.name, u'data':data}
140 msg = self.session.send(self.pub_socket, u'stream', content=content,
141 msg = self.session.send(self.pub_socket, u'stream', content=content,
141 parent=self.parent_header, ident=self.topic)
142 parent=self.parent_header, ident=self.topic)
142
143
143 if hasattr(self.pub_socket, 'flush'):
144 if hasattr(self.pub_socket, 'flush'):
144 # socket itself has flush (presumably ZMQStream)
145 # socket itself has flush (presumably ZMQStream)
145 self.pub_socket.flush()
146 self.pub_socket.flush()
146 else:
147 else:
147 with self._pipe_out_lock:
148 with self._pipe_out_lock:
148 string = self._flush_buffer()
149 string = self._flush_buffer()
149 tracker = self._pipe_out.send_multipart([
150 tracker = self._pipe_out.send_multipart([
150 self._pipe_uuid,
151 self._pipe_uuid,
151 string.encode(self.encoding, 'replace'),
152 string.encode(self.encoding, 'replace'),
152 ], copy=False, track=True)
153 ], copy=False, track=True)
153 tracker.wait(1)
154 tracker.wait(1)
154
155
155 def isatty(self):
156 def isatty(self):
156 return False
157 return False
157
158
158 def __next__(self):
159 def __next__(self):
159 raise IOError('Read not supported on a write only stream.')
160 raise IOError('Read not supported on a write only stream.')
160
161
161 if not py3compat.PY3:
162 if not py3compat.PY3:
162 next = __next__
163 next = __next__
163
164
164 def read(self, size=-1):
165 def read(self, size=-1):
165 raise IOError('Read not supported on a write only stream.')
166 raise IOError('Read not supported on a write only stream.')
166
167
167 def readline(self, size=-1):
168 def readline(self, size=-1):
168 raise IOError('Read not supported on a write only stream.')
169 raise IOError('Read not supported on a write only stream.')
169
170
170 def write(self, string):
171 def write(self, string):
171 if self.pub_socket is None:
172 if self.pub_socket is None:
172 raise ValueError('I/O operation on closed file')
173 raise ValueError('I/O operation on closed file')
173 else:
174 else:
174 # Make sure that we're handling unicode
175 # Make sure that we're handling unicode
175 if not isinstance(string, unicode):
176 if not isinstance(string, unicode):
176 string = string.decode(self.encoding, 'replace')
177 string = string.decode(self.encoding, 'replace')
178
179 is_child = (self._check_mp_mode() == CHILD)
177 self._buffer.write(string)
180 self._buffer.write(string)
178 self._check_mp_mode()
181 if is_child:
182 # newlines imply flush in subprocesses
183 # mp.Pool cannot be trusted to flush promptly (or ever),
184 # and this helps.
185 if '\n' in string:
186 self.flush()
179 # do we want to check subprocess flushes on write?
187 # do we want to check subprocess flushes on write?
180 # self._flush_from_subprocesses()
188 # self._flush_from_subprocesses()
181 current_time = time.time()
189 current_time = time.time()
182 if self._start < 0:
190 if self._start < 0:
183 self._start = current_time
191 self._start = current_time
184 elif current_time - self._start > self.flush_interval:
192 elif current_time - self._start > self.flush_interval:
185 self.flush()
193 self.flush()
186
194
187 def writelines(self, sequence):
195 def writelines(self, sequence):
188 if self.pub_socket is None:
196 if self.pub_socket is None:
189 raise ValueError('I/O operation on closed file')
197 raise ValueError('I/O operation on closed file')
190 else:
198 else:
191 for string in sequence:
199 for string in sequence:
192 self.write(string)
200 self.write(string)
193
201
194 def _flush_buffer(self):
202 def _flush_buffer(self):
195 """clear the current buffer and return the current buffer data"""
203 """clear the current buffer and return the current buffer data"""
196 data = u''
204 data = u''
197 if self._buffer is not None:
205 if self._buffer is not None:
198 data = self._buffer.getvalue()
206 data = self._buffer.getvalue()
199 self._buffer.close()
207 self._buffer.close()
200 self._new_buffer()
208 self._new_buffer()
201 return data
209 return data
202
210
203 def _new_buffer(self):
211 def _new_buffer(self):
204 self._buffer = StringIO()
212 self._buffer = StringIO()
205 self._start = -1
213 self._start = -1
General Comments 0
You need to be logged in to leave comments. Login now