##// END OF EJS Templates
Integrated the heart beat pausing/unpausing logic with the (Qt)KernelManager.
epatters -
Show More
@@ -1,4 +1,4 b''
1 """A base class for console-type widgets.
1 """ An abstract base class for console-type widgets.
2 """
2 """
3 #-----------------------------------------------------------------------------
3 #-----------------------------------------------------------------------------
4 # Imports
4 # Imports
@@ -57,7 +57,7 b' class ConsoleWidget(Configurable, QtGui.QWidget):'
57 kind = Enum(['plain', 'rich'], default_value='plain', config=True)
57 kind = Enum(['plain', 'rich'], default_value='plain', config=True)
58
58
59 # The type of paging to use. Valid values are:
59 # The type of paging to use. Valid values are:
60 # 'inside' : The widget pages like a traditional terminal pager.
60 # 'inside' : The widget pages like a traditional terminal.
61 # 'hsplit' : When paging is requested, the widget is split
61 # 'hsplit' : When paging is requested, the widget is split
62 # horizontally. The top pane contains the console, and the
62 # horizontally. The top pane contains the console, and the
63 # bottom pane contains the paged text.
63 # bottom pane contains the paged text.
@@ -47,7 +47,7 b' class QtXReqSocketChannel(SocketChannelQObject, XReqSocketChannel):'
47 complete_reply = QtCore.pyqtSignal(object)
47 complete_reply = QtCore.pyqtSignal(object)
48 object_info_reply = QtCore.pyqtSignal(object)
48 object_info_reply = QtCore.pyqtSignal(object)
49
49
50 # Emitted when the first reply comes back
50 # Emitted when the first reply comes back.
51 first_reply = QtCore.pyqtSignal()
51 first_reply = QtCore.pyqtSignal()
52
52
53 # Used by the first_reply signal logic to determine if a reply is the
53 # Used by the first_reply signal logic to determine if a reply is the
@@ -72,8 +72,11 b' class QtXReqSocketChannel(SocketChannelQObject, XReqSocketChannel):'
72
72
73 if not self._handlers_called:
73 if not self._handlers_called:
74 self.first_reply.emit()
74 self.first_reply.emit()
75 self._handlers_called = True
75
76
76 self._handlers_called = True
77 #---------------------------------------------------------------------------
78 # 'QtXReqSocketChannel' interface
79 #---------------------------------------------------------------------------
77
80
78 def reset_first_reply(self):
81 def reset_first_reply(self):
79 """ Reset the first_reply signal to fire again on the next reply.
82 """ Reset the first_reply signal to fire again on the next reply.
@@ -188,6 +191,17 b' class QtKernelManager(KernelManager, SuperQObject):'
188 #---------------------------------------------------------------------------
191 #---------------------------------------------------------------------------
189 # 'KernelManager' interface
192 # 'KernelManager' interface
190 #---------------------------------------------------------------------------
193 #---------------------------------------------------------------------------
194
195 #------ Kernel process management ------------------------------------------
196
197 def start_kernel(self, *args, **kw):
198 """ Reimplemented for proper heartbeat management.
199 """
200 if self._xreq_channel is not None:
201 self._xreq_channel.reset_first_reply()
202 super(QtKernelManager, self).start_kernel(*args, **kw)
203
204 #------ Channel management -------------------------------------------------
191
205
192 def start_channels(self, *args, **kw):
206 def start_channels(self, *args, **kw):
193 """ Reimplemented to emit signal.
207 """ Reimplemented to emit signal.
@@ -200,3 +214,24 b' class QtKernelManager(KernelManager, SuperQObject):'
200 """
214 """
201 super(QtKernelManager, self).stop_channels()
215 super(QtKernelManager, self).stop_channels()
202 self.stopped_channels.emit()
216 self.stopped_channels.emit()
217
218 @property
219 def xreq_channel(self):
220 """ Reimplemented for proper heartbeat management.
221 """
222 if self._xreq_channel is None:
223 self._xreq_channel = super(QtKernelManager, self).xreq_channel
224 self._xreq_channel.first_reply.connect(self._first_reply)
225 return self._xreq_channel
226
227 #---------------------------------------------------------------------------
228 # Protected interface
229 #---------------------------------------------------------------------------
230
231 def _first_reply(self):
232 """ Unpauses the heartbeat channel when the first reply is received on
233 the execute channel. Note that this will *not* start the heartbeat
234 channel if it is not already running!
235 """
236 if self._hb_channel is not None:
237 self._hb_channel.unpause()
@@ -509,7 +509,12 b' class RepSocketChannel(ZmqSocketChannel):'
509
509
510
510
511 class HBSocketChannel(ZmqSocketChannel):
511 class HBSocketChannel(ZmqSocketChannel):
512 """The heartbeat channel which monitors the kernel heartbeat."""
512 """The heartbeat channel which monitors the kernel heartbeat.
513
514 Note that the heartbeat channel is paused by default. As long as you start
515 this channel, the kernel manager will ensure that it is paused and un-paused
516 as appropriate.
517 """
513
518
514 time_to_dead = 3.0
519 time_to_dead = 3.0
515 socket = None
520 socket = None
@@ -520,7 +525,7 b' class HBSocketChannel(ZmqSocketChannel):'
520 def __init__(self, context, session, address):
525 def __init__(self, context, session, address):
521 super(HBSocketChannel, self).__init__(context, session, address)
526 super(HBSocketChannel, self).__init__(context, session, address)
522 self._running = False
527 self._running = False
523 self._pause = False
528 self._pause = True
524
529
525 def _create_socket(self):
530 def _create_socket(self):
526 self.socket = self.context.socket(zmq.REQ)
531 self.socket = self.context.socket(zmq.REQ)
@@ -533,12 +538,6 b' class HBSocketChannel(ZmqSocketChannel):'
533 """The thread's main activity. Call start() instead."""
538 """The thread's main activity. Call start() instead."""
534 self._create_socket()
539 self._create_socket()
535 self._running = True
540 self._running = True
536 # Wait 2 seconds for the kernel to come up and the sockets to auto
537 # connect. If we don't we will see the kernel as dead. Also, before
538 # the sockets are connected, the poller.poll line below is returning
539 # too fast. This avoids that because the polling doesn't start until
540 # after the sockets are connected.
541 time.sleep(2.0)
542 while self._running:
541 while self._running:
543 if self._pause:
542 if self._pause:
544 time.sleep(self.time_to_dead)
543 time.sleep(self.time_to_dead)
@@ -567,14 +566,15 b' class HBSocketChannel(ZmqSocketChannel):'
567 until_dead = self.time_to_dead - (before_poll -
566 until_dead = self.time_to_dead - (before_poll -
568 request_time)
567 request_time)
569
568
570 # When the return value of poll() is an empty list,
569 # When the return value of poll() is an empty
571 # that is when things have gone wrong (zeromq bug).
570 # list, that is when things have gone wrong
572 # As long as it is not an empty list, poll is
571 # (zeromq bug). As long as it is not an empty
573 # working correctly even if it returns quickly.
572 # list, poll is working correctly even if it
574 # Note: poll timeout is in milliseconds.
573 # returns quickly. Note: poll timeout is in
574 # milliseconds.
575 self.poller.poll(1000*until_dead)
575 self.poller.poll(1000*until_dead)
576
576
577 since_last_heartbeat = time.time() - request_time
577 since_last_heartbeat = time.time()-request_time
578 if since_last_heartbeat > self.time_to_dead:
578 if since_last_heartbeat > self.time_to_dead:
579 self.call_handlers(since_last_heartbeat)
579 self.call_handlers(since_last_heartbeat)
580 break
580 break
@@ -666,8 +666,8 b' class KernelManager(HasTraits):'
666 # Channel management methods:
666 # Channel management methods:
667 #--------------------------------------------------------------------------
667 #--------------------------------------------------------------------------
668
668
669 def start_channels(self, xreq=True, sub=True, rep=True):
669 def start_channels(self, xreq=True, sub=True, rep=True, hb=True):
670 """Starts the channels for this kernel, but not the heartbeat.
670 """Starts the channels for this kernel.
671
671
672 This will create the channels if they do not exist and then start
672 This will create the channels if they do not exist and then start
673 them. If port numbers of 0 are being used (random ports) then you
673 them. If port numbers of 0 are being used (random ports) then you
@@ -680,6 +680,8 b' class KernelManager(HasTraits):'
680 self.sub_channel.start()
680 self.sub_channel.start()
681 if rep:
681 if rep:
682 self.rep_channel.start()
682 self.rep_channel.start()
683 if hb:
684 self.hb_channel.start()
683
685
684 def stop_channels(self):
686 def stop_channels(self):
685 """Stops all the running channels for this kernel.
687 """Stops all the running channels for this kernel.
@@ -690,13 +692,14 b' class KernelManager(HasTraits):'
690 self.sub_channel.stop()
692 self.sub_channel.stop()
691 if self.rep_channel.is_alive():
693 if self.rep_channel.is_alive():
692 self.rep_channel.stop()
694 self.rep_channel.stop()
695 if self.hb_channel.is_alive():
696 self.hb_channel.stop()
693
697
694 @property
698 @property
695 def channels_running(self):
699 def channels_running(self):
696 """Are any of the channels created and running?"""
700 """Are any of the channels created and running?"""
697 return self.xreq_channel.is_alive() \
701 return (self.xreq_channel.is_alive() or self.sub_channel.is_alive() or
698 or self.sub_channel.is_alive() \
702 self.rep_channel.is_alive() or self.hb_channel.is_alive())
699 or self.rep_channel.is_alive()
700
703
701 #--------------------------------------------------------------------------
704 #--------------------------------------------------------------------------
702 # Kernel process management methods:
705 # Kernel process management methods:
@@ -743,10 +746,14 b' class KernelManager(HasTraits):'
743 self.kill_kernel()
746 self.kill_kernel()
744 return
747 return
745
748
746 self.xreq_channel.shutdown()
749 # Pause the heart beat channel if it exists.
750 if self._hb_channel is not None:
751 self._hb_channel.pause()
752
747 # Don't send any additional kernel kill messages immediately, to give
753 # Don't send any additional kernel kill messages immediately, to give
748 # the kernel a chance to properly execute shutdown actions. Wait for at
754 # the kernel a chance to properly execute shutdown actions. Wait for at
749 # most 1s, checking every 0.1s.
755 # most 1s, checking every 0.1s.
756 self.xreq_channel.shutdown()
750 for i in range(10):
757 for i in range(10):
751 if self.is_alive:
758 if self.is_alive:
752 time.sleep(0.1)
759 time.sleep(0.1)
@@ -798,6 +805,10 b' class KernelManager(HasTraits):'
798 def kill_kernel(self):
805 def kill_kernel(self):
799 """ Kill the running kernel. """
806 """ Kill the running kernel. """
800 if self.has_kernel:
807 if self.has_kernel:
808 # Pause the heart beat channel if it exists.
809 if self._hb_channel is not None:
810 self._hb_channel.pause()
811
801 self.kernel.kill()
812 self.kernel.kill()
802 self.kernel = None
813 self.kernel = None
803 else:
814 else:
General Comments 0
You need to be logged in to leave comments. Login now