From f0587bad4a5d1f1f0dd31ae22f99a82c7a0314db 2010-07-13 19:04:45 From: Brian Granger Date: 2010-07-13 19:04:45 Subject: [PATCH] Work on the kernel manager. --- diff --git a/IPython/zmq/kernel.py b/IPython/zmq/kernel.py index d39f950..7932379 100755 --- a/IPython/zmq/kernel.py +++ b/IPython/zmq/kernel.py @@ -219,6 +219,7 @@ class Kernel(object): assert self.reply_socket.rcvmore(), "Unexpected missing message part." msg = self.reply_socket.recv_json() omsg = Message(msg) + print>>sys.__stdout__ print>>sys.__stdout__, omsg handler = self.handlers.get(omsg.msg_type, None) if handler is None: @@ -237,7 +238,8 @@ def main(): pub_conn = connection % (port_base+1) print >>sys.__stdout__, "Starting the kernel..." - print >>sys.__stdout__, "On:",rep_conn, pub_conn + print >>sys.__stdout__, "XREP Channel:", rep_conn + print >>sys.__stdout__, "PUB Channel:", pub_conn session = Session(username=u'kernel') diff --git a/IPython/zmq/kernelmanager.py b/IPython/zmq/kernelmanager.py new file mode 100644 index 0000000..5dad5c0 --- /dev/null +++ b/IPython/zmq/kernelmanager.py @@ -0,0 +1,271 @@ +"""Kernel frontend classes. + +To do: + +1. Create custom channel subclasses for Qt. +2. Create logger to handle debugging and console messages. + +""" + +from Queue import Queue, Empty +from threading import Thread +import traceback + +import zmq +from zmq import POLLIN, POLLOUT, POLLERR +from zmq.eventloop import ioloop +from session import Session + + +class MissingHandlerError(Exception): + pass + + +class KernelManager(object): + + def __init__(self, xreq_addr, sub_addr, rep_addr, + context=None, session=None): + self.context = zmq.Context() if context is None else context + self.session = Session() if session is None else session + self.xreq_addr = xreq_addr + self.sub_addr = sub_addr + self.rep_addr = rep_addr + + def start_kernel(self): + """Start a localhost kernel on ip and port. + + The SUB channel is for the frontend to receive messages published by + the kernel. + + The REQ channel is for the frontend to make requests of the kernel. + + The REP channel is for the kernel to request stdin (raw_input) from + the frontend. + """ + + def kill_kernel(self): + """Kill the running kernel""" + + def is_alive(self): + """Is the kernel alive?""" + return True + + def signal_kernel(self, signum): + """Send signum to the kernel.""" + + def get_sub_channel(self): + """Get the SUB socket channel object.""" + return SubSocketChannel(self.context, self.session, self.sub_addr) + + def get_xreq_channel(self): + """Get the REQ socket channel object to make requests of the kernel.""" + return XReqSocketChannel(self.context, self.session, self.xreq_addr) + + def get_rep_channel(self): + """Get the REP socket channel object to handle stdin (raw_input).""" + return RepSocketChannel(self.context, self.session, self.rep_addr) + + +class ZmqSocketChannel(Thread): + + socket = None + + def __init__(self, context, session, addr): + self.context = context + self.session = session + self.addr = addr + super(ZmqSocketChannel, self).__init__() + self.daemon = True + + +class SubSocketChannel(ZmqSocketChannel): + + handlers = None + _overriden_call_handler = None + + def __init__(self, context, session, addr): + self.handlers = {} + super(SubSocketChannel, self).__init__(context, session, addr) + + def run(self): + self.socket = self.context.socket(zmq.SUB) + self.socket.setsockopt(zmq.SUBSCRIBE,'') + self.socket.setsockopt(zmq.IDENTITY, self.session.session) + self.socket.connect('tcp://%s:%i' % self.addr) + self.ioloop = ioloop.IOLoop() + self.ioloop.add_handler(self.socket, self._handle_events, + POLLIN|POLLERR) + self.ioloop.start() + + def _handle_events(self, socket, events): + # Turn on and off POLLOUT depending on if we have made a request + if events & POLLERR: + self._handle_err() + if events & POLLIN: + self._handle_recv() + + def _handle_err(self): + raise zmq.ZmqError() + + def _handle_recv(self): + msg = self.socket.recv_json() + self.call_handlers(msg) + + def override_call_handler(self, func): + """Permanently override the call_handler. + + The function func will be called as:: + + func(handler, msg) + + And must call:: + + handler(msg) + + in the main thread. + """ + assert callable(func), "not a callable: %r" % func + self._overriden_call_handler = func + + def call_handlers(self, msg): + handler = self.handlers.get(msg['msg_type'], None) + if handler is not None: + try: + self.call_handler(handler, msg) + except: + # XXX: This should be logged at least + traceback.print_last() + + def call_handler(self, handler, msg): + if self._overriden_call_handler is not None: + self._overriden_call_handler(handler, msg) + elif hasattr(self, '_call_handler'): + call_handler = getattr(self, '_call_handler') + call_handler(handler, msg) + else: + raise RuntimeError('no handler!') + + def add_handler(self, callback, msg_type): + """Register a callback for msg type.""" + self.handlers[msg_type] = callback + + def remove_handler(self, msg_type): + self.handlers.pop(msg_type, None) + + +class XReqSocketChannel(ZmqSocketChannel): + + handler_queue = None + command_queue = None + handlers = None + _overriden_call_handler = None + + def __init__(self, context, session, addr): + self.handlers = {} + self.handler_queue = Queue() + self.command_queue = Queue() + super(XReqSocketChannel, self).__init__(context, session, addr) + + def run(self): + self.socket = self.context.socket(zmq.XREQ) + self.socket.setsockopt(zmq.IDENTITY, self.session.session) + self.socket.connect('tcp://%s:%i' % self.addr) + self.ioloop = ioloop.IOLoop() + self.ioloop.add_handler(self.socket, self._handle_events, + POLLIN|POLLOUT|POLLERR) + self.ioloop.start() + + def _handle_events(self, socket, events): + # Turn on and off POLLOUT depending on if we have made a request + if events & POLLERR: + self._handle_err() + if events & POLLOUT: + self._handle_send() + if events & POLLIN: + self._handle_recv() + + def _handle_recv(self): + msg = self.socket.recv_json() + print "Got reply:", msg + try: + handler = self.handler_queue.get(False) + except Empty: + print "Message received with no handler!!!" + print msg + else: + self.call_handler(handler, msg) + + def _handle_send(self): + try: + msg = self.command_queue.get(False) + except Empty: + pass + else: + self.socket.send_json(msg) + + def _handle_err(self): + raise zmq.ZmqError() + + def _queue_request(self, msg, callback): + handler = self._find_handler(msg['msg_type'], callback) + self.handler_queue.put(handler) + self.command_queue.put(msg) + + def execute(self, code, callback=None): + # Create class for content/msg creation. Related to, but possibly + # not in Session. + content = dict(code=code) + msg = self.session.msg('execute_request', content) + self._queue_request(msg, callback) + return msg['header']['msg_id'] + + def complete(self, text, line, block=None, callback=None): + content = dict(text=text, line=line) + msg = self.session.msg('complete_request', content) + return self._queue_request(msg, callback) + return msg['header']['msg_id'] + + def object_info(self, oname, callback=None): + content = dict(oname=oname) + msg = self.session.msg('object_info_request', content) + return self._queue_request(msg, callback) + return msg['header']['msg_id'] + + def _find_handler(self, name, callback): + if callback is not None: + return callback + handler = self.handlers.get(name) + if handler is None: + raise MissingHandlerError('No handler defined for method: %s' % name) + return handler + + def override_call_handler(self, func): + """Permanently override the call_handler. + + The function func will be called as:: + + func(handler, msg) + + And must call:: + + handler(msg) + + in the main thread. + """ + assert callable(func), "not a callable: %r" % func + self._overriden_call_handler = func + + def call_handler(self, handler, msg): + if self._overriden_call_handler is not None: + self._overriden_call_handler(handler, msg) + elif hasattr(self, '_call_handler'): + call_handler = getattr(self, '_call_handler') + call_handler(handler, msg) + else: + raise RuntimeError('no handler!') + + +class RepSocketChannel(ZmqSocketChannel): + + def on_raw_input(): + pass diff --git a/IPython/zmq/ktest.py b/IPython/zmq/ktest.py new file mode 100644 index 0000000..d4e6a24 --- /dev/null +++ b/IPython/zmq/ktest.py @@ -0,0 +1,52 @@ +from Queue import Queue, Empty +import time + +from kernelmanager import KernelManager + +xreq_addr = ('127.0.0.1',5575) +sub_addr = ('127.0.0.1', 5576) +rep_addr = ('127.0.0.1', 5577) + + +km = KernelManager(xreq_addr, sub_addr, rep_addr) +# xreq_channel = km.get_xreq_channel() +sub_channel = km.get_sub_channel() + +# xreq_channel.start() +sub_channel.start() + +print "Channels are started" + +def printer(msg): + print + print msg + +class CallHandler(object): + + def __init__(self): + self.queue = Queue() + + def __call__(self, handler, msg): + self.queue.put((handler, msg)) + + def handle(self): + try: + handler, msg = self.queue.get(block=False) + except Empty: + pass + else: + handler(msg) + +call_handler = CallHandler() +sub_channel.override_call_handler(call_handler) +sub_channel.add_handler(printer, 'pyin') +sub_channel.add_handler(printer, 'pyout') +sub_channel.add_handler(printer, 'stdout') +sub_channel.add_handler(printer, 'stderr') + +for i in range(100): + call_handler.handle() + time.sleep(1) + +# xreq_channel.join() +sub_channel.join() \ No newline at end of file diff --git a/IPython/zmq/session.py b/IPython/zmq/session.py index 658b3ed..de1de3e 100644 --- a/IPython/zmq/session.py +++ b/IPython/zmq/session.py @@ -64,9 +64,12 @@ def extract_header(msg_or_header): class Session(object): - def __init__(self, username=os.environ.get('USER','username')): + def __init__(self, username=os.environ.get('USER','username'), session=None): self.username = username - self.session = str(uuid.uuid4()) + if session is None: + self.session = str(uuid.uuid4()) + else: + self.session = session self.msg_id = 0 def msg_header(self):