##// END OF EJS Templates
add HWM to TaskScheduler...
MinRK -
Show More
@@ -71,11 +71,11 b' c = get_config()'
71 71 # dying engines, dependencies, or engine-subset load-balancing.
72 72 # c.ControllerFactory.scheme = 'pure'
73 73
74 # The pure ZMQ scheduler can limit the number of outstanding tasks per engine
75 # by using the ZMQ HWM option. This allows engines with long-running tasks
74 # The Python scheduler can limit the number of outstanding tasks per engine
75 # by using an HWM option. This allows engines with long-running tasks
76 76 # to not steal too many tasks from other engines. The default is 0, which
77 77 # means aggressively distribute messages, never waiting for them to finish.
78 # c.ControllerFactory.hwm = 1
78 # c.TaskScheduler.hwm = 0
79 79
80 80 # Whether to use Threads or Processes to start the Schedulers. Threads will
81 81 # use less resources, but potentially reduce throughput. Default is to
@@ -35,7 +35,7 b' from zmq.eventloop import ioloop, zmqstream'
35 35 # local imports
36 36 from IPython.external.decorator import decorator
37 37 from IPython.config.loader import Config
38 from IPython.utils.traitlets import Instance, Dict, List, Set
38 from IPython.utils.traitlets import Instance, Dict, List, Set, Int
39 39
40 40 from IPython.parallel import error
41 41 from IPython.parallel.factory import SessionFactory
@@ -126,6 +126,8 b' class TaskScheduler(SessionFactory):'
126 126
127 127 """
128 128
129 hwm = Int(0, config=True) # limit number of outstanding tasks
130
129 131 # input arguments:
130 132 scheme = Instance(FunctionType, default=leastload) # function for determining the destination
131 133 client_stream = Instance(zmqstream.ZMQStream) # client-facing stream
@@ -135,6 +137,7 b' class TaskScheduler(SessionFactory):'
135 137
136 138 # internals:
137 139 graph = Dict() # dict by msg_id of [ msg_ids that depend on key ]
140 # waiting = List() # list of msg_ids ready to run, but haven't due to HWM
138 141 depending = Dict() # dict by msg_id of (msg_id, raw_msg, after, follow)
139 142 pending = Dict() # dict by engine_uuid of submitted tasks
140 143 completed = Dict() # dict by engine_uuid of completed tasks
@@ -143,6 +146,7 b' class TaskScheduler(SessionFactory):'
143 146 clients = Dict() # dict by msg_id for who submitted the task
144 147 targets = List() # list of target IDENTs
145 148 loads = List() # list of engine loads
149 # full = Set() # set of IDENTs that have HWM outstanding tasks
146 150 all_completed = Set() # set of all completed tasks
147 151 all_failed = Set() # set of all failed tasks
148 152 all_done = Set() # set of all finished tasks=union(completed,failed)
@@ -216,7 +220,6 b' class TaskScheduler(SessionFactory):'
216 220 # don't pop destinations, because it might be used later
217 221 # map(self.destinations.pop, self.completed.pop(uid))
218 222 # map(self.destinations.pop, self.failed.pop(uid))
219
220 223 idx = self.targets.index(uid)
221 224 self.targets.pop(idx)
222 225 self.loads.pop(idx)
@@ -261,7 +264,7 b' class TaskScheduler(SessionFactory):'
261 264 try:
262 265 idents, msg = self.session.feed_identities(raw_msg, copy=False)
263 266 msg = self.session.unpack_message(msg, content=False, copy=False)
264 except:
267 except Exception:
265 268 self.log.error("task::Invaid task: %s"%raw_msg, exc_info=True)
266 269 return
267 270
@@ -362,16 +365,19 b' class TaskScheduler(SessionFactory):'
362 365 def maybe_run(self, msg_id, raw_msg, targets, after, follow, timeout):
363 366 """check location dependencies, and run if they are met."""
364 367 blacklist = self.blacklist.setdefault(msg_id, set())
365 if follow or targets or blacklist:
368 if follow or targets or blacklist or self.hwm:
366 369 # we need a can_run filter
367 370 def can_run(idx):
368 target = self.targets[idx]
369 # check targets
370 if targets and target not in targets:
371 # check hwm
372 if self.loads[idx] == self.hwm:
371 373 return False
374 target = self.targets[idx]
372 375 # check blacklist
373 376 if target in blacklist:
374 377 return False
378 # check targets
379 if targets and target not in targets:
380 return False
375 381 # check follow
376 382 return follow.check(self.completed[target], self.failed[target])
377 383
@@ -426,13 +432,17 b' class TaskScheduler(SessionFactory):'
426 432 idx = indices[idx]
427 433 target = self.targets[idx]
428 434 # print (target, map(str, msg[:3]))
435 # send job to the engine
429 436 self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
430 437 self.engine_stream.send_multipart(raw_msg, copy=False)
438 # update load
431 439 self.add_job(idx)
432 440 self.pending[target][msg_id] = (raw_msg, targets, MET, follow, timeout)
441 # notify Hub
433 442 content = dict(msg_id=msg_id, engine_id=target)
434 443 self.session.send(self.mon_stream, 'task_destination', content=content,
435 444 ident=['tracktask',self.session.session])
445
436 446
437 447 #-----------------------------------------------------------------------
438 448 # Result Handling
@@ -443,10 +453,13 b' class TaskScheduler(SessionFactory):'
443 453 try:
444 454 idents,msg = self.session.feed_identities(raw_msg, copy=False)
445 455 msg = self.session.unpack_message(msg, content=False, copy=False)
446 except:
456 engine = idents[0]
457 idx = self.targets.index(engine)
458 self.finish_job(idx)
459 except Exception:
447 460 self.log.error("task::Invaid result: %s"%raw_msg, exc_info=True)
448 461 return
449
462
450 463 header = msg['header']
451 464 if header.get('dependencies_met', True):
452 465 success = (header['status'] == 'ok')
@@ -496,17 +509,26 b' class TaskScheduler(SessionFactory):'
496 509
497 510 if self.blacklist[msg_id] == targets:
498 511 self.depending[msg_id] = args
499 return self.fail_unreachable(msg_id)
500
512 self.fail_unreachable(msg_id)
501 513 elif not self.maybe_run(msg_id, *args):
502 514 # resubmit failed, put it back in our dependency tree
503 515 self.save_unmet(msg_id, *args)
504 516
517 if self.hwm:
518 idx = self.targets.index(engine)
519 if self.loads[idx] == self.hwm-1:
520 self.update_graph(None)
521
522
505 523
506 524 @logged
507 def update_graph(self, dep_id, success=True):
525 def update_graph(self, dep_id=None, success=True):
508 526 """dep_id just finished. Update our dependency
509 graph and submit any jobs that just became runable."""
527 graph and submit any jobs that just became runable.
528
529 Called with dep_id=None to update graph for hwm, but without finishing
530 a task.
531 """
510 532 # print ("\n\n***********")
511 533 # pprint (dep_id)
512 534 # pprint (self.graph)
@@ -514,9 +536,12 b' class TaskScheduler(SessionFactory):'
514 536 # pprint (self.all_completed)
515 537 # pprint (self.all_failed)
516 538 # print ("\n\n***********\n\n")
517 if dep_id not in self.graph:
518 return
519 jobs = self.graph.pop(dep_id)
539 # update any jobs that depended on the dependency
540 jobs = self.graph.pop(dep_id, [])
541 # if we have HWM and an engine just became no longer full
542 # recheck *all* jobs:
543 if self.hwm and any( [ load==self.hwm-1 for load in self.loads]):
544 jobs = self.depending.keys()
520 545
521 546 for msg_id in jobs:
522 547 raw_msg, targets, after, follow, timeout = self.depending[msg_id]
General Comments 0
You need to be logged in to leave comments. Login now