diff --git a/IPython/zmq/kernelmanager.py b/IPython/zmq/kernelmanager.py index 82529bd..81fe6cb 100644 --- a/IPython/zmq/kernelmanager.py +++ b/IPython/zmq/kernelmanager.py @@ -76,11 +76,7 @@ class ZmqSocketChannel(Thread): class SubSocketChannel(ZmqSocketChannel): - handlers = None - _overriden_call_handler = None - def __init__(self, context, session, address=None): - self.handlers = {} super(SubSocketChannel, self).__init__(context, session, address) def run(self): @@ -105,53 +101,22 @@ class SubSocketChannel(ZmqSocketChannel): self._handle_recv() def _handle_err(self): + # We don't want to let this go silently, so eventually we should log. raise zmq.ZmqError() def _handle_recv(self): msg = self.socket.recv_json() self.call_handlers(msg) - def override_call_handler(self, func): - """Permanently override the call_handler. - - The function func will be called as:: - - func(handler, msg) - - And must call:: - - handler(msg) - - in the main thread. - """ - assert callable(func), "not a callable: %r" % func - self._overriden_call_handler = func - def call_handlers(self, msg): - handler = self.handlers.get(msg['msg_type'], None) - if handler is not None: - try: - self.call_handler(handler, msg) - except: - # XXX: This should be logged at least - traceback.print_last() + """This method is called in the ioloop thread when a message arrives. - def call_handler(self, handler, msg): - if self._overriden_call_handler is not None: - self._overriden_call_handler(handler, msg) - elif hasattr(self, '_call_handler'): - call_handler = getattr(self, '_call_handler') - call_handler(handler, msg) - else: - raise RuntimeError('no handler!') - - def add_handler(self, callback, msg_type): - """Register a callback for msg type.""" - self.handlers[msg_type] = callback - - def remove_handler(self, msg_type): - """Remove the callback for msg type.""" - self.handlers.pop(msg_type, None) + Subclasses should override this method to handle incoming messages. + It is important to remember that this method is called in the thread + so that some logic must be done to ensure that the application leve + handlers are called in the application thread. + """ + raise NotImplementedError('call_handlers must be defined in a subclass.') def flush(self, timeout=1.0): """Immediately processes all pending messages on the SUB channel. @@ -228,6 +193,7 @@ class XReqSocketChannel(ZmqSocketChannel): self.socket.send_json(msg) def _handle_err(self): + # We don't want to let this go silently, so eventually we should log. raise zmq.ZmqError() def _queue_request(self, msg, callback):