##// END OF EJS Templates
check whether all engines are at HWM in a few places...
MinRK -
Show More
@@ -371,7 +371,7 b' class TaskScheduler(SessionFactory):'
371 # Job Submission
371 # Job Submission
372 #-----------------------------------------------------------------------
372 #-----------------------------------------------------------------------
373
373
374
374
375 @util.log_errors
375 @util.log_errors
376 def dispatch_submission(self, raw_msg):
376 def dispatch_submission(self, raw_msg):
377 """Dispatch job submission to appropriate handlers."""
377 """Dispatch job submission to appropriate handlers."""
@@ -499,11 +499,22 b' class TaskScheduler(SessionFactory):'
499
499
500 self.update_graph(msg_id, success=False)
500 self.update_graph(msg_id, success=False)
501
501
502 def available_engines(self):
503 """return a list of available engine indices based on HWM"""
504 if not self.hwm:
505 return range(len(self.targets))
506 available = []
507 for idx in range(len(self.targets)):
508 if self.loads[idx] < self.hwm:
509 available.append(idx)
510 return available
511
502 def maybe_run(self, job):
512 def maybe_run(self, job):
503 """check location dependencies, and run if they are met."""
513 """check location dependencies, and run if they are met."""
504 msg_id = job.msg_id
514 msg_id = job.msg_id
505 self.log.debug("Attempting to assign task %s", msg_id)
515 self.log.debug("Attempting to assign task %s", msg_id)
506 if not self.targets:
516 available = self.available_engines()
517 if not available:
507 # no engines, definitely can't run
518 # no engines, definitely can't run
508 return False
519 return False
509
520
@@ -523,7 +534,7 b' class TaskScheduler(SessionFactory):'
523 # check follow
534 # check follow
524 return job.follow.check(self.completed[target], self.failed[target])
535 return job.follow.check(self.completed[target], self.failed[target])
525
536
526 indices = filter(can_run, range(len(self.targets)))
537 indices = filter(can_run, available)
527
538
528 if not indices:
539 if not indices:
529 # couldn't run
540 # couldn't run
@@ -678,14 +689,11 b' class TaskScheduler(SessionFactory):'
678 if self.loads[idx] == self.hwm-1:
689 if self.loads[idx] == self.hwm-1:
679 self.update_graph(None)
690 self.update_graph(None)
680
691
681
682
683 def update_graph(self, dep_id=None, success=True):
692 def update_graph(self, dep_id=None, success=True):
684 """dep_id just finished. Update our dependency
693 """dep_id just finished. Update our dependency
685 graph and submit any jobs that just became runable.
694 graph and submit any jobs that just became runnable.
686
695
687 Called with dep_id=None to update entire graph for hwm, but without finishing
696 Called with dep_id=None to update entire graph for hwm, but without finishing a task.
688 a task.
689 """
697 """
690 # print ("\n\n***********")
698 # print ("\n\n***********")
691 # pprint (dep_id)
699 # pprint (dep_id)
@@ -718,6 +726,14 b' class TaskScheduler(SessionFactory):'
718 for mid in job.dependents:
726 for mid in job.dependents:
719 if mid in self.graph:
727 if mid in self.graph:
720 self.graph[mid].remove(msg_id)
728 self.graph[mid].remove(msg_id)
729
730 # abort the loop if we just filled up all of our engines.
731 # avoids an O(N) operation in situation of full queue,
732 # where graph update is triggered as soon as an engine becomes
733 # non-full, and all tasks after the first are checked,
734 # even though they can't run.
735 if not self.available_engines():
736 return
721
737
722 #----------------------------------------------------------------------
738 #----------------------------------------------------------------------
723 # methods to be overridden by subclasses
739 # methods to be overridden by subclasses
General Comments 0
You need to be logged in to leave comments. Login now