##// END OF EJS Templates
Rework the heartbeat checking (configureable, period from hub)...
Jan Schulz -
Show More
@@ -1,289 +1,302 b''
1 1 """A simple engine that talks to a controller over 0MQ.
2 2 it handles registration, etc. and launches a kernel
3 3 connected to the Controller's Schedulers.
4 4
5 5 Authors:
6 6
7 7 * Min RK
8 8 """
9 9 #-----------------------------------------------------------------------------
10 10 # Copyright (C) 2010-2011 The IPython Development Team
11 11 #
12 12 # Distributed under the terms of the BSD License. The full license is in
13 13 # the file COPYING, distributed as part of this software.
14 14 #-----------------------------------------------------------------------------
15 15
16 16 from __future__ import print_function
17 17
18 18 import sys
19 19 import time
20 20 from getpass import getpass
21 21
22 22 import zmq
23 23 from zmq.eventloop import ioloop, zmqstream
24 24
25 25 from IPython.external.ssh import tunnel
26 26 # internal
27 27 from IPython.utils.traitlets import (
28 28 Instance, Dict, Integer, Type, Float, Integer, Unicode, CBytes, Bool
29 29 )
30 30 from IPython.utils.py3compat import cast_bytes
31 31
32 32 from IPython.parallel.controller.heartmonitor import Heart
33 33 from IPython.parallel.factory import RegistrationFactory
34 34 from IPython.parallel.util import disambiguate_url
35 35
36 36 from IPython.zmq.session import Message
37 37 from IPython.zmq.ipkernel import Kernel, IPKernelApp
38 38
39 39 class EngineFactory(RegistrationFactory):
40 40 """IPython engine"""
41 41
42 42 # configurables:
43 43 out_stream_factory=Type('IPython.zmq.iostream.OutStream', config=True,
44 44 help="""The OutStream for handling stdout/err.
45 45 Typically 'IPython.zmq.iostream.OutStream'""")
46 46 display_hook_factory=Type('IPython.zmq.displayhook.ZMQDisplayHook', config=True,
47 47 help="""The class for handling displayhook.
48 48 Typically 'IPython.zmq.displayhook.ZMQDisplayHook'""")
49 49 location=Unicode(config=True,
50 50 help="""The location (an IP address) of the controller. This is
51 51 used for disambiguating URLs, to determine whether
52 52 loopback should be used to connect or the public address.""")
53 53 timeout=Float(5.0, config=True,
54 54 help="""The time (in seconds) to wait for the Controller to respond
55 55 to registration requests before giving up.""")
56 hb_check_period=Integer(5000, config=True,
57 help="""The time (in ms) to check for a heartbeat ping from the
58 Controller. Ensure that check period is bigger than the heartbeat period
59 from the controller (set via "HeartMonitor.period" in the Controller config)
60 so that at least one ping is received during each check period.""")
61 hb_max_misses=Integer(5, config=True,
56 max_heartbeat_misses=Integer(0, config=True,
62 57 help="""The maximum number of times a check for the heartbeat ping of a
63 controller can be missed before shutting down the engine.""")
58 controller can be missed before shutting down the engine.
59
60 If set to 0, the check is disabled.""")
64 61 sshserver=Unicode(config=True,
65 62 help="""The SSH server to use for tunneling connections to the Controller.""")
66 63 sshkey=Unicode(config=True,
67 64 help="""The SSH private key file to use when tunneling connections to the Controller.""")
68 65 paramiko=Bool(sys.platform == 'win32', config=True,
69 66 help="""Whether to use paramiko instead of openssh for tunnels.""")
70 67
68
71 69 # not configurable:
72 70 connection_info = Dict()
73 71 user_ns = Dict()
74 72 id = Integer(allow_none=True)
75 73 registrar = Instance('zmq.eventloop.zmqstream.ZMQStream')
76 74 kernel = Instance(Kernel)
75 hb_check_period=Integer()
77 76
78 77 # States for the heartbeat monitoring
79 78 # Initial values for monitored and pinged must satisfy "monitored > pinged == False" so that
80 79 # during the first check no "missed" ping is reported. Must be floats for Python 3 compatibility.
81 80 _hb_last_pinged = 0.0
82 81 _hb_last_monitored = 0.0
83 82 _hb_missed_beats = 0
84 83 # The zmq Stream which receives the pings from the Heart
85 84 _hb_listener = None
86 85
87 86 bident = CBytes()
88 87 ident = Unicode()
89 88 def _ident_changed(self, name, old, new):
90 89 self.bident = cast_bytes(new)
91 90 using_ssh=Bool(False)
92 91
93 92
94 93 def __init__(self, **kwargs):
95 94 super(EngineFactory, self).__init__(**kwargs)
96 95 self.ident = self.session.session
97 96
98 97 def init_connector(self):
99 98 """construct connection function, which handles tunnels."""
100 99 self.using_ssh = bool(self.sshkey or self.sshserver)
101 100
102 101 if self.sshkey and not self.sshserver:
103 102 # We are using ssh directly to the controller, tunneling localhost to localhost
104 103 self.sshserver = self.url.split('://')[1].split(':')[0]
105 104
106 105 if self.using_ssh:
107 106 if tunnel.try_passwordless_ssh(self.sshserver, self.sshkey, self.paramiko):
108 107 password=False
109 108 else:
110 109 password = getpass("SSH Password for %s: "%self.sshserver)
111 110 else:
112 111 password = False
113 112
114 113 def connect(s, url):
115 114 url = disambiguate_url(url, self.location)
116 115 if self.using_ssh:
117 116 self.log.debug("Tunneling connection to %s via %s", url, self.sshserver)
118 117 return tunnel.tunnel_connection(s, url, self.sshserver,
119 118 keyfile=self.sshkey, paramiko=self.paramiko,
120 119 password=password,
121 120 )
122 121 else:
123 122 return s.connect(url)
124 123
125 124 def maybe_tunnel(url):
126 125 """like connect, but don't complete the connection (for use by heartbeat)"""
127 126 url = disambiguate_url(url, self.location)
128 127 if self.using_ssh:
129 128 self.log.debug("Tunneling connection to %s via %s", url, self.sshserver)
130 129 url,tunnelobj = tunnel.open_tunnel(url, self.sshserver,
131 130 keyfile=self.sshkey, paramiko=self.paramiko,
132 131 password=password,
133 132 )
134 133 return str(url)
135 134 return connect, maybe_tunnel
136 135
137 136 def register(self):
138 137 """send the registration_request"""
139 138
140 139 self.log.info("Registering with controller at %s"%self.url)
141 140 ctx = self.context
142 141 connect,maybe_tunnel = self.init_connector()
143 142 reg = ctx.socket(zmq.DEALER)
144 143 reg.setsockopt(zmq.IDENTITY, self.bident)
145 144 connect(reg, self.url)
146 145 self.registrar = zmqstream.ZMQStream(reg, self.loop)
147 146
148 147
149 148 content = dict(uuid=self.ident)
150 149 self.registrar.on_recv(lambda msg: self.complete_registration(msg, connect, maybe_tunnel))
151 150 # print (self.session.key)
152 151 self.session.send(self.registrar, "registration_request", content=content)
153 152
154 153 def _report_ping(self, msg):
155 154 """Callback for when the heartmonitor.Heart receives a ping"""
156 self.log.debug("Received a ping: %s", msg)
155 #self.log.debug("Received a ping: %s", msg)
157 156 self._hb_last_pinged = time.time()
158 157
159 158 def complete_registration(self, msg, connect, maybe_tunnel):
160 159 # print msg
161 160 self._abort_dc.stop()
162 161 ctx = self.context
163 162 loop = self.loop
164 163 identity = self.bident
165 164 idents,msg = self.session.feed_identities(msg)
166 165 msg = self.session.unserialize(msg)
167 166 content = msg['content']
168 167 info = self.connection_info
169 168
170 169 def url(key):
171 170 """get zmq url for given channel"""
172 171 return str(info["interface"] + ":%i" % info[key])
173 172
174 173 if content['status'] == 'ok':
175 174 self.id = int(content['id'])
176 175
177 176 # launch heartbeat
178 177 # possibly forward hb ports with tunnels
179 178 hb_ping = maybe_tunnel(url('hb_ping'))
180 179 hb_pong = maybe_tunnel(url('hb_pong'))
181 180
182 181 # Add a monitor socket which will record the last time a ping was seen
183 182 mon = self.context.socket(zmq.SUB)
184 183 mport = mon.bind_to_random_port('tcp://127.0.0.1')
185 184 mon.setsockopt(zmq.SUBSCRIBE, b"")
186 185 self._hb_listener = zmqstream.ZMQStream(mon, self.loop)
187 186 self._hb_listener.on_recv(self._report_ping)
188 187
189 hb_monitor = "tcp://127.0.0.1:%i"%mport
188 hb_monitor = None
189 if self.max_heartbeat_misses > 0:
190 hb_monitor = "tcp://127.0.0.1:%i"%mport
190 191
191 192 heart = Heart(hb_ping, hb_pong, hb_monitor , heart_id=identity)
192 193 heart.start()
193 194
194 195 # create Shell Connections (MUX, Task, etc.):
195 196 shell_addrs = url('mux'), url('task')
196 197
197 198 # Use only one shell stream for mux and tasks
198 199 stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop)
199 200 stream.setsockopt(zmq.IDENTITY, identity)
200 201 shell_streams = [stream]
201 202 for addr in shell_addrs:
202 203 connect(stream, addr)
203 204
204 205 # control stream:
205 206 control_addr = url('control')
206 207 control_stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop)
207 208 control_stream.setsockopt(zmq.IDENTITY, identity)
208 209 connect(control_stream, control_addr)
209 210
210 211 # create iopub stream:
211 212 iopub_addr = url('iopub')
212 213 iopub_socket = ctx.socket(zmq.PUB)
213 214 iopub_socket.setsockopt(zmq.IDENTITY, identity)
214 215 connect(iopub_socket, iopub_addr)
215 216
216 217 # disable history:
217 218 self.config.HistoryManager.hist_file = ':memory:'
218 219
219 220 # Redirect input streams and set a display hook.
220 221 if self.out_stream_factory:
221 222 sys.stdout = self.out_stream_factory(self.session, iopub_socket, u'stdout')
222 223 sys.stdout.topic = cast_bytes('engine.%i.stdout' % self.id)
223 224 sys.stderr = self.out_stream_factory(self.session, iopub_socket, u'stderr')
224 225 sys.stderr.topic = cast_bytes('engine.%i.stderr' % self.id)
225 226 if self.display_hook_factory:
226 227 sys.displayhook = self.display_hook_factory(self.session, iopub_socket)
227 228 sys.displayhook.topic = cast_bytes('engine.%i.pyout' % self.id)
228 229
229 230 self.kernel = Kernel(config=self.config, int_id=self.id, ident=self.ident, session=self.session,
230 231 control_stream=control_stream, shell_streams=shell_streams, iopub_socket=iopub_socket,
231 232 loop=loop, user_ns=self.user_ns, log=self.log)
232 233
233 234 self.kernel.shell.display_pub.topic = cast_bytes('engine.%i.displaypub' % self.id)
234 235
236
237 # periodically check the heartbeat pings of the controller
238 # Should be started here and not in "start()" so that the right period can be taken
239 # from the hubs HeartBeatMonitor.period
240 if self.max_heartbeat_misses > 0:
241 # Use a slightly bigger check period than the hub signal period to not warn unnecessary
242 self.hb_check_period = int(content['hb_period'])+10
243 self.log.info("Starting to monitor the heartbeat signal from the hub every %i ms." , self.hb_check_period)
244 self._hb_reporter = ioloop.PeriodicCallback(self._hb_monitor, self.hb_check_period, self.loop)
245 self._hb_reporter.start()
246 else:
247 self.log.info("Monitoring of the heartbeat signal from the hub is not enabled.")
248
249
235 250 # FIXME: This is a hack until IPKernelApp and IPEngineApp can be fully merged
236 251 app = IPKernelApp(config=self.config, shell=self.kernel.shell, kernel=self.kernel, log=self.log)
237 252 app.init_profile_dir()
238 253 app.init_code()
239 254
240 255 self.kernel.start()
241 256 else:
242 257 self.log.fatal("Registration Failed: %s"%msg)
243 258 raise Exception("Registration Failed: %s"%msg)
244 259
245 260 self.log.info("Completed registration with id %i"%self.id)
246 261
247 262
248 263 def abort(self):
249 264 self.log.fatal("Registration timed out after %.1f seconds"%self.timeout)
250 265 if self.url.startswith('127.'):
251 266 self.log.fatal("""
252 267 If the controller and engines are not on the same machine,
253 268 you will have to instruct the controller to listen on an external IP (in ipcontroller_config.py):
254 269 c.HubFactory.ip='*' # for all interfaces, internal and external
255 270 c.HubFactory.ip='192.168.1.101' # or any interface that the engines can see
256 271 or tunnel connections via ssh.
257 272 """)
258 273 self.session.send(self.registrar, "unregistration_request", content=dict(id=self.id))
259 274 time.sleep(1)
260 275 sys.exit(255)
261 276
262 277 def _hb_monitor(self):
263 278 """Callback to monitor the heartbeat from the controller"""
264 279 self._hb_listener.flush()
265 280 if self._hb_last_monitored > self._hb_last_pinged:
266 281 self._hb_missed_beats += 1
267 282 self.log.warn("No heartbeat in the last %s ms (%s time(s) in a row).", self.hb_check_period, self._hb_missed_beats)
268 283 else:
284 #self.log.debug("Heartbeat received (after missing %s beats).", self._hb_missed_beats)
269 285 self._hb_missed_beats = 0
270 286
271 if self._hb_missed_beats >= self.hb_max_misses:
287 if self._hb_missed_beats >= self.max_heartbeat_misses:
272 288 self.log.fatal("Maximum number of heartbeats misses reached (%s times %s ms), shutting down.",
273 self.hb_max_misses, self.hb_check_period)
289 self.max_heartbeat_misses, self.hb_check_period)
274 290 self.session.send(self.registrar, "unregistration_request", content=dict(id=self.id))
275 291 self.loop.stop()
276 292
277 293 self._hb_last_monitored = time.time()
278 294
279 295
280 296 def start(self):
281 297 dc = ioloop.DelayedCallback(self.register, 0, self.loop)
282 298 dc.start()
283 299 self._abort_dc = ioloop.DelayedCallback(self.abort, self.timeout*1000, self.loop)
284 300 self._abort_dc.start()
285 # periodically check the heartbeat pings of the controller
286 self._hb_reporter = ioloop.PeriodicCallback(self._hb_monitor, self.hb_check_period, self.loop)
287 self._hb_reporter.start()
288 301
289 302
General Comments 0
You need to be logged in to leave comments. Login now