# Commit note: "ignore data_pub in Hub" (MinRK)
1 1 """The IPython Controller Hub with 0MQ
2 2 This is the master object that handles connections from engines and clients,
3 3 and monitors traffic through the various queues.
4 4
5 5 Authors:
6 6
7 7 * Min RK
8 8 """
9 9 #-----------------------------------------------------------------------------
10 10 # Copyright (C) 2010-2011 The IPython Development Team
11 11 #
12 12 # Distributed under the terms of the BSD License. The full license is in
13 13 # the file COPYING, distributed as part of this software.
14 14 #-----------------------------------------------------------------------------
15 15
16 16 #-----------------------------------------------------------------------------
17 17 # Imports
18 18 #-----------------------------------------------------------------------------
19 19 from __future__ import print_function
20 20
21 21 import json
22 22 import os
23 23 import sys
24 24 import time
25 25 from datetime import datetime
26 26
27 27 import zmq
28 28 from zmq.eventloop import ioloop
29 29 from zmq.eventloop.zmqstream import ZMQStream
30 30
31 31 # internal:
32 32 from IPython.utils.importstring import import_item
33 33 from IPython.utils.py3compat import cast_bytes
34 34 from IPython.utils.traitlets import (
35 35 HasTraits, Instance, Integer, Unicode, Dict, Set, Tuple, CBytes, DottedObjectName
36 36 )
37 37
38 38 from IPython.parallel import error, util
39 39 from IPython.parallel.factory import RegistrationFactory
40 40
41 41 from IPython.zmq.session import SessionFactory
42 42
43 43 from .heartmonitor import HeartMonitor
44 44
45 45 #-----------------------------------------------------------------------------
46 46 # Code
47 47 #-----------------------------------------------------------------------------
48 48
49 49 def _passer(*args, **kwargs):
50 50 return
51 51
52 52 def _printer(*args, **kwargs):
53 53 print (args)
54 54 print (kwargs)
55 55
def empty_record():
    """Return an empty dict with all record keys.

    Every key a TaskRecord can hold is present, set to None, except the
    stream accumulators 'stdout'/'stderr', which start as empty strings.
    """
    none_keys = (
        'msg_id', 'header', 'metadata', 'content', 'buffers', 'submitted',
        'client_uuid', 'engine_uuid', 'started', 'completed', 'resubmitted',
        'received', 'result_header', 'result_metadata', 'result_content',
        'result_buffers', 'queue', 'pyin', 'pyout', 'pyerr',
    )
    record = dict.fromkeys(none_keys)
    record['stdout'] = ''
    record['stderr'] = ''
    return record
82 82
def init_record(msg):
    """Initialize a TaskRecord based on a request message.

    Fields derivable from the request (ids, header, content, metadata,
    buffers, submission date) are filled in; everything that is only known
    after scheduling/completion starts out as None, and the stream
    accumulators start as empty strings.
    """
    header = msg['header']
    pending_keys = (
        'client_uuid', 'engine_uuid', 'started', 'completed', 'resubmitted',
        'received', 'result_header', 'result_metadata', 'result_content',
        'result_buffers', 'queue', 'pyin', 'pyout', 'pyerr',
    )
    record = dict.fromkeys(pending_keys)
    record.update(
        msg_id=header['msg_id'],
        header=header,
        content=msg['content'],
        metadata=msg['metadata'],
        buffers=msg['buffers'],
        submitted=header['date'],
        stdout='',
        stderr='',
    )
    return record
110 110
111 111
class EngineConnector(HasTraits):
    """A simple object for accessing the various zmq connections of an object.
    Attributes are:
    id (int): engine ID
    uuid (unicode): engine UUID
    pending: set of msg_ids
    stallback: DelayedCallback for stalled registration
    """

    # integer engine ID assigned by the Hub at registration
    id = Integer(0)
    # engine UUID (unicode)
    uuid = Unicode()
    # msg_ids currently outstanding on this engine
    pending = Set()
    # callback that fires if registration stalls (heart never starts beating)
    stallback = Instance(ioloop.DelayedCallback)
