Show More
@@ -25,7 +25,7 b' from zmq.eventloop import ioloop, zmqstream' | |||||
25 | from IPython.external.ssh import tunnel |
|
25 | from IPython.external.ssh import tunnel | |
26 | # internal |
|
26 | # internal | |
27 | from IPython.utils.traitlets import ( |
|
27 | from IPython.utils.traitlets import ( | |
28 | Instance, Dict, Integer, Type, CFloat, Unicode, CBytes, Bool |
|
28 | Instance, Dict, Integer, Type, CFloat, CInt, Unicode, CBytes, Bool | |
29 | ) |
|
29 | ) | |
30 | from IPython.utils.py3compat import cast_bytes |
|
30 | from IPython.utils.py3compat import cast_bytes | |
31 |
|
31 | |||
@@ -53,6 +53,12 b' class EngineFactory(RegistrationFactory):' | |||||
53 | timeout=CFloat(5, config=True, |
|
53 | timeout=CFloat(5, config=True, | |
54 | help="""The time (in seconds) to wait for the Controller to respond |
|
54 | help="""The time (in seconds) to wait for the Controller to respond | |
55 | to registration requests before giving up.""") |
|
55 | to registration requests before giving up.""") | |
|
56 | hb_check_period=CFloat(5, config=True, | |||
|
57 | help="""The time (in seconds) to check for a heartbeat ping from the | |||
|
58 | Controller.""") | |||
|
59 | hb_max_misses=CInt(5, config=True, | |||
|
60 | help="""The maximum number of times a check for the heartbeat ping of a | |||
|
61 | controller can be missed before shutting down the engine.""") | |||
56 | sshserver=Unicode(config=True, |
|
62 | sshserver=Unicode(config=True, | |
57 | help="""The SSH server to use for tunneling connections to the Controller.""") |
|
63 | help="""The SSH server to use for tunneling connections to the Controller.""") | |
58 | sshkey=Unicode(config=True, |
|
64 | sshkey=Unicode(config=True, | |
@@ -66,6 +72,13 b' class EngineFactory(RegistrationFactory):' | |||||
66 | id = Integer(allow_none=True) |
|
72 | id = Integer(allow_none=True) | |
67 | registrar = Instance('zmq.eventloop.zmqstream.ZMQStream') |
|
73 | registrar = Instance('zmq.eventloop.zmqstream.ZMQStream') | |
68 | kernel = Instance(Kernel) |
|
74 | kernel = Instance(Kernel) | |
|
75 | ||||
|
76 | # States for the heartbeat monitoring | |||
|
77 | _hb_last_pinged = None | |||
|
78 | _hb_last_monitored = None | |||
|
79 | _hb_missed_beats = 0 | |||
|
80 | # The zmq Stream which receives the pings from the Heart | |||
|
81 | _hb_listener = None | |||
69 |
|
82 | |||
70 | bident = CBytes() |
|
83 | bident = CBytes() | |
71 | ident = Unicode() |
|
84 | ident = Unicode() | |
@@ -134,6 +147,11 b' class EngineFactory(RegistrationFactory):' | |||||
134 | # print (self.session.key) |
|
147 | # print (self.session.key) | |
135 | self.session.send(self.registrar, "registration_request", content=content) |
|
148 | self.session.send(self.registrar, "registration_request", content=content) | |
136 |
|
149 | |||
|
150 | def _report_ping(self, msg): | |||
|
151 | """Callback for when the heartmonitor.Heart receives a ping""" | |||
|
152 | self.log.debug("Received a ping: %s", msg) | |||
|
153 | self._hb_last_pinged = time.time() | |||
|
154 | ||||
137 | def complete_registration(self, msg, connect, maybe_tunnel): |
|
155 | def complete_registration(self, msg, connect, maybe_tunnel): | |
138 | # print msg |
|
156 | # print msg | |
139 | self._abort_dc.stop() |
|
157 | self._abort_dc.stop() | |
@@ -156,8 +174,17 b' class EngineFactory(RegistrationFactory):' | |||||
156 | # possibly forward hb ports with tunnels |
|
174 | # possibly forward hb ports with tunnels | |
157 | hb_ping = maybe_tunnel(url('hb_ping')) |
|
175 | hb_ping = maybe_tunnel(url('hb_ping')) | |
158 | hb_pong = maybe_tunnel(url('hb_pong')) |
|
176 | hb_pong = maybe_tunnel(url('hb_pong')) | |
|
177 | ||||
|
178 | # Add a monitor socket which will record the last time a ping was seen | |||
|
179 | mon = self.context.socket(zmq.SUB) | |||
|
180 | mport = mon.bind_to_random_port('tcp://127.0.0.1') | |||
|
181 | mon.setsockopt(zmq.SUBSCRIBE, b"") | |||
|
182 | self._hb_listener = zmqstream.ZMQStream(mon, self.loop) | |||
|
183 | self._hb_listener.on_recv(self._report_ping) | |||
|
184 | ||||
|
185 | hb_monitor = "tcp://127.0.0.1:%i"%mport | |||
159 |
|
186 | |||
160 | heart = Heart(hb_ping, hb_pong, heart_id=identity) |
|
187 | heart = Heart(hb_ping, hb_pong, hb_monitor , heart_id=identity) | |
161 | heart.start() |
|
188 | heart.start() | |
162 |
|
189 | |||
163 | # create Shell Connections (MUX, Task, etc.): |
|
190 | # create Shell Connections (MUX, Task, etc.): | |
@@ -228,9 +255,31 b' class EngineFactory(RegistrationFactory):' | |||||
228 | time.sleep(1) |
|
255 | time.sleep(1) | |
229 | sys.exit(255) |
|
256 | sys.exit(255) | |
230 |
|
257 | |||
|
258 | def _hb_monitor(self): | |||
|
259 | """Callback to monitor the heartbeat from the controller""" | |||
|
260 | self._hb_listener.flush() | |||
|
261 | if self._hb_last_monitored > self._hb_last_pinged: | |||
|
262 | self._hb_missed_beats += 1 | |||
|
263 | self.log.warn("No heartbeat in the last %s seconds.", self.hb_check_period) | |||
|
264 | else: | |||
|
265 | self._hb_missed_beats = 0 | |||
|
266 | ||||
|
267 | if self._hb_missed_beats >= self.hb_max_misses: | |||
|
268 | self.log.fatal("Maximum number of heartbeats misses reached (%s times %s seconds), shutting down.", | |||
|
269 | self.hb_max_misses, self.hb_check_period) | |||
|
270 | self.session.send(self.registrar, "unregistration_request", content=dict(id=self.id)) | |||
|
271 | self.loop.stop() | |||
|
272 | ||||
|
273 | self._hb_last_monitored = time.time() | |||
|
274 | ||||
|
275 | ||||
231 | def start(self): |
|
276 | def start(self): | |
232 | dc = ioloop.DelayedCallback(self.register, 0, self.loop) |
|
277 | dc = ioloop.DelayedCallback(self.register, 0, self.loop) | |
233 | dc.start() |
|
278 | dc.start() | |
234 | self._abort_dc = ioloop.DelayedCallback(self.abort, self.timeout*1000, self.loop) |
|
279 | self._abort_dc = ioloop.DelayedCallback(self.abort, self.timeout*1000, self.loop) | |
235 | self._abort_dc.start() |
|
280 | self._abort_dc.start() | |
|
281 | # periodically check the heartbeat pings of the controller | |||
|
282 | self._hb_reporter = ioloop.PeriodicCallback(self._hb_monitor, self.hb_check_period* 1000, self.loop) | |||
|
283 | self._hb_reporter.start() | |||
|
284 | ||||
236 |
|
285 |
General Comments 0
You need to be logged in to leave comments.
Login now