scheduler.py
581 lines
| 21.6 KiB
| text/x-python
|
PythonLexer
MinRK
|
r3556 | """The Python scheduler for rich scheduling. | ||
The Pure ZMQ scheduler does not allow routing schemes other than LRU, | ||||
nor does it check msg_id DAG dependencies. For those, a slightly slower | ||||
Python Scheduler exists. | ||||
""" | ||||
MinRK
|
r3548 | #---------------------------------------------------------------------- | ||
# Imports | ||||
#---------------------------------------------------------------------- | ||||
MinRK
|
r3553 | from __future__ import print_function | ||
MinRK
|
r3631 | |||
MinRK
|
r3603 | import logging | ||
MinRK
|
r3631 | import sys | ||
from datetime import datetime, timedelta | ||||
MinRK
|
r3611 | from random import randint, random | ||
MinRK
|
r3603 | from types import FunctionType | ||
MinRK
|
r3631 | |||
MinRK
|
r3548 | try: | ||
import numpy | ||||
except ImportError: | ||||
numpy = None | ||||
import zmq | ||||
from zmq.eventloop import ioloop, zmqstream | ||||
# local imports | ||||
MinRK
|
r3603 | from IPython.external.decorator import decorator | ||
MinRK
|
r3604 | from IPython.utils.traitlets import Instance, Dict, List, Set | ||
MinRK
|
r3603 | |||
MinRK
|
r3607 | import error | ||
MinRK
|
r3548 | import streamsession as ss | ||
MinRK
|
r3631 | from dependency import Dependency | ||
MinRK
|
r3603 | from entry_point import connect_logger, local_logger | ||
MinRK
|
r3611 | from factory import SessionFactory | ||
MinRK
|
r3548 | |||
MinRK
|
r3603 | |||
MinRK
|
@decorator
def logged(f, self, *args, **kwargs):
    """Log a debug line naming the wrapped method and its arguments, then call it.

    Used as a method decorator; ``self`` must expose a ``log`` attribute with
    a ``debug`` method. The wrapped callable's return value is passed through
    unchanged.
    """
    # ``__name__`` exists on both Python 2 and 3 (``func_name`` is 2-only).
    # Arguments are handed to the logger unformatted so the potentially large
    # message frames are only rendered when DEBUG output is actually enabled.
    self.log.debug("scheduler::%s(*%s,**%s)", f.__name__, args, kwargs)
    return f(self, *args, **kwargs)
#---------------------------------------------------------------------- | ||||
# Chooser functions | ||||
#---------------------------------------------------------------------- | ||||
def plainrandom(loads):
    """Return a uniformly random index into `loads`; the values are ignored."""
    last = len(loads) - 1
    return randint(0, last)
def lru(loads):
    """Choose the head of the queue.

    `loads` is never inspected: the caller is expected to keep it in
    least-recently-used order (oldest first), so index 0 is the LRU entry.
    """
    head = 0
    return head
def twobin(loads):
    """Draw two random indices and return the smaller one.

    The values in `loads` are not used. With LRU ordering (oldest first),
    the smaller index is the less recently used of the two candidates.
    """
    last = len(loads) - 1
    first_pick = randint(0, last)
    second_pick = randint(0, last)
    return first_pick if first_pick <= second_pick else second_pick
def weighted(loads):
    """Pick two at random using inverse load as weight.

    Return the index of the less loaded of the two picks.

    Raises ImportError if numpy is unavailable (the module-level import
    falls back to ``numpy = None``).
    """
    if numpy is None:
        raise ImportError("the 'weighted' scheme requires numpy")
    # weight 0 a million times more than 1:
    weights = 1. / (1e-6 + numpy.array(loads))
    sums = weights.cumsum()
    total = sums[-1]
    last = len(sums) - 1

    def draw():
        # First index whose cumulative weight reaches the random threshold;
        # searchsorted(side='left') is the vectorised form of scanning while
        # ``sums[i] < x``. Clamp so a rounding artefact can never run past
        # the end of the array.
        return min(int(sums.searchsorted(random() * total)), last)

    idx = draw()
    idy = draw()
    if weights[idy] > weights[idx]:
        return idy
    return idx
def leastload(loads):
    """Return the index of the smallest load.

    Ties resolve to the first occurrence, so when `loads` is kept in LRU
    order the least recently used of the least-loaded entries wins.
    """
    lightest = min(loads)
    return loads.index(lightest)
