add HWM to TaskScheduler...
MinRK
@@ -71,11 +71,11 @@ c = get_config()
 # dying engines, dependencies, or engine-subset load-balancing.
 # c.ControllerFactory.scheme = 'pure'
 
-# The pure ZMQ scheduler can limit the number of outstanding tasks per engine
-# by using the ZMQ HWM option. This allows engines with long-running tasks
+# The Python scheduler can limit the number of outstanding tasks per engine
+# by using an HWM option. This allows engines with long-running tasks
 # to not steal too many tasks from other engines. The default is 0, which
 # means aggressively distribute messages, never waiting for them to finish.
-# c.ControllerFactory.hwm = 1
+# c.TaskScheduler.hwm = 0
 
 # Whether to use Threads or Processes to start the Schedulers. Threads will
 # use less resources, but potentially reduce throughput. Default is to
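
For context, the new option would be enabled in a controller profile config file like the one this hunk edits; a minimal sketch, assuming the standard get_config() mechanism shown above (the value 2 is purely illustrative, not a recommended default):

    # illustrative snippet for the same kind of config file
    c = get_config()

    # allow at most 2 outstanding tasks per engine; an engine at its
    # high-water mark receives no new tasks until one finishes
    c.TaskScheduler.hwm = 2
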
@@ -35,7 +35,7 @@ from zmq.eventloop import ioloop, zmqstream
 # local imports
 from IPython.external.decorator import decorator
 from IPython.config.loader import Config
-from IPython.utils.traitlets import Instance, Dict, List, Set
+from IPython.utils.traitlets import Instance, Dict, List, Set, Int
 
 from IPython.parallel import error
 from IPython.parallel.factory import SessionFactory
@@ -126,6 +126,8 @@ class TaskScheduler(SessionFactory):
 
     """
 
+    hwm = Int(0, config=True) # limit number of outstanding tasks
+
     # input arguments:
     scheme = Instance(FunctionType, default=leastload) # function for determining the destination
     client_stream = Instance(zmqstream.ZMQStream) # client-facing stream
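
Declaring the trait with config=True is what connects this class attribute to the c.TaskScheduler.hwm line in the config file. A small self-contained sketch of the same idiom, assuming the era's IPython config API (the Example class is hypothetical):

    from IPython.config.configurable import Configurable
    from IPython.config.loader import Config
    from IPython.utils.traitlets import Int

    class Example(Configurable):
        # config=True exposes the trait to config files as c.Example.hwm
        hwm = Int(0, config=True)

    cfg = Config()
    cfg.Example.hwm = 2             # what a config-file assignment produces
    print(Example(config=cfg).hwm)  # -> 2
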
@@ -135,6 +137,7 @@ class TaskScheduler(SessionFactory):
 
     # internals:
     graph = Dict() # dict by msg_id of [ msg_ids that depend on key ]
+    # waiting = List() # list of msg_ids ready to run, but haven't due to HWM
    depending = Dict() # dict by msg_id of (msg_id, raw_msg, after, follow)
     pending = Dict() # dict by engine_uuid of submitted tasks
     completed = Dict() # dict by engine_uuid of completed tasks
@@ -143,6 +146,7 @@ class TaskScheduler(SessionFactory):
     clients = Dict() # dict by msg_id for who submitted the task
     targets = List() # list of target IDENTs
     loads = List() # list of engine loads
+    # full = Set() # set of IDENTs that have HWM outstanding tasks
     all_completed = Set() # set of all completed tasks
     all_failed = Set() # set of all failed tasks
     all_done = Set() # set of all finished tasks=union(completed,failed)
@@ -216,7 +220,6 @@ class TaskScheduler(SessionFactory):
         # don't pop destinations, because it might be used later
         # map(self.destinations.pop, self.completed.pop(uid))
         # map(self.destinations.pop, self.failed.pop(uid))
-
         idx = self.targets.index(uid)
         self.targets.pop(idx)
         self.loads.pop(idx)
@@ -261,7 +264,7 @@ class TaskScheduler(SessionFactory):
         try:
             idents, msg = self.session.feed_identities(raw_msg, copy=False)
             msg = self.session.unpack_message(msg, content=False, copy=False)
-        except:
+        except Exception:
             self.log.error("task::Invalid task: %s"%raw_msg, exc_info=True)
             return
 
@@ -362,16 +365,19 @@ class TaskScheduler(SessionFactory):
     def maybe_run(self, msg_id, raw_msg, targets, after, follow, timeout):
         """check location dependencies, and run if they are met."""
         blacklist = self.blacklist.setdefault(msg_id, set())
-        if follow or targets or blacklist:
+        if follow or targets or blacklist or self.hwm:
             # we need a can_run filter
             def can_run(idx):
-                target = self.targets[idx]
-                # check targets
-                if targets and target not in targets:
-                    return False
+                # check hwm
+                if self.hwm and self.loads[idx] == self.hwm:
+                    return False
+                target = self.targets[idx]
                 # check blacklist
                 if target in blacklist:
                     return False
+                # check targets
+                if targets and target not in targets:
+                    return False
                 # check follow
                 return follow.check(self.completed[target], self.failed[target])
 
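
To isolate what the reordered can_run filter does, here is a runnable sketch of the same eligibility checks with plain lists in place of scheduler state (no blacklist or follow dependencies; all names here are illustrative, not the class's API):

    def eligible_engines(targets, loads, hwm, wanted=None):
        """Indices of engines that may receive a new task."""
        def can_run(idx):
            # check hwm: an engine at its high-water mark is full
            if hwm and loads[idx] == hwm:
                return False
            # check targets: honor an explicit engine subset, if any
            if wanted and targets[idx] not in wanted:
                return False
            return True
        return [idx for idx in range(len(targets)) if can_run(idx)]

    # with hwm=1, engine 'b' (load 1) is full, so only 'a' and 'c' qualify
    print(eligible_engines(['a', 'b', 'c'], [0, 1, 0], hwm=1))  # -> [0, 2]
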
@@ -426,13 +432,17 @@ class TaskScheduler(SessionFactory):
             idx = indices[idx]
         target = self.targets[idx]
         # print (target, map(str, msg[:3]))
+        # send job to the engine
         self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
         self.engine_stream.send_multipart(raw_msg, copy=False)
+        # update load
         self.add_job(idx)
         self.pending[target][msg_id] = (raw_msg, targets, MET, follow, timeout)
+        # notify Hub
         content = dict(msg_id=msg_id, engine_id=target)
         self.session.send(self.mon_stream, 'task_destination', content=content,
                         ident=['tracktask',self.session.session])
+
 
     #-----------------------------------------------------------------------
     # Result Handling
@@ -443,10 +453,13 @@ class TaskScheduler(SessionFactory):
         try:
             idents,msg = self.session.feed_identities(raw_msg, copy=False)
             msg = self.session.unpack_message(msg, content=False, copy=False)
-        except:
+            engine = idents[0]
+            idx = self.targets.index(engine)
+            self.finish_job(idx)
+        except Exception:
             self.log.error("task::Invalid result: %s"%raw_msg, exc_info=True)
             return
 
         header = msg['header']
         if header.get('dependencies_met', True):
             success = (header['status'] == 'ok')
@@ -496,17 +509,26 @@ class TaskScheduler(SessionFactory):
 
         if self.blacklist[msg_id] == targets:
             self.depending[msg_id] = args
-            return self.fail_unreachable(msg_id)
-
+            self.fail_unreachable(msg_id)
         elif not self.maybe_run(msg_id, *args):
             # resubmit failed, put it back in our dependency tree
             self.save_unmet(msg_id, *args)
 
+        if self.hwm:
+            idx = self.targets.index(engine)
+            if self.loads[idx] == self.hwm-1:
+                self.update_graph(None)
+
+
 
     @logged
-    def update_graph(self, dep_id, success=True):
+    def update_graph(self, dep_id=None, success=True):
         """dep_id just finished. Update our dependency
-        graph and submit any jobs that just became runnable."""
+        graph and submit any jobs that just became runnable.
+
+        Called with dep_id=None to update graph for hwm, but without finishing
+        a task.
+        """
         # print ("\n\n***********")
         # pprint (dep_id)
         # pprint (self.graph)
@@ -514,9 +536,12 @@ class TaskScheduler(SessionFactory):
         # pprint (self.all_completed)
         # pprint (self.all_failed)
         # print ("\n\n***********\n\n")
-        if dep_id not in self.graph:
-            return
-        jobs = self.graph.pop(dep_id)
+        # update any jobs that depended on the dependency
+        jobs = self.graph.pop(dep_id, [])
+        # if we have HWM and an engine just became no longer full,
+        # recheck *all* jobs:
+        if self.hwm and any( [ load==self.hwm-1 for load in self.loads] ):
+            jobs = self.depending.keys()
 
         for msg_id in jobs:
             raw_msg, targets, after, follow, timeout = self.depending[msg_id]
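
The final hunk's recheck rule can be stated concretely: a load equal to hwm-1 means some engine just stopped being full, so every job in depending becomes a resubmission candidate again, not only the jobs that were waiting on dep_id. A sketch under that reading, with the scheduler's state passed as plain arguments (function name and signature are illustrative):

    def jobs_to_recheck(graph, depending, loads, hwm, dep_id=None):
        # jobs whose dependency just finished (empty when called with
        # dep_id=None, i.e. purely for an HWM update)
        jobs = graph.pop(dep_id, [])
        # if an engine just became no-longer-full, recheck *all* queued
        # jobs, since any of them may now fit on that engine
        if hwm and any(load == hwm - 1 for load in loads):
            jobs = list(depending)
        return jobs

    # an engine just dropped from hwm=2 to load 1, so both queued
    # tasks are rechecked:
    print(jobs_to_recheck({}, {'t3': None, 't4': None}, [2, 1], hwm=2))
    # -> ['t3', 't4']
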