From 2ea5e806c568f455f4d842ec849a128f190052e1 2013-03-29 18:22:43
From: MinRK
Date: 2013-03-29 18:22:43
Subject: [PATCH] use heap-sorted queue

avoids having to sort the queue when updating the graph

also renamed `depending` to `queue_map`, to better describe its role.
---

diff --git a/IPython/parallel/controller/scheduler.py b/IPython/parallel/controller/scheduler.py
index 7e2b2cb..0133406 100644
--- a/IPython/parallel/controller/scheduler.py
+++ b/IPython/parallel/controller/scheduler.py
@@ -19,8 +19,7 @@ Authors:
 # Imports
 #----------------------------------------------------------------------
 
-from __future__ import print_function
-
+import heapq
 import logging
 import sys
 import time
@@ -141,11 +140,17 @@ class Job(object):
         self.after = after
         self.follow = follow
         self.timeout = timeout
-
+        self.removed = False # used for lazy-delete in heap-sorted queue
 
         self.timestamp = time.time()
         self.blacklist = set()
 
+    def __lt__(self, other):
+        return self.timestamp < other.timestamp
+
+    def __cmp__(self, other):
+        return cmp(self.timestamp, other.timestamp)
+
     @property
     def dependents(self):
         return self.follow.union(self.after)
@@ -194,10 +199,11 @@ class TaskScheduler(SessionFactory):
     query_stream = Instance(zmqstream.ZMQStream) # hub-facing DEALER stream
 
     # internals:
+    queue = List() # heap-sorted list of Jobs
+    queue_map = Dict() # dict by msg_id of Jobs (for O(1) access to the Queue)
     graph = Dict() # dict by msg_id of [ msg_ids that depend on key ]
     retries = Dict() # dict by msg_id of retries remaining (non-neg ints)
     # waiting = List() # list of msg_ids ready to run, but haven't due to HWM
-    depending = Dict() # dict by msg_id of Jobs
     pending = Dict() # dict by engine_uuid of submitted tasks
     completed = Dict() # dict by engine_uuid of completed tasks
     failed = Dict() # dict by engine_uuid of failed tasks
@@ -447,11 +453,11 @@ class TaskScheduler(SessionFactory):
                 continue
             # check valid:
             if msg_id in dep or dep.difference(self.all_ids):
-                self.depending[msg_id] = job
+                self.queue_map[msg_id] = job
                 return self.fail_unreachable(msg_id, error.InvalidDependency)
             # check if unreachable:
             if dep.unreachable(self.all_completed, self.all_failed):
-                self.depending[msg_id] = job
+                self.queue_map[msg_id] = job
                 return self.fail_unreachable(msg_id)
 
         if after.check(self.all_completed, self.all_failed):
@@ -467,20 +473,22 @@ class TaskScheduler(SessionFactory):
     def audit_timeouts(self):
         """Audit all waiting tasks for expired timeouts."""
         now = datetime.now()
-        for msg_id in self.depending.keys():
+        for msg_id in self.queue_map.keys():
             # must recheck, in case one failure cascaded to another:
-            if msg_id in self.depending:
-                job = self.depending[msg_id]
+            if msg_id in self.queue_map:
+                job = self.queue_map[msg_id]
                 if job.timeout and job.timeout < now:
                     self.fail_unreachable(msg_id, error.TaskTimeout)
 
     def fail_unreachable(self, msg_id, why=error.ImpossibleDependency):
         """a task has become unreachable, send a reply with an ImpossibleDependency error."""
-        if msg_id not in self.depending:
+        if msg_id not in self.queue_map:
             self.log.error("msg %r already failed!", msg_id)
             return
-        job = self.depending.pop(msg_id)
+        job = self.queue_map.pop(msg_id)
+        # lazy-delete from the queue
+        job.removed = True
 
         for mid in job.dependents:
             if mid in self.graph:
                 self.graph[mid].remove(msg_id)
@@ -549,14 +557,14 @@ class TaskScheduler(SessionFactory):
                     for m in job.follow.intersection(relevant):
                         dests.add(self.destinations[m])
                     if len(dests) > 1:
-                        self.depending[msg_id] = job
+                        self.queue_map[msg_id] = job
                         self.fail_unreachable(msg_id)
                         return False
                 if job.targets:
                     # check blacklist+targets for impossibility
                     job.targets.difference_update(job.blacklist)
                     if not job.targets or not job.targets.intersection(self.targets):
-                        self.depending[msg_id] = job
+                        self.queue_map[msg_id] = job
                         self.fail_unreachable(msg_id)
                         return False
                 return False
@@ -569,7 +577,8 @@ class TaskScheduler(SessionFactory):
     def save_unmet(self, job):
         """Save a message for later submission when its dependencies are met."""
         msg_id = job.msg_id
-        self.depending[msg_id] = job
+        self.queue_map[msg_id] = job
+        heapq.heappush(self.queue, job)
         # track the ids in follow or after, but not those already finished
         for dep_id in job.after.union(job.follow).difference(self.all_done):
             if dep_id not in self.graph:
@@ -672,7 +681,7 @@
         job.blacklist.add(engine)
 
         if job.blacklist == job.targets:
-            self.depending[msg_id] = job
+            self.queue_map[msg_id] = job
             self.fail_unreachable(msg_id)
         elif not self.maybe_run(job):
             # resubmit failed
@@ -698,31 +707,44 @@
         # print ("\n\n***********")
         # pprint (dep_id)
         # pprint (self.graph)
-        # pprint (self.depending)
+        # pprint (self.queue_map)
         # pprint (self.all_completed)
         # pprint (self.all_failed)
         # print ("\n\n***********\n\n")
 
         # update any jobs that depended on the dependency
-        jobs = self.graph.pop(dep_id, [])
+        msg_ids = self.graph.pop(dep_id, [])
 
         # recheck *all* jobs if
         # a) we have HWM and an engine just become no longer full
        # or b) dep_id was given as None
         if dep_id is None or self.hwm and any( [ load==self.hwm-1 for load in self.loads ]):
-            jobs = self.depending.keys()
+            jobs = self.queue
+            using_queue = True
+        else:
+            using_queue = False
+            jobs = [ self.queue_map[msg_id] for msg_id in msg_ids ]
+            heapq.heapify(jobs) # heapify works in place and returns None
 
-        for msg_id in sorted(jobs, key=lambda msg_id: self.depending[msg_id].timestamp):
-            job = self.depending[msg_id]
-
+        to_restore = []
+        while jobs:
+            job = heapq.heappop(jobs)
+            if job.removed:
+                continue
+            msg_id = job.msg_id
+
+            put_it_back = True
+
             if job.after.unreachable(self.all_completed, self.all_failed)\
                     or job.follow.unreachable(self.all_completed, self.all_failed):
                 self.fail_unreachable(msg_id)
+                put_it_back = False
 
             elif job.after.check(self.all_completed, self.all_failed): # time deps met, maybe run
                 if self.maybe_run(job):
-
-                    self.depending.pop(msg_id)
+                    put_it_back = False
+                    job.removed = True # mark any stale entry left in the queue for lazy-delete
+                    self.queue_map.pop(msg_id)
                     for mid in job.dependents:
                         if mid in self.graph:
                             self.graph[mid].remove(msg_id)
@@ -733,7 +755,17 @@
                 # non-full, and all tasks after the first are checked,
                 # even though they can't run.
                 if not self.available_engines():
-                    return
+                    break
+
+            if using_queue and put_it_back:
+                # popped a job from the queue but it neither ran nor failed,
+                # so we need to put it back when we are done
+                to_restore.append(job)
+
+        # put back any tasks we popped but didn't run
+        for job in to_restore:
+            heapq.heappush(self.queue, job)
+
 
     #----------------------------------------------------------------------
     # methods to be overridden by subclasses
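
For reference, the lazy-delete heap pattern the patch adopts can be shown in a
minimal, self-contained sketch. The Job fields (msg_id, timestamp, removed)
mirror the patch; the save/discard/pop_oldest helpers are hypothetical
illustrations of the invariants the scheduler maintains, not scheduler API:
inserts are O(log n) heap pushes, deletes only set the removed flag in O(1),
and stale entries are skipped when popped.

import heapq
import time

class Job(object):
    """Minimal stand-in for the scheduler's Job: heap-ordered by timestamp."""
    def __init__(self, msg_id):
        self.msg_id = msg_id
        self.timestamp = time.time()
        self.removed = False  # lazy-delete marker, as in the patch

    def __lt__(self, other):
        # heapq only needs __lt__; equal timestamps compare in either order
        return self.timestamp < other.timestamp

queue = []      # heap-sorted list of Jobs
queue_map = {}  # msg_id -> Job, for O(1) lookup

def save(job):
    """O(log n) insert; the heap stays ordered, so no later sort is needed."""
    queue_map[job.msg_id] = job
    heapq.heappush(queue, job)

def discard(msg_id):
    """O(1) delete: mark the entry and let a later pop skip it."""
    queue_map.pop(msg_id).removed = True

def pop_oldest():
    """Pop jobs in timestamp order, skipping lazily-deleted entries."""
    while queue:
        job = heapq.heappop(queue)
        if not job.removed:
            queue_map.pop(job.msg_id, None)
            return job
    return None

for i in range(3):
    save(Job("task-%i" % i))
discard("task-1")
assert pop_oldest().msg_id == "task-0"  # oldest surviving entry comes first
assert pop_oldest().msg_id == "task-2"  # "task-1" is skipped, never returned

The trade-off is that dead entries linger in the heap until they are popped:
lazy deletion spends a little memory to get O(1) removal, replacing the
O(n log n) sorted() pass the old code ran on every graph update.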