Show More
@@ -71,11 +71,11 b' c = get_config()' | |||
|
71 | 71 | # dying engines, dependencies, or engine-subset load-balancing. |
|
72 | 72 | # c.ControllerFactory.scheme = 'pure' |
|
73 | 73 | |
|
74 |
# The |
|
|
75 |
# by using |
|
|
74 | # The Python scheduler can limit the number of outstanding tasks per engine | |
|
75 | # by using an HWM option. This allows engines with long-running tasks | |
|
76 | 76 | # to not steal too many tasks from other engines. The default is 0, which |
|
77 | 77 | # means agressively distribute messages, never waiting for them to finish. |
|
78 | # c.ControllerFactory.hwm = 1 | |
|
78 | # c.TaskScheduler.hwm = 0 | |
|
79 | 79 | |
|
80 | 80 | # Whether to use Threads or Processes to start the Schedulers. Threads will |
|
81 | 81 | # use less resources, but potentially reduce throughput. Default is to |
@@ -35,7 +35,7 b' from zmq.eventloop import ioloop, zmqstream' | |||
|
35 | 35 | # local imports |
|
36 | 36 | from IPython.external.decorator import decorator |
|
37 | 37 | from IPython.config.loader import Config |
|
38 | from IPython.utils.traitlets import Instance, Dict, List, Set | |
|
38 | from IPython.utils.traitlets import Instance, Dict, List, Set, Int | |
|
39 | 39 | |
|
40 | 40 | from IPython.parallel import error |
|
41 | 41 | from IPython.parallel.factory import SessionFactory |
@@ -126,6 +126,8 b' class TaskScheduler(SessionFactory):' | |||
|
126 | 126 | |
|
127 | 127 | """ |
|
128 | 128 | |
|
129 | hwm = Int(0, config=True) # limit number of outstanding tasks | |
|
130 | ||
|
129 | 131 | # input arguments: |
|
130 | 132 | scheme = Instance(FunctionType, default=leastload) # function for determining the destination |
|
131 | 133 | client_stream = Instance(zmqstream.ZMQStream) # client-facing stream |
@@ -135,6 +137,7 b' class TaskScheduler(SessionFactory):' | |||
|
135 | 137 | |
|
136 | 138 | # internals: |
|
137 | 139 | graph = Dict() # dict by msg_id of [ msg_ids that depend on key ] |
|
140 | # waiting = List() # list of msg_ids ready to run, but haven't due to HWM | |
|
138 | 141 | depending = Dict() # dict by msg_id of (msg_id, raw_msg, after, follow) |
|
139 | 142 | pending = Dict() # dict by engine_uuid of submitted tasks |
|
140 | 143 | completed = Dict() # dict by engine_uuid of completed tasks |
@@ -143,6 +146,7 b' class TaskScheduler(SessionFactory):' | |||
|
143 | 146 | clients = Dict() # dict by msg_id for who submitted the task |
|
144 | 147 | targets = List() # list of target IDENTs |
|
145 | 148 | loads = List() # list of engine loads |
|
149 | # full = Set() # set of IDENTs that have HWM outstanding tasks | |
|
146 | 150 | all_completed = Set() # set of all completed tasks |
|
147 | 151 | all_failed = Set() # set of all failed tasks |
|
148 | 152 | all_done = Set() # set of all finished tasks=union(completed,failed) |
@@ -216,7 +220,6 b' class TaskScheduler(SessionFactory):' | |||
|
216 | 220 | # don't pop destinations, because it might be used later |
|
217 | 221 | # map(self.destinations.pop, self.completed.pop(uid)) |
|
218 | 222 | # map(self.destinations.pop, self.failed.pop(uid)) |
|
219 | ||
|
220 | 223 | idx = self.targets.index(uid) |
|
221 | 224 | self.targets.pop(idx) |
|
222 | 225 | self.loads.pop(idx) |
@@ -261,7 +264,7 b' class TaskScheduler(SessionFactory):' | |||
|
261 | 264 | try: |
|
262 | 265 | idents, msg = self.session.feed_identities(raw_msg, copy=False) |
|
263 | 266 | msg = self.session.unpack_message(msg, content=False, copy=False) |
|
264 | except: | |
|
267 | except Exception: | |
|
265 | 268 | self.log.error("task::Invaid task: %s"%raw_msg, exc_info=True) |
|
266 | 269 | return |
|
267 | 270 | |
@@ -362,16 +365,19 b' class TaskScheduler(SessionFactory):' | |||
|
362 | 365 | def maybe_run(self, msg_id, raw_msg, targets, after, follow, timeout): |
|
363 | 366 | """check location dependencies, and run if they are met.""" |
|
364 | 367 | blacklist = self.blacklist.setdefault(msg_id, set()) |
|
365 | if follow or targets or blacklist: | |
|
368 | if follow or targets or blacklist or self.hwm: | |
|
366 | 369 | # we need a can_run filter |
|
367 | 370 | def can_run(idx): |
|
368 | target = self.targets[idx] | |
|
369 | # check targets | |
|
370 | if targets and target not in targets: | |
|
371 | # check hwm | |
|
372 | if self.loads[idx] == self.hwm: | |
|
371 | 373 | return False |
|
374 | target = self.targets[idx] | |
|
372 | 375 | # check blacklist |
|
373 | 376 | if target in blacklist: |
|
374 | 377 | return False |
|
378 | # check targets | |
|
379 | if targets and target not in targets: | |
|
380 | return False | |
|
375 | 381 | # check follow |
|
376 | 382 | return follow.check(self.completed[target], self.failed[target]) |
|
377 | 383 | |
@@ -426,14 +432,18 b' class TaskScheduler(SessionFactory):' | |||
|
426 | 432 | idx = indices[idx] |
|
427 | 433 | target = self.targets[idx] |
|
428 | 434 | # print (target, map(str, msg[:3])) |
|
435 | # send job to the engine | |
|
429 | 436 | self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False) |
|
430 | 437 | self.engine_stream.send_multipart(raw_msg, copy=False) |
|
438 | # update load | |
|
431 | 439 | self.add_job(idx) |
|
432 | 440 | self.pending[target][msg_id] = (raw_msg, targets, MET, follow, timeout) |
|
441 | # notify Hub | |
|
433 | 442 | content = dict(msg_id=msg_id, engine_id=target) |
|
434 | 443 | self.session.send(self.mon_stream, 'task_destination', content=content, |
|
435 | 444 | ident=['tracktask',self.session.session]) |
|
436 | 445 | |
|
446 | ||
|
437 | 447 | #----------------------------------------------------------------------- |
|
438 | 448 | # Result Handling |
|
439 | 449 | #----------------------------------------------------------------------- |
@@ -443,7 +453,10 b' class TaskScheduler(SessionFactory):' | |||
|
443 | 453 | try: |
|
444 | 454 | idents,msg = self.session.feed_identities(raw_msg, copy=False) |
|
445 | 455 | msg = self.session.unpack_message(msg, content=False, copy=False) |
|
446 | except: | |
|
456 | engine = idents[0] | |
|
457 | idx = self.targets.index(engine) | |
|
458 | self.finish_job(idx) | |
|
459 | except Exception: | |
|
447 | 460 | self.log.error("task::Invaid result: %s"%raw_msg, exc_info=True) |
|
448 | 461 | return |
|
449 | 462 | |
@@ -496,17 +509,26 b' class TaskScheduler(SessionFactory):' | |||
|
496 | 509 | |
|
497 | 510 | if self.blacklist[msg_id] == targets: |
|
498 | 511 | self.depending[msg_id] = args |
|
499 |
|
|
|
500 | ||
|
512 | self.fail_unreachable(msg_id) | |
|
501 | 513 | elif not self.maybe_run(msg_id, *args): |
|
502 | 514 | # resubmit failed, put it back in our dependency tree |
|
503 | 515 | self.save_unmet(msg_id, *args) |
|
504 | 516 | |
|
517 | if self.hwm: | |
|
518 | idx = self.targets.index(engine) | |
|
519 | if self.loads[idx] == self.hwm-1: | |
|
520 | self.update_graph(None) | |
|
521 | ||
|
522 | ||
|
505 | 523 | |
|
506 | 524 | @logged |
|
507 | def update_graph(self, dep_id, success=True): | |
|
525 | def update_graph(self, dep_id=None, success=True): | |
|
508 | 526 | """dep_id just finished. Update our dependency |
|
509 |
graph and submit any jobs that just became runable. |
|
|
527 | graph and submit any jobs that just became runable. | |
|
528 | ||
|
529 | Called with dep_id=None to update graph for hwm, but without finishing | |
|
530 | a task. | |
|
531 | """ | |
|
510 | 532 | # print ("\n\n***********") |
|
511 | 533 | # pprint (dep_id) |
|
512 | 534 | # pprint (self.graph) |
@@ -514,9 +536,12 b' class TaskScheduler(SessionFactory):' | |||
|
514 | 536 | # pprint (self.all_completed) |
|
515 | 537 | # pprint (self.all_failed) |
|
516 | 538 | # print ("\n\n***********\n\n") |
|
517 | if dep_id not in self.graph: | |
|
518 | return | |
|
519 | jobs = self.graph.pop(dep_id) | |
|
539 | # update any jobs that depended on the dependency | |
|
540 | jobs = self.graph.pop(dep_id, []) | |
|
541 | # if we have HWM and an engine just become no longer full | |
|
542 | # recheck *all* jobs: | |
|
543 | if self.hwm and any( [ load==self.hwm-1 for load in self.loads]): | |
|
544 | jobs = self.depending.keys() | |
|
520 | 545 | |
|
521 | 546 | for msg_id in jobs: |
|
522 | 547 | raw_msg, targets, after, follow, timeout = self.depending[msg_id] |
General Comments 0
You need to be logged in to leave comments.
Login now