##// END OF EJS Templates
relocate redundantly-named kernel files...
relocate redundantly-named kernel files uses subpackages for different subclasses (blocking, ioloop)

File last commit:

r10279:e8125122
r10283:124bcff9
Show More
heartbeat.py
106 lines | 3.3 KiB | text/x-python | PythonLexer
"""A ZMQStream based heartbeat.
This code is currently not used.
Authors:
* Brian Granger
"""
#-----------------------------------------------------------------------------
# Copyright (C) 2008-2011 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
import time
import zmq
from zmq.eventloop import ioloop
from IPython.config.configurable import LoggingConfigurable
from IPython.utils.traitlets import (
Instance, Float, Bool
)
#-----------------------------------------------------------------------------
# Classes
#-----------------------------------------------------------------------------
class Heartbeat(LoggingConfigurable):
context = Instance('zmq.Context')
def _context_default(self):
return zmq.Context.instance()
loop = Instance('zmq.eventloop.ioloop.IOLoop', allow_none=False)
def _loop_default(self):
return ioloop.IOLoop.instance()
stream = Instance('zmq.eventloop.zmqstream.ZMQStream')
_beating = Bool(False)
_kernel_alive = Bool(True)
time_to_dead = Float(3.0, config=True, help="""Kernel heartbeat interval in seconds.""")
first_beat = Float(5.0, config=True, help="Delay (in seconds) before sending first heartbeat.")
def start(self, callback):
"""Start the heartbeating and call the callback if the kernel dies."""
if not self._beating:
self._kernel_alive = True
def ping_or_dead():
self.stream.flush()
if self._kernel_alive:
self._kernel_alive = False
self.stream.send(b'ping')
# flush stream to force immediate socket send
self.stream.flush()
else:
try:
callback()
except:
pass
finally:
self.stop()
def beat_received(msg):
self._kernel_alive = True
self.stream.on_recv(beat_received)
self._hb_periodic_callback = ioloop.PeriodicCallback(
ping_or_dead, self.time_to_dead*1000, self.loop
)
self.loop.add_timeout(time.time()+self.first_beat, self._really_start_hb)
self._beating= True
def _really_start_hb(self):
"""callback for delayed heartbeat start
Only start the hb loop if we haven't been closed during the wait.
"""
if self._beating and not self.stream.closed():
self._hb_periodic_callback.start()
def stop(self):
"""Stop the heartbeating and cancel all related callbacks."""
if self._beating:
self._beating = False
self._hb_periodic_callback.stop()
if not self.stream.closed():
self.stream.on_recv(None)
def pause(self):
"""Pause the heartbeat."""
pass
def unpause(self):
"""Unpase the heartbeat."""
pass