Show More
@@ -19,12 +19,12 b' Authors:' | |||||
19 | # Imports |
|
19 | # Imports | |
20 | #---------------------------------------------------------------------- |
|
20 | #---------------------------------------------------------------------- | |
21 |
|
21 | |||
22 | import heapq |
|
|||
23 | import logging |
|
22 | import logging | |
24 | import sys |
|
23 | import sys | |
25 | import time |
|
24 | import time | |
26 |
|
25 | |||
27 | from datetime import datetime, timedelta |
|
26 | from collections import deque | |
|
27 | from datetime import datetime | |||
28 | from random import randint, random |
|
28 | from random import randint, random | |
29 | from types import FunctionType |
|
29 | from types import FunctionType | |
30 |
|
30 | |||
@@ -140,7 +140,7 b' class Job(object):' | |||||
140 | self.after = after |
|
140 | self.after = after | |
141 | self.follow = follow |
|
141 | self.follow = follow | |
142 | self.timeout = timeout |
|
142 | self.timeout = timeout | |
143 |
self.removed = False # used for lazy-delete |
|
143 | self.removed = False # used for lazy-delete from sorted queue | |
144 |
|
144 | |||
145 | self.timestamp = time.time() |
|
145 | self.timestamp = time.time() | |
146 | self.blacklist = set() |
|
146 | self.blacklist = set() | |
@@ -199,7 +199,9 b' class TaskScheduler(SessionFactory):' | |||||
199 | query_stream = Instance(zmqstream.ZMQStream) # hub-facing DEALER stream |
|
199 | query_stream = Instance(zmqstream.ZMQStream) # hub-facing DEALER stream | |
200 |
|
200 | |||
201 | # internals: |
|
201 | # internals: | |
202 |
queue = |
|
202 | queue = Instance(deque) # sorted list of Jobs | |
|
203 | def _queue_default(self): | |||
|
204 | return deque() | |||
203 | queue_map = Dict() # dict by msg_id of Jobs (for O(1) access to the Queue) |
|
205 | queue_map = Dict() # dict by msg_id of Jobs (for O(1) access to the Queue) | |
204 | graph = Dict() # dict by msg_id of [ msg_ids that depend on key ] |
|
206 | graph = Dict() # dict by msg_id of [ msg_ids that depend on key ] | |
205 | retries = Dict() # dict by msg_id of retries remaining (non-neg ints) |
|
207 | retries = Dict() # dict by msg_id of retries remaining (non-neg ints) | |
@@ -472,7 +474,8 b' class TaskScheduler(SessionFactory):' | |||||
472 |
|
474 | |||
473 | The job may or may not have been run at this point. |
|
475 | The job may or may not have been run at this point. | |
474 | """ |
|
476 | """ | |
475 |
|
|
477 | now = time.time() | |
|
478 | if job.timeout >= (now + 1): | |||
476 | self.log.warn("task %s timeout fired prematurely: %s > %s", |
|
479 | self.log.warn("task %s timeout fired prematurely: %s > %s", | |
477 | job.msg_id, job.timeout, now |
|
480 | job.msg_id, job.timeout, now | |
478 | ) |
|
481 | ) | |
@@ -581,7 +584,7 b' class TaskScheduler(SessionFactory):' | |||||
581 | msg_id = job.msg_id |
|
584 | msg_id = job.msg_id | |
582 | self.log.debug("Adding task %s to the queue", msg_id) |
|
585 | self.log.debug("Adding task %s to the queue", msg_id) | |
583 | self.queue_map[msg_id] = job |
|
586 | self.queue_map[msg_id] = job | |
584 |
|
|
587 | self.queue.append(job) | |
585 | # track the ids in follow or after, but not those already finished |
|
588 | # track the ids in follow or after, but not those already finished | |
586 | for dep_id in job.after.union(job.follow).difference(self.all_done): |
|
589 | for dep_id in job.after.union(job.follow).difference(self.all_done): | |
587 | if dep_id not in self.graph: |
|
590 | if dep_id not in self.graph: | |
@@ -726,11 +729,11 b' class TaskScheduler(SessionFactory):' | |||||
726 | using_queue = True |
|
729 | using_queue = True | |
727 | else: |
|
730 | else: | |
728 | using_queue = False |
|
731 | using_queue = False | |
729 |
jobs = |
|
732 | jobs = deque(sorted( self.queue_map[msg_id] for msg_id in msg_ids )) | |
730 |
|
733 | |||
731 | to_restore = [] |
|
734 | to_restore = [] | |
732 | while jobs: |
|
735 | while jobs: | |
733 |
job = |
|
736 | job = jobs.popleft() | |
734 | if job.removed: |
|
737 | if job.removed: | |
735 | continue |
|
738 | continue | |
736 | msg_id = job.msg_id |
|
739 | msg_id = job.msg_id | |
@@ -761,12 +764,12 b' class TaskScheduler(SessionFactory):' | |||||
761 | if using_queue and put_it_back: |
|
764 | if using_queue and put_it_back: | |
762 | # popped a job from the queue but it neither ran nor failed, |
|
765 | # popped a job from the queue but it neither ran nor failed, | |
763 | # so we need to put it back when we are done |
|
766 | # so we need to put it back when we are done | |
|
767 | # make sure to_restore preserves the same ordering | |||
764 | to_restore.append(job) |
|
768 | to_restore.append(job) | |
765 |
|
769 | |||
766 | # put back any tasks we popped but didn't run |
|
770 | # put back any tasks we popped but didn't run | |
767 |
f |
|
771 | if using_queue: | |
768 | heapq.heappush(self.queue, job) |
|
772 | self.queue.extendleft(to_restore) | |
769 |
|
||||
770 |
|
773 | |||
771 | #---------------------------------------------------------------------- |
|
774 | #---------------------------------------------------------------------- | |
772 | # methods to be overridden by subclasses |
|
775 | # methods to be overridden by subclasses |
General Comments 0
You need to be logged in to leave comments.
Login now