125 125
126 126
# Lowercase shorthand names accepted for HubFactory.db_class, mapped to the
# fully-qualified class paths they resolve to (see HubFactory.init_hub).
_db_shortcuts = {
    'sqlitedb' : 'IPython.parallel.controller.sqlitedb.SQLiteDB',
    'mongodb'  : 'IPython.parallel.controller.mongodb.MongoDB',
    'dictdb'   : 'IPython.parallel.controller.dictdb.DictDB',
    'nodb'     : 'IPython.parallel.controller.dictdb.NoDB',
}
133 133
class HubFactory(RegistrationFactory):
    """The Configurable for setting up a Hub.

    Holds all port/transport configuration, and constructs the Hub and its
    zmq sockets in `init_hub`.  Any port left unconfigured defaults to a
    random free port (via util.select_random_ports).
    """

    # port-pairs for monitoredqueues:
    hb = Tuple(Integer,Integer,config=True,
        help="""PUB/ROUTER Port pair for Engine heartbeats""")
    def _hb_default(self):
        return tuple(util.select_random_ports(2))

    mux = Tuple(Integer,Integer,config=True,
        help="""Client/Engine Port pair for MUX queue""")

    def _mux_default(self):
        return tuple(util.select_random_ports(2))

    task = Tuple(Integer,Integer,config=True,
        help="""Client/Engine Port pair for Task queue""")
    def _task_default(self):
        return tuple(util.select_random_ports(2))

    control = Tuple(Integer,Integer,config=True,
        help="""Client/Engine Port pair for Control queue""")

    def _control_default(self):
        return tuple(util.select_random_ports(2))

    iopub = Tuple(Integer,Integer,config=True,
        help="""Client/Engine Port pair for IOPub relay""")

    def _iopub_default(self):
        return tuple(util.select_random_ports(2))

    # single ports:
    mon_port = Integer(config=True,
        help="""Monitor (SUB) port for queue traffic""")

    def _mon_port_default(self):
        return util.select_random_ports(1)[0]

    notifier_port = Integer(config=True,
        help="""PUB port for sending engine status notifications""")

    def _notifier_port_default(self):
        return util.select_random_ports(1)[0]

    engine_ip = Unicode('127.0.0.1', config=True,
        help="IP on which to listen for engine connections. [default: loopback]")
    engine_transport = Unicode('tcp', config=True,
        help="0MQ transport for engine connections. [default: tcp]")

    client_ip = Unicode('127.0.0.1', config=True,
        help="IP on which to listen for client connections. [default: loopback]")
    client_transport = Unicode('tcp', config=True,
        help="0MQ transport for client connections. [default : tcp]")

    monitor_ip = Unicode('127.0.0.1', config=True,
        help="IP on which to listen for monitor messages. [default: loopback]")
    monitor_transport = Unicode('tcp', config=True,
        help="0MQ transport for monitor messages. [default : tcp]")

    # full zmq url of the monitor socket; kept in sync by _update_monitor_url
    monitor_url = Unicode('')

    db_class = DottedObjectName('NoDB',
        config=True, help="""The class to use for the DB backend

        Options include:

        SQLiteDB: SQLite
        MongoDB : use MongoDB
        DictDB : in-memory storage (fastest, but be mindful of memory growth of the Hub)
        NoDB : disable database altogether (default)

        """)

    # not configurable
    db = Instance('IPython.parallel.controller.dictdb.BaseDB')
    heartmonitor = Instance('IPython.parallel.controller.heartmonitor.HeartMonitor')

    def _ip_changed(self, name, old, new):
        # a generic `ip` setting fans out to all three specific IPs
        self.engine_ip = new
        self.client_ip = new
        self.monitor_ip = new
        self._update_monitor_url()

    def _update_monitor_url(self):
        # recompute the monitor url from its transport/ip/port components
        self.monitor_url = "%s://%s:%i" % (self.monitor_transport, self.monitor_ip, self.mon_port)

    def _transport_changed(self, name, old, new):
        # a generic `transport` setting fans out to all three specific transports
        self.engine_transport = new
        self.client_transport = new
        self.monitor_transport = new
        self._update_monitor_url()

    def __init__(self, **kwargs):
        super(HubFactory, self).__init__(**kwargs)
        self._update_monitor_url()


    def construct(self):
        self.init_hub()

    def start(self):
        self.heartmonitor.start()
        self.log.info("Heartmonitor started")

    def client_url(self, channel):
        """return full zmq url for a named client channel"""
        return "%s://%s:%i" % (self.client_transport, self.client_ip, self.client_info[channel])

    def engine_url(self, channel):
        """return full zmq url for a named engine channel"""
        return "%s://%s:%i" % (self.engine_transport, self.engine_ip, self.engine_info[channel])

    def init_hub(self):
        """construct Hub object, binding all of its sockets"""

        ctx = self.context
        loop = self.loop

        # scheme comes from TaskScheduler config if set, else its default
        try:
            scheme = self.config.TaskScheduler.scheme_name
        except AttributeError:
            from .scheduler import TaskScheduler
            scheme = TaskScheduler.scheme_name.get_default_value()

        # build connection dicts
        engine = self.engine_info = {
            'interface' : "%s://%s" % (self.engine_transport, self.engine_ip),
            'registration' : self.regport,
            'control' : self.control[1],
            'mux' : self.mux[1],
            'hb_ping' : self.hb[0],
            'hb_pong' : self.hb[1],
            'task' : self.task[1],
            'iopub' : self.iopub[1],
            }

        client = self.client_info = {
            'interface' : "%s://%s" % (self.client_transport, self.client_ip),
            'registration' : self.regport,
            'control' : self.control[0],
            'mux' : self.mux[0],
            'task' : self.task[0],
            'task_scheme' : scheme,
            'iopub' : self.iopub[0],
            'notification' : self.notifier_port,
            }

        self.log.debug("Hub engine addrs: %s", self.engine_info)
        self.log.debug("Hub client addrs: %s", self.client_info)

        # Registrar socket (shared by clients and engines; bound twice if
        # their IPs differ)
        q = ZMQStream(ctx.socket(zmq.ROUTER), loop)
        q.bind(self.client_url('registration'))
        self.log.info("Hub listening on %s for registration.", self.client_url('registration'))
        if self.client_ip != self.engine_ip:
            q.bind(self.engine_url('registration'))
            self.log.info("Hub listening on %s for registration.", self.engine_url('registration'))

        ### Engine connections ###

        # heartbeat: PUB for pings out, ROUTER for pongs back
        hpub = ctx.socket(zmq.PUB)
        hpub.bind(self.engine_url('hb_ping'))
        hrep = ctx.socket(zmq.ROUTER)
        hrep.bind(self.engine_url('hb_pong'))
        self.heartmonitor = HeartMonitor(loop=loop, config=self.config, log=self.log,
                                pingstream=ZMQStream(hpub,loop),
                                pongstream=ZMQStream(hrep,loop)
                            )

        ### Client connections ###

        # Notifier socket (PUB of engine registration changes)
        n = ZMQStream(ctx.socket(zmq.PUB), loop)
        n.bind(self.client_url('notification'))

        ### build and launch the queues ###

        # monitor socket: subscribes to everything from the monitored queues
        sub = ctx.socket(zmq.SUB)
        sub.setsockopt(zmq.SUBSCRIBE, b"")
        sub.bind(self.monitor_url)
        sub.bind('inproc://monitor')
        sub = ZMQStream(sub, loop)

        # connect the db (short names resolved via _db_shortcuts)
        db_class = _db_shortcuts.get(self.db_class.lower(), self.db_class)
        self.log.info('Hub using DB backend: %r', (db_class.split('.')[-1]))
        self.db = import_item(str(db_class))(session=self.session.session,
                                            config=self.config, log=self.log)
        # brief pause to let the db backend come up -- TODO confirm necessity
        time.sleep(.25)

        # resubmit stream: a DEALER connected to the task scheduler's
        # client-facing port, so the Hub can re-inject tasks
        r = ZMQStream(ctx.socket(zmq.DEALER), loop)
        url = util.disambiguate_url(self.client_url('task'))
        r.connect(url)

        self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
                query=q, notifier=n, resubmit=r, db=self.db,
                engine_info=self.engine_info, client_info=self.client_info,
                log=self.log)
337 337
class Hub(SessionFactory):
    """The IPython Controller Hub with 0MQ connections

    Parameters
    ==========
    loop: zmq IOLoop instance
    session: Session object
    queue: ZMQStream for monitoring the command queue (SUB)
    query: ZMQStream for engine registration and client queries requests (ROUTER)
    heartbeat: HeartMonitor object checking the pulse of the engines
    notifier: ZMQStream for broadcasting engine registration changes (PUB)
    db: connection to db for out of memory logging of commands
        NotImplemented
    engine_info: dict of zmq connection information for engines to connect
        to the queues.
    client_info: dict of zmq connection information for engines to connect
        to the queues.
    """

    # file for persisting engine connection state -- not used in this chunk;
    # TODO confirm semantics against the rest of the file
    engine_state_file = Unicode()

    # internal data structures:
    ids=Set() # engine IDs
    keytable=Dict() # eid -> engine uuid
    by_ident=Dict() # engine zmq identity (bytes uuid) -> eid
    engines=Dict() # eid -> EngineConnector
    clients=Dict() # client connections (not referenced in this chunk)
    hearts=Dict() # heart ident -> eid
    pending=Set() # msg_ids submitted but not yet completed
    queues=Dict() # pending msg_ids keyed by engine_id
    tasks=Dict() # pending task msg_ids keyed by engine_id (see save_task_destination)
    completed=Dict() # completed msg_ids keyed by engine_id
    all_completed=Set() # every completed msg_id, regardless of engine
    dead_engines=Set() # uuids of engines that have been unregistered
    unassigned=Set() # set of task msg_ids not yet assigned a destination
    incoming_registrations=Dict() # heart ident -> registration in progress
    registration_timeout=Integer() # time allowed for registration (same units as heartmonitor.period)
    _idcounter=Integer(0) # monotonically increasing source of engine IDs

    # objects from constructor:
    query=Instance(ZMQStream) # ROUTER: client queries + engine registration
    monitor=Instance(ZMQStream) # SUB: all monitored queue traffic
    notifier=Instance(ZMQStream) # PUB: engine registration change broadcasts
    resubmit=Instance(ZMQStream) # DEALER: resubmitting tasks to the scheduler
    heartmonitor=Instance(HeartMonitor)
    db=Instance(object) # DB backend instance (BaseDB subclass)
    client_info=Dict() # connection info handed to clients
    engine_info=Dict() # connection info handed to engines
