@@ -19,12 +19,12 @@ Authors:
 # Imports
 #----------------------------------------------------------------------
 
-import heapq
 import logging
 import sys
 import time
 
-from datetime import datetime, timedelta
+from collections import deque
+from datetime import datetime
 from random import randint, random
 from types import FunctionType
 
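This import swap is the core of the change: the scheduler's pending-task queue moves from a heapq-managed list to a collections.deque, and the timedelta import is dropped along the way. As a rough standalone illustration of the two structures (not scheduler code): a heap always pops the smallest element, while a deque keeps insertion order and offers O(1) appends and pops at both ends.

    import heapq
    from collections import deque

    heap = []
    for x in (3, 1, 2):
        heapq.heappush(heap, x)
    print([heapq.heappop(heap) for _ in range(3)])   # [1, 2, 3]

    fifo = deque()
    for x in (3, 1, 2):
        fifo.append(x)
    print([fifo.popleft() for _ in range(3)])        # [3, 1, 2]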
@@ -140,7 +140,7 @@ class Job(object):
         self.after = after
         self.follow = follow
         self.timeout = timeout
-        self.removed = False # used for lazy-delete
+        self.removed = False # used for lazy-delete from sorted queue
 
         self.timestamp = time.time()
         self.blacklist = set()
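The removed flag is the hook for lazy deletion: instead of hunting through the queue to pull out a job that has been cancelled or handled elsewhere, the scheduler just marks it and skips it when it eventually surfaces. A minimal sketch of that pattern, assuming job objects carry msg_id and removed attributes like Job above; the LazyQueue class and its methods are illustrative, not the scheduler's API.

    from collections import deque

    class LazyQueue(object):
        """Toy queue with O(1) cancellation via a 'removed' flag."""
        def __init__(self):
            self.jobs = deque()
            self.by_id = {}

        def add(self, job):
            self.jobs.append(job)
            self.by_id[job.msg_id] = job

        def cancel(self, msg_id):
            # O(1): mark the job; it stays in the deque until it is popped
            self.by_id.pop(msg_id).removed = True

        def pop_next(self):
            while self.jobs:
                job = self.jobs.popleft()
                if not job.removed:
                    return job   # skip entries that were lazily deleted
            return None

Cancellation stays O(1) and the deque never has to support removal from the middle.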
@@ -199,7 +199,9 @@ class TaskScheduler(SessionFactory):
     query_stream = Instance(zmqstream.ZMQStream) # hub-facing DEALER stream
 
     # internals:
-    queue = ...
+    queue = Instance(deque) # sorted list of Jobs
+    def _queue_default(self):
+        return deque()
     queue_map = Dict() # dict by msg_id of Jobs (for O(1) access to the Queue)
     graph = Dict() # dict by msg_id of [ msg_ids that depend on key ]
     retries = Dict() # dict by msg_id of retries remaining (non-neg ints)
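queue is now declared as a deque trait whose default comes from _queue_default. That is the traitlets dynamic-default idiom: the method is invoked lazily the first time the attribute is read, so every scheduler instance gets its own fresh deque instead of sharing one class-level container. A small sketch of the idiom in isolation, using the standalone traitlets package (IPython shipped its own traitlets module at the time, but the behavior is the same):

    from collections import deque
    from traitlets import HasTraits, Instance

    class Holder(HasTraits):
        queue = Instance(deque)

        def _queue_default(self):
            # called lazily, once, when self.queue is first read
            return deque()

    a, b = Holder(), Holder()
    a.queue.append("job-1")
    print(len(a.queue), len(b.queue))   # 1 0 -- each instance gets its own deque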
@@ -472,7 +474,8 @@ class TaskScheduler(SessionFactory):
 
         The job may or may not have been run at this point.
         """
-        ...
+        now = time.time()
+        if job.timeout >= (now + 1):
             self.log.warn("task %s timeout fired prematurely: %s > %s",
                 job.msg_id, job.timeout, now
             )
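The added lines re-read the clock and only treat the firing as premature when the recorded deadline is still more than a second in the future, which tolerates a bit of timer jitter. A standalone sketch of that guard; the function name and logger here are made up for illustration:

    import logging
    import time

    logging.basicConfig(level=logging.WARNING)
    log = logging.getLogger("timeout-demo")

    def warn_if_premature(deadline):
        now = time.time()
        if deadline >= (now + 1):
            # fired more than a second before the deadline: something is off
            log.warning("timeout fired prematurely: %s > %s", deadline, now)

    warn_if_premature(time.time() + 5)   # logs a warning
    warn_if_premature(time.time() - 5)   # silent: the deadline has already passed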
@@ -581,7 +584,7 @@ class TaskScheduler(SessionFactory):
         msg_id = job.msg_id
         self.log.debug("Adding task %s to the queue", msg_id)
         self.queue_map[msg_id] = job
-        heapq.heappush(self.queue, job)
+        self.queue.append(job)
         # track the ids in follow or after, but not those already finished
         for dep_id in job.after.union(job.follow).difference(self.all_done):
             if dep_id not in self.graph:
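Besides appending the job to the queue, this method records which still-unfinished message ids the job is waiting on, building a reverse dependency graph where graph[dep_id] is the set of msg_ids blocked on dep_id. A hypothetical standalone version of that bookkeeping, assuming after and follow behave like sets of msg_ids:

    def track_dependencies(graph, job, all_done):
        """Record job.msg_id under every dependency that has not finished yet."""
        for dep_id in job.after.union(job.follow).difference(all_done):
            graph.setdefault(dep_id, set()).add(job.msg_id)
        return graph

    class FakeJob(object):
        def __init__(self, msg_id, after, follow):
            self.msg_id, self.after, self.follow = msg_id, set(after), set(follow)

    graph = {}
    track_dependencies(graph, FakeJob("c", after={"a"}, follow={"b"}), all_done={"b"})
    print(graph)   # {'a': {'c'}} -- 'b' already finished, so it is not tracked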
@@ -726,11 +729,11 @@ class TaskScheduler(SessionFactory):
             using_queue = True
         else:
             using_queue = False
-            jobs = ...
+            jobs = deque(sorted( self.queue_map[msg_id] for msg_id in msg_ids ))
 
         to_restore = []
         while jobs:
-            job = ...
+            job = jobs.popleft()
             if job.removed:
                 continue
             msg_id = job.msg_id
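Depending on the branch taken above, jobs is either the live scheduler queue itself or a sorted snapshot of just the affected entries; either way the loop drains it with popleft() and silently drops anything flagged removed. A toy version of that loop; FakeJob is hypothetical and stands in for Job, which presumably orders by its submission timestamp (the queue trait is described as a "sorted list of Jobs").

    from collections import deque

    class FakeJob(object):
        def __init__(self, msg_id, timestamp):
            self.msg_id, self.timestamp, self.removed = msg_id, timestamp, False
        def __lt__(self, other):
            return self.timestamp < other.timestamp

    queue_map = {m: FakeJob(m, t) for t, m in enumerate("abc")}
    queue_map["b"].removed = True                      # lazily deleted earlier

    jobs = deque(sorted(queue_map[m] for m in "cab"))  # snapshot, oldest first
    while jobs:
        job = jobs.popleft()
        if job.removed:
            continue                                   # skip lazily-deleted entries
        print(job.msg_id)                              # prints 'a' then 'c'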
@@ -761,13 +764,13 @@ class TaskScheduler(SessionFactory):
             if using_queue and put_it_back:
                 # popped a job from the queue but it neither ran nor failed,
                 # so we need to put it back when we are done
+                # make sure to_restore preserves the same ordering
                 to_restore.append(job)
 
         # put back any tasks we popped but didn't run
-        for job in to_restore:
-            heapq.heappush(self.queue, job)
-
-
+        if using_queue:
+            self.queue.extendleft(to_restore)
+
     #----------------------------------------------------------------------
     # methods to be overridden by subclasses
     #----------------------------------------------------------------------
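One subtlety behind the new comment about preserving ordering: deque.extendleft() pushes its argument one element at a time, so the elements land at the left end in reverse order relative to the iterable. A quick demonstration of that documented behavior:

    from collections import deque

    d = deque([3, 4, 5])
    d.extendleft([2, 1, 0])   # appendleft(2), then appendleft(1), then appendleft(0)
    print(list(d))            # [0, 1, 2, 3, 4, 5]

Any code that restores jobs to the front of a deque in a particular order therefore has to account for this reversal when it builds the list it hands to extendleft().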