Show More
@@ -140,9 +140,10 b' class Job(object):' | |||
|
140 | 140 | self.after = after |
|
141 | 141 | self.follow = follow |
|
142 | 142 | self.timeout = timeout |
|
143 | self.removed = False # used for lazy-delete from sorted queue | |
|
144 | 143 | |
|
144 | self.removed = False # used for lazy-delete from sorted queue | |
|
145 | 145 | self.timestamp = time.time() |
|
146 | self.timeout_id = 0 | |
|
146 | 147 | self.blacklist = set() |
|
147 | 148 | |
|
148 | 149 | def __lt__(self, other): |
@@ -155,6 +156,7 b' class Job(object):' | |||
|
155 | 156 | def dependents(self): |
|
156 | 157 | return self.follow.union(self.after) |
|
157 | 158 | |
|
159 | ||
|
158 | 160 | class TaskScheduler(SessionFactory): |
|
159 | 161 | """Python TaskScheduler object. |
|
160 | 162 | |
@@ -433,19 +435,14 b' class TaskScheduler(SessionFactory):' | |||
|
433 | 435 | # location dependencies |
|
434 | 436 | follow = Dependency(md.get('follow', [])) |
|
435 | 437 | |
|
436 | # turn timeouts into datetime objects: | |
|
437 | 438 | timeout = md.get('timeout', None) |
|
438 | 439 | if timeout: |
|
439 |
timeout = |
|
|
440 | timeout = float(timeout) | |
|
440 | 441 | |
|
441 | 442 | job = Job(msg_id=msg_id, raw_msg=raw_msg, idents=idents, msg=msg, |
|
442 | 443 | header=header, targets=targets, after=after, follow=follow, |
|
443 | 444 | timeout=timeout, metadata=md, |
|
444 | 445 | ) |
|
445 | if timeout: | |
|
446 | # schedule timeout callback | |
|
447 | self.loop.add_timeout(timeout, lambda : self.job_timeout(job)) | |
|
448 | ||
|
449 | 446 | # validate and reduce dependencies: |
|
450 | 447 | for dep in after,follow: |
|
451 | 448 | if not dep: # empty dependency |
@@ -469,11 +466,14 b' class TaskScheduler(SessionFactory):' | |||
|
469 | 466 | else: |
|
470 | 467 | self.save_unmet(job) |
|
471 | 468 | |
|
472 | def job_timeout(self, job): | |
|
469 | def job_timeout(self, job, timeout_id): | |
|
473 | 470 | """callback for a job's timeout. |
|
474 | 471 | |
|
475 | 472 | The job may or may not have been run at this point. |
|
476 | 473 | """ |
|
474 | if job.timeout_id != timeout_id: | |
|
475 | # not the most recent call | |
|
476 | return | |
|
477 | 477 | now = time.time() |
|
478 | 478 | if job.timeout >= (now + 1): |
|
479 | 479 | self.log.warn("task %s timeout fired prematurely: %s > %s", |
@@ -590,6 +590,14 b' class TaskScheduler(SessionFactory):' | |||
|
590 | 590 | if dep_id not in self.graph: |
|
591 | 591 | self.graph[dep_id] = set() |
|
592 | 592 | self.graph[dep_id].add(msg_id) |
|
593 | ||
|
594 | # schedule timeout callback | |
|
595 | if job.timeout: | |
|
596 | timeout_id = job.timeout_id = job.timeout_id + 1 | |
|
597 | self.loop.add_timeout(time.time() + job.timeout, | |
|
598 | lambda : self.job_timeout(job, timeout_id) | |
|
599 | ) | |
|
600 | ||
|
593 | 601 | |
|
594 | 602 | def submit_task(self, job, indices=None): |
|
595 | 603 | """Submit a task to any of a subset of our targets.""" |
@@ -633,7 +641,7 b' class TaskScheduler(SessionFactory):' | |||
|
633 | 641 | else: |
|
634 | 642 | self.finish_job(idx) |
|
635 | 643 | except Exception: |
|
636 | self.log.error("task::Invaid result: %r", raw_msg, exc_info=True) | |
|
644 | self.log.error("task::Invalid result: %r", raw_msg, exc_info=True) | |
|
637 | 645 | return |
|
638 | 646 | |
|
639 | 647 | md = msg['metadata'] |
General Comments 0
You need to be logged in to leave comments.
Login now