##// END OF EJS Templates
use heap-sorted queue...
MinRK -
Show More
@@ -19,8 +19,7 b' Authors:'
19 # Imports
19 # Imports
20 #----------------------------------------------------------------------
20 #----------------------------------------------------------------------
21
21
22 from __future__ import print_function
22 import heapq
23
24 import logging
23 import logging
25 import sys
24 import sys
26 import time
25 import time
@@ -141,11 +140,17 b' class Job(object):'
141 self.after = after
140 self.after = after
142 self.follow = follow
141 self.follow = follow
143 self.timeout = timeout
142 self.timeout = timeout
144
143 self.removed = False # used for lazy-delete in heap-sorted queue
145
144
146 self.timestamp = time.time()
145 self.timestamp = time.time()
147 self.blacklist = set()
146 self.blacklist = set()
148
147
148 def __lt__(self, other):
149 return self.timestamp < other.timestamp
150
151 def __cmp__(self, other):
152 return cmp(self.timestamp, other.timestamp)
153
149 @property
154 @property
150 def dependents(self):
155 def dependents(self):
151 return self.follow.union(self.after)
156 return self.follow.union(self.after)
@@ -194,10 +199,11 b' class TaskScheduler(SessionFactory):'
194 query_stream = Instance(zmqstream.ZMQStream) # hub-facing DEALER stream
199 query_stream = Instance(zmqstream.ZMQStream) # hub-facing DEALER stream
195
200
196 # internals:
201 # internals:
202 queue = List() # heap-sorted list of Jobs
203 queue_map = Dict() # dict by msg_id of Jobs (for O(1) access to the Queue)
197 graph = Dict() # dict by msg_id of [ msg_ids that depend on key ]
204 graph = Dict() # dict by msg_id of [ msg_ids that depend on key ]
198 retries = Dict() # dict by msg_id of retries remaining (non-neg ints)
205 retries = Dict() # dict by msg_id of retries remaining (non-neg ints)
199 # waiting = List() # list of msg_ids ready to run, but haven't due to HWM
206 # waiting = List() # list of msg_ids ready to run, but haven't due to HWM
200 depending = Dict() # dict by msg_id of Jobs
201 pending = Dict() # dict by engine_uuid of submitted tasks
207 pending = Dict() # dict by engine_uuid of submitted tasks
202 completed = Dict() # dict by engine_uuid of completed tasks
208 completed = Dict() # dict by engine_uuid of completed tasks
203 failed = Dict() # dict by engine_uuid of failed tasks
209 failed = Dict() # dict by engine_uuid of failed tasks
@@ -447,11 +453,11 b' class TaskScheduler(SessionFactory):'
447 continue
453 continue
448 # check valid:
454 # check valid:
449 if msg_id in dep or dep.difference(self.all_ids):
455 if msg_id in dep or dep.difference(self.all_ids):
450 self.depending[msg_id] = job
456 self.queue_map[msg_id] = job
451 return self.fail_unreachable(msg_id, error.InvalidDependency)
457 return self.fail_unreachable(msg_id, error.InvalidDependency)
452 # check if unreachable:
458 # check if unreachable:
453 if dep.unreachable(self.all_completed, self.all_failed):
459 if dep.unreachable(self.all_completed, self.all_failed):
454 self.depending[msg_id] = job
460 self.queue_map[msg_id] = job
455 return self.fail_unreachable(msg_id)
461 return self.fail_unreachable(msg_id)
456
462
457 if after.check(self.all_completed, self.all_failed):
463 if after.check(self.all_completed, self.all_failed):
@@ -467,20 +473,22 b' class TaskScheduler(SessionFactory):'
467 def audit_timeouts(self):
473 def audit_timeouts(self):
468 """Audit all waiting tasks for expired timeouts."""
474 """Audit all waiting tasks for expired timeouts."""
469 now = datetime.now()
475 now = datetime.now()
470 for msg_id in self.depending.keys():
476 for msg_id in self.queue_map.keys():
471 # must recheck, in case one failure cascaded to another:
477 # must recheck, in case one failure cascaded to another:
472 if msg_id in self.depending:
478 if msg_id in self.queue_map:
473 job = self.depending[msg_id]
479 job = self.queue_map[msg_id]
474 if job.timeout and job.timeout < now:
480 if job.timeout and job.timeout < now:
475 self.fail_unreachable(msg_id, error.TaskTimeout)
481 self.fail_unreachable(msg_id, error.TaskTimeout)
476
482
477 def fail_unreachable(self, msg_id, why=error.ImpossibleDependency):
483 def fail_unreachable(self, msg_id, why=error.ImpossibleDependency):
478 """a task has become unreachable, send a reply with an ImpossibleDependency
484 """a task has become unreachable, send a reply with an ImpossibleDependency
479 error."""
485 error."""
480 if msg_id not in self.depending:
486 if msg_id not in self.queue_map:
481 self.log.error("msg %r already failed!", msg_id)
487 self.log.error("msg %r already failed!", msg_id)
482 return
488 return
483 job = self.depending.pop(msg_id)
489 job = self.queue_map.pop(msg_id)
490 # lazy-delete from the queue
491 job.removed = True
484 for mid in job.dependents:
492 for mid in job.dependents:
485 if mid in self.graph:
493 if mid in self.graph:
486 self.graph[mid].remove(msg_id)
494 self.graph[mid].remove(msg_id)
@@ -549,14 +557,14 b' class TaskScheduler(SessionFactory):'
549 for m in job.follow.intersection(relevant):
557 for m in job.follow.intersection(relevant):
550 dests.add(self.destinations[m])
558 dests.add(self.destinations[m])
551 if len(dests) > 1:
559 if len(dests) > 1:
552 self.depending[msg_id] = job
560 self.queue_map[msg_id] = job
553 self.fail_unreachable(msg_id)
561 self.fail_unreachable(msg_id)
554 return False
562 return False
555 if job.targets:
563 if job.targets:
556 # check blacklist+targets for impossibility
564 # check blacklist+targets for impossibility
557 job.targets.difference_update(job.blacklist)
565 job.targets.difference_update(job.blacklist)
558 if not job.targets or not job.targets.intersection(self.targets):
566 if not job.targets or not job.targets.intersection(self.targets):
559 self.depending[msg_id] = job
567 self.queue_map[msg_id] = job
560 self.fail_unreachable(msg_id)
568 self.fail_unreachable(msg_id)
561 return False
569 return False
562 return False
570 return False
@@ -569,7 +577,8 b' class TaskScheduler(SessionFactory):'
569 def save_unmet(self, job):
577 def save_unmet(self, job):
570 """Save a message for later submission when its dependencies are met."""
578 """Save a message for later submission when its dependencies are met."""
571 msg_id = job.msg_id
579 msg_id = job.msg_id
572 self.depending[msg_id] = job
580 self.queue_map[msg_id] = job
581 heapq.heappush(self.queue, job)
573 # track the ids in follow or after, but not those already finished
582 # track the ids in follow or after, but not those already finished
574 for dep_id in job.after.union(job.follow).difference(self.all_done):
583 for dep_id in job.after.union(job.follow).difference(self.all_done):
575 if dep_id not in self.graph:
584 if dep_id not in self.graph:
@@ -672,7 +681,7 b' class TaskScheduler(SessionFactory):'
672 job.blacklist.add(engine)
681 job.blacklist.add(engine)
673
682
674 if job.blacklist == job.targets:
683 if job.blacklist == job.targets:
675 self.depending[msg_id] = job
684 self.queue_map[msg_id] = job
676 self.fail_unreachable(msg_id)
685 self.fail_unreachable(msg_id)
677 elif not self.maybe_run(job):
686 elif not self.maybe_run(job):
678 # resubmit failed
687 # resubmit failed
@@ -698,31 +707,42 b' class TaskScheduler(SessionFactory):'
698 # print ("\n\n***********")
707 # print ("\n\n***********")
699 # pprint (dep_id)
708 # pprint (dep_id)
700 # pprint (self.graph)
709 # pprint (self.graph)
701 # pprint (self.depending)
710 # pprint (self.queue_map)
702 # pprint (self.all_completed)
711 # pprint (self.all_completed)
703 # pprint (self.all_failed)
712 # pprint (self.all_failed)
704 # print ("\n\n***********\n\n")
713 # print ("\n\n***********\n\n")
705 # update any jobs that depended on the dependency
714 # update any jobs that depended on the dependency
706 jobs = self.graph.pop(dep_id, [])
715 msg_ids = self.graph.pop(dep_id, [])
707
716
708 # recheck *all* jobs if
717 # recheck *all* jobs if
709 # a) we have HWM and an engine just become no longer full
718 # a) we have HWM and an engine just become no longer full
710 # or b) dep_id was given as None
719 # or b) dep_id was given as None
711
720
712 if dep_id is None or self.hwm and any( [ load==self.hwm-1 for load in self.loads ]):
721 if dep_id is None or self.hwm and any( [ load==self.hwm-1 for load in self.loads ]):
713 jobs = self.depending.keys()
722 jobs = self.queue
723 using_queue = True
724 else:
725 using_queue = False
726 jobs = heapq.heapify([ self.queue_map[msg_id] for msg_id in msg_ids ])
714
727
715 for msg_id in sorted(jobs, key=lambda msg_id: self.depending[msg_id].timestamp):
728 to_restore = []
716 job = self.depending[msg_id]
729 while jobs:
717
730 job = heapq.heappop(jobs)
731 if job.removed:
732 continue
733 msg_id = job.msg_id
734
735 put_it_back = True
736
718 if job.after.unreachable(self.all_completed, self.all_failed)\
737 if job.after.unreachable(self.all_completed, self.all_failed)\
719 or job.follow.unreachable(self.all_completed, self.all_failed):
738 or job.follow.unreachable(self.all_completed, self.all_failed):
720 self.fail_unreachable(msg_id)
739 self.fail_unreachable(msg_id)
740 put_it_back = False
721
741
722 elif job.after.check(self.all_completed, self.all_failed): # time deps met, maybe run
742 elif job.after.check(self.all_completed, self.all_failed): # time deps met, maybe run
723 if self.maybe_run(job):
743 if self.maybe_run(job):
724
744 put_it_back = False
725 self.depending.pop(msg_id)
745 self.queue_map.pop(msg_id)
726 for mid in job.dependents:
746 for mid in job.dependents:
727 if mid in self.graph:
747 if mid in self.graph:
728 self.graph[mid].remove(msg_id)
748 self.graph[mid].remove(msg_id)
@@ -733,7 +753,17 b' class TaskScheduler(SessionFactory):'
733 # non-full, and all tasks after the first are checked,
753 # non-full, and all tasks after the first are checked,
734 # even though they can't run.
754 # even though they can't run.
735 if not self.available_engines():
755 if not self.available_engines():
736 return
756 break
757
758 if using_queue and put_it_back:
759 # popped a job from the queue but it neither ran nor failed,
760 # so we need to put it back when we are done
761 to_restore.append(job)
762
763 # put back any tasks we popped but didn't run
764 for job in to_restore:
765 heapq.heappush(self.queue, job)
766
737
767
738 #----------------------------------------------------------------------
768 #----------------------------------------------------------------------
739 # methods to be overridden by subclasses
769 # methods to be overridden by subclasses
General Comments 0
You need to be logged in to leave comments. Login now