scheduler.py
860 lines
| 30.6 KiB
| text/x-python
|
PythonLexer
MinRK
|
r3556 | """The Python scheduler for rich scheduling. | ||
The Pure ZMQ scheduler does not allow routing schemes other than LRU, | ||||
nor does it check msg_id DAG dependencies. For those, a slightly slower | ||||
Python Scheduler exists. | ||||
MinRK
|
r4018 | |||
Authors: | ||||
* Min RK | ||||
MinRK
|
r3556 | """ | ||
MinRK
|
r3660 | #----------------------------------------------------------------------------- | ||
# Copyright (C) 2010-2011 The IPython Development Team | ||||
# | ||||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r3556 | |||
MinRK
|
r3548 | #---------------------------------------------------------------------- | ||
# Imports | ||||
#---------------------------------------------------------------------- | ||||
MinRK
|
r3603 | import logging | ||
MinRK
|
r3631 | import sys | ||
MinRK
|
r6178 | import time | ||
MinRK
|
r3631 | |||
MinRK
|
r10079 | from collections import deque | ||
from datetime import datetime | ||||
MinRK
|
r3611 | from random import randint, random | ||
MinRK
|
r3603 | from types import FunctionType | ||
MinRK
|
r3631 | |||
MinRK
|
r3548 | try: | ||
import numpy | ||||
except ImportError: | ||||
numpy = None | ||||
import zmq | ||||
from zmq.eventloop import ioloop, zmqstream | ||||
# local imports | ||||
MinRK
|
r3603 | from IPython.external.decorator import decorator | ||
MinRK
|
r4092 | from IPython.config.application import Application | ||
MinRK
|
r3773 | from IPython.config.loader import Config | ||
MinRK
|
r5344 | from IPython.utils.traitlets import Instance, Dict, List, Set, Integer, Enum, CBytes | ||
MinRK
|
r6813 | from IPython.utils.py3compat import cast_bytes | ||
MinRK
|
r3603 | |||
MinRK
|
r6324 | from IPython.parallel import error, util | ||
MinRK
|
r3673 | from IPython.parallel.factory import SessionFactory | ||
MinRK
|
r6813 | from IPython.parallel.util import connect_logger, local_logger | ||
MinRK
|
r3548 | |||
MinRK
|
r3673 | from .dependency import Dependency | ||
MinRK
|
r3603 | |||
MinRK
|
@decorator
def logged(f,self,*args,**kwargs):
    """Decorator that logs every call to a scheduler method at DEBUG level.

    Uses IPython's external ``decorator`` package, which passes the wrapped
    function `f` as the first positional argument, followed by the original
    call arguments.  NOTE: ``f.func_name`` is Python 2 syntax.
    """
    # print ("#--------------------")
    self.log.debug("scheduler::%s(*%s,**%s)", f.func_name, args, kwargs)
    # print ("#--")
    return f(self,*args, **kwargs)
#----------------------------------------------------------------------
# Chooser functions
#----------------------------------------------------------------------

def plainrandom(loads):
    """Choose an engine index uniformly at random.

    `loads` is consulted only for its length (the number of engines);
    the load values themselves are ignored.
    """
    last = len(loads) - 1
    return randint(0, last)
def lru(loads):
    """Pick the least-recently-used engine: always the front of the line.

    The content of `loads` is ignored entirely; the list is assumed to be
    kept in LRU order with the oldest entry first, so index 0 is the pick.
    """
    return 0
def twobin(loads):
    """Power-of-two-choices pick: draw two random indices, keep the smaller.

    The values in `loads` are ignored; LRU (oldest-first) ordering is
    assumed, so the lower of the two indices is the least recently used
    of the pair.  The two draws are independent and may coincide.
    """
    hi = len(loads) - 1
    first = randint(0, hi)
    second = randint(0, hi)
    return first if first < second else second
def weighted(loads):
    """Sample two engine indices with probability inverse to load; return the
    less-loaded of the two.

    Each engine is weighted 1/(load + 1e-6), so an idle engine (load 0) is
    roughly a million times more likely to be drawn than one with load 1.
    Two independent draws are made on the cumulative weight distribution
    and the draw with the larger weight (smaller load) wins.
    """
    weights = 1./(1e-6+numpy.array(loads))
    sums = weights.cumsum()
    total = sums[-1]
    # two independent draws on the CDF; searchsorted(..., 'left') finds the
    # first index whose cumulative weight reaches the sample point
    first = int(numpy.searchsorted(sums, random()*total, side='left'))
    second = int(numpy.searchsorted(sums, random()*total, side='left'))
    return second if weights[second] > weights[first] else first
