From 958637f3ff2a9aecf9ffdf85c81a67b9756d0a2f 2012-10-14 01:51:10 From: Jan Schulz Date: 2012-10-14 01:51:10 Subject: [PATCH] Rework the heartbeat checking (configurable, period from hub) Make the new heartbeat monitor checks configurable. Also use the heartbeat monitor period from the hub as the value for the check period. This way you don't need to ensure that the period on the engine side is bigger than on the hub side. --- diff --git a/IPython/parallel/engine/engine.py b/IPython/parallel/engine/engine.py index 840442e..933f21a 100644 --- a/IPython/parallel/engine/engine.py +++ b/IPython/parallel/engine/engine.py @@ -53,14 +53,11 @@ class EngineFactory(RegistrationFactory): timeout=Float(5.0, config=True, help="""The time (in seconds) to wait for the Controller to respond to registration requests before giving up.""") - hb_check_period=Integer(5000, config=True, - help="""The time (in ms) to check for a heartbeat ping from the - Controller. Ensure that check period is bigger than the heartbeat period - from the controller (set via "HeartMonitor.period" in the Controller config) - so that at least one ping is received during each check period.""") - hb_max_misses=Integer(5, config=True, + max_heartbeat_misses=Integer(0, config=True, help="""The maximum number of times a check for the heartbeat ping of a - controller can be missed before shutting down the engine. 
+ + If set to 0, the check is disabled.""") sshserver=Unicode(config=True, help="""The SSH server to use for tunneling connections to the Controller.""") sshkey=Unicode(config=True, @@ -68,12 +65,14 @@ class EngineFactory(RegistrationFactory): paramiko=Bool(sys.platform == 'win32', config=True, help="""Whether to use paramiko instead of openssh for tunnels.""") + # not configurable: connection_info = Dict() user_ns = Dict() id = Integer(allow_none=True) registrar = Instance('zmq.eventloop.zmqstream.ZMQStream') kernel = Instance(Kernel) + hb_check_period=Integer() # States for the heartbeat monitoring # Initial values for monitored and pinged must satisfy "monitored > pinged == False" so that @@ -153,7 +152,7 @@ class EngineFactory(RegistrationFactory): def _report_ping(self, msg): """Callback for when the heartmonitor.Heart receives a ping""" - self.log.debug("Received a ping: %s", msg) + #self.log.debug("Received a ping: %s", msg) self._hb_last_pinged = time.time() def complete_registration(self, msg, connect, maybe_tunnel): @@ -186,7 +185,9 @@ class EngineFactory(RegistrationFactory): self._hb_listener = zmqstream.ZMQStream(mon, self.loop) self._hb_listener.on_recv(self._report_ping) - hb_monitor = "tcp://127.0.0.1:%i"%mport + hb_monitor = None + if self.max_heartbeat_misses > 0: + hb_monitor = "tcp://127.0.0.1:%i"%mport heart = Heart(hb_ping, hb_pong, hb_monitor , heart_id=identity) heart.start() @@ -232,6 +233,20 @@ class EngineFactory(RegistrationFactory): self.kernel.shell.display_pub.topic = cast_bytes('engine.%i.displaypub' % self.id) + + # periodically check the heartbeat pings of the controller + # Should be started here and not in "start()" so that the right period can be taken + # from the hubs HeartBeatMonitor.period + if self.max_heartbeat_misses > 0: + # Use a slightly bigger check period than the hub signal period to not warn unnecessary + self.hb_check_period = int(content['hb_period'])+10 + self.log.info("Starting to monitor the heartbeat signal 
from the hub every %i ms." , self.hb_check_period) + self._hb_reporter = ioloop.PeriodicCallback(self._hb_monitor, self.hb_check_period, self.loop) + self._hb_reporter.start() + else: + self.log.info("Monitoring of the heartbeat signal from the hub is not enabled.") + + # FIXME: This is a hack until IPKernelApp and IPEngineApp can be fully merged app = IPKernelApp(config=self.config, shell=self.kernel.shell, kernel=self.kernel, log=self.log) app.init_profile_dir() @@ -266,11 +281,12 @@ class EngineFactory(RegistrationFactory): self._hb_missed_beats += 1 self.log.warn("No heartbeat in the last %s ms (%s time(s) in a row).", self.hb_check_period, self._hb_missed_beats) else: + #self.log.debug("Heartbeat received (after missing %s beats).", self._hb_missed_beats) self._hb_missed_beats = 0 - if self._hb_missed_beats >= self.hb_max_misses: + if self._hb_missed_beats >= self.max_heartbeat_misses: self.log.fatal("Maximum number of heartbeats misses reached (%s times %s ms), shutting down.", - self.hb_max_misses, self.hb_check_period) + self.max_heartbeat_misses, self.hb_check_period) self.session.send(self.registrar, "unregistration_request", content=dict(id=self.id)) self.loop.stop() @@ -282,8 +298,5 @@ class EngineFactory(RegistrationFactory): dc.start() self._abort_dc = ioloop.DelayedCallback(self.abort, self.timeout*1000, self.loop) self._abort_dc.start() - # periodically check the heartbeat pings of the controller - self._hb_reporter = ioloop.PeriodicCallback(self._hb_monitor, self.hb_check_period, self.loop) - self._hb_reporter.start()