#--------------------------------------------------------------------- | ||||
# Classes | ||||
#--------------------------------------------------------------------- | ||||
MinRK
|
# Shared empty Dependency, used in place of a task's time dependency once
# that dependency is already satisfied (so it is not re-evaluated later).
MET = Dependency([])
MinRK
|
class TaskScheduler(SessionFactory):
    """Python TaskScheduler object.

    This is the simplest object that supports msg_id based
    DAG dependencies.  *Only* task msg_ids are checked, not
    msg_ids of jobs submitted via the MUX queue.

    Submissions arrive on `client_stream`; each task is either forwarded to
    an engine picked by `scheme` over `engine_stream`, parked in `depending`
    until its dependencies are met, or failed with an error reply. Replies
    from engines are relayed back to the client, and all traffic is mirrored
    to `mon_stream`.
    """

    # input arguments:
    scheme = Instance(FunctionType, default=leastload) # function for determining the destination
    client_stream = Instance(zmqstream.ZMQStream) # client-facing stream
    engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
    notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
    mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream

    # internals:
    graph = Dict() # dict by msg_id of the set of msg_ids that depend on key
    depending = Dict() # dict by msg_id of [raw_msg, targets, after, follow, timeout]
    pending = Dict() # dict by engine_uuid of {msg_id: (raw_msg, targets, MET, follow, timeout)}
    completed = Dict() # dict by engine_uuid of completed tasks
    failed = Dict() # dict by engine_uuid of failed tasks
    destinations = Dict() # dict by msg_id of engine_uuids where jobs ran (reverse of completed+failed)
    clients = Dict() # dict by msg_id for who submitted the task
    targets = List() # list of target IDENTs
    loads = List() # list of engine loads
    all_completed = Set() # set of all completed tasks
    all_failed = Set() # set of all failed tasks
    all_done = Set() # set of all finished tasks=union(completed,failed)
    all_ids = Set() # set of all submitted task IDs
    blacklist = Dict() # dict by msg_id of locations where a job has encountered UnmetDependency
    auditor = Instance('zmq.eventloop.ioloop.PeriodicCallback')

    def start(self):
        """Install the stream callbacks and start the periodic timeout audit.

        Client submissions are not accepted here; that only begins once the
        first engine registers (see `_register_engine`).
        """
        self.engine_stream.on_recv(self.dispatch_result, copy=False)
        self._notification_handlers = dict(
            registration_notification = self._register_engine,
            unregistration_notification = self._unregister_engine
        )
        self.notifier_stream.on_recv(self.dispatch_notification)
        # callback period is 2000 ms, i.e. timeouts are audited at 0.5 Hz
        self.auditor = ioloop.PeriodicCallback(self.audit_timeouts, 2e3, self.loop)
        self.auditor.start()
        self.log.info("Scheduler started...%r"%self)

    def resume_receiving(self):
        """Resume accepting jobs."""
        self.client_stream.on_recv(self.dispatch_submission, copy=False)

    def stop_receiving(self):
        """Stop accepting jobs while there are no engines.

        Leave them in the ZMQ queue."""
        self.client_stream.on_recv(None)

    #-----------------------------------------------------------------------
    # [Un]Registration Handling
    #-----------------------------------------------------------------------

    def dispatch_notification(self, msg):
        """dispatch register/unregister events.

        Raises Exception for a message type with no registered handler. A
        KeyError raised while reading the message or inside the handler is
        logged as an invalid notification rather than propagated.
        """
        idents,msg = self.session.feed_identities(msg)
        msg = self.session.unpack_message(msg)
        msg_type = msg['msg_type']
        handler = self._notification_handlers.get(msg_type, None)
        if handler is None:
            raise Exception("Unhandled message type: %s"%msg_type)
        else:
            try:
                handler(str(msg['content']['queue']))
            except KeyError:
                self.log.error("task::Invalid notification msg: %s"%msg)

    @logged
    def _register_engine(self, uid):
        """New engine with ident `uid` became available."""
        # head of the line:
        self.targets.insert(0,uid)
        self.loads.insert(0,0)
        # initialize sets
        self.completed[uid] = set()
        self.failed[uid] = set()
        self.pending[uid] = {}
        if len(self.targets) == 1:
            # first engine: there is now somewhere to send tasks
            self.resume_receiving()

    @logged
    def _unregister_engine(self, uid):
        """Existing engine with ident `uid` became unavailable.

        Raises KeyError if `uid` is not currently registered; this is checked
        before any state is touched, and dispatch_notification logs it.
        """
        if uid not in self.targets:
            raise KeyError(uid)

        if len(self.targets) == 1:
            # this was our only engine
            self.stop_receiving()

        # handle any potentially finished tasks:
        self.engine_stream.flush()

        # don't pop destinations, because it might be used later
        idx = self.targets.index(uid)
        self.targets.pop(idx)
        self.loads.pop(idx)

        if self.pending[uid]:
            # wait 5 seconds before cleaning up pending jobs, since the results
            # might still be incoming; completed/failed for this engine are kept
            # until then so that a late result can still be recorded.
            dc = ioloop.DelayedCallback(lambda : self.handle_stranded_tasks(uid), 5000, self.loop)
            dc.start()
        else:
            # nothing outstanding: drop the engine's bookkeeping right away
            self.pending.pop(uid)
            self.completed.pop(uid)
            self.failed.pop(uid)

    @logged
    def handle_stranded_tasks(self, engine):
        """Deal with jobs resident in an engine that died.

        Every task still pending on `engine` is marked failed, an EngineError
        reply is sent to the client and mirrored to the monitor, and tasks
        depending on it are re-evaluated.
        """
        lost = self.pending.pop(engine)

        for msg_id, job in lost.items():
            raw_msg = job[0]
            self.all_failed.add(msg_id)
            self.all_done.add(msg_id)
            self.blacklist.pop(msg_id, None)
            idents,msg = self.session.feed_identities(raw_msg, copy=False)
            msg = self.session.unpack_message(msg, copy=False, content=False)
            parent = msg['header']
            idents = [idents[0],engine]+idents[1:]
            # raise-and-catch so that wrap_exception() has an active
            # exception to wrap
            try:
                raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
            except:
                content = ss.wrap_exception()
            msg = self.session.send(self.client_stream, 'apply_reply', content,
                                    parent=parent, ident=idents)
            self.session.send(self.mon_stream, msg, ident=['outtask']+idents)
            self.update_graph(msg_id, success=False)

        # finally scrub the per-engine result sets, unless the same ident has
        # registered again in the meantime
        if engine not in self.targets:
            self.completed.pop(engine, None)
            self.failed.pop(engine, None)

    #-----------------------------------------------------------------------
    # Job Submission
    #-----------------------------------------------------------------------

    @logged
    def dispatch_submission(self, raw_msg):
        """Dispatch job submission to appropriate handlers.

        Reads `targets`, `after`, `follow` and `timeout` from the message
        header, rejects invalid or unreachable dependencies, and otherwise
        either runs the task now or saves it until its dependencies are met.
        """
        # ensure targets up to date:
        self.notifier_stream.flush()
        try:
            idents, msg = self.session.feed_identities(raw_msg, copy=False)
            msg = self.session.unpack_message(msg, content=False, copy=False)
        except Exception:
            self.log.error("task::Invalid task: %s"%raw_msg, exc_info=True)
            return

        # send to monitor
        self.mon_stream.send_multipart(['intask']+raw_msg, copy=False)

        header = msg['header']
        msg_id = header['msg_id']
        self.all_ids.add(msg_id)

        # targets
        targets = set(header.get('targets', []))

        # time dependencies
        after = Dependency(header.get('after', []))
        if after.all:
            after.difference_update(self.all_completed)
            if not after.success_only:
                after.difference_update(self.all_failed)
        if after.check(self.all_completed, self.all_failed):
            # recast as empty set, if `after` already met,
            # to prevent unnecessary set comparisons
            after = MET

        # location dependencies
        follow = Dependency(header.get('follow', []))

        # turn timeouts into datetime objects:
        timeout = header.get('timeout', None)
        if timeout:
            timeout = datetime.now() + timedelta(0,timeout,0)

        args = [raw_msg, targets, after, follow, timeout]

        # validate and reduce dependencies:
        for dep in after,follow:
            # check valid: a task may not depend on itself or on unknown ids
            if msg_id in dep or dep.difference(self.all_ids):
                self.depending[msg_id] = args
                return self.fail_unreachable(msg_id, error.InvalidDependency)
            # check if unreachable:
            if dep.unreachable(self.all_failed):
                self.depending[msg_id] = args
                return self.fail_unreachable(msg_id)

        if after.check(self.all_completed, self.all_failed):
            # time deps already met, try to run
            if not self.maybe_run(msg_id, *args):
                # can't run yet -- unless maybe_run found it impossible and
                # already failed it, in which case it must not be re-queued
                if msg_id not in self.all_failed:
                    self.save_unmet(msg_id, *args)
        else:
            self.save_unmet(msg_id, *args)

    # @logged
    def audit_timeouts(self):
        """Audit all waiting tasks for expired timeouts."""
        now = datetime.now()
        # iterate over a snapshot: failing a task mutates self.depending
        for msg_id in list(self.depending):
            # must recheck, in case one failure cascaded to another:
            if msg_id in self.depending:
                raw_msg,targets,after,follow,timeout = self.depending[msg_id]
                if timeout and timeout < now:
                    # fail_unreachable takes no timeout flag; an expired task
                    # is reported with its default error.
                    self.fail_unreachable(msg_id)

    @logged
    def fail_unreachable(self, msg_id, why=error.ImpossibleDependency):
        """a task has become unreachable, send a reply with an error.

        `why` is the exception class to raise and wrap into the reply
        (ImpossibleDependency by default). The task must currently be in
        `self.depending`; otherwise an error is logged and nothing is sent.
        """
        if msg_id not in self.depending:
            self.log.error("msg %r already failed!"%msg_id)
            return
        raw_msg,targets,after,follow,timeout = self.depending.pop(msg_id)
        for mid in follow.union(after):
            if mid in self.graph:
                # discard, not remove: the task may have been put in
                # `depending` without ever being entered in the graph
                self.graph[mid].discard(msg_id)
        # the task will never run, so its blacklist is no longer needed
        self.blacklist.pop(msg_id, None)

        # FIXME: unpacking a message I've already unpacked, but didn't save:
        idents,msg = self.session.feed_identities(raw_msg, copy=False)
        msg = self.session.unpack_message(msg, copy=False, content=False)
        header = msg['header']

        # raise-and-catch so that wrap_exception() has an active exception
        try:
            raise why()
        except:
            content = ss.wrap_exception()

        self.all_done.add(msg_id)
        self.all_failed.add(msg_id)

        msg = self.session.send(self.client_stream, 'apply_reply', content,
                                parent=header, ident=idents)
        self.session.send(self.mon_stream, msg, ident=['outtask']+idents)

        self.update_graph(msg_id, success=False)

    @logged
    def maybe_run(self, msg_id, raw_msg, targets, after, follow, timeout):
        """check location dependencies, and run if they are met.

        Returns True if the task was submitted to an engine, False otherwise.
        When no engine can take the task and the constraints can never be
        satisfied, the task is failed via `fail_unreachable` (and False is
        returned); callers can detect that through `self.all_failed`.
        """
        blacklist = self.blacklist.setdefault(msg_id, set())
        if follow or targets or blacklist:
            # we need a can_run filter
            def can_run(target):
                # check targets
                if targets and target not in targets:
                    return False
                # check blacklist
                if target in blacklist:
                    return False
                # check follow
                return follow.check(self.completed[target], self.failed[target])

            indices = [idx for idx, target in enumerate(self.targets) if can_run(target)]
            if not indices:
                # couldn't run
                job = [raw_msg, targets, after, follow, timeout]
                if follow.all:
                    # check follow for impossibility: all followed tasks
                    # would have to have run on one and the same engine
                    relevant = self.all_completed if follow.success_only else self.all_done
                    # tasks that finished without running anywhere have
                    # no destination recorded
                    dests = set(self.destinations[m]
                                for m in follow.intersection(relevant)
                                if m in self.destinations)
                    if len(dests) > 1:
                        # fail_unreachable only acts on tasks in `depending`;
                        # keep an existing record (it holds the real `after`)
                        self.depending.setdefault(msg_id, job)
                        self.fail_unreachable(msg_id)
                        return False
                if targets:
                    # check blacklist+targets for impossibility
                    targets.difference_update(blacklist)
                    if not targets or not targets.intersection(self.targets):
                        self.depending.setdefault(msg_id, job)
                        self.fail_unreachable(msg_id)
                        return False
                return False
        else:
            indices = None

        self.submit_task(msg_id, raw_msg, targets, follow, timeout, indices)
        return True

    @logged
    def save_unmet(self, msg_id, raw_msg, targets, after, follow, timeout):
        """Save a message for later submission when its dependencies are met."""
        self.depending[msg_id] = [raw_msg,targets,after,follow,timeout]
        # track the ids in follow or after, but not those already finished
        for dep_id in after.union(follow).difference(self.all_done):
            if dep_id not in self.graph:
                self.graph[dep_id] = set()
            self.graph[dep_id].add(msg_id)

    @logged
    def submit_task(self, msg_id, raw_msg, targets, follow, timeout, indices=None):
        """Submit a task to any of a subset of our targets.

        `indices` restricts the choice to those positions in `self.targets`;
        when it is empty or None every registered engine is a candidate.
        """
        if indices:
            loads = [self.loads[i] for i in indices]
        else:
            loads = self.loads
        idx = self.scheme(loads)
        if indices:
            # map the position within the subset back to a position in targets
            idx = indices[idx]
        target = self.targets[idx]
        # send job to the engine
        self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
        self.engine_stream.send_multipart(raw_msg, copy=False)
        # update load (this also moves the engine to the back of the line)
        self.add_job(idx)
        self.pending[target][msg_id] = (raw_msg, targets, MET, follow, timeout)
        # notify Hub
        content = dict(msg_id=msg_id, engine_id=target)
        self.session.send(self.mon_stream, 'task_destination', content=content,
                          ident=['tracktask',self.session.session])

    #-----------------------------------------------------------------------
    # Result Handling
    #-----------------------------------------------------------------------

    @logged
    def dispatch_result(self, raw_msg):
        """dispatch method for result replies"""
        try:
            idents,msg = self.session.feed_identities(raw_msg, copy=False)
            msg = self.session.unpack_message(msg, content=False, copy=False)
        except Exception:
            self.log.error("task::Invalid result: %s"%raw_msg, exc_info=True)
            return

        # The job has left the engine whether it ran or bounced, so release
        # its load slot. Engines that already unregistered are skipped.
        engine = idents[0]
        if engine in self.targets:
            self.finish_job(self.targets.index(engine))

        header = msg['header']
        if header.get('dependencies_met', True):
            success = (header['status'] == 'ok')
            self.handle_result(idents, msg['parent_header'], raw_msg, success)
            # send to Hub monitor
            self.mon_stream.send_multipart(['outtask']+raw_msg, copy=False)
        else:
            self.handle_unmet_dependency(idents, msg['parent_header'])

    @logged
    def handle_result(self, idents, parent, raw_msg, success=True):
        """handle a real task result, either success or failure"""
        # first, relay result to client
        engine = idents[0]
        client = idents[1]
        # swap_ids for XREP-XREP mirror
        raw_msg[:2] = [client,engine]
        self.client_stream.send_multipart(raw_msg, copy=False)
        # now, update our data structures
        msg_id = parent['msg_id']
        self.blacklist.pop(msg_id, None)
        self.pending[engine].pop(msg_id)
        if success:
            self.completed[engine].add(msg_id)
            self.all_completed.add(msg_id)
        else:
            self.failed[engine].add(msg_id)
            self.all_failed.add(msg_id)
        self.all_done.add(msg_id)
        self.destinations[msg_id] = engine

        self.update_graph(msg_id, success)

    @logged
    def handle_unmet_dependency(self, idents, parent):
        """handle an unmet dependency

        The replying engine is blacklisted for this task, and the task is
        resubmitted elsewhere, re-queued, or failed if no engine is left.
        """
        engine = idents[0]
        msg_id = parent['msg_id']

        if msg_id not in self.blacklist:
            self.blacklist[msg_id] = set()
        self.blacklist[msg_id].add(engine)

        args = self.pending[engine].pop(msg_id)
        raw,targets,after,follow,timeout = args

        if self.blacklist[msg_id] == targets:
            # every requested target has refused the task
            self.depending[msg_id] = args
            return self.fail_unreachable(msg_id)

        elif not self.maybe_run(msg_id, *args):
            # resubmit failed, put it back in our dependency tree
            # (unless maybe_run just failed it as impossible)
            if msg_id not in self.all_failed:
                self.save_unmet(msg_id, *args)

    @logged
    def update_graph(self, dep_id, success=True):
        """dep_id just finished. Update our dependency
        graph and submit any jobs that just became runable.

        `success` is accepted for callers' benefit but is not consulted
        here; reachability is derived from all_completed / all_failed.
        """
        if dep_id not in self.graph:
            return
        jobs = self.graph.pop(dep_id)

        for msg_id in jobs:
            if msg_id not in self.depending:
                # already resolved by a failure that cascaded from an
                # earlier job in this same loop
                continue
            raw_msg, targets, after, follow, timeout = self.depending[msg_id]

            if after.unreachable(self.all_failed) or follow.unreachable(self.all_failed):
                self.fail_unreachable(msg_id)

            elif after.check(self.all_completed, self.all_failed): # time deps met, maybe run
                if self.maybe_run(msg_id, raw_msg, targets, MET, follow, timeout):
                    self.depending.pop(msg_id)
                    for mid in follow.union(after):
                        if mid in self.graph:
                            self.graph[mid].discard(msg_id)

    #----------------------------------------------------------------------
    # methods to be overridden by subclasses
    #----------------------------------------------------------------------

    def add_job(self, idx):
        """Called after self.targets[idx] just got the job with header.

        Override with subclasses.  The default ordering is simple LRU.
        The default loads are the number of outstanding jobs."""
        self.loads[idx] += 1
        # move the engine (and its load) to the back of the line
        for lis in (self.targets, self.loads):
            lis.append(lis.pop(idx))

    def finish_job(self, idx):
        """Called after self.targets[idx] just finished a job.

        Override with subclasses."""
        self.loads[idx] -= 1
