##// END OF EJS Templates
fix a couple of datetime entries in the Hub
MinRK -
Show More
@@ -1,1426 +1,1426 b''
1 1 """The IPython Controller Hub with 0MQ
2 2 This is the master object that handles connections from engines and clients,
3 3 and monitors traffic through the various queues.
4 4
5 5 Authors:
6 6
7 7 * Min RK
8 8 """
9 9 #-----------------------------------------------------------------------------
10 10 # Copyright (C) 2010-2011 The IPython Development Team
11 11 #
12 12 # Distributed under the terms of the BSD License. The full license is in
13 13 # the file COPYING, distributed as part of this software.
14 14 #-----------------------------------------------------------------------------
15 15
16 16 #-----------------------------------------------------------------------------
17 17 # Imports
18 18 #-----------------------------------------------------------------------------
19 19 from __future__ import print_function
20 20
21 21 import json
22 22 import os
23 23 import sys
24 24 import time
25 25 from datetime import datetime
26 26
27 27 import zmq
28 28 from zmq.eventloop import ioloop
29 29 from zmq.eventloop.zmqstream import ZMQStream
30 30
31 31 # internal:
32 32 from IPython.utils.importstring import import_item
33 33 from IPython.utils.jsonutil import extract_dates
34 34 from IPython.utils.localinterfaces import localhost
35 35 from IPython.utils.py3compat import cast_bytes, unicode_type, iteritems
36 36 from IPython.utils.traitlets import (
37 37 HasTraits, Instance, Integer, Unicode, Dict, Set, Tuple, CBytes, DottedObjectName
38 38 )
39 39
40 40 from IPython.parallel import error, util
41 41 from IPython.parallel.factory import RegistrationFactory
42 42
43 43 from IPython.kernel.zmq.session import SessionFactory
44 44
45 45 from .heartmonitor import HeartMonitor
46 46
47 47 #-----------------------------------------------------------------------------
48 48 # Code
49 49 #-----------------------------------------------------------------------------
50 50
51 51 def _passer(*args, **kwargs):
52 52 return
53 53
54 54 def _printer(*args, **kwargs):
55 55 print (args)
56 56 print (kwargs)
57 57
def empty_record():
    """Return an empty dict with all record keys.

    Every TaskRecord key is present; all values are None except the
    stream accumulators 'stdout'/'stderr', which start as empty strings.
    """
    keys = (
        'msg_id', 'header', 'metadata', 'content', 'buffers', 'submitted',
        'client_uuid', 'engine_uuid', 'started', 'completed', 'resubmitted',
        'received', 'result_header', 'result_metadata', 'result_content',
        'result_buffers', 'queue', 'pyin', 'pyout', 'pyerr',
    )
    record = dict.fromkeys(keys)
    # stream output is accumulated by concatenation, so it starts empty
    record['stdout'] = ''
    record['stderr'] = ''
    return record
84 84
def init_record(msg):
    """Initialize a TaskRecord based on a request message *msg*.

    Fields that come from the message (id, header, content, metadata,
    buffers, submission date) are filled in; everything else starts empty,
    mirroring the shape produced by ``empty_record``.
    """
    header = msg['header']
    # keys with no information yet
    record = {key: None for key in (
        'client_uuid', 'engine_uuid', 'started', 'completed', 'resubmitted',
        'received', 'result_header', 'result_metadata', 'result_content',
        'result_buffers', 'queue', 'pyin', 'pyout', 'pyerr',
    )}
    # keys populated directly from the request
    record.update(
        msg_id=header['msg_id'],
        header=header,
        content=msg['content'],
        metadata=msg['metadata'],
        buffers=msg['buffers'],
        submitted=header['date'],
        stdout='',
        stderr='',
    )
    return record
112 112
113 113
class EngineConnector(HasTraits):
    """A simple object for accessing the various zmq connections of an object.
    Attributes are:
    id (int): engine ID
    uuid (unicode): engine UUID
    pending: set of msg_ids
    stallback: DelayedCallback for stalled registration
    """

    # integer engine id assigned by the Hub
    id = Integer(0)
    # engine identity, as presented at registration
    uuid = Unicode()
    # msg_ids outstanding on this engine
    pending = Set()
    # timer that fires if the engine's heart never starts beating,
    # so stalled registrations can be purged
    stallback = Instance(ioloop.DelayedCallback)
