##// END OF EJS Templates
Rework the heartbeat checking (configurable, period from hub)...
Jan Schulz -
Show More
@@ -53,14 +53,11 b' class EngineFactory(RegistrationFactory):'
53 53 timeout=Float(5.0, config=True,
54 54 help="""The time (in seconds) to wait for the Controller to respond
55 55 to registration requests before giving up.""")
56 hb_check_period=Integer(5000, config=True,
57 help="""The time (in ms) to check for a heartbeat ping from the
58 Controller. Ensure that check period is bigger than the heartbeat period
59 from the controller (set via "HeartMonitor.period" in the Controller config)
60 so that at least one ping is received during each check period.""")
61 hb_max_misses=Integer(5, config=True,
56 max_heartbeat_misses=Integer(0, config=True,
62 57 help="""The maximum number of times a check for the heartbeat ping of a
63 controller can be missed before shutting down the engine.""")
58 controller can be missed before shutting down the engine.
59
60 If set to 0, the check is disabled.""")
64 61 sshserver=Unicode(config=True,
65 62 help="""The SSH server to use for tunneling connections to the Controller.""")
66 63 sshkey=Unicode(config=True,
@@ -68,12 +65,14 b' class EngineFactory(RegistrationFactory):'
68 65 paramiko=Bool(sys.platform == 'win32', config=True,
69 66 help="""Whether to use paramiko instead of openssh for tunnels.""")
70 67
68
71 69 # not configurable:
72 70 connection_info = Dict()
73 71 user_ns = Dict()
74 72 id = Integer(allow_none=True)
75 73 registrar = Instance('zmq.eventloop.zmqstream.ZMQStream')
76 74 kernel = Instance(Kernel)
75 hb_check_period=Integer()
77 76
78 77 # States for the heartbeat monitoring
79 78 # Initial values for monitored and pinged must satisfy "monitored > pinged == False" so that
@@ -153,7 +152,7 b' class EngineFactory(RegistrationFactory):'
153 152
154 153 def _report_ping(self, msg):
155 154 """Callback for when the heartmonitor.Heart receives a ping"""
156 self.log.debug("Received a ping: %s", msg)
155 #self.log.debug("Received a ping: %s", msg)
157 156 self._hb_last_pinged = time.time()
158 157
159 158 def complete_registration(self, msg, connect, maybe_tunnel):
@@ -186,7 +185,9 b' class EngineFactory(RegistrationFactory):'
186 185 self._hb_listener = zmqstream.ZMQStream(mon, self.loop)
187 186 self._hb_listener.on_recv(self._report_ping)
188 187
189 hb_monitor = "tcp://127.0.0.1:%i"%mport
188 hb_monitor = None
189 if self.max_heartbeat_misses > 0:
190 hb_monitor = "tcp://127.0.0.1:%i"%mport
190 191
191 192 heart = Heart(hb_ping, hb_pong, hb_monitor , heart_id=identity)
192 193 heart.start()
@@ -232,6 +233,20 b' class EngineFactory(RegistrationFactory):'
232 233
233 234 self.kernel.shell.display_pub.topic = cast_bytes('engine.%i.displaypub' % self.id)
234 235
236
237 # periodically check the heartbeat pings of the controller
238 # Should be started here and not in "start()" so that the right period can be taken
239 # from the hub's HeartBeatMonitor.period
240 if self.max_heartbeat_misses > 0:
241 # Use a slightly bigger check period than the hub signal period to avoid unnecessary warnings
242 self.hb_check_period = int(content['hb_period'])+10
243 self.log.info("Starting to monitor the heartbeat signal from the hub every %i ms." , self.hb_check_period)
244 self._hb_reporter = ioloop.PeriodicCallback(self._hb_monitor, self.hb_check_period, self.loop)
245 self._hb_reporter.start()
246 else:
247 self.log.info("Monitoring of the heartbeat signal from the hub is not enabled.")
248
249
235 250 # FIXME: This is a hack until IPKernelApp and IPEngineApp can be fully merged
236 251 app = IPKernelApp(config=self.config, shell=self.kernel.shell, kernel=self.kernel, log=self.log)
237 252 app.init_profile_dir()
@@ -266,11 +281,12 b' class EngineFactory(RegistrationFactory):'
266 281 self._hb_missed_beats += 1
267 282 self.log.warn("No heartbeat in the last %s ms (%s time(s) in a row).", self.hb_check_period, self._hb_missed_beats)
268 283 else:
284 #self.log.debug("Heartbeat received (after missing %s beats).", self._hb_missed_beats)
269 285 self._hb_missed_beats = 0
270 286
271 if self._hb_missed_beats >= self.hb_max_misses:
287 if self._hb_missed_beats >= self.max_heartbeat_misses:
272 288 self.log.fatal("Maximum number of heartbeats misses reached (%s times %s ms), shutting down.",
273 self.hb_max_misses, self.hb_check_period)
289 self.max_heartbeat_misses, self.hb_check_period)
274 290 self.session.send(self.registrar, "unregistration_request", content=dict(id=self.id))
275 291 self.loop.stop()
276 292
@@ -282,8 +298,5 b' class EngineFactory(RegistrationFactory):'
282 298 dc.start()
283 299 self._abort_dc = ioloop.DelayedCallback(self.abort, self.timeout*1000, self.loop)
284 300 self._abort_dc.start()
285 # periodically check the heartbeat pings of the controller
286 self._hb_reporter = ioloop.PeriodicCallback(self._hb_monitor, self.hb_check_period, self.loop)
287 self._hb_reporter.start()
288 301
289 302
General Comments 0
You need to be logged in to leave comments. Login now