MinRK
|
def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,logname='ZMQ',
                     log_addr=None, loglevel=logging.DEBUG, scheme='lru'):
    """Create the sockets, build a TaskScheduler and run its event loop.

    Parameters
    ----------
    in_addr, out_addr : addresses the client-facing and engine-facing XREP
        sockets are bound to.
    mon_addr, not_addr : addresses the monitor PUB socket and the notification
        SUB socket connect to.
    config, logname : passed through to TaskScheduler.
    log_addr : if given, logging is set up via connect_logger to this address,
        otherwise via local_logger.
    loglevel : logging level handed to the logger setup.
    scheme : name of a module-level chooser function (e.g. 'lru',
        'leastload'), or such a function itself.

    Raises ValueError if `scheme` does not resolve to a plain function.
    Blocks in the event loop until interrupted.
    """
    from zmq.eventloop import ioloop
    from zmq.eventloop.zmqstream import ZMQStream

    # Resolve the chooser before any socket is opened, so that a bad name
    # fails immediately instead of on the first submitted task.
    scheme_func = scheme if callable(scheme) else globals().get(scheme, None)
    if not isinstance(scheme_func, FunctionType):
        raise ValueError("Invalid scheduling scheme: %r" % (scheme,))

    ctx = zmq.Context()
    loop = ioloop.IOLoop()
    print (in_addr, out_addr, mon_addr, not_addr)
    ins = ZMQStream(ctx.socket(zmq.XREP),loop)
    ins.bind(in_addr)
    outs = ZMQStream(ctx.socket(zmq.XREP),loop)
    outs.bind(out_addr)
    mons = ZMQStream(ctx.socket(zmq.PUB),loop)
    mons.connect(mon_addr)
    nots = ZMQStream(ctx.socket(zmq.SUB),loop)
    # empty prefix subscribes to every notification
    nots.setsockopt(zmq.SUBSCRIBE, b'')
    nots.connect(not_addr)

    # setup logging
    if log_addr:
        connect_logger(logname, ctx, log_addr, root="scheduler", loglevel=loglevel)
    else:
        local_logger(logname, loglevel)

    scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
                              mon_stream=mons, notifier_stream=nots,
                              scheme=scheme_func, loop=loop, logname=logname,
                              config=config)
    scheduler.start()
    try:
        loop.start()
    except KeyboardInterrupt:
        print ("interrupted, exiting...", file=sys.__stderr__)
MinRK
|
r3548 | |||