From ad81cbf3e65c80a1664d4b53af053bb990efc45c 2010-09-15 19:25:28
From: epatters <epatters@enthought.com>
Date: 2010-09-15 19:25:28
Subject: [PATCH] Integrated the heart beat pausing/unpausing logic with the (Qt)KernelManager.

---

diff --git a/IPython/frontend/qt/console/console_widget.py b/IPython/frontend/qt/console/console_widget.py
index df8a011..d5e51f0 100644
--- a/IPython/frontend/qt/console/console_widget.py
+++ b/IPython/frontend/qt/console/console_widget.py
@@ -1,4 +1,4 @@
-"""A base class for console-type widgets.
+""" An abstract base class for console-type widgets.
 """
 #-----------------------------------------------------------------------------
 # Imports
@@ -57,7 +57,7 @@ class ConsoleWidget(Configurable, QtGui.QWidget):
     kind = Enum(['plain', 'rich'], default_value='plain', config=True)
 
     # The type of paging to use. Valid values are:
-    #     'inside' : The widget pages like a traditional terminal pager.
+    #     'inside' : The widget pages like a traditional terminal.
     #     'hsplit' : When paging is requested, the widget is split
     #                horizontally. The top pane contains the console, and the
     #                bottom pane contains the paged text.
diff --git a/IPython/frontend/qt/kernelmanager.py b/IPython/frontend/qt/kernelmanager.py
index 0848736..09e7fa6 100644
--- a/IPython/frontend/qt/kernelmanager.py
+++ b/IPython/frontend/qt/kernelmanager.py
@@ -47,7 +47,7 @@ class QtXReqSocketChannel(SocketChannelQObject, XReqSocketChannel):
     complete_reply = QtCore.pyqtSignal(object)
     object_info_reply = QtCore.pyqtSignal(object)
 
-    # Emitted when the first reply comes back
+    # Emitted when the first reply comes back.
     first_reply = QtCore.pyqtSignal()
 
     # Used by the first_reply signal logic to determine if a reply is the 
@@ -72,8 +72,11 @@ class QtXReqSocketChannel(SocketChannelQObject, XReqSocketChannel):
 
         if not self._handlers_called:
             self.first_reply.emit()
+            self._handlers_called = True
 
-        self._handlers_called = True
+    #---------------------------------------------------------------------------
+    # 'QtXReqSocketChannel' interface
+    #---------------------------------------------------------------------------
 
     def reset_first_reply(self):
         """ Reset the first_reply signal to fire again on the next reply.
@@ -188,6 +191,17 @@ class QtKernelManager(KernelManager, SuperQObject):
     #---------------------------------------------------------------------------
     # 'KernelManager' interface
     #---------------------------------------------------------------------------
+
+    #------ Kernel process management ------------------------------------------
+
+    def start_kernel(self, *args, **kw):
+        """ Reimplemented to re-arm first_reply for heartbeat management.
+        """
+        if self._xreq_channel is not None:
+            self._xreq_channel.reset_first_reply()
+        super(QtKernelManager, self).start_kernel(*args, **kw)
+
+    #------ Channel management -------------------------------------------------
     
     def start_channels(self, *args, **kw):
         """ Reimplemented to emit signal.
@@ -200,3 +214,24 @@ class QtKernelManager(KernelManager, SuperQObject):
         """ 
         super(QtKernelManager, self).stop_channels()
         self.stopped_channels.emit()
+
+    @property
+    def xreq_channel(self):
+        """ Reimplemented to hook first_reply up to heartbeat unpausing.
+        """
+        if self._xreq_channel is None:
+            self._xreq_channel = super(QtKernelManager, self).xreq_channel
+            self._xreq_channel.first_reply.connect(self._first_reply)
+        return self._xreq_channel
+
+    #---------------------------------------------------------------------------
+    # Protected interface
+    #---------------------------------------------------------------------------
+    
+    def _first_reply(self):
+        """ Unpauses the heartbeat channel when the first reply is received on
+            the XREQ channel. Note that this will *not* start the heartbeat
+            channel if it is not already running!
+        """
+        if self._hb_channel is not None:
+            self._hb_channel.unpause()
diff --git a/IPython/zmq/kernelmanager.py b/IPython/zmq/kernelmanager.py
index 4e0f6ca..6da3c18 100644
--- a/IPython/zmq/kernelmanager.py
+++ b/IPython/zmq/kernelmanager.py
@@ -509,7 +509,12 @@ class RepSocketChannel(ZmqSocketChannel):
 
 
 class HBSocketChannel(ZmqSocketChannel):