387 387
388 388
    def __init__(self, **kwargs):
        """
        # universal:
        loop: IOLoop for creating future connections
        session: streamsession for sending serialized data
        # engine:
        queue: ZMQStream for monitoring queue messages
        query: ZMQStream for engine+client registration and client requests
        heartbeat: HeartMonitor object for tracking engines
        # extra:
        db: ZMQStream for db connection (NotImplemented)
        engine_info: zmq address/protocol dict for engine connections
        client_info: zmq address/protocol dict for client connections
        """

        super(Hub, self).__init__(**kwargs)
        # give engines at least two heartbeat periods (floor of 5000,
        # presumably ms -- matches heartmonitor.period units) to register
        self.registration_timeout = max(5000, 2*self.heartmonitor.period)

        # register our callbacks
        self.query.on_recv(self.dispatch_query)
        self.monitor.on_recv(self.dispatch_monitor_traffic)

        self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
        self.heartmonitor.add_new_heart_handler(self.handle_new_heart)

        # topic frame (bytes) -> handler for monitor (SUB) socket traffic
        self.monitor_handlers = {b'in' : self.save_queue_request,
                                b'out': self.save_queue_result,
                                b'intask': self.save_task_request,
                                b'outtask': self.save_task_result,
                                b'tracktask': self.save_task_destination,
                                b'incontrol': _passer,
                                b'outcontrol': _passer,
                                b'iopub': self.save_iopub_message,
                                }

        # msg_type -> handler for client requests on the query (ROUTER) socket
        self.query_handlers = {'queue_request': self.queue_status,
                                'result_request': self.get_results,
                                'history_request': self.get_history,
                                'db_request': self.db_query,
                                'purge_request': self.purge_results,
                                'load_request': self.check_load,
                                'resubmit_request': self.resubmit_task,
                                'shutdown_request': self.shutdown_request,
                                'registration_request' : self.register_engine,
                                'unregistration_request' : self.unregister_engine,
                                'connection_request': self.connection_request,
                                }

        # ignore resubmit replies
        self.resubmit.on_recv(lambda msg: None, copy=False)

        self.log.info("hub::created hub")
441 441
442 442 @property
443 443 def _next_id(self):
444 444 """gemerate a new ID.
445 445
446 446 No longer reuse old ids, just count from 0."""
447 447 newid = self._idcounter
448 448 self._idcounter += 1
449 449 return newid
450 450 # newid = 0
451 451 # incoming = [id[0] for id in self.incoming_registrations.itervalues()]
452 452 # # print newid, self.ids, self.incoming_registrations
453 453 # while newid in self.ids or newid in incoming:
454 454 # newid += 1
455 455 # return newid
456 456
457 457 #-----------------------------------------------------------------------------
458 458 # message validation
459 459 #-----------------------------------------------------------------------------
460 460
461 461 def _validate_targets(self, targets):
462 462 """turn any valid targets argument into a list of integer ids"""
463 463 if targets is None:
464 464 # default to all
465 465 return self.ids
466 466
467 467 if isinstance(targets, (int,str,unicode)):
468 468 # only one target specified
469 469 targets = [targets]
470 470 _targets = []
471 471 for t in targets:
472 472 # map raw identities to ids
473 473 if isinstance(t, (str,unicode)):
474 474 t = self.by_ident.get(cast_bytes(t), t)
475 475 _targets.append(t)
476 476 targets = _targets
477 477 bad_targets = [ t for t in targets if t not in self.ids ]
478 478 if bad_targets:
479 479 raise IndexError("No Such Engine: %r" % bad_targets)
480 480 if not targets:
481 481 raise IndexError("No Engines Registered")
482 482 return targets
483 483
484 484 #-----------------------------------------------------------------------------
485 485 # dispatch methods (1 per stream)
486 486 #-----------------------------------------------------------------------------
487 487
488 488
489 489 @util.log_errors
490 490 def dispatch_monitor_traffic(self, msg):
491 491 """all ME and Task queue messages come through here, as well as
492 492 IOPub traffic."""
493 493 self.log.debug("monitor traffic: %r", msg[0])
494 494 switch = msg[0]
495 495 try:
496 496 idents, msg = self.session.feed_identities(msg[1:])
497 497 except ValueError:
498 498 idents=[]
499 499 if not idents:
500 500 self.log.error("Monitor message without topic: %r", msg)
501 501 return
502 502 handler = self.monitor_handlers.get(switch, None)
503 503 if handler is not None:
504 504 handler(idents, msg)
505 505 else:
506 506 self.log.error("Unrecognized monitor topic: %r", switch)
507 507
508 508
509 509 @util.log_errors
510 510 def dispatch_query(self, msg):
511 511 """Route registration requests and queries from clients."""
512 512 try:
513 513 idents, msg = self.session.feed_identities(msg)
514 514 except ValueError:
515 515 idents = []
516 516 if not idents:
517 517 self.log.error("Bad Query Message: %r", msg)
518 518 return
519 519 client_id = idents[0]
520 520 try:
521 521 msg = self.session.unserialize(msg, content=True)
522 522 except Exception:
523 523 content = error.wrap_exception()
524 524 self.log.error("Bad Query Message: %r", msg, exc_info=True)
525 525 self.session.send(self.query, "hub_error", ident=client_id,
526 526 content=content)
527 527 return
528 528 # print client_id, header, parent, content
529 529 #switch on message type:
530 530 msg_type = msg['header']['msg_type']
531 531 self.log.info("client::client %r requested %r", client_id, msg_type)
532 532 handler = self.query_handlers.get(msg_type, None)
533 533 try:
534 534 assert handler is not None, "Bad Message Type: %r" % msg_type
535 535 except:
536 536 content = error.wrap_exception()
537 537 self.log.error("Bad Message Type: %r", msg_type, exc_info=True)
538 538 self.session.send(self.query, "hub_error", ident=client_id,
539 539 content=content)
540 540 return
541 541
542 542 else:
543 543 handler(idents, msg)
544 544
545 545 def dispatch_db(self, msg):
546 546 """"""
547 547 raise NotImplementedError
548 548
549 549 #---------------------------------------------------------------------------
550 550 # handler methods (1 per event)
551 551 #---------------------------------------------------------------------------
552 552
553 553 #----------------------- Heartbeat --------------------------------------
554 554
555 555 def handle_new_heart(self, heart):
556 556 """handler to attach to heartbeater.
557 557 Called when a new heart starts to beat.
558 558 Triggers completion of registration."""
559 559 self.log.debug("heartbeat::handle_new_heart(%r)", heart)
560 560 if heart not in self.incoming_registrations:
561 561 self.log.info("heartbeat::ignoring new heart: %r", heart)
562 562 else:
563 563 self.finish_registration(heart)
564 564
565 565
566 566 def handle_heart_failure(self, heart):
567 567 """handler to attach to heartbeater.
568 568 called when a previously registered heart fails to respond to beat request.
569 569 triggers unregistration"""
570 570 self.log.debug("heartbeat::handle_heart_failure(%r)", heart)
571 571 eid = self.hearts.get(heart, None)
572 572 uuid = self.engines[eid].uuid
573 573 if eid is None or self.keytable[eid] in self.dead_engines:
574 574 self.log.info("heartbeat::ignoring heart failure %r (not an engine or already dead)", heart)
575 575 else:
576 576 self.unregister_engine(heart, dict(content=dict(id=eid, queue=uuid)))
577 577
578 578 #----------------------- MUX Queue Traffic ------------------------------
579 579
    def save_queue_request(self, idents, msg):
        """Save a request submitted via the MUX queue (monitor 'in' traffic).

        Records the request in the db and marks it pending on the target
        engine.  If a record already exists (iopub may arrive first), the
        two are merged, warning on conflicts.
        """
        if len(idents) < 2:
            self.log.error("invalid identity prefix: %r", idents)
            return
        queue_id, client_id = idents[:2]
        try:
            msg = self.session.unserialize(msg)
        except Exception:
            self.log.error("queue::client %r sent invalid message to %r: %r", client_id, queue_id, msg, exc_info=True)
            return

        eid = self.by_ident.get(queue_id, None)
        if eid is None:
            self.log.error("queue::target %r not registered", queue_id)
            self.log.debug("queue:: valid are: %r", self.by_ident.keys())
            return
        record = init_record(msg)
        msg_id = record['msg_id']
        self.log.info("queue::client %r submitted request %r to %s", client_id, msg_id, eid)
        # Unicode in records
        record['engine_uuid'] = queue_id.decode('ascii')
        record['client_uuid'] = msg['header']['session']
        record['queue'] = 'mux'

        try:
            # it's possible iopub arrived first:
            existing = self.db.get_record(msg_id)
            for key,evalue in existing.iteritems():
                rvalue = record.get(key, None)
                if evalue and rvalue and evalue != rvalue:
                    self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
                elif evalue and not rvalue:
                    # adopt values only present in the existing record
                    record[key] = evalue
            try:
                self.db.update_record(msg_id, record)
            except Exception:
                self.log.error("DB Error updating record %r", msg_id, exc_info=True)
        except KeyError:
            # first sighting of this msg_id: create the record
            try:
                self.db.add_record(msg_id, record)
            except Exception:
                self.log.error("DB Error adding record %r", msg_id, exc_info=True)


        self.pending.add(msg_id)
        self.queues[eid].append(msg_id)