127 127
128 128
# Short lowercase aliases accepted for HubFactory.db_class, mapping to the
# fully-qualified class names of the available DB backends.
_db_shortcuts = {
    'sqlitedb' : 'IPython.parallel.controller.sqlitedb.SQLiteDB',
    'mongodb' : 'IPython.parallel.controller.mongodb.MongoDB',
    'dictdb' : 'IPython.parallel.controller.dictdb.DictDB',
    'nodb' : 'IPython.parallel.controller.dictdb.NoDB',
}
135 135
class HubFactory(RegistrationFactory):
    """The Configurable for setting up a Hub.

    Holds the configurable port/interface traits and builds the Hub and all
    of its zmq streams in ``init_hub``.
    """

    # port-pairs for monitoredqueues:
    hb = Tuple(Integer,Integer,config=True,
        help="""PUB/ROUTER Port pair for Engine heartbeats""")
    def _hb_default(self):
        # pick two free ports at random when not configured
        return tuple(util.select_random_ports(2))

    mux = Tuple(Integer,Integer,config=True,
        help="""Client/Engine Port pair for MUX queue""")

    def _mux_default(self):
        return tuple(util.select_random_ports(2))

    task = Tuple(Integer,Integer,config=True,
        help="""Client/Engine Port pair for Task queue""")
    def _task_default(self):
        return tuple(util.select_random_ports(2))

    control = Tuple(Integer,Integer,config=True,
        help="""Client/Engine Port pair for Control queue""")

    def _control_default(self):
        return tuple(util.select_random_ports(2))

    iopub = Tuple(Integer,Integer,config=True,
        help="""Client/Engine Port pair for IOPub relay""")

    def _iopub_default(self):
        return tuple(util.select_random_ports(2))

    # single ports:
    mon_port = Integer(config=True,
        help="""Monitor (SUB) port for queue traffic""")

    def _mon_port_default(self):
        return util.select_random_ports(1)[0]

    notifier_port = Integer(config=True,
        help="""PUB port for sending engine status notifications""")

    def _notifier_port_default(self):
        return util.select_random_ports(1)[0]

    engine_ip = Unicode(config=True,
        help="IP on which to listen for engine connections. [default: loopback]")
    def _engine_ip_default(self):
        return localhost()
    engine_transport = Unicode('tcp', config=True,
        help="0MQ transport for engine connections. [default: tcp]")

    client_ip = Unicode(config=True,
        help="IP on which to listen for client connections. [default: loopback]")
    client_transport = Unicode('tcp', config=True,
        help="0MQ transport for client connections. [default : tcp]")

    monitor_ip = Unicode(config=True,
        help="IP on which to listen for monitor messages. [default: loopback]")
    monitor_transport = Unicode('tcp', config=True,
        help="0MQ transport for monitor messages. [default : tcp]")

    # all three IPs share the same loopback default
    _client_ip_default = _monitor_ip_default = _engine_ip_default


    # full zmq url of the monitor SUB socket; derived, not configured directly
    monitor_url = Unicode('')

    db_class = DottedObjectName('NoDB',
        config=True, help="""The class to use for the DB backend

        Options include:

        SQLiteDB: SQLite
        MongoDB : use MongoDB
        DictDB : in-memory storage (fastest, but be mindful of memory growth of the Hub)
        NoDB : disable database altogether (default)

        """)

    # not configurable
    db = Instance('IPython.parallel.controller.dictdb.BaseDB')
    heartmonitor = Instance('IPython.parallel.controller.heartmonitor.HeartMonitor')

    def _ip_changed(self, name, old, new):
        # setting the generic `ip` trait fans out to all three interfaces
        self.engine_ip = new
        self.client_ip = new
        self.monitor_ip = new
        self._update_monitor_url()

    def _update_monitor_url(self):
        # recompute the derived monitor url from transport/ip/port
        self.monitor_url = "%s://%s:%i" % (self.monitor_transport, self.monitor_ip, self.mon_port)

    def _transport_changed(self, name, old, new):
        # setting the generic `transport` trait fans out to all three transports
        self.engine_transport = new
        self.client_transport = new
        self.monitor_transport = new
        self._update_monitor_url()

    def __init__(self, **kwargs):
        super(HubFactory, self).__init__(**kwargs)
        self._update_monitor_url()


    def construct(self):
        """Build the Hub object (entry point used by the controller app)."""
        self.init_hub()

    def start(self):
        """Start background services; currently just the heart monitor."""
        self.heartmonitor.start()
        self.log.info("Heartmonitor started")

    def client_url(self, channel):
        """return full zmq url for a named client channel"""
        return "%s://%s:%i" % (self.client_transport, self.client_ip, self.client_info[channel])

    def engine_url(self, channel):
        """return full zmq url for a named engine channel"""
        return "%s://%s:%i" % (self.engine_transport, self.engine_ip, self.engine_info[channel])

    def init_hub(self):
        """construct Hub object

        Builds the engine/client connection dicts, binds the registration,
        heartbeat, notification and monitor sockets, connects the DB backend,
        and finally instantiates the Hub with all of those streams.
        """

        ctx = self.context
        loop = self.loop
        if 'TaskScheduler.scheme_name' in self.config:
            scheme = self.config.TaskScheduler.scheme_name
        else:
            # fall back to the scheduler's declared default scheme
            from .scheduler import TaskScheduler
            scheme = TaskScheduler.scheme_name.get_default_value()

        # build connection dicts
        engine = self.engine_info = {
            'interface' : "%s://%s" % (self.engine_transport, self.engine_ip),
            'registration' : self.regport,
            'control' : self.control[1],
            'mux' : self.mux[1],
            'hb_ping' : self.hb[0],
            'hb_pong' : self.hb[1],
            'task' : self.task[1],
            'iopub' : self.iopub[1],
        }

        client = self.client_info = {
            'interface' : "%s://%s" % (self.client_transport, self.client_ip),
            'registration' : self.regport,
            'control' : self.control[0],
            'mux' : self.mux[0],
            'task' : self.task[0],
            'task_scheme' : scheme,
            'iopub' : self.iopub[0],
            'notification' : self.notifier_port,
        }

        self.log.debug("Hub engine addrs: %s", self.engine_info)
        self.log.debug("Hub client addrs: %s", self.client_info)

        # Registrar socket
        q = ZMQStream(ctx.socket(zmq.ROUTER), loop)
        util.set_hwm(q, 0)
        q.bind(self.client_url('registration'))
        self.log.info("Hub listening on %s for registration.", self.client_url('registration'))
        if self.client_ip != self.engine_ip:
            # engines connect on a different interface: bind that one too
            q.bind(self.engine_url('registration'))
            self.log.info("Hub listening on %s for registration.", self.engine_url('registration'))

        ### Engine connections ###

        # heartbeat
        hpub = ctx.socket(zmq.PUB)
        hpub.bind(self.engine_url('hb_ping'))
        hrep = ctx.socket(zmq.ROUTER)
        util.set_hwm(hrep, 0)
        hrep.bind(self.engine_url('hb_pong'))
        self.heartmonitor = HeartMonitor(loop=loop, parent=self, log=self.log,
                                pingstream=ZMQStream(hpub,loop),
                                pongstream=ZMQStream(hrep,loop)
                            )

        ### Client connections ###

        # Notifier socket
        n = ZMQStream(ctx.socket(zmq.PUB), loop)
        n.bind(self.client_url('notification'))

        ### build and launch the queues ###

        # monitor socket: subscribe to everything
        sub = ctx.socket(zmq.SUB)
        sub.setsockopt(zmq.SUBSCRIBE, b"")
        sub.bind(self.monitor_url)
        sub.bind('inproc://monitor')
        sub = ZMQStream(sub, loop)

        # connect the db
        db_class = _db_shortcuts.get(self.db_class.lower(), self.db_class)
        self.log.info('Hub using DB backend: %r', (db_class.split('.')[-1]))
        self.db = import_item(str(db_class))(session=self.session.session,
                                            parent=self, log=self.log)
        # brief pause to let the DB backend settle before traffic arrives
        time.sleep(.25)

        # resubmit stream: a DEALER connected to the task scheduler's client side
        r = ZMQStream(ctx.socket(zmq.DEALER), loop)
        url = util.disambiguate_url(self.client_url('task'))
        r.connect(url)

        self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
                query=q, notifier=n, resubmit=r, db=self.db,
                engine_info=self.engine_info, client_info=self.client_info,
                log=self.log)
344 344
345 345
class Hub(SessionFactory):
    """The IPython Controller Hub with 0MQ connections

    Parameters
    ==========
    loop: zmq IOLoop instance
    session: Session object
    <removed> context: zmq context for creating new connections (?)
    queue: ZMQStream for monitoring the command queue (SUB)
    query: ZMQStream for engine registration and client queries requests (ROUTER)
    heartbeat: HeartMonitor object checking the pulse of the engines
    notifier: ZMQStream for broadcasting engine registration changes (PUB)
    db: connection to db for out of memory logging of commands
        NotImplemented
    engine_info: dict of zmq connection information for engines to connect
        to the queues.
    client_info: dict of zmq connection information for engines to connect
        to the queues.
    """

    # path of a JSON file where engine state may be persisted
    engine_state_file = Unicode()

    # internal data structures:
    ids=Set() # engine IDs
    keytable=Dict() # engine id -> engine uuid
    by_ident=Dict() # engine identity (bytes) -> engine id
    engines=Dict() # engine id -> EngineConnector
    clients=Dict()
    hearts=Dict() # heart id -> engine id
    pending=Set() # all pending msg_ids
    queues=Dict() # pending msg_ids keyed by engine_id
    tasks=Dict() # pending msg_ids submitted as tasks, keyed by client_id
    completed=Dict() # completed msg_ids keyed by engine_id
    all_completed=Set() # set of all completed msg_ids
    dead_engines=Set() # uuids of engines that have died
    unassigned=Set() # set of task msg_ds not yet assigned a destination
    incoming_registrations=Dict() # keyed by heart id, for engines awaiting their first beat
    registration_timeout=Integer() # ms to wait before timing out a registration
    _idcounter=Integer(0) # monotonic counter backing _next_id

    # objects from constructor:
    query=Instance(ZMQStream)
    monitor=Instance(ZMQStream)
    notifier=Instance(ZMQStream)
    resubmit=Instance(ZMQStream)
    heartmonitor=Instance(HeartMonitor)
    db=Instance(object)
    client_info=Dict()
    engine_info=Dict()
