minor controller logging adjustments...
MinRK
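The pattern throughout this commit is the same: log calls that built their message eagerly with %-interpolation now hand the format arguments to the logger, which defers interpolation until a handler actually emits the record. A minimal sketch of the before/after, using a hypothetical logger and heart identity:

    import logging

    log = logging.getLogger("controller")
    heart = b"\x01\x02"  # hypothetical heart identity

    # Before: the message string is built immediately, even when the
    # DEBUG level is disabled.
    log.debug("heartbeat::new_heart_handler: %s" % heart)

    # After: the args are stored on the LogRecord and interpolated only
    # if the record is emitted, so suppressed levels skip the formatting
    # cost; keyword options such as exc_info pass through unchanged.
    log.debug("heartbeat::new_heart_handler: %s", heart)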
@@ -1,174 +1,174
1 1 #!/usr/bin/env python
2 2 """
3 3 A multi-heart Heartbeat system using PUB and XREP sockets. Pings are sent out on the PUB,
4 4 and hearts are tracked based on their XREQ identities.
5 5
6 6 Authors:
7 7
8 8 * Min RK
9 9 """
10 10 #-----------------------------------------------------------------------------
11 11 # Copyright (C) 2010-2011 The IPython Development Team
12 12 #
13 13 # Distributed under the terms of the BSD License. The full license is in
14 14 # the file COPYING, distributed as part of this software.
15 15 #-----------------------------------------------------------------------------
16 16
17 17 from __future__ import print_function
18 18 import time
19 19 import uuid
20 20
21 21 import zmq
22 22 from zmq.devices import ThreadDevice
23 23 from zmq.eventloop import ioloop, zmqstream
24 24
25 25 from IPython.config.configurable import LoggingConfigurable
26 26 from IPython.utils.traitlets import Set, Instance, CFloat
27 27
28 28 from IPython.parallel.util import asbytes
29 29
30 30 class Heart(object):
31 31 """A basic heart object for responding to a HeartMonitor.
32 32 This is a simple wrapper with defaults for the most common
33 33 Device model for responding to heartbeats.
34 34
35 35 It simply builds a threadsafe zmq.FORWARDER Device, defaulting to using
36 36 SUB/XREQ for in/out.
37 37
38 38 You can specify the XREQ's IDENTITY via the optional heart_id argument."""
39 39 device=None
40 40 id=None
41 41 def __init__(self, in_addr, out_addr, in_type=zmq.SUB, out_type=zmq.DEALER, heart_id=None):
42 42 self.device = ThreadDevice(zmq.FORWARDER, in_type, out_type)
43 43 self.device.daemon=True
44 44 self.device.connect_in(in_addr)
45 45 self.device.connect_out(out_addr)
46 46 if in_type == zmq.SUB:
47 47 self.device.setsockopt_in(zmq.SUBSCRIBE, b"")
48 48 if heart_id is None:
49 49 heart_id = uuid.uuid4().bytes
50 50 self.device.setsockopt_out(zmq.IDENTITY, heart_id)
51 51 self.id = heart_id
52 52
53 53 def start(self):
54 54 return self.device.start()
55 55
56
56 57 class HeartMonitor(LoggingConfigurable):
57 58 """A basic HeartMonitor class
58 59 pingstream: a PUB stream
59 60 pongstream: an XREP stream
60 61 period: the period of the heartbeat in milliseconds"""
61 62
62 63 period=CFloat(1000, config=True,
63 64 help='The period (in ms) at which the Hub pings the engines for '
64 65 'heartbeats',
65 66 )
66 67
67 68 pingstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
68 69 pongstream=Instance('zmq.eventloop.zmqstream.ZMQStream')
69 70 loop = Instance('zmq.eventloop.ioloop.IOLoop')
70 71 def _loop_default(self):
71 72 return ioloop.IOLoop.instance()
72 73
73 74 # not settable:
74 75 hearts=Set()
75 76 responses=Set()
76 77 on_probation=Set()
77 78 last_ping=CFloat(0)
78 79 _new_handlers = Set()
79 80 _failure_handlers = Set()
80 81 lifetime = CFloat(0)
81 82 tic = CFloat(0)
82 83
83 84 def __init__(self, **kwargs):
84 85 super(HeartMonitor, self).__init__(**kwargs)
85 86
86 87 self.pongstream.on_recv(self.handle_pong)
87 88
88 89 def start(self):
89 90 self.tic = time.time()
90 91 self.caller = ioloop.PeriodicCallback(self.beat, self.period, self.loop)
91 92 self.caller.start()
92 93
93 94 def add_new_heart_handler(self, handler):
94 95 """add a new handler for new hearts"""
95 self.log.debug("heartbeat::new_heart_handler: %s"%handler)
96 self.log.debug("heartbeat::new_heart_handler: %s", handler)
96 97 self._new_handlers.add(handler)
97 98
98 99 def add_heart_failure_handler(self, handler):
99 100 """add a new handler for heart failure"""
100 self.log.debug("heartbeat::new heart failure handler: %s"%handler)
101 self.log.debug("heartbeat::new heart failure handler: %s", handler)
101 102 self._failure_handlers.add(handler)
102 103
103 104 def beat(self):
104 105 self.pongstream.flush()
105 106 self.last_ping = self.lifetime
106 107
107 108 toc = time.time()
108 109 self.lifetime += toc-self.tic
109 110 self.tic = toc
110 # self.log.debug("heartbeat::%s"%self.lifetime)
111 self.log.debug("heartbeat::sending %s", self.lifetime)
111 112 goodhearts = self.hearts.intersection(self.responses)
112 113 missed_beats = self.hearts.difference(goodhearts)
113 114 heartfailures = self.on_probation.intersection(missed_beats)
114 115 newhearts = self.responses.difference(goodhearts)
115 116 map(self.handle_new_heart, newhearts)
116 117 map(self.handle_heart_failure, heartfailures)
117 118 self.on_probation = missed_beats.intersection(self.hearts)
118 119 self.responses = set()
119 120 # print self.on_probation, self.hearts
120 # self.log.debug("heartbeat::beat %.3f, %i beating hearts"%(self.lifetime, len(self.hearts)))
121 # self.log.debug("heartbeat::beat %.3f, %i beating hearts", self.lifetime, len(self.hearts))
121 122 self.pingstream.send(asbytes(str(self.lifetime)))
122 123
123 124 def handle_new_heart(self, heart):
124 125 if self._new_handlers:
125 126 for handler in self._new_handlers:
126 127 handler(heart)
127 128 else:
128 self.log.info("heartbeat::yay, got new heart %s!"%heart)
129 self.log.info("heartbeat::yay, got new heart %s!", heart)
129 130 self.hearts.add(heart)
130 131
131 132 def handle_heart_failure(self, heart):
132 133 if self._failure_handlers:
133 134 for handler in self._failure_handlers:
134 135 try:
135 136 handler(heart)
136 137 except Exception as e:
137 self.log.error("heartbeat::Bad Handler! %s"%handler, exc_info=True)
138 self.log.error("heartbeat::Bad Handler! %s", handler, exc_info=True)
138 139 pass
139 140 else:
140 self.log.info("heartbeat::Heart %s failed :("%heart)
141 self.log.info("heartbeat::Heart %s failed :(", heart)
141 142 self.hearts.remove(heart)
142 143
143 144
144 145 def handle_pong(self, msg):
145 146 "a heart just beat"
146 147 current = asbytes(str(self.lifetime))
147 148 last = asbytes(str(self.last_ping))
148 149 if msg[1] == current:
149 150 delta = time.time()-self.tic
150 151 # self.log.debug("heartbeat::heart %r took %.2f ms to respond"%(msg[0], 1000*delta))
151 152 self.responses.add(msg[0])
152 153 elif msg[1] == last:
153 154 delta = time.time()-self.tic + (self.lifetime-self.last_ping)
154 self.log.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond"%(msg[0], 1000*delta))
155 self.log.warn("heartbeat::heart %r missed a beat, and took %.2f ms to respond", msg[0], 1000*delta)
155 156 self.responses.add(msg[0])
156 157 else:
157 self.log.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)"%
158 (msg[1],self.lifetime))
158 self.log.warn("heartbeat::got bad heartbeat (possibly old?): %s (current=%.3f)", msg[1], self.lifetime)
159 159
160 160
161 161 if __name__ == '__main__':
162 162 loop = ioloop.IOLoop.instance()
163 163 context = zmq.Context()
164 164 pub = context.socket(zmq.PUB)
165 165 pub.bind('tcp://127.0.0.1:5555')
166 166 xrep = context.socket(zmq.ROUTER)
167 167 xrep.bind('tcp://127.0.0.1:5556')
168 168
169 169 outstream = zmqstream.ZMQStream(pub, loop)
170 170 instream = zmqstream.ZMQStream(xrep, loop)
171 171
172 172 hb = HeartMonitor(loop=loop, pingstream=outstream, pongstream=instream)
173 173
174 174 loop.start()
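The failure-detection logic in HeartMonitor.beat() above is plain set arithmetic: a heart must miss two consecutive beats (one to land on probation, a second to fail) before its failure handlers fire. Note the map() calls drive the handlers eagerly only under Python 2, which is what this code targets. A standalone sketch of one beat, with hypothetical heart identities:

    # hearts known from earlier beats, pongs received since the last ping,
    # and hearts that already missed the previous beat:
    hearts = {b'a', b'b', b'c'}
    responses = {b'a', b'd'}
    on_probation = {b'c'}

    goodhearts = hearts & responses              # {b'a'}: still beating
    missed_beats = hearts - goodhearts           # {b'b', b'c'}: no pong this round
    heartfailures = on_probation & missed_beats  # {b'c'}: second miss -> failure
    newhearts = responses - goodhearts           # {b'd'}: first pong -> register
    on_probation = missed_beats & hearts         # {b'b', b'c'}: watch next beat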
@@ -1,1290 +1,1293
1 1 """The IPython Controller Hub with 0MQ
2 2 This is the master object that handles connections from engines and clients,
3 3 and monitors traffic through the various queues.
4 4
5 5 Authors:
6 6
7 7 * Min RK
8 8 """
9 9 #-----------------------------------------------------------------------------
10 10 # Copyright (C) 2010-2011 The IPython Development Team
11 11 #
12 12 # Distributed under the terms of the BSD License. The full license is in
13 13 # the file COPYING, distributed as part of this software.
14 14 #-----------------------------------------------------------------------------
15 15
16 16 #-----------------------------------------------------------------------------
17 17 # Imports
18 18 #-----------------------------------------------------------------------------
19 19 from __future__ import print_function
20 20
21 21 import sys
22 22 import time
23 23 from datetime import datetime
24 24
25 25 import zmq
26 26 from zmq.eventloop import ioloop
27 27 from zmq.eventloop.zmqstream import ZMQStream
28 28
29 29 # internal:
30 30 from IPython.utils.importstring import import_item
31 31 from IPython.utils.traitlets import (
32 32 HasTraits, Instance, Integer, Unicode, Dict, Set, Tuple, CBytes, DottedObjectName
33 33 )
34 34
35 35 from IPython.parallel import error, util
36 36 from IPython.parallel.factory import RegistrationFactory
37 37
38 38 from IPython.zmq.session import SessionFactory
39 39
40 40 from .heartmonitor import HeartMonitor
41 41
42 42 #-----------------------------------------------------------------------------
43 43 # Code
44 44 #-----------------------------------------------------------------------------
45 45
46 46 def _passer(*args, **kwargs):
47 47 return
48 48
49 49 def _printer(*args, **kwargs):
50 50 print (args)
51 51 print (kwargs)
52 52
53 53 def empty_record():
54 54 """Return an empty dict with all record keys."""
55 55 return {
56 56 'msg_id' : None,
57 57 'header' : None,
58 58 'content': None,
59 59 'buffers': None,
60 60 'submitted': None,
61 61 'client_uuid' : None,
62 62 'engine_uuid' : None,
63 63 'started': None,
64 64 'completed': None,
65 65 'resubmitted': None,
66 66 'result_header' : None,
67 67 'result_content' : None,
68 68 'result_buffers' : None,
69 69 'queue' : None,
70 70 'pyin' : None,
71 71 'pyout': None,
72 72 'pyerr': None,
73 73 'stdout': '',
74 74 'stderr': '',
75 75 }
76 76
77 77 def init_record(msg):
78 78 """Initialize a TaskRecord based on a request."""
79 79 header = msg['header']
80 80 return {
81 81 'msg_id' : header['msg_id'],
82 82 'header' : header,
83 83 'content': msg['content'],
84 84 'buffers': msg['buffers'],
85 85 'submitted': header['date'],
86 86 'client_uuid' : None,
87 87 'engine_uuid' : None,
88 88 'started': None,
89 89 'completed': None,
90 90 'resubmitted': None,
91 91 'result_header' : None,
92 92 'result_content' : None,
93 93 'result_buffers' : None,
94 94 'queue' : None,
95 95 'pyin' : None,
96 96 'pyout': None,
97 97 'pyerr': None,
98 98 'stdout': '',
99 99 'stderr': '',
100 100 }
101 101
102 102
103 103 class EngineConnector(HasTraits):
104 104 """A simple object for accessing the various zmq connections of an object.
105 105 Attributes are:
106 106 id (int): engine ID
107 107 uuid (str): uuid (unused?)
108 108 queue (str): identity of queue's XREQ socket
109 109 registration (str): identity of registration XREQ socket
110 110 heartbeat (str): identity of heartbeat XREQ socket
111 111 """
112 112 id=Integer(0)
113 113 queue=CBytes()
114 114 control=CBytes()
115 115 registration=CBytes()
116 116 heartbeat=CBytes()
117 117 pending=Set()
118 118
119 119 class HubFactory(RegistrationFactory):
120 120 """The Configurable for setting up a Hub."""
121 121
122 122 # port-pairs for monitoredqueues:
123 123 hb = Tuple(Integer,Integer,config=True,
124 124 help="""XREQ/SUB Port pair for Engine heartbeats""")
125 125 def _hb_default(self):
126 126 return tuple(util.select_random_ports(2))
127 127
128 128 mux = Tuple(Integer,Integer,config=True,
129 129 help="""Engine/Client Port pair for MUX queue""")
130 130
131 131 def _mux_default(self):
132 132 return tuple(util.select_random_ports(2))
133 133
134 134 task = Tuple(Integer,Integer,config=True,
135 135 help="""Engine/Client Port pair for Task queue""")
136 136 def _task_default(self):
137 137 return tuple(util.select_random_ports(2))
138 138
139 139 control = Tuple(Integer,Integer,config=True,
140 140 help="""Engine/Client Port pair for Control queue""")
141 141
142 142 def _control_default(self):
143 143 return tuple(util.select_random_ports(2))
144 144
145 145 iopub = Tuple(Integer,Integer,config=True,
146 146 help="""Engine/Client Port pair for IOPub relay""")
147 147
148 148 def _iopub_default(self):
149 149 return tuple(util.select_random_ports(2))
150 150
151 151 # single ports:
152 152 mon_port = Integer(config=True,
153 153 help="""Monitor (SUB) port for queue traffic""")
154 154
155 155 def _mon_port_default(self):
156 156 return util.select_random_ports(1)[0]
157 157
158 158 notifier_port = Integer(config=True,
159 159 help="""PUB port for sending engine status notifications""")
160 160
161 161 def _notifier_port_default(self):
162 162 return util.select_random_ports(1)[0]
163 163
164 164 engine_ip = Unicode('127.0.0.1', config=True,
165 165 help="IP on which to listen for engine connections. [default: loopback]")
166 166 engine_transport = Unicode('tcp', config=True,
167 167 help="0MQ transport for engine connections. [default: tcp]")
168 168
169 169 client_ip = Unicode('127.0.0.1', config=True,
170 170 help="IP on which to listen for client connections. [default: loopback]")
171 171 client_transport = Unicode('tcp', config=True,
172 172 help="0MQ transport for client connections. [default : tcp]")
173 173
174 174 monitor_ip = Unicode('127.0.0.1', config=True,
175 175 help="IP on which to listen for monitor messages. [default: loopback]")
176 176 monitor_transport = Unicode('tcp', config=True,
177 177 help="0MQ transport for monitor messages. [default : tcp]")
178 178
179 179 monitor_url = Unicode('')
180 180
181 181 db_class = DottedObjectName('IPython.parallel.controller.dictdb.DictDB',
182 182 config=True, help="""The class to use for the DB backend""")
183 183
184 184 # not configurable
185 185 db = Instance('IPython.parallel.controller.dictdb.BaseDB')
186 186 heartmonitor = Instance('IPython.parallel.controller.heartmonitor.HeartMonitor')
187 187
188 188 def _ip_changed(self, name, old, new):
189 189 self.engine_ip = new
190 190 self.client_ip = new
191 191 self.monitor_ip = new
192 192 self._update_monitor_url()
193 193
194 194 def _update_monitor_url(self):
195 195 self.monitor_url = "%s://%s:%i"%(self.monitor_transport, self.monitor_ip, self.mon_port)
196 196
197 197 def _transport_changed(self, name, old, new):
198 198 self.engine_transport = new
199 199 self.client_transport = new
200 200 self.monitor_transport = new
201 201 self._update_monitor_url()
202 202
203 203 def __init__(self, **kwargs):
204 204 super(HubFactory, self).__init__(**kwargs)
205 205 self._update_monitor_url()
206 206
207 207
208 208 def construct(self):
209 209 self.init_hub()
210 210
211 211 def start(self):
212 212 self.heartmonitor.start()
213 213 self.log.info("Heartmonitor started")
214 214
215 215 def init_hub(self):
216 216 """construct"""
217 217 client_iface = "%s://%s:"%(self.client_transport, self.client_ip) + "%i"
218 218 engine_iface = "%s://%s:"%(self.engine_transport, self.engine_ip) + "%i"
219 219
220 220 ctx = self.context
221 221 loop = self.loop
222 222
223 223 # Registrar socket
224 224 q = ZMQStream(ctx.socket(zmq.ROUTER), loop)
225 225 q.bind(client_iface % self.regport)
226 self.log.info("Hub listening on %s for registration."%(client_iface%self.regport))
226 self.log.info("Hub listening on %s for registration.", client_iface % self.regport)
227 227 if self.client_ip != self.engine_ip:
228 228 q.bind(engine_iface % self.regport)
229 self.log.info("Hub listening on %s for registration."%(engine_iface%self.regport))
229 self.log.info("Hub listening on %s for registration.", engine_iface % self.regport)
230 230
231 231 ### Engine connections ###
232 232
233 233 # heartbeat
234 234 hpub = ctx.socket(zmq.PUB)
235 235 hpub.bind(engine_iface % self.hb[0])
236 236 hrep = ctx.socket(zmq.ROUTER)
237 237 hrep.bind(engine_iface % self.hb[1])
238 238 self.heartmonitor = HeartMonitor(loop=loop, config=self.config, log=self.log,
239 239 pingstream=ZMQStream(hpub,loop),
240 240 pongstream=ZMQStream(hrep,loop)
241 241 )
242 242
243 243 ### Client connections ###
244 244 # Notifier socket
245 245 n = ZMQStream(ctx.socket(zmq.PUB), loop)
246 246 n.bind(client_iface%self.notifier_port)
247 247
248 248 ### build and launch the queues ###
249 249
250 250 # monitor socket
251 251 sub = ctx.socket(zmq.SUB)
252 252 sub.setsockopt(zmq.SUBSCRIBE, b"")
253 253 sub.bind(self.monitor_url)
254 254 sub.bind('inproc://monitor')
255 255 sub = ZMQStream(sub, loop)
256 256
257 257 # connect the db
258 258 self.log.info('Hub using DB backend: %r', self.db_class.split('.')[-1])
259 259 # cdir = self.config.Global.cluster_dir
260 260 self.db = import_item(str(self.db_class))(session=self.session.session,
261 261 config=self.config, log=self.log)
262 262 time.sleep(.25)
263 263 try:
264 264 scheme = self.config.TaskScheduler.scheme_name
265 265 except AttributeError:
266 266 from .scheduler import TaskScheduler
267 267 scheme = TaskScheduler.scheme_name.get_default_value()
268 268 # build connection dicts
269 269 self.engine_info = {
270 270 'control' : engine_iface%self.control[1],
271 271 'mux': engine_iface%self.mux[1],
272 272 'heartbeat': (engine_iface%self.hb[0], engine_iface%self.hb[1]),
273 273 'task' : engine_iface%self.task[1],
274 274 'iopub' : engine_iface%self.iopub[1],
275 275 # 'monitor' : engine_iface%self.mon_port,
276 276 }
277 277
278 278 self.client_info = {
279 279 'control' : client_iface%self.control[0],
280 280 'mux': client_iface%self.mux[0],
281 281 'task' : (scheme, client_iface%self.task[0]),
282 282 'iopub' : client_iface%self.iopub[0],
283 283 'notification': client_iface%self.notifier_port
284 284 }
285 self.log.debug("Hub engine addrs: %s"%self.engine_info)
286 self.log.debug("Hub client addrs: %s"%self.client_info)
285 self.log.debug("Hub engine addrs: %s", self.engine_info)
286 self.log.debug("Hub client addrs: %s", self.client_info)
287 287
288 288 # resubmit stream
289 289 r = ZMQStream(ctx.socket(zmq.DEALER), loop)
290 290 url = util.disambiguate_url(self.client_info['task'][-1])
291 291 r.setsockopt(zmq.IDENTITY, self.session.bsession)
292 292 r.connect(url)
293 293
294 294 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
295 295 query=q, notifier=n, resubmit=r, db=self.db,
296 296 engine_info=self.engine_info, client_info=self.client_info,
297 297 log=self.log)
298 298
299 299
300 300 class Hub(SessionFactory):
301 301 """The IPython Controller Hub with 0MQ connections
302 302
303 303 Parameters
304 304 ==========
305 305 loop: zmq IOLoop instance
306 306 session: Session object
307 307 <removed> context: zmq context for creating new connections (?)
308 308 queue: ZMQStream for monitoring the command queue (SUB)
309 309 query: ZMQStream for engine registration and client queries requests (XREP)
310 310 heartbeat: HeartMonitor object checking the pulse of the engines
311 311 notifier: ZMQStream for broadcasting engine registration changes (PUB)
312 312 db: connection to db for out of memory logging of commands
313 313 NotImplemented
314 314 engine_info: dict of zmq connection information for engines to connect
315 315 to the queues.
316 316 client_info: dict of zmq connection information for clients to connect
317 317 to the queues.
318 318 """
319 319 # internal data structures:
320 320 ids=Set() # engine IDs
321 321 keytable=Dict()
322 322 by_ident=Dict()
323 323 engines=Dict()
324 324 clients=Dict()
325 325 hearts=Dict()
326 326 pending=Set()
327 327 queues=Dict() # pending msg_ids keyed by engine_id
328 328 tasks=Dict() # pending msg_ids submitted as tasks, keyed by client_id
329 329 completed=Dict() # completed msg_ids keyed by engine_id
330 330 all_completed=Set() # completed msg_ids, across all engines
331 331 dead_engines=Set() # uuids of engines that have unregistered
332 332 unassigned=Set() # set of task msg_ids not yet assigned a destination
333 333 incoming_registrations=Dict()
334 334 registration_timeout=Integer()
335 335 _idcounter=Integer(0)
336 336
337 337 # objects from constructor:
338 338 query=Instance(ZMQStream)
339 339 monitor=Instance(ZMQStream)
340 340 notifier=Instance(ZMQStream)
341 341 resubmit=Instance(ZMQStream)
342 342 heartmonitor=Instance(HeartMonitor)
343 343 db=Instance(object)
344 344 client_info=Dict()
345 345 engine_info=Dict()
346 346
347 347
348 348 def __init__(self, **kwargs):
349 349 """
350 350 # universal:
351 351 loop: IOLoop for creating future connections
352 352 session: streamsession for sending serialized data
353 353 # engine:
354 354 queue: ZMQStream for monitoring queue messages
355 355 query: ZMQStream for engine+client registration and client requests
356 356 heartbeat: HeartMonitor object for tracking engines
357 357 # extra:
358 358 db: ZMQStream for db connection (NotImplemented)
359 359 engine_info: zmq address/protocol dict for engine connections
360 360 client_info: zmq address/protocol dict for client connections
361 361 """
362 362
363 363 super(Hub, self).__init__(**kwargs)
364 364 self.registration_timeout = max(5000, 2*self.heartmonitor.period)
365 365
366 366 # validate connection dicts:
367 367 for k,v in self.client_info.iteritems():
368 368 if k == 'task':
369 369 util.validate_url_container(v[1])
370 370 else:
371 371 util.validate_url_container(v)
372 372 # util.validate_url_container(self.client_info)
373 373 util.validate_url_container(self.engine_info)
374 374
375 375 # register our callbacks
376 376 self.query.on_recv(self.dispatch_query)
377 377 self.monitor.on_recv(self.dispatch_monitor_traffic)
378 378
379 379 self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
380 380 self.heartmonitor.add_new_heart_handler(self.handle_new_heart)
381 381
382 382 self.monitor_handlers = {b'in' : self.save_queue_request,
383 383 b'out': self.save_queue_result,
384 384 b'intask': self.save_task_request,
385 385 b'outtask': self.save_task_result,
386 386 b'tracktask': self.save_task_destination,
387 387 b'incontrol': _passer,
388 388 b'outcontrol': _passer,
389 389 b'iopub': self.save_iopub_message,
390 390 }
391 391
392 392 self.query_handlers = {'queue_request': self.queue_status,
393 393 'result_request': self.get_results,
394 394 'history_request': self.get_history,
395 395 'db_request': self.db_query,
396 396 'purge_request': self.purge_results,
397 397 'load_request': self.check_load,
398 398 'resubmit_request': self.resubmit_task,
399 399 'shutdown_request': self.shutdown_request,
400 400 'registration_request' : self.register_engine,
401 401 'unregistration_request' : self.unregister_engine,
402 402 'connection_request': self.connection_request,
403 403 }
404 404
405 405 # ignore resubmit replies
406 406 self.resubmit.on_recv(lambda msg: None, copy=False)
407 407
408 408 self.log.info("hub::created hub")
409 409
410 410 @property
411 411 def _next_id(self):
412 412 """gemerate a new ID.
413 413
414 414 No longer reuse old ids, just count from 0."""
415 415 newid = self._idcounter
416 416 self._idcounter += 1
417 417 return newid
418 418 # newid = 0
419 419 # incoming = [id[0] for id in self.incoming_registrations.itervalues()]
420 420 # # print newid, self.ids, self.incoming_registrations
421 421 # while newid in self.ids or newid in incoming:
422 422 # newid += 1
423 423 # return newid
424 424
425 425 #-----------------------------------------------------------------------------
426 426 # message validation
427 427 #-----------------------------------------------------------------------------
428 428
429 429 def _validate_targets(self, targets):
430 430 """turn any valid targets argument into a list of integer ids"""
431 431 if targets is None:
432 432 # default to all
433 433 targets = self.ids
434 434
435 435 if isinstance(targets, (int,str,unicode)):
436 436 # only one target specified
437 437 targets = [targets]
438 438 _targets = []
439 439 for t in targets:
440 440 # map raw identities to ids
441 441 if isinstance(t, (str,unicode)):
442 442 t = self.by_ident.get(t, t)
443 443 _targets.append(t)
444 444 targets = _targets
445 445 bad_targets = [ t for t in targets if t not in self.ids ]
446 446 if bad_targets:
447 447 raise IndexError("No Such Engine: %r"%bad_targets)
448 448 if not targets:
449 449 raise IndexError("No Engines Registered")
450 450 return targets
451 451
452 452 #-----------------------------------------------------------------------------
453 453 # dispatch methods (1 per stream)
454 454 #-----------------------------------------------------------------------------
455 455
456 456
457 457 def dispatch_monitor_traffic(self, msg):
458 458 """all ME and Task queue messages come through here, as well as
459 459 IOPub traffic."""
460 self.log.debug("monitor traffic: %r"%msg[:2])
460 self.log.debug("monitor traffic: %r", msg[:2])
461 461 switch = msg[0]
462 462 try:
463 463 idents, msg = self.session.feed_identities(msg[1:])
464 464 except ValueError:
465 465 idents=[]
466 466 if not idents:
467 self.log.error("Bad Monitor Message: %r"%msg)
467 self.log.error("Bad Monitor Message: %r", msg)
468 468 return
469 469 handler = self.monitor_handlers.get(switch, None)
470 470 if handler is not None:
471 471 handler(idents, msg)
472 472 else:
473 self.log.error("Invalid monitor topic: %r"%switch)
473 self.log.error("Invalid monitor topic: %r", switch)
474 474
475 475
476 476 def dispatch_query(self, msg):
477 477 """Route registration requests and queries from clients."""
478 478 try:
479 479 idents, msg = self.session.feed_identities(msg)
480 480 except ValueError:
481 481 idents = []
482 482 if not idents:
483 self.log.error("Bad Query Message: %r"%msg)
483 self.log.error("Bad Query Message: %r", msg)
484 484 return
485 485 client_id = idents[0]
486 486 try:
487 487 msg = self.session.unserialize(msg, content=True)
488 488 except Exception:
489 489 content = error.wrap_exception()
490 self.log.error("Bad Query Message: %r"%msg, exc_info=True)
490 self.log.error("Bad Query Message: %r", msg, exc_info=True)
491 491 self.session.send(self.query, "hub_error", ident=client_id,
492 492 content=content)
493 493 return
494 494 # print client_id, header, parent, content
495 495 #switch on message type:
496 496 msg_type = msg['header']['msg_type']
497 self.log.info("client::client %r requested %r"%(client_id, msg_type))
497 self.log.info("client::client %r requested %r", client_id, msg_type)
498 498 handler = self.query_handlers.get(msg_type, None)
499 499 try:
500 500 assert handler is not None, "Bad Message Type: %r"%msg_type
501 501 except:
502 502 content = error.wrap_exception()
503 self.log.error("Bad Message Type: %r"%msg_type, exc_info=True)
503 self.log.error("Bad Message Type: %r", msg_type, exc_info=True)
504 504 self.session.send(self.query, "hub_error", ident=client_id,
505 505 content=content)
506 506 return
507 507
508 508 else:
509 509 handler(idents, msg)
510 510
511 511 def dispatch_db(self, msg):
512 512 """"""
513 513 raise NotImplementedError
514 514
515 515 #---------------------------------------------------------------------------
516 516 # handler methods (1 per event)
517 517 #---------------------------------------------------------------------------
518 518
519 519 #----------------------- Heartbeat --------------------------------------
520 520
521 521 def handle_new_heart(self, heart):
522 522 """handler to attach to heartbeater.
523 523 Called when a new heart starts to beat.
524 524 Triggers completion of registration."""
525 self.log.debug("heartbeat::handle_new_heart(%r)"%heart)
525 self.log.debug("heartbeat::handle_new_heart(%r)", heart)
526 526 if heart not in self.incoming_registrations:
527 self.log.info("heartbeat::ignoring new heart: %r"%heart)
527 self.log.info("heartbeat::ignoring new heart: %r", heart)
528 528 else:
529 529 self.finish_registration(heart)
530 530
531 531
532 532 def handle_heart_failure(self, heart):
533 533 """handler to attach to heartbeater.
534 534 called when a previously registered heart fails to respond to beat request.
535 535 triggers unregistration"""
536 self.log.debug("heartbeat::handle_heart_failure(%r)"%heart)
536 self.log.debug("heartbeat::handle_heart_failure(%r)", heart)
537 537 eid = self.hearts.get(heart, None)
538 538 queue = self.engines[eid].queue
539 if eid is None:
540 self.log.info("heartbeat::ignoring heart failure %r"%heart)
539 if eid is None or self.keytable[eid] in self.dead_engines:
540 self.log.info("heartbeat::ignoring heart failure %r (not an engine or already dead)", heart)
541 541 else:
542 542 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
543 543
544 544 #----------------------- MUX Queue Traffic ------------------------------
545 545
546 546 def save_queue_request(self, idents, msg):
547 547 if len(idents) < 2:
548 self.log.error("invalid identity prefix: %r"%idents)
548 self.log.error("invalid identity prefix: %r", idents)
549 549 return
550 550 queue_id, client_id = idents[:2]
551 551 try:
552 552 msg = self.session.unserialize(msg)
553 553 except Exception:
554 self.log.error("queue::client %r sent invalid message to %r: %r"%(client_id, queue_id, msg), exc_info=True)
554 self.log.error("queue::client %r sent invalid message to %r: %r", client_id, queue_id, msg, exc_info=True)
555 555 return
556 556
557 557 eid = self.by_ident.get(queue_id, None)
558 558 if eid is None:
559 self.log.error("queue::target %r not registered"%queue_id)
560 self.log.debug("queue:: valid are: %r"%(self.by_ident.keys()))
559 self.log.error("queue::target %r not registered", queue_id)
560 self.log.debug("queue:: valid are: %r", self.by_ident.keys())
561 561 return
562 562 record = init_record(msg)
563 563 msg_id = record['msg_id']
564 self.log.info("queue::client %r submitted request %r to %s", client_id, msg_id, eid)
564 565 # Unicode in records
565 566 record['engine_uuid'] = queue_id.decode('ascii')
566 567 record['client_uuid'] = client_id.decode('ascii')
567 568 record['queue'] = 'mux'
568 569
569 570 try:
570 571 # it's possible iopub arrived first:
571 572 existing = self.db.get_record(msg_id)
572 573 for key,evalue in existing.iteritems():
573 574 rvalue = record.get(key, None)
574 575 if evalue and rvalue and evalue != rvalue:
575 self.log.warn("conflicting initial state for record: %r:%r <%r> %r"%(msg_id, rvalue, key, evalue))
576 self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
576 577 elif evalue and not rvalue:
577 578 record[key] = evalue
578 579 try:
579 580 self.db.update_record(msg_id, record)
580 581 except Exception:
581 self.log.error("DB Error updating record %r"%msg_id, exc_info=True)
582 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
582 583 except KeyError:
583 584 try:
584 585 self.db.add_record(msg_id, record)
585 586 except Exception:
586 self.log.error("DB Error adding record %r"%msg_id, exc_info=True)
587 self.log.error("DB Error adding record %r", msg_id, exc_info=True)
587 588
588 589
589 590 self.pending.add(msg_id)
590 591 self.queues[eid].append(msg_id)
591 592
592 593 def save_queue_result(self, idents, msg):
593 594 if len(idents) < 2:
594 self.log.error("invalid identity prefix: %r"%idents)
595 self.log.error("invalid identity prefix: %r", idents)
595 596 return
596 597
597 598 client_id, queue_id = idents[:2]
598 599 try:
599 600 msg = self.session.unserialize(msg)
600 601 except Exception:
601 self.log.error("queue::engine %r sent invalid message to %r: %r"%(
602 queue_id,client_id, msg), exc_info=True)
602 self.log.error("queue::engine %r sent invalid message to %r: %r",
603 queue_id, client_id, msg, exc_info=True)
603 604 return
604 605
605 606 eid = self.by_ident.get(queue_id, None)
606 607 if eid is None:
607 self.log.error("queue::unknown engine %r is sending a reply: "%queue_id)
608 self.log.error("queue::unknown engine %r is sending a reply: ", queue_id)
608 609 return
609 610
610 611 parent = msg['parent_header']
611 612 if not parent:
612 613 return
613 614 msg_id = parent['msg_id']
614 615 if msg_id in self.pending:
615 616 self.pending.remove(msg_id)
616 617 self.all_completed.add(msg_id)
617 618 self.queues[eid].remove(msg_id)
618 619 self.completed[eid].append(msg_id)
620 self.log.info("queue::request %r completed on %s", msg_id, eid)
619 621 elif msg_id not in self.all_completed:
620 622 # it could be a result from a dead engine that died before delivering the
621 623 # result
622 self.log.warn("queue:: unknown msg finished %r"%msg_id)
624 self.log.warn("queue:: unknown msg finished %r", msg_id)
623 625 return
624 626 # update record anyway, because the unregistration could have been premature
625 627 rheader = msg['header']
626 628 completed = rheader['date']
627 629 started = rheader.get('started', None)
628 630 result = {
629 631 'result_header' : rheader,
630 632 'result_content': msg['content'],
631 633 'started' : started,
632 634 'completed' : completed
633 635 }
634 636
635 637 result['result_buffers'] = msg['buffers']
636 638 try:
637 639 self.db.update_record(msg_id, result)
638 640 except Exception:
639 self.log.error("DB Error updating record %r"%msg_id, exc_info=True)
641 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
640 642
641 643
642 644 #--------------------- Task Queue Traffic ------------------------------
643 645
644 646 def save_task_request(self, idents, msg):
645 647 """Save the submission of a task."""
646 648 client_id = idents[0]
647 649
648 650 try:
649 651 msg = self.session.unserialize(msg)
650 652 except Exception:
651 self.log.error("task::client %r sent invalid task message: %r"%(
652 client_id, msg), exc_info=True)
653 self.log.error("task::client %r sent invalid task message: %r",
654 client_id, msg, exc_info=True)
653 655 return
654 656 record = init_record(msg)
655 657
656 658 record['client_uuid'] = client_id.decode('ascii')
657 659 record['queue'] = 'task'
658 660 header = msg['header']
659 661 msg_id = header['msg_id']
660 662 self.pending.add(msg_id)
661 663 self.unassigned.add(msg_id)
662 664 try:
663 665 # it's possible iopub arrived first:
664 666 existing = self.db.get_record(msg_id)
665 667 if existing['resubmitted']:
666 668 for key in ('submitted', 'client_uuid', 'buffers'):
667 669 # don't clobber these keys on resubmit
668 670 # submitted and client_uuid should be different
669 671 # and buffers might be big, and shouldn't have changed
670 672 record.pop(key)
671 673 # still check content,header which should not change
672 674 # but are not expensive to compare as buffers
673 675
674 676 for key,evalue in existing.iteritems():
675 677 if key.endswith('buffers'):
676 678 # don't compare buffers
677 679 continue
678 680 rvalue = record.get(key, None)
679 681 if evalue and rvalue and evalue != rvalue:
680 self.log.warn("conflicting initial state for record: %r:%r <%r> %r"%(msg_id, rvalue, key, evalue))
682 self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
681 683 elif evalue and not rvalue:
682 684 record[key] = evalue
683 685 try:
684 686 self.db.update_record(msg_id, record)
685 687 except Exception:
686 self.log.error("DB Error updating record %r"%msg_id, exc_info=True)
688 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
687 689 except KeyError:
688 690 try:
689 691 self.db.add_record(msg_id, record)
690 692 except Exception:
691 self.log.error("DB Error adding record %r"%msg_id, exc_info=True)
693 self.log.error("DB Error adding record %r", msg_id, exc_info=True)
692 694 except Exception:
693 self.log.error("DB Error saving task request %r"%msg_id, exc_info=True)
695 self.log.error("DB Error saving task request %r", msg_id, exc_info=True)
694 696
695 697 def save_task_result(self, idents, msg):
696 698 """save the result of a completed task."""
697 699 client_id = idents[0]
698 700 try:
699 701 msg = self.session.unserialize(msg)
700 702 except Exception:
701 self.log.error("task::invalid task result message send to %r: %r"%(
702 client_id, msg), exc_info=True)
703 self.log.error("task::invalid task result message send to %r: %r",
704 client_id, msg, exc_info=True)
703 705 return
704 706
705 707 parent = msg['parent_header']
706 708 if not parent:
707 709 # print msg
708 self.log.warn("Task %r had no parent!"%msg)
710 self.log.warn("Task %r had no parent!", msg)
709 711 return
710 712 msg_id = parent['msg_id']
711 713 if msg_id in self.unassigned:
712 714 self.unassigned.remove(msg_id)
713 715
714 716 header = msg['header']
715 717 engine_uuid = header.get('engine', None)
716 718 eid = self.by_ident.get(engine_uuid, None)
717 719
718 720 if msg_id in self.pending:
721 self.log.info("task::task %r finished on %s", msg_id, eid)
719 722 self.pending.remove(msg_id)
720 723 self.all_completed.add(msg_id)
721 724 if eid is not None:
722 725 self.completed[eid].append(msg_id)
723 726 if msg_id in self.tasks[eid]:
724 727 self.tasks[eid].remove(msg_id)
725 728 completed = header['date']
726 729 started = header.get('started', None)
727 730 result = {
728 731 'result_header' : header,
729 732 'result_content': msg['content'],
730 733 'started' : started,
731 734 'completed' : completed,
732 735 'engine_uuid': engine_uuid
733 736 }
734 737
735 738 result['result_buffers'] = msg['buffers']
736 739 try:
737 740 self.db.update_record(msg_id, result)
738 741 except Exception:
739 self.log.error("DB Error saving task request %r"%msg_id, exc_info=True)
742 self.log.error("DB Error saving task request %r", msg_id, exc_info=True)
740 743
741 744 else:
742 self.log.debug("task::unknown task %r finished"%msg_id)
745 self.log.debug("task::unknown task %r finished", msg_id)
743 746
744 747 def save_task_destination(self, idents, msg):
745 748 try:
746 749 msg = self.session.unserialize(msg, content=True)
747 750 except Exception:
748 751 self.log.error("task::invalid task tracking message", exc_info=True)
749 752 return
750 753 content = msg['content']
751 754 # print (content)
752 755 msg_id = content['msg_id']
753 756 engine_uuid = content['engine_id']
754 757 eid = self.by_ident[util.asbytes(engine_uuid)]
755 758
756 self.log.info("task::task %r arrived on %r"%(msg_id, eid))
759 self.log.info("task::task %r arrived on %r", msg_id, eid)
757 760 if msg_id in self.unassigned:
758 761 self.unassigned.remove(msg_id)
759 762 # else:
760 763 # self.log.debug("task::task %r not listed as MIA?!"%(msg_id))
761 764
762 765 self.tasks[eid].append(msg_id)
763 766 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
764 767 try:
765 768 self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
766 769 except Exception:
767 self.log.error("DB Error saving task destination %r"%msg_id, exc_info=True)
770 self.log.error("DB Error saving task destination %r", msg_id, exc_info=True)
768 771
769 772
770 773 def mia_task_request(self, idents, msg):
771 774 raise NotImplementedError
772 775 client_id = idents[0]
773 776 # content = dict(mia=self.mia,status='ok')
774 777 # self.session.send('mia_reply', content=content, idents=client_id)
775 778
776 779
777 780 #--------------------- IOPub Traffic ------------------------------
778 781
779 782 def save_iopub_message(self, topics, msg):
780 783 """save an iopub message into the db"""
781 784 # print (topics)
782 785 try:
783 786 msg = self.session.unserialize(msg, content=True)
784 787 except Exception:
785 788 self.log.error("iopub::invalid IOPub message", exc_info=True)
786 789 return
787 790
788 791 parent = msg['parent_header']
789 792 if not parent:
790 self.log.error("iopub::invalid IOPub message: %r"%msg)
793 self.log.error("iopub::invalid IOPub message: %r", msg)
791 794 return
792 795 msg_id = parent['msg_id']
793 796 msg_type = msg['header']['msg_type']
794 797 content = msg['content']
795 798
796 799 # ensure msg_id is in db
797 800 try:
798 801 rec = self.db.get_record(msg_id)
799 802 except KeyError:
800 803 rec = empty_record()
801 804 rec['msg_id'] = msg_id
802 805 self.db.add_record(msg_id, rec)
803 806 # stream
804 807 d = {}
805 808 if msg_type == 'stream':
806 809 name = content['name']
807 810 s = rec[name] or ''
808 811 d[name] = s + content['data']
809 812
810 813 elif msg_type == 'pyerr':
811 814 d['pyerr'] = content
812 815 elif msg_type == 'pyin':
813 816 d['pyin'] = content['code']
814 817 else:
815 818 d[msg_type] = content.get('data', '')
816 819
817 820 try:
818 821 self.db.update_record(msg_id, d)
819 822 except Exception:
820 self.log.error("DB Error saving iopub message %r"%msg_id, exc_info=True)
823 self.log.error("DB Error saving iopub message %r", msg_id, exc_info=True)
821 824
822 825
823 826
824 827 #-------------------------------------------------------------------------
825 828 # Registration requests
826 829 #-------------------------------------------------------------------------
827 830
828 831 def connection_request(self, client_id, msg):
829 832 """Reply with connection addresses for clients."""
830 self.log.info("client::client %r connected"%client_id)
833 self.log.info("client::client %r connected", client_id)
831 834 content = dict(status='ok')
832 835 content.update(self.client_info)
833 836 jsonable = {}
834 837 for k,v in self.keytable.iteritems():
835 838 if v not in self.dead_engines:
836 839 jsonable[str(k)] = v.decode('ascii')
837 840 content['engines'] = jsonable
838 841 self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
839 842
840 843 def register_engine(self, reg, msg):
841 844 """Register a new engine."""
842 845 content = msg['content']
843 846 try:
844 847 queue = util.asbytes(content['queue'])
845 848 except KeyError:
846 849 self.log.error("registration::queue not specified", exc_info=True)
847 850 return
848 851 heart = content.get('heartbeat', None)
849 852 if heart:
850 853 heart = util.asbytes(heart)
851 854 """register a new engine, and create the socket(s) necessary"""
852 855 eid = self._next_id
853 856 # print (eid, queue, reg, heart)
854 857
855 self.log.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))
858 self.log.debug("registration::register_engine(%i, %r, %r, %r)", eid, queue, reg, heart)
856 859
857 860 content = dict(id=eid,status='ok')
858 861 content.update(self.engine_info)
859 862 # check if requesting available IDs:
860 863 if queue in self.by_ident:
861 864 try:
862 865 raise KeyError("queue_id %r in use"%queue)
863 866 except:
864 867 content = error.wrap_exception()
865 self.log.error("queue_id %r in use"%queue, exc_info=True)
868 self.log.error("queue_id %r in use", queue, exc_info=True)
866 869 elif heart in self.hearts: # need to check unique hearts?
867 870 try:
868 871 raise KeyError("heart_id %r in use"%heart)
869 872 except:
870 self.log.error("heart_id %r in use"%heart, exc_info=True)
873 self.log.error("heart_id %r in use", heart, exc_info=True)
871 874 content = error.wrap_exception()
872 875 else:
873 876 for h, pack in self.incoming_registrations.iteritems():
874 877 if heart == h:
875 878 try:
876 879 raise KeyError("heart_id %r in use"%heart)
877 880 except:
878 self.log.error("heart_id %r in use"%heart, exc_info=True)
881 self.log.error("heart_id %r in use", heart, exc_info=True)
879 882 content = error.wrap_exception()
880 883 break
881 884 elif queue == pack[1]:
882 885 try:
883 886 raise KeyError("queue_id %r in use"%queue)
884 887 except:
885 self.log.error("queue_id %r in use"%queue, exc_info=True)
888 self.log.error("queue_id %r in use", queue, exc_info=True)
886 889 content = error.wrap_exception()
887 890 break
888 891
889 892 msg = self.session.send(self.query, "registration_reply",
890 893 content=content,
891 894 ident=reg)
892 895
893 896 if content['status'] == 'ok':
894 897 if heart in self.heartmonitor.hearts:
895 898 # already beating
896 899 self.incoming_registrations[heart] = (eid,queue,reg[0],None)
897 900 self.finish_registration(heart)
898 901 else:
899 902 purge = lambda : self._purge_stalled_registration(heart)
900 903 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
901 904 dc.start()
902 905 self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
903 906 else:
904 self.log.error("registration::registration %i failed: %r"%(eid, content['evalue']))
907 self.log.error("registration::registration %i failed: %r", eid, content['evalue'])
905 908 return eid
906 909
907 910 def unregister_engine(self, ident, msg):
908 911 """Unregister an engine that explicitly requested to leave."""
909 912 try:
910 913 eid = msg['content']['id']
911 914 except:
912 self.log.error("registration::bad engine id for unregistration: %r"%ident, exc_info=True)
915 self.log.error("registration::bad engine id for unregistration: %r", ident, exc_info=True)
913 916 return
914 self.log.info("registration::unregister_engine(%r)"%eid)
917 self.log.info("registration::unregister_engine(%r)", eid)
915 918 # print (eid)
916 919 uuid = self.keytable[eid]
917 920 content=dict(id=eid, queue=uuid.decode('ascii'))
918 921 self.dead_engines.add(uuid)
919 922 # self.ids.remove(eid)
920 923 # uuid = self.keytable.pop(eid)
921 924 #
922 925 # ec = self.engines.pop(eid)
923 926 # self.hearts.pop(ec.heartbeat)
924 927 # self.by_ident.pop(ec.queue)
925 928 # self.completed.pop(eid)
926 929 handleit = lambda : self._handle_stranded_msgs(eid, uuid)
927 930 dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop)
928 931 dc.start()
929 932 ############## TODO: HANDLE IT ################
930 933
931 934 if self.notifier:
932 935 self.session.send(self.notifier, "unregistration_notification", content=content)
933 936
934 937 def _handle_stranded_msgs(self, eid, uuid):
935 938 """Handle messages known to be on an engine when the engine unregisters.
936 939
937 940 It is possible that this will fire prematurely - that is, an engine will
938 941 go down after completing a result, and the client will be notified
939 942 that the result failed and later receive the actual result.
940 943 """
941 944
942 945 outstanding = self.queues[eid]
943 946
944 947 for msg_id in outstanding:
945 948 self.pending.remove(msg_id)
946 949 self.all_completed.add(msg_id)
947 950 try:
948 951 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
949 952 except:
950 953 content = error.wrap_exception()
951 954 # build a fake header:
952 955 header = {}
953 956 header['engine'] = uuid
954 957 header['date'] = datetime.now()
955 958 rec = dict(result_content=content, result_header=header, result_buffers=[])
956 959 rec['completed'] = header['date']
957 960 rec['engine_uuid'] = uuid
958 961 try:
959 962 self.db.update_record(msg_id, rec)
960 963 except Exception:
961 self.log.error("DB Error handling stranded msg %r"%msg_id, exc_info=True)
964 self.log.error("DB Error handling stranded msg %r", msg_id, exc_info=True)
962 965
963 966
964 967 def finish_registration(self, heart):
965 968 """Second half of engine registration, called after our HeartMonitor
966 969 has received a beat from the Engine's Heart."""
967 970 try:
968 971 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
969 972 except KeyError:
970 973 self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
971 974 return
972 self.log.info("registration::finished registering engine %i:%r"%(eid,queue))
975 self.log.info("registration::finished registering engine %i:%r", eid, queue)
973 976 if purge is not None:
974 977 purge.stop()
975 978 control = queue
976 979 self.ids.add(eid)
977 980 self.keytable[eid] = queue
978 981 self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg,
979 982 control=control, heartbeat=heart)
980 983 self.by_ident[queue] = eid
981 984 self.queues[eid] = list()
982 985 self.tasks[eid] = list()
983 986 self.completed[eid] = list()
984 987 self.hearts[heart] = eid
985 988 content = dict(id=eid, queue=self.engines[eid].queue.decode('ascii'))
986 989 if self.notifier:
987 990 self.session.send(self.notifier, "registration_notification", content=content)
988 self.log.info("engine::Engine Connected: %i"%eid)
991 self.log.info("engine::Engine Connected: %i", eid)
989 992
990 993 def _purge_stalled_registration(self, heart):
991 994 if heart in self.incoming_registrations:
992 995 eid = self.incoming_registrations.pop(heart)[0]
993 self.log.info("registration::purging stalled registration: %i"%eid)
996 self.log.info("registration::purging stalled registration: %i", eid)
994 997 else:
995 998 pass
996 999
997 1000 #-------------------------------------------------------------------------
998 1001 # Client Requests
999 1002 #-------------------------------------------------------------------------
1000 1003
1001 1004 def shutdown_request(self, client_id, msg):
1002 1005 """handle shutdown request."""
1003 1006 self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
1004 1007 # also notify other clients of shutdown
1005 1008 self.session.send(self.notifier, 'shutdown_notice', content={'status': 'ok'})
1006 1009 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
1007 1010 dc.start()
1008 1011
1009 1012 def _shutdown(self):
1010 1013 self.log.info("hub::hub shutting down.")
1011 1014 time.sleep(0.1)
1012 1015 sys.exit(0)
1013 1016
1014 1017
1015 1018 def check_load(self, client_id, msg):
1016 1019 content = msg['content']
1017 1020 try:
1018 1021 targets = content['targets']
1019 1022 targets = self._validate_targets(targets)
1020 1023 except:
1021 1024 content = error.wrap_exception()
1022 1025 self.session.send(self.query, "hub_error",
1023 1026 content=content, ident=client_id)
1024 1027 return
1025 1028
1026 1029 content = dict(status='ok')
1027 1030 # loads = {}
1028 1031 for t in targets:
1029 1032 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
1030 1033 self.session.send(self.query, "load_reply", content=content, ident=client_id)
1031 1034
1032 1035
1033 1036 def queue_status(self, client_id, msg):
1034 1037 """Return the Queue status of one or more targets.
1035 1038 if verbose: return the msg_ids
1036 1039 else: return len of each type.
1037 1040 keys: queue (pending MUX jobs)
1038 1041 tasks (pending Task jobs)
1039 1042 completed (finished jobs from both queues)"""
1040 1043 content = msg['content']
1041 1044 targets = content['targets']
1042 1045 try:
1043 1046 targets = self._validate_targets(targets)
1044 1047 except:
1045 1048 content = error.wrap_exception()
1046 1049 self.session.send(self.query, "hub_error",
1047 1050 content=content, ident=client_id)
1048 1051 return
1049 1052 verbose = content.get('verbose', False)
1050 1053 content = dict(status='ok')
1051 1054 for t in targets:
1052 1055 queue = self.queues[t]
1053 1056 completed = self.completed[t]
1054 1057 tasks = self.tasks[t]
1055 1058 if not verbose:
1056 1059 queue = len(queue)
1057 1060 completed = len(completed)
1058 1061 tasks = len(tasks)
1059 1062 content[str(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
1060 1063 content['unassigned'] = list(self.unassigned) if verbose else len(self.unassigned)
1061 1064 # print (content)
1062 1065 self.session.send(self.query, "queue_reply", content=content, ident=client_id)
1063 1066
1064 1067 def purge_results(self, client_id, msg):
1065 1068 """Purge results from memory. This method is more valuable before we move
1066 1069 to a DB based message storage mechanism."""
1067 1070 content = msg['content']
1068 1071 self.log.info("Dropping records with %s", content)
1069 1072 msg_ids = content.get('msg_ids', [])
1070 1073 reply = dict(status='ok')
1071 1074 if msg_ids == 'all':
1072 1075 try:
1073 1076 self.db.drop_matching_records(dict(completed={'$ne':None}))
1074 1077 except Exception:
1075 1078 reply = error.wrap_exception()
1076 1079 else:
1077 1080 pending = filter(lambda m: m in self.pending, msg_ids)
1078 1081 if pending:
1079 1082 try:
1080 1083 raise IndexError("msg pending: %r"%pending[0])
1081 1084 except:
1082 1085 reply = error.wrap_exception()
1083 1086 else:
1084 1087 try:
1085 1088 self.db.drop_matching_records(dict(msg_id={'$in':msg_ids}))
1086 1089 except Exception:
1087 1090 reply = error.wrap_exception()
1088 1091
1089 1092 if reply['status'] == 'ok':
1090 1093 eids = content.get('engine_ids', [])
1091 1094 for eid in eids:
1092 1095 if eid not in self.engines:
1093 1096 try:
1094 1097 raise IndexError("No such engine: %i"%eid)
1095 1098 except:
1096 1099 reply = error.wrap_exception()
1097 1100 break
1098 1101 uid = self.engines[eid].queue
1099 1102 try:
1100 1103 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
1101 1104 except Exception:
1102 1105 reply = error.wrap_exception()
1103 1106 break
1104 1107
1105 1108 self.session.send(self.query, 'purge_reply', content=reply, ident=client_id)
1106 1109
1107 1110 def resubmit_task(self, client_id, msg):
1108 1111 """Resubmit one or more tasks."""
1109 1112 def finish(reply):
1110 1113 self.session.send(self.query, 'resubmit_reply', content=reply, ident=client_id)
1111 1114
1112 1115 content = msg['content']
1113 1116 msg_ids = content['msg_ids']
1114 1117 reply = dict(status='ok')
1115 1118 try:
1116 1119 records = self.db.find_records({'msg_id' : {'$in' : msg_ids}}, keys=[
1117 1120 'header', 'content', 'buffers'])
1118 1121 except Exception:
1119 1122 self.log.error('db::db error finding tasks to resubmit', exc_info=True)
1120 1123 return finish(error.wrap_exception())
1121 1124
1122 1125 # validate msg_ids
1123 1126 found_ids = [ rec['msg_id'] for rec in records ]
1124 1127 invalid_ids = filter(lambda m: m in self.pending, found_ids)
1125 1128 if len(records) > len(msg_ids):
1126 1129 try:
1127 1130 raise RuntimeError("DB appears to be in an inconsistent state. "
1128 1131 "More matching records were found than should exist")
1129 1132 except Exception:
1130 1133 return finish(error.wrap_exception())
1131 1134 elif len(records) < len(msg_ids):
1132 1135 missing = [ m for m in msg_ids if m not in found_ids ]
1133 1136 try:
1134 1137 raise KeyError("No such msg(s): %r"%missing)
1135 1138 except KeyError:
1136 1139 return finish(error.wrap_exception())
1137 1140 elif invalid_ids:
1138 1141 msg_id = invalid_ids[0]
1139 1142 try:
1140 raise ValueError("Task %r appears to be inflight"%(msg_id))
1143 raise ValueError("Task %r appears to be inflight" % msg_id)
1141 1144 except Exception:
1142 1145 return finish(error.wrap_exception())
1143 1146
1144 1147 # clear the existing records
1145 1148 now = datetime.now()
1146 1149 rec = empty_record()
1147 1150 map(rec.pop, ['msg_id', 'header', 'content', 'buffers', 'submitted'])
1148 1151 rec['resubmitted'] = now
1149 1152 rec['queue'] = 'task'
1150 1153 rec['client_uuid'] = client_id[0]
1151 1154 try:
1152 1155 for msg_id in msg_ids:
1153 1156 self.all_completed.discard(msg_id)
1154 1157 self.db.update_record(msg_id, rec)
1155 1158 except Exception:
1156 1159 self.log.error('db::db error updating record', exc_info=True)
1157 1160 reply = error.wrap_exception()
1158 1161 else:
1159 1162 # send the messages
1160 1163 for rec in records:
1161 1164 header = rec['header']
1162 1165 # include resubmitted in header to prevent digest collision
1163 1166 header['resubmitted'] = now
1164 1167 msg = self.session.msg(header['msg_type'])
1165 1168 msg['content'] = rec['content']
1166 1169 msg['header'] = header
1167 1170 msg['header']['msg_id'] = rec['msg_id']
1168 1171 self.session.send(self.resubmit, msg, buffers=rec['buffers'])
1169 1172
1170 1173 finish(dict(status='ok'))
1171 1174
1172 1175
1173 1176 def _extract_record(self, rec):
1174 1177 """decompose a TaskRecord dict into subsection of reply for get_result"""
1175 1178 io_dict = {}
1176 1179 for key in 'pyin pyout pyerr stdout stderr'.split():
1177 1180 io_dict[key] = rec[key]
1178 1181 content = { 'result_content': rec['result_content'],
1179 1182 'header': rec['header'],
1180 1183 'result_header' : rec['result_header'],
1181 1184 'io' : io_dict,
1182 1185 }
1183 1186 if rec['result_buffers']:
1184 1187 buffers = map(bytes, rec['result_buffers'])
1185 1188 else:
1186 1189 buffers = []
1187 1190
1188 1191 return content, buffers
1189 1192
1190 1193 def get_results(self, client_id, msg):
1191 1194 """Get the result of 1 or more messages."""
1192 1195 content = msg['content']
1193 1196 msg_ids = sorted(set(content['msg_ids']))
1194 1197 statusonly = content.get('status_only', False)
1195 1198 pending = []
1196 1199 completed = []
1197 1200 content = dict(status='ok')
1198 1201 content['pending'] = pending
1199 1202 content['completed'] = completed
1200 1203 buffers = []
1201 1204 if not statusonly:
1202 1205 try:
1203 1206 matches = self.db.find_records(dict(msg_id={'$in':msg_ids}))
1204 1207 # turn match list into dict, for faster lookup
1205 1208 records = {}
1206 1209 for rec in matches:
1207 1210 records[rec['msg_id']] = rec
1208 1211 except Exception:
1209 1212 content = error.wrap_exception()
1210 1213 self.session.send(self.query, "result_reply", content=content,
1211 1214 parent=msg, ident=client_id)
1212 1215 return
1213 1216 else:
1214 1217 records = {}
1215 1218 for msg_id in msg_ids:
1216 1219 if msg_id in self.pending:
1217 1220 pending.append(msg_id)
1218 1221 elif msg_id in self.all_completed:
1219 1222 completed.append(msg_id)
1220 1223 if not statusonly:
1221 1224 c,bufs = self._extract_record(records[msg_id])
1222 1225 content[msg_id] = c
1223 1226 buffers.extend(bufs)
1224 1227 elif msg_id in records:
1225 1228 if records[msg_id]['completed']:
1226 1229 completed.append(msg_id)
1227 1230 c,bufs = self._extract_record(records[msg_id])
1228 1231 content[msg_id] = c
1229 1232 buffers.extend(bufs)
1230 1233 else:
1231 1234 pending.append(msg_id)
1232 1235 else:
1233 1236 try:
1234 1237 raise KeyError('No such message: '+msg_id)
1235 1238 except:
1236 1239 content = error.wrap_exception()
1237 1240 break
1238 1241 self.session.send(self.query, "result_reply", content=content,
1239 1242 parent=msg, ident=client_id,
1240 1243 buffers=buffers)
1241 1244
1242 1245 def get_history(self, client_id, msg):
1243 1246 """Get a list of all msg_ids in our DB records"""
1244 1247 try:
1245 1248 msg_ids = self.db.get_history()
1246 1249 except Exception as e:
1247 1250 content = error.wrap_exception()
1248 1251 else:
1249 1252 content = dict(status='ok', history=msg_ids)
1250 1253
1251 1254 self.session.send(self.query, "history_reply", content=content,
1252 1255 parent=msg, ident=client_id)
1253 1256
1254 1257 def db_query(self, client_id, msg):
1255 1258 """Perform a raw query on the task record database."""
1256 1259 content = msg['content']
1257 1260 query = content.get('query', {})
1258 1261 keys = content.get('keys', None)
1259 1262 buffers = []
1260 1263 empty = list()
1261 1264 try:
1262 1265 records = self.db.find_records(query, keys)
1263 1266 except Exception as e:
1264 1267 content = error.wrap_exception()
1265 1268 else:
1266 1269 # extract buffers from reply content:
1267 1270 if keys is not None:
1268 1271 buffer_lens = [] if 'buffers' in keys else None
1269 1272 result_buffer_lens = [] if 'result_buffers' in keys else None
1270 1273 else:
1271 1274 buffer_lens = []
1272 1275 result_buffer_lens = []
1273 1276
1274 1277 for rec in records:
1275 1278 # buffers may be None, so double check
1276 1279 if buffer_lens is not None:
1277 1280 b = rec.pop('buffers', empty) or empty
1278 1281 buffer_lens.append(len(b))
1279 1282 buffers.extend(b)
1280 1283 if result_buffer_lens is not None:
1281 1284 rb = rec.pop('result_buffers', empty) or empty
1282 1285 result_buffer_lens.append(len(rb))
1283 1286 buffers.extend(rb)
1284 1287 content = dict(status='ok', records=records, buffer_lens=buffer_lens,
1285 1288 result_buffer_lens=result_buffer_lens)
1286 1289 # self.log.debug (content)
1287 1290 self.session.send(self.query, "db_reply", content=content,
1288 1291 parent=msg, ident=client_id,
1289 1292 buffers=buffers)
1290 1293
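Both read-only handlers above have direct client entry points: Client.hub_history returns the Hub's full msg_id list, and Client.db_query forwards a Mongo-style query dict to whichever record backend is in use. A hedged sketch, reusing the `rc` client from the earlier example:

history = rc.hub_history()   # all msg_ids known to the Hub, oldest first

# fetch selected keys for the five most recent tasks; operators like
# '$in' are understood by both the in-memory DictDB and MongoDB backends
recs = rc.db_query({'msg_id': {'$in': history[-5:]}},
                   keys=['msg_id', 'completed', 'header'])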
@@ -1,716 +1,716
1 1 """The Python scheduler for rich scheduling.
2 2
3 3 The Pure ZMQ scheduler does not allow routing schemes other than LRU,
4 4 nor does it check msg_id DAG dependencies. For those, a slightly slower
5 5 Python Scheduler exists.
6 6
7 7 Authors:
8 8
9 9 * Min RK
10 10 """
11 11 #-----------------------------------------------------------------------------
12 12 # Copyright (C) 2010-2011 The IPython Development Team
13 13 #
14 14 # Distributed under the terms of the BSD License. The full license is in
15 15 # the file COPYING, distributed as part of this software.
16 16 #-----------------------------------------------------------------------------
17 17
18 18 #----------------------------------------------------------------------
19 19 # Imports
20 20 #----------------------------------------------------------------------
21 21
22 22 from __future__ import print_function
23 23
24 24 import logging
25 25 import sys
26 26
27 27 from datetime import datetime, timedelta
28 28 from random import randint, random
29 29 from types import FunctionType
30 30
31 31 try:
32 32 import numpy
33 33 except ImportError:
34 34 numpy = None
35 35
36 36 import zmq
37 37 from zmq.eventloop import ioloop, zmqstream
38 38
39 39 # local imports
40 40 from IPython.external.decorator import decorator
41 41 from IPython.config.application import Application
42 42 from IPython.config.loader import Config
43 43 from IPython.utils.traitlets import Instance, Dict, List, Set, Integer, Enum, CBytes
44 44
45 45 from IPython.parallel import error
46 46 from IPython.parallel.factory import SessionFactory
47 47 from IPython.parallel.util import connect_logger, local_logger, asbytes
48 48
49 49 from .dependency import Dependency
50 50
51 51 @decorator
52 52 def logged(f,self,*args,**kwargs):
53 53 # print ("#--------------------")
54 54 self.log.debug("scheduler::%s(*%s,**%s)", f.func_name, args, kwargs)
55 55 # print ("#--")
56 56 return f(self,*args, **kwargs)
57 57
58 58 #----------------------------------------------------------------------
59 59 # Chooser functions
60 60 #----------------------------------------------------------------------
61 61
62 62 def plainrandom(loads):
63 63 """Plain random pick."""
64 64 n = len(loads)
65 65 return randint(0,n-1)
66 66
67 67 def lru(loads):
68 68 """Always pick the front of the line.
69 69
70 70 The content of `loads` is ignored.
71 71
72 72 Assumes LRU ordering of loads, with oldest first.
73 73 """
74 74 return 0
75 75
76 76 def twobin(loads):
77 77 """Pick two at random, use the LRU of the two.
78 78
79 79 The content of loads is ignored.
80 80
81 81 Assumes LRU ordering of loads, with oldest first.
82 82 """
83 83 n = len(loads)
84 84 a = randint(0,n-1)
85 85 b = randint(0,n-1)
86 86 return min(a,b)
87 87
88 88 def weighted(loads):
89 89 """Pick two at random using inverse load as weight.
90 90
91 91 Return the less loaded of the two.
92 92 """
93 93 # weight 0 a million times more than 1:
94 94 weights = 1./(1e-6+numpy.array(loads))
95 95 sums = weights.cumsum()
96 96 t = sums[-1]
97 97 x = random()*t
98 98 y = random()*t
99 99 idx = 0
100 100 idy = 0
101 101 while sums[idx] < x:
102 102 idx += 1
103 103 while sums[idy] < y:
104 104 idy += 1
105 105 if weights[idy] > weights[idx]:
106 106 return idy
107 107 else:
108 108 return idx
109 109
110 110 def leastload(loads):
111 111 """Always choose the lowest load.
112 112
113 113 If the lowest load occurs more than once, the first
114 114 occurrence will be used. If loads has LRU ordering, this means
115 115 the LRU of those with the lowest load is chosen.
116 116 """
117 117 return loads.index(min(loads))
118 118
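Each chooser maps a list of engine loads to the index of the engine that should receive the next task. A quick illustration with a hypothetical load list (LRU order, oldest engine first):

loads = [2, 0, 1, 0]
lru(loads)          # 0: always the head of the LRU line
leastload(loads)    # 1: first occurrence of the minimum load
twobin(loads)       # the older (lower index) of two random picks
plainrandom(loads)  # any index, uniformly at random
weighted(loads)     # random, but strongly favors indices 1 and 3 (needs numpy)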
119 119 #---------------------------------------------------------------------
120 120 # Classes
121 121 #---------------------------------------------------------------------
122 122 # store empty default dependency:
123 123 MET = Dependency([])
124 124
125 125 class TaskScheduler(SessionFactory):
126 126 """Python TaskScheduler object.
127 127
128 128 This is the simplest object that supports msg_id based
129 129 DAG dependencies. *Only* task msg_ids are checked, not
130 130 msg_ids of jobs submitted via the MUX queue.
131 131
132 132 """
133 133
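The two traits below are the scheduler's main configuration knobs; in an ipcontroller_config.py they would be set roughly like this (values are illustrative):

# cap each engine at two outstanding tasks (0, the default, means no limit)
c.TaskScheduler.hwm = 2
# pick engines by two random choices instead of pure least-load
c.TaskScheduler.scheme_name = 'twobin'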
134 134 hwm = Integer(0, config=True, shortname='hwm',
135 135 help="""specify the High Water Mark (HWM) for the downstream
136 136 socket in the Task scheduler. This is the maximum number
137 137 of allowed outstanding tasks on each engine."""
138 138 )
139 139 scheme_name = Enum(('leastload', 'pure', 'lru', 'plainrandom', 'weighted', 'twobin'),
140 140 'leastload', config=True, shortname='scheme', allow_none=False,
141 141 help="""select the task scheduler scheme [default: leastload]
142 142 Options are: 'pure', 'lru', 'plainrandom', 'weighted', 'twobin', 'leastload'"""
143 143 )
144 144 def _scheme_name_changed(self, old, new):
145 145 self.log.debug("Using scheme %r"%new)
146 146 self.scheme = globals()[new]
147 147
148 148 # input arguments:
149 149 scheme = Instance(FunctionType) # function for determining the destination
150 150 def _scheme_default(self):
151 151 return leastload
152 152 client_stream = Instance(zmqstream.ZMQStream) # client-facing stream
153 153 engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
154 154 notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
155 155 mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream
156 156
157 157 # internals:
158 158 graph = Dict() # dict by msg_id of [ msg_ids that depend on key ]
159 159 retries = Dict() # dict by msg_id of retries remaining (non-neg ints)
160 160 # waiting = List() # list of msg_ids ready to run, but haven't due to HWM
161 161 depending = Dict() # dict by msg_id of (msg_id, raw_msg, after, follow)
162 162 pending = Dict() # dict by engine_uuid of submitted tasks
163 163 completed = Dict() # dict by engine_uuid of completed tasks
164 164 failed = Dict() # dict by engine_uuid of failed tasks
165 165 destinations = Dict() # dict by msg_id of engine_uuids where jobs ran (reverse of completed+failed)
166 166 clients = Dict() # dict by msg_id for who submitted the task
167 167 targets = List() # list of target IDENTs
168 168 loads = List() # list of engine loads
169 169 # full = Set() # set of IDENTs that have HWM outstanding tasks
170 170 all_completed = Set() # set of all completed tasks
171 171 all_failed = Set() # set of all failed tasks
172 172 all_done = Set() # set of all finished tasks=union(completed,failed)
173 173 all_ids = Set() # set of all submitted task IDs
174 174 blacklist = Dict() # dict by msg_id of locations where a job has encountered UnmetDependency
175 175 auditor = Instance('zmq.eventloop.ioloop.PeriodicCallback')
176 176
177 177 ident = CBytes() # ZMQ identity. This should just be self.session.session
178 178 # but ensure Bytes
179 179 def _ident_default(self):
180 180 return self.session.bsession
181 181
182 182 def start(self):
183 183 self.engine_stream.on_recv(self.dispatch_result, copy=False)
184 184 self._notification_handlers = dict(
185 185 registration_notification = self._register_engine,
186 186 unregistration_notification = self._unregister_engine
187 187 )
188 188 self.notifier_stream.on_recv(self.dispatch_notification)
189 189 self.auditor = ioloop.PeriodicCallback(self.audit_timeouts, 2e3, self.loop) # every 2s
190 190 self.auditor.start()
191 191 self.log.info("Scheduler started [%s]"%self.scheme_name)
192 192
193 193 def resume_receiving(self):
194 194 """Resume accepting jobs."""
195 195 self.client_stream.on_recv(self.dispatch_submission, copy=False)
196 196
197 197 def stop_receiving(self):
198 198 """Stop accepting jobs while there are no engines.
199 199 Leave them in the ZMQ queue."""
200 200 self.client_stream.on_recv(None)
201 201
202 202 #-----------------------------------------------------------------------
203 203 # [Un]Registration Handling
204 204 #-----------------------------------------------------------------------
205 205
206 206 def dispatch_notification(self, msg):
207 207 """dispatch register/unregister events."""
208 208 try:
209 209 idents,msg = self.session.feed_identities(msg)
210 210 except ValueError:
211 211 self.log.warn("task::Invalid Message: %r",msg)
212 212 return
213 213 try:
214 214 msg = self.session.unserialize(msg)
215 215 except ValueError:
216 216 self.log.warn("task::Unauthorized message from: %r"%idents)
217 217 return
218 218
219 219 msg_type = msg['header']['msg_type']
220 220
221 221 handler = self._notification_handlers.get(msg_type, None)
222 222 if handler is None:
223 223 self.log.error("Unhandled message type: %r"%msg_type)
224 224 else:
225 225 try:
226 226 handler(asbytes(msg['content']['queue']))
227 227 except Exception:
228 self.log.error("task::Invalid notification msg: %r",msg)
228 self.log.error("task::Invalid notification msg: %r", msg, exc_info=True)
229 229
230 230 def _register_engine(self, uid):
231 231 """New engine with ident `uid` became available."""
232 232 # head of the line:
233 233 self.targets.insert(0,uid)
234 234 self.loads.insert(0,0)
235 235
236 236 # initialize sets
237 237 self.completed[uid] = set()
238 238 self.failed[uid] = set()
239 239 self.pending[uid] = {}
240 240 if len(self.targets) == 1:
241 241 self.resume_receiving()
242 242 # rescan the graph:
243 243 self.update_graph(None)
244 244
245 245 def _unregister_engine(self, uid):
246 246 """Existing engine with ident `uid` became unavailable."""
247 247 if len(self.targets) == 1:
248 248 # this was our only engine
249 249 self.stop_receiving()
250 250
251 251 # handle any potentially finished tasks:
252 252 self.engine_stream.flush()
253 253
254 254 # don't pop destinations, because they might be used later
255 255 # map(self.destinations.pop, self.completed.pop(uid))
256 256 # map(self.destinations.pop, self.failed.pop(uid))
257 257
258 258 # prevent this engine from receiving work
259 259 idx = self.targets.index(uid)
260 260 self.targets.pop(idx)
261 261 self.loads.pop(idx)
262 262
263 263 # wait 5 seconds before cleaning up pending jobs, since the results might
264 264 # still be incoming
265 265 if self.pending[uid]:
266 266 dc = ioloop.DelayedCallback(lambda : self.handle_stranded_tasks(uid), 5000, self.loop)
267 267 dc.start()
268 268 else:
269 269 self.completed.pop(uid)
270 270 self.failed.pop(uid)
271 271
272 272
273 273 def handle_stranded_tasks(self, engine):
274 274 """Deal with jobs resident in an engine that died."""
275 275 lost = self.pending[engine]
276 276 for msg_id in lost.keys():
277 277 if msg_id not in self.pending[engine]:
278 278 # prevent double-handling of messages
279 279 continue
280 280
281 281 raw_msg = lost[msg_id][0]
282 282 idents,msg = self.session.feed_identities(raw_msg, copy=False)
283 283 parent = self.session.unpack(msg[1].bytes)
284 284 idents = [engine, idents[0]]
285 285
286 286 # build fake error reply
287 287 try:
288 288 raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
289 289 except:
290 290 content = error.wrap_exception()
291 291 msg = self.session.msg('apply_reply', content, parent=parent, subheader={'status':'error'})
292 292 raw_reply = map(zmq.Message, self.session.serialize(msg, ident=idents))
293 293 # and dispatch it
294 294 self.dispatch_result(raw_reply)
295 295
296 296 # finally scrub completed/failed lists
297 297 self.completed.pop(engine)
298 298 self.failed.pop(engine)
299 299
300 300
301 301 #-----------------------------------------------------------------------
302 302 # Job Submission
303 303 #-----------------------------------------------------------------------
304 304 def dispatch_submission(self, raw_msg):
305 305 """Dispatch job submission to appropriate handlers."""
306 306 # ensure targets up to date:
307 307 self.notifier_stream.flush()
308 308 try:
309 309 idents, msg = self.session.feed_identities(raw_msg, copy=False)
310 310 msg = self.session.unserialize(msg, content=False, copy=False)
311 311 except Exception:
312 312 self.log.error("task::Invalid task msg: %r"%raw_msg, exc_info=True)
313 313 return
314 314
315 315
316 316 # send to monitor
317 317 self.mon_stream.send_multipart([b'intask']+raw_msg, copy=False)
318 318
319 319 header = msg['header']
320 320 msg_id = header['msg_id']
321 321 self.all_ids.add(msg_id)
322 322
323 323 # get targets as a set of bytes objects
324 324 # from a list of unicode objects
325 325 targets = header.get('targets', [])
326 326 targets = map(asbytes, targets)
327 327 targets = set(targets)
328 328
329 329 retries = header.get('retries', 0)
330 330 self.retries[msg_id] = retries
331 331
332 332 # time dependencies
333 333 after = header.get('after', None)
334 334 if after:
335 335 after = Dependency(after)
336 336 if after.all:
337 337 if after.success:
338 338 after = Dependency(after.difference(self.all_completed),
339 339 success=after.success,
340 340 failure=after.failure,
341 341 all=after.all,
342 342 )
343 343 if after.failure:
344 344 after = Dependency(after.difference(self.all_failed),
345 345 success=after.success,
346 346 failure=after.failure,
347 347 all=after.all,
348 348 )
349 349 if after.check(self.all_completed, self.all_failed):
350 350 # recast as empty set, if `after` already met,
351 351 # to prevent unnecessary set comparisons
352 352 after = MET
353 353 else:
354 354 after = MET
355 355
356 356 # location dependencies
357 357 follow = Dependency(header.get('follow', []))
358 358
359 359 # turn timeouts into datetime objects:
360 360 timeout = header.get('timeout', None)
361 361 if timeout:
362 362 # cast to float, because jsonlib returns floats as decimal.Decimal,
363 363 # which timedelta does not accept
364 364 timeout = datetime.now() + timedelta(0,float(timeout),0)
365 365
366 366 args = [raw_msg, targets, after, follow, timeout]
367 367
368 368 # validate and reduce dependencies:
369 369 for dep in after,follow:
370 370 if not dep: # empty dependency
371 371 continue
372 372 # check valid:
373 373 if msg_id in dep or dep.difference(self.all_ids):
374 374 self.depending[msg_id] = args
375 375 return self.fail_unreachable(msg_id, error.InvalidDependency)
376 376 # check if unreachable:
377 377 if dep.unreachable(self.all_completed, self.all_failed):
378 378 self.depending[msg_id] = args
379 379 return self.fail_unreachable(msg_id)
380 380
381 381 if after.check(self.all_completed, self.all_failed):
382 382 # time deps already met, try to run
383 383 if not self.maybe_run(msg_id, *args):
384 384 # can't run yet
385 385 if msg_id not in self.all_failed:
386 386 # could have failed as unreachable
387 387 self.save_unmet(msg_id, *args)
388 388 else:
389 389 self.save_unmet(msg_id, *args)
390 390
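dispatch_submission consumes the after/follow/timeout/retries values that a client embeds in the task header. A hedged sketch of how they are set from the client via the LoadBalancedView flags API (reusing `rc` from the earlier examples; step_one etc. are illustrative):

lbview = rc.load_balanced_view()
ar = lbview.apply_async(step_one)
with lbview.temp_flags(after=[ar], retries=1, timeout=30):
    ar2 = lbview.apply_async(step_two)    # runs only once ar has succeeded
with lbview.temp_flags(follow=[ar2]):
    lbview.apply_async(step_three)        # runs only where ar2 ran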
391 391 def audit_timeouts(self):
392 392 """Audit all waiting tasks for expired timeouts."""
393 393 now = datetime.now()
394 394 for msg_id in self.depending.keys():
395 395 # must recheck, in case one failure cascaded to another:
396 396 if msg_id in self.depending:
397 397 raw,targets,after,follow,timeout = self.depending[msg_id]
398 398 if timeout and timeout < now:
399 399 self.fail_unreachable(msg_id, error.TaskTimeout)
400 400
401 401 def fail_unreachable(self, msg_id, why=error.ImpossibleDependency):
402 402 """a task has become unreachable, send a reply with an ImpossibleDependency
403 403 error."""
404 404 if msg_id not in self.depending:
405 405 self.log.error("msg %r already failed!", msg_id)
406 406 return
407 407 raw_msg,targets,after,follow,timeout = self.depending.pop(msg_id)
408 408 for mid in follow.union(after):
409 409 if mid in self.graph:
410 410 self.graph[mid].remove(msg_id)
411 411
412 412 # FIXME: unpacking a message I've already unpacked, but didn't save:
413 413 idents,msg = self.session.feed_identities(raw_msg, copy=False)
414 414 header = self.session.unpack(msg[1].bytes)
415 415
416 416 try:
417 417 raise why()
418 418 except:
419 419 content = error.wrap_exception()
420 420
421 421 self.all_done.add(msg_id)
422 422 self.all_failed.add(msg_id)
423 423
424 424 msg = self.session.send(self.client_stream, 'apply_reply', content,
425 425 parent=header, ident=idents)
426 426 self.session.send(self.mon_stream, msg, ident=[b'outtask']+idents)
427 427
428 428 self.update_graph(msg_id, success=False)
429 429
430 430 def maybe_run(self, msg_id, raw_msg, targets, after, follow, timeout):
431 431 """check location dependencies, and run if they are met."""
432 432 blacklist = self.blacklist.setdefault(msg_id, set())
433 433 if follow or targets or blacklist or self.hwm:
434 434 # we need a can_run filter
435 435 def can_run(idx):
436 436 # check hwm
437 437 if self.hwm and self.loads[idx] == self.hwm:
438 438 return False
439 439 target = self.targets[idx]
440 440 # check blacklist
441 441 if target in blacklist:
442 442 return False
443 443 # check targets
444 444 if targets and target not in targets:
445 445 return False
446 446 # check follow
447 447 return follow.check(self.completed[target], self.failed[target])
448 448
449 449 indices = filter(can_run, range(len(self.targets)))
450 450
451 451 if not indices:
452 452 # couldn't run
453 453 if follow.all:
454 454 # check follow for impossibility
455 455 dests = set()
456 456 relevant = set()
457 457 if follow.success:
458 458 relevant = self.all_completed
459 459 if follow.failure:
460 460 relevant = relevant.union(self.all_failed)
461 461 for m in follow.intersection(relevant):
462 462 dests.add(self.destinations[m])
463 463 if len(dests) > 1:
464 464 self.depending[msg_id] = (raw_msg, targets, after, follow, timeout)
465 465 self.fail_unreachable(msg_id)
466 466 return False
467 467 if targets:
468 468 # check blacklist+targets for impossibility
469 469 targets.difference_update(blacklist)
470 470 if not targets or not targets.intersection(self.targets):
471 471 self.depending[msg_id] = (raw_msg, targets, after, follow, timeout)
472 472 self.fail_unreachable(msg_id)
473 473 return False
474 474 return False
475 475 else:
476 476 indices = None
477 477
478 478 self.submit_task(msg_id, raw_msg, targets, follow, timeout, indices)
479 479 return True
480 480
481 481 def save_unmet(self, msg_id, raw_msg, targets, after, follow, timeout):
482 482 """Save a message for later submission when its dependencies are met."""
483 483 self.depending[msg_id] = [raw_msg,targets,after,follow,timeout]
484 484 # track the ids in follow or after, but not those already finished
485 485 for dep_id in after.union(follow).difference(self.all_done):
486 486 if dep_id not in self.graph:
487 487 self.graph[dep_id] = set()
488 488 self.graph[dep_id].add(msg_id)
489 489
490 490 def submit_task(self, msg_id, raw_msg, targets, follow, timeout, indices=None):
491 491 """Submit a task to any of a subset of our targets."""
492 492 if indices:
493 493 loads = [self.loads[i] for i in indices]
494 494 else:
495 495 loads = self.loads
496 496 idx = self.scheme(loads)
497 497 if indices:
498 498 idx = indices[idx]
499 499 target = self.targets[idx]
500 500 # print (target, map(str, msg[:3]))
501 501 # send job to the engine
502 502 self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
503 503 self.engine_stream.send_multipart(raw_msg, copy=False)
504 504 # update load
505 505 self.add_job(idx)
506 506 self.pending[target][msg_id] = (raw_msg, targets, MET, follow, timeout)
507 507 # notify Hub
508 508 content = dict(msg_id=msg_id, engine_id=target.decode('ascii'))
509 509 self.session.send(self.mon_stream, 'task_destination', content=content,
510 510 ident=[b'tracktask',self.ident])
511 511
512 512
513 513 #-----------------------------------------------------------------------
514 514 # Result Handling
515 515 #-----------------------------------------------------------------------
516 516 def dispatch_result(self, raw_msg):
517 517 """dispatch method for result replies"""
518 518 try:
519 519 idents,msg = self.session.feed_identities(raw_msg, copy=False)
520 520 msg = self.session.unserialize(msg, content=False, copy=False)
521 521 engine = idents[0]
522 522 try:
523 523 idx = self.targets.index(engine)
524 524 except ValueError:
525 525 pass # skip load-update for dead engines
526 526 else:
527 527 self.finish_job(idx)
528 528 except Exception:
529 529 self.log.error("task::Invalid result: %r", raw_msg, exc_info=True)
530 530 return
531 531
532 532 header = msg['header']
533 533 parent = msg['parent_header']
534 534 if header.get('dependencies_met', True):
535 535 success = (header['status'] == 'ok')
536 536 msg_id = parent['msg_id']
537 537 retries = self.retries[msg_id]
538 538 if not success and retries > 0:
539 539 # failed
540 540 self.retries[msg_id] = retries - 1
541 541 self.handle_unmet_dependency(idents, parent)
542 542 else:
543 543 del self.retries[msg_id]
544 544 # relay to client and update graph
545 545 self.handle_result(idents, parent, raw_msg, success)
546 546 # send to Hub monitor
547 547 self.mon_stream.send_multipart([b'outtask']+raw_msg, copy=False)
548 548 else:
549 549 self.handle_unmet_dependency(idents, parent)
550 550
551 551 def handle_result(self, idents, parent, raw_msg, success=True):
552 552 """handle a real task result, either success or failure"""
553 553 # first, relay result to client
554 554 engine = idents[0]
555 555 client = idents[1]
556 556 # swap_ids for XREP-XREP mirror
557 557 raw_msg[:2] = [client,engine]
558 558 # print (map(str, raw_msg[:4]))
559 559 self.client_stream.send_multipart(raw_msg, copy=False)
560 560 # now, update our data structures
561 561 msg_id = parent['msg_id']
562 562 self.blacklist.pop(msg_id, None)
563 563 self.pending[engine].pop(msg_id)
564 564 if success:
565 565 self.completed[engine].add(msg_id)
566 566 self.all_completed.add(msg_id)
567 567 else:
568 568 self.failed[engine].add(msg_id)
569 569 self.all_failed.add(msg_id)
570 570 self.all_done.add(msg_id)
571 571 self.destinations[msg_id] = engine
572 572
573 573 self.update_graph(msg_id, success)
574 574
575 575 def handle_unmet_dependency(self, idents, parent):
576 576 """handle an unmet dependency"""
577 577 engine = idents[0]
578 578 msg_id = parent['msg_id']
579 579
580 580 if msg_id not in self.blacklist:
581 581 self.blacklist[msg_id] = set()
582 582 self.blacklist[msg_id].add(engine)
583 583
584 584 args = self.pending[engine].pop(msg_id)
585 585 raw,targets,after,follow,timeout = args
586 586
587 587 if self.blacklist[msg_id] == targets:
588 588 self.depending[msg_id] = args
589 589 self.fail_unreachable(msg_id)
590 590 elif not self.maybe_run(msg_id, *args):
591 591 # resubmit failed
592 592 if msg_id not in self.all_failed:
593 593 # put it back in our dependency tree
594 594 self.save_unmet(msg_id, *args)
595 595
596 596 if self.hwm:
597 597 try:
598 598 idx = self.targets.index(engine)
599 599 except ValueError:
600 600 pass # skip load-update for dead engines
601 601 else:
602 602 if self.loads[idx] == self.hwm-1:
603 603 self.update_graph(None)
604 604
605 605
606 606
607 607 def update_graph(self, dep_id=None, success=True):
608 608 """dep_id just finished. Update our dependency
609 609 graph and submit any jobs that just became runnable.
610 610
611 611 Called with dep_id=None to update entire graph for hwm, but without finishing
612 612 a task.
613 613 """
614 614 # print ("\n\n***********")
615 615 # pprint (dep_id)
616 616 # pprint (self.graph)
617 617 # pprint (self.depending)
618 618 # pprint (self.all_completed)
619 619 # pprint (self.all_failed)
620 620 # print ("\n\n***********\n\n")
621 621 # update any jobs that depended on the dependency
622 622 jobs = self.graph.pop(dep_id, [])
623 623
624 624 # recheck *all* jobs if
625 625 # a) we have HWM and an engine just become no longer full
626 626 # or b) dep_id was given as None
627 627 if dep_id is None or (self.hwm and any(load == self.hwm-1 for load in self.loads)):
628 628 jobs = self.depending.keys()
629 629
630 630 for msg_id in jobs:
631 631 raw_msg, targets, after, follow, timeout = self.depending[msg_id]
632 632
633 633 if after.unreachable(self.all_completed, self.all_failed)\
634 634 or follow.unreachable(self.all_completed, self.all_failed):
635 635 self.fail_unreachable(msg_id)
636 636
637 637 elif after.check(self.all_completed, self.all_failed): # time deps met, maybe run
638 638 if self.maybe_run(msg_id, raw_msg, targets, MET, follow, timeout):
639 639
640 640 self.depending.pop(msg_id)
641 641 for mid in follow.union(after):
642 642 if mid in self.graph:
643 643 self.graph[mid].remove(msg_id)
644 644
645 645 #----------------------------------------------------------------------
646 646 # methods to be overridden by subclasses
647 647 #----------------------------------------------------------------------
648 648
649 649 def add_job(self, idx):
650 650 """Called after self.targets[idx] just got the job with header.
651 651 Override with subclasses. The default ordering is simple LRU.
652 652 The default loads are the number of outstanding jobs."""
653 653 self.loads[idx] += 1
654 654 for lis in (self.targets, self.loads):
655 655 lis.append(lis.pop(idx))
656 656
657 657
658 658 def finish_job(self, idx):
659 659 """Called after self.targets[idx] just finished a job.
660 660 Override with subclasses."""
661 661 self.loads[idx] -= 1
662 662
663 663
664 664
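add_job and finish_job are the intended override points for alternative load-tracking policies. A hypothetical subclass sketch (not part of the codebase) that keeps engines in registration order instead of rotating them LRU-style:

class FIFOScheduler(TaskScheduler):
    """Hypothetical: count outstanding jobs without LRU rotation."""

    def add_job(self, idx):
        # keep integer loads: the hwm checks compare with ==
        self.loads[idx] += 1

    def finish_job(self, idx):
        self.loads[idx] -= 1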
665 665 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,
666 666 logname='root', log_url=None, loglevel=logging.DEBUG,
667 667 identity=b'task', in_thread=False):
668 668
669 669 ZMQStream = zmqstream.ZMQStream
670 670
671 671 if config:
672 672 # unwrap dict back into Config
673 673 config = Config(config)
674 674
675 675 if in_thread:
676 676 # use instance() to get the same Context/Loop as our parent
677 677 ctx = zmq.Context.instance()
678 678 loop = ioloop.IOLoop.instance()
679 679 else:
680 680 # in a process, don't use instance()
681 681 # for safety with multiprocessing
682 682 ctx = zmq.Context()
683 683 loop = ioloop.IOLoop()
684 684 ins = ZMQStream(ctx.socket(zmq.ROUTER),loop)
685 685 ins.setsockopt(zmq.IDENTITY, identity)
686 686 ins.bind(in_addr)
687 687
688 688 outs = ZMQStream(ctx.socket(zmq.ROUTER),loop)
689 689 outs.setsockopt(zmq.IDENTITY, identity)
690 690 outs.bind(out_addr)
691 691 mons = zmqstream.ZMQStream(ctx.socket(zmq.PUB),loop)
692 692 mons.connect(mon_addr)
693 693 nots = zmqstream.ZMQStream(ctx.socket(zmq.SUB),loop)
694 694 nots.setsockopt(zmq.SUBSCRIBE, b'')
695 695 nots.connect(not_addr)
696 696
697 697 # setup logging.
698 698 if in_thread:
699 699 log = Application.instance().log
700 700 else:
701 701 if log_url:
702 702 log = connect_logger(logname, ctx, log_url, root="scheduler", loglevel=loglevel)
703 703 else:
704 704 log = local_logger(logname, loglevel)
705 705
706 706 scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
707 707 mon_stream=mons, notifier_stream=nots,
708 708 loop=loop, log=log,
709 709 config=config)
710 710 scheduler.start()
711 711 if not in_thread:
712 712 try:
713 713 loop.start()
714 714 except KeyboardInterrupt:
715 715 print ("interrupted, exiting...", file=sys.__stderr__)
716 716
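launch_scheduler blocks in loop.start() when run as its own process; with in_thread=True it shares the parent's Context and IOLoop and returns immediately, which is how the controller can embed it. A minimal standalone sketch (the tcp endpoints and logger name are placeholders; the import path assumes this module lives at IPython.parallel.controller.scheduler):

import logging
from IPython.parallel.controller.scheduler import launch_scheduler

launch_scheduler('tcp://127.0.0.1:10101',   # client-facing ROUTER
                 'tcp://127.0.0.1:10102',   # engine-facing ROUTER
                 'tcp://127.0.0.1:10103',   # PUB to the Hub monitor
                 'tcp://127.0.0.1:10104',   # SUB for registration notices
                 logname='tasksched', loglevel=logging.INFO)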