626 626
    def save_queue_result(self, idents, msg):
        """Save the reply to a MUX-queue request (monitor 'out' traffic).

        Moves the msg_id from pending to completed and writes the result
        fields into the db record.
        """
        if len(idents) < 2:
            self.log.error("invalid identity prefix: %r", idents)
            return

        client_id, queue_id = idents[:2]
        try:
            msg = self.session.unserialize(msg)
        except Exception:
            self.log.error("queue::engine %r sent invalid message to %r: %r",
                    queue_id, client_id, msg, exc_info=True)
            return

        eid = self.by_ident.get(queue_id, None)
        if eid is None:
            self.log.error("queue::unknown engine %r is sending a reply: ", queue_id)
            return

        parent = msg['parent_header']
        if not parent:
            # a reply without a parent cannot be matched to its request
            return
        msg_id = parent['msg_id']
        if msg_id in self.pending:
            # normal completion: move from pending to completed bookkeeping
            self.pending.remove(msg_id)
            self.all_completed.add(msg_id)
            self.queues[eid].remove(msg_id)
            self.completed[eid].append(msg_id)
            self.log.info("queue::request %r completed on %s", msg_id, eid)
        elif msg_id not in self.all_completed:
            # it could be a result from a dead engine that died before delivering the
            # result
            self.log.warn("queue:: unknown msg finished %r", msg_id)
            return
        # update record anyway, because the unregistration could have been premature
        rheader = msg['header']
        md = msg['metadata']
        completed = rheader['date']
        started = md.get('started', None)
        result = {
            'result_header' : rheader,
            'result_metadata': md,
            'result_content': msg['content'],
            'received': datetime.now(),
            'started' : started,
            'completed' : completed
        }

        result['result_buffers'] = msg['buffers']
        try:
            self.db.update_record(msg_id, result)
        except Exception:
            self.log.error("DB Error updating record %r", msg_id, exc_info=True)
679 679
680 680
681 681 #--------------------- Task Queue Traffic ------------------------------
682 682
    def save_task_request(self, idents, msg):
        """Save the submission of a task.

        The task starts out pending and unassigned; its destination engine
        is recorded later by save_task_destination.  Merges with any record
        created earlier (iopub may arrive first), with special handling for
        resubmitted tasks.
        """
        client_id = idents[0]

        try:
            msg = self.session.unserialize(msg)
        except Exception:
            self.log.error("task::client %r sent invalid task message: %r",
                    client_id, msg, exc_info=True)
            return
        record = init_record(msg)

        record['client_uuid'] = msg['header']['session']
        record['queue'] = 'task'
        header = msg['header']
        msg_id = header['msg_id']
        self.pending.add(msg_id)
        # no destination yet; the scheduler reports one via save_task_destination
        self.unassigned.add(msg_id)
        try:
            # it's possible iopub arrived first:
            existing = self.db.get_record(msg_id)
            if existing['resubmitted']:
                for key in ('submitted', 'client_uuid', 'buffers'):
                    # don't clobber these keys on resubmit
                    # submitted and client_uuid should be different
                    # and buffers might be big, and shouldn't have changed
                    record.pop(key)
                # still check content,header which should not change
                # but are not expensive to compare as buffers

            for key,evalue in existing.iteritems():
                if key.endswith('buffers'):
                    # don't compare buffers
                    continue
                rvalue = record.get(key, None)
                if evalue and rvalue and evalue != rvalue:
                    self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
                elif evalue and not rvalue:
                    # adopt values only present in the existing record
                    record[key] = evalue
            try:
                self.db.update_record(msg_id, record)
            except Exception:
                self.log.error("DB Error updating record %r", msg_id, exc_info=True)
        except KeyError:
            # first sighting of this msg_id: create the record
            try:
                self.db.add_record(msg_id, record)
            except Exception:
                self.log.error("DB Error adding record %r", msg_id, exc_info=True)
        except Exception:
            self.log.error("DB Error saving task request %r", msg_id, exc_info=True)