395 395
396 396
    def __init__(self, **kwargs):
        """Wire up streams, heartbeat callbacks, and dispatch tables.

        # universal:
        loop: IOLoop for creating future connections
        session: streamsession for sending serialized data
        # engine:
        queue: ZMQStream for monitoring queue messages
        query: ZMQStream for engine+client registration and client requests
        heartbeat: HeartMonitor object for tracking engines
        # extra:
        db: ZMQStream for db connection (NotImplemented)
        engine_info: zmq address/protocol dict for engine connections
        client_info: zmq address/protocol dict for client connections
        """

        super(Hub, self).__init__(**kwargs)
        # allow at least 10s (or 5 heartbeat periods) for registration to complete
        self.registration_timeout = max(10000, 5*self.heartmonitor.period)

        # register our callbacks
        self.query.on_recv(self.dispatch_query)
        self.monitor.on_recv(self.dispatch_monitor_traffic)

        self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
        self.heartmonitor.add_new_heart_handler(self.handle_new_heart)

        # topic -> handler for traffic relayed via the monitor SUB socket
        self.monitor_handlers = {b'in' : self.save_queue_request,
                                b'out': self.save_queue_result,
                                b'intask': self.save_task_request,
                                b'outtask': self.save_task_result,
                                b'tracktask': self.save_task_destination,
                                b'incontrol': _passer,
                                b'outcontrol': _passer,
                                b'iopub': self.save_iopub_message,
                                }

        # msg_type -> handler for client requests arriving on the query socket
        self.query_handlers = {'queue_request': self.queue_status,
                                'result_request': self.get_results,
                                'history_request': self.get_history,
                                'db_request': self.db_query,
                                'purge_request': self.purge_results,
                                'load_request': self.check_load,
                                'resubmit_request': self.resubmit_task,
                                'shutdown_request': self.shutdown_request,
                                'registration_request' : self.register_engine,
                                'unregistration_request' : self.unregister_engine,
                                'connection_request': self.connection_request,
                                }

        # ignore resubmit replies
        self.resubmit.on_recv(lambda msg: None, copy=False)

        self.log.info("hub::created hub")
449 449
450 450 @property
451 451 def _next_id(self):
452 452 """gemerate a new ID.
453 453
454 454 No longer reuse old ids, just count from 0."""
455 455 newid = self._idcounter
456 456 self._idcounter += 1
457 457 return newid
458 458 # newid = 0
459 459 # incoming = [id[0] for id in itervalues(self.incoming_registrations)]
460 460 # # print newid, self.ids, self.incoming_registrations
461 461 # while newid in self.ids or newid in incoming:
462 462 # newid += 1
463 463 # return newid
464 464
465 465 #-----------------------------------------------------------------------------
466 466 # message validation
467 467 #-----------------------------------------------------------------------------
468 468
469 469 def _validate_targets(self, targets):
470 470 """turn any valid targets argument into a list of integer ids"""
471 471 if targets is None:
472 472 # default to all
473 473 return self.ids
474 474
475 475 if isinstance(targets, (int,str,unicode_type)):
476 476 # only one target specified
477 477 targets = [targets]
478 478 _targets = []
479 479 for t in targets:
480 480 # map raw identities to ids
481 481 if isinstance(t, (str,unicode_type)):
482 482 t = self.by_ident.get(cast_bytes(t), t)
483 483 _targets.append(t)
484 484 targets = _targets
485 485 bad_targets = [ t for t in targets if t not in self.ids ]
486 486 if bad_targets:
487 487 raise IndexError("No Such Engine: %r" % bad_targets)
488 488 if not targets:
489 489 raise IndexError("No Engines Registered")
490 490 return targets
491 491
492 492 #-----------------------------------------------------------------------------
493 493 # dispatch methods (1 per stream)
494 494 #-----------------------------------------------------------------------------
495 495
496 496
    @util.log_errors
    def dispatch_monitor_traffic(self, msg):
        """all ME and Task queue messages come through here, as well as
        IOPub traffic.

        The first frame is the topic; it selects a handler from
        self.monitor_handlers, which receives the remaining frames.
        """
        self.log.debug("monitor traffic: %r", msg[0])
        switch = msg[0]
        try:
            idents, msg = self.session.feed_identities(msg[1:])
        except ValueError:
            # malformed message: no identity delimiter found
            idents=[]
        if not idents:
            self.log.error("Monitor message without topic: %r", msg)
            return
        handler = self.monitor_handlers.get(switch, None)
        if handler is not None:
            handler(idents, msg)
        else:
            self.log.error("Unrecognized monitor topic: %r", switch)
515 515
516 516
    @util.log_errors
    def dispatch_query(self, msg):
        """Route registration requests and queries from clients.

        Looks up the handler for the message's msg_type in
        self.query_handlers; any failure is reported back to the client
        as a 'hub_error' reply rather than raising.
        """
        try:
            idents, msg = self.session.feed_identities(msg)
        except ValueError:
            idents = []
        if not idents:
            self.log.error("Bad Query Message: %r", msg)
            return
        client_id = idents[0]
        try:
            msg = self.session.unserialize(msg, content=True)
        except Exception:
            # reply with the wrapped exception so the client sees the failure
            content = error.wrap_exception()
            self.log.error("Bad Query Message: %r", msg, exc_info=True)
            self.session.send(self.query, "hub_error", ident=client_id,
                    content=content)
            return
        # print client_id, header, parent, content
        #switch on message type:
        msg_type = msg['header']['msg_type']
        self.log.info("client::client %r requested %r", client_id, msg_type)
        handler = self.query_handlers.get(msg_type, None)
        try:
            # raise inside try so wrap_exception() can capture the traceback
            assert handler is not None, "Bad Message Type: %r" % msg_type
        except:
            content = error.wrap_exception()
            self.log.error("Bad Message Type: %r", msg_type, exc_info=True)
            self.session.send(self.query, "hub_error", ident=client_id,
                    content=content)
            return

        else:
            handler(idents, msg)
