##// END OF EJS Templates
update connections and diagrams for reduced sockets
update connections and diagrams for reduced sockets

File last commit:

r3642:a0bfb1f7
r3658:8fb951e7
Show More
heartmonitor.py
158 lines | 5.5 KiB | text/x-python | PythonLexer
MinRK
prep newparallel for rebase...
r3539 #!/usr/bin/env python
"""
A multi-heart Heartbeat system using PUB and XREP sockets. pings are sent out on the PUB,
and hearts are tracked based on their XREQ identities.
"""
MinRK
use print_function
r3553 from __future__ import print_function
MinRK
prep newparallel for rebase...
r3539 import time
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603 import logging
MinRK
resort imports in a cleaner order
r3631 import uuid
MinRK
prep newparallel for rebase...
r3539
import zmq
MinRK
PendingResult->AsyncResult; match multiprocessing.AsyncResult api
r3589 from zmq.devices import ProcessDevice,ThreadDevice
MinRK
prep newparallel for rebase...
r3539 from zmq.eventloop import ioloop, zmqstream
MinRK
rework logging connections
r3610 from IPython.utils.traitlets import Set, Instance, CFloat, Bool
MinRK
eliminate relative imports
r3642 from .factory import LoggingFactory
MinRK
rework logging connections
r3610
MinRK
prep newparallel for rebase...
r3539 class Heart(object):
"""A basic heart object for responding to a HeartMonitor.
This is a simple wrapper with defaults for the most common
Device model for responding to heartbeats.
It simply builds a threadsafe zmq.FORWARDER Device, defaulting to using
SUB/XREQ for in/out.
You can specify the XREQ's IDENTITY via the optional heart_id argument."""
device=None
id=None
def __init__(self, in_addr, out_addr, in_type=zmq.SUB, out_type=zmq.XREQ, heart_id=None):
MinRK
PendingResult->AsyncResult; match multiprocessing.AsyncResult api
r3589 self.device = ThreadDevice(zmq.FORWARDER, in_type, out_type)
MinRK
Moved parallel test files to parallel subpackages...
r3573 self.device.daemon=True
MinRK
prep newparallel for rebase...
r3539 self.device.connect_in(in_addr)
self.device.connect_out(out_addr)
if in_type == zmq.SUB:
self.device.setsockopt_in(zmq.SUBSCRIBE, "")
if heart_id is None:
heart_id = str(uuid.uuid4())
self.device.setsockopt_out(zmq.IDENTITY, heart_id)
self.id = heart_id
def start(self):
return self.device.start()
MinRK
rework logging connections
r3610 class HeartMonitor(LoggingFactory):
MinRK
prep newparallel for rebase...
r3539 """A basic HeartMonitor class
pingstream: a PUB stream
pongstream: an XREP stream
period: the period of the heartbeat in milliseconds"""
MinRK
rework logging connections
r3610 period=CFloat(1000, config=True) # in milliseconds
pingstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
pongstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
loop = Instance('zmq.eventloop.ioloop.IOLoop')
def _loop_default(self):
return ioloop.IOLoop.instance()
debug=Bool(False)
# not settable:
hearts=Set()
responses=Set()
on_probation=Set()
last_ping=CFloat(0)
_new_handlers = Set()
_failure_handlers = Set()
lifetime = CFloat(0)
tic = CFloat(0)
def __init__(self, **kwargs):
super(HeartMonitor, self).__init__(**kwargs)
MinRK
prep newparallel for rebase...
r3539
self.pongstream.on_recv(self.handle_pong)
def start(self):
self.caller = ioloop.PeriodicCallback(self.beat, self.period, self.loop)
self.caller.start()
def add_new_heart_handler(self, handler):
"""add a new handler for new hearts"""
MinRK
rework logging connections
r3610 self.log.debug("heartbeat::new_heart_handler: %s"%handler)
MinRK
prep newparallel for rebase...
r3539 self._new_handlers.add(handler)
def add_heart_failure_handler(self, handler):
"""add a new handler for heart failure"""
MinRK
rework logging connections
r3610 self.log.debug("heartbeat::new heart failure handler: %s"%handler)
MinRK
prep newparallel for rebase...
r3539 self._failure_handlers.add(handler)
def beat(self):
MinRK
heartmonitor uses flush
r3542 self.pongstream.flush()
MinRK
prep newparallel for rebase...
r3539 self.last_ping = self.lifetime
toc = time.time()
self.lifetime += toc-self.tic
self.tic = toc
MinRK
rework logging connections
r3610 # self.log.debug("heartbeat::%s"%self.lifetime)
MinRK
prep newparallel for rebase...
r3539 goodhearts = self.hearts.intersection(self.responses)
missed_beats = self.hearts.difference(goodhearts)
heartfailures = self.on_probation.intersection(missed_beats)
newhearts = self.responses.difference(goodhearts)
map(self.handle_new_heart, newhearts)
map(self.handle_heart_failure, heartfailures)
self.on_probation = missed_beats.intersection(self.hearts)
self.responses = set()
# print self.on_probation, self.hearts
MinRK
rework logging connections
r3610 # self.log.debug("heartbeat::beat %.3f, %i beating hearts"%(self.lifetime, len(self.hearts)))
MinRK
prep newparallel for rebase...
r3539 self.pingstream.send(str(self.lifetime))
def handle_new_heart(self, heart):
if self._new_handlers:
for handler in self._new_handlers:
handler(heart)
else:
MinRK
rework logging connections
r3610 self.log.info("heartbeat::yay, got new heart %s!"%heart)
MinRK
prep newparallel for rebase...
r3539 self.hearts.add(heart)
def handle_heart_failure(self, heart):
if self._failure_handlers:
for handler in self._failure_handlers:
try:
handler(heart)
MinRK
general parallel code cleanup
r3556 except Exception as e:
MinRK
rework logging connections
r3610 self.log.error("heartbeat::Bad Handler! %s"%handler, exc_info=True)
MinRK
prep newparallel for rebase...
r3539 pass
else:
MinRK
rework logging connections
r3610 self.log.info("heartbeat::Heart %s failed :("%heart)
MinRK
prep newparallel for rebase...
r3539 self.hearts.remove(heart)
def handle_pong(self, msg):
"a heart just beat"
if msg[1] == str(self.lifetime):
delta = time.time()-self.tic
MinRK
rework logging connections
r3610 # self.log.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta))
MinRK
prep newparallel for rebase...
r3539 self.responses.add(msg[0])
elif msg[1] == str(self.last_ping):
delta = time.time()-self.tic + (self.lifetime-self.last_ping)
MinRK
rework logging connections
r3610 self.log.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond"%(msg[0], 1000*delta))
MinRK
prep newparallel for rebase...
r3539 self.responses.add(msg[0])
else:
MinRK
rework logging connections
r3610 self.log.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)"%
MinRK
prep newparallel for rebase...
r3539 (msg[1],self.lifetime))
if __name__ == '__main__':
loop = ioloop.IOLoop.instance()
context = zmq.Context()
pub = context.socket(zmq.PUB)
pub.bind('tcp://127.0.0.1:5555')
xrep = context.socket(zmq.XREP)
xrep.bind('tcp://127.0.0.1:5556')
outstream = zmqstream.ZMQStream(pub, loop)
instream = zmqstream.ZMQStream(xrep, loop)
hb = HeartMonitor(loop, outstream, instream)
loop.start()