Show More
@@ -24,7 +24,7 b' from zmq.eventloop import ioloop, zmqstream' | |||||
24 |
|
24 | |||
25 | from IPython.config.configurable import LoggingConfigurable |
|
25 | from IPython.config.configurable import LoggingConfigurable | |
26 | from IPython.utils.py3compat import str_to_bytes |
|
26 | from IPython.utils.py3compat import str_to_bytes | |
27 | from IPython.utils.traitlets import Set, Instance, CFloat, Integer |
|
27 | from IPython.utils.traitlets import Set, Instance, CFloat, Integer, Dict | |
28 |
|
28 | |||
29 | from IPython.parallel.util import log_errors |
|
29 | from IPython.parallel.util import log_errors | |
30 |
|
30 | |||
@@ -74,6 +74,9 b' class HeartMonitor(LoggingConfigurable):' | |||||
74 | help='The frequency at which the Hub pings the engines for heartbeats ' |
|
74 | help='The frequency at which the Hub pings the engines for heartbeats ' | |
75 | '(in ms)', |
|
75 | '(in ms)', | |
76 | ) |
|
76 | ) | |
|
77 | max_heartmonitor_misses = Integer(2, config=True, | |||
|
78 | help='Allow misses from engine to controller heart monitor before shutting down.', | |||
|
79 | ) | |||
77 |
|
80 | |||
78 | pingstream=Instance('zmq.eventloop.zmqstream.ZMQStream') |
|
81 | pingstream=Instance('zmq.eventloop.zmqstream.ZMQStream') | |
79 | pongstream=Instance('zmq.eventloop.zmqstream.ZMQStream') |
|
82 | pongstream=Instance('zmq.eventloop.zmqstream.ZMQStream') | |
@@ -84,7 +87,7 b' class HeartMonitor(LoggingConfigurable):' | |||||
84 | # not settable: |
|
87 | # not settable: | |
85 | hearts=Set() |
|
88 | hearts=Set() | |
86 | responses=Set() |
|
89 | responses=Set() | |
87 |
on_probation= |
|
90 | on_probation=Dict() | |
88 | last_ping=CFloat(0) |
|
91 | last_ping=CFloat(0) | |
89 | _new_handlers = Set() |
|
92 | _new_handlers = Set() | |
90 | _failure_handlers = Set() |
|
93 | _failure_handlers = Set() | |
@@ -121,11 +124,12 b' class HeartMonitor(LoggingConfigurable):' | |||||
121 | self.log.debug("heartbeat::sending %s", self.lifetime) |
|
124 | self.log.debug("heartbeat::sending %s", self.lifetime) | |
122 | goodhearts = self.hearts.intersection(self.responses) |
|
125 | goodhearts = self.hearts.intersection(self.responses) | |
123 | missed_beats = self.hearts.difference(goodhearts) |
|
126 | missed_beats = self.hearts.difference(goodhearts) | |
124 | heartfailures = self.on_probation.intersection(missed_beats) |
|
|||
125 | newhearts = self.responses.difference(goodhearts) |
|
127 | newhearts = self.responses.difference(goodhearts) | |
126 | map(self.handle_new_heart, newhearts) |
|
128 | map(self.handle_new_heart, newhearts) | |
|
129 | heartfailures, on_probation = self._check_missed(missed_beats, self.on_probation, | |||
|
130 | self.hearts) | |||
127 | map(self.handle_heart_failure, heartfailures) |
|
131 | map(self.handle_heart_failure, heartfailures) | |
128 |
self.on_probation = |
|
132 | self.on_probation = on_probation | |
129 | self.responses = set() |
|
133 | self.responses = set() | |
130 | #print self.on_probation, self.hearts |
|
134 | #print self.on_probation, self.hearts | |
131 | # self.log.debug("heartbeat::beat %.3f, %i beating hearts", self.lifetime, len(self.hearts)) |
|
135 | # self.log.debug("heartbeat::beat %.3f, %i beating hearts", self.lifetime, len(self.hearts)) | |
@@ -133,6 +137,19 b' class HeartMonitor(LoggingConfigurable):' | |||||
133 | # flush stream to force immediate socket send |
|
137 | # flush stream to force immediate socket send | |
134 | self.pingstream.flush() |
|
138 | self.pingstream.flush() | |
135 |
|
139 | |||
|
140 | def _check_missed(self, missed_beats, on_probation, hearts): | |||
|
141 | """Update heartbeats on probation, identifying any that have too many misses. | |||
|
142 | """ | |||
|
143 | failures = [] | |||
|
144 | new_probation = {} | |||
|
145 | for cur_heart in (b for b in missed_beats if b in hearts): | |||
|
146 | miss_count = on_probation.get(cur_heart, 0) + 1 | |||
|
147 | if miss_count > self.max_heartmonitor_misses: | |||
|
148 | failures.append(cur_heart) | |||
|
149 | else: | |||
|
150 | new_probation[cur_heart] = miss_count | |||
|
151 | return failures, new_probation | |||
|
152 | ||||
136 | def handle_new_heart(self, heart): |
|
153 | def handle_new_heart(self, heart): | |
137 | if self._new_handlers: |
|
154 | if self._new_handlers: | |
138 | for handler in self._new_handlers: |
|
155 | for handler in self._new_handlers: |
General Comments 0
You need to be logged in to leave comments.
Login now