##// END OF EJS Templates
rework logging connections
rework logging connections

File last commit:

r3610:c221efd0
r3610:c221efd0
Show More
scheduler.py
511 lines | 18.4 KiB | text/x-python | PythonLexer
MinRK
general parallel code cleanup
r3556 """The Python scheduler for rich scheduling.
The Pure ZMQ scheduler does not allow routing schemes other than LRU,
nor does it check msg_id DAG dependencies. For those, a slightly slower
Python Scheduler exists.
"""
MinRK
added dependencies & Python scheduler
r3548 #----------------------------------------------------------------------
# Imports
#----------------------------------------------------------------------
MinRK
use print_function
r3553 from __future__ import print_function
MinRK
rework logging connections
r3610 import sys
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603 import logging
MinRK
rework logging connections
r3610 from random import randint,random
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603 from types import FunctionType
MinRK
added dependencies & Python scheduler
r3548
try:
import numpy
except ImportError:
numpy = None
import zmq
from zmq.eventloop import ioloop, zmqstream
# local imports
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603 from IPython.external.decorator import decorator
MinRK
rework logging connections
r3610 # from IPython.config.configurable import Configurable
MinRK
Refactor newparallel to use Config system...
r3604 from IPython.utils.traitlets import Instance, Dict, List, Set
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603
MinRK
Improvements to dependency handling...
r3607 import error
MinRK
added dependencies & Python scheduler
r3548 from client import Client
from dependency import Dependency
import streamsession as ss
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603 from entry_point import connect_logger, local_logger
MinRK
rework logging connections
r3610 from factory import LoggingFactory
MinRK
added dependencies & Python scheduler
r3548
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603
MinRK
added dependencies & Python scheduler
r3548 @decorator
def logged(f,self,*args,**kwargs):
MinRK
propagate iopub to clients
r3602 # print ("#--------------------")
MinRK
rework logging connections
r3610 self.log.debug("scheduler::%s(*%s,**%s)"%(f.func_name, args, kwargs))
MinRK
propagate iopub to clients
r3602 # print ("#--")
MinRK
added dependencies & Python scheduler
r3548 return f(self,*args, **kwargs)
#----------------------------------------------------------------------
# Chooser functions
#----------------------------------------------------------------------
def plainrandom(loads):
"""Plain random pick."""
n = len(loads)
return randint(0,n-1)
def lru(loads):
"""Always pick the front of the line.
MinRK
general parallel code cleanup
r3556 The content of `loads` is ignored.
MinRK
added dependencies & Python scheduler
r3548
Assumes LRU ordering of loads, with oldest first.
"""
return 0
def twobin(loads):
"""Pick two at random, use the LRU of the two.
The content of loads is ignored.
Assumes LRU ordering of loads, with oldest first.
"""
n = len(loads)
a = randint(0,n-1)
b = randint(0,n-1)
return min(a,b)
def weighted(loads):
"""Pick two at random using inverse load as weight.
Return the less loaded of the two.
"""
# weight 0 a million times more than 1:
weights = 1./(1e-6+numpy.array(loads))
sums = weights.cumsum()
t = sums[-1]
x = random()*t
y = random()*t
idx = 0
idy = 0
while sums[idx] < x:
idx += 1
while sums[idy] < y:
idy += 1
if weights[idy] > weights[idx]:
return idy
else:
return idx
def leastload(loads):
"""Always choose the lowest load.
If the lowest load occurs more than once, the first
occurance will be used. If loads has LRU ordering, this means
the LRU of those with the lowest load is chosen.
"""
return loads.index(min(loads))
#---------------------------------------------------------------------
# Classes
#---------------------------------------------------------------------
MinRK
Improvements to dependency handling...
r3607 # store empty default dependency:
MET = Dependency([])
MinRK
rework logging connections
r3610 class TaskScheduler(LoggingFactory):
MinRK
scheduler progress
r3551 """Python TaskScheduler object.
MinRK
added dependencies & Python scheduler
r3548
This is the simplest object that supports msg_id based
DAG dependencies. *Only* task msg_ids are checked, not
msg_ids of jobs submitted via the MUX queue.
"""
MinRK
Refactor newparallel to use Config system...
r3604 # input arguments:
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603 scheme = Instance(FunctionType, default=leastload) # function for determining the destination
client_stream = Instance(zmqstream.ZMQStream) # client-facing stream
engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream
io_loop = Instance(ioloop.IOLoop)
# internals:
MinRK
Refactor newparallel to use Config system...
r3604 dependencies = Dict() # dict by msg_id of [ msg_ids that depend on key ]
depending = Dict() # dict by msg_id of (msg_id, raw_msg, after, follow)
pending = Dict() # dict by engine_uuid of submitted tasks
completed = Dict() # dict by engine_uuid of completed tasks
MinRK
Improvements to dependency handling...
r3607 failed = Dict() # dict by engine_uuid of failed tasks
destinations = Dict() # dict by msg_id of engine_uuids where jobs ran (reverse of completed+failed)
MinRK
Refactor newparallel to use Config system...
r3604 clients = Dict() # dict by msg_id for who submitted the task
targets = List() # list of target IDENTs
loads = List() # list of engine loads
MinRK
Improvements to dependency handling...
r3607 all_completed = Set() # set of all completed tasks
all_failed = Set() # set of all failed tasks
all_done = Set() # set of all finished tasks=union(completed,failed)
MinRK
Refactor newparallel to use Config system...
r3604 blacklist = Dict() # dict by msg_id of locations where a job has encountered UnmetDependency
session = Instance(ss.StreamSession)
MinRK
added dependencies & Python scheduler
r3548
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603 def __init__(self, **kwargs):
super(TaskScheduler, self).__init__(**kwargs)
MinRK
added dependencies & Python scheduler
r3548
self.session = ss.StreamSession(username="TaskScheduler")
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603 self.engine_stream.on_recv(self.dispatch_result, copy=False)
MinRK
added dependencies & Python scheduler
r3548 self._notification_handlers = dict(
registration_notification = self._register_engine,
unregistration_notification = self._unregister_engine
)
self.notifier_stream.on_recv(self.dispatch_notification)
MinRK
rework logging connections
r3610 self.log.info("Scheduler started...%r"%self)
MinRK
added dependencies & Python scheduler
r3548
def resume_receiving(self):
MinRK
general parallel code cleanup
r3556 """Resume accepting jobs."""
MinRK
added dependencies & Python scheduler
r3548 self.client_stream.on_recv(self.dispatch_submission, copy=False)
def stop_receiving(self):
MinRK
general parallel code cleanup
r3556 """Stop accepting jobs while there are no engines.
Leave them in the ZMQ queue."""
MinRK
added dependencies & Python scheduler
r3548 self.client_stream.on_recv(None)
#-----------------------------------------------------------------------
# [Un]Registration Handling
#-----------------------------------------------------------------------
def dispatch_notification(self, msg):
"""dispatch register/unregister events."""
idents,msg = self.session.feed_identities(msg)
msg = self.session.unpack_message(msg)
msg_type = msg['msg_type']
handler = self._notification_handlers.get(msg_type, None)
if handler is None:
raise Exception("Unhandled message type: %s"%msg_type)
else:
try:
handler(str(msg['content']['queue']))
except KeyError:
MinRK
rework logging connections
r3610 self.log.error("task::Invalid notification msg: %s"%msg)
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603
MinRK
added dependencies & Python scheduler
r3548 @logged
def _register_engine(self, uid):
MinRK
general parallel code cleanup
r3556 """New engine with ident `uid` became available."""
MinRK
added dependencies & Python scheduler
r3548 # head of the line:
self.targets.insert(0,uid)
self.loads.insert(0,0)
# initialize sets
self.completed[uid] = set()
MinRK
Improvements to dependency handling...
r3607 self.failed[uid] = set()
MinRK
added dependencies & Python scheduler
r3548 self.pending[uid] = {}
if len(self.targets) == 1:
self.resume_receiving()
def _unregister_engine(self, uid):
MinRK
general parallel code cleanup
r3556 """Existing engine with ident `uid` became unavailable."""
MinRK
added dependencies & Python scheduler
r3548 if len(self.targets) == 1:
MinRK
general parallel code cleanup
r3556 # this was our only engine
MinRK
added dependencies & Python scheduler
r3548 self.stop_receiving()
MinRK
general parallel code cleanup
r3556
# handle any potentially finished tasks:
MinRK
added dependencies & Python scheduler
r3548 self.engine_stream.flush()
self.completed.pop(uid)
MinRK
Improvements to dependency handling...
r3607 self.failed.pop(uid)
# don't pop destinations, because it might be used later
# map(self.destinations.pop, self.completed.pop(uid))
# map(self.destinations.pop, self.failed.pop(uid))
MinRK
added dependencies & Python scheduler
r3548 lost = self.pending.pop(uid)
idx = self.targets.index(uid)
self.targets.pop(idx)
self.loads.pop(idx)
self.handle_stranded_tasks(lost)
def handle_stranded_tasks(self, lost):
MinRK
general parallel code cleanup
r3556 """Deal with jobs resident in an engine that died."""
MinRK
added dependencies & Python scheduler
r3548 # TODO: resubmit the tasks?
for msg_id in lost:
pass
#-----------------------------------------------------------------------
# Job Submission
#-----------------------------------------------------------------------
@logged
def dispatch_submission(self, raw_msg):
MinRK
general parallel code cleanup
r3556 """Dispatch job submission to appropriate handlers."""
MinRK
added dependencies & Python scheduler
r3548 # ensure targets up to date:
self.notifier_stream.flush()
try:
idents, msg = self.session.feed_identities(raw_msg, copy=False)
MinRK
general parallel code cleanup
r3556 except Exception as e:
MinRK
rework logging connections
r3610 self.log.error("task::Invaid msg: %s"%msg)
MinRK
added dependencies & Python scheduler
r3548 return
MinRK
quiet down scheduler printing, fix dep_id check in update_dependencies
r3563 # send to monitor
self.mon_stream.send_multipart(['intask']+raw_msg, copy=False)
MinRK
added dependencies & Python scheduler
r3548 msg = self.session.unpack_message(msg, content=False, copy=False)
header = msg['header']
msg_id = header['msg_id']
MinRK
general parallel code cleanup
r3556
# time dependencies
MinRK
added dependencies & Python scheduler
r3548 after = Dependency(header.get('after', []))
if after.mode == 'all':
MinRK
Improvements to dependency handling...
r3607 after.difference_update(self.all_completed)
if not after.success_only:
after.difference_update(self.all_failed)
if after.check(self.all_completed, self.all_failed):
MinRK
general parallel code cleanup
r3556 # recast as empty set, if `after` already met,
# to prevent unnecessary set comparisons
MinRK
Improvements to dependency handling...
r3607 after = MET
MinRK
added dependencies & Python scheduler
r3548
MinRK
general parallel code cleanup
r3556 # location dependencies
MinRK
added dependencies & Python scheduler
r3548 follow = Dependency(header.get('follow', []))
MinRK
Improvements to dependency handling...
r3607
# check if unreachable:
if after.unreachable(self.all_failed) or follow.unreachable(self.all_failed):
self.depending[msg_id] = [raw_msg,MET,MET]
return self.fail_unreachable(msg_id)
if after.check(self.all_completed, self.all_failed):
MinRK
added dependencies & Python scheduler
r3548 # time deps already met, try to run
if not self.maybe_run(msg_id, raw_msg, follow):
# can't run yet
self.save_unmet(msg_id, raw_msg, after, follow)
else:
self.save_unmet(msg_id, raw_msg, after, follow)
MinRK
general parallel code cleanup
r3556
MinRK
added dependencies & Python scheduler
r3548 @logged
MinRK
Improvements to dependency handling...
r3607 def fail_unreachable(self, msg_id):
"""a message has become unreachable"""
if msg_id not in self.depending:
MinRK
rework logging connections
r3610 self.log.error("msg %r already failed!"%msg_id)
MinRK
Improvements to dependency handling...
r3607 return
raw_msg, after, follow = self.depending.pop(msg_id)
for mid in follow.union(after):
if mid in self.dependencies:
self.dependencies[mid].remove(msg_id)
idents,msg = self.session.feed_identities(raw_msg, copy=False)
msg = self.session.unpack_message(msg, copy=False, content=False)
header = msg['header']
try:
raise error.ImpossibleDependency()
except:
content = ss.wrap_exception()
self.all_done.add(msg_id)
self.all_failed.add(msg_id)
msg = self.session.send(self.client_stream, 'apply_reply', content,
parent=header, ident=idents)
self.session.send(self.mon_stream, msg, ident=['outtask']+idents)
self.update_dependencies(msg_id, success=False)
@logged
MinRK
added dependencies & Python scheduler
r3548 def maybe_run(self, msg_id, raw_msg, follow=None):
"""check location dependencies, and run if they are met."""
if follow:
def can_run(idx):
target = self.targets[idx]
return target not in self.blacklist.get(msg_id, []) and\
MinRK
Improvements to dependency handling...
r3607 follow.check(self.completed[target], self.failed[target])
MinRK
added dependencies & Python scheduler
r3548
indices = filter(can_run, range(len(self.targets)))
if not indices:
MinRK
Improvements to dependency handling...
r3607 # TODO evaluate unmeetable follow dependencies
if follow.mode == 'all':
dests = set()
relevant = self.all_completed if follow.success_only else self.all_done
for m in follow.intersection(relevant):
dests.add(self.destinations[m])
if len(dests) > 1:
self.fail_unreachable(msg_id)
MinRK
added dependencies & Python scheduler
r3548 return False
else:
indices = None
self.submit_task(msg_id, raw_msg, indices)
return True
@logged
MinRK
Improvements to dependency handling...
r3607 def save_unmet(self, msg_id, raw_msg, after, follow):
MinRK
added dependencies & Python scheduler
r3548 """Save a message for later submission when its dependencies are met."""
MinRK
Improvements to dependency handling...
r3607 self.depending[msg_id] = [raw_msg,after,follow]
# track the ids in follow or after, but not those already finished
MinRK
added dependencies & Python scheduler
r3548 for dep_id in after.union(follow).difference(self.all_done):
if dep_id not in self.dependencies:
self.dependencies[dep_id] = set()
self.dependencies[dep_id].add(msg_id)
MinRK
scheduler progress
r3551
MinRK
added dependencies & Python scheduler
r3548 @logged
def submit_task(self, msg_id, msg, follow=None, indices=None):
MinRK
general parallel code cleanup
r3556 """Submit a task to any of a subset of our targets."""
MinRK
added dependencies & Python scheduler
r3548 if indices:
loads = [self.loads[i] for i in indices]
else:
loads = self.loads
idx = self.scheme(loads)
if indices:
idx = indices[idx]
target = self.targets[idx]
MinRK
quiet down scheduler printing, fix dep_id check in update_dependencies
r3563 # print (target, map(str, msg[:3]))
MinRK
scheduler progress
r3551 self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
self.engine_stream.send_multipart(msg, copy=False)
MinRK
added dependencies & Python scheduler
r3548 self.add_job(idx)
self.pending[target][msg_id] = (msg, follow)
MinRK
general parallel code cleanup
r3556 content = dict(msg_id=msg_id, engine_id=target)
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603 self.session.send(self.mon_stream, 'task_destination', content=content,
ident=['tracktask',self.session.session])
MinRK
added dependencies & Python scheduler
r3548
#-----------------------------------------------------------------------
# Result Handling
#-----------------------------------------------------------------------
@logged
def dispatch_result(self, raw_msg):
try:
idents,msg = self.session.feed_identities(raw_msg, copy=False)
MinRK
general parallel code cleanup
r3556 except Exception as e:
MinRK
rework logging connections
r3610 self.log.error("task::Invaid result: %s"%msg)
MinRK
added dependencies & Python scheduler
r3548 return
msg = self.session.unpack_message(msg, content=False, copy=False)
header = msg['header']
if header.get('dependencies_met', True):
MinRK
Improvements to dependency handling...
r3607 success = (header['status'] == 'ok')
self.handle_result(idents, msg['parent_header'], raw_msg, success)
# send to Hub monitor
MinRK
added dependencies & Python scheduler
r3548 self.mon_stream.send_multipart(['outtask']+raw_msg, copy=False)
else:
MinRK
scheduler progress
r3551 self.handle_unmet_dependency(idents, msg['parent_header'])
MinRK
added dependencies & Python scheduler
r3548
@logged
MinRK
Improvements to dependency handling...
r3607 def handle_result(self, idents, parent, raw_msg, success=True):
MinRK
added dependencies & Python scheduler
r3548 # first, relay result to client
engine = idents[0]
client = idents[1]
# swap_ids for XREP-XREP mirror
raw_msg[:2] = [client,engine]
MinRK
quiet down scheduler printing, fix dep_id check in update_dependencies
r3563 # print (map(str, raw_msg[:4]))
MinRK
added dependencies & Python scheduler
r3548 self.client_stream.send_multipart(raw_msg, copy=False)
# now, update our data structures
msg_id = parent['msg_id']
self.pending[engine].pop(msg_id)
MinRK
Improvements to dependency handling...
r3607 if success:
self.completed[engine].add(msg_id)
self.all_completed.add(msg_id)
else:
self.failed[engine].add(msg_id)
self.all_failed.add(msg_id)
MinRK
add all completed task IDs to Scheduler.all_done
r3565 self.all_done.add(msg_id)
MinRK
Improvements to dependency handling...
r3607 self.destinations[msg_id] = engine
MinRK
added dependencies & Python scheduler
r3548
MinRK
Improvements to dependency handling...
r3607 self.update_dependencies(msg_id, success)
MinRK
added dependencies & Python scheduler
r3548
@logged
def handle_unmet_dependency(self, idents, parent):
engine = idents[0]
msg_id = parent['msg_id']
if msg_id not in self.blacklist:
self.blacklist[msg_id] = set()
self.blacklist[msg_id].add(engine)
raw_msg,follow = self.pending[engine].pop(msg_id)
MinRK
scheduler progress
r3551 if not self.maybe_run(msg_id, raw_msg, follow):
MinRK
added dependencies & Python scheduler
r3548 # resubmit failed, put it back in our dependency tree
MinRK
Improvements to dependency handling...
r3607 self.save_unmet(msg_id, raw_msg, MET, follow)
MinRK
added dependencies & Python scheduler
r3548 pass
MinRK
Improvements to dependency handling...
r3607
MinRK
added dependencies & Python scheduler
r3548 @logged
MinRK
Improvements to dependency handling...
r3607 def update_dependencies(self, dep_id, success=True):
MinRK
added dependencies & Python scheduler
r3548 """dep_id just finished. Update our dependency
table and submit any jobs that just became runable."""
MinRK
Improvements to dependency handling...
r3607 # print ("\n\n***********")
# pprint (dep_id)
# pprint (self.dependencies)
# pprint (self.depending)
# pprint (self.all_completed)
# pprint (self.all_failed)
# print ("\n\n***********\n\n")
MinRK
added dependencies & Python scheduler
r3548 if dep_id not in self.dependencies:
return
jobs = self.dependencies.pop(dep_id)
MinRK
Improvements to dependency handling...
r3607
for msg_id in jobs:
raw_msg, after, follow = self.depending[msg_id]
# if dep_id in after:
# if after.mode == 'all' and (success or not after.success_only):
# after.remove(dep_id)
if after.unreachable(self.all_failed) or follow.unreachable(self.all_failed):
self.fail_unreachable(msg_id)
elif after.check(self.all_completed, self.all_failed): # time deps met, maybe run
self.depending[msg_id][1] = MET
MinRK
added dependencies & Python scheduler
r3548 if self.maybe_run(msg_id, raw_msg, follow):
MinRK
Improvements to dependency handling...
r3607
self.depending.pop(msg_id)
for mid in follow.union(after):
MinRK
scheduler progress
r3551 if mid in self.dependencies:
self.dependencies[mid].remove(msg_id)
MinRK
added dependencies & Python scheduler
r3548
#----------------------------------------------------------------------
# methods to be overridden by subclasses
#----------------------------------------------------------------------
def add_job(self, idx):
"""Called after self.targets[idx] just got the job with header.
Override with subclasses. The default ordering is simple LRU.
The default loads are the number of outstanding jobs."""
self.loads[idx] += 1
for lis in (self.targets, self.loads):
lis.append(lis.pop(idx))
def finish_job(self, idx):
"""Called after self.targets[idx] just finished a job.
Override with subclasses."""
self.loads[idx] -= 1
MinRK
rework logging connections
r3610 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, logname='ZMQ', log_addr=None, loglevel=logging.DEBUG, scheme='weighted'):
MinRK
added dependencies & Python scheduler
r3548 from zmq.eventloop import ioloop
from zmq.eventloop.zmqstream import ZMQStream
ctx = zmq.Context()
loop = ioloop.IOLoop()
ins = ZMQStream(ctx.socket(zmq.XREP),loop)
ins.bind(in_addr)
outs = ZMQStream(ctx.socket(zmq.XREP),loop)
outs.bind(out_addr)
mons = ZMQStream(ctx.socket(zmq.PUB),loop)
mons.connect(mon_addr)
nots = ZMQStream(ctx.socket(zmq.SUB),loop)
nots.setsockopt(zmq.SUBSCRIBE, '')
nots.connect(not_addr)
MinRK
Refactor newparallel to use Config system...
r3604 scheme = globals().get(scheme, None)
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603 # setup logging
if log_addr:
MinRK
rework logging connections
r3610 connect_logger(logname, ctx, log_addr, root="scheduler", loglevel=loglevel)
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603 else:
MinRK
rework logging connections
r3610 local_logger(logname, loglevel)
MinRK
improved logging + Hub,Engine,Scheduler are Configurable
r3603
scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
mon_stream=mons,notifier_stream=nots,
MinRK
rework logging connections
r3610 scheme=scheme,io_loop=loop, logname=logname)
MinRK
added dependencies & Python scheduler
r3548
MinRK
Refactor newparallel to use Config system...
r3604 try:
loop.start()
except KeyboardInterrupt:
print ("interrupted, exiting...", file=sys.__stderr__)
MinRK
added dependencies & Python scheduler
r3548
if __name__ == '__main__':
iface = 'tcp://127.0.0.1:%i'
launch_scheduler(iface%12345,iface%1236,iface%12347,iface%12348)