|
|
#!/usr/bin/env python
|
|
|
"""
|
|
|
A multi-heart Heartbeat system using PUB and ROUTER sockets. pings are sent out on the PUB,
|
|
|
and hearts are tracked based on their DEALER identities.
|
|
|
"""
|
|
|
|
|
|
# Copyright (c) IPython Development Team.
|
|
|
# Distributed under the terms of the Modified BSD License.
|
|
|
|
|
|
from __future__ import print_function
|
|
|
import time
|
|
|
import uuid
|
|
|
|
|
|
import zmq
|
|
|
from zmq.devices import ThreadDevice, ThreadMonitoredQueue
|
|
|
from zmq.eventloop import ioloop, zmqstream
|
|
|
|
|
|
from IPython.config.configurable import LoggingConfigurable
|
|
|
from IPython.utils.py3compat import str_to_bytes
|
|
|
from IPython.utils.traitlets import Set, Instance, CFloat, Integer, Dict, Bool
|
|
|
|
|
|
from ipython_parallel.util import log_errors
|
|
|
|
|
|
class Heart(object):
|
|
|
"""A basic heart object for responding to a HeartMonitor.
|
|
|
This is a simple wrapper with defaults for the most common
|
|
|
Device model for responding to heartbeats.
|
|
|
|
|
|
It simply builds a threadsafe zmq.FORWARDER Device, defaulting to using
|
|
|
SUB/DEALER for in/out.
|
|
|
|
|
|
You can specify the DEALER's IDENTITY via the optional heart_id argument."""
|
|
|
device=None
|
|
|
id=None
|
|
|
def __init__(self, in_addr, out_addr, mon_addr=None, in_type=zmq.SUB, out_type=zmq.DEALER, mon_type=zmq.PUB, heart_id=None):
|
|
|
if mon_addr is None:
|
|
|
self.device = ThreadDevice(zmq.FORWARDER, in_type, out_type)
|
|
|
else:
|
|
|
self.device = ThreadMonitoredQueue(in_type, out_type, mon_type, in_prefix=b"", out_prefix=b"")
|
|
|
# do not allow the device to share global Context.instance,
|
|
|
# which is the default behavior in pyzmq > 2.1.10
|
|
|
self.device.context_factory = zmq.Context
|
|
|
|
|
|
self.device.daemon=True
|
|
|
self.device.connect_in(in_addr)
|
|
|
self.device.connect_out(out_addr)
|
|
|
if mon_addr is not None:
|
|
|
self.device.connect_mon(mon_addr)
|
|
|
if in_type == zmq.SUB:
|
|
|
self.device.setsockopt_in(zmq.SUBSCRIBE, b"")
|
|
|
if heart_id is None:
|
|
|
heart_id = uuid.uuid4().bytes
|
|
|
self.device.setsockopt_out(zmq.IDENTITY, heart_id)
|
|
|
self.id = heart_id
|
|
|
|
|
|
def start(self):
|
|
|
return self.device.start()
|
|
|
|
|
|
|
|
|
class HeartMonitor(LoggingConfigurable):
|
|
|
"""A basic HeartMonitor class
|
|
|
pingstream: a PUB stream
|
|
|
pongstream: an ROUTER stream
|
|
|
period: the period of the heartbeat in milliseconds"""
|
|
|
|
|
|
debug = Bool(False, config=True,
|
|
|
help="""Whether to include every heartbeat in debugging output.
|
|
|
|
|
|
Has to be set explicitly, because there will be *a lot* of output.
|
|
|
"""
|
|
|
)
|
|
|
period = Integer(3000, config=True,
|
|
|
help='The frequency at which the Hub pings the engines for heartbeats '
|
|
|
'(in ms)',
|
|
|
)
|
|
|
max_heartmonitor_misses = Integer(10, config=True,
|
|
|
help='Allowed consecutive missed pings from controller Hub to engine before unregistering.',
|
|
|
)
|
|
|
|
|
|
pingstream=Instance('zmq.eventloop.zmqstream.ZMQStream', allow_none=True)
|
|
|
pongstream=Instance('zmq.eventloop.zmqstream.ZMQStream', allow_none=True)
|
|
|
loop = Instance('zmq.eventloop.ioloop.IOLoop')
|
|
|
def _loop_default(self):
|
|
|
return ioloop.IOLoop.instance()
|
|
|
|
|
|
# not settable:
|
|
|
hearts=Set()
|
|
|
responses=Set()
|
|
|
on_probation=Dict()
|
|
|
last_ping=CFloat(0)
|
|
|
_new_handlers = Set()
|
|
|
_failure_handlers = Set()
|
|
|
lifetime = CFloat(0)
|
|
|
tic = CFloat(0)
|
|
|
|
|
|
def __init__(self, **kwargs):
|
|
|
super(HeartMonitor, self).__init__(**kwargs)
|
|
|
|
|
|
self.pongstream.on_recv(self.handle_pong)
|
|
|
|
|
|
def start(self):
|
|
|
self.tic = time.time()
|
|
|
self.caller = ioloop.PeriodicCallback(self.beat, self.period, self.loop)
|
|
|
self.caller.start()
|
|
|
|
|
|
def add_new_heart_handler(self, handler):
|
|
|
"""add a new handler for new hearts"""
|
|
|
self.log.debug("heartbeat::new_heart_handler: %s", handler)
|
|
|
self._new_handlers.add(handler)
|
|
|
|
|
|
def add_heart_failure_handler(self, handler):
|
|
|
"""add a new handler for heart failure"""
|
|
|
self.log.debug("heartbeat::new heart failure handler: %s", handler)
|
|
|
self._failure_handlers.add(handler)
|
|
|
|
|
|
def beat(self):
|
|
|
self.pongstream.flush()
|
|
|
self.last_ping = self.lifetime
|
|
|
|
|
|
toc = time.time()
|
|
|
self.lifetime += toc-self.tic
|
|
|
self.tic = toc
|
|
|
if self.debug:
|
|
|
self.log.debug("heartbeat::sending %s", self.lifetime)
|
|
|
goodhearts = self.hearts.intersection(self.responses)
|
|
|
missed_beats = self.hearts.difference(goodhearts)
|
|
|
newhearts = self.responses.difference(goodhearts)
|
|
|
for heart in newhearts:
|
|
|
self.handle_new_heart(heart)
|
|
|
heartfailures, on_probation = self._check_missed(missed_beats, self.on_probation,
|
|
|
self.hearts)
|
|
|
for failure in heartfailures:
|
|
|
self.handle_heart_failure(failure)
|
|
|
self.on_probation = on_probation
|
|
|
self.responses = set()
|
|
|
#print self.on_probation, self.hearts
|
|
|
# self.log.debug("heartbeat::beat %.3f, %i beating hearts", self.lifetime, len(self.hearts))
|
|
|
self.pingstream.send(str_to_bytes(str(self.lifetime)))
|
|
|
# flush stream to force immediate socket send
|
|
|
self.pingstream.flush()
|
|
|
|
|
|
def _check_missed(self, missed_beats, on_probation, hearts):
|
|
|
"""Update heartbeats on probation, identifying any that have too many misses.
|
|
|
"""
|
|
|
failures = []
|
|
|
new_probation = {}
|
|
|
for cur_heart in (b for b in missed_beats if b in hearts):
|
|
|
miss_count = on_probation.get(cur_heart, 0) + 1
|
|
|
self.log.info("heartbeat::missed %s : %s" % (cur_heart, miss_count))
|
|
|
if miss_count > self.max_heartmonitor_misses:
|
|
|
failures.append(cur_heart)
|
|
|
else:
|
|
|
new_probation[cur_heart] = miss_count
|
|
|
return failures, new_probation
|
|
|
|
|
|
def handle_new_heart(self, heart):
|
|
|
if self._new_handlers:
|
|
|
for handler in self._new_handlers:
|
|
|
handler(heart)
|
|
|
else:
|
|
|
self.log.info("heartbeat::yay, got new heart %s!", heart)
|
|
|
self.hearts.add(heart)
|
|
|
|
|
|
def handle_heart_failure(self, heart):
|
|
|
if self._failure_handlers:
|
|
|
for handler in self._failure_handlers:
|
|
|
try:
|
|
|
handler(heart)
|
|
|
except Exception as e:
|
|
|
self.log.error("heartbeat::Bad Handler! %s", handler, exc_info=True)
|
|
|
pass
|
|
|
else:
|
|
|
self.log.info("heartbeat::Heart %s failed :(", heart)
|
|
|
self.hearts.remove(heart)
|
|
|
|
|
|
|
|
|
@log_errors
|
|
|
def handle_pong(self, msg):
|
|
|
"a heart just beat"
|
|
|
current = str_to_bytes(str(self.lifetime))
|
|
|
last = str_to_bytes(str(self.last_ping))
|
|
|
if msg[1] == current:
|
|
|
delta = time.time()-self.tic
|
|
|
if self.debug:
|
|
|
self.log.debug("heartbeat::heart %r took %.2f ms to respond", msg[0], 1000*delta)
|
|
|
self.responses.add(msg[0])
|
|
|
elif msg[1] == last:
|
|
|
delta = time.time()-self.tic + (self.lifetime-self.last_ping)
|
|
|
self.log.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond", msg[0], 1000*delta)
|
|
|
self.responses.add(msg[0])
|
|
|
else:
|
|
|
self.log.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)", msg[1], self.lifetime)
|
|
|
|
|
|
|