def leastload(loads):
    """Return the index of the minimum load.

    Ties go to the earliest occurrence; under LRU (oldest-first) ordering
    this means the least recently used of the equally-loaded engines.
    """
    return min(range(len(loads)), key=loads.__getitem__)
#---------------------------------------------------------------------
# Classes
#---------------------------------------------------------------------

# store empty default dependency:
# MET is the shared "already satisfied" Dependency.  Tasks whose time
# dependencies are met get recast to this empty set so later checks can
# skip unnecessary set comparisons.
MET = Dependency([])
MinRK
|
r6178 | |||
class Job(object):
    """Simple container for a job"""

    def __init__(self, msg_id, raw_msg, idents, msg, header, metadata,
            targets, after, follow, timeout):
        # identity and raw wire content of the submission
        self.msg_id = msg_id
        self.raw_msg = raw_msg
        self.idents = idents
        self.msg = msg
        self.header = header
        self.metadata = metadata

        # scheduling constraints
        self.targets = targets
        self.after = after
        self.follow = follow
        self.timeout = timeout

        # scheduler bookkeeping
        self.removed = False     # lazy-delete flag for the sorted queue
        self.timestamp = time.time()
        self.timeout_id = 0      # bumped on each (re)schedule to invalidate stale timeouts
        self.blacklist = set()   # engines this job must not be sent to again

    def __lt__(self, other):
        # jobs order by submission time (oldest first)
        return self.timestamp < other.timestamp

    def __cmp__(self, other):
        # Python 2 ordering hook; same timestamp-based ordering as __lt__
        return cmp(self.timestamp, other.timestamp)

    @property
    def dependents(self):
        """All msg_ids this job depends on (union of follow and after)."""
        return self.follow.union(self.after)
MinRK
|
r12785 | |||
MinRK
|
class TaskScheduler(SessionFactory):
    """Python TaskScheduler object.

    This is the simplest object that supports msg_id based
    DAG dependencies. *Only* task msg_ids are checked, not
    msg_ids of jobs submitted via the MUX queue.

    """

    hwm = Integer(1, config=True,
        help="""specify the High Water Mark (HWM) for the downstream
        socket in the Task scheduler. This is the maximum number
        of allowed outstanding tasks on each engine.

        The default (1) means that only one task can be outstanding on each
        engine. Setting TaskScheduler.hwm=0 means there is no limit, and the
        engines continue to be assigned tasks while they are working,
        effectively hiding network latency behind computation, but can result
        in an imbalance of work when submitting many heterogenous tasks all at
        once. Any positive value greater than one is a compromise between the
        two.
        """
    )
    scheme_name = Enum(('leastload', 'pure', 'lru', 'plainrandom', 'weighted', 'twobin'),
        'leastload', config=True, allow_none=False,
        help="""select the task scheduler scheme [default: Python LRU]
        Options are: 'pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'"""
    )
    def _scheme_name_changed(self, old, new):
        # resolve the scheme name to the module-level chooser function
        self.log.debug("Using scheme %r"%new)
        self.scheme = globals()[new]

    # input arguments:
    scheme = Instance(FunctionType) # function for determining the destination
    def _scheme_default(self):
        return leastload

    client_stream = Instance(zmqstream.ZMQStream) # client-facing stream
    engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
    notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
    mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream
    query_stream = Instance(zmqstream.ZMQStream) # hub-facing DEALER stream

    # internals:
    queue = Instance(deque) # sorted list of Jobs
    def _queue_default(self):
        return deque()
    queue_map = Dict() # dict by msg_id of Jobs (for O(1) access to the Queue)
    graph = Dict() # dict by msg_id of [ msg_ids that depend on key ]
    retries = Dict() # dict by msg_id of retries remaining (non-neg ints)
    # waiting = List() # list of msg_ids ready to run, but haven't due to HWM
    pending = Dict() # dict by engine_uuid of submitted tasks
    completed = Dict() # dict by engine_uuid of completed tasks
    failed = Dict() # dict by engine_uuid of failed tasks
    destinations = Dict() # dict by msg_id of engine_uuids where jobs ran (reverse of completed+failed)
    clients = Dict() # dict by msg_id for who submitted the task
    targets = List() # list of target IDENTs
    loads = List() # list of engine loads
    # full = Set() # set of IDENTs that have HWM outstanding tasks
    all_completed = Set() # set of all completed tasks
    all_failed = Set() # set of all failed tasks
    all_done = Set() # set of all finished tasks=union(completed,failed)
    all_ids = Set() # set of all submitted task IDs

    ident = CBytes() # ZMQ identity. This should just be self.session.session
                     # but ensure Bytes
    def _ident_default(self):
        return self.session.bsession