552 552
553 553 def dispatch_db(self, msg):
554 554 """"""
555 555 raise NotImplementedError
556 556
557 557 #---------------------------------------------------------------------------
558 558 # handler methods (1 per event)
559 559 #---------------------------------------------------------------------------
560 560
561 561 #----------------------- Heartbeat --------------------------------------
562 562
563 563 def handle_new_heart(self, heart):
564 564 """handler to attach to heartbeater.
565 565 Called when a new heart starts to beat.
566 566 Triggers completion of registration."""
567 567 self.log.debug("heartbeat::handle_new_heart(%r)", heart)
568 568 if heart not in self.incoming_registrations:
569 569 self.log.info("heartbeat::ignoring new heart: %r", heart)
570 570 else:
571 571 self.finish_registration(heart)
572 572
573 573
574 574 def handle_heart_failure(self, heart):
575 575 """handler to attach to heartbeater.
576 576 called when a previously registered heart fails to respond to beat request.
577 577 triggers unregistration"""
578 578 self.log.debug("heartbeat::handle_heart_failure(%r)", heart)
579 579 eid = self.hearts.get(heart, None)
580 580 uuid = self.engines[eid].uuid
581 581 if eid is None or self.keytable[eid] in self.dead_engines:
582 582 self.log.info("heartbeat::ignoring heart failure %r (not an engine or already dead)", heart)
583 583 else:
584 584 self.unregister_engine(heart, dict(content=dict(id=eid, queue=uuid)))
585 585
586 586 #----------------------- MUX Queue Traffic ------------------------------
587 587
    def save_queue_request(self, idents, msg):
        """Save the submission of a MUX-queue request to the DB.

        idents carries [engine identity, client identity]; the message is
        recorded (merging with any record iopub may already have created)
        and tracked as pending on the target engine.
        """
        if len(idents) < 2:
            self.log.error("invalid identity prefix: %r", idents)
            return
        queue_id, client_id = idents[:2]
        try:
            msg = self.session.unserialize(msg)
        except Exception:
            self.log.error("queue::client %r sent invalid message to %r: %r", client_id, queue_id, msg, exc_info=True)
            return

        eid = self.by_ident.get(queue_id, None)
        if eid is None:
            self.log.error("queue::target %r not registered", queue_id)
            self.log.debug("queue:: valid are: %r", self.by_ident.keys())
            return
        record = init_record(msg)
        msg_id = record['msg_id']
        self.log.info("queue::client %r submitted request %r to %s", client_id, msg_id, eid)
        # Unicode in records
        record['engine_uuid'] = queue_id.decode('ascii')
        record['client_uuid'] = msg['header']['session']
        record['queue'] = 'mux'

        try:
            # it's posible iopub arrived first:
            existing = self.db.get_record(msg_id)
            # merge: keep existing values, warning on conflicts
            for key,evalue in iteritems(existing):
                rvalue = record.get(key, None)
                if evalue and rvalue and evalue != rvalue:
                    self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
                elif evalue and not rvalue:
                    record[key] = evalue
            try:
                self.db.update_record(msg_id, record)
            except Exception:
                self.log.error("DB Error updating record %r", msg_id, exc_info=True)
        except KeyError:
            # no existing record: create one
            try:
                self.db.add_record(msg_id, record)
            except Exception:
                self.log.error("DB Error adding record %r", msg_id, exc_info=True)


        self.pending.add(msg_id)
        self.queues[eid].append(msg_id)
634 634
635 635 def save_queue_result(self, idents, msg):
636 636 if len(idents) < 2:
637 637 self.log.error("invalid identity prefix: %r", idents)
638 638 return
639 639
640 640 client_id, queue_id = idents[:2]
641 641 try:
642 642 msg = self.session.unserialize(msg)
643 643 except Exception:
644 644 self.log.error("queue::engine %r sent invalid message to %r: %r",
645 645 queue_id, client_id, msg, exc_info=True)
646 646 return
647 647
648 648 eid = self.by_ident.get(queue_id, None)
649 649 if eid is None:
650 650 self.log.error("queue::unknown engine %r is sending a reply: ", queue_id)
651 651 return
652 652
653 653 parent = msg['parent_header']
654 654 if not parent:
655 655 return
656 656 msg_id = parent['msg_id']
657 657 if msg_id in self.pending:
658 658 self.pending.remove(msg_id)
659 659 self.all_completed.add(msg_id)
660 660 self.queues[eid].remove(msg_id)
661 661 self.completed[eid].append(msg_id)
662 662 self.log.info("queue::request %r completed on %s", msg_id, eid)
663 663 elif msg_id not in self.all_completed:
664 664 # it could be a result from a dead engine that died before delivering the
665 665 # result
666 666 self.log.warn("queue:: unknown msg finished %r", msg_id)
667 667 return
668 668 # update record anyway, because the unregistration could have been premature
669 669 rheader = msg['header']
670 670 md = msg['metadata']
671 671 completed = rheader['date']
672 started = md.get('started', None)
672 started = extract_dates(md.get('started', None))
673 673 result = {
674 674 'result_header' : rheader,
675 675 'result_metadata': md,
676 676 'result_content': msg['content'],
677 677 'received': datetime.now(),
678 678 'started' : started,
679 679 'completed' : completed
680 680 }
681 681
682 682 result['result_buffers'] = msg['buffers']
683 683 try:
684 684 self.db.update_record(msg_id, result)
685 685 except Exception:
686 686 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
687 687
688 688
689 689 #--------------------- Task Queue Traffic ------------------------------
690 690
    def save_task_request(self, idents, msg):
        """Save the submission of a task.

        Records the task in the DB (merging with any existing record from
        iopub or a resubmission) and marks it pending and unassigned.
        """
        client_id = idents[0]

        try:
            msg = self.session.unserialize(msg)
        except Exception:
            self.log.error("task::client %r sent invalid task message: %r",
                    client_id, msg, exc_info=True)
            return
        record = init_record(msg)

        record['client_uuid'] = msg['header']['session']
        record['queue'] = 'task'
        header = msg['header']
        msg_id = header['msg_id']
        self.pending.add(msg_id)
        # not yet assigned to an engine by the scheduler
        self.unassigned.add(msg_id)
        try:
            # it's posible iopub arrived first:
            existing = self.db.get_record(msg_id)
            if existing['resubmitted']:
                for key in ('submitted', 'client_uuid', 'buffers'):
                    # don't clobber these keys on resubmit
                    # submitted and client_uuid should be different
                    # and buffers might be big, and shouldn't have changed
                    record.pop(key)
                    # still check content,header which should not change
                    # but are not expensive to compare as buffers

            for key,evalue in iteritems(existing):
                if key.endswith('buffers'):
                    # don't compare buffers
                    continue
                rvalue = record.get(key, None)
                if evalue and rvalue and evalue != rvalue:
                    self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
                elif evalue and not rvalue:
                    record[key] = evalue
            try:
                self.db.update_record(msg_id, record)
            except Exception:
                self.log.error("DB Error updating record %r", msg_id, exc_info=True)
        except KeyError:
            # no existing record: create one
            try:
                self.db.add_record(msg_id, record)
            except Exception:
                self.log.error("DB Error adding record %r", msg_id, exc_info=True)
        except Exception:
            self.log.error("DB Error saving task request %r", msg_id, exc_info=True)
