##// END OF EJS Templates
wrap `client` method for InProcessKM
wrap `client` method for InProcessKM

File last commit:

r10279:e8125122
r10304:608887ab
Show More
heartbeat.py
106 lines | 3.3 KiB | text/x-python | PythonLexer
Brian E. Granger
More work on the heartbeat.
r10274 """A ZMQStream based heartbeat.
Brian E. Granger
First go at adding autorestart to MultiKernelManager.
r10279 This code is currently not used.
Brian E. Granger
More work on the heartbeat.
r10274 Authors:
* Brian Granger
"""
#-----------------------------------------------------------------------------
# Copyright (C) 2008-2011 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
import time
import zmq
from zmq.eventloop import ioloop
from IPython.config.configurable import LoggingConfigurable
from IPython.utils.traitlets import (
Instance, Float, Bool
)
#-----------------------------------------------------------------------------
# Classes
#-----------------------------------------------------------------------------
class Heartbeat(LoggingConfigurable):
context = Instance('zmq.Context')
def _context_default(self):
return zmq.Context.instance()
loop = Instance('zmq.eventloop.ioloop.IOLoop', allow_none=False)
def _loop_default(self):
return ioloop.IOLoop.instance()
stream = Instance('zmq.eventloop.zmqstream.ZMQStream')
_beating = Bool(False)
_kernel_alive = Bool(True)
time_to_dead = Float(3.0, config=True, help="""Kernel heartbeat interval in seconds.""")
first_beat = Float(5.0, config=True, help="Delay (in seconds) before sending first heartbeat.")
def start(self, callback):
"""Start the heartbeating and call the callback if the kernel dies."""
if not self._beating:
self._kernel_alive = True
def ping_or_dead():
self.stream.flush()
if self._kernel_alive:
self._kernel_alive = False
self.stream.send(b'ping')
# flush stream to force immediate socket send
self.stream.flush()
else:
try:
callback()
except:
pass
finally:
self.stop()
def beat_received(msg):
self._kernel_alive = True
self.stream.on_recv(beat_received)
self._hb_periodic_callback = ioloop.PeriodicCallback(
ping_or_dead, self.time_to_dead*1000, self.loop
)
self.loop.add_timeout(time.time()+self.first_beat, self._really_start_hb)
self._beating= True
def _really_start_hb(self):
"""callback for delayed heartbeat start
Only start the hb loop if we haven't been closed during the wait.
"""
if self._beating and not self.stream.closed():
self._hb_periodic_callback.start()
def stop(self):
"""Stop the heartbeating and cancel all related callbacks."""
if self._beating:
self._beating = False
self._hb_periodic_callback.stop()
if not self.stream.closed():
self.stream.on_recv(None)
def pause(self):
"""Pause the heartbeat."""
pass
def unpause(self):
"""Unpase the heartbeat."""
pass