From 9566c9d239be64d5079cbd3e371c445cbb9fc0b9 2011-04-08 22:21:21 From: MinRK Date: 2011-04-08 22:21:21 Subject: [PATCH] better handle aborted/unscheduled tasks --- diff --git a/IPython/parallel/controller/hub.py b/IPython/parallel/controller/hub.py index 57a1ecf..a32dde7 100755 --- a/IPython/parallel/controller/hub.py +++ b/IPython/parallel/controller/hub.py @@ -307,7 +307,7 @@ class Hub(LoggingFactory): completed=Dict() # completed msg_ids keyed by engine_id all_completed=Set() # completed msg_ids keyed by engine_id dead_engines=Set() # completed msg_ids keyed by engine_id - # mia=None + unassigned=Set() # set of task msg_ids not yet assigned a destination incoming_registrations=Dict() registration_timeout=Int() _idcounter=Int(0) @@ -640,6 +640,7 @@ class Hub(LoggingFactory): header = msg['header'] msg_id = header['msg_id'] self.pending.add(msg_id) + self.unassigned.add(msg_id) try: # it's posible iopub arrived first: existing = self.db.get_record(msg_id) @@ -670,6 +671,8 @@ class Hub(LoggingFactory): self.log.warn("Task %r had no parent!"%msg) return msg_id = parent['msg_id'] + if msg_id in self.unassigned: + self.unassigned.remove(msg_id) header = msg['header'] engine_uuid = header.get('engine', None) @@ -713,8 +716,8 @@ class Hub(LoggingFactory): eid = self.by_ident[engine_uuid] self.log.info("task::task %s arrived on %s"%(msg_id, eid)) - # if msg_id in self.mia: - # self.mia.remove(msg_id) + if msg_id in self.unassigned: + self.unassigned.remove(msg_id) # else: # self.log.debug("task::task %s not listed as MIA?!"%(msg_id)) @@ -1003,7 +1006,8 @@ class Hub(LoggingFactory): completed = len(completed) tasks = len(tasks) content[bytes(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks} - # pending + content['unassigned'] = list(self.unassigned) if verbose else len(self.unassigned) + self.session.send(self.query, "queue_reply", content=content, ident=client_id) def purge_results(self, client_id, msg): diff --git a/IPython/parallel/engine/streamkernel.py 
b/IPython/parallel/engine/streamkernel.py index 8aaf71e..6c03e06 100755 --- a/IPython/parallel/engine/streamkernel.py +++ b/IPython/parallel/engine/streamkernel.py @@ -380,8 +380,9 @@ class Kernel(SessionFactory): self.aborted.remove(msg_id) # is it safe to assume a msg_id will not be resubmitted? reply_type = msg['msg_type'].split('_')[0] + '_reply' - reply_msg = self.session.send(stream, reply_type, - content={'status' : 'aborted'}, parent=msg, ident=idents) + status = {'status' : 'aborted'} + reply_msg = self.session.send(stream, reply_type, subheader=status, + content=status, parent=msg, ident=idents) return handler = self.shell_handlers.get(msg['msg_type'], None) if handler is None: