Show More
@@ -371,7 +371,7 b' class TaskScheduler(SessionFactory):' | |||
|
371 | 371 | # Job Submission |
|
372 | 372 | #----------------------------------------------------------------------- |
|
373 | 373 | |
|
374 | ||
|
374 | ||
|
375 | 375 | @util.log_errors |
|
376 | 376 | def dispatch_submission(self, raw_msg): |
|
377 | 377 | """Dispatch job submission to appropriate handlers.""" |
@@ -499,11 +499,22 b' class TaskScheduler(SessionFactory):' | |||
|
499 | 499 | |
|
500 | 500 | self.update_graph(msg_id, success=False) |
|
501 | 501 | |
|
502 | def available_engines(self): | |
|
503 | """return a list of available engine indices based on HWM""" | |
|
504 | if not self.hwm: | |
|
505 | return range(len(self.targets)) | |
|
506 | available = [] | |
|
507 | for idx in range(len(self.targets)): | |
|
508 | if self.loads[idx] < self.hwm: | |
|
509 | available.append(idx) | |
|
510 | return available | |
|
511 | ||
|
502 | 512 | def maybe_run(self, job): |
|
503 | 513 | """check location dependencies, and run if they are met.""" |
|
504 | 514 | msg_id = job.msg_id |
|
505 | 515 | self.log.debug("Attempting to assign task %s", msg_id) |
|
506 | if not self.targets: | |
|
516 | available = self.available_engines() | |
|
517 | if not available: | |
|
507 | 518 | # no engines, definitely can't run |
|
508 | 519 | return False |
|
509 | 520 | |
@@ -523,7 +534,7 b' class TaskScheduler(SessionFactory):' | |||
|
523 | 534 | # check follow |
|
524 | 535 | return job.follow.check(self.completed[target], self.failed[target]) |
|
525 | 536 | |
|
526 |
indices = filter(can_run, |
|
|
537 | indices = filter(can_run, available) | |
|
527 | 538 | |
|
528 | 539 | if not indices: |
|
529 | 540 | # couldn't run |
@@ -678,14 +689,11 b' class TaskScheduler(SessionFactory):' | |||
|
678 | 689 | if self.loads[idx] == self.hwm-1: |
|
679 | 690 | self.update_graph(None) |
|
680 | 691 | |
|
681 | ||
|
682 | ||
|
683 | 692 | def update_graph(self, dep_id=None, success=True): |
|
684 | 693 | """dep_id just finished. Update our dependency |
|
685 | graph and submit any jobs that just became runable. | |
|
694 | graph and submit any jobs that just became runnable. | |
|
686 | 695 | |
|
687 | Called with dep_id=None to update entire graph for hwm, but without finishing | |
|
688 | a task. | |
|
696 | Called with dep_id=None to update entire graph for hwm, but without finishing a task. | |
|
689 | 697 | """ |
|
690 | 698 | # print ("\n\n***********") |
|
691 | 699 | # pprint (dep_id) |
@@ -718,6 +726,14 b' class TaskScheduler(SessionFactory):' | |||
|
718 | 726 | for mid in job.dependents: |
|
719 | 727 | if mid in self.graph: |
|
720 | 728 | self.graph[mid].remove(msg_id) |
|
729 | ||
|
730 | # abort the loop if we just filled up all of our engines. | |
|
731 | # avoids an O(N) operation in situation of full queue, | |
|
732 | # where graph update is triggered as soon as an engine becomes | |
|
733 | # non-full, and all tasks after the first are checked, | |
|
734 | # even though they can't run. | |
|
735 | if not self.available_engines(): | |
|
736 | return | |
|
721 | 737 | |
|
722 | 738 | #---------------------------------------------------------------------- |
|
723 | 739 | # methods to be overridden by subclasses |
General Comments 0
You need to be logged in to leave comments.
Login now