heartmonitor.py
156 lines
| 5.4 KiB
| text/x-python
|
PythonLexer
MinRK
|
r3539 | #!/usr/bin/env python | ||
""" | ||||
A multi-heart Heartbeat system using PUB and XREP sockets. pings are sent out on the PUB, | ||||
and hearts are tracked based on their XREQ identities. | ||||
""" | ||||
MinRK
|
r3553 | from __future__ import print_function | ||
MinRK
|
r3539 | import time | ||
import uuid | ||||
MinRK
|
r3603 | import logging | ||
MinRK
|
r3539 | |||
import zmq | ||||
MinRK
|
r3589 | from zmq.devices import ProcessDevice,ThreadDevice | ||
MinRK
|
r3539 | from zmq.eventloop import ioloop, zmqstream | ||
class Heart(object): | ||||
"""A basic heart object for responding to a HeartMonitor. | ||||
This is a simple wrapper with defaults for the most common | ||||
Device model for responding to heartbeats. | ||||
It simply builds a threadsafe zmq.FORWARDER Device, defaulting to using | ||||
SUB/XREQ for in/out. | ||||
You can specify the XREQ's IDENTITY via the optional heart_id argument.""" | ||||
device=None | ||||
id=None | ||||
def __init__(self, in_addr, out_addr, in_type=zmq.SUB, out_type=zmq.XREQ, heart_id=None): | ||||
MinRK
|
r3589 | self.device = ThreadDevice(zmq.FORWARDER, in_type, out_type) | ||
MinRK
|
r3573 | self.device.daemon=True | ||
MinRK
|
r3539 | self.device.connect_in(in_addr) | ||
self.device.connect_out(out_addr) | ||||
if in_type == zmq.SUB: | ||||
self.device.setsockopt_in(zmq.SUBSCRIBE, "") | ||||
if heart_id is None: | ||||
heart_id = str(uuid.uuid4()) | ||||
self.device.setsockopt_out(zmq.IDENTITY, heart_id) | ||||
self.id = heart_id | ||||
def start(self): | ||||
return self.device.start() | ||||
class HeartMonitor(object): | ||||
"""A basic HeartMonitor class | ||||
pingstream: a PUB stream | ||||
pongstream: an XREP stream | ||||
period: the period of the heartbeat in milliseconds""" | ||||
loop=None | ||||
pingstream=None | ||||
pongstream=None | ||||
period=None | ||||
hearts=None | ||||
on_probation=None | ||||
last_ping=None | ||||
MinRK
|
r3603 | # debug=False | ||
MinRK
|
r3539 | |||
def __init__(self, loop, pingstream, pongstream, period=1000): | ||||
self.loop = loop | ||||
self.period = period | ||||
self.pingstream = pingstream | ||||
self.pongstream = pongstream | ||||
self.pongstream.on_recv(self.handle_pong) | ||||
self.hearts = set() | ||||
self.responses = set() | ||||
self.on_probation = set() | ||||
self.lifetime = 0 | ||||
self.tic = time.time() | ||||
self._new_handlers = set() | ||||
self._failure_handlers = set() | ||||
def start(self): | ||||
self.caller = ioloop.PeriodicCallback(self.beat, self.period, self.loop) | ||||
self.caller.start() | ||||
def add_new_heart_handler(self, handler): | ||||
"""add a new handler for new hearts""" | ||||
MinRK
|
r3604 | logging.debug("heartbeat::new_heart_handler: %s"%handler) | ||
MinRK
|
r3539 | self._new_handlers.add(handler) | ||
def add_heart_failure_handler(self, handler): | ||||
"""add a new handler for heart failure""" | ||||
MinRK
|
r3604 | logging.debug("heartbeat::new heart failure handler: %s"%handler) | ||
MinRK
|
r3539 | self._failure_handlers.add(handler) | ||
def beat(self): | ||||
MinRK
|
r3542 | self.pongstream.flush() | ||
MinRK
|
r3539 | self.last_ping = self.lifetime | ||
toc = time.time() | ||||
self.lifetime += toc-self.tic | ||||
self.tic = toc | ||||
MinRK
|
r3604 | # logging.debug("heartbeat::%s"%self.lifetime) | ||
MinRK
|
r3539 | goodhearts = self.hearts.intersection(self.responses) | ||
missed_beats = self.hearts.difference(goodhearts) | ||||
heartfailures = self.on_probation.intersection(missed_beats) | ||||
newhearts = self.responses.difference(goodhearts) | ||||
map(self.handle_new_heart, newhearts) | ||||
map(self.handle_heart_failure, heartfailures) | ||||
self.on_probation = missed_beats.intersection(self.hearts) | ||||
self.responses = set() | ||||
# print self.on_probation, self.hearts | ||||
MinRK
|
r3604 | # logging.debug("heartbeat::beat %.3f, %i beating hearts"%(self.lifetime, len(self.hearts))) | ||
MinRK
|
r3539 | self.pingstream.send(str(self.lifetime)) | ||
def handle_new_heart(self, heart): | ||||
if self._new_handlers: | ||||
for handler in self._new_handlers: | ||||
handler(heart) | ||||
else: | ||||
MinRK
|
r3604 | logging.info("heartbeat::yay, got new heart %s!"%heart) | ||
MinRK
|
r3539 | self.hearts.add(heart) | ||
def handle_heart_failure(self, heart): | ||||
if self._failure_handlers: | ||||
for handler in self._failure_handlers: | ||||
try: | ||||
handler(heart) | ||||
MinRK
|
r3556 | except Exception as e: | ||
MinRK
|
r3604 | logging.error("heartbeat::Bad Handler! %s"%handler, exc_info=True) | ||
MinRK
|
r3539 | pass | ||
else: | ||||
MinRK
|
r3604 | logging.info("heartbeat::Heart %s failed :("%heart) | ||
MinRK
|
r3539 | self.hearts.remove(heart) | ||
def handle_pong(self, msg): | ||||
"a heart just beat" | ||||
if msg[1] == str(self.lifetime): | ||||
delta = time.time()-self.tic | ||||
MinRK
|
r3604 | # logging.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta)) | ||
MinRK
|
r3539 | self.responses.add(msg[0]) | ||
elif msg[1] == str(self.last_ping): | ||||
delta = time.time()-self.tic + (self.lifetime-self.last_ping) | ||||
MinRK
|
r3604 | logging.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond"%(msg[0], 1000*delta)) | ||
MinRK
|
r3539 | self.responses.add(msg[0]) | ||
else: | ||||
MinRK
|
r3604 | logging.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)"% | ||
MinRK
|
r3539 | (msg[1],self.lifetime)) | ||
if __name__ == '__main__': | ||||
loop = ioloop.IOLoop.instance() | ||||
context = zmq.Context() | ||||
pub = context.socket(zmq.PUB) | ||||
pub.bind('tcp://127.0.0.1:5555') | ||||
xrep = context.socket(zmq.XREP) | ||||
xrep.bind('tcp://127.0.0.1:5556') | ||||
outstream = zmqstream.ZMQStream(pub, loop) | ||||
instream = zmqstream.ZMQStream(xrep, loop) | ||||
hb = HeartMonitor(loop, outstream, instream) | ||||
loop.start() | ||||