741 741
742 742 def save_task_result(self, idents, msg):
743 743 """save the result of a completed task."""
744 744 client_id = idents[0]
745 745 try:
746 746 msg = self.session.unserialize(msg)
747 747 except Exception:
748 748 self.log.error("task::invalid task result message send to %r: %r",
749 749 client_id, msg, exc_info=True)
750 750 return
751 751
752 752 parent = msg['parent_header']
753 753 if not parent:
754 754 # print msg
755 755 self.log.warn("Task %r had no parent!", msg)
756 756 return
757 757 msg_id = parent['msg_id']
758 758 if msg_id in self.unassigned:
759 759 self.unassigned.remove(msg_id)
760 760
761 761 header = msg['header']
762 762 md = msg['metadata']
763 763 engine_uuid = md.get('engine', u'')
764 764 eid = self.by_ident.get(cast_bytes(engine_uuid), None)
765 765
766 766 status = md.get('status', None)
767 767
768 768 if msg_id in self.pending:
769 769 self.log.info("task::task %r finished on %s", msg_id, eid)
770 770 self.pending.remove(msg_id)
771 771 self.all_completed.add(msg_id)
772 772 if eid is not None:
773 773 if status != 'aborted':
774 774 self.completed[eid].append(msg_id)
775 775 if msg_id in self.tasks[eid]:
776 776 self.tasks[eid].remove(msg_id)
777 777 completed = header['date']
778 started = md.get('started', None)
778 started = extract_dates(md.get('started', None))
779 779 result = {
780 780 'result_header' : header,
781 781 'result_metadata': msg['metadata'],
782 782 'result_content': msg['content'],
783 783 'started' : started,
784 784 'completed' : completed,
785 785 'received' : datetime.now(),
786 786 'engine_uuid': engine_uuid,
787 787 }
788 788
789 789 result['result_buffers'] = msg['buffers']
790 790 try:
791 791 self.db.update_record(msg_id, result)
792 792 except Exception:
793 793 self.log.error("DB Error saving task request %r", msg_id, exc_info=True)
794 794
795 795 else:
796 796 self.log.debug("task::unknown task %r finished", msg_id)
797 797
    def save_task_destination(self, idents, msg):
        """Record which engine a task was assigned to by the scheduler."""
        try:
            msg = self.session.unserialize(msg, content=True)
        except Exception:
            self.log.error("task::invalid task tracking message", exc_info=True)
            return
        content = msg['content']
        # print (content)
        msg_id = content['msg_id']
        engine_uuid = content['engine_id']
        eid = self.by_ident[cast_bytes(engine_uuid)]

        self.log.info("task::task %r arrived on %r", msg_id, eid)
        if msg_id in self.unassigned:
            # the task now has a destination
            self.unassigned.remove(msg_id)
        # else:
        #     self.log.debug("task::task %r not listed as MIA?!"%(msg_id))

        self.tasks[eid].append(msg_id)
        # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
        try:
            self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
        except Exception:
            self.log.error("DB Error saving task destination %r", msg_id, exc_info=True)
822 822
823 823
824 824 def mia_task_request(self, idents, msg):
825 825 raise NotImplementedError
826 826 client_id = idents[0]
827 827 # content = dict(mia=self.mia,status='ok')
828 828 # self.session.send('mia_reply', content=content, idents=client_id)
829 829
830 830
831 831 #--------------------- IOPub Traffic ------------------------------
832 832
    def save_iopub_message(self, topics, msg):
        """save an iopub message into the db

        Creates the record if iopub traffic arrives before the request,
        then folds the message into the record field matching its msg_type
        (stream output is accumulated by concatenation).
        """
        # print (topics)
        try:
            msg = self.session.unserialize(msg, content=True)
        except Exception:
            self.log.error("iopub::invalid IOPub message", exc_info=True)
            return

        parent = msg['parent_header']
        if not parent:
            self.log.warn("iopub::IOPub message lacks parent: %r", msg)
            return
        msg_id = parent['msg_id']
        msg_type = msg['header']['msg_type']
        content = msg['content']

        # ensure msg_id is in db
        try:
            rec = self.db.get_record(msg_id)
        except KeyError:
            # new record: iopub arrived before the request itself
            rec = empty_record()
            rec['msg_id'] = msg_id
            self.db.add_record(msg_id, rec)
        # stream
        d = {}
        if msg_type == 'stream':
            name = content['name']
            # append to any previously accumulated output for this stream
            s = rec[name] or ''
            d[name] = s + content['data']

        elif msg_type == 'pyerr':
            d['pyerr'] = content
        elif msg_type == 'pyin':
            d['pyin'] = content['code']
        elif msg_type in ('display_data', 'pyout'):
            d[msg_type] = content
        elif msg_type == 'status':
            # status messages are not recorded
            pass
        elif msg_type == 'data_pub':
            self.log.info("ignored data_pub message for %s" % msg_id)
        else:
            self.log.warn("unhandled iopub msg_type: %r", msg_type)

        if not d:
            return

        try:
            self.db.update_record(msg_id, d)
        except Exception:
            self.log.error("DB Error saving iopub message %r", msg_id, exc_info=True)
