Show More
@@ -53,14 +53,11 b' class EngineFactory(RegistrationFactory):' | |||||
53 | timeout=Float(5.0, config=True, |
|
53 | timeout=Float(5.0, config=True, | |
54 | help="""The time (in seconds) to wait for the Controller to respond |
|
54 | help="""The time (in seconds) to wait for the Controller to respond | |
55 | to registration requests before giving up.""") |
|
55 | to registration requests before giving up.""") | |
56 |
|
|
56 | max_heartbeat_misses=Integer(0, config=True, | |
57 | help="""The time (in ms) to check for a heartbeat ping from the |
|
|||
58 | Controller. Ensure that check period is bigger than the heartbeat period |
|
|||
59 | from the controller (set via "HeartMonitor.period" in the Controller config) |
|
|||
60 | so that at least one ping is received during each check period.""") |
|
|||
61 | hb_max_misses=Integer(5, config=True, |
|
|||
62 | help="""The maximum number of times a check for the heartbeat ping of a |
|
57 | help="""The maximum number of times a check for the heartbeat ping of a | |
63 |
controller can be missed before shutting down the engine. |
|
58 | controller can be missed before shutting down the engine. | |
|
59 | ||||
|
60 | If set to 0, the check is disabled.""") | |||
64 | sshserver=Unicode(config=True, |
|
61 | sshserver=Unicode(config=True, | |
65 | help="""The SSH server to use for tunneling connections to the Controller.""") |
|
62 | help="""The SSH server to use for tunneling connections to the Controller.""") | |
66 | sshkey=Unicode(config=True, |
|
63 | sshkey=Unicode(config=True, | |
@@ -68,12 +65,14 b' class EngineFactory(RegistrationFactory):' | |||||
68 | paramiko=Bool(sys.platform == 'win32', config=True, |
|
65 | paramiko=Bool(sys.platform == 'win32', config=True, | |
69 | help="""Whether to use paramiko instead of openssh for tunnels.""") |
|
66 | help="""Whether to use paramiko instead of openssh for tunnels.""") | |
70 |
|
67 | |||
|
68 | ||||
71 | # not configurable: |
|
69 | # not configurable: | |
72 | connection_info = Dict() |
|
70 | connection_info = Dict() | |
73 | user_ns = Dict() |
|
71 | user_ns = Dict() | |
74 | id = Integer(allow_none=True) |
|
72 | id = Integer(allow_none=True) | |
75 | registrar = Instance('zmq.eventloop.zmqstream.ZMQStream') |
|
73 | registrar = Instance('zmq.eventloop.zmqstream.ZMQStream') | |
76 | kernel = Instance(Kernel) |
|
74 | kernel = Instance(Kernel) | |
|
75 | hb_check_period=Integer() | |||
77 |
|
76 | |||
78 | # States for the heartbeat monitoring |
|
77 | # States for the heartbeat monitoring | |
79 | # Initial values for monitored and pinged must satisfy "monitored > pinged == False" so that |
|
78 | # Initial values for monitored and pinged must satisfy "monitored > pinged == False" so that | |
@@ -153,7 +152,7 b' class EngineFactory(RegistrationFactory):' | |||||
153 |
|
152 | |||
154 | def _report_ping(self, msg): |
|
153 | def _report_ping(self, msg): | |
155 | """Callback for when the heartmonitor.Heart receives a ping""" |
|
154 | """Callback for when the heartmonitor.Heart receives a ping""" | |
156 | self.log.debug("Received a ping: %s", msg) |
|
155 | #self.log.debug("Received a ping: %s", msg) | |
157 | self._hb_last_pinged = time.time() |
|
156 | self._hb_last_pinged = time.time() | |
158 |
|
157 | |||
159 | def complete_registration(self, msg, connect, maybe_tunnel): |
|
158 | def complete_registration(self, msg, connect, maybe_tunnel): | |
@@ -186,7 +185,9 b' class EngineFactory(RegistrationFactory):' | |||||
186 | self._hb_listener = zmqstream.ZMQStream(mon, self.loop) |
|
185 | self._hb_listener = zmqstream.ZMQStream(mon, self.loop) | |
187 | self._hb_listener.on_recv(self._report_ping) |
|
186 | self._hb_listener.on_recv(self._report_ping) | |
188 |
|
187 | |||
189 |
hb_monitor = |
|
188 | hb_monitor = None | |
|
189 | if self.max_heartbeat_misses > 0: | |||
|
190 | hb_monitor = "tcp://127.0.0.1:%i"%mport | |||
190 |
|
191 | |||
191 | heart = Heart(hb_ping, hb_pong, hb_monitor , heart_id=identity) |
|
192 | heart = Heart(hb_ping, hb_pong, hb_monitor , heart_id=identity) | |
192 | heart.start() |
|
193 | heart.start() | |
@@ -232,6 +233,20 b' class EngineFactory(RegistrationFactory):' | |||||
232 |
|
233 | |||
233 | self.kernel.shell.display_pub.topic = cast_bytes('engine.%i.displaypub' % self.id) |
|
234 | self.kernel.shell.display_pub.topic = cast_bytes('engine.%i.displaypub' % self.id) | |
234 |
|
235 | |||
|
236 | ||||
|
237 | # periodically check the heartbeat pings of the controller | |||
|
238 | # Should be started here and not in "start()" so that the right period can be taken | |||
|
239 | # from the hubs HeartBeatMonitor.period | |||
|
240 | if self.max_heartbeat_misses > 0: | |||
|
241 | # Use a slightly bigger check period than the hub signal period to not warn unnecessary | |||
|
242 | self.hb_check_period = int(content['hb_period'])+10 | |||
|
243 | self.log.info("Starting to monitor the heartbeat signal from the hub every %i ms." , self.hb_check_period) | |||
|
244 | self._hb_reporter = ioloop.PeriodicCallback(self._hb_monitor, self.hb_check_period, self.loop) | |||
|
245 | self._hb_reporter.start() | |||
|
246 | else: | |||
|
247 | self.log.info("Monitoring of the heartbeat signal from the hub is not enabled.") | |||
|
248 | ||||
|
249 | ||||
235 | # FIXME: This is a hack until IPKernelApp and IPEngineApp can be fully merged |
|
250 | # FIXME: This is a hack until IPKernelApp and IPEngineApp can be fully merged | |
236 | app = IPKernelApp(config=self.config, shell=self.kernel.shell, kernel=self.kernel, log=self.log) |
|
251 | app = IPKernelApp(config=self.config, shell=self.kernel.shell, kernel=self.kernel, log=self.log) | |
237 | app.init_profile_dir() |
|
252 | app.init_profile_dir() | |
@@ -266,11 +281,12 b' class EngineFactory(RegistrationFactory):' | |||||
266 | self._hb_missed_beats += 1 |
|
281 | self._hb_missed_beats += 1 | |
267 | self.log.warn("No heartbeat in the last %s ms (%s time(s) in a row).", self.hb_check_period, self._hb_missed_beats) |
|
282 | self.log.warn("No heartbeat in the last %s ms (%s time(s) in a row).", self.hb_check_period, self._hb_missed_beats) | |
268 | else: |
|
283 | else: | |
|
284 | #self.log.debug("Heartbeat received (after missing %s beats).", self._hb_missed_beats) | |||
269 | self._hb_missed_beats = 0 |
|
285 | self._hb_missed_beats = 0 | |
270 |
|
286 | |||
271 |
if self._hb_missed_beats >= self. |
|
287 | if self._hb_missed_beats >= self.max_heartbeat_misses: | |
272 | self.log.fatal("Maximum number of heartbeats misses reached (%s times %s ms), shutting down.", |
|
288 | self.log.fatal("Maximum number of heartbeats misses reached (%s times %s ms), shutting down.", | |
273 |
self. |
|
289 | self.max_heartbeat_misses, self.hb_check_period) | |
274 | self.session.send(self.registrar, "unregistration_request", content=dict(id=self.id)) |
|
290 | self.session.send(self.registrar, "unregistration_request", content=dict(id=self.id)) | |
275 | self.loop.stop() |
|
291 | self.loop.stop() | |
276 |
|
292 | |||
@@ -282,8 +298,5 b' class EngineFactory(RegistrationFactory):' | |||||
282 | dc.start() |
|
298 | dc.start() | |
283 | self._abort_dc = ioloop.DelayedCallback(self.abort, self.timeout*1000, self.loop) |
|
299 | self._abort_dc = ioloop.DelayedCallback(self.abort, self.timeout*1000, self.loop) | |
284 | self._abort_dc.start() |
|
300 | self._abort_dc.start() | |
285 | # periodically check the heartbeat pings of the controller |
|
|||
286 | self._hb_reporter = ioloop.PeriodicCallback(self._hb_monitor, self.hb_check_period, self.loop) |
|
|||
287 | self._hb_reporter.start() |
|
|||
288 |
|
301 | |||
289 |
|
302 |
General Comments 0
You need to be logged in to leave comments.
Login now