733 733
    def save_task_result(self, idents, msg):
        """save the result of a completed task."""
        client_id = idents[0]
        try:
            msg = self.session.unserialize(msg)
        except Exception:
            self.log.error("task::invalid task result message send to %r: %r",
                    client_id, msg, exc_info=True)
            return

        parent = msg['parent_header']
        if not parent:
            # a result without a parent cannot be matched to its request
            self.log.warn("Task %r had no parent!", msg)
            return
        msg_id = parent['msg_id']
        if msg_id in self.unassigned:
            self.unassigned.remove(msg_id)

        header = msg['header']
        md = msg['metadata']
        # which engine ran it (may be unknown, e.g. for an aborted task)
        engine_uuid = md.get('engine', u'')
        eid = self.by_ident.get(cast_bytes(engine_uuid), None)

        status = md.get('status', None)

        if msg_id in self.pending:
            self.log.info("task::task %r finished on %s", msg_id, eid)
            self.pending.remove(msg_id)
            self.all_completed.add(msg_id)
            if eid is not None:
                # aborted tasks never ran, so don't count them as completed work
                if status != 'aborted':
                    self.completed[eid].append(msg_id)
                if msg_id in self.tasks[eid]:
                    self.tasks[eid].remove(msg_id)
            completed = header['date']
            started = md.get('started', None)
            result = {
                'result_header' : header,
                'result_metadata': msg['metadata'],
                'result_content': msg['content'],
                'started' : started,
                'completed' : completed,
                'received' : datetime.now(),
                'engine_uuid': engine_uuid,
            }

            result['result_buffers'] = msg['buffers']
            try:
                self.db.update_record(msg_id, result)
            except Exception:
                self.log.error("DB Error saving task request %r", msg_id, exc_info=True)

        else:
            self.log.debug("task::unknown task %r finished", msg_id)
789 789
790 790 def save_task_destination(self, idents, msg):
791 791 try:
792 792 msg = self.session.unserialize(msg, content=True)
793 793 except Exception:
794 794 self.log.error("task::invalid task tracking message", exc_info=True)
795 795 return
796 796 content = msg['content']
797 797 # print (content)
798 798 msg_id = content['msg_id']
799 799 engine_uuid = content['engine_id']
800 800 eid = self.by_ident[cast_bytes(engine_uuid)]
801 801
802 802 self.log.info("task::task %r arrived on %r", msg_id, eid)
803 803 if msg_id in self.unassigned:
804 804 self.unassigned.remove(msg_id)
805 805 # else:
806 806 # self.log.debug("task::task %r not listed as MIA?!"%(msg_id))
807 807
808 808 self.tasks[eid].append(msg_id)
809 809 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
810 810 try:
811 811 self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
812 812 except Exception:
813 813 self.log.error("DB Error saving task destination %r", msg_id, exc_info=True)
814 814
815 815
816 816 def mia_task_request(self, idents, msg):
817 817 raise NotImplementedError
818 818 client_id = idents[0]
819 819 # content = dict(mia=self.mia,status='ok')
820 820 # self.session.send('mia_reply', content=content, idents=client_id)
821 821
822 822
823 823 #--------------------- IOPub Traffic ------------------------------
824 824
825 825 def save_iopub_message(self, topics, msg):
826 826 """save an iopub message into the db"""
827 827 # print (topics)
828 828 try:
829 829 msg = self.session.unserialize(msg, content=True)
830 830 except Exception:
831 831 self.log.error("iopub::invalid IOPub message", exc_info=True)
832 832 return
833 833
834 834 parent = msg['parent_header']
835 835 if not parent:
836 836 self.log.warn("iopub::IOPub message lacks parent: %r", msg)
837 837 return
838 838 msg_id = parent['msg_id']
839 839 msg_type = msg['header']['msg_type']
840 840 content = msg['content']
841 841
842 842 # ensure msg_id is in db
843 843 try:
844 844 rec = self.db.get_record(msg_id)
845 845 except KeyError:
846 846 rec = empty_record()
847 847 rec['msg_id'] = msg_id
848 848 self.db.add_record(msg_id, rec)
849 849 # stream
850 850 d = {}
851 851 if msg_type == 'stream':
852 852 name = content['name']
853 853 s = rec[name] or ''
854 854 d[name] = s + content['data']
855 855
856 856 elif msg_type == 'pyerr':
857 857 d['pyerr'] = content
858 858 elif msg_type == 'pyin':
859 859 d['pyin'] = content['code']
860 860 elif msg_type in ('display_data', 'pyout'):
861 861 d[msg_type] = content
862 862 elif msg_type == 'status':
863 863 pass
864 elif msg_type == 'data_pub':
865 self.log.info("ignored data_pub message for %s" % msg_id)
864 866 else:
865 867 self.log.warn("unhandled iopub msg_type: %r", msg_type)
866 868
867 869 if not d:
868 870 return
869 871
870 872 try:
871 873 self.db.update_record(msg_id, d)
872 874 except Exception:
873 875 self.log.error("DB Error saving iopub message %r", msg_id, exc_info=True)
874 876
875 877
876 878
877 879 #-------------------------------------------------------------------------
878 880 # Registration requests
879 881 #-------------------------------------------------------------------------
880 882
881 883 def connection_request(self, client_id, msg):
882 884 """Reply with connection addresses for clients."""
883 885 self.log.info("client::client %r connected", client_id)
884 886 content = dict(status='ok')
885 887 jsonable = {}
886 888 for k,v in self.keytable.iteritems():
887 889 if v not in self.dead_engines:
888 890 jsonable[str(k)] = v
889 891 content['engines'] = jsonable
890 892 self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
891 893
    def register_engine(self, reg, msg):
        """Register a new engine.

        Replies with 'registration_reply' (ok + assigned id, or a wrapped
        error on uuid collision), then either finishes registration right
        away (if the engine's heart is already beating) or parks it in
        incoming_registrations with a purge scheduled after
        registration_timeout.
        """
        content = msg['content']
        try:
            uuid = content['uuid']
        except KeyError:
            self.log.error("registration::queue not specified", exc_info=True)
            return

        eid = self._next_id

        self.log.debug("registration::register_engine(%i, %r)", eid, uuid)

        content = dict(id=eid,status='ok')
        # check if requesting available IDs:
        if cast_bytes(uuid) in self.by_ident:
            # uuid already belongs to a fully-registered engine
            try:
                raise KeyError("uuid %r in use" % uuid)
            except:
                content = error.wrap_exception()
                self.log.error("uuid %r in use", uuid, exc_info=True)
        else:
            # also guard against collisions with registrations still in flight
            for h, ec in self.incoming_registrations.iteritems():
                if uuid == h:
                    try:
                        raise KeyError("heart_id %r in use" % uuid)
                    except:
                        self.log.error("heart_id %r in use", uuid, exc_info=True)
                        content = error.wrap_exception()
                    break
                elif uuid == ec.uuid:
                    try:
                        raise KeyError("uuid %r in use" % uuid)
                    except:
                        self.log.error("uuid %r in use", uuid, exc_info=True)
                        content = error.wrap_exception()
                    break

        msg = self.session.send(self.query, "registration_reply",
                content=content,
                ident=reg)

        heart = cast_bytes(uuid)

        if content['status'] == 'ok':
            if heart in self.heartmonitor.hearts:
                # already beating
                self.incoming_registrations[heart] = EngineConnector(id=eid,uuid=uuid)
                self.finish_registration(heart)
            else:
                # wait for the heart to start beating; purge if it never does
                purge = lambda : self._purge_stalled_registration(heart)
                dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
                dc.start()
                self.incoming_registrations[heart] = EngineConnector(id=eid,uuid=uuid,stallback=dc)
        else:
            self.log.error("registration::registration %i failed: %r", eid, content['evalue'])

        return eid
