##// END OF EJS Templates
allow_none=False by default for Type and Instance
allow_none=False by default for Type and Instance

File last commit:

r20940:4c8e4259
r20940:4c8e4259
Show More
heartmonitor.py
193 lines | 7.2 KiB | text/x-python | PythonLexer
MinRK
prep newparallel for rebase...
r3539 #!/usr/bin/env python
"""
MinRK
remove remaining references to deprecated XREP/XREQ names...
r7538 A multi-heart Heartbeat system using PUB and ROUTER sockets. pings are sent out on the PUB,
and hearts are tracked based on their DEALER identities.
MinRK
prep newparallel for rebase...
r3539 """
MinRK
add secondary `debug` flag for heartbeats...
r17713
# Copyright (c) IPython Development Team.
# Distributed under the terms of the Modified BSD License.
MinRK
prep newparallel for rebase...
r3539
MinRK
use print_function
r3553 from __future__ import print_function
MinRK
prep newparallel for rebase...
r3539 import time
MinRK
resort imports in a cleaner order
r3631 import uuid
MinRK
prep newparallel for rebase...
r3539
import zmq
Jan Schulz
Make heartmonitor.Heart monitorable...
r8282 from zmq.devices import ThreadDevice, ThreadMonitoredQueue
MinRK
prep newparallel for rebase...
r3539 from zmq.eventloop import ioloop, zmqstream
MinRK
add LoggingConfigurable base class
r4016 from IPython.config.configurable import LoggingConfigurable
MinRK
discard parallel.util.asbytes in favor of py3compat.cast_bytes
r6813 from IPython.utils.py3compat import str_to_bytes
MinRK
add secondary `debug` flag for heartbeats...
r17713 from IPython.utils.traitlets import Set, Instance, CFloat, Integer, Dict, Bool
MinRK
rework logging connections
r3610
Min RK
s/IPython.parallel/ipython_parallel/
r20860 from ipython_parallel.util import log_errors
MinRK
update parallel code for py3k...
r4155
MinRK
prep newparallel for rebase...
r3539 class Heart(object):
"""A basic heart object for responding to a HeartMonitor.
This is a simple wrapper with defaults for the most common
Device model for responding to heartbeats.
Bernardo B. Marques
remove all trailling spaces
r4872
It simply builds a threadsafe zmq.FORWARDER Device, defaulting to using
MinRK
remove remaining references to deprecated XREP/XREQ names...
r7538 SUB/DEALER for in/out.
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
remove remaining references to deprecated XREP/XREQ names...
r7538 You can specify the DEALER's IDENTITY via the optional heart_id argument."""
MinRK
prep newparallel for rebase...
r3539 device=None
id=None
Jan Schulz
Make heartmonitor.Heart monitorable...
r8282 def __init__(self, in_addr, out_addr, mon_addr=None, in_type=zmq.SUB, out_type=zmq.DEALER, mon_type=zmq.PUB, heart_id=None):
if mon_addr is None:
self.device = ThreadDevice(zmq.FORWARDER, in_type, out_type)
else:
self.device = ThreadMonitoredQueue(in_type, out_type, mon_type, in_prefix=b"", out_prefix=b"")
MinRK
parallel heart also gets its own Context
r5886 # do not allow the device to share global Context.instance,
# which is the default behavior in pyzmq > 2.1.10
self.device.context_factory = zmq.Context
MinRK
Moved parallel test files to parallel subpackages...
r3573 self.device.daemon=True
MinRK
prep newparallel for rebase...
r3539 self.device.connect_in(in_addr)
self.device.connect_out(out_addr)
Jan Schulz
Make heartmonitor.Heart monitorable...
r8282 if mon_addr is not None:
self.device.connect_mon(mon_addr)
MinRK
prep newparallel for rebase...
r3539 if in_type == zmq.SUB:
MinRK
update parallel code for py3k...
r4155 self.device.setsockopt_in(zmq.SUBSCRIBE, b"")
MinRK
prep newparallel for rebase...
r3539 if heart_id is None:
MinRK
enforce ascii identities in parallel code...
r4160 heart_id = uuid.uuid4().bytes
MinRK
prep newparallel for rebase...
r3539 self.device.setsockopt_out(zmq.IDENTITY, heart_id)
self.id = heart_id
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
prep newparallel for rebase...
r3539 def start(self):
return self.device.start()
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
minor controller logging adjustments...
r5695
MinRK
add LoggingConfigurable base class
r4016 class HeartMonitor(LoggingConfigurable):
MinRK
prep newparallel for rebase...
r3539 """A basic HeartMonitor class
pingstream: a PUB stream
MinRK
remove remaining references to deprecated XREP/XREQ names...
r7538 pongstream: an ROUTER stream
MinRK
prep newparallel for rebase...
r3539 period: the period of the heartbeat in milliseconds"""
MinRK
add secondary `debug` flag for heartbeats...
r17713
debug = Bool(False, config=True,
help="""Whether to include every heartbeat in debugging output.
Has to be set explicitly, because there will be *a lot* of output.
"""
)
MinRK
relax default heartbeat period in IPython.parallel to 3s (from 1s)...
r5953 period = Integer(3000, config=True,
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 help='The frequency at which the Hub pings the engines for heartbeats '
MinRK
fix typo in HeartMonitor.period helpstring
r5387 '(in ms)',
MinRK
all ipcluster scripts in some degree of working order with new config
r3985 )
chapmanb
Lower default misses to 10 and improve documentation of option
r10593 max_heartmonitor_misses = Integer(10, config=True,
help='Allowed consecutive missed pings from controller Hub to engine before unregistering.',
chapmanb
Provide configuration hook to specify allowable heartmonitor misses from controller checking engine connectivity
r10572 )
Bernardo B. Marques
remove all trailling spaces
r4872
Sylvain Corlay
allow_none=False by default for Type and Instance
r20940 pingstream=Instance('zmq.eventloop.zmqstream.ZMQStream', allow_none=True)
pongstream=Instance('zmq.eventloop.zmqstream.ZMQStream', allow_none=True)
loop = Instance('zmq.eventloop.ioloop.IOLoop', allow_none=True)
MinRK
rework logging connections
r3610 def _loop_default(self):
return ioloop.IOLoop.instance()
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
rework logging connections
r3610 # not settable:
hearts=Set()
responses=Set()
chapmanb
Provide configuration hook to specify allowable heartmonitor misses from controller checking engine connectivity
r10572 on_probation=Dict()
MinRK
rework logging connections
r3610 last_ping=CFloat(0)
_new_handlers = Set()
_failure_handlers = Set()
lifetime = CFloat(0)
tic = CFloat(0)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
rework logging connections
r3610 def __init__(self, **kwargs):
super(HeartMonitor, self).__init__(**kwargs)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
prep newparallel for rebase...
r3539 self.pongstream.on_recv(self.handle_pong)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
prep newparallel for rebase...
r3539 def start(self):
MinRK
set HeartMonitor.tic on start...
r5677 self.tic = time.time()
MinRK
prep newparallel for rebase...
r3539 self.caller = ioloop.PeriodicCallback(self.beat, self.period, self.loop)
self.caller.start()
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
prep newparallel for rebase...
r3539 def add_new_heart_handler(self, handler):
"""add a new handler for new hearts"""
MinRK
minor controller logging adjustments...
r5695 self.log.debug("heartbeat::new_heart_handler: %s", handler)
MinRK
prep newparallel for rebase...
r3539 self._new_handlers.add(handler)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
prep newparallel for rebase...
r3539 def add_heart_failure_handler(self, handler):
"""add a new handler for heart failure"""
MinRK
minor controller logging adjustments...
r5695 self.log.debug("heartbeat::new heart failure handler: %s", handler)
MinRK
prep newparallel for rebase...
r3539 self._failure_handlers.add(handler)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
prep newparallel for rebase...
r3539 def beat(self):
Bernardo B. Marques
remove all trailling spaces
r4872 self.pongstream.flush()
MinRK
prep newparallel for rebase...
r3539 self.last_ping = self.lifetime
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
prep newparallel for rebase...
r3539 toc = time.time()
self.lifetime += toc-self.tic
self.tic = toc
MinRK
add secondary `debug` flag for heartbeats...
r17713 if self.debug:
self.log.debug("heartbeat::sending %s", self.lifetime)
MinRK
prep newparallel for rebase...
r3539 goodhearts = self.hearts.intersection(self.responses)
missed_beats = self.hearts.difference(goodhearts)
newhearts = self.responses.difference(goodhearts)
Thomas Kluyver
Fix parallel test suite
r13383 for heart in newhearts:
self.handle_new_heart(heart)
chapmanb
Provide configuration hook to specify allowable heartmonitor misses from controller checking engine connectivity
r10572 heartfailures, on_probation = self._check_missed(missed_beats, self.on_probation,
self.hearts)
Thomas Kluyver
Fix parallel test suite
r13383 for failure in heartfailures:
self.handle_heart_failure(failure)
chapmanb
Provide configuration hook to specify allowable heartmonitor misses from controller checking engine connectivity
r10572 self.on_probation = on_probation
MinRK
prep newparallel for rebase...
r3539 self.responses = set()
Jan Schulz
Make heartmonitor.Heart monitorable...
r8282 #print self.on_probation, self.hearts
MinRK
minor controller logging adjustments...
r5695 # self.log.debug("heartbeat::beat %.3f, %i beating hearts", self.lifetime, len(self.hearts))
MinRK
discard parallel.util.asbytes in favor of py3compat.cast_bytes
r6813 self.pingstream.send(str_to_bytes(str(self.lifetime)))
MinRK
flush outgoing heartbeats...
r5952 # flush stream to force immediate socket send
self.pingstream.flush()
Bernardo B. Marques
remove all trailling spaces
r4872
chapmanb
Provide configuration hook to specify allowable heartmonitor misses from controller checking engine connectivity
r10572 def _check_missed(self, missed_beats, on_probation, hearts):
"""Update heartbeats on probation, identifying any that have too many misses.
"""
failures = []
new_probation = {}
for cur_heart in (b for b in missed_beats if b in hearts):
miss_count = on_probation.get(cur_heart, 0) + 1
chapmanb
Improve documentation of default to indicate consecutive and supply logging of misses
r10592 self.log.info("heartbeat::missed %s : %s" % (cur_heart, miss_count))
chapmanb
Provide configuration hook to specify allowable heartmonitor misses from controller checking engine connectivity
r10572 if miss_count > self.max_heartmonitor_misses:
failures.append(cur_heart)
else:
new_probation[cur_heart] = miss_count
return failures, new_probation
MinRK
prep newparallel for rebase...
r3539 def handle_new_heart(self, heart):
if self._new_handlers:
for handler in self._new_handlers:
handler(heart)
else:
MinRK
minor controller logging adjustments...
r5695 self.log.info("heartbeat::yay, got new heart %s!", heart)
MinRK
prep newparallel for rebase...
r3539 self.hearts.add(heart)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
prep newparallel for rebase...
r3539 def handle_heart_failure(self, heart):
if self._failure_handlers:
for handler in self._failure_handlers:
try:
handler(heart)
MinRK
general parallel code cleanup
r3556 except Exception as e:
MinRK
minor controller logging adjustments...
r5695 self.log.error("heartbeat::Bad Handler! %s", handler, exc_info=True)
MinRK
prep newparallel for rebase...
r3539 pass
else:
MinRK
minor controller logging adjustments...
r5695 self.log.info("heartbeat::Heart %s failed :(", heart)
MinRK
prep newparallel for rebase...
r3539 self.hearts.remove(heart)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
add log_errors decorator for on_recv callbacks...
r6324 @log_errors
MinRK
prep newparallel for rebase...
r3539 def handle_pong(self, msg):
"a heart just beat"
MinRK
discard parallel.util.asbytes in favor of py3compat.cast_bytes
r6813 current = str_to_bytes(str(self.lifetime))
last = str_to_bytes(str(self.last_ping))
MinRK
update parallel code for py3k...
r4155 if msg[1] == current:
MinRK
prep newparallel for rebase...
r3539 delta = time.time()-self.tic
MinRK
add secondary `debug` flag for heartbeats...
r17713 if self.debug:
self.log.debug("heartbeat::heart %r took %.2f ms to respond", msg[0], 1000*delta)
MinRK
prep newparallel for rebase...
r3539 self.responses.add(msg[0])
MinRK
update parallel code for py3k...
r4155 elif msg[1] == last:
MinRK
prep newparallel for rebase...
r3539 delta = time.time()-self.tic + (self.lifetime-self.last_ping)
MinRK
minor controller logging adjustments...
r5695 self.log.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond", msg[0], 1000*delta)
MinRK
prep newparallel for rebase...
r3539 self.responses.add(msg[0])
else:
MinRK
minor controller logging adjustments...
r5695 self.log.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)", msg[1], self.lifetime)
MinRK
prep newparallel for rebase...
r3539