-    """The heartbeat channel which monitors the kernel heartbeat."""
+    """The heartbeat channel which monitors the kernel heartbeat.
+
+    Note that the heartbeat channel is paused by default. As long as you start
+    this channel, the kernel manager will ensure that it is paused and un-paused
+    as appropriate.
+    """
 
     time_to_dead = 3.0
     socket = None
@@ -520,7 +525,7 @@ class HBSocketChannel(ZmqSocketChannel):
     def __init__(self, context, session, address):
         super(HBSocketChannel, self).__init__(context, session, address)
         self._running = False
-        self._pause = False
+        self._pause = True
 
     def _create_socket(self):
         self.socket = self.context.socket(zmq.REQ)
@@ -533,12 +538,6 @@ class HBSocketChannel(ZmqSocketChannel):
         """The thread's main activity.  Call start() instead."""
         self._create_socket()
         self._running = True
-        # Wait 2 seconds for the kernel to come up and the sockets to auto
-        # connect. If we don't we will see the kernel as dead. Also, before
-        # the sockets are connected, the poller.poll line below is returning
-        # too fast. This avoids that because the polling doesn't start until
-        # after the sockets are connected.
-        time.sleep(2.0)
         while self._running:
             if self._pause:
                 time.sleep(self.time_to_dead)
@@ -567,14 +566,15 @@ class HBSocketChannel(ZmqSocketChannel):
                                 until_dead = self.time_to_dead - (before_poll -
                                                                   request_time)
 
-                                # When the return value of poll() is an empty list,
-                                # that is when things have gone wrong (zeromq bug).
-                                # As long as it is not an empty list, poll is
-                                # working correctly even if it returns quickly.
-                                # Note: poll timeout is in milliseconds.
+                                # When the return value of poll() is an empty
+                                # list, that is when things have gone wrong
+                                # (zeromq bug). As long as it is not an empty
+                                # list, poll is working correctly even if it
+                                # returns quickly. Note: poll timeout is in
+                                # milliseconds.
                                 self.poller.poll(1000*until_dead)
                             
-                                since_last_heartbeat = time.time() - request_time
+                                since_last_heartbeat = time.time()-request_time
                                 if since_last_heartbeat > self.time_to_dead:
                                     self.call_handlers(since_last_heartbeat)
                                     break
@@ -666,8 +666,8 @@ class KernelManager(HasTraits):
     # Channel management methods:
     #--------------------------------------------------------------------------
 
-    def start_channels(self, xreq=True, sub=True, rep=True):
-        """Starts the channels for this kernel, but not the heartbeat.
+    def start_channels(self, xreq=True, sub=True, rep=True, hb=True):
+        """Starts the channels for this kernel.
 
         This will create the channels if they do not exist and then start
         them. If port numbers of 0 are being used (random ports) then you
@@ -680,6 +680,8 @@ class KernelManager(HasTraits):
             self.sub_channel.start()
         if rep:
             self.rep_channel.start()
+        if hb:
+            self.hb_channel.start()
 
     def stop_channels(self):
         """Stops all the running channels for this kernel.
@@ -690,13 +692,14 @@ class KernelManager(HasTraits):
             self.sub_channel.stop()
         if self.rep_channel.is_alive():
             self.rep_channel.stop()
+        if self.hb_channel.is_alive():
+            self.hb_channel.stop()
 
     @property
     def channels_running(self):
         """Are any of the channels created and running?"""
-        return self.xreq_channel.is_alive() \
-            or self.sub_channel.is_alive() \
-            or self.rep_channel.is_alive()
+        return (self.xreq_channel.is_alive() or self.sub_channel.is_alive() or
+                self.rep_channel.is_alive() or self.hb_channel.is_alive())
 
     #--------------------------------------------------------------------------
     # Kernel process management methods:
@@ -743,10 +746,14 @@ class KernelManager(HasTraits):
             self.kill_kernel()
             return
 
-        self.xreq_channel.shutdown()
+        # Pause the heartbeat channel if it exists.
+        if self._hb_channel is not None:
+            self._hb_channel.pause()
+
         # Don't send any additional kernel kill messages immediately, to give
         # the kernel a chance to properly execute shutdown actions. Wait for at
         # most 1s, checking every 0.1s.
+        self.xreq_channel.shutdown()
         for i in range(10):
             if self.is_alive:
                 time.sleep(0.1)
@@ -798,6 +805,10 @@ class KernelManager(HasTraits):
     def kill_kernel(self):
         """ Kill the running kernel. """
         if self.has_kernel:
+            # Pause the heartbeat channel if it exists.
+            if self._hb_channel is not None:
+                self._hb_channel.pause()
+
             self.kernel.kill()
             self.kernel = None
         else: