Show More
@@ -71,11 +71,11 b' c = get_config()' | |||||
71 | # dying engines, dependencies, or engine-subset load-balancing. |
|
71 | # dying engines, dependencies, or engine-subset load-balancing. | |
72 | # c.ControllerFactory.scheme = 'pure' |
|
72 | # c.ControllerFactory.scheme = 'pure' | |
73 |
|
73 | |||
74 |
# The |
|
74 | # The Python scheduler can limit the number of outstanding tasks per engine | |
75 |
# by using |
|
75 | # by using an HWM option. This allows engines with long-running tasks | |
76 | # to not steal too many tasks from other engines. The default is 0, which |
|
76 | # to not steal too many tasks from other engines. The default is 0, which | |
77 | # means agressively distribute messages, never waiting for them to finish. |
|
77 | # means agressively distribute messages, never waiting for them to finish. | |
78 | # c.ControllerFactory.hwm = 1 |
|
78 | # c.TaskScheduler.hwm = 0 | |
79 |
|
79 | |||
80 | # Whether to use Threads or Processes to start the Schedulers. Threads will |
|
80 | # Whether to use Threads or Processes to start the Schedulers. Threads will | |
81 | # use less resources, but potentially reduce throughput. Default is to |
|
81 | # use less resources, but potentially reduce throughput. Default is to |
@@ -35,7 +35,7 b' from zmq.eventloop import ioloop, zmqstream' | |||||
35 | # local imports |
|
35 | # local imports | |
36 | from IPython.external.decorator import decorator |
|
36 | from IPython.external.decorator import decorator | |
37 | from IPython.config.loader import Config |
|
37 | from IPython.config.loader import Config | |
38 | from IPython.utils.traitlets import Instance, Dict, List, Set |
|
38 | from IPython.utils.traitlets import Instance, Dict, List, Set, Int | |
39 |
|
39 | |||
40 | from IPython.parallel import error |
|
40 | from IPython.parallel import error | |
41 | from IPython.parallel.factory import SessionFactory |
|
41 | from IPython.parallel.factory import SessionFactory | |
@@ -126,6 +126,8 b' class TaskScheduler(SessionFactory):' | |||||
126 |
|
126 | |||
127 | """ |
|
127 | """ | |
128 |
|
128 | |||
|
129 | hwm = Int(0, config=True) # limit number of outstanding tasks | |||
|
130 | ||||
129 | # input arguments: |
|
131 | # input arguments: | |
130 | scheme = Instance(FunctionType, default=leastload) # function for determining the destination |
|
132 | scheme = Instance(FunctionType, default=leastload) # function for determining the destination | |
131 | client_stream = Instance(zmqstream.ZMQStream) # client-facing stream |
|
133 | client_stream = Instance(zmqstream.ZMQStream) # client-facing stream | |
@@ -135,6 +137,7 b' class TaskScheduler(SessionFactory):' | |||||
135 |
|
137 | |||
136 | # internals: |
|
138 | # internals: | |
137 | graph = Dict() # dict by msg_id of [ msg_ids that depend on key ] |
|
139 | graph = Dict() # dict by msg_id of [ msg_ids that depend on key ] | |
|
140 | # waiting = List() # list of msg_ids ready to run, but haven't due to HWM | |||
138 | depending = Dict() # dict by msg_id of (msg_id, raw_msg, after, follow) |
|
141 | depending = Dict() # dict by msg_id of (msg_id, raw_msg, after, follow) | |
139 | pending = Dict() # dict by engine_uuid of submitted tasks |
|
142 | pending = Dict() # dict by engine_uuid of submitted tasks | |
140 | completed = Dict() # dict by engine_uuid of completed tasks |
|
143 | completed = Dict() # dict by engine_uuid of completed tasks | |
@@ -143,6 +146,7 b' class TaskScheduler(SessionFactory):' | |||||
143 | clients = Dict() # dict by msg_id for who submitted the task |
|
146 | clients = Dict() # dict by msg_id for who submitted the task | |
144 | targets = List() # list of target IDENTs |
|
147 | targets = List() # list of target IDENTs | |
145 | loads = List() # list of engine loads |
|
148 | loads = List() # list of engine loads | |
|
149 | # full = Set() # set of IDENTs that have HWM outstanding tasks | |||
146 | all_completed = Set() # set of all completed tasks |
|
150 | all_completed = Set() # set of all completed tasks | |
147 | all_failed = Set() # set of all failed tasks |
|
151 | all_failed = Set() # set of all failed tasks | |
148 | all_done = Set() # set of all finished tasks=union(completed,failed) |
|
152 | all_done = Set() # set of all finished tasks=union(completed,failed) | |
@@ -216,7 +220,6 b' class TaskScheduler(SessionFactory):' | |||||
216 | # don't pop destinations, because it might be used later |
|
220 | # don't pop destinations, because it might be used later | |
217 | # map(self.destinations.pop, self.completed.pop(uid)) |
|
221 | # map(self.destinations.pop, self.completed.pop(uid)) | |
218 | # map(self.destinations.pop, self.failed.pop(uid)) |
|
222 | # map(self.destinations.pop, self.failed.pop(uid)) | |
219 |
|
||||
220 | idx = self.targets.index(uid) |
|
223 | idx = self.targets.index(uid) | |
221 | self.targets.pop(idx) |
|
224 | self.targets.pop(idx) | |
222 | self.loads.pop(idx) |
|
225 | self.loads.pop(idx) | |
@@ -261,7 +264,7 b' class TaskScheduler(SessionFactory):' | |||||
261 | try: |
|
264 | try: | |
262 | idents, msg = self.session.feed_identities(raw_msg, copy=False) |
|
265 | idents, msg = self.session.feed_identities(raw_msg, copy=False) | |
263 | msg = self.session.unpack_message(msg, content=False, copy=False) |
|
266 | msg = self.session.unpack_message(msg, content=False, copy=False) | |
264 | except: |
|
267 | except Exception: | |
265 | self.log.error("task::Invaid task: %s"%raw_msg, exc_info=True) |
|
268 | self.log.error("task::Invaid task: %s"%raw_msg, exc_info=True) | |
266 | return |
|
269 | return | |
267 |
|
270 | |||
@@ -362,16 +365,19 b' class TaskScheduler(SessionFactory):' | |||||
362 | def maybe_run(self, msg_id, raw_msg, targets, after, follow, timeout): |
|
365 | def maybe_run(self, msg_id, raw_msg, targets, after, follow, timeout): | |
363 | """check location dependencies, and run if they are met.""" |
|
366 | """check location dependencies, and run if they are met.""" | |
364 | blacklist = self.blacklist.setdefault(msg_id, set()) |
|
367 | blacklist = self.blacklist.setdefault(msg_id, set()) | |
365 | if follow or targets or blacklist: |
|
368 | if follow or targets or blacklist or self.hwm: | |
366 | # we need a can_run filter |
|
369 | # we need a can_run filter | |
367 | def can_run(idx): |
|
370 | def can_run(idx): | |
368 | target = self.targets[idx] |
|
371 | # check hwm | |
369 | # check targets |
|
372 | if self.loads[idx] == self.hwm: | |
370 | if targets and target not in targets: |
|
|||
371 | return False |
|
373 | return False | |
|
374 | target = self.targets[idx] | |||
372 | # check blacklist |
|
375 | # check blacklist | |
373 | if target in blacklist: |
|
376 | if target in blacklist: | |
374 | return False |
|
377 | return False | |
|
378 | # check targets | |||
|
379 | if targets and target not in targets: | |||
|
380 | return False | |||
375 | # check follow |
|
381 | # check follow | |
376 | return follow.check(self.completed[target], self.failed[target]) |
|
382 | return follow.check(self.completed[target], self.failed[target]) | |
377 |
|
383 | |||
@@ -426,13 +432,17 b' class TaskScheduler(SessionFactory):' | |||||
426 | idx = indices[idx] |
|
432 | idx = indices[idx] | |
427 | target = self.targets[idx] |
|
433 | target = self.targets[idx] | |
428 | # print (target, map(str, msg[:3])) |
|
434 | # print (target, map(str, msg[:3])) | |
|
435 | # send job to the engine | |||
429 | self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False) |
|
436 | self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False) | |
430 | self.engine_stream.send_multipart(raw_msg, copy=False) |
|
437 | self.engine_stream.send_multipart(raw_msg, copy=False) | |
|
438 | # update load | |||
431 | self.add_job(idx) |
|
439 | self.add_job(idx) | |
432 | self.pending[target][msg_id] = (raw_msg, targets, MET, follow, timeout) |
|
440 | self.pending[target][msg_id] = (raw_msg, targets, MET, follow, timeout) | |
|
441 | # notify Hub | |||
433 | content = dict(msg_id=msg_id, engine_id=target) |
|
442 | content = dict(msg_id=msg_id, engine_id=target) | |
434 | self.session.send(self.mon_stream, 'task_destination', content=content, |
|
443 | self.session.send(self.mon_stream, 'task_destination', content=content, | |
435 | ident=['tracktask',self.session.session]) |
|
444 | ident=['tracktask',self.session.session]) | |
|
445 | ||||
436 |
|
446 | |||
437 | #----------------------------------------------------------------------- |
|
447 | #----------------------------------------------------------------------- | |
438 | # Result Handling |
|
448 | # Result Handling | |
@@ -443,10 +453,13 b' class TaskScheduler(SessionFactory):' | |||||
443 | try: |
|
453 | try: | |
444 | idents,msg = self.session.feed_identities(raw_msg, copy=False) |
|
454 | idents,msg = self.session.feed_identities(raw_msg, copy=False) | |
445 | msg = self.session.unpack_message(msg, content=False, copy=False) |
|
455 | msg = self.session.unpack_message(msg, content=False, copy=False) | |
446 | except: |
|
456 | engine = idents[0] | |
|
457 | idx = self.targets.index(engine) | |||
|
458 | self.finish_job(idx) | |||
|
459 | except Exception: | |||
447 | self.log.error("task::Invaid result: %s"%raw_msg, exc_info=True) |
|
460 | self.log.error("task::Invaid result: %s"%raw_msg, exc_info=True) | |
448 | return |
|
461 | return | |
449 |
|
462 | |||
450 | header = msg['header'] |
|
463 | header = msg['header'] | |
451 | if header.get('dependencies_met', True): |
|
464 | if header.get('dependencies_met', True): | |
452 | success = (header['status'] == 'ok') |
|
465 | success = (header['status'] == 'ok') | |
@@ -496,17 +509,26 b' class TaskScheduler(SessionFactory):' | |||||
496 |
|
509 | |||
497 | if self.blacklist[msg_id] == targets: |
|
510 | if self.blacklist[msg_id] == targets: | |
498 | self.depending[msg_id] = args |
|
511 | self.depending[msg_id] = args | |
499 |
|
|
512 | self.fail_unreachable(msg_id) | |
500 |
|
||||
501 | elif not self.maybe_run(msg_id, *args): |
|
513 | elif not self.maybe_run(msg_id, *args): | |
502 | # resubmit failed, put it back in our dependency tree |
|
514 | # resubmit failed, put it back in our dependency tree | |
503 | self.save_unmet(msg_id, *args) |
|
515 | self.save_unmet(msg_id, *args) | |
504 |
|
516 | |||
|
517 | if self.hwm: | |||
|
518 | idx = self.targets.index(engine) | |||
|
519 | if self.loads[idx] == self.hwm-1: | |||
|
520 | self.update_graph(None) | |||
|
521 | ||||
|
522 | ||||
505 |
|
523 | |||
506 | @logged |
|
524 | @logged | |
507 | def update_graph(self, dep_id, success=True): |
|
525 | def update_graph(self, dep_id=None, success=True): | |
508 | """dep_id just finished. Update our dependency |
|
526 | """dep_id just finished. Update our dependency | |
509 |
graph and submit any jobs that just became runable. |
|
527 | graph and submit any jobs that just became runable. | |
|
528 | ||||
|
529 | Called with dep_id=None to update graph for hwm, but without finishing | |||
|
530 | a task. | |||
|
531 | """ | |||
510 | # print ("\n\n***********") |
|
532 | # print ("\n\n***********") | |
511 | # pprint (dep_id) |
|
533 | # pprint (dep_id) | |
512 | # pprint (self.graph) |
|
534 | # pprint (self.graph) | |
@@ -514,9 +536,12 b' class TaskScheduler(SessionFactory):' | |||||
514 | # pprint (self.all_completed) |
|
536 | # pprint (self.all_completed) | |
515 | # pprint (self.all_failed) |
|
537 | # pprint (self.all_failed) | |
516 | # print ("\n\n***********\n\n") |
|
538 | # print ("\n\n***********\n\n") | |
517 | if dep_id not in self.graph: |
|
539 | # update any jobs that depended on the dependency | |
518 | return |
|
540 | jobs = self.graph.pop(dep_id, []) | |
519 | jobs = self.graph.pop(dep_id) |
|
541 | # if we have HWM and an engine just become no longer full | |
|
542 | # recheck *all* jobs: | |||
|
543 | if self.hwm and any( [ load==self.hwm-1 for load in self.loads]): | |||
|
544 | jobs = self.depending.keys() | |||
520 |
|
545 | |||
521 | for msg_id in jobs: |
|
546 | for msg_id in jobs: | |
522 | raw_msg, targets, after, follow, timeout = self.depending[msg_id] |
|
547 | raw_msg, targets, after, follow, timeout = self.depending[msg_id] |
General Comments 0
You need to be logged in to leave comments.
Login now