##// END OF EJS Templates
Provide a configuration hook to specify how many heart-monitor misses are allowed when the controller checks engine connectivity
chapmanb -
Show More
@@ -24,7 +24,7 b' from zmq.eventloop import ioloop, zmqstream'
24
24
25 from IPython.config.configurable import LoggingConfigurable
25 from IPython.config.configurable import LoggingConfigurable
26 from IPython.utils.py3compat import str_to_bytes
26 from IPython.utils.py3compat import str_to_bytes
27 from IPython.utils.traitlets import Set, Instance, CFloat, Integer
27 from IPython.utils.traitlets import Set, Instance, CFloat, Integer, Dict
28
28
29 from IPython.parallel.util import log_errors
29 from IPython.parallel.util import log_errors
30
30
@@ -74,6 +74,9 b' class HeartMonitor(LoggingConfigurable):'
74 help='The frequency at which the Hub pings the engines for heartbeats '
74 help='The frequency at which the Hub pings the engines for heartbeats '
75 '(in ms)',
75 '(in ms)',
76 )
76 )
77 max_heartmonitor_misses = Integer(2, config=True,
78 help='Allow misses from engine to controller heart monitor before shutting down.',
79 )
77
80
78 pingstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
81 pingstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
79 pongstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
82 pongstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
@@ -84,7 +87,7 b' class HeartMonitor(LoggingConfigurable):'
84 # not settable:
87 # not settable:
85 hearts=Set()
88 hearts=Set()
86 responses=Set()
89 responses=Set()
87 on_probation=Set()
90 on_probation=Dict()
88 last_ping=CFloat(0)
91 last_ping=CFloat(0)
89 _new_handlers = Set()
92 _new_handlers = Set()
90 _failure_handlers = Set()
93 _failure_handlers = Set()
@@ -121,11 +124,12 b' class HeartMonitor(LoggingConfigurable):'
121 self.log.debug("heartbeat::sending %s", self.lifetime)
124 self.log.debug("heartbeat::sending %s", self.lifetime)
122 goodhearts = self.hearts.intersection(self.responses)
125 goodhearts = self.hearts.intersection(self.responses)
123 missed_beats = self.hearts.difference(goodhearts)
126 missed_beats = self.hearts.difference(goodhearts)
124 heartfailures = self.on_probation.intersection(missed_beats)
125 newhearts = self.responses.difference(goodhearts)
127 newhearts = self.responses.difference(goodhearts)
126 map(self.handle_new_heart, newhearts)
128 map(self.handle_new_heart, newhearts)
129 heartfailures, on_probation = self._check_missed(missed_beats, self.on_probation,
130 self.hearts)
127 map(self.handle_heart_failure, heartfailures)
131 map(self.handle_heart_failure, heartfailures)
128 self.on_probation = missed_beats.intersection(self.hearts)
132 self.on_probation = on_probation
129 self.responses = set()
133 self.responses = set()
130 #print self.on_probation, self.hearts
134 #print self.on_probation, self.hearts
131 # self.log.debug("heartbeat::beat %.3f, %i beating hearts", self.lifetime, len(self.hearts))
135 # self.log.debug("heartbeat::beat %.3f, %i beating hearts", self.lifetime, len(self.hearts))
@@ -133,6 +137,19 b' class HeartMonitor(LoggingConfigurable):'
133 # flush stream to force immediate socket send
137 # flush stream to force immediate socket send
134 self.pingstream.flush()
138 self.pingstream.flush()
135
139
def _check_missed(self, missed_beats, on_probation, hearts):
    """Update heartbeats on probation, identifying any that have too many misses.

    Parameters
    ----------
    missed_beats : iterable
        Hearts that missed the current beat.
    on_probation : dict
        Maps a heart to the number of misses recorded for it so far.
        Hearts absent from this mapping are treated as having zero
        prior misses.
    hearts : container
        Hearts to consider; any missed beat not contained in it is
        ignored.

    Returns
    -------
    (failures, new_probation) : (list, dict)
        ``failures`` holds the hearts whose updated miss count exceeds
        ``self.max_heartmonitor_misses``.  ``new_probation`` maps every
        other considered heart to its updated miss count.  Hearts that
        are not in ``missed_beats`` do not appear in ``new_probation``,
        so their counts are not carried forward.
    """
    failures = []
    new_probation = {}
    for cur_heart in missed_beats:
        if cur_heart not in hearts:
            # Not a heart under consideration: neither fail it nor track it.
            continue
        miss_count = on_probation.get(cur_heart, 0) + 1
        if miss_count > self.max_heartmonitor_misses:
            failures.append(cur_heart)
        else:
            new_probation[cur_heart] = miss_count
    return failures, new_probation
152
136 def handle_new_heart(self, heart):
153 def handle_new_heart(self, heart):
137 if self._new_handlers:
154 if self._new_handlers:
138 for handler in self._new_handlers:
155 for handler in self._new_handlers:
General Comments 0
You need to be logged in to leave comments. Login now