##// END OF EJS Templates
Integrated the heart beat pausing/unpausing logic with the (Qt)KernelManager.
epatters -
Show More
@@ -1,4 +1,4 b''
1 """A base class for console-type widgets.
1 """ An abstract base class for console-type widgets.
2 2 """
3 3 #-----------------------------------------------------------------------------
4 4 # Imports
@@ -57,7 +57,7 b' class ConsoleWidget(Configurable, QtGui.QWidget):'
57 57 kind = Enum(['plain', 'rich'], default_value='plain', config=True)
58 58
59 59 # The type of paging to use. Valid values are:
60 # 'inside' : The widget pages like a traditional terminal pager.
60 # 'inside' : The widget pages like a traditional terminal.
61 61 # 'hsplit' : When paging is requested, the widget is split
62 62 # horizontally. The top pane contains the console, and the
63 63 # bottom pane contains the paged text.
@@ -47,7 +47,7 b' class QtXReqSocketChannel(SocketChannelQObject, XReqSocketChannel):'
47 47 complete_reply = QtCore.pyqtSignal(object)
48 48 object_info_reply = QtCore.pyqtSignal(object)
49 49
50 # Emitted when the first reply comes back
50 # Emitted when the first reply comes back.
51 51 first_reply = QtCore.pyqtSignal()
52 52
53 53 # Used by the first_reply signal logic to determine if a reply is the
@@ -72,8 +72,11 b' class QtXReqSocketChannel(SocketChannelQObject, XReqSocketChannel):'
72 72
73 73 if not self._handlers_called:
74 74 self.first_reply.emit()
75 self._handlers_called = True
75 76
76 self._handlers_called = True
77 #---------------------------------------------------------------------------
78 # 'QtXReqSocketChannel' interface
79 #---------------------------------------------------------------------------
77 80
78 81 def reset_first_reply(self):
79 82 """ Reset the first_reply signal to fire again on the next reply.
@@ -188,6 +191,17 b' class QtKernelManager(KernelManager, SuperQObject):'
188 191 #---------------------------------------------------------------------------
189 192 # 'KernelManager' interface
190 193 #---------------------------------------------------------------------------
194
195 #------ Kernel process management ------------------------------------------
196
197 def start_kernel(self, *args, **kw):
198 """ Reimplemented for proper heartbeat management.
199 """
200 if self._xreq_channel is not None:
201 self._xreq_channel.reset_first_reply()
202 super(QtKernelManager, self).start_kernel(*args, **kw)
203
204 #------ Channel management -------------------------------------------------
191 205
192 206 def start_channels(self, *args, **kw):
193 207 """ Reimplemented to emit signal.
@@ -200,3 +214,24 b' class QtKernelManager(KernelManager, SuperQObject):'
200 214 """
201 215 super(QtKernelManager, self).stop_channels()
202 216 self.stopped_channels.emit()
217
218 @property
219 def xreq_channel(self):
220 """ Reimplemented for proper heartbeat management.
221 """
222 if self._xreq_channel is None:
223 self._xreq_channel = super(QtKernelManager, self).xreq_channel
224 self._xreq_channel.first_reply.connect(self._first_reply)
225 return self._xreq_channel
226
227 #---------------------------------------------------------------------------
228 # Protected interface
229 #---------------------------------------------------------------------------
230
231 def _first_reply(self):
232 """ Unpauses the heartbeat channel when the first reply is received on
233 the execute channel. Note that this will *not* start the heartbeat
234 channel if it is not already running!
235 """
236 if self._hb_channel is not None:
237 self._hb_channel.unpause()
@@ -509,7 +509,12 b' class RepSocketChannel(ZmqSocketChannel):'
509 509
510 510
511 511 class HBSocketChannel(ZmqSocketChannel):
512 """The heartbeat channel which monitors the kernel heartbeat."""
512 """The heartbeat channel which monitors the kernel heartbeat.
513
514 Note that the heartbeat channel is paused by default. As long as you start
515 this channel, the kernel manager will ensure that it is paused and un-paused
516 as appropriate.
517 """
513 518
514 519 time_to_dead = 3.0
515 520 socket = None
@@ -520,7 +525,7 b' class HBSocketChannel(ZmqSocketChannel):'
520 525 def __init__(self, context, session, address):
521 526 super(HBSocketChannel, self).__init__(context, session, address)
522 527 self._running = False
523 self._pause = False
528 self._pause = True
524 529
525 530 def _create_socket(self):
526 531 self.socket = self.context.socket(zmq.REQ)
@@ -533,12 +538,6 b' class HBSocketChannel(ZmqSocketChannel):'
533 538 """The thread's main activity. Call start() instead."""
534 539 self._create_socket()
535 540 self._running = True
536 # Wait 2 seconds for the kernel to come up and the sockets to auto
537 # connect. If we don't we will see the kernel as dead. Also, before
538 # the sockets are connected, the poller.poll line below is returning
539 # too fast. This avoids that because the polling doesn't start until
540 # after the sockets are connected.
541 time.sleep(2.0)
542 541 while self._running:
543 542 if self._pause:
544 543 time.sleep(self.time_to_dead)
@@ -567,14 +566,15 b' class HBSocketChannel(ZmqSocketChannel):'
567 566 until_dead = self.time_to_dead - (before_poll -
568 567 request_time)
569 568
570 # When the return value of poll() is an empty list,
571 # that is when things have gone wrong (zeromq bug).
572 # As long as it is not an empty list, poll is
573 # working correctly even if it returns quickly.
574 # Note: poll timeout is in milliseconds.
569 # When the return value of poll() is an empty
570 # list, that is when things have gone wrong
571 # (zeromq bug). As long as it is not an empty
572 # list, poll is working correctly even if it
573 # returns quickly. Note: poll timeout is in
574 # milliseconds.
575 575 self.poller.poll(1000*until_dead)
576 576
577 since_last_heartbeat = time.time() - request_time
577 since_last_heartbeat = time.time()-request_time
578 578 if since_last_heartbeat > self.time_to_dead:
579 579 self.call_handlers(since_last_heartbeat)
580 580 break
@@ -666,8 +666,8 b' class KernelManager(HasTraits):'
666 666 # Channel management methods:
667 667 #--------------------------------------------------------------------------
668 668
669 def start_channels(self, xreq=True, sub=True, rep=True):
670 """Starts the channels for this kernel, but not the heartbeat.
669 def start_channels(self, xreq=True, sub=True, rep=True, hb=True):
670 """Starts the channels for this kernel.
671 671
672 672 This will create the channels if they do not exist and then start
673 673 them. If port numbers of 0 are being used (random ports) then you
@@ -680,6 +680,8 b' class KernelManager(HasTraits):'
680 680 self.sub_channel.start()
681 681 if rep:
682 682 self.rep_channel.start()
683 if hb:
684 self.hb_channel.start()
683 685
684 686 def stop_channels(self):
685 687 """Stops all the running channels for this kernel.
@@ -690,13 +692,14 b' class KernelManager(HasTraits):'
690 692 self.sub_channel.stop()
691 693 if self.rep_channel.is_alive():
692 694 self.rep_channel.stop()
695 if self.hb_channel.is_alive():
696 self.hb_channel.stop()
693 697
694 698 @property
695 699 def channels_running(self):
696 700 """Are any of the channels created and running?"""
697 return self.xreq_channel.is_alive() \
698 or self.sub_channel.is_alive() \
699 or self.rep_channel.is_alive()
701 return (self.xreq_channel.is_alive() or self.sub_channel.is_alive() or
702 self.rep_channel.is_alive() or self.hb_channel.is_alive())
700 703
701 704 #--------------------------------------------------------------------------
702 705 # Kernel process management methods:
@@ -743,10 +746,14 b' class KernelManager(HasTraits):'
743 746 self.kill_kernel()
744 747 return
745 748
746 self.xreq_channel.shutdown()
749 # Pause the heart beat channel if it exists.
750 if self._hb_channel is not None:
751 self._hb_channel.pause()
752
747 753 # Don't send any additional kernel kill messages immediately, to give
748 754 # the kernel a chance to properly execute shutdown actions. Wait for at
749 755 # most 1s, checking every 0.1s.
756 self.xreq_channel.shutdown()
750 757 for i in range(10):
751 758 if self.is_alive:
752 759 time.sleep(0.1)
@@ -798,6 +805,10 b' class KernelManager(HasTraits):'
798 805 def kill_kernel(self):
799 806 """ Kill the running kernel. """
800 807 if self.has_kernel:
808 # Pause the heart beat channel if it exists.
809 if self._hb_channel is not None:
810 self._hb_channel.pause()
811
801 812 self.kernel.kill()
802 813 self.kernel = None
803 814 else:
General Comments 0
You need to be logged in to leave comments. Login now