diff --git a/IPython/zmq/displayhook.py b/IPython/zmq/displayhook.py new file mode 100644 index 0000000..f37f30a --- /dev/null +++ b/IPython/zmq/displayhook.py @@ -0,0 +1,22 @@ +import __builtin__ + +from session import extract_header + +class DisplayHook(object): + + def __init__(self, session, pub_socket): + self.session = session + self.pub_socket = pub_socket + self.parent_header = {} + + def __call__(self, obj): + if obj is None: + return + + __builtin__._ = obj + msg = self.session.msg(u'pyout', {u'data':repr(obj)}, + parent=self.parent_header) + self.pub_socket.send_json(msg) + + def set_parent(self, parent): + self.parent_header = extract_header(parent) \ No newline at end of file diff --git a/IPython/zmq/exitpoller.py b/IPython/zmq/exitpoller.py new file mode 100644 index 0000000..6f55703 --- /dev/null +++ b/IPython/zmq/exitpoller.py @@ -0,0 +1,42 @@ +import os +import time +from threading import Thread + + +class ExitPollerUnix(Thread): + """ A Unix-specific daemon thread that terminates the program immediately + when the parent process no longer exists. + """ + + def __init__(self): + super(ExitPollerUnix, self).__init__() + self.daemon = True + + def run(self): + # We cannot use os.waitpid because it works only for child processes. + from errno import EINTR + while True: + try: + if os.getppid() == 1: + os._exit(1) + time.sleep(1.0) + except OSError, e: + if e.errno == EINTR: + continue + raise + +class ExitPollerWindows(Thread): + """ A Windows-specific daemon thread that terminates the program immediately + when a Win32 handle is signaled. + """ + + def __init__(self, handle): + super(ExitPollerWindows, self).__init__() + self.daemon = True + self.handle = handle + + def run(self): + from _subprocess import WaitForSingleObject, WAIT_OBJECT_0, INFINITE + result = WaitForSingleObject(self.handle, INFINITE) + if result == WAIT_OBJECT_0: + os._exit(1) \ No newline at end of file diff --git a/IPython/zmq/iostream.py b/IPython/zmq/iostream.py new file mode 100644 index 0000000..8ba2d7a --- /dev/null +++ b/IPython/zmq/iostream.py @@ -0,0 +1,77 @@ +import sys +import time +from cStringIO import StringIO + +from session import extract_header, Message + +#----------------------------------------------------------------------------- +# Stream classes +#----------------------------------------------------------------------------- + +class OutStream(object): + """A file like object that publishes the stream to a 0MQ PUB socket.""" + + # The time interval between automatic flushes, in seconds. + flush_interval = 0.05 + + def __init__(self, session, pub_socket, name): + self.session = session + self.pub_socket = pub_socket + self.name = name + self.parent_header = {} + self._new_buffer() + + def set_parent(self, parent): + self.parent_header = extract_header(parent) + + def close(self): + self.pub_socket = None + + def flush(self): + if self.pub_socket is None: + raise ValueError(u'I/O operation on closed file') + else: + data = self._buffer.getvalue() + if data: + content = {u'name':self.name, u'data':data} + msg = self.session.msg(u'stream', content=content, + parent=self.parent_header) + print>>sys.__stdout__, Message(msg) + self.pub_socket.send_json(msg) + + self._buffer.close() + self._new_buffer() + + def isatty(self): + return False + + def next(self): + raise IOError('Read not supported on a write only stream.') + + def read(self, size=-1): + raise IOError('Read not supported on a write only stream.') + + def readline(self, size=-1): + raise IOError('Read not supported on a write only stream.') + + def write(self, string): + if self.pub_socket is None: + raise ValueError('I/O operation on closed file') + else: + self._buffer.write(string) + current_time = time.time() + if self._start <= 0: + self._start = current_time + elif current_time - self._start > self.flush_interval: + self.flush() + + def writelines(self, sequence): + if self.pub_socket is None: + raise ValueError('I/O operation on closed file') + else: + for string in sequence: + self.write(string) + + def _new_buffer(self): + self._buffer = StringIO() + self._start = -1 \ No newline at end of file diff --git a/IPython/zmq/kernel.py b/IPython/zmq/kernel.py index 75bf654..00be617 100755 --- a/IPython/zmq/kernel.py +++ b/IPython/zmq/kernel.py @@ -3,7 +3,6 @@ Things to do: -* Finish implementing `raw_input`. * Implement `set_parent` logic. Right before doing exec, the Kernel should call set_parent on all the PUB objects with the message about to be executed. * Implement random port and security key logic. @@ -18,10 +17,8 @@ Things to do: # Standard library imports. import __builtin__ from code import CommandCompiler -from cStringIO import StringIO import os import sys -from threading import Thread import time import traceback @@ -30,102 +27,16 @@ import zmq # Local imports. from IPython.external.argparse import ArgumentParser -from session import Session, Message, extract_header +from session import Session, Message from completer import KernelCompleter +from .iostream import OutStream +from .displayhook import DisplayHook +from .exitpoller import ExitPollerUnix, ExitPollerWindows #----------------------------------------------------------------------------- -# Kernel and stream classes +# Main kernel class #----------------------------------------------------------------------------- -class OutStream(object): - """A file like object that publishes the stream to a 0MQ PUB socket.""" - - # The time interval between automatic flushes, in seconds. - flush_interval = 0.05 - - def __init__(self, session, pub_socket, name): - self.session = session - self.pub_socket = pub_socket - self.name = name - self.parent_header = {} - self._new_buffer() - - def set_parent(self, parent): - self.parent_header = extract_header(parent) - - def close(self): - self.pub_socket = None - - def flush(self): - if self.pub_socket is None: - raise ValueError(u'I/O operation on closed file') - else: - data = self._buffer.getvalue() - if data: - content = {u'name':self.name, u'data':data} - msg = self.session.msg(u'stream', content=content, - parent=self.parent_header) - print>>sys.__stdout__, Message(msg) - self.pub_socket.send_json(msg) - - self._buffer.close() - self._new_buffer() - - def isatty(self): - return False - - def next(self): - raise IOError('Read not supported on a write only stream.') - - def read(self, size=-1): - raise IOError('Read not supported on a write only stream.') - - def readline(self, size=-1): - raise IOError('Read not supported on a write only stream.') - - def write(self, string): - if self.pub_socket is None: - raise ValueError('I/O operation on closed file') - else: - self._buffer.write(string) - current_time = time.time() - if self._start <= 0: - self._start = current_time - elif current_time - self._start > self.flush_interval: - self.flush() - - def writelines(self, sequence): - if self.pub_socket is None: - raise ValueError('I/O operation on closed file') - else: - for string in sequence: - self.write(string) - - def _new_buffer(self): - self._buffer = StringIO() - self._start = -1 - - -class DisplayHook(object): - - def __init__(self, session, pub_socket): - self.session = session - self.pub_socket = pub_socket - self.parent_header = {} - - def __call__(self, obj): - if obj is None: - return - - __builtin__._ = obj - msg = self.session.msg(u'pyout', {u'data':repr(obj)}, - parent=self.parent_header) - self.pub_socket.send_json(msg) - - def set_parent(self, parent): - self.parent_header = extract_header(parent) - - class Kernel(object): def __init__(self, session, reply_socket, pub_socket, req_socket): @@ -190,7 +101,6 @@ class Kernel(object): exec comp_code in self.user_ns, self.user_ns except: - result = u'error' etype, evalue, tb = sys.exc_info() tb = traceback.format_exception(etype, evalue, tb) exc_content = { @@ -302,45 +212,6 @@ class Kernel(object): # Kernel main and launch functions #----------------------------------------------------------------------------- -class ExitPollerUnix(Thread): - """ A Unix-specific daemon thread that terminates the program immediately - when the parent process no longer exists. - """ - - def __init__(self): - super(ExitPollerUnix, self).__init__() - self.daemon = True - - def run(self): - # We cannot use os.waitpid because it works only for child processes. - from errno import EINTR - while True: - try: - if os.getppid() == 1: - os._exit(1) - time.sleep(1.0) - except OSError, e: - if e.errno == EINTR: - continue - raise - -class ExitPollerWindows(Thread): - """ A Windows-specific daemon thread that terminates the program immediately - when a Win32 handle is signaled. - """ - - def __init__(self, handle): - super(ExitPollerWindows, self).__init__() - self.daemon = True - self.handle = handle - - def run(self): - from _subprocess import WaitForSingleObject, WAIT_OBJECT_0, INFINITE - result = WaitForSingleObject(self.handle, INFINITE) - if result == WAIT_OBJECT_0: - os._exit(1) - - def bind_port(socket, ip, port): """ Binds the specified ZMQ socket. If the port is less than zero, a random port is chosen. Returns the port that was bound.