##// END OF EJS Templates
scheduler progress
scheduler progress

File last commit:

r3542:69e9c2c5
r3551:7f42766d
Show More
heartmonitor.py
169 lines | 5.7 KiB | text/x-python | PythonLexer
#!/usr/bin/env python
"""
A multi-heart Heartbeat system using PUB and XREP sockets. pings are sent out on the PUB,
and hearts are tracked based on their XREQ identities.
"""
import time
import uuid
import zmq
from zmq.devices import ProcessDevice
from zmq.eventloop import ioloop, zmqstream
#internal
from IPython.zmq.log import logger
class Heart(object):
"""A basic heart object for responding to a HeartMonitor.
This is a simple wrapper with defaults for the most common
Device model for responding to heartbeats.
It simply builds a threadsafe zmq.FORWARDER Device, defaulting to using
SUB/XREQ for in/out.
You can specify the XREQ's IDENTITY via the optional heart_id argument."""
device=None
id=None
def __init__(self, in_addr, out_addr, in_type=zmq.SUB, out_type=zmq.XREQ, heart_id=None):
self.device = ProcessDevice(zmq.FORWARDER, in_type, out_type)
self.device.connect_in(in_addr)
self.device.connect_out(out_addr)
if in_type == zmq.SUB:
self.device.setsockopt_in(zmq.SUBSCRIBE, "")
if heart_id is None:
heart_id = str(uuid.uuid4())
self.device.setsockopt_out(zmq.IDENTITY, heart_id)
self.id = heart_id
def start(self):
return self.device.start()
class HeartMonitor(object):
"""A basic HeartMonitor class
pingstream: a PUB stream
pongstream: an XREP stream
period: the period of the heartbeat in milliseconds"""
loop=None
pingstream=None
pongstream=None
period=None
hearts=None
on_probation=None
last_ping=None
def __init__(self, loop, pingstream, pongstream, period=1000):
self.loop = loop
self.period = period
self.pingstream = pingstream
self.pongstream = pongstream
self.pongstream.on_recv(self.handle_pong)
self.hearts = set()
self.responses = set()
self.on_probation = set()
self.lifetime = 0
self.tic = time.time()
self._new_handlers = set()
self._failure_handlers = set()
def start(self):
self.caller = ioloop.PeriodicCallback(self.beat, self.period, self.loop)
self.caller.start()
def add_new_heart_handler(self, handler):
"""add a new handler for new hearts"""
logger.debug("heartbeat::new_heart_handler: %s"%handler)
self._new_handlers.add(handler)
def add_heart_failure_handler(self, handler):
"""add a new handler for heart failure"""
logger.debug("heartbeat::new heart failure handler: %s"%handler)
self._failure_handlers.add(handler)
# def _flush(self):
# """override IOLoop triggers"""
# while True:
# try:
# msg = self.pongstream.socket.recv_multipart(zmq.NOBLOCK)
# logger.warn("IOLoop triggered beat with incoming heartbeat waiting to be handled")
# except zmq.ZMQError:
# return
# else:
# self.handle_pong(msg)
# # print '.'
#
def beat(self):
self.pongstream.flush()
self.last_ping = self.lifetime
toc = time.time()
self.lifetime += toc-self.tic
self.tic = toc
logger.debug("heartbeat::%s"%self.lifetime)
goodhearts = self.hearts.intersection(self.responses)
missed_beats = self.hearts.difference(goodhearts)
heartfailures = self.on_probation.intersection(missed_beats)
newhearts = self.responses.difference(goodhearts)
map(self.handle_new_heart, newhearts)
map(self.handle_heart_failure, heartfailures)
self.on_probation = missed_beats.intersection(self.hearts)
self.responses = set()
# print self.on_probation, self.hearts
# logger.debug("heartbeat::beat %.3f, %i beating hearts"%(self.lifetime, len(self.hearts)))
self.pingstream.send(str(self.lifetime))
def handle_new_heart(self, heart):
if self._new_handlers:
for handler in self._new_handlers:
handler(heart)
else:
logger.info("heartbeat::yay, got new heart %s!"%heart)
self.hearts.add(heart)
def handle_heart_failure(self, heart):
if self._failure_handlers:
for handler in self._failure_handlers:
try:
handler(heart)
except Exception, e:
print e
logger.error("heartbeat::Bad Handler! %s"%handler)
pass
else:
logger.info("heartbeat::Heart %s failed :("%heart)
self.hearts.remove(heart)
def handle_pong(self, msg):
"a heart just beat"
if msg[1] == str(self.lifetime):
delta = time.time()-self.tic
logger.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta))
self.responses.add(msg[0])
elif msg[1] == str(self.last_ping):
delta = time.time()-self.tic + (self.lifetime-self.last_ping)
logger.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond"%(msg[0], 1000*delta))
self.responses.add(msg[0])
else:
logger.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)"%
(msg[1],self.lifetime))
if __name__ == '__main__':
loop = ioloop.IOLoop.instance()
context = zmq.Context()
pub = context.socket(zmq.PUB)
pub.bind('tcp://127.0.0.1:5555')
xrep = context.socket(zmq.XREP)
xrep.bind('tcp://127.0.0.1:5556')
outstream = zmqstream.ZMQStream(pub, loop)
instream = zmqstream.ZMQStream(xrep, loop)
hb = HeartMonitor(loop, outstream, instream)
loop.start()