##// END OF EJS Templates
Monitor the heartbeat of the cluster controller...
Jan Schulz -
Show More
@@ -25,7 +25,7 b' from zmq.eventloop import ioloop, zmqstream'
25 from IPython.external.ssh import tunnel
25 from IPython.external.ssh import tunnel
26 # internal
26 # internal
27 from IPython.utils.traitlets import (
27 from IPython.utils.traitlets import (
28 Instance, Dict, Integer, Type, CFloat, Unicode, CBytes, Bool
28 Instance, Dict, Integer, Type, CFloat, CInt, Unicode, CBytes, Bool
29 )
29 )
30 from IPython.utils.py3compat import cast_bytes
30 from IPython.utils.py3compat import cast_bytes
31
31
@@ -53,6 +53,12 b' class EngineFactory(RegistrationFactory):'
53 timeout=CFloat(5, config=True,
53 timeout=CFloat(5, config=True,
54 help="""The time (in seconds) to wait for the Controller to respond
54 help="""The time (in seconds) to wait for the Controller to respond
55 to registration requests before giving up.""")
55 to registration requests before giving up.""")
56 hb_check_period=CFloat(5, config=True,
57 help="""The time (in seconds) to check for a heartbeat ping from the
58 Controller.""")
59 hb_max_misses=CInt(5, config=True,
60 help="""The maximum number of times a check for the heartbeat ping of a
61 controller can be missed before shutting down the engine.""")
56 sshserver=Unicode(config=True,
62 sshserver=Unicode(config=True,
57 help="""The SSH server to use for tunneling connections to the Controller.""")
63 help="""The SSH server to use for tunneling connections to the Controller.""")
58 sshkey=Unicode(config=True,
64 sshkey=Unicode(config=True,
@@ -67,6 +73,13 b' class EngineFactory(RegistrationFactory):'
67 registrar = Instance('zmq.eventloop.zmqstream.ZMQStream')
73 registrar = Instance('zmq.eventloop.zmqstream.ZMQStream')
68 kernel = Instance(Kernel)
74 kernel = Instance(Kernel)
69
75
76 # States for the heartbeat monitoring
77 _hb_last_pinged = None
78 _hb_last_monitored = None
79 _hb_missed_beats = 0
80 # The zmq Stream which receives the pings from the Heart
81 _hb_listener = None
82
70 bident = CBytes()
83 bident = CBytes()
71 ident = Unicode()
84 ident = Unicode()
72 def _ident_changed(self, name, old, new):
85 def _ident_changed(self, name, old, new):
@@ -134,6 +147,11 b' class EngineFactory(RegistrationFactory):'
134 # print (self.session.key)
147 # print (self.session.key)
135 self.session.send(self.registrar, "registration_request", content=content)
148 self.session.send(self.registrar, "registration_request", content=content)
136
149
150 def _report_ping(self, msg):
151 """Callback for when the heartmonitor.Heart receives a ping"""
152 self.log.debug("Received a ping: %s", msg)
153 self._hb_last_pinged = time.time()
154
137 def complete_registration(self, msg, connect, maybe_tunnel):
155 def complete_registration(self, msg, connect, maybe_tunnel):
138 # print msg
156 # print msg
139 self._abort_dc.stop()
157 self._abort_dc.stop()
@@ -157,7 +175,16 b' class EngineFactory(RegistrationFactory):'
157 hb_ping = maybe_tunnel(url('hb_ping'))
175 hb_ping = maybe_tunnel(url('hb_ping'))
158 hb_pong = maybe_tunnel(url('hb_pong'))
176 hb_pong = maybe_tunnel(url('hb_pong'))
159
177
160 heart = Heart(hb_ping, hb_pong, heart_id=identity)
178 # Add a monitor socket which will record the last time a ping was seen
179 mon = self.context.socket(zmq.SUB)
180 mport = mon.bind_to_random_port('tcp://127.0.0.1')
181 mon.setsockopt(zmq.SUBSCRIBE, b"")
182 self._hb_listener = zmqstream.ZMQStream(mon, self.loop)
183 self._hb_listener.on_recv(self._report_ping)
184
185 hb_monitor = "tcp://127.0.0.1:%i"%mport
186
187 heart = Heart(hb_ping, hb_pong, hb_monitor , heart_id=identity)
161 heart.start()
188 heart.start()
162
189
163 # create Shell Connections (MUX, Task, etc.):
190 # create Shell Connections (MUX, Task, etc.):
@@ -228,9 +255,31 b' class EngineFactory(RegistrationFactory):'
228 time.sleep(1)
255 time.sleep(1)
229 sys.exit(255)
256 sys.exit(255)
230
257
258 def _hb_monitor(self):
259 """Callback to monitor the heartbeat from the controller"""
260 self._hb_listener.flush()
261 if self._hb_last_monitored > self._hb_last_pinged:
262 self._hb_missed_beats += 1
263 self.log.warn("No heartbeat in the last %s seconds.", self.hb_check_period)
264 else:
265 self._hb_missed_beats = 0
266
267 if self._hb_missed_beats >= self.hb_max_misses:
268 self.log.fatal("Maximum number of heartbeats misses reached (%s times %s seconds), shutting down.",
269 self.hb_max_misses, self.hb_check_period)
270 self.session.send(self.registrar, "unregistration_request", content=dict(id=self.id))
271 self.loop.stop()
272
273 self._hb_last_monitored = time.time()
274
275
231 def start(self):
276 def start(self):
232 dc = ioloop.DelayedCallback(self.register, 0, self.loop)
277 dc = ioloop.DelayedCallback(self.register, 0, self.loop)
233 dc.start()
278 dc.start()
234 self._abort_dc = ioloop.DelayedCallback(self.abort, self.timeout*1000, self.loop)
279 self._abort_dc = ioloop.DelayedCallback(self.abort, self.timeout*1000, self.loop)
235 self._abort_dc.start()
280 self._abort_dc.start()
281 # periodically check the heartbeat pings of the controller
282 self._hb_reporter = ioloop.PeriodicCallback(self._hb_monitor, self.hb_check_period* 1000, self.loop)
283 self._hb_reporter.start()
284
236
285
General Comments 0
You need to be logged in to leave comments. Login now