Show More
@@ -150,7 +150,7 b' class TaskScheduler(SessionFactory):' | |||||
150 | unregistration_notification = self._unregister_engine |
|
150 | unregistration_notification = self._unregister_engine | |
151 | ) |
|
151 | ) | |
152 | self.notifier_stream.on_recv(self.dispatch_notification) |
|
152 | self.notifier_stream.on_recv(self.dispatch_notification) | |
153 |
self.auditor = ioloop.PeriodicCallback(self.audit_timeouts, |
|
153 | self.auditor = ioloop.PeriodicCallback(self.audit_timeouts, 2e3, self.loop) # 1 Hz | |
154 | self.auditor.start() |
|
154 | self.auditor.start() | |
155 | self.log.info("Scheduler started...%r"%self) |
|
155 | self.log.info("Scheduler started...%r"%self) | |
156 |
|
156 | |||
@@ -209,19 +209,37 b' class TaskScheduler(SessionFactory):' | |||||
209 | # map(self.destinations.pop, self.completed.pop(uid)) |
|
209 | # map(self.destinations.pop, self.completed.pop(uid)) | |
210 | # map(self.destinations.pop, self.failed.pop(uid)) |
|
210 | # map(self.destinations.pop, self.failed.pop(uid)) | |
211 |
|
211 | |||
212 | lost = self.pending.pop(uid) |
|
|||
213 |
|
||||
214 | idx = self.targets.index(uid) |
|
212 | idx = self.targets.index(uid) | |
215 | self.targets.pop(idx) |
|
213 | self.targets.pop(idx) | |
216 | self.loads.pop(idx) |
|
214 | self.loads.pop(idx) | |
217 |
|
215 | |||
218 | self.handle_stranded_tasks(lost) |
|
216 | # wait 5 seconds before cleaning up pending jobs, since the results might | |
|
217 | # still be incoming | |||
|
218 | if self.pending[uid]: | |||
|
219 | dc = ioloop.DelayedCallback(lambda : self.handle_stranded_tasks(uid), 5000, self.loop) | |||
|
220 | dc.start() | |||
219 |
|
221 | |||
220 | def handle_stranded_tasks(self, lost): |
|
222 | @logged | |
|
223 | def handle_stranded_tasks(self, engine): | |||
221 | """Deal with jobs resident in an engine that died.""" |
|
224 | """Deal with jobs resident in an engine that died.""" | |
222 | # TODO: resubmit the tasks? |
|
225 | lost = self.pending.pop(engine) | |
223 | for msg_id in lost: |
|
226 | ||
224 | pass |
|
227 | for msg_id, (raw_msg,follow) in lost.iteritems(): | |
|
228 | self.all_failed.add(msg_id) | |||
|
229 | self.all_done.add(msg_id) | |||
|
230 | idents,msg = self.session.feed_identities(raw_msg, copy=False) | |||
|
231 | msg = self.session.unpack_message(msg, copy=False, content=False) | |||
|
232 | parent = msg['header'] | |||
|
233 | idents = [idents[0],engine]+idents[1:] | |||
|
234 | print (idents) | |||
|
235 | try: | |||
|
236 | raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id)) | |||
|
237 | except: | |||
|
238 | content = ss.wrap_exception() | |||
|
239 | msg = self.session.send(self.client_stream, 'apply_reply', content, | |||
|
240 | parent=parent, ident=idents) | |||
|
241 | self.session.send(self.mon_stream, msg, ident=['outtask']+idents) | |||
|
242 | self.update_dependencies(msg_id) | |||
225 |
|
243 | |||
226 |
|
244 | |||
227 | #----------------------------------------------------------------------- |
|
245 | #----------------------------------------------------------------------- | |
@@ -359,7 +377,7 b' class TaskScheduler(SessionFactory):' | |||||
359 | self.dependencies[dep_id].add(msg_id) |
|
377 | self.dependencies[dep_id].add(msg_id) | |
360 |
|
378 | |||
361 | @logged |
|
379 | @logged | |
362 | def submit_task(self, msg_id, msg, follow=None, indices=None): |
|
380 | def submit_task(self, msg_id, raw_msg, follow=None, indices=None): | |
363 | """Submit a task to any of a subset of our targets.""" |
|
381 | """Submit a task to any of a subset of our targets.""" | |
364 | if indices: |
|
382 | if indices: | |
365 | loads = [self.loads[i] for i in indices] |
|
383 | loads = [self.loads[i] for i in indices] | |
@@ -371,9 +389,9 b' class TaskScheduler(SessionFactory):' | |||||
371 | target = self.targets[idx] |
|
389 | target = self.targets[idx] | |
372 | # print (target, map(str, msg[:3])) |
|
390 | # print (target, map(str, msg[:3])) | |
373 | self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False) |
|
391 | self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False) | |
374 | self.engine_stream.send_multipart(msg, copy=False) |
|
392 | self.engine_stream.send_multipart(raw_msg, copy=False) | |
375 | self.add_job(idx) |
|
393 | self.add_job(idx) | |
376 | self.pending[target][msg_id] = (msg, follow) |
|
394 | self.pending[target][msg_id] = (raw_msg, follow) | |
377 | content = dict(msg_id=msg_id, engine_id=target) |
|
395 | content = dict(msg_id=msg_id, engine_id=target) | |
378 | self.session.send(self.mon_stream, 'task_destination', content=content, |
|
396 | self.session.send(self.mon_stream, 'task_destination', content=content, | |
379 | ident=['tracktask',self.session.session]) |
|
397 | ident=['tracktask',self.session.session]) | |
@@ -409,6 +427,7 b' class TaskScheduler(SessionFactory):' | |||||
409 | self.client_stream.send_multipart(raw_msg, copy=False) |
|
427 | self.client_stream.send_multipart(raw_msg, copy=False) | |
410 | # now, update our data structures |
|
428 | # now, update our data structures | |
411 | msg_id = parent['msg_id'] |
|
429 | msg_id = parent['msg_id'] | |
|
430 | self.blacklist.pop(msg_id, None) | |||
412 | self.pending[engine].pop(msg_id) |
|
431 | self.pending[engine].pop(msg_id) | |
413 | if success: |
|
432 | if success: | |
414 | self.completed[engine].add(msg_id) |
|
433 | self.completed[engine].add(msg_id) |
@@ -24,7 +24,7 b' class ReverseDict(dict):' | |||||
24 |
|
24 | |||
25 | def pop(self, key): |
|
25 | def pop(self, key): | |
26 | value = dict.pop(self, key) |
|
26 | value = dict.pop(self, key) | |
27 |
self. |
|
27 | self._reverse.pop(value) | |
28 | return value |
|
28 | return value | |
29 |
|
29 | |||
30 | def get(self, key, default=None): |
|
30 | def get(self, key, default=None): |
General Comments 0
You need to be logged in to leave comments.
Login now