Show More
@@ -217,8 +217,6 b' class TaskScheduler(SessionFactory):' | |||
|
217 | 217 | all_done = Set() # set of all finished tasks=union(completed,failed) |
|
218 | 218 | all_ids = Set() # set of all submitted task IDs |
|
219 | 219 | |
|
220 | auditor = Instance('zmq.eventloop.ioloop.PeriodicCallback') | |
|
221 | ||
|
222 | 220 | ident = CBytes() # ZMQ identity. This should just be self.session.session |
|
223 | 221 | # but ensure Bytes |
|
224 | 222 | def _ident_default(self): |
@@ -236,9 +234,7 b' class TaskScheduler(SessionFactory):' | |||
|
236 | 234 | unregistration_notification = self._unregister_engine |
|
237 | 235 | ) |
|
238 | 236 | self.notifier_stream.on_recv(self.dispatch_notification) |
|
239 | self.auditor = ioloop.PeriodicCallback(self.audit_timeouts, 2e3, self.loop) # 1 Hz | |
|
240 | self.auditor.start() | |
|
241 | self.log.info("Scheduler started [%s]"%self.scheme_name) | |
|
237 | self.log.info("Scheduler started [%s]" % self.scheme_name) | |
|
242 | 238 | |
|
243 | 239 | def resume_receiving(self): |
|
244 | 240 | """Resume accepting jobs.""" |
@@ -438,15 +434,16 b' class TaskScheduler(SessionFactory):' | |||
|
438 | 434 | # turn timeouts into datetime objects: |
|
439 | 435 | timeout = md.get('timeout', None) |
|
440 | 436 | if timeout: |
|
441 | # cast to float, because jsonlib returns floats as decimal.Decimal, | |
|
442 | # which timedelta does not accept | |
|
443 | timeout = datetime.now() + timedelta(0,float(timeout),0) | |
|
437 | timeout = time.time() + float(timeout) | |
|
444 | 438 | |
|
445 | 439 | job = Job(msg_id=msg_id, raw_msg=raw_msg, idents=idents, msg=msg, |
|
446 | 440 | header=header, targets=targets, after=after, follow=follow, |
|
447 | 441 | timeout=timeout, metadata=md, |
|
448 | 442 | ) |
|
449 | ||
|
443 | if timeout: | |
|
444 | # schedule timeout callback | |
|
445 | self.loop.add_timeout(timeout, lambda : self.job_timeout(job)) | |
|
446 | ||
|
450 | 447 | # validate and reduce dependencies: |
|
451 | 448 | for dep in after,follow: |
|
452 | 449 | if not dep: # empty dependency |
@@ -470,21 +467,25 b' class TaskScheduler(SessionFactory):' | |||
|
470 | 467 | else: |
|
471 | 468 | self.save_unmet(job) |
|
472 | 469 | |
|
473 | def audit_timeouts(self): | |
|
474 | """Audit all waiting tasks for expired timeouts.""" | |
|
475 | now = datetime.now() | |
|
476 | for msg_id in self.queue_map.keys(): | |
|
477 | # must recheck, in case one failure cascaded to another: | |
|
478 | if msg_id in self.queue_map: | |
|
479 | job = self.queue_map[msg_id] | |
|
480 | if job.timeout and job.timeout < now: | |
|
481 | self.fail_unreachable(msg_id, error.TaskTimeout) | |
|
470 | def job_timeout(self, job): | |
|
471 | """callback for a job's timeout. | |
|
472 | ||
|
473 | The job may or may not have been run at this point. | |
|
474 | """ | |
|
475 | if job.timeout >= (time.time() + 1): | |
|
476 | self.log.warn("task %s timeout fired prematurely: %s > %s", | |
|
477 | job.msg_id, job.timeout, now | |
|
478 | ) | |
|
479 | if job.msg_id in self.queue_map: | |
|
480 | # still waiting, but ran out of time | |
|
481 | self.log.info("task %r timed out", job.msg_id) | |
|
482 | self.fail_unreachable(job.msg_id, error.TaskTimeout) | |
|
482 | 483 | |
|
483 | 484 | def fail_unreachable(self, msg_id, why=error.ImpossibleDependency): |
|
484 | 485 | """a task has become unreachable, send a reply with an ImpossibleDependency |
|
485 | 486 | error.""" |
|
486 | 487 | if msg_id not in self.queue_map: |
|
487 | self.log.error("msg %r already failed!", msg_id) | |
|
488 | self.log.error("task %r already failed!", msg_id) | |
|
488 | 489 | return |
|
489 | 490 | job = self.queue_map.pop(msg_id) |
|
490 | 491 | # lazy-delete from the queue |
@@ -497,6 +498,7 b' class TaskScheduler(SessionFactory):' | |||
|
497 | 498 | raise why() |
|
498 | 499 | except: |
|
499 | 500 | content = error.wrap_exception() |
|
501 | self.log.debug("task %r failing as unreachable with: %s", msg_id, content['ename']) | |
|
500 | 502 | |
|
501 | 503 | self.all_done.add(msg_id) |
|
502 | 504 | self.all_failed.add(msg_id) |
@@ -791,7 +793,7 b' def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, reg_addr, config=Non' | |||
|
791 | 793 | identity=b'task', in_thread=False): |
|
792 | 794 | |
|
793 | 795 | ZMQStream = zmqstream.ZMQStream |
|
794 | ||
|
796 | loglevel = logging.DEBUG | |
|
795 | 797 | if config: |
|
796 | 798 | # unwrap dict back into Config |
|
797 | 799 | config = Config(config) |
General Comments 0
You need to be logged in to leave comments.
Login now