884 884
885 885
886 886
887 887 #-------------------------------------------------------------------------
888 888 # Registration requests
889 889 #-------------------------------------------------------------------------
890 890
891 891 def connection_request(self, client_id, msg):
892 892 """Reply with connection addresses for clients."""
893 893 self.log.info("client::client %r connected", client_id)
894 894 content = dict(status='ok')
895 895 jsonable = {}
896 896 for k,v in iteritems(self.keytable):
897 897 if v not in self.dead_engines:
898 898 jsonable[str(k)] = v
899 899 content['engines'] = jsonable
900 900 self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
901 901
    def register_engine(self, reg, msg):
        """Register a new engine.

        Replies to the engine with a registration_reply containing either
        its new engine id (status 'ok') or a wrapped error if the uuid is
        already in use.  Returns the allocated engine id.
        """
        content = msg['content']
        try:
            uuid = content['uuid']
        except KeyError:
            self.log.error("registration::queue not specified", exc_info=True)
            return

        eid = self._next_id

        self.log.debug("registration::register_engine(%i, %r)", eid, uuid)

        # optimistic reply; replaced by a wrapped error below on conflict
        content = dict(id=eid,status='ok',hb_period=self.heartmonitor.period)
        # check if requesting available IDs:
        if cast_bytes(uuid) in self.by_ident:
            # uuid collides with an already-registered engine
            try:
                raise KeyError("uuid %r in use" % uuid)
            except:
                content = error.wrap_exception()
                self.log.error("uuid %r in use", uuid, exc_info=True)
        else:
            # also reject collisions with registrations still in flight
            for h, ec in iteritems(self.incoming_registrations):
                if uuid == h:
                    try:
                        raise KeyError("heart_id %r in use" % uuid)
                    except:
                        self.log.error("heart_id %r in use", uuid, exc_info=True)
                        content = error.wrap_exception()
                    break
                elif uuid == ec.uuid:
                    try:
                        raise KeyError("uuid %r in use" % uuid)
                    except:
                        self.log.error("uuid %r in use", uuid, exc_info=True)
                        content = error.wrap_exception()
                    break

        msg = self.session.send(self.query, "registration_reply",
                content=content,
                ident=reg)

        heart = cast_bytes(uuid)

        if content['status'] == 'ok':
            if heart in self.heartmonitor.hearts:
                # already beating
                self.incoming_registrations[heart] = EngineConnector(id=eid,uuid=uuid)
                self.finish_registration(heart)
            else:
                # wait for the first heartbeat; purge if it never arrives
                purge = lambda : self._purge_stalled_registration(heart)
                dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
                dc.start()
                self.incoming_registrations[heart] = EngineConnector(id=eid,uuid=uuid,stallback=dc)
        else:
            self.log.error("registration::registration %i failed: %r", eid, content['evalue'])

        return eid
960 960
961 961 def unregister_engine(self, ident, msg):
962 962 """Unregister an engine that explicitly requested to leave."""
963 963 try:
964 964 eid = msg['content']['id']
965 965 except:
966 966 self.log.error("registration::bad engine id for unregistration: %r", ident, exc_info=True)
967 967 return
968 968 self.log.info("registration::unregister_engine(%r)", eid)
969 969 # print (eid)
970 970 uuid = self.keytable[eid]
971 971 content=dict(id=eid, uuid=uuid)
972 972 self.dead_engines.add(uuid)
973 973 # self.ids.remove(eid)
974 974 # uuid = self.keytable.pop(eid)
975 975 #
976 976 # ec = self.engines.pop(eid)
977 977 # self.hearts.pop(ec.heartbeat)
978 978 # self.by_ident.pop(ec.queue)
979 979 # self.completed.pop(eid)
980 980 handleit = lambda : self._handle_stranded_msgs(eid, uuid)
981 981 dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop)
982 982 dc.start()
983 983 ############## TODO: HANDLE IT ################
984 984
985 985 self._save_engine_state()
986 986
987 987 if self.notifier:
988 988 self.session.send(self.notifier, "unregistration_notification", content=content)
989 989
990 990 def _handle_stranded_msgs(self, eid, uuid):
991 991 """Handle messages known to be on an engine when the engine unregisters.
992 992
993 993 It is possible that this will fire prematurely - that is, an engine will
994 994 go down after completing a result, and the client will be notified
995 995 that the result failed and later receive the actual result.
996 996 """
997 997
998 998 outstanding = self.queues[eid]
999 999
1000 1000 for msg_id in outstanding:
1001 1001 self.pending.remove(msg_id)
1002 1002 self.all_completed.add(msg_id)
1003 1003 try:
1004 1004 raise error.EngineError("Engine %r died while running task %r" % (eid, msg_id))
1005 1005 except:
1006 1006 content = error.wrap_exception()
1007 1007 # build a fake header:
1008 1008 header = {}
1009 1009 header['engine'] = uuid
1010 1010 header['date'] = datetime.now()
1011 1011 rec = dict(result_content=content, result_header=header, result_buffers=[])
1012 1012 rec['completed'] = header['date']
1013 1013 rec['engine_uuid'] = uuid
1014 1014 try:
1015 1015 self.db.update_record(msg_id, rec)
1016 1016 except Exception:
1017 1017 self.log.error("DB Error handling stranded msg %r", msg_id, exc_info=True)
1018 1018
1019 1019
1020 1020 def finish_registration(self, heart):
1021 1021 """Second half of engine registration, called after our HeartMonitor
1022 1022 has received a beat from the Engine's Heart."""
1023 1023 try:
1024 1024 ec = self.incoming_registrations.pop(heart)
1025 1025 except KeyError:
1026 1026 self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
1027 1027 return
1028 1028 self.log.info("registration::finished registering engine %i:%s", ec.id, ec.uuid)
1029 1029 if ec.stallback is not None:
1030 1030 ec.stallback.stop()
1031 1031 eid = ec.id
1032 1032 self.ids.add(eid)
1033 1033 self.keytable[eid] = ec.uuid
1034 1034 self.engines[eid] = ec
1035 1035 self.by_ident[cast_bytes(ec.uuid)] = ec.id
1036 1036 self.queues[eid] = list()
1037 1037 self.tasks[eid] = list()
1038 1038 self.completed[eid] = list()
1039 1039 self.hearts[heart] = eid
1040 1040 content = dict(id=eid, uuid=self.engines[eid].uuid)
1041 1041 if self.notifier:
1042 1042 self.session.send(self.notifier, "registration_notification", content=content)
1043 1043 self.log.info("engine::Engine Connected: %i", eid)
1044 1044
1045 1045 self._save_engine_state()
1046 1046
1047 1047 def _purge_stalled_registration(self, heart):
1048 1048 if heart in self.incoming_registrations:
1049 1049 ec = self.incoming_registrations.pop(heart)
1050 1050 self.log.info("registration::purging stalled registration: %i", ec.id)
1051 1051 else:
1052 1052 pass
1053 1053
1054 1054 #-------------------------------------------------------------------------
1055 1055 # Engine State
1056 1056 #-------------------------------------------------------------------------
1057 1057
1058 1058
1059 1059 def _cleanup_engine_state_file(self):
1060 1060 """cleanup engine state mapping"""
1061 1061
1062 1062 if os.path.exists(self.engine_state_file):
1063 1063 self.log.debug("cleaning up engine state: %s", self.engine_state_file)
1064 1064 try:
1065 1065 os.remove(self.engine_state_file)
1066 1066 except IOError:
1067 1067 self.log.error("Couldn't cleanup file: %s", self.engine_state_file, exc_info=True)
1068 1068
1069 1069
1070 1070 def _save_engine_state(self):
1071 1071 """save engine mapping to JSON file"""
1072 1072 if not self.engine_state_file:
1073 1073 return
1074 1074 self.log.debug("save engine state to %s" % self.engine_state_file)
1075 1075 state = {}
1076 1076 engines = {}
1077 1077 for eid, ec in iteritems(self.engines):
1078 1078 if ec.uuid not in self.dead_engines:
1079 1079 engines[eid] = ec.uuid
1080 1080
1081 1081 state['engines'] = engines
1082 1082
1083 1083 state['next_id'] = self._idcounter
1084 1084
1085 1085 with open(self.engine_state_file, 'w') as f:
1086 1086 json.dump(state, f)
1087 1087
1088 1088
1089 1089 def _load_engine_state(self):
1090 1090 """load engine mapping from JSON file"""
1091 1091 if not os.path.exists(self.engine_state_file):
1092 1092 return
1093 1093
1094 1094 self.log.info("loading engine state from %s" % self.engine_state_file)
1095 1095
1096 1096 with open(self.engine_state_file) as f:
1097 1097 state = json.load(f)
1098 1098
1099 1099 save_notifier = self.notifier
1100 1100 self.notifier = None
1101 1101 for eid, uuid in iteritems(state['engines']):
1102 1102 heart = uuid.encode('ascii')
1103 1103 # start with this heart as current and beating:
1104 1104 self.heartmonitor.responses.add(heart)
1105 1105 self.heartmonitor.hearts.add(heart)
1106 1106
1107 1107 self.incoming_registrations[heart] = EngineConnector(id=int(eid), uuid=uuid)
1108 1108 self.finish_registration(heart)
1109 1109
1110 1110 self.notifier = save_notifier
1111 1111
1112 1112 self._idcounter = state['next_id']
1113 1113
1114 1114 #-------------------------------------------------------------------------
1115 1115 # Client Requests
1116 1116 #-------------------------------------------------------------------------
1117 1117
1118 1118 def shutdown_request(self, client_id, msg):
1119 1119 """handle shutdown request."""
1120 1120 self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
1121 1121 # also notify other clients of shutdown
1122 1122 self.session.send(self.notifier, 'shutdown_notice', content={'status': 'ok'})
1123 1123 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
1124 1124 dc.start()
1125 1125
    def _shutdown(self):
        """Actually terminate the Hub process.

        Scheduled via DelayedCallback from shutdown_request, so the
        shutdown replies have a chance to be delivered first.
        """
        self.log.info("hub::hub shutting down.")
        # brief pause to let pending messages flush before exiting
        time.sleep(0.1)
        sys.exit(0)
