##// END OF EJS Templates
use deque instead of heapq
MinRK -
Show More
@@ -19,12 +19,12 b' Authors:'
19 # Imports
19 # Imports
20 #----------------------------------------------------------------------
20 #----------------------------------------------------------------------
21
21
22 import heapq
23 import logging
22 import logging
24 import sys
23 import sys
25 import time
24 import time
26
25
27 from datetime import datetime, timedelta
26 from collections import deque
27 from datetime import datetime
28 from random import randint, random
28 from random import randint, random
29 from types import FunctionType
29 from types import FunctionType
30
30
@@ -140,7 +140,7 b' class Job(object):'
140 self.after = after
140 self.after = after
141 self.follow = follow
141 self.follow = follow
142 self.timeout = timeout
142 self.timeout = timeout
143 self.removed = False # used for lazy-delete in heap-sorted queue
143 self.removed = False # used for lazy-delete from sorted queue
144
144
145 self.timestamp = time.time()
145 self.timestamp = time.time()
146 self.blacklist = set()
146 self.blacklist = set()
@@ -199,7 +199,9 b' class TaskScheduler(SessionFactory):'
199 query_stream = Instance(zmqstream.ZMQStream) # hub-facing DEALER stream
199 query_stream = Instance(zmqstream.ZMQStream) # hub-facing DEALER stream
200
200
201 # internals:
201 # internals:
202 queue = List() # heap-sorted list of Jobs
202 queue = Instance(deque) # sorted list of Jobs
203 def _queue_default(self):
204 return deque()
203 queue_map = Dict() # dict by msg_id of Jobs (for O(1) access to the Queue)
205 queue_map = Dict() # dict by msg_id of Jobs (for O(1) access to the Queue)
204 graph = Dict() # dict by msg_id of [ msg_ids that depend on key ]
206 graph = Dict() # dict by msg_id of [ msg_ids that depend on key ]
205 retries = Dict() # dict by msg_id of retries remaining (non-neg ints)
207 retries = Dict() # dict by msg_id of retries remaining (non-neg ints)
@@ -472,7 +474,8 b' class TaskScheduler(SessionFactory):'
472
474
473 The job may or may not have been run at this point.
475 The job may or may not have been run at this point.
474 """
476 """
475 if job.timeout >= (time.time() + 1):
477 now = time.time()
478 if job.timeout >= (now + 1):
476 self.log.warn("task %s timeout fired prematurely: %s > %s",
479 self.log.warn("task %s timeout fired prematurely: %s > %s",
477 job.msg_id, job.timeout, now
480 job.msg_id, job.timeout, now
478 )
481 )
@@ -581,7 +584,7 b' class TaskScheduler(SessionFactory):'
581 msg_id = job.msg_id
584 msg_id = job.msg_id
582 self.log.debug("Adding task %s to the queue", msg_id)
585 self.log.debug("Adding task %s to the queue", msg_id)
583 self.queue_map[msg_id] = job
586 self.queue_map[msg_id] = job
584 heapq.heappush(self.queue, job)
587 self.queue.append(job)
585 # track the ids in follow or after, but not those already finished
588 # track the ids in follow or after, but not those already finished
586 for dep_id in job.after.union(job.follow).difference(self.all_done):
589 for dep_id in job.after.union(job.follow).difference(self.all_done):
587 if dep_id not in self.graph:
590 if dep_id not in self.graph:
@@ -726,11 +729,11 b' class TaskScheduler(SessionFactory):'
726 using_queue = True
729 using_queue = True
727 else:
730 else:
728 using_queue = False
731 using_queue = False
729 jobs = heapq.heapify([ self.queue_map[msg_id] for msg_id in msg_ids ])
732 jobs = deque(sorted( self.queue_map[msg_id] for msg_id in msg_ids ))
730
733
731 to_restore = []
734 to_restore = []
732 while jobs:
735 while jobs:
733 job = heapq.heappop(jobs)
736 job = jobs.popleft()
734 if job.removed:
737 if job.removed:
735 continue
738 continue
736 msg_id = job.msg_id
739 msg_id = job.msg_id
@@ -761,12 +764,12 b' class TaskScheduler(SessionFactory):'
761 if using_queue and put_it_back:
764 if using_queue and put_it_back:
762 # popped a job from the queue but it neither ran nor failed,
765 # popped a job from the queue but it neither ran nor failed,
763 # so we need to put it back when we are done
766 # so we need to put it back when we are done
767 # make sure to_restore preserves the same ordering
764 to_restore.append(job)
768 to_restore.append(job)
765
769
766 # put back any tasks we popped but didn't run
770 # put back any tasks we popped but didn't run
767 for job in to_restore:
771 if using_queue:
768 heapq.heappush(self.queue, job)
772 self.queue.extendleft(to_restore)
769
770
773
771 #----------------------------------------------------------------------
774 #----------------------------------------------------------------------
772 # methods to be overridden by subclasses
775 # methods to be overridden by subclasses
General Comments 0
You need to be logged in to leave comments. Login now