950 952
    def unregister_engine(self, ident, msg):
        """Unregister an engine that explicitly requested to leave.

        The engine is only added to self.dead_engines here; its lookup-table
        entries are not removed (see the commented-out pops below).  Stranded
        messages are dealt with after registration_timeout, and other clients
        are notified via 'unregistration_notification'.
        """
        try:
            eid = msg['content']['id']
        except:
            self.log.error("registration::bad engine id for unregistration: %r", ident, exc_info=True)
            return
        self.log.info("registration::unregister_engine(%r)", eid)
        # print (eid)
        uuid = self.keytable[eid]
        content=dict(id=eid, uuid=uuid)
        # mark dead rather than removing outright
        self.dead_engines.add(uuid)
        # self.ids.remove(eid)
        # uuid = self.keytable.pop(eid)
        #
        # ec = self.engines.pop(eid)
        # self.hearts.pop(ec.heartbeat)
        # self.by_ident.pop(ec.queue)
        # self.completed.pop(eid)
        # give in-flight results a grace period before declaring them lost
        handleit = lambda : self._handle_stranded_msgs(eid, uuid)
        dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop)
        dc.start()
        ############## TODO: HANDLE IT ################

        self._save_engine_state()

        if self.notifier:
            self.session.send(self.notifier, "unregistration_notification", content=content)
979 981
980 982 def _handle_stranded_msgs(self, eid, uuid):
981 983 """Handle messages known to be on an engine when the engine unregisters.
982 984
983 985 It is possible that this will fire prematurely - that is, an engine will
984 986 go down after completing a result, and the client will be notified
985 987 that the result failed and later receive the actual result.
986 988 """
987 989
988 990 outstanding = self.queues[eid]
989 991
990 992 for msg_id in outstanding:
991 993 self.pending.remove(msg_id)
992 994 self.all_completed.add(msg_id)
993 995 try:
994 996 raise error.EngineError("Engine %r died while running task %r" % (eid, msg_id))
995 997 except:
996 998 content = error.wrap_exception()
997 999 # build a fake header:
998 1000 header = {}
999 1001 header['engine'] = uuid
1000 1002 header['date'] = datetime.now()
1001 1003 rec = dict(result_content=content, result_header=header, result_buffers=[])
1002 1004 rec['completed'] = header['date']
1003 1005 rec['engine_uuid'] = uuid
1004 1006 try:
1005 1007 self.db.update_record(msg_id, rec)
1006 1008 except Exception:
1007 1009 self.log.error("DB Error handling stranded msg %r", msg_id, exc_info=True)
1008 1010
1009 1011
    def finish_registration(self, heart):
        """Second half of engine registration, called after our HeartMonitor
        has received a beat from the Engine's Heart.

        Moves the engine out of incoming_registrations and into all of the
        Hub's lookup tables, then announces it via 'registration_notification'
        and persists engine state.
        """
        try:
            ec = self.incoming_registrations.pop(heart)
        except KeyError:
            self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
            return
        self.log.info("registration::finished registering engine %i:%s", ec.id, ec.uuid)
        # cancel the pending purge-on-timeout callback, if one was scheduled
        if ec.stallback is not None:
            ec.stallback.stop()
        eid = ec.id
        # record the engine in every lookup table the Hub maintains
        self.ids.add(eid)
        self.keytable[eid] = ec.uuid
        self.engines[eid] = ec
        self.by_ident[cast_bytes(ec.uuid)] = ec.id
        self.queues[eid] = list()
        self.tasks[eid] = list()
        self.completed[eid] = list()
        self.hearts[heart] = eid
        content = dict(id=eid, uuid=self.engines[eid].uuid)
        if self.notifier:
            self.session.send(self.notifier, "registration_notification", content=content)
        self.log.info("engine::Engine Connected: %i", eid)

        self._save_engine_state()
1036 1038
1037 1039 def _purge_stalled_registration(self, heart):
1038 1040 if heart in self.incoming_registrations:
1039 1041 ec = self.incoming_registrations.pop(heart)
1040 1042 self.log.info("registration::purging stalled registration: %i", ec.id)
1041 1043 else:
1042 1044 pass
1043 1045
1044 1046 #-------------------------------------------------------------------------
1045 1047 # Engine State
1046 1048 #-------------------------------------------------------------------------
1047 1049
1048 1050
1049 1051 def _cleanup_engine_state_file(self):
1050 1052 """cleanup engine state mapping"""
1051 1053
1052 1054 if os.path.exists(self.engine_state_file):
1053 1055 self.log.debug("cleaning up engine state: %s", self.engine_state_file)
1054 1056 try:
1055 1057 os.remove(self.engine_state_file)
1056 1058 except IOError:
1057 1059 self.log.error("Couldn't cleanup file: %s", self.engine_state_file, exc_info=True)
1058 1060
1059 1061
1060 1062 def _save_engine_state(self):
1061 1063 """save engine mapping to JSON file"""
1062 1064 if not self.engine_state_file:
1063 1065 return
1064 1066 self.log.debug("save engine state to %s" % self.engine_state_file)
1065 1067 state = {}
1066 1068 engines = {}
1067 1069 for eid, ec in self.engines.iteritems():
1068 1070 if ec.uuid not in self.dead_engines:
1069 1071 engines[eid] = ec.uuid
1070 1072
1071 1073 state['engines'] = engines
1072 1074
1073 1075 state['next_id'] = self._idcounter
1074 1076
1075 1077 with open(self.engine_state_file, 'w') as f:
1076 1078 json.dump(state, f)
1077 1079
1078 1080
    def _load_engine_state(self):
        """load engine mapping from JSON file

        Replays each saved engine through the normal registration path,
        treating its heart as already beating and suppressing notifications,
        then restores the id counter.
        """
        if not os.path.exists(self.engine_state_file):
            return

        self.log.info("loading engine state from %s" % self.engine_state_file)

        with open(self.engine_state_file) as f:
            state = json.load(f)

        # temporarily disable the notifier so replayed registrations
        # don't broadcast registration_notification messages
        save_notifier = self.notifier
        self.notifier = None
        for eid, uuid in state['engines'].iteritems():
            heart = uuid.encode('ascii')
            # start with this heart as current and beating:
            self.heartmonitor.responses.add(heart)
            self.heartmonitor.hearts.add(heart)

            self.incoming_registrations[heart] = EngineConnector(id=int(eid), uuid=uuid)
            self.finish_registration(heart)

        self.notifier = save_notifier

        self._idcounter = state['next_id']
