##// END OF EJS Templates
Monitor the heartbeat of the cluster controller...
Jan Schulz -
Show More
@@ -1,236 +1,285
1 """A simple engine that talks to a controller over 0MQ.
1 """A simple engine that talks to a controller over 0MQ.
2 it handles registration, etc. and launches a kernel
2 it handles registration, etc. and launches a kernel
3 connected to the Controller's Schedulers.
3 connected to the Controller's Schedulers.
4
4
5 Authors:
5 Authors:
6
6
7 * Min RK
7 * Min RK
8 """
8 """
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10 # Copyright (C) 2010-2011 The IPython Development Team
10 # Copyright (C) 2010-2011 The IPython Development Team
11 #
11 #
12 # Distributed under the terms of the BSD License. The full license is in
12 # Distributed under the terms of the BSD License. The full license is in
13 # the file COPYING, distributed as part of this software.
13 # the file COPYING, distributed as part of this software.
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15
15
16 from __future__ import print_function
16 from __future__ import print_function
17
17
18 import sys
18 import sys
19 import time
19 import time
20 from getpass import getpass
20 from getpass import getpass
21
21
22 import zmq
22 import zmq
23 from zmq.eventloop import ioloop, zmqstream
23 from zmq.eventloop import ioloop, zmqstream
24
24
25 from IPython.external.ssh import tunnel
25 from IPython.external.ssh import tunnel
26 # internal
26 # internal
27 from IPython.utils.traitlets import (
27 from IPython.utils.traitlets import (
28 Instance, Dict, Integer, Type, CFloat, Unicode, CBytes, Bool
28 Instance, Dict, Integer, Type, CFloat, CInt, Unicode, CBytes, Bool
29 )
29 )
30 from IPython.utils.py3compat import cast_bytes
30 from IPython.utils.py3compat import cast_bytes
31
31
32 from IPython.parallel.controller.heartmonitor import Heart
32 from IPython.parallel.controller.heartmonitor import Heart
33 from IPython.parallel.factory import RegistrationFactory
33 from IPython.parallel.factory import RegistrationFactory
34 from IPython.parallel.util import disambiguate_url
34 from IPython.parallel.util import disambiguate_url
35
35
36 from IPython.zmq.session import Message
36 from IPython.zmq.session import Message
37 from IPython.zmq.ipkernel import Kernel, IPKernelApp
37 from IPython.zmq.ipkernel import Kernel, IPKernelApp
38
38
39 class EngineFactory(RegistrationFactory):
39 class EngineFactory(RegistrationFactory):
40 """IPython engine"""
40 """IPython engine"""
41
41
42 # configurables:
42 # configurables:
43 out_stream_factory=Type('IPython.zmq.iostream.OutStream', config=True,
43 out_stream_factory=Type('IPython.zmq.iostream.OutStream', config=True,
44 help="""The OutStream for handling stdout/err.
44 help="""The OutStream for handling stdout/err.
45 Typically 'IPython.zmq.iostream.OutStream'""")
45 Typically 'IPython.zmq.iostream.OutStream'""")
46 display_hook_factory=Type('IPython.zmq.displayhook.ZMQDisplayHook', config=True,
46 display_hook_factory=Type('IPython.zmq.displayhook.ZMQDisplayHook', config=True,
47 help="""The class for handling displayhook.
47 help="""The class for handling displayhook.
48 Typically 'IPython.zmq.displayhook.ZMQDisplayHook'""")
48 Typically 'IPython.zmq.displayhook.ZMQDisplayHook'""")
49 location=Unicode(config=True,
49 location=Unicode(config=True,
50 help="""The location (an IP address) of the controller. This is
50 help="""The location (an IP address) of the controller. This is
51 used for disambiguating URLs, to determine whether
51 used for disambiguating URLs, to determine whether
52 loopback should be used to connect or the public address.""")
52 loopback should be used to connect or the public address.""")
53 timeout=CFloat(5, config=True,
53 timeout=CFloat(5, config=True,
54 help="""The time (in seconds) to wait for the Controller to respond
54 help="""The time (in seconds) to wait for the Controller to respond
55 to registration requests before giving up.""")
55 to registration requests before giving up.""")
56 hb_check_period=CFloat(5, config=True,
57 help="""The time (in seconds) to check for a heartbeat ping from the
58 Controller.""")
59 hb_max_misses=CInt(5, config=True,
60 help="""The maximum number of times a check for the heartbeat ping of a
61 controller can be missed before shutting down the engine.""")
56 sshserver=Unicode(config=True,
62 sshserver=Unicode(config=True,
57 help="""The SSH server to use for tunneling connections to the Controller.""")
63 help="""The SSH server to use for tunneling connections to the Controller.""")
58 sshkey=Unicode(config=True,
64 sshkey=Unicode(config=True,
59 help="""The SSH private key file to use when tunneling connections to the Controller.""")
65 help="""The SSH private key file to use when tunneling connections to the Controller.""")
60 paramiko=Bool(sys.platform == 'win32', config=True,
66 paramiko=Bool(sys.platform == 'win32', config=True,
61 help="""Whether to use paramiko instead of openssh for tunnels.""")
67 help="""Whether to use paramiko instead of openssh for tunnels.""")
62
68
63 # not configurable:
69 # not configurable:
64 connection_info = Dict()
70 connection_info = Dict()
65 user_ns = Dict()
71 user_ns = Dict()
66 id = Integer(allow_none=True)
72 id = Integer(allow_none=True)
67 registrar = Instance('zmq.eventloop.zmqstream.ZMQStream')
73 registrar = Instance('zmq.eventloop.zmqstream.ZMQStream')
68 kernel = Instance(Kernel)
74 kernel = Instance(Kernel)
69
75
76 # States for the heartbeat monitoring
77 _hb_last_pinged = None
78 _hb_last_monitored = None
79 _hb_missed_beats = 0
80 # The zmq Stream which receives the pings from the Heart
81 _hb_listener = None
82
70 bident = CBytes()
83 bident = CBytes()
71 ident = Unicode()
84 ident = Unicode()
72 def _ident_changed(self, name, old, new):
85 def _ident_changed(self, name, old, new):
73 self.bident = cast_bytes(new)
86 self.bident = cast_bytes(new)
74 using_ssh=Bool(False)
87 using_ssh=Bool(False)
75
88
76
89
77 def __init__(self, **kwargs):
90 def __init__(self, **kwargs):
78 super(EngineFactory, self).__init__(**kwargs)
91 super(EngineFactory, self).__init__(**kwargs)
79 self.ident = self.session.session
92 self.ident = self.session.session
80
93
81 def init_connector(self):
94 def init_connector(self):
82 """construct connection function, which handles tunnels."""
95 """construct connection function, which handles tunnels."""
83 self.using_ssh = bool(self.sshkey or self.sshserver)
96 self.using_ssh = bool(self.sshkey or self.sshserver)
84
97
85 if self.sshkey and not self.sshserver:
98 if self.sshkey and not self.sshserver:
86 # We are using ssh directly to the controller, tunneling localhost to localhost
99 # We are using ssh directly to the controller, tunneling localhost to localhost
87 self.sshserver = self.url.split('://')[1].split(':')[0]
100 self.sshserver = self.url.split('://')[1].split(':')[0]
88
101
89 if self.using_ssh:
102 if self.using_ssh:
90 if tunnel.try_passwordless_ssh(self.sshserver, self.sshkey, self.paramiko):
103 if tunnel.try_passwordless_ssh(self.sshserver, self.sshkey, self.paramiko):
91 password=False
104 password=False
92 else:
105 else:
93 password = getpass("SSH Password for %s: "%self.sshserver)
106 password = getpass("SSH Password for %s: "%self.sshserver)
94 else:
107 else:
95 password = False
108 password = False
96
109
97 def connect(s, url):
110 def connect(s, url):
98 url = disambiguate_url(url, self.location)
111 url = disambiguate_url(url, self.location)
99 if self.using_ssh:
112 if self.using_ssh:
100 self.log.debug("Tunneling connection to %s via %s", url, self.sshserver)
113 self.log.debug("Tunneling connection to %s via %s", url, self.sshserver)
101 return tunnel.tunnel_connection(s, url, self.sshserver,
114 return tunnel.tunnel_connection(s, url, self.sshserver,
102 keyfile=self.sshkey, paramiko=self.paramiko,
115 keyfile=self.sshkey, paramiko=self.paramiko,
103 password=password,
116 password=password,
104 )
117 )
105 else:
118 else:
106 return s.connect(url)
119 return s.connect(url)
107
120
108 def maybe_tunnel(url):
121 def maybe_tunnel(url):
109 """like connect, but don't complete the connection (for use by heartbeat)"""
122 """like connect, but don't complete the connection (for use by heartbeat)"""
110 url = disambiguate_url(url, self.location)
123 url = disambiguate_url(url, self.location)
111 if self.using_ssh:
124 if self.using_ssh:
112 self.log.debug("Tunneling connection to %s via %s", url, self.sshserver)
125 self.log.debug("Tunneling connection to %s via %s", url, self.sshserver)
113 url,tunnelobj = tunnel.open_tunnel(url, self.sshserver,
126 url,tunnelobj = tunnel.open_tunnel(url, self.sshserver,
114 keyfile=self.sshkey, paramiko=self.paramiko,
127 keyfile=self.sshkey, paramiko=self.paramiko,
115 password=password,
128 password=password,
116 )
129 )
117 return str(url)
130 return str(url)
118 return connect, maybe_tunnel
131 return connect, maybe_tunnel
119
132
120 def register(self):
133 def register(self):
121 """send the registration_request"""
134 """send the registration_request"""
122
135
123 self.log.info("Registering with controller at %s"%self.url)
136 self.log.info("Registering with controller at %s"%self.url)
124 ctx = self.context
137 ctx = self.context
125 connect,maybe_tunnel = self.init_connector()
138 connect,maybe_tunnel = self.init_connector()
126 reg = ctx.socket(zmq.DEALER)
139 reg = ctx.socket(zmq.DEALER)
127 reg.setsockopt(zmq.IDENTITY, self.bident)
140 reg.setsockopt(zmq.IDENTITY, self.bident)
128 connect(reg, self.url)
141 connect(reg, self.url)
129 self.registrar = zmqstream.ZMQStream(reg, self.loop)
142 self.registrar = zmqstream.ZMQStream(reg, self.loop)
130
143
131
144
132 content = dict(uuid=self.ident)
145 content = dict(uuid=self.ident)
133 self.registrar.on_recv(lambda msg: self.complete_registration(msg, connect, maybe_tunnel))
146 self.registrar.on_recv(lambda msg: self.complete_registration(msg, connect, maybe_tunnel))
134 # print (self.session.key)
147 # print (self.session.key)
135 self.session.send(self.registrar, "registration_request", content=content)
148 self.session.send(self.registrar, "registration_request", content=content)
136
149
150 def _report_ping(self, msg):
151 """Callback for when the heartmonitor.Heart receives a ping"""
152 self.log.debug("Received a ping: %s", msg)
153 self._hb_last_pinged = time.time()
154
137 def complete_registration(self, msg, connect, maybe_tunnel):
155 def complete_registration(self, msg, connect, maybe_tunnel):
138 # print msg
156 # print msg
139 self._abort_dc.stop()
157 self._abort_dc.stop()
140 ctx = self.context
158 ctx = self.context
141 loop = self.loop
159 loop = self.loop
142 identity = self.bident
160 identity = self.bident
143 idents,msg = self.session.feed_identities(msg)
161 idents,msg = self.session.feed_identities(msg)
144 msg = self.session.unserialize(msg)
162 msg = self.session.unserialize(msg)
145 content = msg['content']
163 content = msg['content']
146 info = self.connection_info
164 info = self.connection_info
147
165
148 def url(key):
166 def url(key):
149 """get zmq url for given channel"""
167 """get zmq url for given channel"""
150 return str(info["interface"] + ":%i" % info[key])
168 return str(info["interface"] + ":%i" % info[key])
151
169
152 if content['status'] == 'ok':
170 if content['status'] == 'ok':
153 self.id = int(content['id'])
171 self.id = int(content['id'])
154
172
155 # launch heartbeat
173 # launch heartbeat
156 # possibly forward hb ports with tunnels
174 # possibly forward hb ports with tunnels
157 hb_ping = maybe_tunnel(url('hb_ping'))
175 hb_ping = maybe_tunnel(url('hb_ping'))
158 hb_pong = maybe_tunnel(url('hb_pong'))
176 hb_pong = maybe_tunnel(url('hb_pong'))
159
177
160 heart = Heart(hb_ping, hb_pong, heart_id=identity)
178 # Add a monitor socket which will record the last time a ping was seen
179 mon = self.context.socket(zmq.SUB)
180 mport = mon.bind_to_random_port('tcp://127.0.0.1')
181 mon.setsockopt(zmq.SUBSCRIBE, b"")
182 self._hb_listener = zmqstream.ZMQStream(mon, self.loop)
183 self._hb_listener.on_recv(self._report_ping)
184
185 hb_monitor = "tcp://127.0.0.1:%i"%mport
186
187 heart = Heart(hb_ping, hb_pong, hb_monitor , heart_id=identity)
161 heart.start()
188 heart.start()
162
189
163 # create Shell Connections (MUX, Task, etc.):
190 # create Shell Connections (MUX, Task, etc.):
164 shell_addrs = url('mux'), url('task')
191 shell_addrs = url('mux'), url('task')
165
192
166 # Use only one shell stream for mux and tasks
193 # Use only one shell stream for mux and tasks
167 stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop)
194 stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop)
168 stream.setsockopt(zmq.IDENTITY, identity)
195 stream.setsockopt(zmq.IDENTITY, identity)
169 shell_streams = [stream]
196 shell_streams = [stream]
170 for addr in shell_addrs:
197 for addr in shell_addrs:
171 connect(stream, addr)
198 connect(stream, addr)
172
199
173 # control stream:
200 # control stream:
174 control_addr = url('control')
201 control_addr = url('control')
175 control_stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop)
202 control_stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop)
176 control_stream.setsockopt(zmq.IDENTITY, identity)
203 control_stream.setsockopt(zmq.IDENTITY, identity)
177 connect(control_stream, control_addr)
204 connect(control_stream, control_addr)
178
205
179 # create iopub stream:
206 # create iopub stream:
180 iopub_addr = url('iopub')
207 iopub_addr = url('iopub')
181 iopub_socket = ctx.socket(zmq.PUB)
208 iopub_socket = ctx.socket(zmq.PUB)
182 iopub_socket.setsockopt(zmq.IDENTITY, identity)
209 iopub_socket.setsockopt(zmq.IDENTITY, identity)
183 connect(iopub_socket, iopub_addr)
210 connect(iopub_socket, iopub_addr)
184
211
185 # disable history:
212 # disable history:
186 self.config.HistoryManager.hist_file = ':memory:'
213 self.config.HistoryManager.hist_file = ':memory:'
187
214
188 # Redirect input streams and set a display hook.
215 # Redirect input streams and set a display hook.
189 if self.out_stream_factory:
216 if self.out_stream_factory:
190 sys.stdout = self.out_stream_factory(self.session, iopub_socket, u'stdout')
217 sys.stdout = self.out_stream_factory(self.session, iopub_socket, u'stdout')
191 sys.stdout.topic = cast_bytes('engine.%i.stdout' % self.id)
218 sys.stdout.topic = cast_bytes('engine.%i.stdout' % self.id)
192 sys.stderr = self.out_stream_factory(self.session, iopub_socket, u'stderr')
219 sys.stderr = self.out_stream_factory(self.session, iopub_socket, u'stderr')
193 sys.stderr.topic = cast_bytes('engine.%i.stderr' % self.id)
220 sys.stderr.topic = cast_bytes('engine.%i.stderr' % self.id)
194 if self.display_hook_factory:
221 if self.display_hook_factory:
195 sys.displayhook = self.display_hook_factory(self.session, iopub_socket)
222 sys.displayhook = self.display_hook_factory(self.session, iopub_socket)
196 sys.displayhook.topic = cast_bytes('engine.%i.pyout' % self.id)
223 sys.displayhook.topic = cast_bytes('engine.%i.pyout' % self.id)
197
224
198 self.kernel = Kernel(config=self.config, int_id=self.id, ident=self.ident, session=self.session,
225 self.kernel = Kernel(config=self.config, int_id=self.id, ident=self.ident, session=self.session,
199 control_stream=control_stream, shell_streams=shell_streams, iopub_socket=iopub_socket,
226 control_stream=control_stream, shell_streams=shell_streams, iopub_socket=iopub_socket,
200 loop=loop, user_ns=self.user_ns, log=self.log)
227 loop=loop, user_ns=self.user_ns, log=self.log)
201
228
202 self.kernel.shell.display_pub.topic = cast_bytes('engine.%i.displaypub' % self.id)
229 self.kernel.shell.display_pub.topic = cast_bytes('engine.%i.displaypub' % self.id)
203
230
204 # FIXME: This is a hack until IPKernelApp and IPEngineApp can be fully merged
231 # FIXME: This is a hack until IPKernelApp and IPEngineApp can be fully merged
205 app = IPKernelApp(config=self.config, shell=self.kernel.shell, kernel=self.kernel, log=self.log)
232 app = IPKernelApp(config=self.config, shell=self.kernel.shell, kernel=self.kernel, log=self.log)
206 app.init_profile_dir()
233 app.init_profile_dir()
207 app.init_code()
234 app.init_code()
208
235
209 self.kernel.start()
236 self.kernel.start()
210 else:
237 else:
211 self.log.fatal("Registration Failed: %s"%msg)
238 self.log.fatal("Registration Failed: %s"%msg)
212 raise Exception("Registration Failed: %s"%msg)
239 raise Exception("Registration Failed: %s"%msg)
213
240
214 self.log.info("Completed registration with id %i"%self.id)
241 self.log.info("Completed registration with id %i"%self.id)
215
242
216
243
217 def abort(self):
244 def abort(self):
218 self.log.fatal("Registration timed out after %.1f seconds"%self.timeout)
245 self.log.fatal("Registration timed out after %.1f seconds"%self.timeout)
219 if self.url.startswith('127.'):
246 if self.url.startswith('127.'):
220 self.log.fatal("""
247 self.log.fatal("""
221 If the controller and engines are not on the same machine,
248 If the controller and engines are not on the same machine,
222 you will have to instruct the controller to listen on an external IP (in ipcontroller_config.py):
249 you will have to instruct the controller to listen on an external IP (in ipcontroller_config.py):
223 c.HubFactory.ip='*' # for all interfaces, internal and external
250 c.HubFactory.ip='*' # for all interfaces, internal and external
224 c.HubFactory.ip='192.168.1.101' # or any interface that the engines can see
251 c.HubFactory.ip='192.168.1.101' # or any interface that the engines can see
225 or tunnel connections via ssh.
252 or tunnel connections via ssh.
226 """)
253 """)
227 self.session.send(self.registrar, "unregistration_request", content=dict(id=self.id))
254 self.session.send(self.registrar, "unregistration_request", content=dict(id=self.id))
228 time.sleep(1)
255 time.sleep(1)
229 sys.exit(255)
256 sys.exit(255)
230
257
258 def _hb_monitor(self):
259 """Callback to monitor the heartbeat from the controller"""
260 self._hb_listener.flush()
261 if self._hb_last_monitored > self._hb_last_pinged:
262 self._hb_missed_beats += 1
263 self.log.warn("No heartbeat in the last %s seconds.", self.hb_check_period)
264 else:
265 self._hb_missed_beats = 0
266
267 if self._hb_missed_beats >= self.hb_max_misses:
268 self.log.fatal("Maximum number of heartbeats misses reached (%s times %s seconds), shutting down.",
269 self.hb_max_misses, self.hb_check_period)
270 self.session.send(self.registrar, "unregistration_request", content=dict(id=self.id))
271 self.loop.stop()
272
273 self._hb_last_monitored = time.time()
274
275
231 def start(self):
276 def start(self):
232 dc = ioloop.DelayedCallback(self.register, 0, self.loop)
277 dc = ioloop.DelayedCallback(self.register, 0, self.loop)
233 dc.start()
278 dc.start()
234 self._abort_dc = ioloop.DelayedCallback(self.abort, self.timeout*1000, self.loop)
279 self._abort_dc = ioloop.DelayedCallback(self.abort, self.timeout*1000, self.loop)
235 self._abort_dc.start()
280 self._abort_dc.start()
281 # periodically check the heartbeat pings of the controller
282 self._hb_reporter = ioloop.PeriodicCallback(self._hb_monitor, self.hb_check_period* 1000, self.loop)
283 self._hb_reporter.start()
284
236
285
General Comments 0
You need to be logged in to leave comments. Login now