##// END OF EJS Templates
Monitor the heartbeat of the cluster controller...
Jan Schulz -
Show More
@@ -1,236 +1,285 b''
1 1 """A simple engine that talks to a controller over 0MQ.
2 2 It handles registration, etc., and launches a kernel
3 3 connected to the Controller's Schedulers.
4 4
5 5 Authors:
6 6
7 7 * Min RK
8 8 """
9 9 #-----------------------------------------------------------------------------
10 10 # Copyright (C) 2010-2011 The IPython Development Team
11 11 #
12 12 # Distributed under the terms of the BSD License. The full license is in
13 13 # the file COPYING, distributed as part of this software.
14 14 #-----------------------------------------------------------------------------
15 15
16 16 from __future__ import print_function
17 17
18 18 import sys
19 19 import time
20 20 from getpass import getpass
21 21
22 22 import zmq
23 23 from zmq.eventloop import ioloop, zmqstream
24 24
25 25 from IPython.external.ssh import tunnel
26 26 # internal
27 27 from IPython.utils.traitlets import (
28 Instance, Dict, Integer, Type, CFloat, Unicode, CBytes, Bool
28 Instance, Dict, Integer, Type, CFloat, CInt, Unicode, CBytes, Bool
29 29 )
30 30 from IPython.utils.py3compat import cast_bytes
31 31
32 32 from IPython.parallel.controller.heartmonitor import Heart
33 33 from IPython.parallel.factory import RegistrationFactory
34 34 from IPython.parallel.util import disambiguate_url
35 35
36 36 from IPython.zmq.session import Message
37 37 from IPython.zmq.ipkernel import Kernel, IPKernelApp
38 38
class EngineFactory(RegistrationFactory):
    """IPython engine.

    Registers with a controller, then connects the heartbeat, shell, control
    and iopub channels and starts a Kernel on them.  Once registered, it also
    checks periodically for heartbeat pings from the controller and stops the
    event loop after ``hb_max_misses`` consecutive checks without a ping.
    """

    # configurables:
    out_stream_factory = Type('IPython.zmq.iostream.OutStream', config=True,
        help=("The OutStream for handling stdout/err.\n"
              "Typically 'IPython.zmq.iostream.OutStream'"))
    display_hook_factory = Type('IPython.zmq.displayhook.ZMQDisplayHook', config=True,
        help=("The class for handling displayhook.\n"
              "Typically 'IPython.zmq.displayhook.ZMQDisplayHook'"))
    location = Unicode(config=True,
        help=("The location (an IP address) of the controller. This is\n"
              "used for disambiguating URLs, to determine whether\n"
              "loopback should be used to connect or the public address."))
    timeout = CFloat(5, config=True,
        help=("The time (in seconds) to wait for the Controller to respond\n"
              "to registration requests before giving up."))
    hb_check_period = CFloat(5, config=True,
        help=("The time (in seconds) to check for a heartbeat ping from the\n"
              "Controller."))
    hb_max_misses = CInt(5, config=True,
        help=("The maximum number of times a check for the heartbeat ping of a\n"
              "controller can be missed before shutting down the engine."))
    sshserver = Unicode(config=True,
        help="""The SSH server to use for tunneling connections to the Controller.""")
    sshkey = Unicode(config=True,
        help="""The SSH private key file to use when tunneling connections to the Controller.""")
    paramiko = Bool(sys.platform == 'win32', config=True,
        help="""Whether to use paramiko instead of openssh for tunnels.""")

    # not configurable:
    connection_info = Dict()
    user_ns = Dict()
    id = Integer(allow_none=True)
    registrar = Instance('zmq.eventloop.zmqstream.ZMQStream')
    kernel = Instance(Kernel)

    # States for the heartbeat monitoring
    # time.time() of the last ping reported by the Heart (None: none seen yet)
    _hb_last_pinged = None
    # time.time() of the last completed _hb_monitor check (None: none yet)
    _hb_last_monitored = None
    # number of consecutive checks that saw no new ping
    _hb_missed_beats = 0
    # The zmq Stream which receives the pings from the Heart
    # (None until complete_registration has created it)
    _hb_listener = None

    bident = CBytes()
    ident = Unicode()

    def _ident_changed(self, name, old, new):
        """Store the bytes form of the new ``ident`` value in ``bident``.

        Presumably invoked by traitlets whenever ``ident`` is assigned.
        """
        self.bident = cast_bytes(new)

    using_ssh = Bool(False)

    def __init__(self, **kwargs):
        super(EngineFactory, self).__init__(**kwargs)
        # the session id is used as this engine's identity
        self.ident = self.session.session

    def init_connector(self):
        """Construct the connection functions, which handle ssh tunnels.

        Sets ``using_ssh`` (and ``sshserver`` when only a key was given) and
        may prompt for an ssh password.

        Returns
        -------
        (connect, maybe_tunnel) : tuple of callables
            ``connect(s, url)`` connects ``s`` to ``url``, through a tunnel
            when ssh is in use.  ``maybe_tunnel(url)`` only opens the tunnel
            (if any) and returns the url that should be connected to.
        """
        self.using_ssh = bool(self.sshkey or self.sshserver)

        if self.sshkey and not self.sshserver:
            # We are using ssh directly to the controller, tunneling localhost to localhost
            self.sshserver = self.url.split('://')[1].split(':')[0]

        # password stays False unless ssh is used and passwordless login fails
        password = False
        if self.using_ssh and not tunnel.try_passwordless_ssh(
                self.sshserver, self.sshkey, self.paramiko):
            password = getpass("SSH Password for %s: " % self.sshserver)

        def connect(s, url):
            """connect s to url, tunneling over ssh when it is in use"""
            url = disambiguate_url(url, self.location)
            if self.using_ssh:
                self.log.debug("Tunneling connection to %s via %s", url, self.sshserver)
                return tunnel.tunnel_connection(s, url, self.sshserver,
                    keyfile=self.sshkey, paramiko=self.paramiko,
                    password=password,
                )
            else:
                return s.connect(url)

        def maybe_tunnel(url):
            """like connect, but don't complete the connection (for use by heartbeat)"""
            url = disambiguate_url(url, self.location)
            if self.using_ssh:
                self.log.debug("Tunneling connection to %s via %s", url, self.sshserver)
                # only the forwarded url is needed; the tunnel object is not kept
                url, _tunnelobj = tunnel.open_tunnel(url, self.sshserver,
                    keyfile=self.sshkey, paramiko=self.paramiko,
                    password=password,
                )
            return str(url)

        return connect, maybe_tunnel

    def register(self):
        """Send the registration_request to the controller.

        Creates the ``registrar`` stream and arranges for the reply to be
        handled by ``complete_registration``.
        """
        self.log.info("Registering with controller at %s", self.url)
        ctx = self.context
        connect, maybe_tunnel = self.init_connector()
        reg = ctx.socket(zmq.DEALER)
        reg.setsockopt(zmq.IDENTITY, self.bident)
        connect(reg, self.url)
        self.registrar = zmqstream.ZMQStream(reg, self.loop)

        content = dict(uuid=self.ident)
        self.registrar.on_recv(lambda msg: self.complete_registration(msg, connect, maybe_tunnel))
        self.session.send(self.registrar, "registration_request", content=content)

    def _report_ping(self, msg):
        """Callback for when the heartmonitor.Heart receives a ping"""
        self.log.debug("Received a ping: %s", msg)
        self._hb_last_pinged = time.time()

    def complete_registration(self, msg, connect, maybe_tunnel):
        """Handle the controller's reply to the registration request.

        On success, starts the heartbeat, connects the shell, control and
        iopub channels, redirects stdout/stderr/displayhook and starts the
        Kernel.

        Parameters
        ----------
        msg
            the raw reply delivered to the registrar stream's on_recv callback
        connect, maybe_tunnel : callables
            the connection functions returned by ``init_connector``

        Raises
        ------
        Exception
            if the reply's status is not 'ok'
        """
        # a reply arrived, so the registration timeout must not fire anymore
        self._abort_dc.stop()
        ctx = self.context
        loop = self.loop
        identity = self.bident
        _idents, msg = self.session.feed_identities(msg)
        msg = self.session.unserialize(msg)
        content = msg['content']
        info = self.connection_info

        def url(key):
            """get zmq url for given channel"""
            return str(info["interface"] + ":%i" % info[key])

        if content['status'] != 'ok':
            failure = "Registration Failed: %s" % msg
            self.log.fatal(failure)
            raise Exception(failure)

        self.id = int(content['id'])

        # launch heartbeat
        # possibly forward hb ports with tunnels
        hb_ping = maybe_tunnel(url('hb_ping'))
        hb_pong = maybe_tunnel(url('hb_pong'))

        # Add a monitor socket which will record the last time a ping was seen
        mon = ctx.socket(zmq.SUB)
        mport = mon.bind_to_random_port('tcp://127.0.0.1')
        mon.setsockopt(zmq.SUBSCRIBE, b"")
        self._hb_listener = zmqstream.ZMQStream(mon, loop)
        self._hb_listener.on_recv(self._report_ping)

        hb_monitor = "tcp://127.0.0.1:%i" % mport

        heart = Heart(hb_ping, hb_pong, hb_monitor, heart_id=identity)
        heart.start()

        # create Shell Connections (MUX, Task, etc.):
        shell_addrs = url('mux'), url('task')

        # Use only one shell stream for mux and tasks
        stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop)
        stream.setsockopt(zmq.IDENTITY, identity)
        shell_streams = [stream]
        for addr in shell_addrs:
            connect(stream, addr)

        # control stream:
        control_addr = url('control')
        control_stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop)
        control_stream.setsockopt(zmq.IDENTITY, identity)
        connect(control_stream, control_addr)

        # create iopub stream:
        iopub_addr = url('iopub')
        iopub_socket = ctx.socket(zmq.PUB)
        iopub_socket.setsockopt(zmq.IDENTITY, identity)
        connect(iopub_socket, iopub_addr)

        # disable history:
        self.config.HistoryManager.hist_file = ':memory:'

        # Redirect input streams and set a display hook.
        if self.out_stream_factory:
            sys.stdout = self.out_stream_factory(self.session, iopub_socket, u'stdout')
            sys.stdout.topic = cast_bytes('engine.%i.stdout' % self.id)
            sys.stderr = self.out_stream_factory(self.session, iopub_socket, u'stderr')
            sys.stderr.topic = cast_bytes('engine.%i.stderr' % self.id)
        if self.display_hook_factory:
            sys.displayhook = self.display_hook_factory(self.session, iopub_socket)
            sys.displayhook.topic = cast_bytes('engine.%i.pyout' % self.id)

        self.kernel = Kernel(config=self.config, int_id=self.id, ident=self.ident, session=self.session,
                control_stream=control_stream, shell_streams=shell_streams, iopub_socket=iopub_socket,
                loop=loop, user_ns=self.user_ns, log=self.log)

        self.kernel.shell.display_pub.topic = cast_bytes('engine.%i.displaypub' % self.id)

        # FIXME: This is a hack until IPKernelApp and IPEngineApp can be fully merged
        app = IPKernelApp(config=self.config, shell=self.kernel.shell, kernel=self.kernel, log=self.log)
        app.init_profile_dir()
        app.init_code()

        self.kernel.start()

        self.log.info("Completed registration with id %i", self.id)

    def abort(self):
        """Give up on registration: log the timeout, unregister, and exit with status 255."""
        self.log.fatal("Registration timed out after %.1f seconds", self.timeout)
        # init_connector parses self.url as scheme://host:port, so the loopback
        # test has to look at the host part; the full url never starts with '127.'
        if self.url.split('://')[-1].startswith('127.'):
            self.log.fatal(
                "\n"
                "If the controller and engines are not on the same machine,\n"
                "you will have to instruct the controller to listen on an external IP (in ipcontroller_config.py):\n"
                "c.HubFactory.ip='*' # for all interfaces, internal and external\n"
                "c.HubFactory.ip='192.168.1.101' # or any interface that the engines can see\n"
                "or tunnel connections via ssh.\n"
            )
        self.session.send(self.registrar, "unregistration_request", content=dict(id=self.id))
        time.sleep(1)
        sys.exit(255)

    def _hb_monitor(self):
        """Callback to monitor the heartbeat from the controller.

        Counts consecutive checks without a new ping; once ``hb_max_misses``
        is reached, sends an unregistration_request and stops the loop.
        """
        listener = self._hb_listener
        if listener is None:
            # start() schedules this check right away, but the ping listener
            # only exists once complete_registration has run: nothing to
            # monitor yet.
            return

        # NOTE(review): flush() is expected to deliver pings that are already
        # waiting, so _hb_last_pinged is current before it is compared --
        # confirm against the pyzmq ZMQStream documentation.
        listener.flush()

        last_monitored = self._hb_last_monitored
        last_pinged = self._hb_last_pinged
        # A beat was missed when no ping arrived since the previous check.
        # None is handled explicitly because ordering None against a float
        # raises TypeError on Python 3; the outcomes match what Python 2's
        # ordering (None sorts before any number) produced.
        missed = last_monitored is not None and (
            last_pinged is None or last_monitored > last_pinged)

        if missed:
            self._hb_missed_beats += 1
            self.log.warn("No heartbeat in the last %s seconds.", self.hb_check_period)
        else:
            self._hb_missed_beats = 0

        if self._hb_missed_beats >= self.hb_max_misses:
            self.log.fatal("Maximum number of heartbeats misses reached (%s times %s seconds), shutting down.",
                    self.hb_max_misses, self.hb_check_period)
            self.session.send(self.registrar, "unregistration_request", content=dict(id=self.id))
            self.loop.stop()

        self._hb_last_monitored = time.time()

    def start(self):
        """Schedule registration, the registration timeout and the heartbeat checks on the loop."""
        dc = ioloop.DelayedCallback(self.register, 0, self.loop)
        dc.start()
        # timeout is in seconds, the callbacks take milliseconds
        self._abort_dc = ioloop.DelayedCallback(self.abort, self.timeout * 1000, self.loop)
        self._abort_dc.start()
        # periodically check the heartbeat pings of the controller
        self._hb_reporter = ioloop.PeriodicCallback(self._hb_monitor, self.hb_check_period * 1000, self.loop)
        self._hb_reporter.start()
284
236 285
General Comments 0
You need to be logged in to leave comments. Login now