##// END OF EJS Templates
Added heartbeat support.
Brian Granger -
Show More
@@ -0,0 +1,43
1 """The client and server for a basic ping-pong style heartbeat.
2 """
3
4 #-----------------------------------------------------------------------------
5 # Copyright (C) 2008-2010 The IPython Development Team
6 #
7 # Distributed under the terms of the BSD License. The full license is in
8 # the file COPYING, distributed as part of this software.
9 #-----------------------------------------------------------------------------
10
11 #-----------------------------------------------------------------------------
12 # Imports
13 #-----------------------------------------------------------------------------
14
15 import sys
16 from threading import Thread
17
18 import zmq
19
20 #-----------------------------------------------------------------------------
21 # Code
22 #-----------------------------------------------------------------------------
23
24
class Heartbeat(Thread):
    """A simple ping-pong style heartbeat that runs in a thread.

    A REP socket is bound to ``addr`` and handed to a zmq FORWARDER device,
    with the same socket used on both sides of the device.

    Parameters
    ----------
    context : zmq context
        Used to create the REP socket when the thread runs.
    addr : (ip, port) pair, optional
        Address to bind to. A port of 0 means "pick a free port".

    Attributes
    ----------
    ip, port, addr
        The resolved address. ``port`` is already non-zero when the
        constructor returns, so callers may read it right after ``start()``
        without racing against the thread.
    """

    def __init__(self, context, addr=('127.0.0.1', 0)):
        Thread.__init__(self)
        self.context = context
        self.ip = addr[0]
        self.port = addr[1]
        if self.port == 0:
            # Resolve the port here rather than in run(): run() executes on
            # the new thread, so a caller reading ``port`` straight after
            # start() could otherwise still observe 0. A throw-away stdlib
            # socket is used to reserve a free port, the same approach
            # base_launch_kernel takes; the port is released again before
            # zmq binds it in run().
            import socket
            probe = socket.socket()
            try:
                # '*' means "all interfaces" to zmq, which is '' for socket.
                probe.bind(('' if self.ip == '*' else self.ip, 0))
                self.port = probe.getsockname()[1]
            finally:
                probe.close()
        self.addr = (self.ip, self.port)
        self.daemon = True

    def run(self):
        """Bind the REP socket and run the forwarding device."""
        self.socket = self.context.socket(zmq.REP)
        self.socket.bind('tcp://%s:%i' % (self.ip, self.port))
        zmq.device(zmq.FORWARDER, self.socket, self.socket)
