##// END OF EJS Templates
Improvements to dependency handling...
Improvements to dependency handling Specifically: * add 'success_only' switch to Dependencies * Scheduler handles some cases where Dependencies are impossible to meet.

File last commit:

r3604:2c044319
r3607:d51586b9
Show More
heartmonitor.py
156 lines | 5.4 KiB | text/x-python | PythonLexer
#!/usr/bin/env python
"""
A multi-heart Heartbeat system using PUB and XREP sockets. pings are sent out on the PUB,
and hearts are tracked based on their XREQ identities.
"""
from __future__ import print_function
import time
import uuid
import logging
import zmq
from zmq.devices import ProcessDevice,ThreadDevice
from zmq.eventloop import ioloop, zmqstream
class Heart(object):
"""A basic heart object for responding to a HeartMonitor.
This is a simple wrapper with defaults for the most common
Device model for responding to heartbeats.
It simply builds a threadsafe zmq.FORWARDER Device, defaulting to using
SUB/XREQ for in/out.
You can specify the XREQ's IDENTITY via the optional heart_id argument."""
device=None
id=None
def __init__(self, in_addr, out_addr, in_type=zmq.SUB, out_type=zmq.XREQ, heart_id=None):
self.device = ThreadDevice(zmq.FORWARDER, in_type, out_type)
self.device.daemon=True
self.device.connect_in(in_addr)
self.device.connect_out(out_addr)
if in_type == zmq.SUB:
self.device.setsockopt_in(zmq.SUBSCRIBE, "")
if heart_id is None:
heart_id = str(uuid.uuid4())
self.device.setsockopt_out(zmq.IDENTITY, heart_id)
self.id = heart_id
def start(self):
return self.device.start()
class HeartMonitor(object):
"""A basic HeartMonitor class
pingstream: a PUB stream
pongstream: an XREP stream
period: the period of the heartbeat in milliseconds"""
loop=None
pingstream=None
pongstream=None
period=None
hearts=None
on_probation=None
last_ping=None
# debug=False
def __init__(self, loop, pingstream, pongstream, period=1000):
self.loop = loop
self.period = period
self.pingstream = pingstream
self.pongstream = pongstream
self.pongstream.on_recv(self.handle_pong)
self.hearts = set()
self.responses = set()
self.on_probation = set()
self.lifetime = 0
self.tic = time.time()
self._new_handlers = set()
self._failure_handlers = set()
def start(self):
self.caller = ioloop.PeriodicCallback(self.beat, self.period, self.loop)
self.caller.start()
def add_new_heart_handler(self, handler):
"""add a new handler for new hearts"""
logging.debug("heartbeat::new_heart_handler: %s"%handler)
self._new_handlers.add(handler)
def add_heart_failure_handler(self, handler):
"""add a new handler for heart failure"""
logging.debug("heartbeat::new heart failure handler: %s"%handler)
self._failure_handlers.add(handler)
def beat(self):
self.pongstream.flush()
self.last_ping = self.lifetime
toc = time.time()
self.lifetime += toc-self.tic
self.tic = toc
# logging.debug("heartbeat::%s"%self.lifetime)
goodhearts = self.hearts.intersection(self.responses)
missed_beats = self.hearts.difference(goodhearts)
heartfailures = self.on_probation.intersection(missed_beats)
newhearts = self.responses.difference(goodhearts)
map(self.handle_new_heart, newhearts)
map(self.handle_heart_failure, heartfailures)
self.on_probation = missed_beats.intersection(self.hearts)
self.responses = set()
# print self.on_probation, self.hearts
# logging.debug("heartbeat::beat %.3f, %i beating hearts"%(self.lifetime, len(self.hearts)))
self.pingstream.send(str(self.lifetime))
def handle_new_heart(self, heart):
if self._new_handlers:
for handler in self._new_handlers:
handler(heart)
else:
logging.info("heartbeat::yay, got new heart %s!"%heart)
self.hearts.add(heart)
def handle_heart_failure(self, heart):
if self._failure_handlers:
for handler in self._failure_handlers:
try:
handler(heart)
except Exception as e:
logging.error("heartbeat::Bad Handler! %s"%handler, exc_info=True)
pass
else:
logging.info("heartbeat::Heart %s failed :("%heart)
self.hearts.remove(heart)
def handle_pong(self, msg):
"a heart just beat"
if msg[1] == str(self.lifetime):
delta = time.time()-self.tic
# logging.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta))
self.responses.add(msg[0])
elif msg[1] == str(self.last_ping):
delta = time.time()-self.tic + (self.lifetime-self.last_ping)
logging.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond"%(msg[0], 1000*delta))
self.responses.add(msg[0])
else:
logging.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)"%
(msg[1],self.lifetime))
if __name__ == '__main__':
loop = ioloop.IOLoop.instance()
context = zmq.Context()
pub = context.socket(zmq.PUB)
pub.bind('tcp://127.0.0.1:5555')
xrep = context.socket(zmq.XREP)
xrep.bind('tcp://127.0.0.1:5556')
outstream = zmqstream.ZMQStream(pub, loop)
instream = zmqstream.ZMQStream(xrep, loop)
hb = HeartMonitor(loop, outstream, instream)
loop.start()