Show More
@@ -150,7 +150,7 b' class TaskScheduler(SessionFactory):' | |||
|
150 | 150 | unregistration_notification = self._unregister_engine |
|
151 | 151 | ) |
|
152 | 152 | self.notifier_stream.on_recv(self.dispatch_notification) |
|
153 |
self.auditor = ioloop.PeriodicCallback(self.audit_timeouts, |
|
|
153 | self.auditor = ioloop.PeriodicCallback(self.audit_timeouts, 2e3, self.loop) # 1 Hz | |
|
154 | 154 | self.auditor.start() |
|
155 | 155 | self.log.info("Scheduler started...%r"%self) |
|
156 | 156 | |
@@ -209,19 +209,37 b' class TaskScheduler(SessionFactory):' | |||
|
209 | 209 | # map(self.destinations.pop, self.completed.pop(uid)) |
|
210 | 210 | # map(self.destinations.pop, self.failed.pop(uid)) |
|
211 | 211 | |
|
212 | lost = self.pending.pop(uid) | |
|
213 | ||
|
214 | 212 | idx = self.targets.index(uid) |
|
215 | 213 | self.targets.pop(idx) |
|
216 | 214 | self.loads.pop(idx) |
|
217 | 215 | |
|
218 | self.handle_stranded_tasks(lost) | |
|
216 | # wait 5 seconds before cleaning up pending jobs, since the results might | |
|
217 | # still be incoming | |
|
218 | if self.pending[uid]: | |
|
219 | dc = ioloop.DelayedCallback(lambda : self.handle_stranded_tasks(uid), 5000, self.loop) | |
|
220 | dc.start() | |
|
219 | 221 | |
|
220 | def handle_stranded_tasks(self, lost): | |
|
222 | @logged | |
|
223 | def handle_stranded_tasks(self, engine): | |
|
221 | 224 | """Deal with jobs resident in an engine that died.""" |
|
222 | # TODO: resubmit the tasks? | |
|
223 | for msg_id in lost: | |
|
224 | pass | |
|
225 | lost = self.pending.pop(engine) | |
|
226 | ||
|
227 | for msg_id, (raw_msg,follow) in lost.iteritems(): | |
|
228 | self.all_failed.add(msg_id) | |
|
229 | self.all_done.add(msg_id) | |
|
230 | idents,msg = self.session.feed_identities(raw_msg, copy=False) | |
|
231 | msg = self.session.unpack_message(msg, copy=False, content=False) | |
|
232 | parent = msg['header'] | |
|
233 | idents = [idents[0],engine]+idents[1:] | |
|
234 | print (idents) | |
|
235 | try: | |
|
236 | raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id)) | |
|
237 | except: | |
|
238 | content = ss.wrap_exception() | |
|
239 | msg = self.session.send(self.client_stream, 'apply_reply', content, | |
|
240 | parent=parent, ident=idents) | |
|
241 | self.session.send(self.mon_stream, msg, ident=['outtask']+idents) | |
|
242 | self.update_dependencies(msg_id) | |
|
225 | 243 | |
|
226 | 244 | |
|
227 | 245 | #----------------------------------------------------------------------- |
@@ -359,7 +377,7 b' class TaskScheduler(SessionFactory):' | |||
|
359 | 377 | self.dependencies[dep_id].add(msg_id) |
|
360 | 378 | |
|
361 | 379 | @logged |
|
362 | def submit_task(self, msg_id, msg, follow=None, indices=None): | |
|
380 | def submit_task(self, msg_id, raw_msg, follow=None, indices=None): | |
|
363 | 381 | """Submit a task to any of a subset of our targets.""" |
|
364 | 382 | if indices: |
|
365 | 383 | loads = [self.loads[i] for i in indices] |
@@ -371,9 +389,9 b' class TaskScheduler(SessionFactory):' | |||
|
371 | 389 | target = self.targets[idx] |
|
372 | 390 | # print (target, map(str, msg[:3])) |
|
373 | 391 | self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False) |
|
374 | self.engine_stream.send_multipart(msg, copy=False) | |
|
392 | self.engine_stream.send_multipart(raw_msg, copy=False) | |
|
375 | 393 | self.add_job(idx) |
|
376 | self.pending[target][msg_id] = (msg, follow) | |
|
394 | self.pending[target][msg_id] = (raw_msg, follow) | |
|
377 | 395 | content = dict(msg_id=msg_id, engine_id=target) |
|
378 | 396 | self.session.send(self.mon_stream, 'task_destination', content=content, |
|
379 | 397 | ident=['tracktask',self.session.session]) |
@@ -409,6 +427,7 b' class TaskScheduler(SessionFactory):' | |||
|
409 | 427 | self.client_stream.send_multipart(raw_msg, copy=False) |
|
410 | 428 | # now, update our data structures |
|
411 | 429 | msg_id = parent['msg_id'] |
|
430 | self.blacklist.pop(msg_id, None) | |
|
412 | 431 | self.pending[engine].pop(msg_id) |
|
413 | 432 | if success: |
|
414 | 433 | self.completed[engine].add(msg_id) |
General Comments 0
You need to be logged in to leave comments.
Login now