diff --git a/IPython/zmq/parallel/scheduler.py b/IPython/zmq/parallel/scheduler.py index 216c512..5f4f687 100644 --- a/IPython/zmq/parallel/scheduler.py +++ b/IPython/zmq/parallel/scheduler.py @@ -150,7 +150,7 @@ class TaskScheduler(SessionFactory): unregistration_notification = self._unregister_engine ) self.notifier_stream.on_recv(self.dispatch_notification) - self.auditor = ioloop.PeriodicCallback(self.audit_timeouts, 1e3, self.loop) # 1 Hz + self.auditor = ioloop.PeriodicCallback(self.audit_timeouts, 2e3, self.loop) # 1 Hz self.auditor.start() self.log.info("Scheduler started...%r"%self) @@ -209,19 +209,37 @@ class TaskScheduler(SessionFactory): # map(self.destinations.pop, self.completed.pop(uid)) # map(self.destinations.pop, self.failed.pop(uid)) - lost = self.pending.pop(uid) - idx = self.targets.index(uid) self.targets.pop(idx) self.loads.pop(idx) - self.handle_stranded_tasks(lost) + # wait 5 seconds before cleaning up pending jobs, since the results might + # still be incoming + if self.pending[uid]: + dc = ioloop.DelayedCallback(lambda : self.handle_stranded_tasks(uid), 5000, self.loop) + dc.start() - def handle_stranded_tasks(self, lost): + @logged + def handle_stranded_tasks(self, engine): """Deal with jobs resident in an engine that died.""" - # TODO: resubmit the tasks? - for msg_id in lost: - pass + lost = self.pending.pop(engine) + + for msg_id, (raw_msg,follow) in lost.iteritems(): + self.all_failed.add(msg_id) + self.all_done.add(msg_id) + idents,msg = self.session.feed_identities(raw_msg, copy=False) + msg = self.session.unpack_message(msg, copy=False, content=False) + parent = msg['header'] + idents = [idents[0],engine]+idents[1:] + print (idents) + try: + raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id)) + except: + content = ss.wrap_exception() + msg = self.session.send(self.client_stream, 'apply_reply', content, + parent=parent, ident=idents) + self.session.send(self.mon_stream, msg, ident=['outtask']+idents) + self.update_dependencies(msg_id) #----------------------------------------------------------------------- @@ -359,7 +377,7 @@ class TaskScheduler(SessionFactory): self.dependencies[dep_id].add(msg_id) @logged - def submit_task(self, msg_id, msg, follow=None, indices=None): + def submit_task(self, msg_id, raw_msg, follow=None, indices=None): """Submit a task to any of a subset of our targets.""" if indices: loads = [self.loads[i] for i in indices] @@ -371,9 +389,9 @@ class TaskScheduler(SessionFactory): target = self.targets[idx] # print (target, map(str, msg[:3])) self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False) - self.engine_stream.send_multipart(msg, copy=False) + self.engine_stream.send_multipart(raw_msg, copy=False) self.add_job(idx) - self.pending[target][msg_id] = (msg, follow) + self.pending[target][msg_id] = (raw_msg, follow) content = dict(msg_id=msg_id, engine_id=target) self.session.send(self.mon_stream, 'task_destination', content=content, ident=['tracktask',self.session.session]) @@ -409,6 +427,7 @@ class TaskScheduler(SessionFactory): self.client_stream.send_multipart(raw_msg, copy=False) # now, update our data structures msg_id = parent['msg_id'] + self.blacklist.pop(msg_id, None) self.pending[engine].pop(msg_id) if success: self.completed[engine].add(msg_id) diff --git a/IPython/zmq/parallel/util.py b/IPython/zmq/parallel/util.py index 61d9512..cd3038b 100644 --- a/IPython/zmq/parallel/util.py +++ b/IPython/zmq/parallel/util.py @@ -24,7 +24,7 @@ class ReverseDict(dict): def pop(self, key): value = dict.pop(self, key) - self.d1.pop(value) + self._reverse.pop(value) return value def get(self, key, default=None):