##// END OF EJS Templates
check whether all engines are at HWM in a few places...
MinRK -
Show More
@@ -371,7 +371,7 b' class TaskScheduler(SessionFactory):'
371 371 # Job Submission
372 372 #-----------------------------------------------------------------------
373 373
374
374
375 375 @util.log_errors
376 376 def dispatch_submission(self, raw_msg):
377 377 """Dispatch job submission to appropriate handlers."""
@@ -499,11 +499,22 b' class TaskScheduler(SessionFactory):'
499 499
500 500 self.update_graph(msg_id, success=False)
501 501
502 def available_engines(self):
503 """return a list of available engine indices based on HWM"""
504 if not self.hwm:
505 return range(len(self.targets))
506 available = []
507 for idx in range(len(self.targets)):
508 if self.loads[idx] < self.hwm:
509 available.append(idx)
510 return available
511
502 512 def maybe_run(self, job):
503 513 """check location dependencies, and run if they are met."""
504 514 msg_id = job.msg_id
505 515 self.log.debug("Attempting to assign task %s", msg_id)
506 if not self.targets:
516 available = self.available_engines()
517 if not available:
507 518 # no engines, definitely can't run
508 519 return False
509 520
@@ -523,7 +534,7 b' class TaskScheduler(SessionFactory):'
523 534 # check follow
524 535 return job.follow.check(self.completed[target], self.failed[target])
525 536
526 indices = filter(can_run, range(len(self.targets)))
537 indices = filter(can_run, available)
527 538
528 539 if not indices:
529 540 # couldn't run
@@ -678,14 +689,11 b' class TaskScheduler(SessionFactory):'
678 689 if self.loads[idx] == self.hwm-1:
679 690 self.update_graph(None)
680 691
681
682
683 692 def update_graph(self, dep_id=None, success=True):
684 693 """dep_id just finished. Update our dependency
685 graph and submit any jobs that just became runable.
694 graph and submit any jobs that just became runnable.
686 695
687 Called with dep_id=None to update entire graph for hwm, but without finishing
688 a task.
696 Called with dep_id=None to update entire graph for hwm, but without finishing a task.
689 697 """
690 698 # print ("\n\n***********")
691 699 # pprint (dep_id)
@@ -718,6 +726,14 b' class TaskScheduler(SessionFactory):'
718 726 for mid in job.dependents:
719 727 if mid in self.graph:
720 728 self.graph[mid].remove(msg_id)
729
730 # abort the loop if we just filled up all of our engines.
731 # avoids an O(N) operation in situation of full queue,
732 # where graph update is triggered as soon as an engine becomes
733 # non-full, and all tasks after the first are checked,
734 # even though they can't run.
735 if not self.available_engines():
736 return
721 737
722 738 #----------------------------------------------------------------------
723 739 # methods to be overridden by subclasses
General Comments 0
You need to be logged in to leave comments. Login now