heartmonitor.py
190 lines
| 7.1 KiB
| text/x-python
|
PythonLexer
MinRK
|
r3539 | #!/usr/bin/env python | ||
""" | ||||
MinRK
|
r7538 | A multi-heart Heartbeat system using PUB and ROUTER sockets. Pings are sent out on the PUB socket, | ||
and hearts are tracked based on their DEALER identities. | ||||
MinRK
|
r4018 | |||
Authors: | ||||
* Min RK | ||||
MinRK
|
r3539 | """ | ||
MinRK
|
r3660 | #----------------------------------------------------------------------------- | ||
# Copyright (C) 2010-2011 The IPython Development Team | ||||
# | ||||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r3539 | |||
MinRK
|
r3553 | from __future__ import print_function | ||
MinRK
|
r3539 | import time | ||
MinRK
|
r3631 | import uuid | ||
MinRK
|
r3539 | |||
import zmq | ||||
Jan Schulz
|
r8282 | from zmq.devices import ThreadDevice, ThreadMonitoredQueue | ||
MinRK
|
r3539 | from zmq.eventloop import ioloop, zmqstream | ||
MinRK
|
r4016 | from IPython.config.configurable import LoggingConfigurable | ||
MinRK
|
r6813 | from IPython.utils.py3compat import str_to_bytes | ||
chapmanb
|
r10572 | from IPython.utils.traitlets import Set, Instance, CFloat, Integer, Dict | ||
MinRK
|
r3610 | |||
MinRK
|
r6813 | from IPython.parallel.util import log_errors | ||
MinRK
|
r4155 | |||
MinRK
|
class Heart(object):
    """Responder that answers a HeartMonitor's pings.

    A thin convenience wrapper around a zmq device running in its own
    thread: a ``ThreadDevice(zmq.FORWARDER, ...)`` by default, or a
    ``ThreadMonitoredQueue`` when a monitor address is supplied.  The
    in/out socket types default to SUB/DEALER.

    The IDENTITY of the outgoing socket can be chosen with the optional
    ``heart_id`` argument; when omitted, the bytes of a fresh ``uuid4``
    are used.  The identity in use is exposed as ``self.id``.
    """

    device = None
    id = None

    def __init__(self, in_addr, out_addr, mon_addr=None, in_type=zmq.SUB,
                 out_type=zmq.DEALER, mon_type=zmq.PUB, heart_id=None):
        # Settle the outgoing identity up front; it is only applied to the
        # device once the connections below have been registered.
        identity = uuid.uuid4().bytes if heart_id is None else heart_id
        monitored = mon_addr is not None

        if monitored:
            self.device = ThreadMonitoredQueue(in_type, out_type, mon_type, in_prefix=b"", out_prefix=b"")
        else:
            self.device = ThreadDevice(zmq.FORWARDER, in_type, out_type)
        dev = self.device

        # do not allow the device to share global Context.instance,
        # which is the default behavior in pyzmq > 2.1.10
        dev.context_factory = zmq.Context
        dev.daemon = True

        dev.connect_in(in_addr)
        dev.connect_out(out_addr)
        if monitored:
            dev.connect_mon(mon_addr)

        # A SUB input subscribes with an empty prefix.
        if in_type == zmq.SUB:
            dev.setsockopt_in(zmq.SUBSCRIBE, b"")

        dev.setsockopt_out(zmq.IDENTITY, identity)
        self.id = identity

    def start(self):
        """Start the underlying device and return whatever its ``start()`` returns."""
        return self.device.start()
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r5695 | |||
MinRK
|
class HeartMonitor(LoggingConfigurable):
    """A basic HeartMonitor class

    pingstream: a PUB stream (pings are sent on it)
    pongstream: a ROUTER stream (replies are received on it)
    period: the period of the heartbeat in milliseconds
    """

    period = Integer(3000, config=True,
        help='The frequency at which the Hub pings the engines for heartbeats '
        '(in ms)',
    )
    max_heartmonitor_misses = Integer(10, config=True,
        help='Allowed consecutive missed pings from controller Hub to engine before unregistering.',
    )

    pingstream = Instance('zmq.eventloop.zmqstream.ZMQStream')
    pongstream = Instance('zmq.eventloop.zmqstream.ZMQStream')
    loop = Instance('zmq.eventloop.ioloop.IOLoop')

    def _loop_default(self):
        """Default for ``loop``: the IOLoop singleton."""
        return ioloop.IOLoop.instance()

    # not settable:
    # identities of the hearts currently being tracked
    hearts = Set()
    # identities that have replied since the last beat
    responses = Set()
    # heart identity -> number of consecutive beats missed so far
    on_probation = Dict()
    # value of ``lifetime`` that was sent with the previous ping
    last_ping = CFloat(0)
    _new_handlers = Set()
    _failure_handlers = Set()
    # accumulated elapsed time (from time.time() differences); sent as the ping payload
    lifetime = CFloat(0)
    # time.time() at start() / at the most recent beat
    tic = CFloat(0)

    def __init__(self, **kwargs):
        super(HeartMonitor, self).__init__(**kwargs)

        self.pongstream.on_recv(self.handle_pong)

    def start(self):
        """Record the start time and schedule ``beat`` every ``period`` ms on ``loop``."""
        self.tic = time.time()
        self.caller = ioloop.PeriodicCallback(self.beat, self.period, self.loop)
        self.caller.start()

    def add_new_heart_handler(self, handler):
        """add a new handler for new hearts"""
        self.log.debug("heartbeat::new_heart_handler: %s", handler)
        self._new_handlers.add(handler)

    def add_heart_failure_handler(self, handler):
        """add a new handler for heart failure"""
        self.log.debug("heartbeat::new heart failure handler: %s", handler)
        self._failure_handlers.add(handler)

    def beat(self):
        """Evaluate the replies to the previous ping, then send the next one.

        Hearts that replied but were not yet tracked are passed to
        ``handle_new_heart``; tracked hearts that did not reply have their
        miss count updated and, once it exceeds ``max_heartmonitor_misses``,
        are passed to ``handle_heart_failure``.
        """
        # Process any replies still pending on the stream before judging them.
        self.pongstream.flush()
        self.last_ping = self.lifetime

        toc = time.time()
        self.lifetime += toc - self.tic
        self.tic = toc
        self.log.debug("heartbeat::sending %s", self.lifetime)

        goodhearts = self.hearts.intersection(self.responses)
        missed_beats = self.hearts.difference(goodhearts)
        newhearts = self.responses.difference(goodhearts)

        # Explicit loops rather than map(): on Python 3 map() is lazy, so
        # using it only for side effects would never invoke the handlers.
        for heart in newhearts:
            self.handle_new_heart(heart)
        heartfailures, on_probation = self._check_missed(missed_beats, self.on_probation,
                                                         self.hearts)
        for heart in heartfailures:
            self.handle_heart_failure(heart)

        self.on_probation = on_probation
        self.responses = set()
        self.pingstream.send(str_to_bytes(str(self.lifetime)))
        # flush stream to force immediate socket send
        self.pingstream.flush()

    def _check_missed(self, missed_beats, on_probation, hearts):
        """Update heartbeats on probation, identifying any that have too many misses.

        missed_beats: identities that did not reply to the last ping
        on_probation: mapping of identity -> misses counted before this beat
        hearts: identities currently tracked; misses from others are ignored

        Returns ``(failures, new_probation)``: the list of identities whose
        miss count now exceeds ``max_heartmonitor_misses``, and a new mapping
        of the remaining missed identities to their updated miss counts.
        Identities that did not miss this beat are absent from the new
        mapping, so their count starts again from zero.
        """
        failures = []
        new_probation = {}
        for cur_heart in (b for b in missed_beats if b in hearts):
            miss_count = on_probation.get(cur_heart, 0) + 1
            self.log.info("heartbeat::missed %s : %s", cur_heart, miss_count)
            if miss_count > self.max_heartmonitor_misses:
                failures.append(cur_heart)
            else:
                new_probation[cur_heart] = miss_count
        return failures, new_probation

    def handle_new_heart(self, heart):
        """Notify the new-heart handlers (or log if there are none), then track ``heart``."""
        if self._new_handlers:
            for handler in self._new_handlers:
                handler(heart)
        else:
            self.log.info("heartbeat::yay, got new heart %s!", heart)
        self.hearts.add(heart)

    def handle_heart_failure(self, heart):
        """Notify the failure handlers (or log if there are none), then stop tracking ``heart``."""
        if self._failure_handlers:
            for handler in self._failure_handlers:
                try:
                    handler(heart)
                except Exception:
                    # A failing handler is logged with its traceback; the
                    # remaining handlers still run and the heart is still removed.
                    self.log.error("heartbeat::Bad Handler! %s", handler, exc_info=True)
        else:
            self.log.info("heartbeat::Heart %s failed :(", heart)
        self.hearts.remove(heart)

    @log_errors
    def handle_pong(self, msg):
        """a heart just beat

        ``msg[0]`` is recorded as the responding heart and ``msg[1]`` is
        compared against the payload of the current and the previous ping.
        A reply matching neither is logged and otherwise ignored.
        """
        current = str_to_bytes(str(self.lifetime))
        last = str_to_bytes(str(self.last_ping))
        if msg[1] == current:
            self.responses.add(msg[0])
        elif msg[1] == last:
            # Reply to the previous ping: late, but still counted as a response.
            delta = time.time() - self.tic + (self.lifetime - self.last_ping)
            self.log.warning("heartbeat::heart %r missed a beat, and took %.2f ms to respond", msg[0], 1000*delta)
            self.responses.add(msg[0])
        else:
            self.log.warning("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)", msg[1], self.lifetime)
MinRK
|
r3539 | |||