Show More
heartmonitor.py
163 lines
| 5.9 KiB
| text/x-python
|
PythonLexer
MinRK
|
r3539 | #!/usr/bin/env python | ||
""" | ||||
A multi-heart Heartbeat system using PUB and XREP sockets. pings are sent out on the PUB, | ||||
and hearts are tracked based on their XREQ identities. | ||||
""" | ||||
MinRK
|
r3660 | #----------------------------------------------------------------------------- | ||
# Copyright (C) 2010-2011 The IPython Development Team | ||||
# | ||||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r3539 | |||
MinRK
|
r3553 | from __future__ import print_function | ||
MinRK
|
r3539 | import time | ||
MinRK
|
r3631 | import uuid | ||
MinRK
|
r3539 | |||
import zmq | ||||
MinRK
|
r3673 | from zmq.devices import ProcessDevice, ThreadDevice | ||
MinRK
|
r3539 | from zmq.eventloop import ioloop, zmqstream | ||
MinRK
|
r3610 | from IPython.utils.traitlets import Set, Instance, CFloat, Bool | ||
MinRK
|
r3673 | from IPython.parallel.factory import LoggingFactory | ||
MinRK
|
r3610 | |||
MinRK
|
r3539 | class Heart(object): | ||
"""A basic heart object for responding to a HeartMonitor. | ||||
This is a simple wrapper with defaults for the most common | ||||
Device model for responding to heartbeats. | ||||
It simply builds a threadsafe zmq.FORWARDER Device, defaulting to using | ||||
SUB/XREQ for in/out. | ||||
You can specify the XREQ's IDENTITY via the optional heart_id argument.""" | ||||
device=None | ||||
id=None | ||||
def __init__(self, in_addr, out_addr, in_type=zmq.SUB, out_type=zmq.XREQ, heart_id=None): | ||||
MinRK
|
r3589 | self.device = ThreadDevice(zmq.FORWARDER, in_type, out_type) | ||
MinRK
|
r3573 | self.device.daemon=True | ||
MinRK
|
r3539 | self.device.connect_in(in_addr) | ||
self.device.connect_out(out_addr) | ||||
if in_type == zmq.SUB: | ||||
self.device.setsockopt_in(zmq.SUBSCRIBE, "") | ||||
if heart_id is None: | ||||
heart_id = str(uuid.uuid4()) | ||||
self.device.setsockopt_out(zmq.IDENTITY, heart_id) | ||||
self.id = heart_id | ||||
def start(self): | ||||
return self.device.start() | ||||
MinRK
|
r3610 | class HeartMonitor(LoggingFactory): | ||
MinRK
|
r3539 | """A basic HeartMonitor class | ||
pingstream: a PUB stream | ||||
pongstream: an XREP stream | ||||
period: the period of the heartbeat in milliseconds""" | ||||
MinRK
|
r3610 | period=CFloat(1000, config=True) # in milliseconds | ||
pingstream=Instance('zmq.eventloop.zmqstream.ZMQStream') | ||||
pongstream=Instance('zmq.eventloop.zmqstream.ZMQStream') | ||||
loop = Instance('zmq.eventloop.ioloop.IOLoop') | ||||
def _loop_default(self): | ||||
return ioloop.IOLoop.instance() | ||||
debug=Bool(False) | ||||
# not settable: | ||||
hearts=Set() | ||||
responses=Set() | ||||
on_probation=Set() | ||||
last_ping=CFloat(0) | ||||
_new_handlers = Set() | ||||
_failure_handlers = Set() | ||||
lifetime = CFloat(0) | ||||
tic = CFloat(0) | ||||
def __init__(self, **kwargs): | ||||
super(HeartMonitor, self).__init__(**kwargs) | ||||
MinRK
|
r3539 | |||
self.pongstream.on_recv(self.handle_pong) | ||||
def start(self): | ||||
self.caller = ioloop.PeriodicCallback(self.beat, self.period, self.loop) | ||||
self.caller.start() | ||||
def add_new_heart_handler(self, handler): | ||||
"""add a new handler for new hearts""" | ||||
MinRK
|
r3610 | self.log.debug("heartbeat::new_heart_handler: %s"%handler) | ||
MinRK
|
r3539 | self._new_handlers.add(handler) | ||
def add_heart_failure_handler(self, handler): | ||||
"""add a new handler for heart failure""" | ||||
MinRK
|
r3610 | self.log.debug("heartbeat::new heart failure handler: %s"%handler) | ||
MinRK
|
r3539 | self._failure_handlers.add(handler) | ||
def beat(self): | ||||
MinRK
|
r3542 | self.pongstream.flush() | ||
MinRK
|
r3539 | self.last_ping = self.lifetime | ||
toc = time.time() | ||||
self.lifetime += toc-self.tic | ||||
self.tic = toc | ||||
MinRK
|
r3610 | # self.log.debug("heartbeat::%s"%self.lifetime) | ||
MinRK
|
r3539 | goodhearts = self.hearts.intersection(self.responses) | ||
missed_beats = self.hearts.difference(goodhearts) | ||||
heartfailures = self.on_probation.intersection(missed_beats) | ||||
newhearts = self.responses.difference(goodhearts) | ||||
map(self.handle_new_heart, newhearts) | ||||
map(self.handle_heart_failure, heartfailures) | ||||
self.on_probation = missed_beats.intersection(self.hearts) | ||||
self.responses = set() | ||||
# print self.on_probation, self.hearts | ||||
MinRK
|
r3610 | # self.log.debug("heartbeat::beat %.3f, %i beating hearts"%(self.lifetime, len(self.hearts))) | ||
MinRK
|
r3539 | self.pingstream.send(str(self.lifetime)) | ||
def handle_new_heart(self, heart): | ||||
if self._new_handlers: | ||||
for handler in self._new_handlers: | ||||
handler(heart) | ||||
else: | ||||
MinRK
|
r3610 | self.log.info("heartbeat::yay, got new heart %s!"%heart) | ||
MinRK
|
r3539 | self.hearts.add(heart) | ||
def handle_heart_failure(self, heart): | ||||
if self._failure_handlers: | ||||
for handler in self._failure_handlers: | ||||
try: | ||||
handler(heart) | ||||
MinRK
|
r3556 | except Exception as e: | ||
MinRK
|
r3610 | self.log.error("heartbeat::Bad Handler! %s"%handler, exc_info=True) | ||
MinRK
|
r3539 | pass | ||
else: | ||||
MinRK
|
r3610 | self.log.info("heartbeat::Heart %s failed :("%heart) | ||
MinRK
|
r3539 | self.hearts.remove(heart) | ||
def handle_pong(self, msg): | ||||
"a heart just beat" | ||||
if msg[1] == str(self.lifetime): | ||||
delta = time.time()-self.tic | ||||
MinRK
|
r3610 | # self.log.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta)) | ||
MinRK
|
r3539 | self.responses.add(msg[0]) | ||
elif msg[1] == str(self.last_ping): | ||||
delta = time.time()-self.tic + (self.lifetime-self.last_ping) | ||||
MinRK
|
r3610 | self.log.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond"%(msg[0], 1000*delta)) | ||
MinRK
|
r3539 | self.responses.add(msg[0]) | ||
else: | ||||
MinRK
|
r3610 | self.log.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)"% | ||
MinRK
|
r3539 | (msg[1],self.lifetime)) | ||
if __name__ == '__main__': | ||||
loop = ioloop.IOLoop.instance() | ||||
context = zmq.Context() | ||||
pub = context.socket(zmq.PUB) | ||||
pub.bind('tcp://127.0.0.1:5555') | ||||
xrep = context.socket(zmq.XREP) | ||||
xrep.bind('tcp://127.0.0.1:5556') | ||||
outstream = zmqstream.ZMQStream(pub, loop) | ||||
instream = zmqstream.ZMQStream(xrep, loop) | ||||
hb = HeartMonitor(loop, outstream, instream) | ||||
loop.start() | ||||