|
|
#!/usr/bin/env python
|
|
|
"""
|
|
|
A multi-heart Heartbeat system using PUB and XREP sockets. pings are sent out on the PUB,
|
|
|
and hearts are tracked based on their XREQ identities.
|
|
|
"""
|
|
|
|
|
|
import time
|
|
|
import uuid
|
|
|
|
|
|
import zmq
|
|
|
from zmq.devices import ProcessDevice
|
|
|
from zmq.eventloop import ioloop, zmqstream
|
|
|
|
|
|
#internal
|
|
|
from IPython.zmq.log import logger
|
|
|
|
|
|
class Heart(object):
|
|
|
"""A basic heart object for responding to a HeartMonitor.
|
|
|
This is a simple wrapper with defaults for the most common
|
|
|
Device model for responding to heartbeats.
|
|
|
|
|
|
It simply builds a threadsafe zmq.FORWARDER Device, defaulting to using
|
|
|
SUB/XREQ for in/out.
|
|
|
|
|
|
You can specify the XREQ's IDENTITY via the optional heart_id argument."""
|
|
|
device=None
|
|
|
id=None
|
|
|
def __init__(self, in_addr, out_addr, in_type=zmq.SUB, out_type=zmq.XREQ, heart_id=None):
|
|
|
self.device = ProcessDevice(zmq.FORWARDER, in_type, out_type)
|
|
|
self.device.connect_in(in_addr)
|
|
|
self.device.connect_out(out_addr)
|
|
|
if in_type == zmq.SUB:
|
|
|
self.device.setsockopt_in(zmq.SUBSCRIBE, "")
|
|
|
if heart_id is None:
|
|
|
heart_id = str(uuid.uuid4())
|
|
|
self.device.setsockopt_out(zmq.IDENTITY, heart_id)
|
|
|
self.id = heart_id
|
|
|
|
|
|
def start(self):
|
|
|
return self.device.start()
|
|
|
|
|
|
class HeartMonitor(object):
|
|
|
"""A basic HeartMonitor class
|
|
|
pingstream: a PUB stream
|
|
|
pongstream: an XREP stream
|
|
|
period: the period of the heartbeat in milliseconds"""
|
|
|
loop=None
|
|
|
pingstream=None
|
|
|
pongstream=None
|
|
|
period=None
|
|
|
hearts=None
|
|
|
on_probation=None
|
|
|
last_ping=None
|
|
|
|
|
|
def __init__(self, loop, pingstream, pongstream, period=1000):
|
|
|
self.loop = loop
|
|
|
self.period = period
|
|
|
|
|
|
self.pingstream = pingstream
|
|
|
self.pongstream = pongstream
|
|
|
self.pongstream.on_recv(self.handle_pong)
|
|
|
|
|
|
self.hearts = set()
|
|
|
self.responses = set()
|
|
|
self.on_probation = set()
|
|
|
self.lifetime = 0
|
|
|
self.tic = time.time()
|
|
|
|
|
|
self._new_handlers = set()
|
|
|
self._failure_handlers = set()
|
|
|
|
|
|
def start(self):
|
|
|
self.caller = ioloop.PeriodicCallback(self.beat, self.period, self.loop)
|
|
|
self.caller.start()
|
|
|
|
|
|
def add_new_heart_handler(self, handler):
|
|
|
"""add a new handler for new hearts"""
|
|
|
logger.debug("heartbeat::new_heart_handler: %s"%handler)
|
|
|
self._new_handlers.add(handler)
|
|
|
|
|
|
def add_heart_failure_handler(self, handler):
|
|
|
"""add a new handler for heart failure"""
|
|
|
logger.debug("heartbeat::new heart failure handler: %s"%handler)
|
|
|
self._failure_handlers.add(handler)
|
|
|
|
|
|
# def _flush(self):
|
|
|
# """override IOLoop triggers"""
|
|
|
# while True:
|
|
|
# try:
|
|
|
# msg = self.pongstream.socket.recv_multipart(zmq.NOBLOCK)
|
|
|
# logger.warn("IOLoop triggered beat with incoming heartbeat waiting to be handled")
|
|
|
# except zmq.ZMQError:
|
|
|
# return
|
|
|
# else:
|
|
|
# self.handle_pong(msg)
|
|
|
# # print '.'
|
|
|
#
|
|
|
|
|
|
def beat(self):
|
|
|
self.pongstream.flush()
|
|
|
self.last_ping = self.lifetime
|
|
|
|
|
|
toc = time.time()
|
|
|
self.lifetime += toc-self.tic
|
|
|
self.tic = toc
|
|
|
logger.debug("heartbeat::%s"%self.lifetime)
|
|
|
goodhearts = self.hearts.intersection(self.responses)
|
|
|
missed_beats = self.hearts.difference(goodhearts)
|
|
|
heartfailures = self.on_probation.intersection(missed_beats)
|
|
|
newhearts = self.responses.difference(goodhearts)
|
|
|
map(self.handle_new_heart, newhearts)
|
|
|
map(self.handle_heart_failure, heartfailures)
|
|
|
self.on_probation = missed_beats.intersection(self.hearts)
|
|
|
self.responses = set()
|
|
|
# print self.on_probation, self.hearts
|
|
|
# logger.debug("heartbeat::beat %.3f, %i beating hearts"%(self.lifetime, len(self.hearts)))
|
|
|
self.pingstream.send(str(self.lifetime))
|
|
|
|
|
|
def handle_new_heart(self, heart):
|
|
|
if self._new_handlers:
|
|
|
for handler in self._new_handlers:
|
|
|
handler(heart)
|
|
|
else:
|
|
|
logger.info("heartbeat::yay, got new heart %s!"%heart)
|
|
|
self.hearts.add(heart)
|
|
|
|
|
|
def handle_heart_failure(self, heart):
|
|
|
if self._failure_handlers:
|
|
|
for handler in self._failure_handlers:
|
|
|
try:
|
|
|
handler(heart)
|
|
|
except Exception, e:
|
|
|
print e
|
|
|
logger.error("heartbeat::Bad Handler! %s"%handler)
|
|
|
pass
|
|
|
else:
|
|
|
logger.info("heartbeat::Heart %s failed :("%heart)
|
|
|
self.hearts.remove(heart)
|
|
|
|
|
|
|
|
|
def handle_pong(self, msg):
|
|
|
"a heart just beat"
|
|
|
if msg[1] == str(self.lifetime):
|
|
|
delta = time.time()-self.tic
|
|
|
logger.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta))
|
|
|
self.responses.add(msg[0])
|
|
|
elif msg[1] == str(self.last_ping):
|
|
|
delta = time.time()-self.tic + (self.lifetime-self.last_ping)
|
|
|
logger.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond"%(msg[0], 1000*delta))
|
|
|
self.responses.add(msg[0])
|
|
|
else:
|
|
|
logger.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)"%
|
|
|
(msg[1],self.lifetime))
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
loop = ioloop.IOLoop.instance()
|
|
|
context = zmq.Context()
|
|
|
pub = context.socket(zmq.PUB)
|
|
|
pub.bind('tcp://127.0.0.1:5555')
|
|
|
xrep = context.socket(zmq.XREP)
|
|
|
xrep.bind('tcp://127.0.0.1:5556')
|
|
|
|
|
|
outstream = zmqstream.ZMQStream(pub, loop)
|
|
|
instream = zmqstream.ZMQStream(xrep, loop)
|
|
|
|
|
|
hb = HeartMonitor(loop, outstream, instream)
|
|
|
|
|
|
loop.start()
|
|
|
|