##// END OF EJS Templates
use deque instead of heapq
MinRK -
Show More
@@ -19,12 +19,12 b' Authors:'
19 19 # Imports
20 20 #----------------------------------------------------------------------
21 21
22 import heapq
23 22 import logging
24 23 import sys
25 24 import time
26 25
27 from datetime import datetime, timedelta
26 from collections import deque
27 from datetime import datetime
28 28 from random import randint, random
29 29 from types import FunctionType
30 30
@@ -140,7 +140,7 b' class Job(object):'
140 140 self.after = after
141 141 self.follow = follow
142 142 self.timeout = timeout
143 self.removed = False # used for lazy-delete in heap-sorted queue
143 self.removed = False # used for lazy-delete from sorted queue
144 144
145 145 self.timestamp = time.time()
146 146 self.blacklist = set()
@@ -199,7 +199,9 b' class TaskScheduler(SessionFactory):'
199 199 query_stream = Instance(zmqstream.ZMQStream) # hub-facing DEALER stream
200 200
201 201 # internals:
202 queue = List() # heap-sorted list of Jobs
202 queue = Instance(deque) # sorted list of Jobs
203 def _queue_default(self):
204 return deque()
203 205 queue_map = Dict() # dict by msg_id of Jobs (for O(1) access to the Queue)
204 206 graph = Dict() # dict by msg_id of [ msg_ids that depend on key ]
205 207 retries = Dict() # dict by msg_id of retries remaining (non-neg ints)
@@ -472,7 +474,8 b' class TaskScheduler(SessionFactory):'
472 474
473 475 The job may or may not have been run at this point.
474 476 """
475 if job.timeout >= (time.time() + 1):
477 now = time.time()
478 if job.timeout >= (now + 1):
476 479 self.log.warn("task %s timeout fired prematurely: %s > %s",
477 480 job.msg_id, job.timeout, now
478 481 )
@@ -581,7 +584,7 b' class TaskScheduler(SessionFactory):'
581 584 msg_id = job.msg_id
582 585 self.log.debug("Adding task %s to the queue", msg_id)
583 586 self.queue_map[msg_id] = job
584 heapq.heappush(self.queue, job)
587 self.queue.append(job)
585 588 # track the ids in follow or after, but not those already finished
586 589 for dep_id in job.after.union(job.follow).difference(self.all_done):
587 590 if dep_id not in self.graph:
@@ -726,11 +729,11 b' class TaskScheduler(SessionFactory):'
726 729 using_queue = True
727 730 else:
728 731 using_queue = False
729 jobs = heapq.heapify([ self.queue_map[msg_id] for msg_id in msg_ids ])
732 jobs = deque(sorted( self.queue_map[msg_id] for msg_id in msg_ids ))
730 733
731 734 to_restore = []
732 735 while jobs:
733 job = heapq.heappop(jobs)
736 job = jobs.popleft()
734 737 if job.removed:
735 738 continue
736 739 msg_id = job.msg_id
@@ -761,13 +764,13 b' class TaskScheduler(SessionFactory):'
761 764 if using_queue and put_it_back:
762 765 # popped a job from the queue but it neither ran nor failed,
763 766 # so we need to put it back when we are done
767 # make sure to_restore preserves the same ordering
764 768 to_restore.append(job)
765 769
766 770 # put back any tasks we popped but didn't run
767 for job in to_restore:
768 heapq.heappush(self.queue, job)
769
770
771 if using_queue:
772 self.queue.extendleft(to_restore)
773
771 774 #----------------------------------------------------------------------
772 775 # methods to be overridden by subclasses
773 776 #----------------------------------------------------------------------
General Comments 0
You need to be logged in to leave comments. Login now