##// END OF EJS Templates
tasks on engines when they die fail instead of hang...
MinRK -
Show More
@@ -150,7 +150,7 b' class TaskScheduler(SessionFactory):'
150 150 unregistration_notification = self._unregister_engine
151 151 )
152 152 self.notifier_stream.on_recv(self.dispatch_notification)
153 self.auditor = ioloop.PeriodicCallback(self.audit_timeouts, 1e3, self.loop) # 1 Hz
153 self.auditor = ioloop.PeriodicCallback(self.audit_timeouts, 2e3, self.loop) # 1 Hz
154 154 self.auditor.start()
155 155 self.log.info("Scheduler started...%r"%self)
156 156
@@ -209,19 +209,37 b' class TaskScheduler(SessionFactory):'
209 209 # map(self.destinations.pop, self.completed.pop(uid))
210 210 # map(self.destinations.pop, self.failed.pop(uid))
211 211
212 lost = self.pending.pop(uid)
213
214 212 idx = self.targets.index(uid)
215 213 self.targets.pop(idx)
216 214 self.loads.pop(idx)
217 215
218 self.handle_stranded_tasks(lost)
216 # wait 5 seconds before cleaning up pending jobs, since the results might
217 # still be incoming
218 if self.pending[uid]:
219 dc = ioloop.DelayedCallback(lambda : self.handle_stranded_tasks(uid), 5000, self.loop)
220 dc.start()
219 221
220 def handle_stranded_tasks(self, lost):
222 @logged
223 def handle_stranded_tasks(self, engine):
221 224 """Deal with jobs resident in an engine that died."""
222 # TODO: resubmit the tasks?
223 for msg_id in lost:
224 pass
225 lost = self.pending.pop(engine)
226
227 for msg_id, (raw_msg,follow) in lost.iteritems():
228 self.all_failed.add(msg_id)
229 self.all_done.add(msg_id)
230 idents,msg = self.session.feed_identities(raw_msg, copy=False)
231 msg = self.session.unpack_message(msg, copy=False, content=False)
232 parent = msg['header']
233 idents = [idents[0],engine]+idents[1:]
234 print (idents)
235 try:
236 raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
237 except:
238 content = ss.wrap_exception()
239 msg = self.session.send(self.client_stream, 'apply_reply', content,
240 parent=parent, ident=idents)
241 self.session.send(self.mon_stream, msg, ident=['outtask']+idents)
242 self.update_dependencies(msg_id)
225 243
226 244
227 245 #-----------------------------------------------------------------------
@@ -359,7 +377,7 b' class TaskScheduler(SessionFactory):'
359 377 self.dependencies[dep_id].add(msg_id)
360 378
361 379 @logged
362 def submit_task(self, msg_id, msg, follow=None, indices=None):
380 def submit_task(self, msg_id, raw_msg, follow=None, indices=None):
363 381 """Submit a task to any of a subset of our targets."""
364 382 if indices:
365 383 loads = [self.loads[i] for i in indices]
@@ -371,9 +389,9 b' class TaskScheduler(SessionFactory):'
371 389 target = self.targets[idx]
372 390 # print (target, map(str, msg[:3]))
373 391 self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
374 self.engine_stream.send_multipart(msg, copy=False)
392 self.engine_stream.send_multipart(raw_msg, copy=False)
375 393 self.add_job(idx)
376 self.pending[target][msg_id] = (msg, follow)
394 self.pending[target][msg_id] = (raw_msg, follow)
377 395 content = dict(msg_id=msg_id, engine_id=target)
378 396 self.session.send(self.mon_stream, 'task_destination', content=content,
379 397 ident=['tracktask',self.session.session])
@@ -409,6 +427,7 b' class TaskScheduler(SessionFactory):'
409 427 self.client_stream.send_multipart(raw_msg, copy=False)
410 428 # now, update our data structures
411 429 msg_id = parent['msg_id']
430 self.blacklist.pop(msg_id, None)
412 431 self.pending[engine].pop(msg_id)
413 432 if success:
414 433 self.completed[engine].add(msg_id)
@@ -24,7 +24,7 b' class ReverseDict(dict):'
24 24
25 25 def pop(self, key):
26 26 value = dict.pop(self, key)
27 self.d1.pop(value)
27 self._reverse.pop(value)
28 28 return value
29 29
30 30 def get(self, key, default=None):
General Comments 0
You need to be logged in to leave comments. Login now