From fa8f1e1db97b0da4cb7959c657cf117b36eb12bb 2013-03-29 19:15:51
From: MinRK <benjaminrk@gmail.com>
Date: 2013-03-29 19:15:51
Subject: [PATCH] use deque instead of heapq
---

diff --git a/IPython/parallel/controller/scheduler.py b/IPython/parallel/controller/scheduler.py
index 61fa37d..6fca9e4 100644
--- a/IPython/parallel/controller/scheduler.py
+++ b/IPython/parallel/controller/scheduler.py
@@ -19,12 +19,12 @@ Authors:
 # Imports
 #----------------------------------------------------------------------
 
-import heapq
 import logging
 import sys
 import time
 
-from datetime import datetime, timedelta
+from collections import deque
+from datetime import datetime
 from random import randint, random
 from types import FunctionType
 
@@ -140,7 +140,7 @@ class Job(object):
         self.after = after
         self.follow = follow
         self.timeout = timeout
-        self.removed = False # used for lazy-delete in heap-sorted queue
+        self.removed = False # used for lazy-delete from sorted queue
         
         self.timestamp = time.time()
         self.blacklist = set()
@@ -199,7 +199,9 @@ class TaskScheduler(SessionFactory):
     query_stream = Instance(zmqstream.ZMQStream) # hub-facing DEALER stream
 
     # internals:
-    queue = List() # heap-sorted list of Jobs
+    queue = Instance(deque) # sorted list of Jobs
+    def _queue_default(self):
+        return deque()
     queue_map = Dict() # dict by msg_id of Jobs (for O(1) access to the Queue)
     graph = Dict() # dict by msg_id of [ msg_ids that depend on key ]
     retries = Dict() # dict by msg_id of retries remaining (non-neg ints)
@@ -472,7 +474,8 @@ class TaskScheduler(SessionFactory):
         
         The job may or may not have been run at this point.
         """
-        if job.timeout >= (time.time() + 1):
+        now = time.time()
+        if job.timeout >= (now + 1):
             self.log.warn("task %s timeout fired prematurely: %s > %s",
                 job.msg_id, job.timeout, now
             )
@@ -581,7 +584,7 @@ class TaskScheduler(SessionFactory):
         msg_id = job.msg_id
         self.log.debug("Adding task %s to the queue", msg_id)
         self.queue_map[msg_id] = job
-        heapq.heappush(self.queue, job)
+        self.queue.append(job)
         # track the ids in follow or after, but not those already finished
         for dep_id in job.after.union(job.follow).difference(self.all_done):
             if dep_id not in self.graph:
@@ -726,11 +729,11 @@ class TaskScheduler(SessionFactory):
             using_queue = True
         else:
             using_queue = False
-            jobs = heapq.heapify([ self.queue_map[msg_id] for msg_id in msg_ids ])
+            jobs = deque(sorted( self.queue_map[msg_id] for msg_id in msg_ids ))
         
         to_restore = []
         while jobs:
-            job = heapq.heappop(jobs)
+            job = jobs.popleft()
             if job.removed:
                 continue
             msg_id = job.msg_id
@@ -761,13 +764,13 @@ class TaskScheduler(SessionFactory):
             if using_queue and put_it_back:
                 # popped a job from the queue but it neither ran nor failed,
                 # so we need to put it back when we are done
+                # make sure to_restore preserves the same ordering
                 to_restore.append(job)
         
         # put back any tasks we popped but didn't run
-        for job in to_restore:
-            heapq.heappush(self.queue, job)
-        
-
+        if using_queue:  # extendleft() reverses its input; pre-reverse to keep order
+            self.queue.extendleft(reversed(to_restore))
+    
     #----------------------------------------------------------------------
     # methods to be overridden by subclasses
     #----------------------------------------------------------------------