From 414d3ae98cb15331cea738d7ce61b3bb9357170a 2014-12-05 01:48:49
From: Thomas Kluyver <takowl@gmail.com>
Date: 2014-12-05 01:48:49
Subject: [PATCH] Share an IOLoop among Qt channels, rather than one each

---

diff --git a/IPython/kernel/blocking/channels.py b/IPython/kernel/blocking/channels.py
index 0417e60..6d022e7 100644
--- a/IPython/kernel/blocking/channels.py
+++ b/IPython/kernel/blocking/channels.py
@@ -51,17 +51,17 @@ class ZMQSocketChannel(object):
     _exiting = False
     proxy_methods = []
 
-    def __init__(self, socket, session):
+    def __init__(self, socket, session, loop=None):
         """Create a channel.
 
         Parameters
         ----------
-        context : :class:`zmq.Context`
-            The ZMQ context to use.
+        socket : :class:`zmq.Socket`
+            The ZMQ socket to use.
         session : :class:`session.Session`
             The session to use.
-        address : zmq url
-            Standard (ip, port) tuple that the kernel is listening on.
+        loop
+            Unused here, for other implementations
         """
         super(ZMQSocketChannel, self).__init__()
 
diff --git a/IPython/kernel/client.py b/IPython/kernel/client.py
index cd77320..11f85ce 100644
--- a/IPython/kernel/client.py
+++ b/IPython/kernel/client.py
@@ -124,6 +124,8 @@ class KernelClient(ConnectionFileMixin):
         return (self.shell_channel.is_alive() or self.iopub_channel.is_alive() or
                 self.stdin_channel.is_alive() or self.hb_channel.is_alive())
 
+    ioloop = None  # Overridden in subclasses that use pyzmq event loop
+
     @property
     def shell_channel(self):
         """Get the shell channel object for this kernel."""
@@ -132,7 +134,7 @@ class KernelClient(ConnectionFileMixin):
             self.log.debug("connecting shell channel to %s", url)
             socket = make_shell_socket(self.context, self.session.bsession, url)
             self._shell_channel = self.shell_channel_class(
-                socket, self.session
+                socket, self.session, self.ioloop
             )
         return self._shell_channel
 
@@ -144,7 +146,7 @@ class KernelClient(ConnectionFileMixin):
             self.log.debug("connecting iopub channel to %s", url)
             socket = make_iopub_socket(self.context, self.session.bsession, url)
             self._iopub_channel = self.iopub_channel_class(
-                socket, self.session
+                socket, self.session, self.ioloop
             )
         return self._iopub_channel
 
@@ -156,7 +158,7 @@ class KernelClient(ConnectionFileMixin):
             self.log.debug("connecting stdin channel to %s", url)
             socket = make_stdin_socket(self.context, self.session.bsession, url)
             self._stdin_channel = self.stdin_channel_class(
-                socket, self.session
+                socket, self.session, self.ioloop
             )
         return self._stdin_channel
 
diff --git a/IPython/qt/client.py b/IPython/qt/client.py
index 0410191..b424d64 100644
--- a/IPython/qt/client.py
+++ b/IPython/qt/client.py
@@ -14,7 +14,7 @@ from zmq.eventloop import ioloop, zmqstream
 from IPython.external.qt import QtCore
 
 # Local imports
-from IPython.utils.traitlets import Type
+from IPython.utils.traitlets import Type, Instance
 from IPython.kernel.channels import HBChannel,\
     make_shell_socket, make_iopub_socket, make_stdin_socket
 from IPython.kernel import KernelClient
@@ -53,20 +53,12 @@ def validate_string_dict(dct):
 
 
 
-class QtZMQSocketChannel(SuperQObject, Thread):
+class QtZMQSocketChannel(SuperQObject):
     """The base class for the channels that use ZMQ sockets."""
     session = None
     socket = None
     ioloop = None
     stream = None
-    _exiting = False
-    proxy_methods = []
-
-    # Emitted when the channel is started.
-    started = QtCore.Signal()
-
-    # Emitted when the channel is stopped.
-    stopped = QtCore.Signal()
 
     message_received = QtCore.Signal(object)
 
@@ -85,78 +77,38 @@ class QtZMQSocketChannel(SuperQObject, Thread):
         """
         QtCore.QCoreApplication.instance().processEvents()
 
-    def __init__(self, socket, session):
+    def __init__(self, socket, session, loop):
         """Create a channel.
 
         Parameters
         ----------
-        context : :class:`zmq.Context`
-            The ZMQ context to use.
+        socket : :class:`zmq.Socket`
+            The ZMQ socket to use.
         session : :class:`session.Session`
             The session to use.