43
@@ -31,8 +31,9 class BaseFrontendMixin(object):
31 31 # Disconnect the old kernel manager's channels.
32 32 old_manager.sub_channel.message_received.disconnect(self._dispatch)
33 33 old_manager.xreq_channel.message_received.disconnect(self._dispatch)
34 old_manager.rep_channel.message_received.connect(self._dispatch)
35
34 old_manager.rep_channel.message_received.disconnect(self._dispatch)
35 old_manager.hb_channel.kernel_died.disconnect(self._handle_kernel_died)
36
36 37 # Handle the case where the old kernel manager is still listening.
37 38 if old_manager.channels_running:
38 39 self._stopped_channels()
@@ -50,7 +51,8 class BaseFrontendMixin(object):
50 51 kernel_manager.sub_channel.message_received.connect(self._dispatch)
51 52 kernel_manager.xreq_channel.message_received.connect(self._dispatch)
52 53 kernel_manager.rep_channel.message_received.connect(self._dispatch)
53
54 kernel_manager.hb_channel.kernel_died.connect(self._handle_kernel_died)
55
54 56 # Handle the case where the kernel manager started channels before
55 57 # we connected.
56 58 if kernel_manager.channels_running:
@@ -91,3 +93,17 class BaseFrontendMixin(object):
91 93 """
92 94 session = self._kernel_manager.session.session
93 95 return msg['parent_header']['session'] == session
96
97 def _handle_kernel_died(self, since_last_heartbeat):
98 """ This is called when the ``kernel_died`` signal is emitted.
99
100 This method is called when the kernel heartbeat has not been
101 active for a certain amount of time. The typical action will be to
102 give the user the option of restarting the kernel.
103
104 Parameters
105 ----------
106 since_last_heartbeat : float
107 The time since the heartbeat was last received.
108 """
109 pass
@@ -90,6 +90,7 class FrontendWidget(HistoryConsoleWidget, BaseFrontendMixin):
90 90
91 91 # Protected class variables.
92 92 _input_splitter_class = InputSplitter
93 _possible_kernel_restart = Bool(False)
93 94
94 95 #---------------------------------------------------------------------------
95 96 # 'object' interface
@@ -176,7 +177,8 class FrontendWidget(HistoryConsoleWidget, BaseFrontendMixin):
176 177 self._kernel_interrupt()
177 178 return True
178 179 elif key == QtCore.Qt.Key_Period:
179 self._kernel_restart()
180 message = 'Are you sure you want to restart the kernel?'
181 self._kernel_restart(message)
180 182 return True
181 183 return super(FrontendWidget, self)._event_filter_console_keypress(event)
182 184
@@ -348,29 +350,38 class FrontendWidget(HistoryConsoleWidget, BaseFrontendMixin):
348 350 self._append_plain_text('Kernel process is either remote or '
349 351 'unspecified. Cannot interrupt.\n')
350 352
351 def _kernel_restart(self):
353 def _kernel_restart(self, message):
352 354 """ Attempts to restart the running kernel.
353 355 """
354 if self.custom_restart:
355 self.custom_restart_requested.emit()
356 elif self.kernel_manager.has_kernel:
357 message = 'Are you sure you want to restart the kernel?'
358 buttons = QtGui.QMessageBox.Yes | QtGui.QMessageBox.No
359 result = QtGui.QMessageBox.question(self, 'Restart kernel?',
360 message, buttons)
361 if result == QtGui.QMessageBox.Yes:
362 try:
363 self.kernel_manager.restart_kernel()
364 except RuntimeError:
365 message = 'Kernel started externally. Cannot restart.\n'
366 self._append_plain_text(message)
367 else:
368 self._stopped_channels()
369 self._append_plain_text('Kernel restarting...\n')
370 self._show_interpreter_prompt()
371 else:
372 self._append_plain_text('Kernel process is either remote or '
373 'unspecified. Cannot restart.\n')
356 # We want to make sure that if this dialog is already happening, that
357 # other signals don't trigger it again. This can happen when the
358 # kernel_died heartbeat signal is emitted and the user is slow to
359 # respond to the dialog.
360 if not self._possible_kernel_restart:
361 if self.custom_restart:
362 self.custom_restart_requested.emit()
363 elif self.kernel_manager.has_kernel:
364 # Setting this to True will prevent this logic from happening
365 # again until the current pass is completed.
366 self._possible_kernel_restart = True
367 buttons = QtGui.QMessageBox.Yes | QtGui.QMessageBox.No
368 result = QtGui.QMessageBox.question(self, 'Restart kernel?',
369 message, buttons)
370 if result == QtGui.QMessageBox.Yes:
371 try:
372 self.kernel_manager.restart_kernel()
373 except RuntimeError:
374 message = 'Kernel started externally. Cannot restart.\n'
375 self._append_plain_text(message)
376 else:
377 self._stopped_channels()
378 self._append_plain_text('Kernel restarting...\n')
379 self._show_interpreter_prompt()
380 # This might need to be moved to another location?
381 self._possible_kernel_restart = False
382 else:
383 self._append_plain_text('Kernel process is either remote or '
384 'unspecified. Cannot restart.\n')
374 385
375 386 def _process_execute_abort(self, msg):
376 387 """ Process a reply for an aborted execution request.
@@ -154,6 +154,14 class IPythonWidget(FrontendWidget):
154 154 # FIXME: Disabled until history requests are properly implemented.
155 155 #self.kernel_manager.xreq_channel.history(raw=True, output=False)
156 156
157 def _handle_kernel_died(self, since_last_heartbeat):
158 """ Handle the kernel's death by asking if the user wants to restart.
159 """
160 message = 'The kernel heartbeat has been inactive for %.2f ' \
161 'seconds. Do you want to restart the kernel? You may ' \
162 'first want to check the network connection.' % since_last_heartbeat
163 self._kernel_restart(message)
164
157 165 #---------------------------------------------------------------------------
158 166 # 'FrontendWidget' interface
159 167 #---------------------------------------------------------------------------
@@ -33,6 +33,8 def main():
33 33 help='set the SUB channel port [default random]')
34 34 kgroup.add_argument('--rep', type=int, metavar='PORT', default=0,
35 35 help='set the REP channel port [default random]')
36 kgroup.add_argument('--hb', type=int, metavar='PORT', default=0,
37 help='set the heartbeat port [default: random]')
36 38
37 39 egroup = kgroup.add_mutually_exclusive_group()
38 40 egroup.add_argument('--pure', action='store_true', help = \
@@ -61,7 +63,8 def main():
61 63 # Create a KernelManager and start a kernel.
62 64 kernel_manager = QtKernelManager(xreq_address=(args.ip, args.xreq),
63 65 sub_address=(args.ip, args.sub),
64 rep_address=(args.ip, args.rep))
66 rep_address=(args.ip, args.rep),
67 hb_address=(args.ip, args.hb))
65 68 if args.ip == LOCALHOST and not args.existing:
66 69 if args.pure:
67 70 kernel_manager.start_kernel(ipython=False)
@@ -8,7 +8,7 import zmq
8 8 # IPython imports.
9 9 from IPython.utils.traitlets import Type
10 10 from IPython.zmq.kernelmanager import KernelManager, SubSocketChannel, \
11 XReqSocketChannel, RepSocketChannel
11 XReqSocketChannel, RepSocketChannel, HBSocketChannel
12 12 from util import MetaQObjectHasTraits
13 13
14 14 # When doing multiple inheritance from QtCore.QObject and other classes
@@ -149,6 +149,32 class QtRepSocketChannel(RepSocketChannel, QtCore.QObject):
149 149 self.input_requested.emit(msg)
150 150
151 151
class QtHBSocketChannel(HBSocketChannel, QtCore.QObject):
    """ A heartbeat channel that reports through a Qt signal.
    """

    # Emitted when the kernel has died; carries the value that was passed
    # to call_handlers.
    kernel_died = QtCore.pyqtSignal(object)

    #---------------------------------------------------------------------------
    # 'object' interface
    #---------------------------------------------------------------------------

    def __init__(self, *args, **kwargs):
        """ Reimplemented so that QtCore.QObject is initialized before the
        channel itself.
        """
        QtCore.QObject.__init__(self)
        HBSocketChannel.__init__(self, *args, **kwargs)

    #---------------------------------------------------------------------------
    # 'HBSocketChannel' interface
    #---------------------------------------------------------------------------

    def call_handlers(self, since_last_heartbeat):
        """ Reimplemented to emit a signal instead of making a callback.
        """
        self.kernel_died.emit(since_last_heartbeat)
