##// END OF EJS Templates
Greatly increased frontend performance by improving kernel stdout/stderr buffering.
epatters -
Show More
@@ -18,6 +18,7 b' Things to do:'
18 # Standard library imports.
18 # Standard library imports.
19 import __builtin__
19 import __builtin__
20 from code import CommandCompiler
20 from code import CommandCompiler
21 from cStringIO import StringIO
21 import os
22 import os
22 import sys
23 import sys
23 from threading import Thread
24 from threading import Thread
@@ -126,14 +127,15 b' class InStream(object):'
126 class OutStream(object):
127 class OutStream(object):
127 """A file like object that publishes the stream to a 0MQ PUB socket."""
128 """A file like object that publishes the stream to a 0MQ PUB socket."""
128
129
129 def __init__(self, session, pub_socket, name, max_buffer=200):
130 # The time interval between automatic flushes, in seconds.
131 flush_interval = 0.05
132
133 def __init__(self, session, pub_socket, name):
130 self.session = session
134 self.session = session
131 self.pub_socket = pub_socket
135 self.pub_socket = pub_socket
132 self.name = name
136 self.name = name
133 self._buffer = []
134 self._buffer_len = 0
135 self.max_buffer = max_buffer
136 self.parent_header = {}
137 self.parent_header = {}
138 self._new_buffer()
137
139
138 def set_parent(self, parent):
140 def set_parent(self, parent):
139 self.parent_header = extract_header(parent)
141 self.parent_header = extract_header(parent)
@@ -145,15 +147,16 b' class OutStream(object):'
145 if self.pub_socket is None:
147 if self.pub_socket is None:
146 raise ValueError(u'I/O operation on closed file')
148 raise ValueError(u'I/O operation on closed file')
147 else:
149 else:
148 if self._buffer:
150 data = self._buffer.getvalue()
149 data = ''.join(self._buffer)
151 if data:
150 content = {u'name':self.name, u'data':data}
152 content = {u'name':self.name, u'data':data}
151 msg = self.session.msg(u'stream', content=content,
153 msg = self.session.msg(u'stream', content=content,
152 parent=self.parent_header)
154 parent=self.parent_header)
153 print>>sys.__stdout__, Message(msg)
155 print>>sys.__stdout__, Message(msg)
154 self.pub_socket.send_json(msg)
156 self.pub_socket.send_json(msg)
155 self._buffer_len = 0
157
156 self._buffer = []
158 self._buffer.close()
159 self._new_buffer()
157
160
158 def isatty(self):
161 def isatty(self):
159 return False
162 return False
@@ -161,31 +164,33 b' class OutStream(object):'
161 def next(self):
164 def next(self):
162 raise IOError('Read not supported on a write only stream.')
165 raise IOError('Read not supported on a write only stream.')
163
166
164 def read(self, size=None):
167 def read(self, size=-1):
165 raise IOError('Read not supported on a write only stream.')
168 raise IOError('Read not supported on a write only stream.')
166
169
167 readline=read
170 def readline(self, size=-1):
171 raise IOError('Read not supported on a write only stream.')
168
172
169 def write(self, s):
173 def write(self, string):
170 if self.pub_socket is None:
174 if self.pub_socket is None:
171 raise ValueError('I/O operation on closed file')
175 raise ValueError('I/O operation on closed file')
172 else:
176 else:
173 self._buffer.append(s)
177 self._buffer.write(string)
174 self._buffer_len += len(s)
178 current_time = time.time()
175 self._maybe_send()
179 if self._start <= 0:
176
180 self._start = current_time
177 def _maybe_send(self):
181 elif current_time - self._start > self.flush_interval:
178 if '\n' in self._buffer[-1]:
182 self.flush()
179 self.flush()
180 if self._buffer_len > self.max_buffer:
181 self.flush()
182
183
183 def writelines(self, sequence):
184 def writelines(self, sequence):
184 if self.pub_socket is None:
185 if self.pub_socket is None:
185 raise ValueError('I/O operation on closed file')
186 raise ValueError('I/O operation on closed file')
186 else:
187 else:
187 for s in sequence:
188 for string in sequence:
188 self.write(s)
189 self.write(string)
190
191 def _new_buffer(self):
192 self._buffer = StringIO()
193 self._start = -1
189
194
190
195
191 class DisplayHook(object):
196 class DisplayHook(object):
@@ -257,6 +262,7 b' class Kernel(object):'
257 return
262 return
258 pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
263 pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
259 self.pub_socket.send_json(pyin_msg)
264 self.pub_socket.send_json(pyin_msg)
265
260 try:
266 try:
261 comp_code = self.compiler(code, '<zmq-kernel>')
267 comp_code = self.compiler(code, '<zmq-kernel>')
262 sys.displayhook.set_parent(parent)
268 sys.displayhook.set_parent(parent)
@@ -276,6 +282,12 b' class Kernel(object):'
276 reply_content = exc_content
282 reply_content = exc_content
277 else:
283 else:
278 reply_content = {'status' : 'ok'}
284 reply_content = {'status' : 'ok'}
285
286 # Flush output before sending the reply.
287 sys.stderr.flush()
288 sys.stdout.flush()
289
290 # Send the reply.
279 reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
291 reply_msg = self.session.msg(u'execute_reply', reply_content, parent)
280 print>>sys.__stdout__, Message(reply_msg)
292 print>>sys.__stdout__, Message(reply_msg)
281 self.reply_socket.send(ident, zmq.SNDMORE)
293 self.reply_socket.send(ident, zmq.SNDMORE)
General Comments 0
You need to be logged in to leave comments. Login now