MinRK
|
    def start(self):
        """Begin scheduling: attach stream callbacks and announce ourselves to the Hub."""
        # ask the Hub which engines are already registered
        self.query_stream.on_recv(self.dispatch_query_reply)
        self.session.send(self.query_stream, "connection_request", {})

        # results come back from engines; submissions arrive from clients
        self.engine_stream.on_recv(self.dispatch_result, copy=False)
        self.client_stream.on_recv(self.dispatch_submission, copy=False)

        # engine (un)registration events from the Hub's notifier
        self._notification_handlers = dict(
            registration_notification = self._register_engine,
            unregistration_notification = self._unregister_engine
        )
        self.notifier_stream.on_recv(self.dispatch_notification)
        self.log.info("Scheduler started [%s]" % self.scheme_name)
Bernardo B. Marques
|
r4872 | |||
MinRK
|
    def resume_receiving(self):
        """Resume accepting jobs."""
        # re-attach the submission handler detached by stop_receiving
        self.client_stream.on_recv(self.dispatch_submission, copy=False)
Bernardo B. Marques
|
r4872 | |||
MinRK
|
    def stop_receiving(self):
        """Stop accepting jobs while there are no engines.
        Leave them in the ZMQ queue."""
        # detach the submission callback; messages buffer inside ZMQ until resumed
        self.client_stream.on_recv(None)
Bernardo B. Marques
|
r4872 | |||
MinRK
|
    #-----------------------------------------------------------------------
    # [Un]Registration Handling
    #-----------------------------------------------------------------------


    def dispatch_query_reply(self, msg):
        """handle reply to our initial connection request"""
        try:
            idents,msg = self.session.feed_identities(msg)
        except ValueError:
            self.log.warn("task::Invalid Message: %r",msg)
            return
        try:
            msg = self.session.unserialize(msg)
        except ValueError:
            self.log.warn("task::Unauthorized message from: %r"%idents)
            return

        content = msg['content']
        # register every engine the Hub already knows about
        for uuid in content.get('engines', {}).values():
            self._register_engine(cast_bytes(uuid))
Bernardo B. Marques
|
r4872 | |||
MinRK
|
    @util.log_errors
    def dispatch_notification(self, msg):
        """dispatch register/unregister events."""
        # strip routing identities, then authenticate/deserialize the message
        try:
            idents,msg = self.session.feed_identities(msg)
        except ValueError:
            self.log.warn("task::Invalid Message: %r",msg)
            return
        try:
            msg = self.session.unserialize(msg)
        except ValueError:
            self.log.warn("task::Unauthorized message from: %r"%idents)
            return

        msg_type = msg['header']['msg_type']

        # route to _register_engine / _unregister_engine by message type
        handler = self._notification_handlers.get(msg_type, None)
        if handler is None:
            self.log.error("Unhandled message type: %r"%msg_type)
        else:
            try:
                # the (un)registered engine's uuid, coerced to bytes
                handler(cast_bytes(msg['content']['uuid']))
            except Exception:
                self.log.error("task::Invalid notification msg: %r", msg, exc_info=True)
Bernardo B. Marques
|
r4872 | |||
MinRK
|
    def _register_engine(self, uid):
        """New engine with ident `uid` became available."""
        # head of the line: a fresh engine is the LRU with zero load
        self.targets.insert(0,uid)
        self.loads.insert(0,0)

        # initialize per-engine completed/failed/pending bookkeeping
        self.completed[uid] = set()
        self.failed[uid] = set()
        self.pending[uid] = {}

        # rescan the graph: queued jobs may now be runnable on this engine
        self.update_graph(None)
MinRK
|
r3548 | |||
    def _unregister_engine(self, uid):
        """Existing engine with ident `uid` became unavailable."""
        if len(self.targets) == 1:
            # this was our only engine
            # NOTE(review): deliberate no-op branch; submissions keep queuing
            pass

        # handle any potentially finished tasks:
        self.engine_stream.flush()

        # don't pop destinations, because they might be used later
        # map(self.destinations.pop, self.completed.pop(uid))
        # map(self.destinations.pop, self.failed.pop(uid))

        # prevent this engine from receiving work
        idx = self.targets.index(uid)
        self.targets.pop(idx)
        self.loads.pop(idx)

        # wait 5 seconds before cleaning up pending jobs, since the results might
        # still be incoming
        if self.pending[uid]:
            dc = ioloop.DelayedCallback(lambda : self.handle_stranded_tasks(uid), 5000, self.loop)
            dc.start()
        else:
            # nothing outstanding: scrub the per-engine sets immediately
            self.completed.pop(uid)
            self.failed.pop(uid)
