diff --git a/IPython/zmq/kernel.py b/IPython/zmq/kernel.py index 2688816..285fd66 100755 --- a/IPython/zmq/kernel.py +++ b/IPython/zmq/kernel.py @@ -18,6 +18,7 @@ Things to do: # Standard library imports. import __builtin__ from code import CommandCompiler +from cStringIO import StringIO import os import sys from threading import Thread @@ -126,14 +127,15 @@ class InStream(object): class OutStream(object): """A file like object that publishes the stream to a 0MQ PUB socket.""" - def __init__(self, session, pub_socket, name, max_buffer=200): + # The time interval between automatic flushes, in seconds. + flush_interval = 0.05 + + def __init__(self, session, pub_socket, name): self.session = session self.pub_socket = pub_socket self.name = name - self._buffer = [] - self._buffer_len = 0 - self.max_buffer = max_buffer self.parent_header = {} + self._new_buffer() def set_parent(self, parent): self.parent_header = extract_header(parent) @@ -145,15 +147,16 @@ class OutStream(object): if self.pub_socket is None: raise ValueError(u'I/O operation on closed file') else: - if self._buffer: - data = ''.join(self._buffer) + data = self._buffer.getvalue() + if data: content = {u'name':self.name, u'data':data} msg = self.session.msg(u'stream', content=content, parent=self.parent_header) print>>sys.__stdout__, Message(msg) self.pub_socket.send_json(msg) - self._buffer_len = 0 - self._buffer = [] + + self._buffer.close() + self._new_buffer() def isatty(self): return False @@ -161,31 +164,33 @@ class OutStream(object): def next(self): raise IOError('Read not supported on a write only stream.') - def read(self, size=None): + def read(self, size=-1): raise IOError('Read not supported on a write only stream.') - readline=read + def readline(self, size=-1): + raise IOError('Read not supported on a write only stream.') - def write(self, s): + def write(self, string): if self.pub_socket is None: raise ValueError('I/O operation on closed file') else: - self._buffer.append(s) - self._buffer_len += len(s) - self._maybe_send() - - def _maybe_send(self): - if '\n' in self._buffer[-1]: - self.flush() - if self._buffer_len > self.max_buffer: - self.flush() + self._buffer.write(string) + current_time = time.time() + if self._start <= 0: + self._start = current_time + elif current_time - self._start > self.flush_interval: + self.flush() def writelines(self, sequence): if self.pub_socket is None: raise ValueError('I/O operation on closed file') else: - for s in sequence: - self.write(s) + for string in sequence: + self.write(string) + + def _new_buffer(self): + self._buffer = StringIO() + self._start = -1 class DisplayHook(object): @@ -257,6 +262,7 @@ class Kernel(object): return pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent) self.pub_socket.send_json(pyin_msg) + try: comp_code = self.compiler(code, '') sys.displayhook.set_parent(parent) @@ -276,6 +282,12 @@ class Kernel(object): reply_content = exc_content else: reply_content = {'status' : 'ok'} + + # Flush output before sending the reply. + sys.stderr.flush() + sys.stdout.flush() + + # Send the reply. reply_msg = self.session.msg(u'execute_reply', reply_content, parent) print>>sys.__stdout__, Message(reply_msg) self.reply_socket.send(ident, zmq.SNDMORE)