Show More
@@ -307,7 +307,7 b' class Hub(LoggingFactory):' | |||
|
307 | 307 | completed=Dict() # completed msg_ids keyed by engine_id |
|
308 | 308 | all_completed=Set() # completed msg_ids keyed by engine_id |
|
309 | 309 | dead_engines=Set() # completed msg_ids keyed by engine_id |
|
310 | # mia=None | |
|
310 | unassigned=Set() # set of task msg_ds not yet assigned a destination | |
|
311 | 311 | incoming_registrations=Dict() |
|
312 | 312 | registration_timeout=Int() |
|
313 | 313 | _idcounter=Int(0) |
@@ -640,6 +640,7 b' class Hub(LoggingFactory):' | |||
|
640 | 640 | header = msg['header'] |
|
641 | 641 | msg_id = header['msg_id'] |
|
642 | 642 | self.pending.add(msg_id) |
|
643 | self.unassigned.add(msg_id) | |
|
643 | 644 | try: |
|
644 | 645 | # it's posible iopub arrived first: |
|
645 | 646 | existing = self.db.get_record(msg_id) |
@@ -670,6 +671,8 b' class Hub(LoggingFactory):' | |||
|
670 | 671 | self.log.warn("Task %r had no parent!"%msg) |
|
671 | 672 | return |
|
672 | 673 | msg_id = parent['msg_id'] |
|
674 | if msg_id in self.unassigned: | |
|
675 | self.unassigned.remove(msg_id) | |
|
673 | 676 | |
|
674 | 677 | header = msg['header'] |
|
675 | 678 | engine_uuid = header.get('engine', None) |
@@ -713,8 +716,8 b' class Hub(LoggingFactory):' | |||
|
713 | 716 | eid = self.by_ident[engine_uuid] |
|
714 | 717 | |
|
715 | 718 | self.log.info("task::task %s arrived on %s"%(msg_id, eid)) |
|
716 |
|
|
|
717 |
|
|
|
719 | if msg_id in self.unassigned: | |
|
720 | self.unassigned.remove(msg_id) | |
|
718 | 721 | # else: |
|
719 | 722 | # self.log.debug("task::task %s not listed as MIA?!"%(msg_id)) |
|
720 | 723 | |
@@ -1003,7 +1006,8 b' class Hub(LoggingFactory):' | |||
|
1003 | 1006 | completed = len(completed) |
|
1004 | 1007 | tasks = len(tasks) |
|
1005 | 1008 | content[bytes(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks} |
|
1006 | # pending | |
|
1009 | content['unassigned'] = list(self.unassigned) if verbose else len(self.unassigned) | |
|
1010 | ||
|
1007 | 1011 | self.session.send(self.query, "queue_reply", content=content, ident=client_id) |
|
1008 | 1012 | |
|
1009 | 1013 | def purge_results(self, client_id, msg): |
@@ -380,8 +380,9 b' class Kernel(SessionFactory):' | |||
|
380 | 380 | self.aborted.remove(msg_id) |
|
381 | 381 | # is it safe to assume a msg_id will not be resubmitted? |
|
382 | 382 | reply_type = msg['msg_type'].split('_')[0] + '_reply' |
|
383 | reply_msg = self.session.send(stream, reply_type, | |
|
384 | content={'status' : 'aborted'}, parent=msg, ident=idents) | |
|
383 | status = {'status' : 'aborted'} | |
|
384 | reply_msg = self.session.send(stream, reply_type, subheader=status, | |
|
385 | content=status, parent=msg, ident=idents) | |
|
385 | 386 | return |
|
386 | 387 | handler = self.shell_handlers.get(msg['msg_type'], None) |
|
387 | 388 | if handler is None: |
General Comments 0
You need to be logged in to leave comments.
Login now