heartmonitor.py
173 lines
| 6.0 KiB
| text/x-python
|
PythonLexer
MinRK
|
r3539 | #!/usr/bin/env python | ||
""" | ||||
A multi-heart Heartbeat system using PUB and XREP sockets. pings are sent out on the PUB, | ||||
and hearts are tracked based on their XREQ identities. | ||||
MinRK
|
r4018 | |||
Authors: | ||||
* Min RK | ||||
MinRK
|
r3539 | """ | ||
MinRK
|
r3660 | #----------------------------------------------------------------------------- | ||
# Copyright (C) 2010-2011 The IPython Development Team | ||||
# | ||||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r3539 | |||
MinRK
|
r3553 | from __future__ import print_function | ||
MinRK
|
r3539 | import time | ||
MinRK
|
r3631 | import uuid | ||
MinRK
|
r3539 | |||
import zmq | ||||
MinRK
|
r3988 | from zmq.devices import ThreadDevice | ||
MinRK
|
r3539 | from zmq.eventloop import ioloop, zmqstream | ||
MinRK
|
r4016 | from IPython.config.configurable import LoggingConfigurable | ||
MinRK
|
r3988 | from IPython.utils.traitlets import Set, Instance, CFloat | ||
MinRK
|
r3610 | |||
MinRK
|
r4161 | from IPython.parallel.util import asbytes | ||
MinRK
|
r4155 | |||
MinRK
|
class Heart(object):
    """Responder side of the heartbeat: echoes pings back to a HeartMonitor.

    This wraps a ``ThreadDevice`` running a ``zmq.FORWARDER`` between two
    sockets.  The input socket (SUB by default) connects to ``in_addr`` and
    the output socket (DEALER, formerly called XREQ, by default) connects to
    ``out_addr``.

    The output socket's IDENTITY is ``heart_id``; when no ``heart_id`` is
    given, the 16 raw bytes of a fresh ``uuid4`` are used instead.  The
    identity finally used is available as ``self.id``.
    """

    device = None
    id = None

    def __init__(self, in_addr, out_addr, in_type=zmq.SUB, out_type=zmq.DEALER, heart_id=None):
        forwarder = ThreadDevice(zmq.FORWARDER, in_type, out_type)
        self.device = forwarder
        forwarder.daemon = True

        forwarder.connect_in(in_addr)
        forwarder.connect_out(out_addr)

        # A SUB socket delivers nothing until it subscribes; the empty
        # prefix subscribes to every message.
        if in_type == zmq.SUB:
            forwarder.setsockopt_in(zmq.SUBSCRIBE, b"")

        if heart_id is None:
            heart_id = uuid.uuid4().bytes
        forwarder.setsockopt_out(zmq.IDENTITY, heart_id)
        self.id = heart_id

    def start(self):
        """Start the underlying device and return whatever its ``start()`` returns."""
        return self.device.start()
