##// END OF EJS Templates
Greatly increased frontend performance by improving kernel stdout/stderr buffering.
epatters -
Show More
@@ -18,6 +18,7 b' Things to do:'
18 18 # Standard library imports.
19 19 import __builtin__
20 20 from code import CommandCompiler
21 from cStringIO import StringIO
21 22 import os
22 23 import sys
23 24 from threading import Thread
@@ -126,14 +127,15 b' class InStream(object):'
126 127 class OutStream(object):
127 128 """A file like object that publishes the stream to a 0MQ PUB socket."""
128 129
129 def __init__(self, session, pub_socket, name, max_buffer=200):
130 # The time interval between automatic flushes, in seconds.
131 flush_interval = 0.05
132
133 def __init__(self, session, pub_socket, name):
130 134 self.session = session
131 135 self.pub_socket = pub_socket
132 136 self.name = name
133 self._buffer = []
134 self._buffer_len = 0
135 self.max_buffer = max_buffer
136 137 self.parent_header = {}
138 self._new_buffer()
137 139
138 140 def set_parent(self, parent):
139 141 self.parent_header = extract_header(parent)
@@ -145,15 +147,16 b' class OutStream(object):'
145 147 if self.pub_socket is None:
146 148 raise ValueError(u'I/O operation on closed file')
147 149 else:
148 if self._buffer:
149 data = ''.join(self._buffer)
150 data = self._buffer.getvalue()
151 if data:
150 152 content = {u'name':self.name, u'data':data}
151 153 msg = self.session.msg(u'stream', content=content,
152 154 parent=self.parent_header)
153 155 print>>sys.__stdout__, Message(msg)
154 156 self.pub_socket.send_json(msg)
155 self._buffer_len = 0
156 self._buffer = []
157
158 self._buffer.close()
159 self._new_buffer()
157 160
158 161 def isatty(self):
159 162 return False
@@ -161,31 +164,33 b' class OutStream(object):'
161 164 def next(self):
162 165 raise IOError('Read not supported on a write only stream.')
163 166
164 def read(self, size=None):
167 def read(self, size=-1):
165 168 raise IOError('Read not supported on a write only stream.')
166 169
167 readline=read
170 def readline(self, size=-1):
171 raise IOError('Read not supported on a write only stream.')
168 172
169 def write(self, s):
173 def write(self, string):
170 174 if self.pub_socket is None:
171 175 raise ValueError('I/O operation on closed file')
172 176 else:
173 self._buffer.append(s)
174 self._buffer_len += len(s)
175 self._maybe_send()
176
177 def _maybe_send(self):
178 if '\n' in self._buffer[-1]:
179 self.flush()
180 if self._buffer_len > self.max_buffer:
181 self.flush()
177 self._buffer.write(string)
178 current_time = time.time()
179 if self._start <= 0:
180 self._start = current_time
181 elif current_time - self._start > self.flush_interval:
182 self.flush()
182 183
183 184 def writelines(self, sequence):
184 185 if self.pub_socket is None:
185 186 raise ValueError('I/O operation on closed file')
186 187 else:
187 for s in sequence:
188 self.write(s)
188 for string in sequence:
189 self.write(string)
190
191 def _new_buffer(self):
192 self._buffer = StringIO()
193 self._start = -1
189 194
190 195
191 196 class DisplayHook(object):
@@ -257,6 +262,7 b' class Kernel(object):'
257 262 return
258 263 pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
259 264 self.pub_socket.send_json(pyin_msg)
265
260 266 try:
261 267 comp_code = self.compiler(code, '<zmq-kernel>')
262 268 sys.displayhook.set_parent(parent)
@@ -276,6 +282,12 b' class Kernel(object):'
276 282 reply_content = exc_content
277 283 else:
278 284 reply_content = {'status' : 'ok'}
285
286 # Flush output before sending the reply.
287 sys.stderr.flush()
288 sys.stdout.flush()
289
290 # Send the reply.
279 291 reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
280 292 print>>sys.__stdout__, Message(reply_msg)
281 293 self.reply_socket.send(ident, zmq.SNDMORE)
General Comments 0
You need to be logged in to leave comments. Login now