176
177
152 178 class QtKernelManager(KernelManager, QtCore.QObject):
153 179 """ A KernelManager that provides signals and slots.
154 180 """
@@ -165,6 +191,7 class QtKernelManager(KernelManager, QtCore.QObject):
165 191 sub_channel_class = Type(QtSubSocketChannel)
166 192 xreq_channel_class = Type(QtXReqSocketChannel)
167 193 rep_channel_class = Type(QtRepSocketChannel)
194 hb_channel_class = Type(QtHBSocketChannel)
168 195
169 196 #---------------------------------------------------------------------------
170 197 # 'object' interface
@@ -19,7 +19,7 from exitpoller import ExitPollerUnix, ExitPollerWindows
19 19 from displayhook import DisplayHook
20 20 from iostream import OutStream
21 21 from session import Session
22
22 from heartbeat import Heartbeat
23 23
24 24 def bind_port(socket, ip, port):
25 25 """ Binds the specified ZMQ socket. If the port is zero, a random port is
@@ -47,6 +47,8 def make_argument_parser():
47 47 help='set the PUB channel port [default: random]')
48 48 parser.add_argument('--req', type=int, metavar='PORT', default=0,
49 49 help='set the REQ channel port [default: random]')
50 parser.add_argument('--hb', type=int, metavar='PORT', default=0,
51 help='set the heartbeat port [default: random]')
50 52
51 53 if sys.platform == 'win32':
52 54 parser.add_argument('--parent', type=int, metavar='HANDLE',
@@ -84,6 +86,10 def make_kernel(namespace, kernel_factory,
84 86 req_port = bind_port(req_socket, namespace.ip, namespace.req)
85 87 io.raw_print("REQ Channel on port", req_port)
86 88
89 hb = Heartbeat(context, (namespace.ip, namespace.hb))
90 hb.start()
91 io.raw_print("Heartbeat REP Channel on port", hb.port)
92
87 93 # Redirect input streams and set a display hook.
88 94 if out_stream_factory:
89 95 pass
@@ -122,7 +128,7 def make_default_main(kernel_factory):
122 128 return main
123 129
124 130
125 def base_launch_kernel(code, xrep_port=0, pub_port=0, req_port=0,
131 def base_launch_kernel(code, xrep_port=0, pub_port=0, req_port=0, hb_port=0,
126 132 independent=False, extra_arguments=[]):
127 133 """ Launches a localhost kernel, binding to the specified ports.
128 134
@@ -140,6 +146,9 def base_launch_kernel(code, xrep_port=0, pub_port=0, req_port=0,
140 146 req_port : int, optional
141 147 The port to use for the REQ (raw input) channel.
142 148
149 hb_port : int, optional
150 The port to use for the heartbeat REP channel.
151
143 152 independent : bool, optional (default False)
144 153 If set, the kernel process is guaranteed to survive if this process
145 154 dies. If not set, an effort is made to ensure that the kernel is killed
@@ -157,7 +166,8 def base_launch_kernel(code, xrep_port=0, pub_port=0, req_port=0,
157 166 """
158 167 # Find open ports as necessary.
159 168 ports = []
160 ports_needed = int(xrep_port <= 0) + int(pub_port <= 0) + int(req_port <= 0)
169 ports_needed = int(xrep_port <= 0) + int(pub_port <= 0) + \
170 int(req_port <= 0) + int(hb_port <= 0)
161 171 for i in xrange(ports_needed):
162 172 sock = socket.socket()
163 173 sock.bind(('', 0))
@@ -172,10 +182,13 def base_launch_kernel(code, xrep_port=0, pub_port=0, req_port=0,
172 182 pub_port = ports.pop(0)
173 183 if req_port <= 0:
174 184 req_port = ports.pop(0)
185 if hb_port <= 0:
186 hb_port = ports.pop(0)
175 187
176 188 # Build the kernel launch command.
177 189 arguments = [ sys.executable, '-c', code, '--xrep', str(xrep_port),
178 '--pub', str(pub_port), '--req', str(req_port) ]
190 '--pub', str(pub_port), '--req', str(req_port),
191 '--hb', str(hb_port) ]
179 192 arguments.extend(extra_arguments)
180 193
181 194 # Spawn a kernel.
@@ -196,4 +209,4 def base_launch_kernel(code, xrep_port=0, pub_port=0, req_port=0,
196 209 else:
197 210 proc = Popen(arguments + ['--parent'])
198 211
199 return proc, xrep_port, pub_port, req_port
212 return proc, xrep_port, pub_port, req_port, hb_port
@@ -377,8 +377,8 class TkKernel(Kernel):
377 377 # Kernel main and launch functions
378 378 #-----------------------------------------------------------------------------
379 379
380 def launch_kernel(xrep_port=0, pub_port=0, req_port=0, independent=False,
381 pylab=False):
380 def launch_kernel(xrep_port=0, pub_port=0, req_port=0, hb_port=0,
381 independent=False, pylab=False):
382 382 """ Launches a localhost kernel, binding to the specified ports.
383 383
384 384 Parameters
@@ -392,6 +392,9 def launch_kernel(xrep_port=0, pub_port=0, req_port=0, independent=False,
392 392 req_port : int, optional
393 393 The port to use for the REQ (raw input) channel.
394 394
395 hb_port : int, optional
396 The port to use for the heartbeat REP channel.
397
395 398 independent : bool, optional (default False)
396 399 If set, the kernel process is guaranteed to survive if this process
397 400 dies. If not set, an effort is made to ensure that the kernel is killed
@@ -415,8 +418,8 def launch_kernel(xrep_port=0, pub_port=0, req_port=0, independent=False,
415 418 if isinstance(pylab, basestring):
416 419 extra_arguments.append(pylab)
417 420 return base_launch_kernel('from IPython.zmq.ipkernel import main; main()',
418 xrep_port, pub_port, req_port, independent,
419 extra_arguments)
421 xrep_port, pub_port, req_port, hb_port,
422 independent, extra_arguments)
420 423
421 424 def main():
422 425 """ The IPython kernel main entry point.
@@ -447,6 +447,71 class RepSocketChannel(ZmqSocketChannel):
447 447 self.add_io_state(POLLOUT)
448 448
449 449
class HBSocketChannel(ZmqSocketChannel):
    """The heartbeat channel which monitors the kernel heartbeat.

    The channel thread sends a 'ping' over a REQ socket and waits up to
    ``time_to_dead`` seconds for the reply. When no reply arrives in time,
    ``call_handlers`` is invoked with the elapsed time.
    """

    # Seconds to wait for a reply before the kernel is reported dead; also
    # the interval between two pings.
    time_to_dead = 5.0
    socket = None
    poller = None
    # Set by stop(); checked once per ping cycle by run().
    _stop_requested = False

    def __init__(self, context, session, address):
        super(HBSocketChannel, self).__init__(context, session, address)

    def _create_socket(self):
        """(Re)create the REQ socket and the poller that watches it."""
        if self.socket is not None:
            # Recreating after a missed reply: release the old socket
            # instead of leaking it.
            self.socket.close()
        self.socket = self.context.socket(zmq.REQ)
        self.socket.setsockopt(zmq.IDENTITY, self.session.session)
        self.socket.connect('tcp://%s:%i' % self.address)
        self.poller = zmq.Poller()
        self.poller.register(self.socket, zmq.POLLIN)

    def run(self):
        """The thread's main activity. Call start() instead."""
        self._create_socket()

        while not self._stop_requested:
            request_time = time.time()
            try:
                self.socket.send_json('ping')
            except zmq.ZMQError as e:
                if e.errno != zmq.EFSM:
                    raise
                # The REQ socket is still waiting for the previous reply,
                # so it cannot send again. Back off, then start over with a
                # fresh socket.
                time.sleep(self.time_to_dead)
                self._create_socket()
                continue

            while True:
                try:
                    self.socket.recv_json(zmq.NOBLOCK)
                except zmq.ZMQError as e:
                    if e.errno != zmq.EAGAIN:
                        # We should probably log this instead
                        raise
                    until_dead = self.time_to_dead - (time.time() - request_time)
                    # Poller.poll takes milliseconds, and a negative timeout
                    # means "wait forever", so convert from seconds and
                    # clamp to a small positive value.
                    self.poller.poll(1000 * max(until_dead, 1e-3))
                    since_last_heartbeat = time.time() - request_time
                    if since_last_heartbeat > self.time_to_dead:
                        self.call_handlers(since_last_heartbeat)
                        break
                else:
                    # Reply received: wait out the rest of the interval
                    # before sending the next ping.
                    until_dead = self.time_to_dead - (time.time() - request_time)
                    if until_dead > 0.0:
                        time.sleep(until_dead)
                    break

    def stop(self):
        """Ask the heartbeat loop to finish, then defer to the base class.

        The loop notices the request at the start of its next ping cycle,
        so this can take up to roughly ``time_to_dead`` seconds (twice that
        if the loop is recovering from a missed reply).
        """
        # NOTE(review): assumes the inherited stop() waits for the thread to
        # finish; without this flag run() would never return - confirm.
        self._stop_requested = True
        super(HBSocketChannel, self).stop()

    def call_handlers(self, since_last_heartbeat):
        """Called from the channel's thread when no heartbeat reply arrived
        within ``time_to_dead`` seconds.

        Subclasses should override this method to react to the missed
        heartbeat. It is important to remember that this method is called
        in the channel's thread, so some logic must be done to ensure that
        the application level handlers are called in the application
        thread.

        Parameters
        ----------
        since_last_heartbeat : float
            Seconds elapsed since the unanswered ping was sent.
        """
        raise NotImplementedError('call_handlers must be defined in a subclass.')
513
514
450 515 #-----------------------------------------------------------------------------
451 516 # Main kernel manager class
452 517 #-----------------------------------------------------------------------------
@@ -475,17 +540,20 class KernelManager(HasTraits):
475 540 xreq_address = TCPAddress((LOCALHOST, 0))
476 541 sub_address = TCPAddress((LOCALHOST, 0))
477 542 rep_address = TCPAddress((LOCALHOST, 0))
543 hb_address = TCPAddress((LOCALHOST, 0))
478 544
479 545 # The classes to use for the various channels.
480 546 xreq_channel_class = Type(XReqSocketChannel)
481 547 sub_channel_class = Type(SubSocketChannel)
482 548 rep_channel_class = Type(RepSocketChannel)
549 hb_channel_class = Type(HBSocketChannel)
483 550
484 551 # Protected traits.
485 552 _launch_args = Any
486 553 _xreq_channel = Any
487 554 _sub_channel = Any
488 555 _rep_channel = Any
556 _hb_channel = Any
489 557
490 558 #--------------------------------------------------------------------------
491 559 # Channel management methods:
@@ -502,6 +570,7 class KernelManager(HasTraits):
502 570 self.xreq_channel.start()
503 571 self.sub_channel.start()
504 572 self.rep_channel.start()
573 self.hb_channel.start()
505 574
506 575 def stop_channels(self):
507 576 """Stops the channels for this kernel.
@@ -512,13 +581,15 class KernelManager(HasTraits):
512 581 self.xreq_channel.stop()
513 582 self.sub_channel.stop()
514 583 self.rep_channel.stop()
584 self.hb_channel.stop()
515 585
516 586 @property
517 587 def channels_running(self):
518 588 """Are all of the channels created and running?"""
519 589 return self.xreq_channel.is_alive() \
520 590 and self.sub_channel.is_alive() \
521 and self.rep_channel.is_alive()
591 and self.rep_channel.is_alive() \
592 and self.hb_channel.is_alive()
522 593
523 594 #--------------------------------------------------------------------------
524 595 # Kernel process management methods:
@@ -535,8 +606,9 class KernelManager(HasTraits):
535 606 ipython : bool, optional (default True)
536 607 Whether to use an IPython kernel instead of a plain Python kernel.
537 608 """
538 xreq, sub, rep = self.xreq_address, self.sub_address, self.rep_address
539 if xreq[0] != LOCALHOST or sub[0] != LOCALHOST or rep[0] != LOCALHOST:
609 xreq, sub, rep, hb = self.xreq_address, self.sub_address, \
610 self.rep_address, self.hb_address
611 if xreq[0] != LOCALHOST or sub[0] != LOCALHOST or rep[0] != LOCALHOST or hb[0] != LOCALHOST:
540 612 raise RuntimeError("Can only launch a kernel on localhost."
541 613 "Make sure that the '*_address' attributes are "
542 614 "configured properly.")
@@ -546,11 +618,13 class KernelManager(HasTraits):
546 618 from ipkernel import launch_kernel as launch
547 619 else:
548 620 from pykernel import launch_kernel as launch
549 self.kernel, xrep, pub, req = launch(xrep_port=xreq[1], pub_port=sub[1],
550 req_port=rep[1], **kw)
621 self.kernel, xrep, pub, req, hb = launch(
622 xrep_port=xreq[1], pub_port=sub[1], req_port=rep[1],
623 hb_port=hb[1], **kw)
551 624 self.xreq_address = (LOCALHOST, xrep)
552 625 self.sub_address = (LOCALHOST, pub)
553 626 self.rep_address = (LOCALHOST, req)
627 self.hb_address = (LOCALHOST, hb)
554 628
555 629 def restart_kernel(self):
556 630 """Restarts a kernel with the same arguments that were used to launch
@@ -630,3 +704,12 class KernelManager(HasTraits):
630 704 self.session,
631 705 self.rep_address)
632 706 return self._rep_channel
707
@property
def hb_channel(self):
    """Get the heartbeat socket channel object that monitors the kernel.

    The channel is created on first access from ``hb_channel_class``, using
    this manager's context, session and ``hb_address``, and is cached for
    later accesses.
    """
    if self._hb_channel is not None:
        return self._hb_channel
    self._hb_channel = self.hb_channel_class(self.context,
                                             self.session,
                                             self.hb_address)
    return self._hb_channel
General Comments 0
You need to be logged in to leave comments. Login now