Bernardo B. Marques
|
r4872 | |||
MinRK
|
class HeartMonitor(LoggingConfigurable):
    """Track which hearts are alive by pinging them periodically.

    pingstream: a PUB stream on which every beat's ping is sent
    pongstream: an XREP (ROUTER) stream on which hearts echo the ping back
    period: the period of the heartbeat in milliseconds

    Every ``beat`` sends the current ``lifetime`` as the ping payload.
    Hearts are identified by the first frame of their reply (``msg[0]``).
    A known heart that misses one beat is put on probation; if it misses the
    next beat as well it is reported to the failure handlers and forgotten.
    """

    period = CFloat(1000, config=True,
        help='The frequency at which the Hub pings the engines for heartbeats '
        ' (in ms) [default: 1000]',
    )

    pingstream = Instance('zmq.eventloop.zmqstream.ZMQStream')
    pongstream = Instance('zmq.eventloop.zmqstream.ZMQStream')
    loop = Instance('zmq.eventloop.ioloop.IOLoop')

    def _loop_default(self):
        return ioloop.IOLoop.instance()

    # not settable:
    hearts = Set()              # identities of hearts currently considered alive
    responses = Set()           # identities that answered since the last beat
    on_probation = Set()        # known hearts that missed the most recent beat
    last_ping = CFloat(0)       # payload (lifetime value) of the previous ping
    _new_handlers = Set()       # callbacks invoked with each newly seen heart
    _failure_handlers = Set()   # callbacks invoked with each failed heart
    lifetime = CFloat(0)        # accumulated seconds between beats; ping payload
    tic = CFloat(0)             # time.time() at the most recent beat

    def __init__(self, **kwargs):
        super(HeartMonitor, self).__init__(**kwargs)

        self.pongstream.on_recv(self.handle_pong)

    def start(self):
        """Begin beating every ``period`` milliseconds on ``loop``."""
        # Start the clock here; otherwise the first beat would add the whole
        # Unix epoch (time.time() - 0) to ``lifetime``.
        self.tic = time.time()
        self.caller = ioloop.PeriodicCallback(self.beat, self.period, self.loop)
        self.caller.start()

    def add_new_heart_handler(self, handler):
        """add a new handler for new hearts"""
        self.log.debug("heartbeat::new_heart_handler: %s", handler)
        self._new_handlers.add(handler)

    def add_heart_failure_handler(self, handler):
        """add a new handler for heart failure"""
        self.log.debug("heartbeat::new heart failure handler: %s", handler)
        self._failure_handlers.add(handler)

    def beat(self):
        """Evaluate the replies to the previous ping, then send the next one."""
        # Process any pongs still queued so they count for the closing beat.
        self.pongstream.flush()
        self.last_ping = self.lifetime

        toc = time.time()
        self.lifetime += toc - self.tic
        self.tic = toc

        goodhearts = self.hearts.intersection(self.responses)
        missed_beats = self.hearts.difference(goodhearts)
        heartfailures = self.on_probation.intersection(missed_beats)
        newhearts = self.responses.difference(goodhearts)

        # Explicit loops: map() is lazy on Python 3, so using it purely for
        # side effects would silently skip every handler.
        for heart in newhearts:
            self.handle_new_heart(heart)
        for heart in heartfailures:
            self.handle_heart_failure(heart)

        # Failed hearts were just removed from self.hearts, so only the
        # first-time offenders remain on probation.
        self.on_probation = missed_beats.intersection(self.hearts)
        self.responses = set()
        self.pingstream.send(asbytes(str(self.lifetime)))

    def handle_new_heart(self, heart):
        """Notify the new-heart handlers (or log) and start tracking ``heart``."""
        if self._new_handlers:
            for handler in self._new_handlers:
                handler(heart)
        else:
            self.log.info("heartbeat::yay, got new heart %s!", heart)
        self.hearts.add(heart)

    def handle_heart_failure(self, heart):
        """Notify the failure handlers (or log) and stop tracking ``heart``.

        A handler that raises is logged with its traceback and does not
        prevent the remaining handlers from running.
        """
        if self._failure_handlers:
            for handler in self._failure_handlers:
                try:
                    handler(heart)
                except Exception:
                    self.log.error("heartbeat::Bad Handler! %s", handler, exc_info=True)
        else:
            self.log.info("heartbeat::Heart %s failed :(", heart)
        # discard() rather than remove(): tolerate a handler that has already
        # dropped the heart from the set.
        self.hearts.discard(heart)

    def handle_pong(self, msg):
        """a heart just beat

        ``msg[0]`` is the heart's identity and ``msg[1]`` the echoed ping
        payload.  A reply to the current or the previous ping counts as a
        response; anything else is logged and ignored.
        """
        current = asbytes(str(self.lifetime))
        last = asbytes(str(self.last_ping))
        if msg[1] == current:
            self.responses.add(msg[0])
        elif msg[1] == last:
            # Time since the current ping plus the interval between the
            # previous ping and the current one.
            delta = time.time() - self.tic + (self.lifetime - self.last_ping)
            self.log.warning("heartbeat::heart %r missed a beat, and took %.2f ms to respond",
                             msg[0], 1000 * delta)
            self.responses.add(msg[0])
        else:
            self.log.warning("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)",
                             msg[1], self.lifetime)
if __name__ == '__main__':
    # Stand-alone demo: publish pings on port 5555 and collect pongs on 5556.
    loop = ioloop.IOLoop.instance()
    context = zmq.Context()
    pub = context.socket(zmq.PUB)
    pub.bind('tcp://127.0.0.1:5555')
    xrep = context.socket(zmq.ROUTER)
    xrep.bind('tcp://127.0.0.1:5556')

    outstream = zmqstream.ZMQStream(pub, loop)
    instream = zmqstream.ZMQStream(xrep, loop)

    # HeartMonitor.__init__ accepts keyword arguments only, so the streams
    # and the loop must be passed by trait name.
    hb = HeartMonitor(loop=loop, pingstream=outstream, pongstream=instream)
    # Without start() the periodic callback is never created and no ping
    # is ever sent.
    hb.start()

    loop.start()