-        address : zmq url
-            Standard (ip, port) tuple that the kernel is listening on.
+        loop
+            A pyzmq ioloop to connect the socket to using a ZMQStream
         """
         super(QtZMQSocketChannel, self).__init__()
-        self.daemon = True
 
         self.socket = socket
         self.session = session
-        atexit.register(self._notice_exit)
-        self.ioloop = ioloop.IOLoop()
+        self.ioloop = loop
 
-    def _notice_exit(self):
-        self._exiting = True
+        self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
+        self.stream.on_recv(self._handle_recv)
 
-    def _run_loop(self):
-        """Run my loop, ignoring EINTR events in the poller"""
-        while True:
-            try:
-                self.ioloop.start()
-            except ZMQError as e:
-                if e.errno == errno.EINTR:
-                    continue
-                else:
-                    raise
-            except Exception:
-                if self._exiting:
-                    break
-                else:
-                    raise
-            else:
-                break
+    _is_alive = False  # set True by start() and back to False by stop()
+    def is_alive(self):
+        return self._is_alive  # reports only the flag kept by start()/stop()
 
     def start(self):
-        """ Reimplemented to emit signal.
-        """
-        super(QtZMQSocketChannel, self).start()
-        self.started.emit()
-
-    def run(self):
-        """The thread's main activity.  Call start() instead."""
-        self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
-        self.stream.on_recv(self._handle_recv)
-        self._run_loop()
+        self._is_alive = True
 
     def stop(self):
-        """Stop the channel's event loop and join its thread.
-
-        This calls :meth:`~threading.Thread.join` and returns when the thread
-        terminates. :class:`RuntimeError` will be raised if
-        :meth:`~threading.Thread.start` is called again.
-        """
-        if self.ioloop is not None:
-            self.ioloop.stop()
-        self.join()
-        self.close()
-        self.stopped.emit()
+        self._is_alive = False
 
     def close(self):
-        if self.ioloop is not None:
-            try:
-                self.ioloop.close(all_fds=True)
-            except Exception:
-                pass
         if self.socket is not None:
             try:
                 self.socket.close(linger=0)
@@ -238,12 +190,76 @@ class QtZMQSocketChannel(SuperQObject, Thread):
         self._flushed = True
 
 
+class IOLoopThread(Thread):
+    """Run a pyzmq ioloop in a daemon thread to send and receive messages."""
+    _exiting = False  # class default: run() may read it before the atexit hook fires
+
+    def __init__(self, loop):
+        super(IOLoopThread, self).__init__()
+        self.daemon = True
+        atexit.register(self._notice_exit)
+        self.ioloop = loop or ioloop.IOLoop()  # falsy `loop` -> create a new loop
+
+    def _notice_exit(self):
+        self._exiting = True
+
+    def run(self):
+        """Run my loop, ignoring EINTR events in the poller"""
+        while True:
+            try:
+                self.ioloop.start()
+            except ZMQError as e:
+                if e.errno != errno.EINTR:
+                    raise
+                continue  # interrupted poll: restart the loop
+            except Exception:
+                if not self._exiting:
+                    raise
+                break  # errors raised while the interpreter exits are dropped
+            else:
+                break
+
+    def stop(self):
+        """Stop the event loop, join this thread, then close the loop.
+
+        This calls :meth:`~threading.Thread.join` and returns when the thread
+        terminates. :class:`RuntimeError` will be raised if
+        :meth:`~threading.Thread.start` is called again.
+        """
+        if self.ioloop is not None:
+            self.ioloop.stop()
+        self.join()
+        self.close()
+
+    def close(self):
+        """Close the loop with ``all_fds=True``; best effort, errors are ignored."""
+        if self.ioloop is not None:
+            try:
+                self.ioloop.close(all_fds=True)
+            except Exception:
+                pass
+
+
 class QtKernelClient(QtKernelClientMixin, KernelClient):
     """ A KernelClient that provides signals and slots.
     """
+
+    _ioloop = None  # backing field for the lazily created `ioloop` property
+    @property
+    def ioloop(self):
+        if self._ioloop is None:  # first access: create the loop for this client
+            self._ioloop = ioloop.IOLoop()  # `ioloop` here is the zmq.eventloop module
+        return self._ioloop
+
+    ioloop_thread = Instance(IOLoopThread)
+
     def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
         if shell:
             self.shell_channel.message_received.connect(self._check_kernel_info_reply)
+        # Keep the thread on the `ioloop_thread` trait, where stop_channels() expects it.
+        self.ioloop_thread = IOLoopThread(self.ioloop)
+        self.ioloop_thread.start()
+
         super(QtKernelClient, self).start_channels(shell, iopub, stdin, hb)
 
     def _check_kernel_info_reply(self, msg):
@@ -251,6 +267,11 @@ class QtKernelClient(QtKernelClientMixin, KernelClient):
             self._handle_kernel_info_reply(msg)
             self.shell_channel.message_received.disconnect(self._check_kernel_info_reply)
 
+    def stop_channels(self):
+        super(QtKernelClient, self).stop_channels()  # channels first, loop thread last
+        if self.ioloop_thread is not None and self.ioloop_thread.is_alive():
+            self.ioloop_thread.stop()  # stops the loop, joins the thread, closes the loop
+
     iopub_channel_class = Type(QtZMQSocketChannel)
     shell_channel_class = Type(QtZMQSocketChannel)
     stdin_channel_class = Type(QtZMQSocketChannel)