From d8c2d4f9bae3da5c51941f11584f976362d0886a 2013-05-06 19:56:32
From: chapmanb
Date: 2013-05-06 19:56:32
Subject: [PATCH] Provide configuration hook to specify allowable heartmonitor misses from controller checking engine connectivity

---
Review notes (commentary only; not part of the commit message or the diff):

* Adds the configurable `max_heartmonitor_misses` Integer trait (default 2).
* `on_probation` changes from a Set of hearts to a Dict that maps each heart
  to its current count of consecutive missed beats.
* `_check_missed` looks only at hearts that missed this beat and are still in
  `hearts`. Each one's count is incremented; it is returned as a failure once
  the count exceeds `max_heartmonitor_misses`, otherwise it is carried
  forward in the new probation dict. A heart that responded is not in
  `missed_beats`, so it is left out of the new dict and its count starts
  again from zero on its next miss.
* `_check_missed` is called before the `handle_heart_failure` handlers run,
  and its result replaces `self.on_probation` afterwards.
* NOTE(review): the removed code reported a heart as failed on its second
  consecutive miss (previous probation set intersected with the current
  misses). With the new default of 2 and the `>` comparison, failure is now
  reported on the third consecutive miss. Confirm that this change in
  default behaviour is intended.

diff --git a/IPython/parallel/controller/heartmonitor.py b/IPython/parallel/controller/heartmonitor.py
index ccfb093..53b4591 100755
--- a/IPython/parallel/controller/heartmonitor.py
+++ b/IPython/parallel/controller/heartmonitor.py
@@ -24,7 +24,7 @@ from zmq.eventloop import ioloop, zmqstream
 
 from IPython.config.configurable import LoggingConfigurable
 from IPython.utils.py3compat import str_to_bytes
-from IPython.utils.traitlets import Set, Instance, CFloat, Integer
+from IPython.utils.traitlets import Set, Instance, CFloat, Integer, Dict
 
 from IPython.parallel.util import log_errors
 
@@ -74,6 +74,9 @@ class HeartMonitor(LoggingConfigurable):
         help='The frequency at which the Hub pings the engines for heartbeats '
         '(in ms)',
     )
+    max_heartmonitor_misses = Integer(2, config=True,
+        help='Allow misses from engine to controller heart monitor before shutting down.',
+    )
 
     pingstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
     pongstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
@@ -84,7 +87,7 @@ class HeartMonitor(LoggingConfigurable):
     # not settable:
     hearts=Set()
     responses=Set()
-    on_probation=Set()
+    on_probation=Dict()
     last_ping=CFloat(0)
     _new_handlers = Set()
     _failure_handlers = Set()
@@ -121,11 +124,12 @@ class HeartMonitor(LoggingConfigurable):
         self.log.debug("heartbeat::sending %s", self.lifetime)
         goodhearts = self.hearts.intersection(self.responses)
         missed_beats = self.hearts.difference(goodhearts)
-        heartfailures = self.on_probation.intersection(missed_beats)
         newhearts = self.responses.difference(goodhearts)
         map(self.handle_new_heart, newhearts)
+        heartfailures, on_probation = self._check_missed(missed_beats, self.on_probation,
+                                                         self.hearts)
         map(self.handle_heart_failure, heartfailures)
-        self.on_probation = missed_beats.intersection(self.hearts)
+        self.on_probation = on_probation
         self.responses = set()
         #print self.on_probation, self.hearts
         # self.log.debug("heartbeat::beat %.3f, %i beating hearts", self.lifetime, len(self.hearts))
@@ -133,6 +137,19 @@ class HeartMonitor(LoggingConfigurable):
         # flush stream to force immediate socket send
         self.pingstream.flush()
 
+    def _check_missed(self, missed_beats, on_probation, hearts):
+        """Update heartbeats on probation, identifying any that have too many misses.
+        """
+        failures = []
+        new_probation = {}
+        for cur_heart in (b for b in missed_beats if b in hearts):
+            miss_count = on_probation.get(cur_heart, 0) + 1
+            if miss_count > self.max_heartmonitor_misses:
+                failures.append(cur_heart)
+            else:
+                new_probation[cur_heart] = miss_count
+        return failures, new_probation
+
     def handle_new_heart(self, heart):
         if self._new_handlers:
             for handler in self._new_handlers: