From ad81cbf3e65c80a1664d4b53af053bb990efc45c 2010-09-15 19:25:28 From: epatters Date: 2010-09-15 19:25:28 Subject: [PATCH] Integrated the heart beat pausing/unpausing logic with the (Qt)KernelManager. --- diff --git a/IPython/frontend/qt/console/console_widget.py b/IPython/frontend/qt/console/console_widget.py index df8a011..d5e51f0 100644 --- a/IPython/frontend/qt/console/console_widget.py +++ b/IPython/frontend/qt/console/console_widget.py @@ -1,4 +1,4 @@ -"""A base class for console-type widgets. +""" An abstract base class for console-type widgets. """ #----------------------------------------------------------------------------- # Imports @@ -57,7 +57,7 @@ class ConsoleWidget(Configurable, QtGui.QWidget): kind = Enum(['plain', 'rich'], default_value='plain', config=True) # The type of paging to use. Valid values are: - # 'inside' : The widget pages like a traditional terminal pager. + # 'inside' : The widget pages like a traditional terminal. # 'hsplit' : When paging is requested, the widget is split # horizontally. The top pane contains the console, and the # bottom pane contains the paged text. diff --git a/IPython/frontend/qt/kernelmanager.py b/IPython/frontend/qt/kernelmanager.py index 0848736..09e7fa6 100644 --- a/IPython/frontend/qt/kernelmanager.py +++ b/IPython/frontend/qt/kernelmanager.py @@ -47,7 +47,7 @@ class QtXReqSocketChannel(SocketChannelQObject, XReqSocketChannel): complete_reply = QtCore.pyqtSignal(object) object_info_reply = QtCore.pyqtSignal(object) - # Emitted when the first reply comes back + # Emitted when the first reply comes back. first_reply = QtCore.pyqtSignal() # Used by the first_reply signal logic to determine if a reply is the @@ -72,8 +72,11 @@ class QtXReqSocketChannel(SocketChannelQObject, XReqSocketChannel): if not self._handlers_called: self.first_reply.emit() + self._handlers_called = True - self._handlers_called = True + #--------------------------------------------------------------------------- + # 'QtXReqSocketChannel' interface + #--------------------------------------------------------------------------- def reset_first_reply(self): """ Reset the first_reply signal to fire again on the next reply. @@ -188,6 +191,17 @@ class QtKernelManager(KernelManager, SuperQObject): #--------------------------------------------------------------------------- # 'KernelManager' interface #--------------------------------------------------------------------------- + + #------ Kernel process management ------------------------------------------ + + def start_kernel(self, *args, **kw): + """ Reimplemented for proper heartbeat management. + """ + if self._xreq_channel is not None: + self._xreq_channel.reset_first_reply() + super(QtKernelManager, self).start_kernel(*args, **kw) + + #------ Channel management ------------------------------------------------- def start_channels(self, *args, **kw): """ Reimplemented to emit signal. @@ -200,3 +214,24 @@ class QtKernelManager(KernelManager, SuperQObject): """ super(QtKernelManager, self).stop_channels() self.stopped_channels.emit() + + @property + def xreq_channel(self): + """ Reimplemented for proper heartbeat management. + """ + if self._xreq_channel is None: + self._xreq_channel = super(QtKernelManager, self).xreq_channel + self._xreq_channel.first_reply.connect(self._first_reply) + return self._xreq_channel + + #--------------------------------------------------------------------------- + # Protected interface + #--------------------------------------------------------------------------- + + def _first_reply(self): + """ Unpauses the heartbeat channel when the first reply is received on + the execute channel. Note that this will *not* start the heartbeat + channel if it is not already running! + """ + if self._hb_channel is not None: + self._hb_channel.unpause() diff --git a/IPython/zmq/kernelmanager.py b/IPython/zmq/kernelmanager.py index 4e0f6ca..6da3c18 100644 --- a/IPython/zmq/kernelmanager.py +++ b/IPython/zmq/kernelmanager.py @@ -509,7 +509,12 @@ class RepSocketChannel(ZmqSocketChannel): class HBSocketChannel(ZmqSocketChannel): - """The heartbeat channel which monitors the kernel heartbeat.""" + """The heartbeat channel which monitors the kernel heartbeat. + + Note that the heartbeat channel is paused by default. As long as you start + this channel, the kernel manager will ensure that it is paused and un-paused + as appropriate. + """ time_to_dead = 3.0 socket = None @@ -520,7 +525,7 @@ class HBSocketChannel(ZmqSocketChannel): def __init__(self, context, session, address): super(HBSocketChannel, self).__init__(context, session, address) self._running = False - self._pause = False + self._pause = True def _create_socket(self): self.socket = self.context.socket(zmq.REQ) @@ -533,12 +538,6 @@ class HBSocketChannel(ZmqSocketChannel): """The thread's main activity. Call start() instead.""" self._create_socket() self._running = True - # Wait 2 seconds for the kernel to come up and the sockets to auto - # connect. If we don't we will see the kernel as dead. Also, before - # the sockets are connected, the poller.poll line below is returning - # too fast. This avoids that because the polling doesn't start until - # after the sockets are connected. - time.sleep(2.0) while self._running: if self._pause: time.sleep(self.time_to_dead) @@ -567,14 +566,15 @@ class HBSocketChannel(ZmqSocketChannel): until_dead = self.time_to_dead - (before_poll - request_time) - # When the return value of poll() is an empty list, - # that is when things have gone wrong (zeromq bug). - # As long as it is not an empty list, poll is - # working correctly even if it returns quickly. - # Note: poll timeout is in milliseconds. + # When the return value of poll() is an empty + # list, that is when things have gone wrong + # (zeromq bug). As long as it is not an empty + # list, poll is working correctly even if it + # returns quickly. Note: poll timeout is in + # milliseconds. self.poller.poll(1000*until_dead) - since_last_heartbeat = time.time() - request_time + since_last_heartbeat = time.time()-request_time if since_last_heartbeat > self.time_to_dead: self.call_handlers(since_last_heartbeat) break @@ -666,8 +666,8 @@ class KernelManager(HasTraits): # Channel management methods: #-------------------------------------------------------------------------- - def start_channels(self, xreq=True, sub=True, rep=True): - """Starts the channels for this kernel, but not the heartbeat. + def start_channels(self, xreq=True, sub=True, rep=True, hb=True): + """Starts the channels for this kernel. This will create the channels if they do not exist and then start them. If port numbers of 0 are being used (random ports) then you @@ -680,6 +680,8 @@ class KernelManager(HasTraits): self.sub_channel.start() if rep: self.rep_channel.start() + if hb: + self.hb_channel.start() def stop_channels(self): """Stops all the running channels for this kernel. @@ -690,13 +692,14 @@ class KernelManager(HasTraits): self.sub_channel.stop() if self.rep_channel.is_alive(): self.rep_channel.stop() + if self.hb_channel.is_alive(): + self.hb_channel.stop() @property def channels_running(self): """Are any of the channels created and running?""" - return self.xreq_channel.is_alive() \ - or self.sub_channel.is_alive() \ - or self.rep_channel.is_alive() + return (self.xreq_channel.is_alive() or self.sub_channel.is_alive() or + self.rep_channel.is_alive() or self.hb_channel.is_alive()) #-------------------------------------------------------------------------- # Kernel process management methods: @@ -743,10 +746,14 @@ class KernelManager(HasTraits): self.kill_kernel() return - self.xreq_channel.shutdown() + # Pause the heart beat channel if it exists. + if self._hb_channel is not None: + self._hb_channel.pause() + # Don't send any additional kernel kill messages immediately, to give # the kernel a chance to properly execute shutdown actions. Wait for at # most 1s, checking every 0.1s. + self.xreq_channel.shutdown() for i in range(10): if self.is_alive: time.sleep(0.1) @@ -798,6 +805,10 @@ class KernelManager(HasTraits): def kill_kernel(self): """ Kill the running kernel. """ if self.has_kernel: + # Pause the heart beat channel if it exists. + if self._hb_channel is not None: + self._hb_channel.pause() + self.kernel.kill() self.kernel = None else: