Show More
@@ -307,7 +307,7 b' class Hub(LoggingFactory):' | |||||
307 | completed=Dict() # completed msg_ids keyed by engine_id |
|
307 | completed=Dict() # completed msg_ids keyed by engine_id | |
308 | all_completed=Set() # completed msg_ids keyed by engine_id |
|
308 | all_completed=Set() # completed msg_ids keyed by engine_id | |
309 | dead_engines=Set() # completed msg_ids keyed by engine_id |
|
309 | dead_engines=Set() # completed msg_ids keyed by engine_id | |
310 | # mia=None |
|
310 | unassigned=Set() # set of task msg_ds not yet assigned a destination | |
311 | incoming_registrations=Dict() |
|
311 | incoming_registrations=Dict() | |
312 | registration_timeout=Int() |
|
312 | registration_timeout=Int() | |
313 | _idcounter=Int(0) |
|
313 | _idcounter=Int(0) | |
@@ -640,6 +640,7 b' class Hub(LoggingFactory):' | |||||
640 | header = msg['header'] |
|
640 | header = msg['header'] | |
641 | msg_id = header['msg_id'] |
|
641 | msg_id = header['msg_id'] | |
642 | self.pending.add(msg_id) |
|
642 | self.pending.add(msg_id) | |
|
643 | self.unassigned.add(msg_id) | |||
643 | try: |
|
644 | try: | |
644 | # it's posible iopub arrived first: |
|
645 | # it's posible iopub arrived first: | |
645 | existing = self.db.get_record(msg_id) |
|
646 | existing = self.db.get_record(msg_id) | |
@@ -670,6 +671,8 b' class Hub(LoggingFactory):' | |||||
670 | self.log.warn("Task %r had no parent!"%msg) |
|
671 | self.log.warn("Task %r had no parent!"%msg) | |
671 | return |
|
672 | return | |
672 | msg_id = parent['msg_id'] |
|
673 | msg_id = parent['msg_id'] | |
|
674 | if msg_id in self.unassigned: | |||
|
675 | self.unassigned.remove(msg_id) | |||
673 |
|
676 | |||
674 | header = msg['header'] |
|
677 | header = msg['header'] | |
675 | engine_uuid = header.get('engine', None) |
|
678 | engine_uuid = header.get('engine', None) | |
@@ -713,8 +716,8 b' class Hub(LoggingFactory):' | |||||
713 | eid = self.by_ident[engine_uuid] |
|
716 | eid = self.by_ident[engine_uuid] | |
714 |
|
717 | |||
715 | self.log.info("task::task %s arrived on %s"%(msg_id, eid)) |
|
718 | self.log.info("task::task %s arrived on %s"%(msg_id, eid)) | |
716 |
|
|
719 | if msg_id in self.unassigned: | |
717 |
|
|
720 | self.unassigned.remove(msg_id) | |
718 | # else: |
|
721 | # else: | |
719 | # self.log.debug("task::task %s not listed as MIA?!"%(msg_id)) |
|
722 | # self.log.debug("task::task %s not listed as MIA?!"%(msg_id)) | |
720 |
|
723 | |||
@@ -1003,7 +1006,8 b' class Hub(LoggingFactory):' | |||||
1003 | completed = len(completed) |
|
1006 | completed = len(completed) | |
1004 | tasks = len(tasks) |
|
1007 | tasks = len(tasks) | |
1005 | content[bytes(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks} |
|
1008 | content[bytes(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks} | |
1006 | # pending |
|
1009 | content['unassigned'] = list(self.unassigned) if verbose else len(self.unassigned) | |
|
1010 | ||||
1007 | self.session.send(self.query, "queue_reply", content=content, ident=client_id) |
|
1011 | self.session.send(self.query, "queue_reply", content=content, ident=client_id) | |
1008 |
|
1012 | |||
1009 | def purge_results(self, client_id, msg): |
|
1013 | def purge_results(self, client_id, msg): |
@@ -380,8 +380,9 b' class Kernel(SessionFactory):' | |||||
380 | self.aborted.remove(msg_id) |
|
380 | self.aborted.remove(msg_id) | |
381 | # is it safe to assume a msg_id will not be resubmitted? |
|
381 | # is it safe to assume a msg_id will not be resubmitted? | |
382 | reply_type = msg['msg_type'].split('_')[0] + '_reply' |
|
382 | reply_type = msg['msg_type'].split('_')[0] + '_reply' | |
383 | reply_msg = self.session.send(stream, reply_type, |
|
383 | status = {'status' : 'aborted'} | |
384 | content={'status' : 'aborted'}, parent=msg, ident=idents) |
|
384 | reply_msg = self.session.send(stream, reply_type, subheader=status, | |
|
385 | content=status, parent=msg, ident=idents) | |||
385 | return |
|
386 | return | |
386 | handler = self.shell_handlers.get(msg['msg_type'], None) |
|
387 | handler = self.shell_handlers.get(msg['msg_type'], None) | |
387 | if handler is None: |
|
388 | if handler is None: |
General Comments 0
You need to be logged in to leave comments.
Login now