diff --git a/IPython/parallel/controller/scheduler.py b/IPython/parallel/controller/scheduler.py index c0c8f1c..7e2b2cb 100644 --- a/IPython/parallel/controller/scheduler.py +++ b/IPython/parallel/controller/scheduler.py @@ -371,7 +371,7 @@ class TaskScheduler(SessionFactory): # Job Submission #----------------------------------------------------------------------- - + @util.log_errors def dispatch_submission(self, raw_msg): """Dispatch job submission to appropriate handlers.""" @@ -499,11 +499,22 @@ class TaskScheduler(SessionFactory): self.update_graph(msg_id, success=False) + def available_engines(self): + """return a list of available engine indices based on HWM""" + if not self.hwm: + return range(len(self.targets)) + available = [] + for idx in range(len(self.targets)): + if self.loads[idx] < self.hwm: + available.append(idx) + return available + def maybe_run(self, job): """check location dependencies, and run if they are met.""" msg_id = job.msg_id self.log.debug("Attempting to assign task %s", msg_id) - if not self.targets: + available = self.available_engines() + if not available: # no engines, definitely can't run return False @@ -523,7 +534,7 @@ class TaskScheduler(SessionFactory): # check follow return job.follow.check(self.completed[target], self.failed[target]) - indices = filter(can_run, range(len(self.targets))) + indices = filter(can_run, available) if not indices: # couldn't run @@ -678,14 +689,11 @@ class TaskScheduler(SessionFactory): if self.loads[idx] == self.hwm-1: self.update_graph(None) - - def update_graph(self, dep_id=None, success=True): """dep_id just finished. Update our dependency - graph and submit any jobs that just became runable. + graph and submit any jobs that just became runnable. - Called with dep_id=None to update entire graph for hwm, but without finishing - a task. + Called with dep_id=None to update entire graph for hwm, but without finishing a task. """ # print ("\n\n***********") # pprint (dep_id) @@ -718,6 +726,14 @@ class TaskScheduler(SessionFactory): for mid in job.dependents: if mid in self.graph: self.graph[mid].remove(msg_id) + + # abort the loop if we just filled up all of our engines. + # avoids an O(N) operation in situation of full queue, + # where graph update is triggered as soon as an engine becomes + # non-full, and all tasks after the first are checked, + # even though they can't run. + if not self.available_engines(): + return #---------------------------------------------------------------------- # methods to be overridden by subclasses