diff --git a/IPython/frontend/qt/kernelmanager.py b/IPython/frontend/qt/kernelmanager.py index f06d84b..f546479 100644 --- a/IPython/frontend/qt/kernelmanager.py +++ b/IPython/frontend/qt/kernelmanager.py @@ -3,6 +3,7 @@ # System library imports. from PyQt4 import QtCore +import zmq # IPython imports. from IPython.zmq.kernelmanager import KernelManager, SubSocketChannel, \ @@ -10,6 +11,7 @@ from IPython.zmq.kernelmanager import KernelManager, SubSocketChannel, \ from util import MetaQObjectHasTraits + class QtSubSocketChannel(SubSocketChannel, QtCore.QObject): # Emitted when any message is received. @@ -95,6 +97,7 @@ class QtXReqSocketChannel(XReqSocketChannel, QtCore.QObject): """ Reimplemented to skip callback handling. """ self.command_queue.put(msg) + self.add_io_state(zmq.POLLOUT) class QtRepSocketChannel(RepSocketChannel, QtCore.QObject): diff --git a/IPython/zmq/kernelmanager.py b/IPython/zmq/kernelmanager.py index f0a920d..0829c35 100644 --- a/IPython/zmq/kernelmanager.py +++ b/IPython/zmq/kernelmanager.py @@ -34,6 +34,12 @@ class ZmqSocketChannel(Thread): """ The base class for the channels that use ZMQ sockets. """ + context = None + session = None + socket = None + ioloop = None + iostate = None + def __init__(self, context, session, address=None): super(ZmqSocketChannel, self).__init__() self.daemon = True @@ -41,7 +47,6 @@ class ZmqSocketChannel(Thread): self.context = context self.session = session self.address = address - self.socket = None def stop(self): """Stop the thread's activity. Returns when the thread terminates. @@ -73,6 +78,28 @@ class ZmqSocketChannel(Thread): address = property(get_address, set_adresss) + def add_io_state(self, state): + """Add IO state to the eventloop. + + This is thread safe as it uses the thread safe IOLoop.add_callback. + """ + def add_io_state_callback(): + if not self.iostate & state: + self.iostate = self.iostate | state + self.ioloop.update_handler(self.socket, self.iostate) + self.ioloop.add_callback(add_io_state_callback) + + def drop_io_state(self, state): + """Drop IO state from the eventloop. + + This is thread safe as it uses the thread safe IOLoop.add_callback. + """ + def drop_io_state_callback(): + if self.iostate & state: + self.iostate = self.iostate & (~state) + self.ioloop.update_handler(self.socket, self.iostate) + self.ioloop.add_callback(drop_io_state_callback) + class SubSocketChannel(ZmqSocketChannel): @@ -85,8 +112,9 @@ class SubSocketChannel(ZmqSocketChannel): self.socket.setsockopt(zmq.IDENTITY, self.session.session) self.socket.connect('tcp://%s:%i' % self.address) self.ioloop = ioloop.IOLoop() + self.iostate = POLLIN|POLLERR self.ioloop.add_handler(self.socket, self._handle_events, - POLLIN|POLLERR) + self.iostate) self.ioloop.start() def stop(self): @@ -171,8 +199,9 @@ class XReqSocketChannel(ZmqSocketChannel): self.socket.setsockopt(zmq.IDENTITY, self.session.session) self.socket.connect('tcp://%s:%i' % self.address) self.ioloop = ioloop.IOLoop() + self.iostate = POLLERR|POLLIN self.ioloop.add_handler(self.socket, self._handle_events, - POLLIN|POLLOUT|POLLERR) + self.iostate) self.ioloop.start() def stop(self): @@ -180,7 +209,6 @@ class XReqSocketChannel(ZmqSocketChannel): super(XReqSocketChannel, self).stop() def _handle_events(self, socket, events): - # Turn on and off POLLOUT depending on if we have made a request if events & POLLERR: self._handle_err() if events & POLLOUT: @@ -199,6 +227,8 @@ class XReqSocketChannel(ZmqSocketChannel): pass else: self.socket.send_json(msg) + if self.command_queue.empty(): + self.drop_io_state(POLLOUT) def _handle_err(self): # We don't want to let this go silently, so eventually we should log. @@ -208,6 +238,7 @@ class XReqSocketChannel(ZmqSocketChannel): handler = self._find_handler(msg['msg_type'], callback) self.handler_queue.put(handler) self.command_queue.put(msg) + self.add_io_state(POLLOUT) def execute(self, code, callback=None): # Create class for content/msg creation. Related to, but possibly