Bernardo B. Marques
|
r4872 | |||
MinRK
|
    def handle_stranded_tasks(self, engine):
        """Deal with jobs resident in an engine that died."""
        lost = self.pending[engine]
        for msg_id in lost.keys():
            if msg_id not in self.pending[engine]:
                # prevent double-handling of messages
                # (dispatch_result below can mutate pending while we iterate)
                continue
            raw_msg = lost[msg_id].raw_msg
            idents,msg = self.session.feed_identities(raw_msg, copy=False)
            parent = self.session.unpack(msg[1].bytes)
            idents = [engine, idents[0]]

            # build fake error reply so clients see an EngineError instead of silence
            try:
                raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
            except:
                content = error.wrap_exception()
            # build fake metadata
            md = dict(
                status=u'error',
                engine=engine.decode('ascii'),
                date=datetime.now(),
            )
            msg = self.session.msg('apply_reply', content, parent=parent, metadata=md)
            # NOTE: py2 map returns a list of zmq.Message frames here
            raw_reply = map(zmq.Message, self.session.serialize(msg, ident=idents))
            # and dispatch it as if it came back from the engine
            self.dispatch_result(raw_reply)

        # finally scrub completed/failed lists
        self.completed.pop(engine)
        self.failed.pop(engine)
Bernardo B. Marques
|
r4872 | |||
MinRK
|
    #-----------------------------------------------------------------------
    # Job Submission
    #-----------------------------------------------------------------------


    @util.log_errors
    def dispatch_submission(self, raw_msg):
        """Dispatch job submission to appropriate handlers."""
        # ensure targets up to date:
        self.notifier_stream.flush()
        try:
            idents, msg = self.session.feed_identities(raw_msg, copy=False)
            msg = self.session.unserialize(msg, content=False, copy=False)
        except Exception:
            # NOTE(review): "Invaid" typo in this log message (runtime string,
            # left unchanged here)
            self.log.error("task::Invaid task msg: %r"%raw_msg, exc_info=True)
            return

        # send to monitor
        self.mon_stream.send_multipart([b'intask']+raw_msg, copy=False)

        header = msg['header']
        md = msg['metadata']
        msg_id = header['msg_id']
        self.all_ids.add(msg_id)

        # get targets as a set of bytes objects
        # from a list of unicode objects
        targets = md.get('targets', [])
        targets = map(cast_bytes, targets)
        targets = set(targets)

        retries = md.get('retries', 0)
        self.retries[msg_id] = retries

        # time dependencies
        after = md.get('after', None)
        if after:
            after = Dependency(after)
            if after.all:
                # reduce an all-of dependency by removing ids already resolved
                if after.success:
                    after = Dependency(after.difference(self.all_completed),
                                success=after.success,
                                failure=after.failure,
                                all=after.all,
                    )
                if after.failure:
                    after = Dependency(after.difference(self.all_failed),
                                success=after.success,
                                failure=after.failure,
                                all=after.all,
                    )
            if after.check(self.all_completed, self.all_failed):
                # recast as empty set, if `after` already met,
                # to prevent unnecessary set comparisons
                after = MET
        else:
            after = MET

        # location dependencies
        follow = Dependency(md.get('follow', []))

        timeout = md.get('timeout', None)
        if timeout:
            # relative timeout in seconds (deadline is computed in save_unmet)
            timeout = float(timeout)

        job = Job(msg_id=msg_id, raw_msg=raw_msg, idents=idents, msg=msg,
                header=header, targets=targets, after=after, follow=follow,
                timeout=timeout, metadata=md,
        )
        # validate and reduce dependencies:
        for dep in after,follow:
            if not dep: # empty dependency
                continue
            # check valid: a job may not depend on itself or on unknown msg_ids
            if msg_id in dep or dep.difference(self.all_ids):
                # queue_map entry required so fail_unreachable can pop it
                self.queue_map[msg_id] = job
                return self.fail_unreachable(msg_id, error.InvalidDependency)
            # check if unreachable:
            if dep.unreachable(self.all_completed, self.all_failed):
                self.queue_map[msg_id] = job
                return self.fail_unreachable(msg_id)

        if after.check(self.all_completed, self.all_failed):
            # time deps already met, try to run
            if not self.maybe_run(job):
                # can't run yet
                if msg_id not in self.all_failed:
                    # could have failed as unreachable
                    self.save_unmet(job)
        else:
            self.save_unmet(job)
