Show More
@@ -53,14 +53,11 b' class EngineFactory(RegistrationFactory):' | |||
|
53 | 53 | timeout=Float(5.0, config=True, |
|
54 | 54 | help="""The time (in seconds) to wait for the Controller to respond |
|
55 | 55 | to registration requests before giving up.""") |
|
56 |
|
|
|
57 | help="""The time (in ms) to check for a heartbeat ping from the | |
|
58 | Controller. Ensure that check period is bigger than the heartbeat period | |
|
59 | from the controller (set via "HeartMonitor.period" in the Controller config) | |
|
60 | so that at least one ping is received during each check period.""") | |
|
61 | hb_max_misses=Integer(5, config=True, | |
|
56 | max_heartbeat_misses=Integer(0, config=True, | |
|
62 | 57 | help="""The maximum number of times a check for the heartbeat ping of a |
|
63 |
controller can be missed before shutting down the engine. |
|
|
58 | controller can be missed before shutting down the engine. | |
|
59 | ||
|
60 | If set to 0, the check is disabled.""") | |
|
64 | 61 | sshserver=Unicode(config=True, |
|
65 | 62 | help="""The SSH server to use for tunneling connections to the Controller.""") |
|
66 | 63 | sshkey=Unicode(config=True, |
@@ -68,12 +65,14 b' class EngineFactory(RegistrationFactory):' | |||
|
68 | 65 | paramiko=Bool(sys.platform == 'win32', config=True, |
|
69 | 66 | help="""Whether to use paramiko instead of openssh for tunnels.""") |
|
70 | 67 | |
|
68 | ||
|
71 | 69 | # not configurable: |
|
72 | 70 | connection_info = Dict() |
|
73 | 71 | user_ns = Dict() |
|
74 | 72 | id = Integer(allow_none=True) |
|
75 | 73 | registrar = Instance('zmq.eventloop.zmqstream.ZMQStream') |
|
76 | 74 | kernel = Instance(Kernel) |
|
75 | hb_check_period=Integer() | |
|
77 | 76 | |
|
78 | 77 | # States for the heartbeat monitoring |
|
79 | 78 | # Initial values for monitored and pinged must satisfy "monitored > pinged == False" so that |
@@ -153,7 +152,7 b' class EngineFactory(RegistrationFactory):' | |||
|
153 | 152 | |
|
154 | 153 | def _report_ping(self, msg): |
|
155 | 154 | """Callback for when the heartmonitor.Heart receives a ping""" |
|
156 | self.log.debug("Received a ping: %s", msg) | |
|
155 | #self.log.debug("Received a ping: %s", msg) | |
|
157 | 156 | self._hb_last_pinged = time.time() |
|
158 | 157 | |
|
159 | 158 | def complete_registration(self, msg, connect, maybe_tunnel): |
@@ -186,7 +185,9 b' class EngineFactory(RegistrationFactory):' | |||
|
186 | 185 | self._hb_listener = zmqstream.ZMQStream(mon, self.loop) |
|
187 | 186 | self._hb_listener.on_recv(self._report_ping) |
|
188 | 187 | |
|
189 |
hb_monitor = |
|
|
188 | hb_monitor = None | |
|
189 | if self.max_heartbeat_misses > 0: | |
|
190 | hb_monitor = "tcp://127.0.0.1:%i"%mport | |
|
190 | 191 | |
|
191 | 192 | heart = Heart(hb_ping, hb_pong, hb_monitor , heart_id=identity) |
|
192 | 193 | heart.start() |
@@ -232,6 +233,20 b' class EngineFactory(RegistrationFactory):' | |||
|
232 | 233 | |
|
233 | 234 | self.kernel.shell.display_pub.topic = cast_bytes('engine.%i.displaypub' % self.id) |
|
234 | 235 | |
|
236 | ||
|
237 | # periodically check the heartbeat pings of the controller | |
|
238 | # Should be started here and not in "start()" so that the right period can be taken | |
|
239 | # from the hubs HeartBeatMonitor.period | |
|
240 | if self.max_heartbeat_misses > 0: | |
|
241 | # Use a slightly bigger check period than the hub signal period to not warn unnecessary | |
|
242 | self.hb_check_period = int(content['hb_period'])+10 | |
|
243 | self.log.info("Starting to monitor the heartbeat signal from the hub every %i ms." , self.hb_check_period) | |
|
244 | self._hb_reporter = ioloop.PeriodicCallback(self._hb_monitor, self.hb_check_period, self.loop) | |
|
245 | self._hb_reporter.start() | |
|
246 | else: | |
|
247 | self.log.info("Monitoring of the heartbeat signal from the hub is not enabled.") | |
|
248 | ||
|
249 | ||
|
235 | 250 | # FIXME: This is a hack until IPKernelApp and IPEngineApp can be fully merged |
|
236 | 251 | app = IPKernelApp(config=self.config, shell=self.kernel.shell, kernel=self.kernel, log=self.log) |
|
237 | 252 | app.init_profile_dir() |
@@ -266,11 +281,12 b' class EngineFactory(RegistrationFactory):' | |||
|
266 | 281 | self._hb_missed_beats += 1 |
|
267 | 282 | self.log.warn("No heartbeat in the last %s ms (%s time(s) in a row).", self.hb_check_period, self._hb_missed_beats) |
|
268 | 283 | else: |
|
284 | #self.log.debug("Heartbeat received (after missing %s beats).", self._hb_missed_beats) | |
|
269 | 285 | self._hb_missed_beats = 0 |
|
270 | 286 | |
|
271 |
if self._hb_missed_beats >= self. |
|
|
287 | if self._hb_missed_beats >= self.max_heartbeat_misses: | |
|
272 | 288 | self.log.fatal("Maximum number of heartbeats misses reached (%s times %s ms), shutting down.", |
|
273 |
self. |
|
|
289 | self.max_heartbeat_misses, self.hb_check_period) | |
|
274 | 290 | self.session.send(self.registrar, "unregistration_request", content=dict(id=self.id)) |
|
275 | 291 | self.loop.stop() |
|
276 | 292 | |
@@ -282,8 +298,5 b' class EngineFactory(RegistrationFactory):' | |||
|
282 | 298 | dc.start() |
|
283 | 299 | self._abort_dc = ioloop.DelayedCallback(self.abort, self.timeout*1000, self.loop) |
|
284 | 300 | self._abort_dc.start() |
|
285 | # periodically check the heartbeat pings of the controller | |
|
286 | self._hb_reporter = ioloop.PeriodicCallback(self._hb_monitor, self.hb_check_period, self.loop) | |
|
287 | self._hb_reporter.start() | |
|
288 | 301 | |
|
289 | 302 |
General Comments 0
You need to be logged in to leave comments.
Login now