##// END OF EJS Templates
Starting to refactor heart beating of notebook kernels.
Brian E. Granger -
Show More
@@ -466,10 +466,9 b' class AuthenticatedZMQStreamHandler(ZMQStreamHandler):'
466 466 class IOPubHandler(AuthenticatedZMQStreamHandler):
467 467
468 468 def initialize(self, *args, **kwargs):
469 self._kernel_alive = True
470 self._beating = False
471 469 self.iopub_stream = None
472 470 self.hb_stream = None
471 self.heartbeat = None
473 472
474 473 def on_first_message(self, msg):
475 474 try:
@@ -478,12 +477,13 b' class IOPubHandler(AuthenticatedZMQStreamHandler):'
478 477 self.close()
479 478 return
480 479 km = self.application.kernel_manager
481 self.time_to_dead = km.time_to_dead
482 self.first_beat = km.first_beat
483 480 kernel_id = self.kernel_id
484 481 try:
485 482 self.iopub_stream = km.create_iopub_stream(kernel_id)
486 483 self.hb_stream = km.create_hb_stream(kernel_id)
484 self.heartbeat = Heartbeat(
485 stream=self.hb_stream, config=self.application.config
486 )
487 487 except web.HTTPError:
488 488 # WebSockets don't response to traditional error codes so we
489 489 # close the connection.
@@ -492,7 +492,7 b' class IOPubHandler(AuthenticatedZMQStreamHandler):'
492 492 self.close()
493 493 else:
494 494 self.iopub_stream.on_recv(self._on_zmq_reply)
495 self.start_hb(self.kernel_died)
495 self.heartbeat.start(self.kernel_died)
496 496
497 497 def on_message(self, msg):
498 498 pass
@@ -507,51 +507,7 b' class IOPubHandler(AuthenticatedZMQStreamHandler):'
507 507 self.iopub_stream.close()
508 508 if self.hb_stream is not None and not self.hb_stream.closed():
509 509 self.hb_stream.close()
510
511 def start_hb(self, callback):
512 """Start the heartbeating and call the callback if the kernel dies."""
513 if not self._beating:
514 self._kernel_alive = True
515
516 def ping_or_dead():
517 self.hb_stream.flush()
518 if self._kernel_alive:
519 self._kernel_alive = False
520 self.hb_stream.send(b'ping')
521 # flush stream to force immediate socket send
522 self.hb_stream.flush()
523 else:
524 try:
525 callback()
526 except:
527 pass
528 finally:
529 self.stop_hb()
530
531 def beat_received(msg):
532 self._kernel_alive = True
533
534 self.hb_stream.on_recv(beat_received)
535 loop = ioloop.IOLoop.instance()
536 self._hb_periodic_callback = ioloop.PeriodicCallback(ping_or_dead, self.time_to_dead*1000, loop)
537 loop.add_timeout(time.time()+self.first_beat, self._really_start_hb)
538 self._beating= True
539
540 def _really_start_hb(self):
541 """callback for delayed heartbeat start
542
543 Only start the hb loop if we haven't been closed during the wait.
544 """
545 if self._beating and not self.hb_stream.closed():
546 self._hb_periodic_callback.start()
547
548 def stop_hb(self):
549 """Stop the heartbeating and cancel all related callbacks."""
550 if self._beating:
551 self._beating = False
552 self._hb_periodic_callback.stop()
553 if not self.hb_stream.closed():
554 self.hb_stream.on_recv(None)
510 # stop the heartbeat here
555 511
556 512 def _delete_kernel_data(self):
557 513 """Remove the kernel data and notebook mapping."""
@@ -22,6 +22,7 b' from IPython.kernel.multikernelmanager import MultiKernelManager'
22 22 from IPython.utils.traitlets import (
23 23 Dict, List, Unicode, Float, Integer,
24 24 )
25
25 26 #-----------------------------------------------------------------------------
26 27 # Classes
27 28 #-----------------------------------------------------------------------------
General Comments 0
You need to be logged in to leave comments. Login now