scheduler.py
714 lines
| 26.7 KiB
| text/x-python
|
PythonLexer
MinRK
|
r3556 | """The Python scheduler for rich scheduling. | ||
The Pure ZMQ scheduler does not allow routing schemes other than LRU, | ||||
nor does it check msg_id DAG dependencies. For those, a slightly slower | ||||
Python Scheduler exists. | ||||
MinRK
|
r4018 | |||
Authors: | ||||
* Min RK | ||||
MinRK
|
r3556 | """ | ||
MinRK
|
r3660 | #----------------------------------------------------------------------------- | ||
# Copyright (C) 2010-2011 The IPython Development Team | ||||
# | ||||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r3556 | |||
MinRK
|
r3548 | #---------------------------------------------------------------------- | ||
# Imports | ||||
#---------------------------------------------------------------------- | ||||
MinRK
|
r3553 | from __future__ import print_function | ||
MinRK
|
r3631 | |||
MinRK
|
r3603 | import logging | ||
MinRK
|
r3631 | import sys | ||
from datetime import datetime, timedelta | ||||
MinRK
|
r3611 | from random import randint, random | ||
MinRK
|
r3603 | from types import FunctionType | ||
MinRK
|
r3631 | |||
MinRK
|
r3548 | try: | ||
import numpy | ||||
except ImportError: | ||||
numpy = None | ||||
import zmq | ||||
from zmq.eventloop import ioloop, zmqstream | ||||
# local imports | ||||
MinRK
|
r3603 | from IPython.external.decorator import decorator | ||
MinRK
|
r4092 | from IPython.config.application import Application | ||
MinRK
|
r3773 | from IPython.config.loader import Config | ||
MinRK
|
r4155 | from IPython.utils.traitlets import Instance, Dict, List, Set, Int, Enum, CBytes | ||
MinRK
|
r3603 | |||
MinRK
|
r3673 | from IPython.parallel import error | ||
from IPython.parallel.factory import SessionFactory | ||||
MinRK
|
r4161 | from IPython.parallel.util import connect_logger, local_logger, asbytes | ||
MinRK
|
r3548 | |||
MinRK
|
r3673 | from .dependency import Dependency | ||
MinRK
|
r3603 | |||
MinRK
|
r3548 | @decorator | ||
def logged(f,self,*args,**kwargs): | ||||
MinRK
|
r3602 | # print ("#--------------------") | ||
MinRK
|
r4140 | self.log.debug("scheduler::%s(*%s,**%s)", f.func_name, args, kwargs) | ||
MinRK
|
r3602 | # print ("#--") | ||
MinRK
|
r3548 | return f(self,*args, **kwargs) | ||
#---------------------------------------------------------------------- | ||||
# Chooser functions | ||||
#---------------------------------------------------------------------- | ||||
def plainrandom(loads): | ||||
"""Plain random pick.""" | ||||
n = len(loads) | ||||
return randint(0,n-1) | ||||
def lru(loads): | ||||
"""Always pick the front of the line. | ||||
MinRK
|
r3556 | The content of `loads` is ignored. | ||
MinRK
|
r3548 | |||
Assumes LRU ordering of loads, with oldest first. | ||||
""" | ||||
return 0 | ||||
def twobin(loads): | ||||
"""Pick two at random, use the LRU of the two. | ||||
The content of loads is ignored. | ||||
Assumes LRU ordering of loads, with oldest first. | ||||
""" | ||||
n = len(loads) | ||||
a = randint(0,n-1) | ||||
b = randint(0,n-1) | ||||
return min(a,b) | ||||
def weighted(loads): | ||||
"""Pick two at random using inverse load as weight. | ||||
Return the less loaded of the two. | ||||
""" | ||||
# weight 0 a million times more than 1: | ||||
weights = 1./(1e-6+numpy.array(loads)) | ||||
sums = weights.cumsum() | ||||
t = sums[-1] | ||||
x = random()*t | ||||
y = random()*t | ||||
idx = 0 | ||||
idy = 0 | ||||
while sums[idx] < x: | ||||
idx += 1 | ||||
while sums[idy] < y: | ||||
idy += 1 | ||||
if weights[idy] > weights[idx]: | ||||
return idy | ||||
else: | ||||
return idx | ||||
def leastload(loads): | ||||
"""Always choose the lowest load. | ||||
If the lowest load occurs more than once, the first | ||||
occurance will be used. If loads has LRU ordering, this means | ||||
the LRU of those with the lowest load is chosen. | ||||
""" | ||||
return loads.index(min(loads)) | ||||
#--------------------------------------------------------------------- | ||||
# Classes | ||||
#--------------------------------------------------------------------- | ||||
MinRK
|
r3607 | # store empty default dependency: | ||
MET = Dependency([]) | ||||
MinRK
|
r3611 | class TaskScheduler(SessionFactory): | ||
MinRK
|
r3551 | """Python TaskScheduler object. | ||
MinRK
|
r3548 | |||
This is the simplest object that supports msg_id based | ||||
DAG dependencies. *Only* task msg_ids are checked, not | ||||
msg_ids of jobs submitted via the MUX queue. | ||||
""" | ||||
MinRK
|
r3985 | hwm = Int(0, config=True, shortname='hwm', | ||
help="""specify the High Water Mark (HWM) for the downstream | ||||
socket in the Task scheduler. This is the maximum number | ||||
of allowed outstanding tasks on each engine.""" | ||||
) | ||||
scheme_name = Enum(('leastload', 'pure', 'lru', 'plainrandom', 'weighted', 'twobin'), | ||||
'leastload', config=True, shortname='scheme', allow_none=False, | ||||
help="""select the task scheduler scheme [default: Python LRU] | ||||
Options are: 'pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'""" | ||||
) | ||||
def _scheme_name_changed(self, old, new): | ||||
self.log.debug("Using scheme %r"%new) | ||||
self.scheme = globals()[new] | ||||
MinRK
|
r3781 | |||
MinRK
|
r3604 | # input arguments: | ||
MinRK
|
r3989 | scheme = Instance(FunctionType) # function for determining the destination | ||
def _scheme_default(self): | ||||
return leastload | ||||
MinRK
|
r3603 | client_stream = Instance(zmqstream.ZMQStream) # client-facing stream | ||
engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream | ||||
notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream | ||||
mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream | ||||
# internals: | ||||
MinRK
|
r3624 | graph = Dict() # dict by msg_id of [ msg_ids that depend on key ] | ||
MinRK
|
r3873 | retries = Dict() # dict by msg_id of retries remaining (non-neg ints) | ||
MinRK
|
r3781 | # waiting = List() # list of msg_ids ready to run, but haven't due to HWM | ||
MinRK
|
r3604 | depending = Dict() # dict by msg_id of (msg_id, raw_msg, after, follow) | ||
pending = Dict() # dict by engine_uuid of submitted tasks | ||||
completed = Dict() # dict by engine_uuid of completed tasks | ||||
MinRK
|
r3607 | failed = Dict() # dict by engine_uuid of failed tasks | ||
destinations = Dict() # dict by msg_id of engine_uuids where jobs ran (reverse of completed+failed) | ||||
MinRK
|
r3604 | clients = Dict() # dict by msg_id for who submitted the task | ||
targets = List() # list of target IDENTs | ||||
loads = List() # list of engine loads | ||||
MinRK
|
r3781 | # full = Set() # set of IDENTs that have HWM outstanding tasks | ||
MinRK
|
r3607 | all_completed = Set() # set of all completed tasks | ||
all_failed = Set() # set of all failed tasks | ||||
all_done = Set() # set of all finished tasks=union(completed,failed) | ||||
MinRK
|
r3624 | all_ids = Set() # set of all submitted task IDs | ||
MinRK
|
r3604 | blacklist = Dict() # dict by msg_id of locations where a job has encountered UnmetDependency | ||
MinRK
|
r3611 | auditor = Instance('zmq.eventloop.ioloop.PeriodicCallback') | ||
MinRK
|
r3548 | |||
MinRK
|
r4155 | ident = CBytes() # ZMQ identity. This should just be self.session.session | ||
# but ensure Bytes | ||||
def _ident_default(self): | ||||
MinRK
|
r4161 | return asbytes(self.session.session) | ||
MinRK
|
r3548 | |||
MinRK
|
r3611 | def start(self): | ||
MinRK
|
r3603 | self.engine_stream.on_recv(self.dispatch_result, copy=False) | ||
MinRK
|
r3548 | self._notification_handlers = dict( | ||
registration_notification = self._register_engine, | ||||
unregistration_notification = self._unregister_engine | ||||
) | ||||
self.notifier_stream.on_recv(self.dispatch_notification) | ||||
MinRK
|
r3612 | self.auditor = ioloop.PeriodicCallback(self.audit_timeouts, 2e3, self.loop) # 1 Hz | ||
MinRK
|
r3611 | self.auditor.start() | ||
MinRK
|
r4008 | self.log.info("Scheduler started [%s]"%self.scheme_name) | ||
MinRK
|
r3548 | |||
def resume_receiving(self): | ||||
MinRK
|
r3556 | """Resume accepting jobs.""" | ||
MinRK
|
r3548 | self.client_stream.on_recv(self.dispatch_submission, copy=False) | ||
def stop_receiving(self): | ||||
MinRK
|
r3556 | """Stop accepting jobs while there are no engines. | ||
Leave them in the ZMQ queue.""" | ||||
MinRK
|
r3548 | self.client_stream.on_recv(None) | ||
#----------------------------------------------------------------------- | ||||
# [Un]Registration Handling | ||||
#----------------------------------------------------------------------- | ||||
def dispatch_notification(self, msg): | ||||
"""dispatch register/unregister events.""" | ||||
MinRK
|
r4000 | try: | ||
idents,msg = self.session.feed_identities(msg) | ||||
except ValueError: | ||||
MinRK
|
r4155 | self.log.warn("task::Invalid Message: %r",msg) | ||
MinRK
|
r4000 | return | ||
try: | ||||
msg = self.session.unpack_message(msg) | ||||
except ValueError: | ||||
self.log.warn("task::Unauthorized message from: %r"%idents) | ||||
return | ||||
MinRK
|
r3548 | msg_type = msg['msg_type'] | ||
MinRK
|
r4000 | |||
MinRK
|
r3548 | handler = self._notification_handlers.get(msg_type, None) | ||
if handler is None: | ||||
MinRK
|
r4000 | self.log.error("Unhandled message type: %r"%msg_type) | ||
MinRK
|
r3548 | else: | ||
try: | ||||
MinRK
|
r4161 | handler(asbytes(msg['content']['queue'])) | ||
MinRK
|
r4155 | except Exception: | ||
self.log.error("task::Invalid notification msg: %r",msg) | ||||
MinRK
|
r3603 | |||
MinRK
|
r3548 | def _register_engine(self, uid): | ||
MinRK
|
r3556 | """New engine with ident `uid` became available.""" | ||
MinRK
|
r3548 | # head of the line: | ||
self.targets.insert(0,uid) | ||||
self.loads.insert(0,0) | ||||
MinRK
|
r4155 | |||
MinRK
|
r3548 | # initialize sets | ||
self.completed[uid] = set() | ||||
MinRK
|
r3607 | self.failed[uid] = set() | ||
MinRK
|
r3548 | self.pending[uid] = {} | ||
if len(self.targets) == 1: | ||||
self.resume_receiving() | ||||
MinRK
|
r3873 | # rescan the graph: | ||
self.update_graph(None) | ||||
MinRK
|
r3548 | |||
def _unregister_engine(self, uid): | ||||
MinRK
|
r3556 | """Existing engine with ident `uid` became unavailable.""" | ||
MinRK
|
r3548 | if len(self.targets) == 1: | ||
MinRK
|
r3556 | # this was our only engine | ||
MinRK
|
r3548 | self.stop_receiving() | ||
MinRK
|
r3556 | |||
# handle any potentially finished tasks: | ||||
MinRK
|
r3548 | self.engine_stream.flush() | ||
MinRK
|
r3873 | # don't pop destinations, because they might be used later | ||
MinRK
|
r3607 | # map(self.destinations.pop, self.completed.pop(uid)) | ||
# map(self.destinations.pop, self.failed.pop(uid)) | ||||
MinRK
|
r3873 | |||
# prevent this engine from receiving work | ||||
MinRK
|
r3548 | idx = self.targets.index(uid) | ||
self.targets.pop(idx) | ||||
self.loads.pop(idx) | ||||
MinRK
|
r3612 | # wait 5 seconds before cleaning up pending jobs, since the results might | ||
# still be incoming | ||||
if self.pending[uid]: | ||||
dc = ioloop.DelayedCallback(lambda : self.handle_stranded_tasks(uid), 5000, self.loop) | ||||
dc.start() | ||||
MinRK
|
r3873 | else: | ||
self.completed.pop(uid) | ||||
self.failed.pop(uid) | ||||
MinRK
|
r3548 | |||
MinRK
|
r3612 | def handle_stranded_tasks(self, engine): | ||
MinRK
|
r3556 | """Deal with jobs resident in an engine that died.""" | ||
MinRK
|
r3873 | lost = self.pending[engine] | ||
for msg_id in lost.keys(): | ||||
if msg_id not in self.pending[engine]: | ||||
# prevent double-handling of messages | ||||
continue | ||||
raw_msg = lost[msg_id][0] | ||||
MinRK
|
r3612 | idents,msg = self.session.feed_identities(raw_msg, copy=False) | ||
MinRK
|
r4000 | parent = self.session.unpack(msg[1].bytes) | ||
MinRK
|
r3873 | idents = [engine, idents[0]] | ||
# build fake error reply | ||||
MinRK
|
r3612 | try: | ||
raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id)) | ||||
except: | ||||
MinRK
|
r3644 | content = error.wrap_exception() | ||
MinRK
|
r3873 | msg = self.session.msg('apply_reply', content, parent=parent, subheader={'status':'error'}) | ||
raw_reply = map(zmq.Message, self.session.serialize(msg, ident=idents)) | ||||
# and dispatch it | ||||
self.dispatch_result(raw_reply) | ||||
# finally scrub completed/failed lists | ||||
self.completed.pop(engine) | ||||
self.failed.pop(engine) | ||||
MinRK
|
r3548 | |||
#----------------------------------------------------------------------- | ||||
# Job Submission | ||||
#----------------------------------------------------------------------- | ||||
def dispatch_submission(self, raw_msg): | ||||
MinRK
|
r3556 | """Dispatch job submission to appropriate handlers.""" | ||
MinRK
|
r3548 | # ensure targets up to date: | ||
self.notifier_stream.flush() | ||||
try: | ||||
idents, msg = self.session.feed_identities(raw_msg, copy=False) | ||||
MinRK
|
r3624 | msg = self.session.unpack_message(msg, content=False, copy=False) | ||
MinRK
|
r3781 | except Exception: | ||
MinRK
|
r3996 | self.log.error("task::Invaid task msg: %r"%raw_msg, exc_info=True) | ||
MinRK
|
r3548 | return | ||
MinRK
|
r4000 | |||
MinRK
|
r3563 | # send to monitor | ||
MinRK
|
r4155 | self.mon_stream.send_multipart([b'intask']+raw_msg, copy=False) | ||
MinRK
|
r3563 | |||
MinRK
|
r3548 | header = msg['header'] | ||
msg_id = header['msg_id'] | ||||
MinRK
|
r3624 | self.all_ids.add(msg_id) | ||
MinRK
|
r3556 | |||
MinRK
|
r4155 | # get targets as a set of bytes objects | ||
# from a list of unicode objects | ||||
targets = header.get('targets', []) | ||||
MinRK
|
r4161 | targets = map(asbytes, targets) | ||
MinRK
|
r4155 | targets = set(targets) | ||
MinRK
|
r3873 | retries = header.get('retries', 0) | ||
self.retries[msg_id] = retries | ||||
MinRK
|
r3625 | |||
MinRK
|
r3556 | # time dependencies | ||
MinRK
|
r4141 | after = header.get('after', None) | ||
if after: | ||||
after = Dependency(after) | ||||
if after.all: | ||||
if after.success: | ||||
MinRK
|
r4154 | after = Dependency(after.difference(self.all_completed), | ||
success=after.success, | ||||
failure=after.failure, | ||||
all=after.all, | ||||
) | ||||
MinRK
|
r4141 | if after.failure: | ||
MinRK
|
r4154 | after = Dependency(after.difference(self.all_failed), | ||
success=after.success, | ||||
failure=after.failure, | ||||
all=after.all, | ||||
) | ||||
MinRK
|
r4141 | if after.check(self.all_completed, self.all_failed): | ||
# recast as empty set, if `after` already met, | ||||
# to prevent unnecessary set comparisons | ||||
after = MET | ||||
else: | ||||
MinRK
|
r3607 | after = MET | ||
MinRK
|
r3548 | |||
MinRK
|
r3556 | # location dependencies | ||
MinRK
|
r3548 | follow = Dependency(header.get('follow', [])) | ||
MinRK
|
r3624 | |||
MinRK
|
r3625 | # turn timeouts into datetime objects: | ||
timeout = header.get('timeout', None) | ||||
if timeout: | ||||
timeout = datetime.now() + timedelta(0,timeout,0) | ||||
args = [raw_msg, targets, after, follow, timeout] | ||||
# validate and reduce dependencies: | ||||
MinRK
|
r3624 | for dep in after,follow: | ||
MinRK
|
r4141 | if not dep: # empty dependency | ||
continue | ||||
MinRK
|
r3624 | # check valid: | ||
if msg_id in dep or dep.difference(self.all_ids): | ||||
MinRK
|
r3625 | self.depending[msg_id] = args | ||
MinRK
|
r3624 | return self.fail_unreachable(msg_id, error.InvalidDependency) | ||
# check if unreachable: | ||||
MinRK
|
r3664 | if dep.unreachable(self.all_completed, self.all_failed): | ||
MinRK
|
r3625 | self.depending[msg_id] = args | ||
MinRK
|
r3624 | return self.fail_unreachable(msg_id) | ||
MinRK
|
r3607 | |||
if after.check(self.all_completed, self.all_failed): | ||||
MinRK
|
r3548 | # time deps already met, try to run | ||
MinRK
|
r3625 | if not self.maybe_run(msg_id, *args): | ||
MinRK
|
r3548 | # can't run yet | ||
MinRK
|
r3873 | if msg_id not in self.all_failed: | ||
# could have failed as unreachable | ||||
self.save_unmet(msg_id, *args) | ||||
MinRK
|
r3548 | else: | ||
MinRK
|
r3625 | self.save_unmet(msg_id, *args) | ||
MinRK
|
r3556 | |||
MinRK
|
r3611 | def audit_timeouts(self): | ||
"""Audit all waiting tasks for expired timeouts.""" | ||||
now = datetime.now() | ||||
for msg_id in self.depending.keys(): | ||||
# must recheck, in case one failure cascaded to another: | ||||
if msg_id in self.depending: | ||||
MinRK
|
r3625 | raw,after,targets,follow,timeout = self.depending[msg_id] | ||
MinRK
|
r3611 | if timeout and timeout < now: | ||
MinRK
|
r3873 | self.fail_unreachable(msg_id, error.TaskTimeout) | ||
MinRK
|
r3611 | |||
MinRK
|
r3624 | def fail_unreachable(self, msg_id, why=error.ImpossibleDependency): | ||
MinRK
|
r3625 | """a task has become unreachable, send a reply with an ImpossibleDependency | ||
error.""" | ||||
MinRK
|
r3607 | if msg_id not in self.depending: | ||
MinRK
|
r4140 | self.log.error("msg %r already failed!", msg_id) | ||
MinRK
|
r3607 | return | ||
MinRK
|
r3625 | raw_msg,targets,after,follow,timeout = self.depending.pop(msg_id) | ||
MinRK
|
r3607 | for mid in follow.union(after): | ||
MinRK
|
r3624 | if mid in self.graph: | ||
self.graph[mid].remove(msg_id) | ||||
MinRK
|
r3607 | |||
MinRK
|
r3611 | # FIXME: unpacking a message I've already unpacked, but didn't save: | ||
MinRK
|
r3607 | idents,msg = self.session.feed_identities(raw_msg, copy=False) | ||
MinRK
|
r4000 | header = self.session.unpack(msg[1].bytes) | ||
MinRK
|
r3607 | |||
try: | ||||
MinRK
|
r3624 | raise why() | ||
MinRK
|
r3607 | except: | ||
MinRK
|
r3644 | content = error.wrap_exception() | ||
MinRK
|
r3607 | |||
self.all_done.add(msg_id) | ||||
self.all_failed.add(msg_id) | ||||
msg = self.session.send(self.client_stream, 'apply_reply', content, | ||||
parent=header, ident=idents) | ||||
MinRK
|
r4155 | self.session.send(self.mon_stream, msg, ident=[b'outtask']+idents) | ||
MinRK
|
r3607 | |||
MinRK
|
r3624 | self.update_graph(msg_id, success=False) | ||
MinRK
|
r3607 | |||
MinRK
|
r3625 | def maybe_run(self, msg_id, raw_msg, targets, after, follow, timeout): | ||
MinRK
|
r3548 | """check location dependencies, and run if they are met.""" | ||
MinRK
|
r3625 | blacklist = self.blacklist.setdefault(msg_id, set()) | ||
MinRK
|
r3781 | if follow or targets or blacklist or self.hwm: | ||
MinRK
|
r3625 | # we need a can_run filter | ||
MinRK
|
r3548 | def can_run(idx): | ||
MinRK
|
r3781 | # check hwm | ||
MinRK
|
r3873 | if self.hwm and self.loads[idx] == self.hwm: | ||
MinRK
|
r3625 | return False | ||
MinRK
|
r3781 | target = self.targets[idx] | ||
MinRK
|
r3625 | # check blacklist | ||
if target in blacklist: | ||||
return False | ||||
MinRK
|
r3781 | # check targets | ||
if targets and target not in targets: | ||||
return False | ||||
MinRK
|
r3625 | # check follow | ||
return follow.check(self.completed[target], self.failed[target]) | ||||
MinRK
|
r3548 | |||
indices = filter(can_run, range(len(self.targets))) | ||||
MinRK
|
r3873 | |||
MinRK
|
r3548 | if not indices: | ||
MinRK
|
r3625 | # couldn't run | ||
MinRK
|
r3624 | if follow.all: | ||
MinRK
|
r3625 | # check follow for impossibility | ||
MinRK
|
r3607 | dests = set() | ||
MinRK
|
r3664 | relevant = set() | ||
if follow.success: | ||||
relevant = self.all_completed | ||||
if follow.failure: | ||||
relevant = relevant.union(self.all_failed) | ||||
MinRK
|
r3607 | for m in follow.intersection(relevant): | ||
dests.add(self.destinations[m]) | ||||
if len(dests) > 1: | ||||
MinRK
|
r3873 | self.depending[msg_id] = (raw_msg, targets, after, follow, timeout) | ||
MinRK
|
r3607 | self.fail_unreachable(msg_id) | ||
MinRK
|
r3625 | return False | ||
if targets: | ||||
# check blacklist+targets for impossibility | ||||
targets.difference_update(blacklist) | ||||
if not targets or not targets.intersection(self.targets): | ||||
MinRK
|
r3873 | self.depending[msg_id] = (raw_msg, targets, after, follow, timeout) | ||
MinRK
|
r3625 | self.fail_unreachable(msg_id) | ||
return False | ||||
MinRK
|
r3548 | return False | ||
else: | ||||
indices = None | ||||
MinRK
|
r3625 | self.submit_task(msg_id, raw_msg, targets, follow, timeout, indices) | ||
MinRK
|
r3548 | return True | ||
MinRK
|
r3625 | def save_unmet(self, msg_id, raw_msg, targets, after, follow, timeout): | ||
MinRK
|
r3548 | """Save a message for later submission when its dependencies are met.""" | ||
MinRK
|
r3625 | self.depending[msg_id] = [raw_msg,targets,after,follow,timeout] | ||
MinRK
|
r3607 | # track the ids in follow or after, but not those already finished | ||
MinRK
|
r3548 | for dep_id in after.union(follow).difference(self.all_done): | ||
MinRK
|
r3624 | if dep_id not in self.graph: | ||
self.graph[dep_id] = set() | ||||
self.graph[dep_id].add(msg_id) | ||||
MinRK
|
r3551 | |||
MinRK
|
r3625 | def submit_task(self, msg_id, raw_msg, targets, follow, timeout, indices=None): | ||
MinRK
|
r3556 | """Submit a task to any of a subset of our targets.""" | ||
MinRK
|
r3548 | if indices: | ||
loads = [self.loads[i] for i in indices] | ||||
else: | ||||
loads = self.loads | ||||
idx = self.scheme(loads) | ||||
if indices: | ||||
idx = indices[idx] | ||||
target = self.targets[idx] | ||||
MinRK
|
r3563 | # print (target, map(str, msg[:3])) | ||
MinRK
|
r3781 | # send job to the engine | ||
MinRK
|
r3551 | self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False) | ||
MinRK
|
r3612 | self.engine_stream.send_multipart(raw_msg, copy=False) | ||
MinRK
|
r3781 | # update load | ||
MinRK
|
r3548 | self.add_job(idx) | ||
MinRK
|
r3625 | self.pending[target][msg_id] = (raw_msg, targets, MET, follow, timeout) | ||
MinRK
|
r3781 | # notify Hub | ||
MinRK
|
r4160 | content = dict(msg_id=msg_id, engine_id=target.decode('ascii')) | ||
MinRK
|
r3603 | self.session.send(self.mon_stream, 'task_destination', content=content, | ||
MinRK
|
r4155 | ident=[b'tracktask',self.ident]) | ||
MinRK
|
r3781 | |||
MinRK
|
r3548 | |||
#----------------------------------------------------------------------- | ||||
# Result Handling | ||||
#----------------------------------------------------------------------- | ||||
def dispatch_result(self, raw_msg): | ||||
MinRK
|
r3625 | """dispatch method for result replies""" | ||
MinRK
|
r3548 | try: | ||
idents,msg = self.session.feed_identities(raw_msg, copy=False) | ||||
MinRK
|
r3624 | msg = self.session.unpack_message(msg, content=False, copy=False) | ||
MinRK
|
r3781 | engine = idents[0] | ||
MinRK
|
r3873 | try: | ||
idx = self.targets.index(engine) | ||||
except ValueError: | ||||
pass # skip load-update for dead engines | ||||
else: | ||||
self.finish_job(idx) | ||||
MinRK
|
r3781 | except Exception: | ||
MinRK
|
r4140 | self.log.error("task::Invaid result: %r", raw_msg, exc_info=True) | ||
MinRK
|
r3548 | return | ||
MinRK
|
r3781 | |||
MinRK
|
r3548 | header = msg['header'] | ||
MinRK
|
r3873 | parent = msg['parent_header'] | ||
MinRK
|
r3548 | if header.get('dependencies_met', True): | ||
MinRK
|
r3607 | success = (header['status'] == 'ok') | ||
MinRK
|
r3873 | msg_id = parent['msg_id'] | ||
retries = self.retries[msg_id] | ||||
if not success and retries > 0: | ||||
# failed | ||||
self.retries[msg_id] = retries - 1 | ||||
self.handle_unmet_dependency(idents, parent) | ||||
else: | ||||
del self.retries[msg_id] | ||||
# relay to client and update graph | ||||
self.handle_result(idents, parent, raw_msg, success) | ||||
# send to Hub monitor | ||||
MinRK
|
r4155 | self.mon_stream.send_multipart([b'outtask']+raw_msg, copy=False) | ||
MinRK
|
r3548 | else: | ||
MinRK
|
r3873 | self.handle_unmet_dependency(idents, parent) | ||
MinRK
|
r3548 | |||
MinRK
|
r3607 | def handle_result(self, idents, parent, raw_msg, success=True): | ||
MinRK
|
r3625 | """handle a real task result, either success or failure""" | ||
MinRK
|
r3548 | # first, relay result to client | ||
engine = idents[0] | ||||
client = idents[1] | ||||
# swap_ids for XREP-XREP mirror | ||||
raw_msg[:2] = [client,engine] | ||||
MinRK
|
r3563 | # print (map(str, raw_msg[:4])) | ||
MinRK
|
r3548 | self.client_stream.send_multipart(raw_msg, copy=False) | ||
# now, update our data structures | ||||
msg_id = parent['msg_id'] | ||||
MinRK
|
r3612 | self.blacklist.pop(msg_id, None) | ||
MinRK
|
r3548 | self.pending[engine].pop(msg_id) | ||
MinRK
|
r3607 | if success: | ||
self.completed[engine].add(msg_id) | ||||
self.all_completed.add(msg_id) | ||||
else: | ||||
self.failed[engine].add(msg_id) | ||||
self.all_failed.add(msg_id) | ||||
MinRK
|
r3565 | self.all_done.add(msg_id) | ||
MinRK
|
r3607 | self.destinations[msg_id] = engine | ||
MinRK
|
r3548 | |||
MinRK
|
r3624 | self.update_graph(msg_id, success) | ||
MinRK
|
r3548 | |||
def handle_unmet_dependency(self, idents, parent): | ||||
MinRK
|
r3625 | """handle an unmet dependency""" | ||
MinRK
|
r3548 | engine = idents[0] | ||
msg_id = parent['msg_id'] | ||||
MinRK
|
r3625 | |||
MinRK
|
r3548 | if msg_id not in self.blacklist: | ||
self.blacklist[msg_id] = set() | ||||
self.blacklist[msg_id].add(engine) | ||||
MinRK
|
r3625 | |||
args = self.pending[engine].pop(msg_id) | ||||
raw,targets,after,follow,timeout = args | ||||
if self.blacklist[msg_id] == targets: | ||||
self.depending[msg_id] = args | ||||
MinRK
|
r3781 | self.fail_unreachable(msg_id) | ||
MinRK
|
r3625 | elif not self.maybe_run(msg_id, *args): | ||
MinRK
|
r3873 | # resubmit failed | ||
if msg_id not in self.all_failed: | ||||
# put it back in our dependency tree | ||||
self.save_unmet(msg_id, *args) | ||||
MinRK
|
r3625 | |||
MinRK
|
r3781 | if self.hwm: | ||
MinRK
|
r3873 | try: | ||
idx = self.targets.index(engine) | ||||
except ValueError: | ||||
pass # skip load-update for dead engines | ||||
else: | ||||
if self.loads[idx] == self.hwm-1: | ||||
self.update_graph(None) | ||||
MinRK
|
r3781 | |||
MinRK
|
r3607 | |||
MinRK
|
r3781 | def update_graph(self, dep_id=None, success=True): | ||
MinRK
|
r3548 | """dep_id just finished. Update our dependency | ||
MinRK
|
r3781 | graph and submit any jobs that just became runable. | ||
MinRK
|
r3873 | Called with dep_id=None to update entire graph for hwm, but without finishing | ||
MinRK
|
r3781 | a task. | ||
""" | ||||
MinRK
|
r3607 | # print ("\n\n***********") | ||
# pprint (dep_id) | ||||
MinRK
|
r3624 | # pprint (self.graph) | ||
MinRK
|
r3607 | # pprint (self.depending) | ||
# pprint (self.all_completed) | ||||
# pprint (self.all_failed) | ||||
# print ("\n\n***********\n\n") | ||||
MinRK
|
r3781 | # update any jobs that depended on the dependency | ||
jobs = self.graph.pop(dep_id, []) | ||||
MinRK
|
r3873 | |||
# recheck *all* jobs if | ||||
# a) we have HWM and an engine just become no longer full | ||||
# or b) dep_id was given as None | ||||
if dep_id is None or self.hwm and any( [ load==self.hwm-1 for load in self.loads ]): | ||||
MinRK
|
r3781 | jobs = self.depending.keys() | ||
MinRK
|
r3607 | |||
for msg_id in jobs: | ||||
MinRK
|
r3625 | raw_msg, targets, after, follow, timeout = self.depending[msg_id] | ||
MinRK
|
r3607 | |||
MinRK
|
r4007 | if after.unreachable(self.all_completed, self.all_failed)\ | ||
or follow.unreachable(self.all_completed, self.all_failed): | ||||
MinRK
|
r3607 | self.fail_unreachable(msg_id) | ||
elif after.check(self.all_completed, self.all_failed): # time deps met, maybe run | ||||
MinRK
|
r3625 | if self.maybe_run(msg_id, raw_msg, targets, MET, follow, timeout): | ||
MinRK
|
r3607 | |||
self.depending.pop(msg_id) | ||||
for mid in follow.union(after): | ||||
MinRK
|
r3624 | if mid in self.graph: | ||
self.graph[mid].remove(msg_id) | ||||
MinRK
|
r3548 | |||
#---------------------------------------------------------------------- | ||||
# methods to be overridden by subclasses | ||||
#---------------------------------------------------------------------- | ||||
def add_job(self, idx): | ||||
"""Called after self.targets[idx] just got the job with header. | ||||
Override with subclasses. The default ordering is simple LRU. | ||||
The default loads are the number of outstanding jobs.""" | ||||
self.loads[idx] += 1 | ||||
for lis in (self.targets, self.loads): | ||||
lis.append(lis.pop(idx)) | ||||
def finish_job(self, idx): | ||||
"""Called after self.targets[idx] just finished a job. | ||||
Override with subclasses.""" | ||||
self.loads[idx] -= 1 | ||||
MinRK
|
r4007 | def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None, | ||
logname='root', log_url=None, loglevel=logging.DEBUG, | ||||
MinRK
|
r4092 | identity=b'task', in_thread=False): | ||
ZMQStream = zmqstream.ZMQStream | ||||
MinRK
|
r3548 | |||
MinRK
|
r3773 | if config: | ||
# unwrap dict back into Config | ||||
config = Config(config) | ||||
MinRK
|
r4092 | if in_thread: | ||
# use instance() to get the same Context/Loop as our parent | ||||
ctx = zmq.Context.instance() | ||||
loop = ioloop.IOLoop.instance() | ||||
else: | ||||
# in a process, don't use instance() | ||||
# for safety with multiprocessing | ||||
ctx = zmq.Context() | ||||
loop = ioloop.IOLoop() | ||||
MinRK
|
r3548 | ins = ZMQStream(ctx.socket(zmq.XREP),loop) | ||
MinRK
|
r3658 | ins.setsockopt(zmq.IDENTITY, identity) | ||
MinRK
|
r3548 | ins.bind(in_addr) | ||
MinRK
|
r3658 | |||
MinRK
|
r3548 | outs = ZMQStream(ctx.socket(zmq.XREP),loop) | ||
MinRK
|
r3658 | outs.setsockopt(zmq.IDENTITY, identity) | ||
MinRK
|
r3548 | outs.bind(out_addr) | ||
MinRK
|
r4092 | mons = zmqstream.ZMQStream(ctx.socket(zmq.PUB),loop) | ||
MinRK
|
r3548 | mons.connect(mon_addr) | ||
MinRK
|
r4092 | nots = zmqstream.ZMQStream(ctx.socket(zmq.SUB),loop) | ||
Thomas Kluyver
|
r4110 | nots.setsockopt(zmq.SUBSCRIBE, b'') | ||
MinRK
|
r3548 | nots.connect(not_addr) | ||
MinRK
|
r4092 | # setup logging. | ||
if in_thread: | ||||
log = Application.instance().log | ||||
MinRK
|
r3603 | else: | ||
MinRK
|
r4092 | if log_url: | ||
log = connect_logger(logname, ctx, log_url, root="scheduler", loglevel=loglevel) | ||||
else: | ||||
log = local_logger(logname, loglevel) | ||||
MinRK
|
r3603 | |||
scheduler = TaskScheduler(client_stream=ins, engine_stream=outs, | ||||
MinRK
|
r3611 | mon_stream=mons, notifier_stream=nots, | ||
MinRK
|
r4007 | loop=loop, log=log, | ||
MinRK
|
r3622 | config=config) | ||
MinRK
|
r3611 | scheduler.start() | ||
MinRK
|
r4092 | if not in_thread: | ||
try: | ||||
loop.start() | ||||
except KeyboardInterrupt: | ||||
print ("interrupted, exiting...", file=sys.__stderr__) | ||||
MinRK
|
r3548 | |||