Bernardo B. Marques
|
r4872 | |||
MinRK
|
    def job_timeout(self, job, timeout_id):
        """callback for a job's timeout.

        The job may or may not have been run at this point.
        """
        if job.timeout_id != timeout_id:
            # not the most recent call; a newer timeout superseded this one
            return
        now = time.time()
        # NOTE(review): job.timeout is a *relative* duration (see save_unmet,
        # which schedules at time.time() + job.timeout), but here it is
        # compared against absolute wall-clock time, so this premature-fire
        # warning looks effectively unreachable — verify intended semantics.
        if job.timeout >= (now + 1):
            self.log.warn("task %s timeout fired prematurely: %s > %s",
                job.msg_id, job.timeout, now
            )
        if job.msg_id in self.queue_map:
            # still waiting, but ran out of time
            self.log.info("task %r timed out", job.msg_id)
            self.fail_unreachable(job.msg_id, error.TaskTimeout)
Bernardo B. Marques
|
r4872 | |||
MinRK
|
    def fail_unreachable(self, msg_id, why=error.ImpossibleDependency):
        """a task has become unreachable, send a reply with an ImpossibleDependency
        error."""
        if msg_id not in self.queue_map:
            self.log.error("task %r already failed!", msg_id)
            return
        job = self.queue_map.pop(msg_id)
        # lazy-delete from the queue
        job.removed = True
        # detach this job from the dependency graph
        for mid in job.dependents:
            if mid in self.graph:
                self.graph[mid].remove(msg_id)

        # raise/catch so wrap_exception captures a real traceback
        try:
            raise why()
        except:
            content = error.wrap_exception()
        self.log.debug("task %r failing as unreachable with: %s", msg_id, content['ename'])

        self.all_done.add(msg_id)
        self.all_failed.add(msg_id)

        # reply to the client, and mirror the reply to the Hub monitor
        msg = self.session.send(self.client_stream, 'apply_reply', content,
                                parent=job.header, ident=job.idents)
        self.session.send(self.mon_stream, msg, ident=[b'outtask']+job.idents)

        # propagate the failure to jobs depending on this one
        self.update_graph(msg_id, success=False)
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r10074 | def available_engines(self): | ||
"""return a list of available engine indices based on HWM""" | ||||
if not self.hwm: | ||||
return range(len(self.targets)) | ||||
available = [] | ||||
for idx in range(len(self.targets)): | ||||
if self.loads[idx] < self.hwm: | ||||
available.append(idx) | ||||
return available | ||||
MinRK
|
    def maybe_run(self, job):
        """check location dependencies, and run if they are met.

        Returns True if the job was submitted to an engine, False otherwise.
        May also fail the job outright (via fail_unreachable) when its
        location constraints can never be satisfied.
        """
        msg_id = job.msg_id
        self.log.debug("Attempting to assign task %s", msg_id)
        available = self.available_engines()
        if not available:
            # no engines, definitely can't run
            return False

        if job.follow or job.targets or job.blacklist or self.hwm:
            # we need a can_run filter
            def can_run(idx):
                # check hwm
                if self.hwm and self.loads[idx] == self.hwm:
                    return False
                target = self.targets[idx]
                # check blacklist
                if target in job.blacklist:
                    return False
                # check targets
                if job.targets and target not in job.targets:
                    return False
                # check follow
                return job.follow.check(self.completed[target], self.failed[target])

            indices = filter(can_run, available)

            if not indices:
                # couldn't run
                if job.follow.all:
                    # check follow for impossibility: an all-of follow spread
                    # over more than one engine can never be satisfied
                    dests = set()
                    relevant = set()
                    if job.follow.success:
                        relevant = self.all_completed
                    if job.follow.failure:
                        relevant = relevant.union(self.all_failed)
                    for m in job.follow.intersection(relevant):
                        dests.add(self.destinations[m])
                    if len(dests) > 1:
                        # queue_map entry required so fail_unreachable can pop it
                        self.queue_map[msg_id] = job
                        self.fail_unreachable(msg_id)
                        return False
                if job.targets:
                    # check blacklist+targets for impossibility
                    job.targets.difference_update(job.blacklist)
                    if not job.targets or not job.targets.intersection(self.targets):
                        self.queue_map[msg_id] = job
                        self.fail_unreachable(msg_id)
                        return False
                return False
        else:
            # no constraints at all: let submit_task pick among all engines
            indices = None

        self.submit_task(job, indices)
        return True
