Show More
@@ -19,8 +19,7 b' Authors:' | |||||
19 | # Imports |
|
19 | # Imports | |
20 | #---------------------------------------------------------------------- |
|
20 | #---------------------------------------------------------------------- | |
21 |
|
21 | |||
22 | from __future__ import print_function |
|
22 | import heapq | |
23 |
|
||||
24 | import logging |
|
23 | import logging | |
25 | import sys |
|
24 | import sys | |
26 | import time |
|
25 | import time | |
@@ -141,11 +140,17 b' class Job(object):' | |||||
141 | self.after = after |
|
140 | self.after = after | |
142 | self.follow = follow |
|
141 | self.follow = follow | |
143 | self.timeout = timeout |
|
142 | self.timeout = timeout | |
144 |
|
143 | self.removed = False # used for lazy-delete in heap-sorted queue | ||
145 |
|
144 | |||
146 | self.timestamp = time.time() |
|
145 | self.timestamp = time.time() | |
147 | self.blacklist = set() |
|
146 | self.blacklist = set() | |
148 |
|
147 | |||
|
148 | def __lt__(self, other): | |||
|
149 | return self.timestamp < other.timestamp | |||
|
150 | ||||
|
151 | def __cmp__(self, other): | |||
|
152 | return cmp(self.timestamp, other.timestamp) | |||
|
153 | ||||
149 | @property |
|
154 | @property | |
150 | def dependents(self): |
|
155 | def dependents(self): | |
151 | return self.follow.union(self.after) |
|
156 | return self.follow.union(self.after) | |
@@ -194,10 +199,11 b' class TaskScheduler(SessionFactory):' | |||||
194 | query_stream = Instance(zmqstream.ZMQStream) # hub-facing DEALER stream |
|
199 | query_stream = Instance(zmqstream.ZMQStream) # hub-facing DEALER stream | |
195 |
|
200 | |||
196 | # internals: |
|
201 | # internals: | |
|
202 | queue = List() # heap-sorted list of Jobs | |||
|
203 | queue_map = Dict() # dict by msg_id of Jobs (for O(1) access to the Queue) | |||
197 | graph = Dict() # dict by msg_id of [ msg_ids that depend on key ] |
|
204 | graph = Dict() # dict by msg_id of [ msg_ids that depend on key ] | |
198 | retries = Dict() # dict by msg_id of retries remaining (non-neg ints) |
|
205 | retries = Dict() # dict by msg_id of retries remaining (non-neg ints) | |
199 | # waiting = List() # list of msg_ids ready to run, but haven't due to HWM |
|
206 | # waiting = List() # list of msg_ids ready to run, but haven't due to HWM | |
200 | depending = Dict() # dict by msg_id of Jobs |
|
|||
201 | pending = Dict() # dict by engine_uuid of submitted tasks |
|
207 | pending = Dict() # dict by engine_uuid of submitted tasks | |
202 | completed = Dict() # dict by engine_uuid of completed tasks |
|
208 | completed = Dict() # dict by engine_uuid of completed tasks | |
203 | failed = Dict() # dict by engine_uuid of failed tasks |
|
209 | failed = Dict() # dict by engine_uuid of failed tasks | |
@@ -447,11 +453,11 b' class TaskScheduler(SessionFactory):' | |||||
447 | continue |
|
453 | continue | |
448 | # check valid: |
|
454 | # check valid: | |
449 | if msg_id in dep or dep.difference(self.all_ids): |
|
455 | if msg_id in dep or dep.difference(self.all_ids): | |
450 |
self. |
|
456 | self.queue_map[msg_id] = job | |
451 | return self.fail_unreachable(msg_id, error.InvalidDependency) |
|
457 | return self.fail_unreachable(msg_id, error.InvalidDependency) | |
452 | # check if unreachable: |
|
458 | # check if unreachable: | |
453 | if dep.unreachable(self.all_completed, self.all_failed): |
|
459 | if dep.unreachable(self.all_completed, self.all_failed): | |
454 |
self. |
|
460 | self.queue_map[msg_id] = job | |
455 | return self.fail_unreachable(msg_id) |
|
461 | return self.fail_unreachable(msg_id) | |
456 |
|
462 | |||
457 | if after.check(self.all_completed, self.all_failed): |
|
463 | if after.check(self.all_completed, self.all_failed): | |
@@ -467,20 +473,22 b' class TaskScheduler(SessionFactory):' | |||||
467 | def audit_timeouts(self): |
|
473 | def audit_timeouts(self): | |
468 | """Audit all waiting tasks for expired timeouts.""" |
|
474 | """Audit all waiting tasks for expired timeouts.""" | |
469 | now = datetime.now() |
|
475 | now = datetime.now() | |
470 |
for msg_id in self. |
|
476 | for msg_id in self.queue_map.keys(): | |
471 | # must recheck, in case one failure cascaded to another: |
|
477 | # must recheck, in case one failure cascaded to another: | |
472 |
if msg_id in self. |
|
478 | if msg_id in self.queue_map: | |
473 |
job = self. |
|
479 | job = self.queue_map[msg_id] | |
474 | if job.timeout and job.timeout < now: |
|
480 | if job.timeout and job.timeout < now: | |
475 | self.fail_unreachable(msg_id, error.TaskTimeout) |
|
481 | self.fail_unreachable(msg_id, error.TaskTimeout) | |
476 |
|
482 | |||
477 | def fail_unreachable(self, msg_id, why=error.ImpossibleDependency): |
|
483 | def fail_unreachable(self, msg_id, why=error.ImpossibleDependency): | |
478 | """a task has become unreachable, send a reply with an ImpossibleDependency |
|
484 | """a task has become unreachable, send a reply with an ImpossibleDependency | |
479 | error.""" |
|
485 | error.""" | |
480 |
if msg_id not in self. |
|
486 | if msg_id not in self.queue_map: | |
481 | self.log.error("msg %r already failed!", msg_id) |
|
487 | self.log.error("msg %r already failed!", msg_id) | |
482 | return |
|
488 | return | |
483 |
job = self. |
|
489 | job = self.queue_map.pop(msg_id) | |
|
490 | # lazy-delete from the queue | |||
|
491 | job.removed = True | |||
484 | for mid in job.dependents: |
|
492 | for mid in job.dependents: | |
485 | if mid in self.graph: |
|
493 | if mid in self.graph: | |
486 | self.graph[mid].remove(msg_id) |
|
494 | self.graph[mid].remove(msg_id) | |
@@ -549,14 +557,14 b' class TaskScheduler(SessionFactory):' | |||||
549 | for m in job.follow.intersection(relevant): |
|
557 | for m in job.follow.intersection(relevant): | |
550 | dests.add(self.destinations[m]) |
|
558 | dests.add(self.destinations[m]) | |
551 | if len(dests) > 1: |
|
559 | if len(dests) > 1: | |
552 |
self. |
|
560 | self.queue_map[msg_id] = job | |
553 | self.fail_unreachable(msg_id) |
|
561 | self.fail_unreachable(msg_id) | |
554 | return False |
|
562 | return False | |
555 | if job.targets: |
|
563 | if job.targets: | |
556 | # check blacklist+targets for impossibility |
|
564 | # check blacklist+targets for impossibility | |
557 | job.targets.difference_update(job.blacklist) |
|
565 | job.targets.difference_update(job.blacklist) | |
558 | if not job.targets or not job.targets.intersection(self.targets): |
|
566 | if not job.targets or not job.targets.intersection(self.targets): | |
559 |
self. |
|
567 | self.queue_map[msg_id] = job | |
560 | self.fail_unreachable(msg_id) |
|
568 | self.fail_unreachable(msg_id) | |
561 | return False |
|
569 | return False | |
562 | return False |
|
570 | return False | |
@@ -569,7 +577,8 b' class TaskScheduler(SessionFactory):' | |||||
569 | def save_unmet(self, job): |
|
577 | def save_unmet(self, job): | |
570 | """Save a message for later submission when its dependencies are met.""" |
|
578 | """Save a message for later submission when its dependencies are met.""" | |
571 | msg_id = job.msg_id |
|
579 | msg_id = job.msg_id | |
572 |
self. |
|
580 | self.queue_map[msg_id] = job | |
|
581 | heapq.heappush(self.queue, job) | |||
573 | # track the ids in follow or after, but not those already finished |
|
582 | # track the ids in follow or after, but not those already finished | |
574 | for dep_id in job.after.union(job.follow).difference(self.all_done): |
|
583 | for dep_id in job.after.union(job.follow).difference(self.all_done): | |
575 | if dep_id not in self.graph: |
|
584 | if dep_id not in self.graph: | |
@@ -672,7 +681,7 b' class TaskScheduler(SessionFactory):' | |||||
672 | job.blacklist.add(engine) |
|
681 | job.blacklist.add(engine) | |
673 |
|
682 | |||
674 | if job.blacklist == job.targets: |
|
683 | if job.blacklist == job.targets: | |
675 |
self. |
|
684 | self.queue_map[msg_id] = job | |
676 | self.fail_unreachable(msg_id) |
|
685 | self.fail_unreachable(msg_id) | |
677 | elif not self.maybe_run(job): |
|
686 | elif not self.maybe_run(job): | |
678 | # resubmit failed |
|
687 | # resubmit failed | |
@@ -698,31 +707,42 b' class TaskScheduler(SessionFactory):' | |||||
698 | # print ("\n\n***********") |
|
707 | # print ("\n\n***********") | |
699 | # pprint (dep_id) |
|
708 | # pprint (dep_id) | |
700 | # pprint (self.graph) |
|
709 | # pprint (self.graph) | |
701 |
# pprint (self. |
|
710 | # pprint (self.queue_map) | |
702 | # pprint (self.all_completed) |
|
711 | # pprint (self.all_completed) | |
703 | # pprint (self.all_failed) |
|
712 | # pprint (self.all_failed) | |
704 | # print ("\n\n***********\n\n") |
|
713 | # print ("\n\n***********\n\n") | |
705 | # update any jobs that depended on the dependency |
|
714 | # update any jobs that depended on the dependency | |
706 |
|
|
715 | msg_ids = self.graph.pop(dep_id, []) | |
707 |
|
716 | |||
708 | # recheck *all* jobs if |
|
717 | # recheck *all* jobs if | |
709 | # a) we have HWM and an engine just become no longer full |
|
718 | # a) we have HWM and an engine just become no longer full | |
710 | # or b) dep_id was given as None |
|
719 | # or b) dep_id was given as None | |
711 |
|
720 | |||
712 | if dep_id is None or self.hwm and any( [ load==self.hwm-1 for load in self.loads ]): |
|
721 | if dep_id is None or self.hwm and any( [ load==self.hwm-1 for load in self.loads ]): | |
713 |
jobs = self. |
|
722 | jobs = self.queue | |
|
723 | using_queue = True | |||
|
724 | else: | |||
|
725 | using_queue = False | |||
|
726 | jobs = heapq.heapify([ self.queue_map[msg_id] for msg_id in msg_ids ]) | |||
714 |
|
727 | |||
715 | for msg_id in sorted(jobs, key=lambda msg_id: self.depending[msg_id].timestamp): |
|
728 | to_restore = [] | |
716 | job = self.depending[msg_id] |
|
729 | while jobs: | |
717 |
|
730 | job = heapq.heappop(jobs) | ||
|
731 | if job.removed: | |||
|
732 | continue | |||
|
733 | msg_id = job.msg_id | |||
|
734 | ||||
|
735 | put_it_back = True | |||
|
736 | ||||
718 | if job.after.unreachable(self.all_completed, self.all_failed)\ |
|
737 | if job.after.unreachable(self.all_completed, self.all_failed)\ | |
719 | or job.follow.unreachable(self.all_completed, self.all_failed): |
|
738 | or job.follow.unreachable(self.all_completed, self.all_failed): | |
720 | self.fail_unreachable(msg_id) |
|
739 | self.fail_unreachable(msg_id) | |
|
740 | put_it_back = False | |||
721 |
|
741 | |||
722 | elif job.after.check(self.all_completed, self.all_failed): # time deps met, maybe run |
|
742 | elif job.after.check(self.all_completed, self.all_failed): # time deps met, maybe run | |
723 | if self.maybe_run(job): |
|
743 | if self.maybe_run(job): | |
724 |
|
744 | put_it_back = False | ||
725 |
self. |
|
745 | self.queue_map.pop(msg_id) | |
726 | for mid in job.dependents: |
|
746 | for mid in job.dependents: | |
727 | if mid in self.graph: |
|
747 | if mid in self.graph: | |
728 | self.graph[mid].remove(msg_id) |
|
748 | self.graph[mid].remove(msg_id) | |
@@ -733,7 +753,17 b' class TaskScheduler(SessionFactory):' | |||||
733 | # non-full, and all tasks after the first are checked, |
|
753 | # non-full, and all tasks after the first are checked, | |
734 | # even though they can't run. |
|
754 | # even though they can't run. | |
735 | if not self.available_engines(): |
|
755 | if not self.available_engines(): | |
736 |
|
|
756 | break | |
|
757 | ||||
|
758 | if using_queue and put_it_back: | |||
|
759 | # popped a job from the queue but it neither ran nor failed, | |||
|
760 | # so we need to put it back when we are done | |||
|
761 | to_restore.append(job) | |||
|
762 | ||||
|
763 | # put back any tasks we popped but didn't run | |||
|
764 | for job in to_restore: | |||
|
765 | heapq.heappush(self.queue, job) | |||
|
766 | ||||
737 |
|
767 | |||
738 | #---------------------------------------------------------------------- |
|
768 | #---------------------------------------------------------------------- | |
739 | # methods to be overridden by subclasses |
|
769 | # methods to be overridden by subclasses |
General Comments 0
You need to be logged in to leave comments.
Login now