##// END OF EJS Templates
adjust Scheduler timeout logic...
MinRK -
Show More
@@ -140,9 +140,10 b' class Job(object):'
140 self.after = after
140 self.after = after
141 self.follow = follow
141 self.follow = follow
142 self.timeout = timeout
142 self.timeout = timeout
143 self.removed = False # used for lazy-delete from sorted queue
144
143
144 self.removed = False # used for lazy-delete from sorted queue
145 self.timestamp = time.time()
145 self.timestamp = time.time()
146 self.timeout_id = 0
146 self.blacklist = set()
147 self.blacklist = set()
147
148
148 def __lt__(self, other):
149 def __lt__(self, other):
@@ -155,6 +156,7 b' class Job(object):'
155 def dependents(self):
156 def dependents(self):
156 return self.follow.union(self.after)
157 return self.follow.union(self.after)
157
158
159
158 class TaskScheduler(SessionFactory):
160 class TaskScheduler(SessionFactory):
159 """Python TaskScheduler object.
161 """Python TaskScheduler object.
160
162
@@ -433,19 +435,14 b' class TaskScheduler(SessionFactory):'
433 # location dependencies
435 # location dependencies
434 follow = Dependency(md.get('follow', []))
436 follow = Dependency(md.get('follow', []))
435
437
436 # turn timeouts into datetime objects:
437 timeout = md.get('timeout', None)
438 timeout = md.get('timeout', None)
438 if timeout:
439 if timeout:
439 timeout = time.time() + float(timeout)
440 timeout = float(timeout)
440
441
441 job = Job(msg_id=msg_id, raw_msg=raw_msg, idents=idents, msg=msg,
442 job = Job(msg_id=msg_id, raw_msg=raw_msg, idents=idents, msg=msg,
442 header=header, targets=targets, after=after, follow=follow,
443 header=header, targets=targets, after=after, follow=follow,
443 timeout=timeout, metadata=md,
444 timeout=timeout, metadata=md,
444 )
445 )
445 if timeout:
446 # schedule timeout callback
447 self.loop.add_timeout(timeout, lambda : self.job_timeout(job))
448
449 # validate and reduce dependencies:
446 # validate and reduce dependencies:
450 for dep in after,follow:
447 for dep in after,follow:
451 if not dep: # empty dependency
448 if not dep: # empty dependency
@@ -469,11 +466,14 b' class TaskScheduler(SessionFactory):'
469 else:
466 else:
470 self.save_unmet(job)
467 self.save_unmet(job)
471
468
472 def job_timeout(self, job):
469 def job_timeout(self, job, timeout_id):
473 """callback for a job's timeout.
470 """callback for a job's timeout.
474
471
475 The job may or may not have been run at this point.
472 The job may or may not have been run at this point.
476 """
473 """
474 if job.timeout_id != timeout_id:
475 # not the most recent call
476 return
477 now = time.time()
477 now = time.time()
478 if job.timeout >= (now + 1):
478 if job.timeout >= (now + 1):
479 self.log.warn("task %s timeout fired prematurely: %s > %s",
479 self.log.warn("task %s timeout fired prematurely: %s > %s",
@@ -590,6 +590,14 b' class TaskScheduler(SessionFactory):'
590 if dep_id not in self.graph:
590 if dep_id not in self.graph:
591 self.graph[dep_id] = set()
591 self.graph[dep_id] = set()
592 self.graph[dep_id].add(msg_id)
592 self.graph[dep_id].add(msg_id)
593
594 # schedule timeout callback
595 if job.timeout:
596 timeout_id = job.timeout_id = job.timeout_id + 1
597 self.loop.add_timeout(time.time() + job.timeout,
598 lambda : self.job_timeout(job, timeout_id)
599 )
600
593
601
594 def submit_task(self, job, indices=None):
602 def submit_task(self, job, indices=None):
595 """Submit a task to any of a subset of our targets."""
603 """Submit a task to any of a subset of our targets."""
@@ -633,7 +641,7 b' class TaskScheduler(SessionFactory):'
633 else:
641 else:
634 self.finish_job(idx)
642 self.finish_job(idx)
635 except Exception:
643 except Exception:
636 self.log.error("task::Invaid result: %r", raw_msg, exc_info=True)
644 self.log.error("task::Invalid result: %r", raw_msg, exc_info=True)
637 return
645 return
638
646
639 md = msg['metadata']
647 md = msg['metadata']
General Comments 0
You need to be logged in to leave comments. Login now