##// END OF EJS Templates
Rework the heartbeat checking (configureable, period from hub)...
Jan Schulz -
Show More
@@ -53,14 +53,11 b' class EngineFactory(RegistrationFactory):'
53 timeout=Float(5.0, config=True,
53 timeout=Float(5.0, config=True,
54 help="""The time (in seconds) to wait for the Controller to respond
54 help="""The time (in seconds) to wait for the Controller to respond
55 to registration requests before giving up.""")
55 to registration requests before giving up.""")
56 hb_check_period=Integer(5000, config=True,
56 max_heartbeat_misses=Integer(0, config=True,
57 help="""The time (in ms) to check for a heartbeat ping from the
58 Controller. Ensure that check period is bigger than the heartbeat period
59 from the controller (set via "HeartMonitor.period" in the Controller config)
60 so that at least one ping is received during each check period.""")
61 hb_max_misses=Integer(5, config=True,
62 help="""The maximum number of times a check for the heartbeat ping of a
57 help="""The maximum number of times a check for the heartbeat ping of a
63 controller can be missed before shutting down the engine.""")
58 controller can be missed before shutting down the engine.
59
60 If set to 0, the check is disabled.""")
64 sshserver=Unicode(config=True,
61 sshserver=Unicode(config=True,
65 help="""The SSH server to use for tunneling connections to the Controller.""")
62 help="""The SSH server to use for tunneling connections to the Controller.""")
66 sshkey=Unicode(config=True,
63 sshkey=Unicode(config=True,
@@ -68,12 +65,14 b' class EngineFactory(RegistrationFactory):'
68 paramiko=Bool(sys.platform == 'win32', config=True,
65 paramiko=Bool(sys.platform == 'win32', config=True,
69 help="""Whether to use paramiko instead of openssh for tunnels.""")
66 help="""Whether to use paramiko instead of openssh for tunnels.""")
70
67
68
71 # not configurable:
69 # not configurable:
72 connection_info = Dict()
70 connection_info = Dict()
73 user_ns = Dict()
71 user_ns = Dict()
74 id = Integer(allow_none=True)
72 id = Integer(allow_none=True)
75 registrar = Instance('zmq.eventloop.zmqstream.ZMQStream')
73 registrar = Instance('zmq.eventloop.zmqstream.ZMQStream')
76 kernel = Instance(Kernel)
74 kernel = Instance(Kernel)
75 hb_check_period=Integer()
77
76
78 # States for the heartbeat monitoring
77 # States for the heartbeat monitoring
79 # Initial values for monitored and pinged must satisfy "monitored > pinged == False" so that
78 # Initial values for monitored and pinged must satisfy "monitored > pinged == False" so that
@@ -153,7 +152,7 b' class EngineFactory(RegistrationFactory):'
153
152
154 def _report_ping(self, msg):
153 def _report_ping(self, msg):
155 """Callback for when the heartmonitor.Heart receives a ping"""
154 """Callback for when the heartmonitor.Heart receives a ping"""
156 self.log.debug("Received a ping: %s", msg)
155 #self.log.debug("Received a ping: %s", msg)
157 self._hb_last_pinged = time.time()
156 self._hb_last_pinged = time.time()
158
157
159 def complete_registration(self, msg, connect, maybe_tunnel):
158 def complete_registration(self, msg, connect, maybe_tunnel):
@@ -186,7 +185,9 b' class EngineFactory(RegistrationFactory):'
186 self._hb_listener = zmqstream.ZMQStream(mon, self.loop)
185 self._hb_listener = zmqstream.ZMQStream(mon, self.loop)
187 self._hb_listener.on_recv(self._report_ping)
186 self._hb_listener.on_recv(self._report_ping)
188
187
189 hb_monitor = "tcp://127.0.0.1:%i"%mport
188 hb_monitor = None
189 if self.max_heartbeat_misses > 0:
190 hb_monitor = "tcp://127.0.0.1:%i"%mport
190
191
191 heart = Heart(hb_ping, hb_pong, hb_monitor , heart_id=identity)
192 heart = Heart(hb_ping, hb_pong, hb_monitor , heart_id=identity)
192 heart.start()
193 heart.start()
@@ -232,6 +233,20 b' class EngineFactory(RegistrationFactory):'
232
233
233 self.kernel.shell.display_pub.topic = cast_bytes('engine.%i.displaypub' % self.id)
234 self.kernel.shell.display_pub.topic = cast_bytes('engine.%i.displaypub' % self.id)
234
235
236
237 # periodically check the heartbeat pings of the controller
238 # Should be started here and not in "start()" so that the right period can be taken
239 # from the hubs HeartBeatMonitor.period
240 if self.max_heartbeat_misses > 0:
241 # Use a slightly bigger check period than the hub signal period to not warn unnecessary
242 self.hb_check_period = int(content['hb_period'])+10
243 self.log.info("Starting to monitor the heartbeat signal from the hub every %i ms." , self.hb_check_period)
244 self._hb_reporter = ioloop.PeriodicCallback(self._hb_monitor, self.hb_check_period, self.loop)
245 self._hb_reporter.start()
246 else:
247 self.log.info("Monitoring of the heartbeat signal from the hub is not enabled.")
248
249
235 # FIXME: This is a hack until IPKernelApp and IPEngineApp can be fully merged
250 # FIXME: This is a hack until IPKernelApp and IPEngineApp can be fully merged
236 app = IPKernelApp(config=self.config, shell=self.kernel.shell, kernel=self.kernel, log=self.log)
251 app = IPKernelApp(config=self.config, shell=self.kernel.shell, kernel=self.kernel, log=self.log)
237 app.init_profile_dir()
252 app.init_profile_dir()
@@ -266,11 +281,12 b' class EngineFactory(RegistrationFactory):'
266 self._hb_missed_beats += 1
281 self._hb_missed_beats += 1
267 self.log.warn("No heartbeat in the last %s ms (%s time(s) in a row).", self.hb_check_period, self._hb_missed_beats)
282 self.log.warn("No heartbeat in the last %s ms (%s time(s) in a row).", self.hb_check_period, self._hb_missed_beats)
268 else:
283 else:
284 #self.log.debug("Heartbeat received (after missing %s beats).", self._hb_missed_beats)
269 self._hb_missed_beats = 0
285 self._hb_missed_beats = 0
270
286
271 if self._hb_missed_beats >= self.hb_max_misses:
287 if self._hb_missed_beats >= self.max_heartbeat_misses:
272 self.log.fatal("Maximum number of heartbeats misses reached (%s times %s ms), shutting down.",
288 self.log.fatal("Maximum number of heartbeats misses reached (%s times %s ms), shutting down.",
273 self.hb_max_misses, self.hb_check_period)
289 self.max_heartbeat_misses, self.hb_check_period)
274 self.session.send(self.registrar, "unregistration_request", content=dict(id=self.id))
290 self.session.send(self.registrar, "unregistration_request", content=dict(id=self.id))
275 self.loop.stop()
291 self.loop.stop()
276
292
@@ -282,8 +298,5 b' class EngineFactory(RegistrationFactory):'
282 dc.start()
298 dc.start()
283 self._abort_dc = ioloop.DelayedCallback(self.abort, self.timeout*1000, self.loop)
299 self._abort_dc = ioloop.DelayedCallback(self.abort, self.timeout*1000, self.loop)
284 self._abort_dc.start()
300 self._abort_dc.start()
285 # periodically check the heartbeat pings of the controller
286 self._hb_reporter = ioloop.PeriodicCallback(self._hb_monitor, self.hb_check_period, self.loop)
287 self._hb_reporter.start()
288
301
289
302
General Comments 0
You need to be logged in to leave comments. Login now