From 414d3ae98cb15331cea738d7ce61b3bb9357170a 2014-12-05 01:48:49
From: Thomas Kluyver <>
Date: 2014-12-05 01:48:49
Subject: [PATCH] Share an IOLoop among Qt channels, rather than one each


diff --git a/IPython/kernel/blocking/ b/IPython/kernel/blocking/
index 0417e60..6d022e7 100644
--- a/IPython/kernel/blocking/
+++ b/IPython/kernel/blocking/
@@ -51,17 +51,17 @@ class ZMQSocketChannel(object):
     _exiting = False
     proxy_methods = []
-    def __init__(self, socket, session):
+    def __init__(self, socket, session, loop=None):
         """Create a channel.
-        context : :class:`zmq.Context`
-            The ZMQ context to use.
+        socket : :class:`zmq.Socket`
+            The ZMQ socket to use.
         session : :class:`session.Session`
             The session to use.
-        address : zmq url
-            Standard (ip, port) tuple that the kernel is listening on.
+        loop
+            Unused here, for other implementations
         super(ZMQSocketChannel, self).__init__()
diff --git a/IPython/kernel/ b/IPython/kernel/
index cd77320..11f85ce 100644
--- a/IPython/kernel/
+++ b/IPython/kernel/
@@ -124,6 +124,8 @@ class KernelClient(ConnectionFileMixin):
         return (self.shell_channel.is_alive() or self.iopub_channel.is_alive() or
                 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
+    ioloop = None  # Overridden in subclasses that use pyzmq event loop
     def shell_channel(self):
         """Get the shell channel object for this kernel."""
@@ -132,7 +134,7 @@ class KernelClient(ConnectionFileMixin):
             self.log.debug("connecting shell channel to %s", url)
             socket = make_shell_socket(self.context, self.session.bsession, url)
             self._shell_channel = self.shell_channel_class(
-                socket, self.session
+                socket, self.session, self.ioloop
         return self._shell_channel
@@ -144,7 +146,7 @@ class KernelClient(ConnectionFileMixin):
             self.log.debug("connecting iopub channel to %s", url)
             socket = make_iopub_socket(self.context, self.session.bsession, url)
             self._iopub_channel = self.iopub_channel_class(
-                socket, self.session
+                socket, self.session, self.ioloop
         return self._iopub_channel
@@ -156,7 +158,7 @@ class KernelClient(ConnectionFileMixin):
             self.log.debug("connecting stdin channel to %s", url)
             socket = make_stdin_socket(self.context, self.session.bsession, url)
             self._stdin_channel = self.stdin_channel_class(
-                socket, self.session
+                socket, self.session, self.ioloop
         return self._stdin_channel
diff --git a/IPython/qt/ b/IPython/qt/
index 0410191..b424d64 100644
--- a/IPython/qt/
+++ b/IPython/qt/
@@ -14,7 +14,7 @@ from zmq.eventloop import ioloop, zmqstream
 from IPython.external.qt import QtCore
 # Local imports
-from IPython.utils.traitlets import Type
+from IPython.utils.traitlets import Type, Instance
 from IPython.kernel.channels import HBChannel,\
     make_shell_socket, make_iopub_socket, make_stdin_socket
 from IPython.kernel import KernelClient
@@ -53,20 +53,12 @@ def validate_string_dict(dct):
-class QtZMQSocketChannel(SuperQObject, Thread):
+class QtZMQSocketChannel(SuperQObject):
     """The base class for the channels that use ZMQ sockets."""
     session = None
     socket = None
     ioloop = None
     stream = None
-    _exiting = False
-    proxy_methods = []
-    # Emitted when the channel is started.
-    started = QtCore.Signal()
-    # Emitted when the channel is stopped.
-    stopped = QtCore.Signal()
     message_received = QtCore.Signal(object)
@@ -85,78 +77,38 @@ class QtZMQSocketChannel(SuperQObject, Thread):
-    def __init__(self, socket, session):
+    def __init__(self, socket, session, loop):
         """Create a channel.
-        context : :class:`zmq.Context`
-            The ZMQ context to use.
+        socket : :class:`zmq.Socket`
+            The ZMQ socket to use.
         session : :class:`session.Session`
             The session to use.
