##// END OF EJS Templates
tooltip on <tab>
tooltip on <tab>

File last commit:

r5387:d0a2c650
r5398:c06eaa65
Show More
heartmonitor.py
173 lines | 5.9 KiB | text/x-python | PythonLexer
MinRK
prep newparallel for rebase...
r3539 #!/usr/bin/env python
"""
A multi-heart Heartbeat system using PUB and ROUTER sockets. Pings are sent out on the PUB socket,
and hearts are tracked based on their DEALER identities.
MinRK
update recently changed modules with Authors in docstring
r4018
Authors:
* Min RK
MinRK
prep newparallel for rebase...
r3539 """
MinRK
copyright statements
r3660 #-----------------------------------------------------------------------------
# Copyright (C) 2010-2011 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
MinRK
prep newparallel for rebase...
r3539
MinRK
use print_function
r3553 from __future__ import print_function
MinRK
prep newparallel for rebase...
r3539 import time
MinRK
resort imports in a cleaner order
r3631 import uuid
MinRK
prep newparallel for rebase...
r3539
import zmq
MinRK
cleanup parallel traits...
r3988 from zmq.devices import ThreadDevice
MinRK
prep newparallel for rebase...
r3539 from zmq.eventloop import ioloop, zmqstream
MinRK
add LoggingConfigurable base class
r4016 from IPython.config.configurable import LoggingConfigurable
MinRK
cleanup parallel traits...
r3988 from IPython.utils.traitlets import Set, Instance, CFloat
MinRK
rework logging connections
r3610
MinRK
cleanup per review...
r4161 from IPython.parallel.util import asbytes
MinRK
update parallel code for py3k...
r4155
MinRK
prep newparallel for rebase...
class Heart(object):
    """Minimal heart that answers the pings of a HeartMonitor.

    This is a thin convenience wrapper around a ``zmq.FORWARDER``
    ``ThreadDevice``: the device's input socket (``zmq.SUB`` by default) is
    connected to ``in_addr`` and its output socket (``zmq.DEALER`` by
    default) is connected to ``out_addr``.

    The IDENTITY of the output socket can be chosen with the optional
    ``heart_id`` argument; when it is omitted, the raw bytes of a fresh
    ``uuid.uuid4()`` are used instead.  The identity actually used is
    exposed as ``self.id``.
    """
    # The ThreadDevice doing the forwarding; created in __init__.
    device = None
    # The IDENTITY set on the output socket.
    id = None

    def __init__(self, in_addr, out_addr, in_type=zmq.SUB, out_type=zmq.DEALER, heart_id=None):
        # Fall back to random uuid bytes when the caller gave no identity.
        identity = uuid.uuid4().bytes if heart_id is None else heart_id

        self.device = ThreadDevice(zmq.FORWARDER, in_type, out_type)
        forwarder = self.device
        forwarder.daemon = True
        forwarder.connect_in(in_addr)
        forwarder.connect_out(out_addr)
        if in_type == zmq.SUB:
            # Subscribe with an empty prefix on the input socket.
            forwarder.setsockopt_in(zmq.SUBSCRIBE, b"")
        forwarder.setsockopt_out(zmq.IDENTITY, identity)
        self.id = identity

    def start(self):
        """Start the underlying device and return whatever its start() returns."""
        return self.device.start()
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
add LoggingConfigurable base class
class HeartMonitor(LoggingConfigurable):
    """A basic HeartMonitor class

    pingstream: a PUB stream
    pongstream: a ROUTER (formerly XREP) stream
    period: the period of the heartbeat in milliseconds

    Every ``period`` the monitor sends its current ``lifetime`` on
    ``pingstream`` and collects the replies arriving on ``pongstream``.
    A heart that misses one beat is put on probation; a heart that is on
    probation and misses the next beat as well is reported as failed.
    """

    period = CFloat(1000, config=True,
        help='The frequency at which the Hub pings the engines for heartbeats '
        '(in ms)',
    )

    pingstream = Instance('zmq.eventloop.zmqstream.ZMQStream')
    pongstream = Instance('zmq.eventloop.zmqstream.ZMQStream')
    loop = Instance('zmq.eventloop.ioloop.IOLoop')
    def _loop_default(self):
        return ioloop.IOLoop.instance()

    # not settable:
    # identities registered by handle_new_heart, dropped by handle_heart_failure
    hearts = Set()
    # identities that answered since the last beat (filled by handle_pong)
    responses = Set()
    # hearts that missed the most recent beat
    on_probation = Set()
    # value of `lifetime` that was sent with the previous ping
    last_ping = CFloat(0)
    _new_handlers = Set()
    _failure_handlers = Set()
    # accumulated time between beats; its str() is the ping payload
    lifetime = CFloat(0)
    # time.time() of the most recent beat
    tic = CFloat(0)

    def __init__(self, **kwargs):
        super(HeartMonitor, self).__init__(**kwargs)

        self.pongstream.on_recv(self.handle_pong)

    def start(self):
        """Schedule beat() to be called every `period` on `loop`."""
        self.caller = ioloop.PeriodicCallback(self.beat, self.period, self.loop)
        self.caller.start()

    def add_new_heart_handler(self, handler):
        """add a new handler for new hearts"""
        self.log.debug("heartbeat::new_heart_handler: %s", handler)
        self._new_handlers.add(handler)

    def add_heart_failure_handler(self, handler):
        """add a new handler for heart failure"""
        self.log.debug("heartbeat::new heart failure handler: %s", handler)
        self._failure_handlers.add(handler)

    def beat(self):
        """Evaluate the replies to the previous ping, then send the next one."""
        self.pongstream.flush()
        self.last_ping = self.lifetime

        toc = time.time()
        self.lifetime += toc - self.tic
        self.tic = toc

        goodhearts = self.hearts.intersection(self.responses)
        missed_beats = self.hearts.difference(goodhearts)
        heartfailures = self.on_probation.intersection(missed_beats)
        newhearts = self.responses.difference(goodhearts)
        # Explicit loops instead of map(): on Python 3 map() is lazy, so the
        # handlers would never be invoked.  The sets iterated here are fresh
        # objects, so the handlers may safely mutate self.hearts.
        for heart in newhearts:
            self.handle_new_heart(heart)
        for heart in heartfailures:
            self.handle_heart_failure(heart)
        self.on_probation = missed_beats.intersection(self.hearts)
        self.responses = set()
        self.pingstream.send(asbytes(str(self.lifetime)))

    def handle_new_heart(self, heart):
        """Notify the new-heart handlers (or log) and register `heart`."""
        if self._new_handlers:
            for handler in self._new_handlers:
                handler(heart)
        else:
            self.log.info("heartbeat::yay, got new heart %s!", heart)
        self.hearts.add(heart)

    def handle_heart_failure(self, heart):
        """Notify the failure handlers (or log) and unregister `heart`.

        An exception raised by a handler is logged with its traceback and
        does not prevent the remaining handlers from running.
        """
        if self._failure_handlers:
            for handler in self._failure_handlers:
                try:
                    handler(heart)
                except Exception:
                    self.log.error("heartbeat::Bad Handler! %s", handler, exc_info=True)
        else:
            self.log.info("heartbeat::Heart %s failed :(", heart)
        self.hearts.remove(heart)

    def handle_pong(self, msg):
        """a heart just beat

        `msg[0]` is used as the heart's identity and `msg[1]` is compared
        with the payload of the current and of the previous ping.
        """
        current = asbytes(str(self.lifetime))
        last = asbytes(str(self.last_ping))
        if msg[1] == current:
            self.responses.add(msg[0])
        elif msg[1] == last:
            # Reply to the previous ping: late, but still counted as alive.
            delta = time.time() - self.tic + (self.lifetime - self.last_ping)
            self.log.warning(
                "heartbeat::heart %r missed a beat, and took %.2f ms to respond",
                msg[0], 1000 * delta)
            self.responses.add(msg[0])
        else:
            self.log.warning(
                "heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)",
                msg[1], self.lifetime)
if __name__ == '__main__':
    # Stand-alone demo: ping on a PUB socket, collect pongs on a ROUTER socket.
    loop = ioloop.IOLoop.instance()
    context = zmq.Context()
    pub = context.socket(zmq.PUB)
    pub.bind('tcp://127.0.0.1:5555')
    xrep = context.socket(zmq.ROUTER)
    xrep.bind('tcp://127.0.0.1:5556')

    outstream = zmqstream.ZMQStream(pub, loop)
    instream = zmqstream.ZMQStream(xrep, loop)

    # HeartMonitor.__init__ only accepts keyword arguments, so the streams
    # and the loop have to be passed by trait name.
    hb = HeartMonitor(loop=loop, pingstream=outstream, pongstream=instream)
    # Without start() the periodic beat is never scheduled.
    hb.start()

    loop.start()