##// END OF EJS Templates
tasks on engines when they die fail instead of hang...
MinRK -
Show More
@@ -150,7 +150,7 b' class TaskScheduler(SessionFactory):'
150 unregistration_notification = self._unregister_engine
150 unregistration_notification = self._unregister_engine
151 )
151 )
152 self.notifier_stream.on_recv(self.dispatch_notification)
152 self.notifier_stream.on_recv(self.dispatch_notification)
153 self.auditor = ioloop.PeriodicCallback(self.audit_timeouts, 1e3, self.loop) # 1 Hz
153 self.auditor = ioloop.PeriodicCallback(self.audit_timeouts, 2e3, self.loop) # 1 Hz
154 self.auditor.start()
154 self.auditor.start()
155 self.log.info("Scheduler started...%r"%self)
155 self.log.info("Scheduler started...%r"%self)
156
156
@@ -209,19 +209,37 b' class TaskScheduler(SessionFactory):'
209 # map(self.destinations.pop, self.completed.pop(uid))
209 # map(self.destinations.pop, self.completed.pop(uid))
210 # map(self.destinations.pop, self.failed.pop(uid))
210 # map(self.destinations.pop, self.failed.pop(uid))
211
211
212 lost = self.pending.pop(uid)
213
214 idx = self.targets.index(uid)
212 idx = self.targets.index(uid)
215 self.targets.pop(idx)
213 self.targets.pop(idx)
216 self.loads.pop(idx)
214 self.loads.pop(idx)
217
215
218 self.handle_stranded_tasks(lost)
216 # wait 5 seconds before cleaning up pending jobs, since the results might
217 # still be incoming
218 if self.pending[uid]:
219 dc = ioloop.DelayedCallback(lambda : self.handle_stranded_tasks(uid), 5000, self.loop)
220 dc.start()
219
221
220 def handle_stranded_tasks(self, lost):
222 @logged
223 def handle_stranded_tasks(self, engine):
221 """Deal with jobs resident in an engine that died."""
224 """Deal with jobs resident in an engine that died."""
222 # TODO: resubmit the tasks?
225 lost = self.pending.pop(engine)
223 for msg_id in lost:
226
224 pass
227 for msg_id, (raw_msg,follow) in lost.iteritems():
228 self.all_failed.add(msg_id)
229 self.all_done.add(msg_id)
230 idents,msg = self.session.feed_identities(raw_msg, copy=False)
231 msg = self.session.unpack_message(msg, copy=False, content=False)
232 parent = msg['header']
233 idents = [idents[0],engine]+idents[1:]
234 print (idents)
235 try:
236 raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
237 except:
238 content = ss.wrap_exception()
239 msg = self.session.send(self.client_stream, 'apply_reply', content,
240 parent=parent, ident=idents)
241 self.session.send(self.mon_stream, msg, ident=['outtask']+idents)
242 self.update_dependencies(msg_id)
225
243
226
244
227 #-----------------------------------------------------------------------
245 #-----------------------------------------------------------------------
@@ -359,7 +377,7 b' class TaskScheduler(SessionFactory):'
359 self.dependencies[dep_id].add(msg_id)
377 self.dependencies[dep_id].add(msg_id)
360
378
361 @logged
379 @logged
362 def submit_task(self, msg_id, msg, follow=None, indices=None):
380 def submit_task(self, msg_id, raw_msg, follow=None, indices=None):
363 """Submit a task to any of a subset of our targets."""
381 """Submit a task to any of a subset of our targets."""
364 if indices:
382 if indices:
365 loads = [self.loads[i] for i in indices]
383 loads = [self.loads[i] for i in indices]
@@ -371,9 +389,9 b' class TaskScheduler(SessionFactory):'
371 target = self.targets[idx]
389 target = self.targets[idx]
372 # print (target, map(str, msg[:3]))
390 # print (target, map(str, msg[:3]))
373 self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
391 self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
374 self.engine_stream.send_multipart(msg, copy=False)
392 self.engine_stream.send_multipart(raw_msg, copy=False)
375 self.add_job(idx)
393 self.add_job(idx)
376 self.pending[target][msg_id] = (msg, follow)
394 self.pending[target][msg_id] = (raw_msg, follow)
377 content = dict(msg_id=msg_id, engine_id=target)
395 content = dict(msg_id=msg_id, engine_id=target)
378 self.session.send(self.mon_stream, 'task_destination', content=content,
396 self.session.send(self.mon_stream, 'task_destination', content=content,
379 ident=['tracktask',self.session.session])
397 ident=['tracktask',self.session.session])
@@ -409,6 +427,7 b' class TaskScheduler(SessionFactory):'
409 self.client_stream.send_multipart(raw_msg, copy=False)
427 self.client_stream.send_multipart(raw_msg, copy=False)
410 # now, update our data structures
428 # now, update our data structures
411 msg_id = parent['msg_id']
429 msg_id = parent['msg_id']
430 self.blacklist.pop(msg_id, None)
412 self.pending[engine].pop(msg_id)
431 self.pending[engine].pop(msg_id)
413 if success:
432 if success:
414 self.completed[engine].add(msg_id)
433 self.completed[engine].add(msg_id)
@@ -24,7 +24,7 b' class ReverseDict(dict):'
24
24
25 def pop(self, key):
25 def pop(self, key):
26 value = dict.pop(self, key)
26 value = dict.pop(self, key)
27 self.d1.pop(value)
27 self._reverse.pop(value)
28 return value
28 return value
29
29
30 def get(self, key, default=None):
30 def get(self, key, default=None):
General Comments 0
You need to be logged in to leave comments. Login now