1103 1105
1104 1106 #-------------------------------------------------------------------------
1105 1107 # Client Requests
1106 1108 #-------------------------------------------------------------------------
1107 1109
1108 1110 def shutdown_request(self, client_id, msg):
1109 1111 """handle shutdown request."""
1110 1112 self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
1111 1113 # also notify other clients of shutdown
1112 1114 self.session.send(self.notifier, 'shutdown_notice', content={'status': 'ok'})
1113 1115 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
1114 1116 dc.start()
1115 1117
1116 1118 def _shutdown(self):
1117 1119 self.log.info("hub::hub shutting down.")
1118 1120 time.sleep(0.1)
1119 1121 sys.exit(0)
1120 1122
1121 1123
1122 1124 def check_load(self, client_id, msg):
1123 1125 content = msg['content']
1124 1126 try:
1125 1127 targets = content['targets']
1126 1128 targets = self._validate_targets(targets)
1127 1129 except:
1128 1130 content = error.wrap_exception()
1129 1131 self.session.send(self.query, "hub_error",
1130 1132 content=content, ident=client_id)
1131 1133 return
1132 1134
1133 1135 content = dict(status='ok')
1134 1136 # loads = {}
1135 1137 for t in targets:
1136 1138 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
1137 1139 self.session.send(self.query, "load_reply", content=content, ident=client_id)
1138 1140
1139 1141
    def queue_status(self, client_id, msg):
        """Return the Queue status of one or more targets.

        if verbose: return the msg_ids
        else: return len of each type.

        keys:
            queue (pending MUX jobs)
            tasks (pending Task jobs)
            completed (finished jobs from both queues)
        """
        content = msg['content']
        targets = content['targets']
        try:
            targets = self._validate_targets(targets)
        except:
            content = error.wrap_exception()
            self.session.send(self.query, "hub_error",
                    content=content, ident=client_id)
            return
        # note: `content` still refers to the request content here
        verbose = content.get('verbose', False)
        content = dict(status='ok')
        for t in targets:
            queue = self.queues[t]
            completed = self.completed[t]
            tasks = self.tasks[t]
            # collapse lists to counts unless the client asked for msg_ids
            if not verbose:
                queue = len(queue)
                completed = len(completed)
                tasks = len(tasks)
            content[str(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
        content['unassigned'] = list(self.unassigned) if verbose else len(self.unassigned)
        # print (content)
        self.session.send(self.query, "queue_reply", content=content, ident=client_id)
1170 1172
    def purge_results(self, client_id, msg):
        """Purge results from memory. This method is more valuable before we move
        to a DB based message storage mechanism.

        Accepts msg_ids == 'all' (drop every completed record), an explicit
        list of msg_ids (all of which must be non-pending), and/or
        engine_ids whose completed records should be dropped.  Replies with
        'purge_reply' carrying either ok status or a wrapped error.
        """
        content = msg['content']
        self.log.info("Dropping records with %s", content)
        msg_ids = content.get('msg_ids', [])
        reply = dict(status='ok')
        if msg_ids == 'all':
            try:
                self.db.drop_matching_records(dict(completed={'$ne':None}))
            except Exception:
                reply = error.wrap_exception()
        else:
            # refuse to drop anything that is still pending
            pending = filter(lambda m: m in self.pending, msg_ids)
            if pending:
                try:
                    raise IndexError("msg pending: %r" % pending[0])
                except:
                    reply = error.wrap_exception()
            else:
                try:
                    self.db.drop_matching_records(dict(msg_id={'$in':msg_ids}))
                except Exception:
                    reply = error.wrap_exception()

        # engine-based purge only runs if the msg_id purge succeeded
        if reply['status'] == 'ok':
            eids = content.get('engine_ids', [])
            for eid in eids:
                if eid not in self.engines:
                    try:
                        raise IndexError("No such engine: %i" % eid)
                    except:
                        reply = error.wrap_exception()
                    break
                uid = self.engines[eid].uuid
                try:
                    # only completed records are dropped; in-flight ones stay
                    self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
                except Exception:
                    reply = error.wrap_exception()
                    break

        self.session.send(self.query, 'purge_reply', content=reply, ident=client_id)
1213 1215
1214 1216 def resubmit_task(self, client_id, msg):
1215 1217 """Resubmit one or more tasks."""
1216 1218 def finish(reply):
1217 1219 self.session.send(self.query, 'resubmit_reply', content=reply, ident=client_id)
1218 1220
1219 1221 content = msg['content']
1220 1222 msg_ids = content['msg_ids']
1221 1223 reply = dict(status='ok')
1222 1224 try:
1223 1225 records = self.db.find_records({'msg_id' : {'$in' : msg_ids}}, keys=[
1224 1226 'header', 'content', 'buffers'])
1225 1227 except Exception:
1226 1228 self.log.error('db::db error finding tasks to resubmit', exc_info=True)
1227 1229 return finish(error.wrap_exception())
1228 1230
1229 1231 # validate msg_ids
1230 1232 found_ids = [ rec['msg_id'] for rec in records ]
1231 1233 pending_ids = [ msg_id for msg_id in found_ids if msg_id in self.pending ]
1232 1234 if len(records) > len(msg_ids):
1233 1235 try:
1234 1236 raise RuntimeError("DB appears to be in an inconsistent state."
1235 1237 "More matching records were found than should exist")
1236 1238 except Exception:
1237 1239 return finish(error.wrap_exception())
1238 1240 elif len(records) < len(msg_ids):
1239 1241 missing = [ m for m in msg_ids if m not in found_ids ]
1240 1242 try:
1241 1243 raise KeyError("No such msg(s): %r" % missing)
1242 1244 except KeyError:
1243 1245 return finish(error.wrap_exception())
1244 1246 elif pending_ids:
1245 1247 pass
1246 1248 # no need to raise on resubmit of pending task, now that we
1247 1249 # resubmit under new ID, but do we want to raise anyway?
1248 1250 # msg_id = invalid_ids[0]
1249 1251 # try:
1250 1252 # raise ValueError("Task(s) %r appears to be inflight" % )
1251 1253 # except Exception:
1252 1254 # return finish(error.wrap_exception())
1253 1255
1254 1256 # mapping of original IDs to resubmitted IDs
1255 1257 resubmitted = {}
1256 1258
1257 1259 # send the messages
1258 1260 for rec in records:
1259 1261 header = rec['header']
1260 1262 msg = self.session.msg(header['msg_type'], parent=header)
1261 1263 msg_id = msg['msg_id']
1262 1264 msg['content'] = rec['content']
1263 1265
1264 1266 # use the old header, but update msg_id and timestamp
1265 1267 fresh = msg['header']
1266 1268 header['msg_id'] = fresh['msg_id']
1267 1269 header['date'] = fresh['date']
1268 1270 msg['header'] = header
1269 1271
1270 1272 self.session.send(self.resubmit, msg, buffers=rec['buffers'])
1271 1273
1272 1274 resubmitted[rec['msg_id']] = msg_id
1273 1275 self.pending.add(msg_id)
1274 1276 msg['buffers'] = rec['buffers']
1275 1277 try:
1276 1278 self.db.add_record(msg_id, init_record(msg))
1277 1279 except Exception:
1278 1280 self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True)
1279 1281 return finish(error.wrap_exception())
1280 1282
1281 1283 finish(dict(status='ok', resubmitted=resubmitted))
1282 1284
1283 1285 # store the new IDs in the Task DB
1284 1286 for msg_id, resubmit_id in resubmitted.iteritems():
1285 1287 try:
1286 1288 self.db.update_record(msg_id, {'resubmitted' : resubmit_id})
1287 1289 except Exception:
1288 1290 self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True)
1289 1291
1290 1292
1291 1293 def _extract_record(self, rec):
1292 1294 """decompose a TaskRecord dict into subsection of reply for get_result"""
1293 1295 io_dict = {}
1294 1296 for key in ('pyin', 'pyout', 'pyerr', 'stdout', 'stderr'):
1295 1297 io_dict[key] = rec[key]
1296 1298 content = {
1297 1299 'header': rec['header'],
1298 1300 'metadata': rec['metadata'],
1299 1301 'result_metadata': rec['result_metadata'],
1300 1302 'result_header' : rec['result_header'],
1301 1303 'result_content': rec['result_content'],
1302 1304 'received' : rec['received'],
1303 1305 'io' : io_dict,
1304 1306 }
1305 1307 if rec['result_buffers']:
1306 1308 buffers = map(bytes, rec['result_buffers'])
1307 1309 else:
1308 1310 buffers = []
1309 1311
1310 1312 return content, buffers
1311 1313
1312 1314 def get_results(self, client_id, msg):
1313 1315 """Get the result of 1 or more messages."""
1314 1316 content = msg['content']
1315 1317 msg_ids = sorted(set(content['msg_ids']))
1316 1318 statusonly = content.get('status_only', False)
1317 1319 pending = []
1318 1320 completed = []
1319 1321 content = dict(status='ok')
1320 1322 content['pending'] = pending
1321 1323 content['completed'] = completed
1322 1324 buffers = []
1323 1325 if not statusonly:
1324 1326 try:
1325 1327 matches = self.db.find_records(dict(msg_id={'$in':msg_ids}))
1326 1328 # turn match list into dict, for faster lookup
1327 1329 records = {}
1328 1330 for rec in matches:
1329 1331 records[rec['msg_id']] = rec
1330 1332 except Exception:
1331 1333 content = error.wrap_exception()
1332 1334 self.session.send(self.query, "result_reply", content=content,
1333 1335 parent=msg, ident=client_id)
1334 1336 return
1335 1337 else:
1336 1338 records = {}
1337 1339 for msg_id in msg_ids:
1338 1340 if msg_id in self.pending:
1339 1341 pending.append(msg_id)
1340 1342 elif msg_id in self.all_completed:
1341 1343 completed.append(msg_id)
1342 1344 if not statusonly:
1343 1345 c,bufs = self._extract_record(records[msg_id])
1344 1346 content[msg_id] = c
1345 1347 buffers.extend(bufs)
1346 1348 elif msg_id in records:
1347 1349 if rec['completed']:
1348 1350 completed.append(msg_id)
1349 1351 c,bufs = self._extract_record(records[msg_id])
1350 1352 content[msg_id] = c
1351 1353 buffers.extend(bufs)
1352 1354 else:
1353 1355 pending.append(msg_id)
1354 1356 else:
1355 1357 try:
1356 1358 raise KeyError('No such message: '+msg_id)
1357 1359 except:
1358 1360 content = error.wrap_exception()
1359 1361 break
1360 1362 self.session.send(self.query, "result_reply", content=content,
1361 1363 parent=msg, ident=client_id,
1362 1364 buffers=buffers)
1363 1365
1364 1366 def get_history(self, client_id, msg):
1365 1367 """Get a list of all msg_ids in our DB records"""
1366 1368 try:
1367 1369 msg_ids = self.db.get_history()
1368 1370 except Exception as e:
1369 1371 content = error.wrap_exception()
1370 1372 else:
1371 1373 content = dict(status='ok', history=msg_ids)
1372 1374
1373 1375 self.session.send(self.query, "history_reply", content=content,
1374 1376 parent=msg, ident=client_id)
1375 1377
    def db_query(self, client_id, msg):
        """Perform a raw query on the task record database.

        Replies with 'db_reply': the matching records, with any 'buffers' /
        'result_buffers' values removed from the content and sent as raw zmq
        buffers instead, plus per-record length lists (buffer_lens /
        result_buffer_lens) so the client can reassociate them.
        """
        content = msg['content']
        query = content.get('query', {})
        keys = content.get('keys', None)
        buffers = []
        empty = list()
        try:
            records = self.db.find_records(query, keys)
        except Exception as e:
            content = error.wrap_exception()
        else:
            # extract buffers from reply content:
            # only track per-record lengths for buffer keys the client asked for
            if keys is not None:
                buffer_lens = [] if 'buffers' in keys else None
                result_buffer_lens = [] if 'result_buffers' in keys else None
            else:
                buffer_lens = None
                result_buffer_lens = None

            for rec in records:
                # buffers may be None, so double check
                b = rec.pop('buffers', empty) or empty
                if buffer_lens is not None:
                    buffer_lens.append(len(b))
                buffers.extend(b)
                rb = rec.pop('result_buffers', empty) or empty
                if result_buffer_lens is not None:
                    result_buffer_lens.append(len(rb))
                buffers.extend(rb)
            content = dict(status='ok', records=records, buffer_lens=buffer_lens,
                    result_buffer_lens=result_buffer_lens)
        # self.log.debug (content)
        self.session.send(self.query, "db_reply", content=content,
                parent=msg, ident=client_id,
                buffers=buffers)
1412 1414
General Comments 0
You need to be logged in to leave comments. Login now