Bernardo B. Marques
|
r4872 | |||
MinRK
|
    def save_unmet(self, job):
        """Save a message for later submission when its dependencies are met."""
        msg_id = job.msg_id
        self.log.debug("Adding task %s to the queue", msg_id)
        self.queue_map[msg_id] = job
        self.queue.append(job)
        # track the ids in follow or after, but not those already finished
        for dep_id in job.after.union(job.follow).difference(self.all_done):
            if dep_id not in self.graph:
                self.graph[dep_id] = set()
            self.graph[dep_id].add(msg_id)

        # schedule timeout callback; bumping timeout_id invalidates any
        # previously scheduled callback for this job
        if job.timeout:
            timeout_id = job.timeout_id = job.timeout_id + 1
            self.loop.add_timeout(time.time() + job.timeout,
                lambda : self.job_timeout(job, timeout_id)
            )
Bernardo B. Marques
|
r4872 | |||
MinRK
|
    def submit_task(self, job, indices=None):
        """Submit a task to any of a subset of our targets.

        `indices` restricts the choice to those engine indices; when None,
        the scheme chooses among all engines.
        """
        if indices:
            loads = [self.loads[i] for i in indices]
        else:
            loads = self.loads
        # the scheme returns an index into `loads`; map it back to an
        # engine index when a restricted subset was used
        idx = self.scheme(loads)
        if indices:
            idx = indices[idx]
        target = self.targets[idx]
        # print (target, map(str, msg[:3]))
        # send job to the engine
        self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
        self.engine_stream.send_multipart(job.raw_msg, copy=False)
        # update load
        self.add_job(idx)
        self.pending[target][job.msg_id] = job
        # notify Hub
        content = dict(msg_id=job.msg_id, engine_id=target.decode('ascii'))
        self.session.send(self.mon_stream, 'task_destination', content=content,
                        ident=[b'tracktask',self.ident])
