diff --git a/IPython/parallel/controller/scheduler.py b/IPython/parallel/controller/scheduler.py index 61fa37d..6fca9e4 100644 --- a/IPython/parallel/controller/scheduler.py +++ b/IPython/parallel/controller/scheduler.py @@ -19,12 +19,12 @@ Authors: # Imports #---------------------------------------------------------------------- -import heapq import logging import sys import time -from datetime import datetime, timedelta +from collections import deque +from datetime import datetime from random import randint, random from types import FunctionType @@ -140,7 +140,7 @@ class Job(object): self.after = after self.follow = follow self.timeout = timeout - self.removed = False # used for lazy-delete in heap-sorted queue + self.removed = False # used for lazy-delete from sorted queue self.timestamp = time.time() self.blacklist = set() @@ -199,7 +199,9 @@ class TaskScheduler(SessionFactory): query_stream = Instance(zmqstream.ZMQStream) # hub-facing DEALER stream # internals: - queue = List() # heap-sorted list of Jobs + queue = Instance(deque) # sorted list of Jobs + def _queue_default(self): + return deque() queue_map = Dict() # dict by msg_id of Jobs (for O(1) access to the Queue) graph = Dict() # dict by msg_id of [ msg_ids that depend on key ] retries = Dict() # dict by msg_id of retries remaining (non-neg ints) @@ -472,7 +474,8 @@ class TaskScheduler(SessionFactory): The job may or may not have been run at this point. 
""" - if job.timeout >= (time.time() + 1): + now = time.time() + if job.timeout >= (now + 1): self.log.warn("task %s timeout fired prematurely: %s > %s", job.msg_id, job.timeout, now ) @@ -581,7 +584,7 @@ class TaskScheduler(SessionFactory): msg_id = job.msg_id self.log.debug("Adding task %s to the queue", msg_id) self.queue_map[msg_id] = job - heapq.heappush(self.queue, job) + self.queue.append(job) # track the ids in follow or after, but not those already finished for dep_id in job.after.union(job.follow).difference(self.all_done): if dep_id not in self.graph: @@ -726,11 +729,11 @@ class TaskScheduler(SessionFactory): using_queue = True else: using_queue = False - jobs = heapq.heapify([ self.queue_map[msg_id] for msg_id in msg_ids ]) + jobs = deque(sorted( self.queue_map[msg_id] for msg_id in msg_ids )) to_restore = [] while jobs: - job = heapq.heappop(jobs) + job = jobs.popleft() if job.removed: continue msg_id = job.msg_id @@ -761,13 +764,13 @@ class TaskScheduler(SessionFactory): if using_queue and put_it_back: # popped a job from the queue but it neither ran nor failed, # so we need to put it back when we are done + # make sure to_restore preserves the same ordering to_restore.append(job) # put back any tasks we popped but didn't run - for job in to_restore: - heapq.heappush(self.queue, job) - - + if using_queue: + self.queue.extendleft(reversed(to_restore)) # extendleft inserts in reverse; reversed() keeps pop order + #---------------------------------------------------------------------- # methods to be overridden by subclasses #----------------------------------------------------------------------