##// END OF EJS Templates
use heap-sorted queue...
MinRK -
Show More
@@ -19,8 +19,7 b' Authors:'
19 19 # Imports
20 20 #----------------------------------------------------------------------
21 21
22 from __future__ import print_function
23
22 import heapq
24 23 import logging
25 24 import sys
26 25 import time
@@ -141,11 +140,17 b' class Job(object):'
141 140 self.after = after
142 141 self.follow = follow
143 142 self.timeout = timeout
144
143 self.removed = False # used for lazy-delete in heap-sorted queue
145 144
146 145 self.timestamp = time.time()
147 146 self.blacklist = set()
148 147
148 def __lt__(self, other):
149 return self.timestamp < other.timestamp
150
151 def __cmp__(self, other):
152 return cmp(self.timestamp, other.timestamp)
153
149 154 @property
150 155 def dependents(self):
151 156 return self.follow.union(self.after)
@@ -194,10 +199,11 b' class TaskScheduler(SessionFactory):'
194 199 query_stream = Instance(zmqstream.ZMQStream) # hub-facing DEALER stream
195 200
196 201 # internals:
202 queue = List() # heap-sorted list of Jobs
203 queue_map = Dict() # dict by msg_id of Jobs (for O(1) access to the Queue)
197 204 graph = Dict() # dict by msg_id of [ msg_ids that depend on key ]
198 205 retries = Dict() # dict by msg_id of retries remaining (non-neg ints)
199 206 # waiting = List() # list of msg_ids ready to run, but haven't due to HWM
200 depending = Dict() # dict by msg_id of Jobs
201 207 pending = Dict() # dict by engine_uuid of submitted tasks
202 208 completed = Dict() # dict by engine_uuid of completed tasks
203 209 failed = Dict() # dict by engine_uuid of failed tasks
@@ -447,11 +453,11 b' class TaskScheduler(SessionFactory):'
447 453 continue
448 454 # check valid:
449 455 if msg_id in dep or dep.difference(self.all_ids):
450 self.depending[msg_id] = job
456 self.queue_map[msg_id] = job
451 457 return self.fail_unreachable(msg_id, error.InvalidDependency)
452 458 # check if unreachable:
453 459 if dep.unreachable(self.all_completed, self.all_failed):
454 self.depending[msg_id] = job
460 self.queue_map[msg_id] = job
455 461 return self.fail_unreachable(msg_id)
456 462
457 463 if after.check(self.all_completed, self.all_failed):
@@ -467,20 +473,22 b' class TaskScheduler(SessionFactory):'
467 473 def audit_timeouts(self):
468 474 """Audit all waiting tasks for expired timeouts."""
469 475 now = datetime.now()
470 for msg_id in self.depending.keys():
476 for msg_id in self.queue_map.keys():
471 477 # must recheck, in case one failure cascaded to another:
472 if msg_id in self.depending:
473 job = self.depending[msg_id]
478 if msg_id in self.queue_map:
479 job = self.queue_map[msg_id]
474 480 if job.timeout and job.timeout < now:
475 481 self.fail_unreachable(msg_id, error.TaskTimeout)
476 482
477 483 def fail_unreachable(self, msg_id, why=error.ImpossibleDependency):
478 484 """a task has become unreachable, send a reply with an ImpossibleDependency
479 485 error."""
480 if msg_id not in self.depending:
486 if msg_id not in self.queue_map:
481 487 self.log.error("msg %r already failed!", msg_id)
482 488 return
483 job = self.depending.pop(msg_id)
489 job = self.queue_map.pop(msg_id)
490 # lazy-delete from the queue
491 job.removed = True
484 492 for mid in job.dependents:
485 493 if mid in self.graph:
486 494 self.graph[mid].remove(msg_id)
@@ -549,14 +557,14 b' class TaskScheduler(SessionFactory):'
549 557 for m in job.follow.intersection(relevant):
550 558 dests.add(self.destinations[m])
551 559 if len(dests) > 1:
552 self.depending[msg_id] = job
560 self.queue_map[msg_id] = job
553 561 self.fail_unreachable(msg_id)
554 562 return False
555 563 if job.targets:
556 564 # check blacklist+targets for impossibility
557 565 job.targets.difference_update(job.blacklist)
558 566 if not job.targets or not job.targets.intersection(self.targets):
559 self.depending[msg_id] = job
567 self.queue_map[msg_id] = job
560 568 self.fail_unreachable(msg_id)
561 569 return False
562 570 return False
@@ -569,7 +577,8 b' class TaskScheduler(SessionFactory):'
569 577 def save_unmet(self, job):
570 578 """Save a message for later submission when its dependencies are met."""
571 579 msg_id = job.msg_id
572 self.depending[msg_id] = job
580 self.queue_map[msg_id] = job
581 heapq.heappush(self.queue, job)
573 582 # track the ids in follow or after, but not those already finished
574 583 for dep_id in job.after.union(job.follow).difference(self.all_done):
575 584 if dep_id not in self.graph:
@@ -672,7 +681,7 b' class TaskScheduler(SessionFactory):'
672 681 job.blacklist.add(engine)
673 682
674 683 if job.blacklist == job.targets:
675 self.depending[msg_id] = job
684 self.queue_map[msg_id] = job
676 685 self.fail_unreachable(msg_id)
677 686 elif not self.maybe_run(job):
678 687 # resubmit failed
@@ -698,31 +707,42 b' class TaskScheduler(SessionFactory):'
698 707 # print ("\n\n***********")
699 708 # pprint (dep_id)
700 709 # pprint (self.graph)
701 # pprint (self.depending)
710 # pprint (self.queue_map)
702 711 # pprint (self.all_completed)
703 712 # pprint (self.all_failed)
704 713 # print ("\n\n***********\n\n")
705 714 # update any jobs that depended on the dependency
706 jobs = self.graph.pop(dep_id, [])
715 msg_ids = self.graph.pop(dep_id, [])
707 716
708 717 # recheck *all* jobs if
709 718 # a) we have HWM and an engine just become no longer full
710 719 # or b) dep_id was given as None
711 720
712 721 if dep_id is None or self.hwm and any( [ load==self.hwm-1 for load in self.loads ]):
713 jobs = self.depending.keys()
722 jobs = self.queue
723 using_queue = True
724 else:
725 using_queue = False
726 jobs = heapq.heapify([ self.queue_map[msg_id] for msg_id in msg_ids ])
714 727
715 for msg_id in sorted(jobs, key=lambda msg_id: self.depending[msg_id].timestamp):
716 job = self.depending[msg_id]
717
728 to_restore = []
729 while jobs:
730 job = heapq.heappop(jobs)
731 if job.removed:
732 continue
733 msg_id = job.msg_id
734
735 put_it_back = True
736
718 737 if job.after.unreachable(self.all_completed, self.all_failed)\
719 738 or job.follow.unreachable(self.all_completed, self.all_failed):
720 739 self.fail_unreachable(msg_id)
740 put_it_back = False
721 741
722 742 elif job.after.check(self.all_completed, self.all_failed): # time deps met, maybe run
723 743 if self.maybe_run(job):
724
725 self.depending.pop(msg_id)
744 put_it_back = False
745 self.queue_map.pop(msg_id)
726 746 for mid in job.dependents:
727 747 if mid in self.graph:
728 748 self.graph[mid].remove(msg_id)
@@ -733,7 +753,17 b' class TaskScheduler(SessionFactory):'
733 753 # non-full, and all tasks after the first are checked,
734 754 # even though they can't run.
735 755 if not self.available_engines():
736 return
756 break
757
758 if using_queue and put_it_back:
759 # popped a job from the queue but it neither ran nor failed,
760 # so we need to put it back when we are done
761 to_restore.append(job)
762
763 # put back any tasks we popped but didn't run
764 for job in to_restore:
765 heapq.heappush(self.queue, job)
766
737 767
738 768 #----------------------------------------------------------------------
739 769 # methods to be overridden by subclasses
General Comments 0
You need to be logged in to leave comments. Login now