Show More
@@ -371,7 +371,7 b' class TaskScheduler(SessionFactory):' | |||||
371 | # Job Submission |
|
371 | # Job Submission | |
372 | #----------------------------------------------------------------------- |
|
372 | #----------------------------------------------------------------------- | |
373 |
|
373 | |||
374 |
|
374 | |||
375 | @util.log_errors |
|
375 | @util.log_errors | |
376 | def dispatch_submission(self, raw_msg): |
|
376 | def dispatch_submission(self, raw_msg): | |
377 | """Dispatch job submission to appropriate handlers.""" |
|
377 | """Dispatch job submission to appropriate handlers.""" | |
@@ -499,11 +499,22 b' class TaskScheduler(SessionFactory):' | |||||
499 |
|
499 | |||
500 | self.update_graph(msg_id, success=False) |
|
500 | self.update_graph(msg_id, success=False) | |
501 |
|
501 | |||
|
502 | def available_engines(self): | |||
|
503 | """return a list of available engine indices based on HWM""" | |||
|
504 | if not self.hwm: | |||
|
505 | return range(len(self.targets)) | |||
|
506 | available = [] | |||
|
507 | for idx in range(len(self.targets)): | |||
|
508 | if self.loads[idx] < self.hwm: | |||
|
509 | available.append(idx) | |||
|
510 | return available | |||
|
511 | ||||
502 | def maybe_run(self, job): |
|
512 | def maybe_run(self, job): | |
503 | """check location dependencies, and run if they are met.""" |
|
513 | """check location dependencies, and run if they are met.""" | |
504 | msg_id = job.msg_id |
|
514 | msg_id = job.msg_id | |
505 | self.log.debug("Attempting to assign task %s", msg_id) |
|
515 | self.log.debug("Attempting to assign task %s", msg_id) | |
506 | if not self.targets: |
|
516 | available = self.available_engines() | |
|
517 | if not available: | |||
507 | # no engines, definitely can't run |
|
518 | # no engines, definitely can't run | |
508 | return False |
|
519 | return False | |
509 |
|
520 | |||
@@ -523,7 +534,7 b' class TaskScheduler(SessionFactory):' | |||||
523 | # check follow |
|
534 | # check follow | |
524 | return job.follow.check(self.completed[target], self.failed[target]) |
|
535 | return job.follow.check(self.completed[target], self.failed[target]) | |
525 |
|
536 | |||
526 |
indices = filter(can_run, |
|
537 | indices = filter(can_run, available) | |
527 |
|
538 | |||
528 | if not indices: |
|
539 | if not indices: | |
529 | # couldn't run |
|
540 | # couldn't run | |
@@ -678,14 +689,11 b' class TaskScheduler(SessionFactory):' | |||||
678 | if self.loads[idx] == self.hwm-1: |
|
689 | if self.loads[idx] == self.hwm-1: | |
679 | self.update_graph(None) |
|
690 | self.update_graph(None) | |
680 |
|
691 | |||
681 |
|
||||
682 |
|
||||
683 | def update_graph(self, dep_id=None, success=True): |
|
692 | def update_graph(self, dep_id=None, success=True): | |
684 | """dep_id just finished. Update our dependency |
|
693 | """dep_id just finished. Update our dependency | |
685 | graph and submit any jobs that just became runable. |
|
694 | graph and submit any jobs that just became runnable. | |
686 |
|
695 | |||
687 | Called with dep_id=None to update entire graph for hwm, but without finishing |
|
696 | Called with dep_id=None to update entire graph for hwm, but without finishing a task. | |
688 | a task. |
|
|||
689 | """ |
|
697 | """ | |
690 | # print ("\n\n***********") |
|
698 | # print ("\n\n***********") | |
691 | # pprint (dep_id) |
|
699 | # pprint (dep_id) | |
@@ -718,6 +726,14 b' class TaskScheduler(SessionFactory):' | |||||
718 | for mid in job.dependents: |
|
726 | for mid in job.dependents: | |
719 | if mid in self.graph: |
|
727 | if mid in self.graph: | |
720 | self.graph[mid].remove(msg_id) |
|
728 | self.graph[mid].remove(msg_id) | |
|
729 | ||||
|
730 | # abort the loop if we just filled up all of our engines. | |||
|
731 | # avoids an O(N) operation in situation of full queue, | |||
|
732 | # where graph update is triggered as soon as an engine becomes | |||
|
733 | # non-full, and all tasks after the first are checked, | |||
|
734 | # even though they can't run. | |||
|
735 | if not self.available_engines(): | |||
|
736 | return | |||
721 |
|
737 | |||
722 | #---------------------------------------------------------------------- |
|
738 | #---------------------------------------------------------------------- | |
723 | # methods to be overridden by subclasses |
|
739 | # methods to be overridden by subclasses |
General Comments 0
You need to be logged in to leave comments.
Login now