Bernardo B. Marques
|
r4872 | |||
MinRK
|
    #-----------------------------------------------------------------------
    # Result Handling
    #-----------------------------------------------------------------------

    @util.log_errors
    def dispatch_result(self, raw_msg):
        """dispatch method for result replies"""
        try:
            idents,msg = self.session.feed_identities(raw_msg, copy=False)
            msg = self.session.unserialize(msg, content=False, copy=False)
            engine = idents[0]
            try:
                idx = self.targets.index(engine)
            except ValueError:
                pass # skip load-update for dead engines
            else:
                # one slot freed on that engine
                self.finish_job(idx)
        except Exception:
            self.log.error("task::Invalid result: %r", raw_msg, exc_info=True)
            return

        md = msg['metadata']
        parent = msg['parent_header']
        if md.get('dependencies_met', True):
            success = (md['status'] == 'ok')
            msg_id = parent['msg_id']
            retries = self.retries[msg_id]
            if not success and retries > 0:
                # failed, but retries remain: resubmit instead of reporting
                self.retries[msg_id] = retries - 1
                self.handle_unmet_dependency(idents, parent)
            else:
                del self.retries[msg_id]
                # relay to client and update graph
                self.handle_result(idents, parent, raw_msg, success)
                # send to Hub monitor
                self.mon_stream.send_multipart([b'outtask']+raw_msg, copy=False)
        else:
            # engine refused the task (dependencies unmet there); reroute it
            self.handle_unmet_dependency(idents, parent)
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3607 | def handle_result(self, idents, parent, raw_msg, success=True): | ||
MinRK
|
r3625 | """handle a real task result, either success or failure""" | ||
MinRK
|
r3548 | # first, relay result to client | ||
engine = idents[0] | ||||
client = idents[1] | ||||
MinRK
|
r7538 | # swap_ids for ROUTER-ROUTER mirror | ||
MinRK
|
r3548 | raw_msg[:2] = [client,engine] | ||
MinRK
|
r3563 | # print (map(str, raw_msg[:4])) | ||
MinRK
|
r3548 | self.client_stream.send_multipart(raw_msg, copy=False) | ||
# now, update our data structures | ||||
msg_id = parent['msg_id'] | ||||
self.pending[engine].pop(msg_id) | ||||
MinRK
|
r3607 | if success: | ||
self.completed[engine].add(msg_id) | ||||
self.all_completed.add(msg_id) | ||||
else: | ||||
self.failed[engine].add(msg_id) | ||||
self.all_failed.add(msg_id) | ||||
MinRK
|
r3565 | self.all_done.add(msg_id) | ||
MinRK
|
r3607 | self.destinations[msg_id] = engine | ||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3624 | self.update_graph(msg_id, success) | ||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3548 | def handle_unmet_dependency(self, idents, parent): | ||
MinRK
|
r3625 | """handle an unmet dependency""" | ||
MinRK
|
r3548 | engine = idents[0] | ||
msg_id = parent['msg_id'] | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r6178 | job = self.pending[engine].pop(msg_id) | ||
job.blacklist.add(engine) | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r6178 | if job.blacklist == job.targets: | ||
MinRK
|
r10076 | self.queue_map[msg_id] = job | ||
MinRK
|
r3781 | self.fail_unreachable(msg_id) | ||
MinRK
|
r6178 | elif not self.maybe_run(job): | ||
MinRK
|
r3873 | # resubmit failed | ||
if msg_id not in self.all_failed: | ||||
# put it back in our dependency tree | ||||
MinRK
|
r6178 | self.save_unmet(job) | ||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3781 | if self.hwm: | ||
MinRK
|
r3873 | try: | ||
idx = self.targets.index(engine) | ||||
except ValueError: | ||||
pass # skip load-update for dead engines | ||||
else: | ||||
if self.loads[idx] == self.hwm-1: | ||||
self.update_graph(None) | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3781 | def update_graph(self, dep_id=None, success=True): | ||
MinRK
|
r3548 | """dep_id just finished. Update our dependency | ||
MinRK
|
r10074 | graph and submit any jobs that just became runnable. | ||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r10074 | Called with dep_id=None to update entire graph for hwm, but without finishing a task. | ||
MinRK
|
r3781 | """ | ||
MinRK
|
r3607 | # print ("\n\n***********") | ||
# pprint (dep_id) | ||||
MinRK
|
r3624 | # pprint (self.graph) | ||
MinRK
|
r10076 | # pprint (self.queue_map) | ||
MinRK
|
r3607 | # pprint (self.all_completed) | ||
# pprint (self.all_failed) | ||||
# print ("\n\n***********\n\n") | ||||
MinRK
|
r3781 | # update any jobs that depended on the dependency | ||
MinRK
|
r10076 | msg_ids = self.graph.pop(dep_id, []) | ||
MinRK
|
r3873 | |||
# recheck *all* jobs if | ||||
# a) we have HWM and an engine just become no longer full | ||||
# or b) dep_id was given as None | ||||
MinRK
|
r6178 | |||
MinRK
|
r3873 | if dep_id is None or self.hwm and any( [ load==self.hwm-1 for load in self.loads ]): | ||
MinRK
|
r10076 | jobs = self.queue | ||
using_queue = True | ||||
else: | ||||
using_queue = False | ||||
MinRK
|
r10079 | jobs = deque(sorted( self.queue_map[msg_id] for msg_id in msg_ids )) | ||
MinRK
|
r6178 | |||
MinRK
|
r10076 | to_restore = [] | ||
while jobs: | ||||
MinRK
|
r10079 | job = jobs.popleft() | ||
MinRK
|
r10076 | if job.removed: | ||
continue | ||||
msg_id = job.msg_id | ||||
put_it_back = True | ||||
MinRK
|
r6178 | if job.after.unreachable(self.all_completed, self.all_failed)\ | ||
or job.follow.unreachable(self.all_completed, self.all_failed): | ||||
MinRK
|
r3607 | self.fail_unreachable(msg_id) | ||
MinRK
|
r10076 | put_it_back = False | ||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r6178 | elif job.after.check(self.all_completed, self.all_failed): # time deps met, maybe run | ||
if self.maybe_run(job): | ||||
MinRK
|
r10076 | put_it_back = False | ||
self.queue_map.pop(msg_id) | ||||
MinRK
|
r6178 | for mid in job.dependents: | ||
MinRK
|
r3624 | if mid in self.graph: | ||
self.graph[mid].remove(msg_id) | ||||
MinRK
|
r10074 | |||
# abort the loop if we just filled up all of our engines. | ||||
# avoids an O(N) operation in situation of full queue, | ||||
# where graph update is triggered as soon as an engine becomes | ||||
# non-full, and all tasks after the first are checked, | ||||
# even though they can't run. | ||||
if not self.available_engines(): | ||||
MinRK
|
r10076 | break | ||
if using_queue and put_it_back: | ||||
# popped a job from the queue but it neither ran nor failed, | ||||
# so we need to put it back when we are done | ||||
MinRK
|
r10079 | # make sure to_restore preserves the same ordering | ||
MinRK
|
r10076 | to_restore.append(job) | ||
# put back any tasks we popped but didn't run | ||||
MinRK
|
r10079 | if using_queue: | ||
self.queue.extendleft(to_restore) | ||||
MinRK
|
r3548 | #---------------------------------------------------------------------- | ||
# methods to be overridden by subclasses | ||||
#---------------------------------------------------------------------- | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3548 | def add_job(self, idx): | ||
"""Called after self.targets[idx] just got the job with header. | ||||
Override with subclasses. The default ordering is simple LRU. | ||||
The default loads are the number of outstanding jobs.""" | ||||
self.loads[idx] += 1 | ||||
for lis in (self.targets, self.loads): | ||||
lis.append(lis.pop(idx)) | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3548 | def finish_job(self, idx): | ||
"""Called after self.targets[idx] just finished a job. | ||||
Override with subclasses.""" | ||||
self.loads[idx] -= 1 | ||||
Bernardo B. Marques
|
r4872 | |||
MinRK
|
r3548 | |||
MinRK
|
def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, reg_addr, config=None,
                        logname='root', log_url=None, loglevel=logging.DEBUG,
                        identity=b'task', in_thread=False):
    """Wire up and start a TaskScheduler on the given zmq addresses.

    Parameters
    ----------
    in_addr : str
        zmq URL the client-facing ROUTER binds to.
    out_addr : str
        zmq URL the engine-facing ROUTER binds to.
    mon_addr : str
        zmq URL of the Hub monitor (PUB connects here).
    not_addr : str
        zmq URL for engine registration notifications (SUB connects here).
    reg_addr : str
        zmq URL of the Hub registration/query socket (DEALER connects here).
    config : dict, optional
        Plain-dict form of a Config; re-wrapped below.
    logname, log_url, loglevel :
        Logging setup, used only when not running in a thread.
    identity : bytes
        Base socket identity; '_in'/'_out' suffixes are appended.
    in_thread : bool
        When True, share the parent's zmq Context/IOLoop and return without
        blocking; when False, create private ones and block in loop.start().
    """
    ZMQStream = zmqstream.ZMQStream

    if config:
        # unwrap dict back into Config
        config = Config(config)

    if in_thread:
        # use instance() to get the same Context/Loop as our parent
        ctx = zmq.Context.instance()
        loop = ioloop.IOLoop.instance()
    else:
        # in a process, don't use instance()
        # for safety with multiprocessing
        ctx = zmq.Context()
        loop = ioloop.IOLoop()
    # client-facing ROUTER; unlimited HWM so no client message is dropped
    ins = ZMQStream(ctx.socket(zmq.ROUTER),loop)
    util.set_hwm(ins, 0)
    ins.setsockopt(zmq.IDENTITY, identity + b'_in')
    ins.bind(in_addr)

    # engine-facing ROUTER
    outs = ZMQStream(ctx.socket(zmq.ROUTER),loop)
    util.set_hwm(outs, 0)
    outs.setsockopt(zmq.IDENTITY, identity + b'_out')
    outs.bind(out_addr)
    # PUB for reporting task events to the Hub's monitor
    mons = zmqstream.ZMQStream(ctx.socket(zmq.PUB),loop)
    util.set_hwm(mons, 0)
    mons.connect(mon_addr)
    # SUB for engine (un)registration notifications; subscribe to everything
    nots = zmqstream.ZMQStream(ctx.socket(zmq.SUB),loop)
    nots.setsockopt(zmq.SUBSCRIBE, b'')
    nots.connect(not_addr)

    # DEALER for querying the Hub's registration state
    querys = ZMQStream(ctx.socket(zmq.DEALER),loop)
    querys.connect(reg_addr)

    # setup logging.
    if in_thread:
        log = Application.instance().log
    else:
        if log_url:
            log = connect_logger(logname, ctx, log_url, root="scheduler", loglevel=loglevel)
        else:
            log = local_logger(logname, loglevel)

    scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
                            mon_stream=mons, notifier_stream=nots,
                            query_stream=querys,
                            loop=loop, log=log,
                            config=config)
    scheduler.start()
    # out-of-thread: block in the loop until interrupted
    if not in_thread:
        try:
            loop.start()
        except KeyboardInterrupt:
            scheduler.log.critical("Interrupted, exiting...")
MinRK
|
r3548 | |||