diff --git a/IPython/parallel/engine/engine.py b/IPython/parallel/engine/engine.py index 840442e..933f21a 100644 --- a/IPython/parallel/engine/engine.py +++ b/IPython/parallel/engine/engine.py @@ -53,14 +53,11 @@ class EngineFactory(RegistrationFactory): timeout=Float(5.0, config=True, help="""The time (in seconds) to wait for the Controller to respond to registration requests before giving up.""") - hb_check_period=Integer(5000, config=True, - help="""The time (in ms) to check for a heartbeat ping from the - Controller. Ensure that check period is bigger than the heartbeat period - from the controller (set via "HeartMonitor.period" in the Controller config) - so that at least one ping is received during each check period.""") - hb_max_misses=Integer(5, config=True, + max_heartbeat_misses=Integer(0, config=True, help="""The maximum number of times a check for the heartbeat ping of a - controller can be missed before shutting down the engine.""") + controller can be missed before shutting down the engine. 
+ + If set to 0, the check is disabled.""") sshserver=Unicode(config=True, help="""The SSH server to use for tunneling connections to the Controller.""") sshkey=Unicode(config=True, @@ -68,12 +65,14 @@ class EngineFactory(RegistrationFactory): paramiko=Bool(sys.platform == 'win32', config=True, help="""Whether to use paramiko instead of openssh for tunnels.""") + # not configurable: connection_info = Dict() user_ns = Dict() id = Integer(allow_none=True) registrar = Instance('zmq.eventloop.zmqstream.ZMQStream') kernel = Instance(Kernel) + hb_check_period=Integer() # States for the heartbeat monitoring # Initial values for monitored and pinged must satisfy "monitored > pinged == False" so that @@ -153,7 +152,7 @@ class EngineFactory(RegistrationFactory): def _report_ping(self, msg): """Callback for when the heartmonitor.Heart receives a ping""" - self.log.debug("Received a ping: %s", msg) + #self.log.debug("Received a ping: %s", msg) self._hb_last_pinged = time.time() def complete_registration(self, msg, connect, maybe_tunnel): @@ -186,7 +185,9 @@ class EngineFactory(RegistrationFactory): self._hb_listener = zmqstream.ZMQStream(mon, self.loop) self._hb_listener.on_recv(self._report_ping) - hb_monitor = "tcp://127.0.0.1:%i"%mport + hb_monitor = None + if self.max_heartbeat_misses > 0: + hb_monitor = "tcp://127.0.0.1:%i"%mport heart = Heart(hb_ping, hb_pong, hb_monitor , heart_id=identity) heart.start() @@ -232,6 +233,20 @@ class EngineFactory(RegistrationFactory): self.kernel.shell.display_pub.topic = cast_bytes('engine.%i.displaypub' % self.id) + + # periodically check the heartbeat pings of the controller + # This is started here rather than in "start()" so that the check period can be taken + # from the hub's HeartMonitor.period + if self.max_heartbeat_misses > 0: + # Use a slightly longer check period than the hub's heartbeat period to avoid spurious warnings + self.hb_check_period = int(content['hb_period'])+10 + self.log.info("Starting to monitor the heartbeat signal
from the hub every %i ms." , self.hb_check_period) + self._hb_reporter = ioloop.PeriodicCallback(self._hb_monitor, self.hb_check_period, self.loop) + self._hb_reporter.start() + else: + self.log.info("Monitoring of the heartbeat signal from the hub is not enabled.") + + # FIXME: This is a hack until IPKernelApp and IPEngineApp can be fully merged app = IPKernelApp(config=self.config, shell=self.kernel.shell, kernel=self.kernel, log=self.log) app.init_profile_dir() @@ -266,11 +281,12 @@ class EngineFactory(RegistrationFactory): self._hb_missed_beats += 1 self.log.warn("No heartbeat in the last %s ms (%s time(s) in a row).", self.hb_check_period, self._hb_missed_beats) else: + #self.log.debug("Heartbeat received (after missing %s beats).", self._hb_missed_beats) self._hb_missed_beats = 0 - if self._hb_missed_beats >= self.hb_max_misses: + if self._hb_missed_beats >= self.max_heartbeat_misses: self.log.fatal("Maximum number of heartbeats misses reached (%s times %s ms), shutting down.", - self.hb_max_misses, self.hb_check_period) + self.max_heartbeat_misses, self.hb_check_period) self.session.send(self.registrar, "unregistration_request", content=dict(id=self.id)) self.loop.stop() @@ -282,8 +298,5 @@ class EngineFactory(RegistrationFactory): dc.start() self._abort_dc = ioloop.DelayedCallback(self.abort, self.timeout*1000, self.loop) self._abort_dc.start() - # periodically check the heartbeat pings of the controller - self._hb_reporter = ioloop.PeriodicCallback(self._hb_monitor, self.hb_check_period, self.loop) - self._hb_reporter.start()