Show More
@@ -19,8 +19,7 b' Authors:' | |||
|
19 | 19 | # Imports |
|
20 | 20 | #---------------------------------------------------------------------- |
|
21 | 21 | |
|
22 | from __future__ import print_function | |
|
23 | ||
|
22 | import heapq | |
|
24 | 23 | import logging |
|
25 | 24 | import sys |
|
26 | 25 | import time |
@@ -141,11 +140,17 b' class Job(object):' | |||
|
141 | 140 | self.after = after |
|
142 | 141 | self.follow = follow |
|
143 | 142 | self.timeout = timeout |
|
144 | ||
|
143 | self.removed = False # used for lazy-delete in heap-sorted queue | |
|
145 | 144 | |
|
146 | 145 | self.timestamp = time.time() |
|
147 | 146 | self.blacklist = set() |
|
148 | 147 | |
|
148 | def __lt__(self, other): | |
|
149 | return self.timestamp < other.timestamp | |
|
150 | ||
|
151 | def __cmp__(self, other): | |
|
152 | return cmp(self.timestamp, other.timestamp) | |
|
153 | ||
|
149 | 154 | @property |
|
150 | 155 | def dependents(self): |
|
151 | 156 | return self.follow.union(self.after) |
@@ -194,10 +199,11 b' class TaskScheduler(SessionFactory):' | |||
|
194 | 199 | query_stream = Instance(zmqstream.ZMQStream) # hub-facing DEALER stream |
|
195 | 200 | |
|
196 | 201 | # internals: |
|
202 | queue = List() # heap-sorted list of Jobs | |
|
203 | queue_map = Dict() # dict by msg_id of Jobs (for O(1) access to the Queue) | |
|
197 | 204 | graph = Dict() # dict by msg_id of [ msg_ids that depend on key ] |
|
198 | 205 | retries = Dict() # dict by msg_id of retries remaining (non-neg ints) |
|
199 | 206 | # waiting = List() # list of msg_ids ready to run, but haven't due to HWM |
|
200 | depending = Dict() # dict by msg_id of Jobs | |
|
201 | 207 | pending = Dict() # dict by engine_uuid of submitted tasks |
|
202 | 208 | completed = Dict() # dict by engine_uuid of completed tasks |
|
203 | 209 | failed = Dict() # dict by engine_uuid of failed tasks |
@@ -447,11 +453,11 b' class TaskScheduler(SessionFactory):' | |||
|
447 | 453 | continue |
|
448 | 454 | # check valid: |
|
449 | 455 | if msg_id in dep or dep.difference(self.all_ids): |
|
450 | self.depending[msg_id] = job | |
|
|
456 | self.queue_map[msg_id] = job | |
|
451 | 457 | return self.fail_unreachable(msg_id, error.InvalidDependency) |
|
452 | 458 | # check if unreachable: |
|
453 | 459 | if dep.unreachable(self.all_completed, self.all_failed): |
|
454 | self.depending[msg_id] = job | |
|
|
460 | self.queue_map[msg_id] = job | |
|
455 | 461 | return self.fail_unreachable(msg_id) |
|
456 | 462 | |
|
457 | 463 | if after.check(self.all_completed, self.all_failed): |
@@ -467,20 +473,22 b' class TaskScheduler(SessionFactory):' | |||
|
467 | 473 | def audit_timeouts(self): |
|
468 | 474 | """Audit all waiting tasks for expired timeouts.""" |
|
469 | 475 | now = datetime.now() |
|
470 | for msg_id in self.depending.keys(): | |
|
|
476 | for msg_id in self.queue_map.keys(): | |
|
471 | 477 | # must recheck, in case one failure cascaded to another: |
|
472 | if msg_id in self.depending: | |
|
|
473 | job = self.depending[msg_id] | |
|
|
478 | if msg_id in self.queue_map: | |
|
479 | job = self.queue_map[msg_id] | |
|
474 | 480 | if job.timeout and job.timeout < now: |
|
475 | 481 | self.fail_unreachable(msg_id, error.TaskTimeout) |
|
476 | 482 | |
|
477 | 483 | def fail_unreachable(self, msg_id, why=error.ImpossibleDependency): |
|
478 | 484 | """a task has become unreachable, send a reply with an ImpossibleDependency |
|
479 | 485 | error.""" |
|
480 | if msg_id not in self.depending: | |
|
|
486 | if msg_id not in self.queue_map: | |
|
481 | 487 | self.log.error("msg %r already failed!", msg_id) |
|
482 | 488 | return |
|
483 | job = self.depending.pop(msg_id) | |
|
|
489 | job = self.queue_map.pop(msg_id) | |
|
490 | # lazy-delete from the queue | |
|
491 | job.removed = True | |
|
484 | 492 | for mid in job.dependents: |
|
485 | 493 | if mid in self.graph: |
|
486 | 494 | self.graph[mid].remove(msg_id) |
@@ -549,14 +557,14 b' class TaskScheduler(SessionFactory):' | |||
|
549 | 557 | for m in job.follow.intersection(relevant): |
|
550 | 558 | dests.add(self.destinations[m]) |
|
551 | 559 | if len(dests) > 1: |
|
552 | self.depending[msg_id] = job | |
|
|
560 | self.queue_map[msg_id] = job | |
|
553 | 561 | self.fail_unreachable(msg_id) |
|
554 | 562 | return False |
|
555 | 563 | if job.targets: |
|
556 | 564 | # check blacklist+targets for impossibility |
|
557 | 565 | job.targets.difference_update(job.blacklist) |
|
558 | 566 | if not job.targets or not job.targets.intersection(self.targets): |
|
559 | self.depending[msg_id] = job | |
|
|
567 | self.queue_map[msg_id] = job | |
|
560 | 568 | self.fail_unreachable(msg_id) |
|
561 | 569 | return False |
|
562 | 570 | return False |
@@ -569,7 +577,8 b' class TaskScheduler(SessionFactory):' | |||
|
569 | 577 | def save_unmet(self, job): |
|
570 | 578 | """Save a message for later submission when its dependencies are met.""" |
|
571 | 579 | msg_id = job.msg_id |
|
572 | self.depending[msg_id] = job | |
|
|
580 | self.queue_map[msg_id] = job | |
|
581 | heapq.heappush(self.queue, job) | |
|
573 | 582 | # track the ids in follow or after, but not those already finished |
|
574 | 583 | for dep_id in job.after.union(job.follow).difference(self.all_done): |
|
575 | 584 | if dep_id not in self.graph: |
@@ -672,7 +681,7 b' class TaskScheduler(SessionFactory):' | |||
|
672 | 681 | job.blacklist.add(engine) |
|
673 | 682 | |
|
674 | 683 | if job.blacklist == job.targets: |
|
675 | self.depending[msg_id] = job | |
|
|
684 | self.queue_map[msg_id] = job | |
|
676 | 685 | self.fail_unreachable(msg_id) |
|
677 | 686 | elif not self.maybe_run(job): |
|
678 | 687 | # resubmit failed |
@@ -698,31 +707,42 b' class TaskScheduler(SessionFactory):' | |||
|
698 | 707 | # print ("\n\n***********") |
|
699 | 708 | # pprint (dep_id) |
|
700 | 709 | # pprint (self.graph) |
|
701 | # pprint (self.depending) | |
|
|
710 | # pprint (self.queue_map) | |
|
702 | 711 | # pprint (self.all_completed) |
|
703 | 712 | # pprint (self.all_failed) |
|
704 | 713 | # print ("\n\n***********\n\n") |
|
705 | 714 | # update any jobs that depended on the dependency |
|
706 | jobs = self.graph.pop(dep_id, []) | |
|
|
|
715 | msg_ids = self.graph.pop(dep_id, []) | |
|
707 | 716 | |
|
708 | 717 | # recheck *all* jobs if |
|
709 | 718 | # a) we have HWM and an engine just become no longer full |
|
710 | 719 | # or b) dep_id was given as None |
|
711 | 720 | |
|
712 | 721 | if dep_id is None or self.hwm and any( [ load==self.hwm-1 for load in self.loads ]): |
|
713 | jobs = self.depending.keys() | |
|
|
722 | jobs = self.queue | |
|
723 | using_queue = True | |
|
724 | else: | |
|
725 | using_queue = False | |
|
726 | jobs = heapq.heapify([ self.queue_map[msg_id] for msg_id in msg_ids ]) | |
|
714 | 727 | |
|
715 | for msg_id in sorted(jobs, key=lambda msg_id: self.depending[msg_id].timestamp): | |
|
716 | job = self.depending[msg_id] | |
|
717 | ||
|
728 | to_restore = [] | |
|
729 | while jobs: | |
|
730 | job = heapq.heappop(jobs) | |
|
731 | if job.removed: | |
|
732 | continue | |
|
733 | msg_id = job.msg_id | |
|
734 | ||
|
735 | put_it_back = True | |
|
736 | ||
|
718 | 737 | if job.after.unreachable(self.all_completed, self.all_failed)\ |
|
719 | 738 | or job.follow.unreachable(self.all_completed, self.all_failed): |
|
720 | 739 | self.fail_unreachable(msg_id) |
|
740 | put_it_back = False | |
|
721 | 741 | |
|
722 | 742 | elif job.after.check(self.all_completed, self.all_failed): # time deps met, maybe run |
|
723 | 743 | if self.maybe_run(job): |
|
724 | ||
|
725 | self.depending.pop(msg_id) | |
|
|
744 | put_it_back = False | |
|
745 | self.queue_map.pop(msg_id) | |
|
726 | 746 | for mid in job.dependents: |
|
727 | 747 | if mid in self.graph: |
|
728 | 748 | self.graph[mid].remove(msg_id) |
@@ -733,7 +753,17 b' class TaskScheduler(SessionFactory):' | |||
|
733 | 753 | # non-full, and all tasks after the first are checked, |
|
734 | 754 | # even though they can't run. |
|
735 | 755 | if not self.available_engines(): |
|
736 |
|
|
|
756 | break | |
|
757 | ||
|
758 | if using_queue and put_it_back: | |
|
759 | # popped a job from the queue but it neither ran nor failed, | |
|
760 | # so we need to put it back when we are done | |
|
761 | to_restore.append(job) | |
|
762 | ||
|
763 | # put back any tasks we popped but didn't run | |
|
764 | for job in to_restore: | |
|
765 | heapq.heappush(self.queue, job) | |
|
766 | ||
|
737 | 767 | |
|
738 | 768 | #---------------------------------------------------------------------- |
|
739 | 769 | # methods to be overridden by subclasses |
General Comments 0
You need to be logged in to leave comments.
Login now