|
|
"""A ZMQStream based heartbeat.
|
|
|
|
|
|
This code is currently not used.
|
|
|
|
|
|
Authors:
|
|
|
|
|
|
* Brian Granger
|
|
|
"""
|
|
|
|
|
|
#-----------------------------------------------------------------------------
|
|
|
# Copyright (C) 2008-2011 The IPython Development Team
|
|
|
#
|
|
|
# Distributed under the terms of the BSD License. The full license is in
|
|
|
# the file COPYING, distributed as part of this software.
|
|
|
#-----------------------------------------------------------------------------
|
|
|
|
|
|
#-----------------------------------------------------------------------------
|
|
|
# Imports
|
|
|
#-----------------------------------------------------------------------------
|
|
|
|
|
|
import time
|
|
|
|
|
|
import zmq
|
|
|
from zmq.eventloop import ioloop
|
|
|
|
|
|
|
|
|
from IPython.config.configurable import LoggingConfigurable
|
|
|
from IPython.utils.traitlets import (
|
|
|
Instance, Float, Bool
|
|
|
)
|
|
|
|
|
|
#-----------------------------------------------------------------------------
|
|
|
# Classes
|
|
|
#-----------------------------------------------------------------------------
|
|
|
|
|
|
class Heartbeat(LoggingConfigurable):
    """Periodically ping a peer over a ZMQStream and report when it goes quiet.

    Once started, a ``b'ping'`` message is sent on ``stream`` every
    ``time_to_dead`` seconds.  Any message received on the stream marks the
    peer as alive.  If a full period elapses without a message arriving, the
    callback passed to :meth:`start` is invoked once and the heartbeat stops.
    """

    # ZMQ context; defaults to the process-wide instance.
    context = Instance('zmq.Context')
    def _context_default(self):
        return zmq.Context.instance()

    # Event loop that drives the delayed start and the periodic ping.
    loop = Instance('zmq.eventloop.ioloop.IOLoop', allow_none=False)
    def _loop_default(self):
        return ioloop.IOLoop.instance()

    # Stream used to send pings and receive replies.  No default is defined
    # here, so it must be assigned before start() is called.
    stream = Instance('zmq.eventloop.zmqstream.ZMQStream')

    # True between start() and stop().
    _beating = Bool(False)
    # Cleared when a ping is sent, set again when any message arrives.
    _kernel_alive = Bool(True)

    time_to_dead = Float(3.0, config=True, help="""Kernel heartbeat interval in seconds.""")
    first_beat = Float(5.0, config=True, help="Delay (in seconds) before sending first heartbeat.")

    def start(self, callback):
        """Start the heartbeating and call the callback if the kernel dies.

        Parameters
        ----------
        callback : callable
            Called with no arguments the first time a full ``time_to_dead``
            period passes without any message arriving on ``stream``.
            Exceptions it raises are logged rather than propagated.

        Calling this while the heartbeat is already running does nothing.
        """
        if self._beating:
            return

        self._kernel_alive = True

        def ping_or_dead():
            # Process anything already queued on the stream first, so a reply
            # that has arrived gets a chance to mark the kernel alive.
            self.stream.flush()
            if self._kernel_alive:
                # Assume dead until beat_received() says otherwise.
                self._kernel_alive = False
                self.stream.send(b'ping')
                # flush stream to force immediate socket send
                self.stream.flush()
            else:
                try:
                    callback()
                except Exception:
                    # A failing callback must not break the event loop, but
                    # it should not vanish silently either.
                    self.log.error(
                        "Exception in heartbeat death callback",
                        exc_info=True,
                    )
                finally:
                    # Stop regardless of how the callback exited.
                    self.stop()

        def beat_received(msg):
            # The message content is ignored; any traffic counts as a beat.
            self._kernel_alive = True

        self.stream.on_recv(beat_received)
        self._hb_periodic_callback = ioloop.PeriodicCallback(
            ping_or_dead, self.time_to_dead*1000, self.loop
        )
        # Delay the first ping by first_beat seconds.
        self.loop.add_timeout(time.time()+self.first_beat, self._really_start_hb)
        self._beating = True

    def _really_start_hb(self):
        """callback for delayed heartbeat start

        Only start the hb loop if we haven't been closed during the wait.
        """
        if self._beating and not self.stream.closed():
            self._hb_periodic_callback.start()

    def stop(self):
        """Stop the heartbeating and cancel all related callbacks.

        Does nothing if the heartbeat is not currently running.
        """
        if not self._beating:
            return

        self._beating = False
        self._hb_periodic_callback.stop()
        if not self.stream.closed():
            # Unregister the receive handler installed by start().
            self.stream.on_recv(None)

    def pause(self):
        """Pause the heartbeat (currently a no-op)."""
        pass

    def unpause(self):
        """Unpause the heartbeat (currently a no-op)."""
        pass
|
|
|
|