Show More
@@ -25,7 +25,7 b' from zmq.eventloop import ioloop, zmqstream' | |||
|
25 | 25 | from IPython.external.ssh import tunnel |
|
26 | 26 | # internal |
|
27 | 27 | from IPython.utils.traitlets import ( |
|
28 | Instance, Dict, Integer, Type, CFloat, Unicode, CBytes, Bool | |
|
28 | Instance, Dict, Integer, Type, CFloat, CInt, Unicode, CBytes, Bool | |
|
29 | 29 | ) |
|
30 | 30 | from IPython.utils.py3compat import cast_bytes |
|
31 | 31 | |
@@ -53,6 +53,12 b' class EngineFactory(RegistrationFactory):' | |||
|
53 | 53 | timeout=CFloat(5, config=True, |
|
54 | 54 | help="""The time (in seconds) to wait for the Controller to respond |
|
55 | 55 | to registration requests before giving up.""") |
|
56 | hb_check_period=CFloat(5, config=True, | |
|
57 | help="""The time (in seconds) to check for a heartbeat ping from the | |
|
58 | Controller.""") | |
|
59 | hb_max_misses=CInt(5, config=True, | |
|
60 | help="""The maximum number of times a check for the heartbeat ping of a | |
|
61 | controller can be missed before shutting down the engine.""") | |
|
56 | 62 | sshserver=Unicode(config=True, |
|
57 | 63 | help="""The SSH server to use for tunneling connections to the Controller.""") |
|
58 | 64 | sshkey=Unicode(config=True, |
@@ -66,6 +72,13 b' class EngineFactory(RegistrationFactory):' | |||
|
66 | 72 | id = Integer(allow_none=True) |
|
67 | 73 | registrar = Instance('zmq.eventloop.zmqstream.ZMQStream') |
|
68 | 74 | kernel = Instance(Kernel) |
|
75 | ||
|
76 | # States for the heartbeat monitoring | |
|
77 | _hb_last_pinged = None | |
|
78 | _hb_last_monitored = None | |
|
79 | _hb_missed_beats = 0 | |
|
80 | # The zmq Stream which receives the pings from the Heart | |
|
81 | _hb_listener = None | |
|
69 | 82 | |
|
70 | 83 | bident = CBytes() |
|
71 | 84 | ident = Unicode() |
@@ -134,6 +147,11 b' class EngineFactory(RegistrationFactory):' | |||
|
134 | 147 | # print (self.session.key) |
|
135 | 148 | self.session.send(self.registrar, "registration_request", content=content) |
|
136 | 149 | |
|
150 | def _report_ping(self, msg): | |
|
151 | """Callback for when the heartmonitor.Heart receives a ping""" | |
|
152 | self.log.debug("Received a ping: %s", msg) | |
|
153 | self._hb_last_pinged = time.time() | |
|
154 | ||
|
137 | 155 | def complete_registration(self, msg, connect, maybe_tunnel): |
|
138 | 156 | # print msg |
|
139 | 157 | self._abort_dc.stop() |
@@ -156,8 +174,17 b' class EngineFactory(RegistrationFactory):' | |||
|
156 | 174 | # possibly forward hb ports with tunnels |
|
157 | 175 | hb_ping = maybe_tunnel(url('hb_ping')) |
|
158 | 176 | hb_pong = maybe_tunnel(url('hb_pong')) |
|
177 | ||
|
178 | # Add a monitor socket which will record the last time a ping was seen | |
|
179 | mon = self.context.socket(zmq.SUB) | |
|
180 | mport = mon.bind_to_random_port('tcp://127.0.0.1') | |
|
181 | mon.setsockopt(zmq.SUBSCRIBE, b"") | |
|
182 | self._hb_listener = zmqstream.ZMQStream(mon, self.loop) | |
|
183 | self._hb_listener.on_recv(self._report_ping) | |
|
184 | ||
|
185 | hb_monitor = "tcp://127.0.0.1:%i"%mport | |
|
159 | 186 | |
|
160 | heart = Heart(hb_ping, hb_pong, heart_id=identity) | |
|
187 | heart = Heart(hb_ping, hb_pong, hb_monitor , heart_id=identity) | |
|
161 | 188 | heart.start() |
|
162 | 189 | |
|
163 | 190 | # create Shell Connections (MUX, Task, etc.): |
@@ -228,9 +255,31 b' class EngineFactory(RegistrationFactory):' | |||
|
228 | 255 | time.sleep(1) |
|
229 | 256 | sys.exit(255) |
|
230 | 257 | |
|
258 | def _hb_monitor(self): | |
|
259 | """Callback to monitor the heartbeat from the controller""" | |
|
260 | self._hb_listener.flush() | |
|
261 | if self._hb_last_monitored > self._hb_last_pinged: | |
|
262 | self._hb_missed_beats += 1 | |
|
263 | self.log.warn("No heartbeat in the last %s seconds.", self.hb_check_period) | |
|
264 | else: | |
|
265 | self._hb_missed_beats = 0 | |
|
266 | ||
|
267 | if self._hb_missed_beats >= self.hb_max_misses: | |
|
268 | self.log.fatal("Maximum number of heartbeats misses reached (%s times %s seconds), shutting down.", | |
|
269 | self.hb_max_misses, self.hb_check_period) | |
|
270 | self.session.send(self.registrar, "unregistration_request", content=dict(id=self.id)) | |
|
271 | self.loop.stop() | |
|
272 | ||
|
273 | self._hb_last_monitored = time.time() | |
|
274 | ||
|
275 | ||
|
231 | 276 | def start(self): |
|
232 | 277 | dc = ioloop.DelayedCallback(self.register, 0, self.loop) |
|
233 | 278 | dc.start() |
|
234 | 279 | self._abort_dc = ioloop.DelayedCallback(self.abort, self.timeout*1000, self.loop) |
|
235 | 280 | self._abort_dc.start() |
|
281 | # periodically check the heartbeat pings of the controller | |
|
282 | self._hb_reporter = ioloop.PeriodicCallback(self._hb_monitor, self.hb_check_period* 1000, self.loop) | |
|
283 | self._hb_reporter.start() | |
|
284 | ||
|
236 | 285 |
General Comments 0
You need to be logged in to leave comments.
Login now