-        address : zmq url
-            Standard (ip, port) tuple that the kernel is listening on.
+        loop
+            A pyzmq ioloop to connect the socket to using a ZMQStream
         super(QtZMQSocketChannel, self).__init__()
-        self.daemon = True
         self.socket = socket
         self.session = session
-        atexit.register(self._notice_exit)
-        self.ioloop = ioloop.IOLoop()
+        self.ioloop = loop
-    def _notice_exit(self):
-        self._exiting = True
+ = zmqstream.ZMQStream(self.socket, self.ioloop)
-    def _run_loop(self):
-        """Run my loop, ignoring EINTR events in the poller"""
-        while True:
-            try:
-                self.ioloop.start()
-            except ZMQError as e:
-                if e.errno == errno.EINTR:
-                    continue
-                else:
-                    raise
-            except Exception:
-                if self._exiting:
-                    break
-                else:
-                    raise
-            else:
-                break
+    _is_alive = False
+    def is_alive(self):
+        return self._is_alive
     def start(self):
-        """ Reimplemented to emit signal.
-        """
-        super(QtZMQSocketChannel, self).start()
-        self.started.emit()
-    def run(self):
-        """The thread's main activity.  Call start() instead."""
- = zmqstream.ZMQStream(self.socket, self.ioloop)
-        self._run_loop()
+        self._is_alive = True
     def stop(self):
-        """Stop the channel's event loop and join its thread.
-        This calls :meth:`~threading.Thread.join` and returns when the thread
-        terminates. :class:`RuntimeError` will be raised if
-        :meth:`~threading.Thread.start` is called again.
-        """
-        if self.ioloop is not None:
-            self.ioloop.stop()
-        self.join()
-        self.close()
-        self.stopped.emit()
+        self._is_alive = False
     def close(self):
-        if self.ioloop is not None:
-            try:
-                self.ioloop.close(all_fds=True)
-            except Exception:
-                pass
         if self.socket is not None:
@@ -238,12 +190,76 @@ class QtZMQSocketChannel(SuperQObject, Thread):
         self._flushed = True
+class IOLoopThread(Thread):
+    """Run a pyzmq ioloop in a thread to send and receive messages
+    """
+    def __init__(self, loop):
+        super(IOLoopThread, self).__init__()
+        self.daemon = True
+        atexit.register(self._notice_exit)
+        self.ioloop = loop or ioloop.IOLoop()
+    def _notice_exit(self):
+        self._exiting = True
+    def run(self):
+        """Run my loop, ignoring EINTR events in the poller"""
+        while True:
+            try:
+                self.ioloop.start()
+            except ZMQError as e:
+                if e.errno == errno.EINTR:
+                    continue
+                else:
+                    raise
+            except Exception:
+                if self._exiting:
+                    break
+                else:
+                    raise
+            else:
+                break
+    def stop(self):
+        """Stop the channel's event loop and join its thread.
+        This calls :meth:`~threading.Thread.join` and returns when the thread
+        terminates. :class:`RuntimeError` will be raised if
+        :meth:`~threading.Thread.start` is called again.
+        """
+        if self.ioloop is not None:
+            self.ioloop.stop()
+        self.join()
+        self.close()
+    def close(self):
+        if self.ioloop is not None:
+            try:
+                self.ioloop.close(all_fds=True)
+            except Exception:
+                pass
 class QtKernelClient(QtKernelClientMixin, KernelClient):
     """ A KernelClient that provides signals and slots.
+    _ioloop = None
+    @property
+    def ioloop(self):
+        if self._ioloop is None:
+            self._ioloop = ioloop.IOLoop()
+        return self._ioloop
+    ioloop_thread = Instance(IOLoopThread)
     def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
         if shell:
+        self.channel_listener_thread = IOLoopThread(self.ioloop)
+        self.channel_listener_thread.start()
         super(QtKernelClient, self).start_channels(shell, iopub, stdin, hb)
     def _check_kernel_info_reply(self, msg):
@@ -251,6 +267,11 @@ class QtKernelClient(QtKernelClientMixin, KernelClient):
+    def stop_channels(self):
+        super(QtKernelClient, self).stop_channels()
+        if self.ioloop_thread.is_alive():
+            self.ioloop_thread.stop()
     iopub_channel_class = Type(QtZMQSocketChannel)
     shell_channel_class = Type(QtZMQSocketChannel)
     stdin_channel_class = Type(QtZMQSocketChannel)