##// END OF EJS Templates
flush on newline in subprocesses...
MinRK -
Show More
@@ -1,205 +1,213 b''
1 1 """wrappers for stdout/stderr forwarding over zmq
2 2 """
3 3
4 4 #-----------------------------------------------------------------------------
5 5 # Copyright (C) 2013 The IPython Development Team
6 6 #
7 7 # Distributed under the terms of the BSD License. The full license is in
8 8 # the file COPYING, distributed as part of this software.
9 9 #-----------------------------------------------------------------------------
10 10
11 11 import sys
12 12 import time
13 13 import os
14 14 import threading
15 15 import uuid
16 16 from io import StringIO
17 17
18 18 import zmq
19 19
20 20 from session import extract_header
21 21
22 22 from IPython.utils import py3compat
23 23
24 24 #-----------------------------------------------------------------------------
25 25 # Globals
26 26 #-----------------------------------------------------------------------------
27 27
28 28 MASTER = 0
29 29 CHILD = 1
30 30
31 31 #-----------------------------------------------------------------------------
32 32 # Stream classes
33 33 #-----------------------------------------------------------------------------
34 34
35 35 class OutStream(object):
36 36 """A file like object that publishes the stream to a 0MQ PUB socket."""
37 37
38 38 # The time interval between automatic flushes, in seconds.
39 39 flush_interval = 0.05
40 40 topic=None
41 41
42 42 def __init__(self, session, pub_socket, name, pipe=True):
43 43 self.encoding = 'UTF-8'
44 44 self.session = session
45 45 self.pub_socket = pub_socket
46 46 self.name = name
47 47 self.parent_header = {}
48 48 self._new_buffer()
49 49 self._buffer_lock = threading.Lock()
50 50 self._master_pid = os.getpid()
51 51 self._master_thread = threading.current_thread().ident
52 52 self._pipe_pid = os.getpid()
53 53 self._pipe_flag = pipe
54 54 if pipe:
55 55 self._setup_pipe_in()
56 56
57 57 def _setup_pipe_in(self):
58 58 """setup listening pipe for subprocesses"""
59 59 ctx = self.pub_socket.context
60 60
61 61 # use UUID to authenticate pipe messages
62 62 self._pipe_uuid = uuid.uuid4().bytes
63 63
64 64 self._pipe_in = ctx.socket(zmq.PULL)
65 65 self._pipe_in.linger = 0
66 66 self._pipe_port = self._pipe_in.bind_to_random_port("tcp://127.0.0.1")
67 67 self._pipe_poller = zmq.Poller()
68 68 self._pipe_poller.register(self._pipe_in, zmq.POLLIN)
69 69
70 70 def _setup_pipe_out(self):
71 71 # must be new context after fork
72 72 ctx = zmq.Context()
73 73 self._pipe_pid = os.getpid()
74 74 self._pipe_out = ctx.socket(zmq.PUSH)
75 75 self._pipe_out_lock = threading.Lock()
76 76 self._pipe_out.connect("tcp://127.0.0.1:%i" % self._pipe_port)
77 77
78 78 def _is_master_process(self):
79 79 return os.getpid() == self._master_pid
80 80
81 81 def _is_master_thread(self):
82 82 return threading.current_thread().ident == self._master_thread
83 83
84 84 def _have_pipe_out(self):
85 85 return os.getpid() == self._pipe_pid
86 86
87 87 def _check_mp_mode(self):
88 88 """check for forks, and switch to zmq pipeline if necessary"""
89 89 if not self._pipe_flag or self._is_master_process():
90 90 return MASTER
91 91 else:
92 92 if not self._have_pipe_out():
93 self._flush_buffer()
93 94 # setup a new out pipe
94 95 self._setup_pipe_out()
95 96 return CHILD
96 97
97 98 def set_parent(self, parent):
98 99 self.parent_header = extract_header(parent)
99 100
100 101 def close(self):
101 102 self.pub_socket = None
102 103
103 104 def _flush_from_subprocesses(self):
104 105 """flush possible pub data from subprocesses into my buffer"""
105 106 if not self._pipe_flag or not self._is_master_process():
106 107 return
107 108 for i in range(100):
108 109 if self._pipe_poller.poll(0):
109 110 msg = self._pipe_in.recv_multipart()
110 111 if msg[0] != self._pipe_uuid:
111 112 continue
112 113 else:
113 114 self._buffer.write(msg[1].decode(self.encoding, 'replace'))
114 115 # this always means a flush,
115 116 # so reset our timer
116 117 self._start = 0
117 118 else:
118 119 break
119 120
120 121 def flush(self):
121 122 """trigger actual zmq send"""
122 123 if self.pub_socket is None:
123 124 raise ValueError(u'I/O operation on closed file')
124 125
125 126 mp_mode = self._check_mp_mode()
126 127
127 128 if mp_mode != CHILD:
128 129 # we are master
129 130 if not self._is_master_thread():
130 131 # sub-threads must not trigger flush,
131 132 # but at least they can force the timer.
132 133 self._start = 0
133 134 return
134 135
135 136 self._flush_from_subprocesses()
136 137 data = self._flush_buffer()
137 138
138 139 if data:
139 140 content = {u'name':self.name, u'data':data}
140 141 msg = self.session.send(self.pub_socket, u'stream', content=content,
141 142 parent=self.parent_header, ident=self.topic)
142 143
143 144 if hasattr(self.pub_socket, 'flush'):
144 145 # socket itself has flush (presumably ZMQStream)
145 146 self.pub_socket.flush()
146 147 else:
147 148 with self._pipe_out_lock:
148 149 string = self._flush_buffer()
149 150 tracker = self._pipe_out.send_multipart([
150 151 self._pipe_uuid,
151 152 string.encode(self.encoding, 'replace'),
152 153 ], copy=False, track=True)
153 154 tracker.wait(1)
154 155
155 156 def isatty(self):
156 157 return False
157 158
158 159 def __next__(self):
159 160 raise IOError('Read not supported on a write only stream.')
160 161
161 162 if not py3compat.PY3:
162 163 next = __next__
163 164
164 165 def read(self, size=-1):
165 166 raise IOError('Read not supported on a write only stream.')
166 167
167 168 def readline(self, size=-1):
168 169 raise IOError('Read not supported on a write only stream.')
169 170
170 171 def write(self, string):
171 172 if self.pub_socket is None:
172 173 raise ValueError('I/O operation on closed file')
173 174 else:
174 175 # Make sure that we're handling unicode
175 176 if not isinstance(string, unicode):
176 177 string = string.decode(self.encoding, 'replace')
178
179 is_child = (self._check_mp_mode() == CHILD)
177 180 self._buffer.write(string)
178 self._check_mp_mode()
181 if is_child:
182 # newlines imply flush in subprocesses
183 # mp.Pool cannot be trusted to flush promptly (or ever),
184 # and this helps.
185 if '\n' in string:
186 self.flush()
179 187 # do we want to check subprocess flushes on write?
180 188 # self._flush_from_subprocesses()
181 189 current_time = time.time()
182 190 if self._start < 0:
183 191 self._start = current_time
184 192 elif current_time - self._start > self.flush_interval:
185 193 self.flush()
186 194
187 195 def writelines(self, sequence):
188 196 if self.pub_socket is None:
189 197 raise ValueError('I/O operation on closed file')
190 198 else:
191 199 for string in sequence:
192 200 self.write(string)
193 201
194 202 def _flush_buffer(self):
195 203 """clear the current buffer and return the current buffer data"""
196 204 data = u''
197 205 if self._buffer is not None:
198 206 data = self._buffer.getvalue()
199 207 self._buffer.close()
200 208 self._new_buffer()
201 209 return data
202 210
203 211 def _new_buffer(self):
204 212 self._buffer = StringIO()
205 213 self._start = -1
General Comments 0
You need to be logged in to leave comments. Login now