Show More
@@ -140,9 +140,10 b' class Job(object):' | |||||
140 | self.after = after |
|
140 | self.after = after | |
141 | self.follow = follow |
|
141 | self.follow = follow | |
142 | self.timeout = timeout |
|
142 | self.timeout = timeout | |
143 | self.removed = False # used for lazy-delete from sorted queue |
|
|||
144 |
|
143 | |||
|
144 | self.removed = False # used for lazy-delete from sorted queue | |||
145 | self.timestamp = time.time() |
|
145 | self.timestamp = time.time() | |
|
146 | self.timeout_id = 0 | |||
146 | self.blacklist = set() |
|
147 | self.blacklist = set() | |
147 |
|
148 | |||
148 | def __lt__(self, other): |
|
149 | def __lt__(self, other): | |
@@ -155,6 +156,7 b' class Job(object):' | |||||
155 | def dependents(self): |
|
156 | def dependents(self): | |
156 | return self.follow.union(self.after) |
|
157 | return self.follow.union(self.after) | |
157 |
|
158 | |||
|
159 | ||||
158 | class TaskScheduler(SessionFactory): |
|
160 | class TaskScheduler(SessionFactory): | |
159 | """Python TaskScheduler object. |
|
161 | """Python TaskScheduler object. | |
160 |
|
162 | |||
@@ -433,19 +435,14 b' class TaskScheduler(SessionFactory):' | |||||
433 | # location dependencies |
|
435 | # location dependencies | |
434 | follow = Dependency(md.get('follow', [])) |
|
436 | follow = Dependency(md.get('follow', [])) | |
435 |
|
437 | |||
436 | # turn timeouts into datetime objects: |
|
|||
437 | timeout = md.get('timeout', None) |
|
438 | timeout = md.get('timeout', None) | |
438 | if timeout: |
|
439 | if timeout: | |
439 |
timeout = |
|
440 | timeout = float(timeout) | |
440 |
|
441 | |||
441 | job = Job(msg_id=msg_id, raw_msg=raw_msg, idents=idents, msg=msg, |
|
442 | job = Job(msg_id=msg_id, raw_msg=raw_msg, idents=idents, msg=msg, | |
442 | header=header, targets=targets, after=after, follow=follow, |
|
443 | header=header, targets=targets, after=after, follow=follow, | |
443 | timeout=timeout, metadata=md, |
|
444 | timeout=timeout, metadata=md, | |
444 | ) |
|
445 | ) | |
445 | if timeout: |
|
|||
446 | # schedule timeout callback |
|
|||
447 | self.loop.add_timeout(timeout, lambda : self.job_timeout(job)) |
|
|||
448 |
|
||||
449 | # validate and reduce dependencies: |
|
446 | # validate and reduce dependencies: | |
450 | for dep in after,follow: |
|
447 | for dep in after,follow: | |
451 | if not dep: # empty dependency |
|
448 | if not dep: # empty dependency | |
@@ -469,11 +466,14 b' class TaskScheduler(SessionFactory):' | |||||
469 | else: |
|
466 | else: | |
470 | self.save_unmet(job) |
|
467 | self.save_unmet(job) | |
471 |
|
468 | |||
472 | def job_timeout(self, job): |
|
469 | def job_timeout(self, job, timeout_id): | |
473 | """callback for a job's timeout. |
|
470 | """callback for a job's timeout. | |
474 |
|
471 | |||
475 | The job may or may not have been run at this point. |
|
472 | The job may or may not have been run at this point. | |
476 | """ |
|
473 | """ | |
|
474 | if job.timeout_id != timeout_id: | |||
|
475 | # not the most recent call | |||
|
476 | return | |||
477 | now = time.time() |
|
477 | now = time.time() | |
478 | if job.timeout >= (now + 1): |
|
478 | if job.timeout >= (now + 1): | |
479 | self.log.warn("task %s timeout fired prematurely: %s > %s", |
|
479 | self.log.warn("task %s timeout fired prematurely: %s > %s", | |
@@ -590,6 +590,14 b' class TaskScheduler(SessionFactory):' | |||||
590 | if dep_id not in self.graph: |
|
590 | if dep_id not in self.graph: | |
591 | self.graph[dep_id] = set() |
|
591 | self.graph[dep_id] = set() | |
592 | self.graph[dep_id].add(msg_id) |
|
592 | self.graph[dep_id].add(msg_id) | |
|
593 | ||||
|
594 | # schedule timeout callback | |||
|
595 | if job.timeout: | |||
|
596 | timeout_id = job.timeout_id = job.timeout_id + 1 | |||
|
597 | self.loop.add_timeout(time.time() + job.timeout, | |||
|
598 | lambda : self.job_timeout(job, timeout_id) | |||
|
599 | ) | |||
|
600 | ||||
593 |
|
601 | |||
594 | def submit_task(self, job, indices=None): |
|
602 | def submit_task(self, job, indices=None): | |
595 | """Submit a task to any of a subset of our targets.""" |
|
603 | """Submit a task to any of a subset of our targets.""" | |
@@ -633,7 +641,7 b' class TaskScheduler(SessionFactory):' | |||||
633 | else: |
|
641 | else: | |
634 | self.finish_job(idx) |
|
642 | self.finish_job(idx) | |
635 | except Exception: |
|
643 | except Exception: | |
636 | self.log.error("task::Invaid result: %r", raw_msg, exc_info=True) |
|
644 | self.log.error("task::Invalid result: %r", raw_msg, exc_info=True) | |
637 | return |
|
645 | return | |
638 |
|
646 | |||
639 | md = msg['metadata'] |
|
647 | md = msg['metadata'] |
General Comments 0
You need to be logged in to leave comments.
Login now