##// END OF EJS Templates
adjust Scheduler timeout logic...
MinRK -
Show More
@@ -140,9 +140,10 b' class Job(object):'
140 140 self.after = after
141 141 self.follow = follow
142 142 self.timeout = timeout
143 self.removed = False # used for lazy-delete from sorted queue
144 143
144 self.removed = False # used for lazy-delete from sorted queue
145 145 self.timestamp = time.time()
146 self.timeout_id = 0
146 147 self.blacklist = set()
147 148
148 149 def __lt__(self, other):
@@ -155,6 +156,7 b' class Job(object):'
155 156 def dependents(self):
156 157 return self.follow.union(self.after)
157 158
159
158 160 class TaskScheduler(SessionFactory):
159 161 """Python TaskScheduler object.
160 162
@@ -433,19 +435,14 b' class TaskScheduler(SessionFactory):'
433 435 # location dependencies
434 436 follow = Dependency(md.get('follow', []))
435 437
436 # turn timeouts into datetime objects:
437 438 timeout = md.get('timeout', None)
438 439 if timeout:
439 timeout = time.time() + float(timeout)
440 timeout = float(timeout)
440 441
441 442 job = Job(msg_id=msg_id, raw_msg=raw_msg, idents=idents, msg=msg,
442 443 header=header, targets=targets, after=after, follow=follow,
443 444 timeout=timeout, metadata=md,
444 445 )
445 if timeout:
446 # schedule timeout callback
447 self.loop.add_timeout(timeout, lambda : self.job_timeout(job))
448
449 446 # validate and reduce dependencies:
450 447 for dep in after,follow:
451 448 if not dep: # empty dependency
@@ -469,11 +466,14 b' class TaskScheduler(SessionFactory):'
469 466 else:
470 467 self.save_unmet(job)
471 468
472 def job_timeout(self, job):
469 def job_timeout(self, job, timeout_id):
473 470 """callback for a job's timeout.
474 471
475 472 The job may or may not have been run at this point.
476 473 """
474 if job.timeout_id != timeout_id:
475 # not the most recent call
476 return
477 477 now = time.time()
478 478 if job.timeout >= (now + 1):
479 479 self.log.warn("task %s timeout fired prematurely: %s > %s",
@@ -590,6 +590,14 b' class TaskScheduler(SessionFactory):'
590 590 if dep_id not in self.graph:
591 591 self.graph[dep_id] = set()
592 592 self.graph[dep_id].add(msg_id)
593
594 # schedule timeout callback
595 if job.timeout:
596 timeout_id = job.timeout_id = job.timeout_id + 1
597 self.loop.add_timeout(time.time() + job.timeout,
598 lambda : self.job_timeout(job, timeout_id)
599 )
600
593 601
594 602 def submit_task(self, job, indices=None):
595 603 """Submit a task to any of a subset of our targets."""
@@ -633,7 +641,7 b' class TaskScheduler(SessionFactory):'
633 641 else:
634 642 self.finish_job(idx)
635 643 except Exception:
636 self.log.error("task::Invaid result: %r", raw_msg, exc_info=True)
644 self.log.error("task::Invalid result: %r", raw_msg, exc_info=True)
637 645 return
638 646
639 647 md = msg['metadata']
General Comments 0
You need to be logged in to leave comments. Login now