##// END OF EJS Templates
Enable heartbeat monitoring by default...
Jan Schulz -
Show More
@@ -1,303 +1,303 b''
1 1 """A simple engine that talks to a controller over 0MQ.
2 2 it handles registration, etc. and launches a kernel
3 3 connected to the Controller's Schedulers.
4 4
5 5 Authors:
6 6
7 7 * Min RK
8 8 """
9 9 #-----------------------------------------------------------------------------
10 10 # Copyright (C) 2010-2011 The IPython Development Team
11 11 #
12 12 # Distributed under the terms of the BSD License. The full license is in
13 13 # the file COPYING, distributed as part of this software.
14 14 #-----------------------------------------------------------------------------
15 15
16 16 from __future__ import print_function
17 17
18 18 import sys
19 19 import time
20 20 from getpass import getpass
21 21
22 22 import zmq
23 23 from zmq.eventloop import ioloop, zmqstream
24 24
25 25 from IPython.external.ssh import tunnel
26 26 # internal
27 27 from IPython.utils.traitlets import (
28 28 Instance, Dict, Integer, Type, Float, Integer, Unicode, CBytes, Bool
29 29 )
30 30 from IPython.utils.py3compat import cast_bytes
31 31
32 32 from IPython.parallel.controller.heartmonitor import Heart
33 33 from IPython.parallel.factory import RegistrationFactory
34 34 from IPython.parallel.util import disambiguate_url
35 35
36 36 from IPython.zmq.session import Message
37 37 from IPython.zmq.ipkernel import Kernel, IPKernelApp
38 38
class EngineFactory(RegistrationFactory):
    """IPython engine.

    Registers with a controller over 0MQ (optionally through SSH tunnels),
    and once the controller accepts the registration, starts a heart, wires
    up the shell/control/iopub sockets and launches a ``Kernel`` on them.
    It can also watch the controller's heartbeat pings and stop the event
    loop when too many consecutive checks see no ping.
    """

    # configurables:
    out_stream_factory=Type('IPython.zmq.iostream.OutStream', config=True,
        help="""The OutStream for handling stdout/err.
        Typically 'IPython.zmq.iostream.OutStream'""")
    display_hook_factory=Type('IPython.zmq.displayhook.ZMQDisplayHook', config=True,
        help="""The class for handling displayhook.
        Typically 'IPython.zmq.displayhook.ZMQDisplayHook'""")
    location=Unicode(config=True,
        help="""The location (an IP address) of the controller. This is
        used for disambiguating URLs, to determine whether
        loopback should be used to connect or the public address.""")
    timeout=Float(5.0, config=True,
        help="""The time (in seconds) to wait for the Controller to respond
        to registration requests before giving up.""")
    max_heartbeat_misses=Integer(50, config=True,
        help="""The maximum number of times a check for the heartbeat ping of a
        controller can be missed before shutting down the engine.

        If set to 0, the check is disabled.""")
    sshserver=Unicode(config=True,
        help="""The SSH server to use for tunneling connections to the Controller.""")
    sshkey=Unicode(config=True,
        help="""The SSH private key file to use when tunneling connections to the Controller.""")
    paramiko=Bool(sys.platform == 'win32', config=True,
        help="""Whether to use paramiko instead of openssh for tunnels.""")


    # not configurable:
    connection_info = Dict()
    user_ns = Dict()
    id = Integer(allow_none=True)
    registrar = Instance('zmq.eventloop.zmqstream.ZMQStream')
    kernel = Instance(Kernel)
    # Period of the heartbeat check; set in complete_registration() from the
    # 'hb_period' value of the registration reply.
    hb_check_period=Integer()

    # States for the heartbeat monitoring
    # Initial values for monitored and pinged must satisfy "monitored > pinged == False" so that
    # during the first check no "missed" ping is reported. Must be floats for Python 3 compatibility.
    _hb_last_pinged = 0.0
    _hb_last_monitored = 0.0
    _hb_missed_beats = 0
    # The zmq Stream which receives the pings from the Heart
    _hb_listener = None

    bident = CBytes()
    ident = Unicode()
    def _ident_changed(self, name, old, new):
        """Keep the bytes identity ``bident`` in sync with ``ident``."""
        self.bident = cast_bytes(new)
    using_ssh=Bool(False)


    def __init__(self, **kwargs):
        """Initialise the factory and take the identity from the session."""
        super(EngineFactory, self).__init__(**kwargs)
        self.ident = self.session.session

    def init_connector(self):
        """Construct the connection functions, which handle tunnels.

        Returns a pair ``(connect, maybe_tunnel)``:

        * ``connect(s, url)`` connects socket/stream ``s`` to ``url``,
          through an SSH tunnel when SSH is in use.
        * ``maybe_tunnel(url)`` only opens the tunnel (if any) and returns
          the url to connect to, without connecting anything.
        """
        self.using_ssh = bool(self.sshkey or self.sshserver)

        if self.sshkey and not self.sshserver:
            # We are using ssh directly to the controller, tunneling localhost to localhost
            self.sshserver = self.url.split('://')[1].split(':')[0]

        if self.using_ssh:
            if tunnel.try_passwordless_ssh(self.sshserver, self.sshkey, self.paramiko):
                password=False
            else:
                password = getpass("SSH Password for %s: "%self.sshserver)
        else:
            password = False

        def connect(s, url):
            url = disambiguate_url(url, self.location)
            if self.using_ssh:
                self.log.debug("Tunneling connection to %s via %s", url, self.sshserver)
                return tunnel.tunnel_connection(s, url, self.sshserver,
                            keyfile=self.sshkey, paramiko=self.paramiko,
                            password=password,
                )
            else:
                return s.connect(url)

        def maybe_tunnel(url):
            """like connect, but don't complete the connection (for use by heartbeat)"""
            url = disambiguate_url(url, self.location)
            if self.using_ssh:
                self.log.debug("Tunneling connection to %s via %s", url, self.sshserver)
                url,tunnelobj = tunnel.open_tunnel(url, self.sshserver,
                            keyfile=self.sshkey, paramiko=self.paramiko,
                            password=password,
                )
            return str(url)
        return connect, maybe_tunnel

    def register(self):
        """send the registration_request"""

        self.log.info("Registering with controller at %s", self.url)
        ctx = self.context
        connect,maybe_tunnel = self.init_connector()
        reg = ctx.socket(zmq.DEALER)
        reg.setsockopt(zmq.IDENTITY, self.bident)
        connect(reg, self.url)
        self.registrar = zmqstream.ZMQStream(reg, self.loop)


        content = dict(uuid=self.ident)
        # The reply is handled by complete_registration(), which needs the
        # same connection helpers to reach the other controller sockets.
        self.registrar.on_recv(lambda msg: self.complete_registration(msg, connect, maybe_tunnel))
        self.session.send(self.registrar, "registration_request", content=content)

    def _report_ping(self, msg):
        """Callback for when the heartmonitor.Heart receives a ping"""
        self._hb_last_pinged = time.time()

    def complete_registration(self, msg, connect, maybe_tunnel):
        """Handle the controller's reply to the registration request.

        On an ``'ok'`` status this starts the heart, connects the shell,
        control and iopub sockets, redirects stdout/stderr/displayhook,
        creates and starts the kernel, and (if enabled) starts the periodic
        heartbeat check. On any other status it logs the failure and raises
        ``Exception``.

        ``connect`` and ``maybe_tunnel`` are the helpers returned by
        ``init_connector()``.
        """
        # A reply arrived, so the registration timeout must not fire anymore.
        self._abort_dc.stop()
        ctx = self.context
        loop = self.loop
        identity = self.bident
        idents,msg = self.session.feed_identities(msg)
        msg = self.session.unserialize(msg)
        content = msg['content']
        info = self.connection_info

        def url(key):
            """get zmq url for given channel"""
            return str(info["interface"] + ":%i" % info[key])

        if content['status'] == 'ok':
            self.id = int(content['id'])

            # launch heartbeat
            # possibly forward hb ports with tunnels
            hb_ping = maybe_tunnel(url('hb_ping'))
            hb_pong = maybe_tunnel(url('hb_pong'))

            hb_monitor = None
            if self.max_heartbeat_misses > 0:
                # Add a monitor socket which will record the last time a ping was seen
                mon = self.context.socket(zmq.SUB)
                mport = mon.bind_to_random_port('tcp://127.0.0.1')
                mon.setsockopt(zmq.SUBSCRIBE, b"")
                self._hb_listener = zmqstream.ZMQStream(mon, self.loop)
                self._hb_listener.on_recv(self._report_ping)


                hb_monitor = "tcp://127.0.0.1:%i"%mport

            heart = Heart(hb_ping, hb_pong, hb_monitor , heart_id=identity)
            heart.start()

            # create Shell Connections (MUX, Task, etc.):
            shell_addrs = url('mux'), url('task')

            # Use only one shell stream for mux and tasks
            stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop)
            stream.setsockopt(zmq.IDENTITY, identity)
            shell_streams = [stream]
            for addr in shell_addrs:
                connect(stream, addr)

            # control stream:
            control_addr = url('control')
            control_stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop)
            control_stream.setsockopt(zmq.IDENTITY, identity)
            connect(control_stream, control_addr)

            # create iopub stream:
            iopub_addr = url('iopub')
            iopub_socket = ctx.socket(zmq.PUB)
            iopub_socket.setsockopt(zmq.IDENTITY, identity)
            connect(iopub_socket, iopub_addr)

            # disable history:
            self.config.HistoryManager.hist_file = ':memory:'

            # Redirect input streams and set a display hook.
            if self.out_stream_factory:
                sys.stdout = self.out_stream_factory(self.session, iopub_socket, u'stdout')
                sys.stdout.topic = cast_bytes('engine.%i.stdout' % self.id)
                sys.stderr = self.out_stream_factory(self.session, iopub_socket, u'stderr')
                sys.stderr.topic = cast_bytes('engine.%i.stderr' % self.id)
            if self.display_hook_factory:
                sys.displayhook = self.display_hook_factory(self.session, iopub_socket)
                sys.displayhook.topic = cast_bytes('engine.%i.pyout' % self.id)

            self.kernel = Kernel(config=self.config, int_id=self.id, ident=self.ident, session=self.session,
                    control_stream=control_stream, shell_streams=shell_streams, iopub_socket=iopub_socket,
                    loop=loop, user_ns=self.user_ns, log=self.log)

            self.kernel.shell.display_pub.topic = cast_bytes('engine.%i.displaypub' % self.id)


            # periodically check the heartbeat pings of the controller
            # Should be started here and not in "start()" so that the right period can be taken
            # from the hub's HeartBeatMonitor.period
            if self.max_heartbeat_misses > 0:
                # Use a slightly bigger check period than the hub signal period to not warn unnecessarily
                self.hb_check_period = int(content['hb_period'])+10
                self.log.info("Starting to monitor the heartbeat signal from the hub every %i ms." , self.hb_check_period)
                self._hb_reporter = ioloop.PeriodicCallback(self._hb_monitor, self.hb_check_period, self.loop)
                self._hb_reporter.start()
            else:
                self.log.info("Monitoring of the heartbeat signal from the hub is not enabled.")


            # FIXME: This is a hack until IPKernelApp and IPEngineApp can be fully merged
            app = IPKernelApp(config=self.config, shell=self.kernel.shell, kernel=self.kernel, log=self.log)
            app.init_profile_dir()
            app.init_code()

            self.kernel.start()
        else:
            self.log.fatal("Registration Failed: %s"%msg)
            raise Exception("Registration Failed: %s"%msg)

        self.log.info("Completed registration with id %i"%self.id)


    def abort(self):
        """Give up on registration: log why, try to unregister and exit.

        Scheduled by ``start()`` to run once the registration timeout has
        elapsed; ``complete_registration()`` cancels it when a reply arrives.
        Exits the process with status 255.
        """
        self.log.fatal("Registration timed out after %.1f seconds", self.timeout)
        # self.url carries a transport prefix (init_connector() splits it on
        # '://'), so strip it before looking at the host. Testing the raw url
        # against '127.' could never match e.g. 'tcp://127.0.0.1:1234'.
        host = self.url.split('://', 1)[-1]
        if host.startswith('127.'):
            self.log.fatal("""
            If the controller and engines are not on the same machine,
            you will have to instruct the controller to listen on an external IP (in ipcontroller_config.py):
                c.HubFactory.ip='*' # for all interfaces, internal and external
                c.HubFactory.ip='192.168.1.101' # or any interface that the engines can see
            or tunnel connections via ssh.
            """)
        self.session.send(self.registrar, "unregistration_request", content=dict(id=self.id))
        # Pause briefly before exiting, presumably so the request can leave.
        time.sleep(1)
        sys.exit(255)

    def _hb_monitor(self):
        """Callback to monitor the heartbeat from the controller

        Counts a miss when no ping was recorded since the previous check and
        resets the counter otherwise. Once the number of consecutive misses
        reaches ``max_heartbeat_misses`` it sends an unregistration request
        and stops the event loop.
        """
        # Process any pings still queued on the listener before comparing times.
        self._hb_listener.flush()
        if self._hb_last_monitored > self._hb_last_pinged:
            self._hb_missed_beats += 1
            self.log.warn("No heartbeat in the last %s ms (%s time(s) in a row).", self.hb_check_period, self._hb_missed_beats)
        else:
            self._hb_missed_beats = 0

        if self._hb_missed_beats >= self.max_heartbeat_misses:
            self.log.fatal("Maximum number of heartbeats misses reached (%s times %s ms), shutting down.",
                           self.max_heartbeat_misses, self.hb_check_period)
            self.session.send(self.registrar, "unregistration_request", content=dict(id=self.id))
            self.loop.stop()

        self._hb_last_monitored = time.time()


    def start(self):
        """Schedule registration on the loop, plus the abort on timeout."""
        dc = ioloop.DelayedCallback(self.register, 0, self.loop)
        dc.start()
        # timeout is configured in seconds; scaled by 1000 for the callback delay.
        self._abort_dc = ioloop.DelayedCallback(self.abort, self.timeout*1000, self.loop)
        self._abort_dc.start()
302 302
303 303
General Comments 0
You need to be logged in to leave comments. Login now