1130 1130
1131 1131
1132 1132 def check_load(self, client_id, msg):
1133 1133 content = msg['content']
1134 1134 try:
1135 1135 targets = content['targets']
1136 1136 targets = self._validate_targets(targets)
1137 1137 except:
1138 1138 content = error.wrap_exception()
1139 1139 self.session.send(self.query, "hub_error",
1140 1140 content=content, ident=client_id)
1141 1141 return
1142 1142
1143 1143 content = dict(status='ok')
1144 1144 # loads = {}
1145 1145 for t in targets:
1146 1146 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
1147 1147 self.session.send(self.query, "load_reply", content=content, ident=client_id)
1148 1148
1149 1149
1150 1150 def queue_status(self, client_id, msg):
1151 1151 """Return the Queue status of one or more targets.
1152 1152
1153 1153 If verbose, return the msg_ids, else return len of each type.
1154 1154
1155 1155 Keys:
1156 1156
1157 1157 * queue (pending MUX jobs)
1158 1158 * tasks (pending Task jobs)
1159 1159 * completed (finished jobs from both queues)
1160 1160 """
1161 1161 content = msg['content']
1162 1162 targets = content['targets']
1163 1163 try:
1164 1164 targets = self._validate_targets(targets)
1165 1165 except:
1166 1166 content = error.wrap_exception()
1167 1167 self.session.send(self.query, "hub_error",
1168 1168 content=content, ident=client_id)
1169 1169 return
1170 1170 verbose = content.get('verbose', False)
1171 1171 content = dict(status='ok')
1172 1172 for t in targets:
1173 1173 queue = self.queues[t]
1174 1174 completed = self.completed[t]
1175 1175 tasks = self.tasks[t]
1176 1176 if not verbose:
1177 1177 queue = len(queue)
1178 1178 completed = len(completed)
1179 1179 tasks = len(tasks)
1180 1180 content[str(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
1181 1181 content['unassigned'] = list(self.unassigned) if verbose else len(self.unassigned)
1182 1182 # print (content)
1183 1183 self.session.send(self.query, "queue_reply", content=content, ident=client_id)
1184 1184
    def purge_results(self, client_id, msg):
        """Purge results from memory. This method is more valuable before we move
        to a DB based message storage mechanism."""
        content = msg['content']
        self.log.info("Dropping records with %s", content)
        msg_ids = content.get('msg_ids', [])
        reply = dict(status='ok')
        if msg_ids == 'all':
            # drop every finished record; pending tasks are never purged
            try:
                self.db.drop_matching_records(dict(completed={'$ne':None}))
            except Exception:
                reply = error.wrap_exception()
        else:
            # refuse to drop anything if any requested msg_id is still pending
            pending = [m for m in msg_ids if (m in self.pending)]
            if pending:
                try:
                    raise IndexError("msg pending: %r" % pending[0])
                except:
                    reply = error.wrap_exception()
            else:
                try:
                    self.db.drop_matching_records(dict(msg_id={'$in':msg_ids}))
                except Exception:
                    reply = error.wrap_exception()

        if reply['status'] == 'ok':
            # additionally purge completed records for any requested engines;
            # stop at the first error and report it
            eids = content.get('engine_ids', [])
            for eid in eids:
                if eid not in self.engines:
                    try:
                        raise IndexError("No such engine: %i" % eid)
                    except:
                        reply = error.wrap_exception()
                    break
                uid = self.engines[eid].uuid
                try:
                    self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
                except Exception:
                    reply = error.wrap_exception()
                    break

        self.session.send(self.query, 'purge_reply', content=reply, ident=client_id)
1227 1227
    def resubmit_task(self, client_id, msg):
        """Resubmit one or more tasks.

        Each task is resubmitted under a *new* msg_id; the reply maps the
        original msg_ids to their resubmitted ids.
        """
        def finish(reply):
            # single exit point: deliver the resubmit_reply to the client
            self.session.send(self.query, 'resubmit_reply', content=reply, ident=client_id)

        content = msg['content']
        msg_ids = content['msg_ids']
        reply = dict(status='ok')
        try:
            records = self.db.find_records({'msg_id' : {'$in' : msg_ids}}, keys=[
                'header', 'content', 'buffers'])
        except Exception:
            self.log.error('db::db error finding tasks to resubmit', exc_info=True)
            return finish(error.wrap_exception())

        # validate msg_ids
        found_ids = [ rec['msg_id'] for rec in records ]
        pending_ids = [ msg_id for msg_id in found_ids if msg_id in self.pending ]
        if len(records) > len(msg_ids):
            # duplicate records for one msg_id: the DB is corrupt
            try:
                raise RuntimeError("DB appears to be in an inconsistent state."
                    "More matching records were found than should exist")
            except Exception:
                return finish(error.wrap_exception())
        elif len(records) < len(msg_ids):
            missing = [ m for m in msg_ids if m not in found_ids ]
            try:
                raise KeyError("No such msg(s): %r" % missing)
            except KeyError:
                return finish(error.wrap_exception())
        elif pending_ids:
            pass
            # no need to raise on resubmit of pending task, now that we
            # resubmit under new ID, but do we want to raise anyway?
            # msg_id = invalid_ids[0]
            # try:
            #     raise ValueError("Task(s) %r appears to be inflight" % )
            # except Exception:
            #     return finish(error.wrap_exception())

        # mapping of original IDs to resubmitted IDs
        resubmitted = {}

        # send the messages
        for rec in records:
            header = rec['header']
            msg = self.session.msg(header['msg_type'], parent=header)
            msg_id = msg['msg_id']
            msg['content'] = rec['content']

            # use the old header, but update msg_id and timestamp
            fresh = msg['header']
            header['msg_id'] = fresh['msg_id']
            header['date'] = fresh['date']
            msg['header'] = header

            self.session.send(self.resubmit, msg, buffers=rec['buffers'])

            resubmitted[rec['msg_id']] = msg_id
            self.pending.add(msg_id)
            msg['buffers'] = rec['buffers']
            try:
                self.db.add_record(msg_id, init_record(msg))
            except Exception:
                self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True)
                return finish(error.wrap_exception())

        finish(dict(status='ok', resubmitted=resubmitted))

        # store the new IDs in the Task DB
        for msg_id, resubmit_id in iteritems(resubmitted):
            try:
                self.db.update_record(msg_id, {'resubmitted' : resubmit_id})
            except Exception:
                self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True)
1303 1303
1304 1304
1305 1305 def _extract_record(self, rec):
1306 1306 """decompose a TaskRecord dict into subsection of reply for get_result"""
1307 1307 io_dict = {}
1308 1308 for key in ('pyin', 'pyout', 'pyerr', 'stdout', 'stderr'):
1309 1309 io_dict[key] = rec[key]
1310 1310 content = {
1311 1311 'header': rec['header'],
1312 1312 'metadata': rec['metadata'],
1313 1313 'result_metadata': rec['result_metadata'],
1314 1314 'result_header' : rec['result_header'],
1315 1315 'result_content': rec['result_content'],
1316 1316 'received' : rec['received'],
1317 1317 'io' : io_dict,
1318 1318 }
1319 1319 if rec['result_buffers']:
1320 1320 buffers = list(map(bytes, rec['result_buffers']))
1321 1321 else:
1322 1322 buffers = []
1323 1323
1324 1324 return content, buffers
1325 1325
1326 1326 def get_results(self, client_id, msg):
1327 1327 """Get the result of 1 or more messages."""
1328 1328 content = msg['content']
1329 1329 msg_ids = sorted(set(content['msg_ids']))
1330 1330 statusonly = content.get('status_only', False)
1331 1331 pending = []
1332 1332 completed = []
1333 1333 content = dict(status='ok')
1334 1334 content['pending'] = pending
1335 1335 content['completed'] = completed
1336 1336 buffers = []
1337 1337 if not statusonly:
1338 1338 try:
1339 1339 matches = self.db.find_records(dict(msg_id={'$in':msg_ids}))
1340 1340 # turn match list into dict, for faster lookup
1341 1341 records = {}
1342 1342 for rec in matches:
1343 1343 records[rec['msg_id']] = rec
1344 1344 except Exception:
1345 1345 content = error.wrap_exception()
1346 1346 self.session.send(self.query, "result_reply", content=content,
1347 1347 parent=msg, ident=client_id)
1348 1348 return
1349 1349 else:
1350 1350 records = {}
1351 1351 for msg_id in msg_ids:
1352 1352 if msg_id in self.pending:
1353 1353 pending.append(msg_id)
1354 1354 elif msg_id in self.all_completed:
1355 1355 completed.append(msg_id)
1356 1356 if not statusonly:
1357 1357 c,bufs = self._extract_record(records[msg_id])
1358 1358 content[msg_id] = c
1359 1359 buffers.extend(bufs)
1360 1360 elif msg_id in records:
1361 1361 if rec['completed']:
1362 1362 completed.append(msg_id)
1363 1363 c,bufs = self._extract_record(records[msg_id])
1364 1364 content[msg_id] = c
1365 1365 buffers.extend(bufs)
1366 1366 else:
1367 1367 pending.append(msg_id)
1368 1368 else:
1369 1369 try:
1370 1370 raise KeyError('No such message: '+msg_id)
1371 1371 except:
1372 1372 content = error.wrap_exception()
1373 1373 break
1374 1374 self.session.send(self.query, "result_reply", content=content,
1375 1375 parent=msg, ident=client_id,
1376 1376 buffers=buffers)
1377 1377
1378 1378 def get_history(self, client_id, msg):
1379 1379 """Get a list of all msg_ids in our DB records"""
1380 1380 try:
1381 1381 msg_ids = self.db.get_history()
1382 1382 except Exception as e:
1383 1383 content = error.wrap_exception()
1384 1384 else:
1385 1385 content = dict(status='ok', history=msg_ids)
1386 1386
1387 1387 self.session.send(self.query, "history_reply", content=content,
1388 1388 parent=msg, ident=client_id)
1389 1389
1390 1390 def db_query(self, client_id, msg):
1391 1391 """Perform a raw query on the task record database."""
1392 1392 content = msg['content']
1393 1393 query = extract_dates(content.get('query', {}))
1394 1394 keys = content.get('keys', None)
1395 1395 buffers = []
1396 1396 empty = list()
1397 1397 try:
1398 1398 records = self.db.find_records(query, keys)
1399 1399 except Exception as e:
1400 1400 content = error.wrap_exception()
1401 1401 else:
1402 1402 # extract buffers from reply content:
1403 1403 if keys is not None:
1404 1404 buffer_lens = [] if 'buffers' in keys else None
1405 1405 result_buffer_lens = [] if 'result_buffers' in keys else None
1406 1406 else:
1407 1407 buffer_lens = None
1408 1408 result_buffer_lens = None
1409 1409
1410 1410 for rec in records:
1411 1411 # buffers may be None, so double check
1412 1412 b = rec.pop('buffers', empty) or empty
1413 1413 if buffer_lens is not None:
1414 1414 buffer_lens.append(len(b))
1415 1415 buffers.extend(b)
1416 1416 rb = rec.pop('result_buffers', empty) or empty
1417 1417 if result_buffer_lens is not None:
1418 1418 result_buffer_lens.append(len(rb))
1419 1419 buffers.extend(rb)
1420 1420 content = dict(status='ok', records=records, buffer_lens=buffer_lens,
1421 1421 result_buffer_lens=result_buffer_lens)
1422 1422 # self.log.debug (content)
1423 1423 self.session.send(self.query, "db_reply", content=content,
1424 1424 parent=msg, ident=client_id,
1425 1425 buffers=buffers)
1426 1426
General Comments 0
You need to be logged in to leave comments. Login now