|
|
# encoding: utf-8
|
|
|
# -*- test-case-name: IPython.kernel.tests.test_task -*-
|
|
|
|
|
|
"""Task farming representation of the ControllerService."""
|
|
|
|
|
|
__docformat__ = "restructuredtext en"
|
|
|
|
|
|
#-------------------------------------------------------------------------------
|
|
|
# Copyright (C) 2008 The IPython Development Team
|
|
|
#
|
|
|
# Distributed under the terms of the BSD License. The full license is in
|
|
|
# the file COPYING, distributed as part of this software.
|
|
|
#-------------------------------------------------------------------------------
|
|
|
|
|
|
#-------------------------------------------------------------------------------
|
|
|
# Imports
|
|
|
#-------------------------------------------------------------------------------
|
|
|
|
|
|
import copy, time
|
|
|
from types import FunctionType as function
|
|
|
|
|
|
import zope.interface as zi, string
|
|
|
from twisted.internet import defer, reactor
|
|
|
from twisted.python import components, log, failure
|
|
|
|
|
|
# from IPython.genutils import time
|
|
|
|
|
|
from IPython.kernel import engineservice as es, error
|
|
|
from IPython.kernel import controllerservice as cs
|
|
|
from IPython.kernel.twistedutil import gatherBoth, DeferredList
|
|
|
|
|
|
from IPython.kernel.pickleutil import can,uncan, CannedFunction
|
|
|
|
|
|
def canTask(task):
    """Return a shallow copy of `task` with its `depend` passed through `can`.

    The task's `recovery_task`, if any, is canned recursively.

    :Parameters:
        task : a `Task` object
            The task to prepare.  It is not modified; a copy is returned.

    :Returns: the copied `Task`.
    """
    t = copy.copy(task)
    t.depend = can(t.depend)
    if t.recovery_task is task:
        # A task may name itself as its own recovery task.  The shallow copy
        # still points at the original, so recursing here would never end.
        # Keep the copy self-referential instead.
        t.recovery_task = t
    elif t.recovery_task:
        t.recovery_task = canTask(t.recovery_task)
    return t
|
|
|
|
|
|
def uncanTask(task):
    """Return a shallow copy of `task` with its `depend` passed through `uncan`.

    A `recovery_task` that is set and is not `task` itself is processed
    recursively; a self-referential recovery task is left untouched.

    :Parameters:
        task : a `Task` object
            The task to restore.  It is not modified; a copy is returned.

    :Returns: the copied `Task`.
    """
    restored = copy.copy(task)
    restored.depend = uncan(restored.depend)
    recovery = restored.recovery_task
    # Skip the recursion when the task is its own recovery task.
    if recovery and recovery is not task:
        restored.recovery_task = uncanTask(recovery)
    return restored
|
|
|
|
|
|
# time.strftime format used for the `submitted` and `completed` timestamps
# that WorkerFromQueuedEngine._zipResults attaches to each TaskResult.
time_format = '%Y/%m/%d %H:%M:%S'
|
|
|
|
|
|
class Task(object):
    """Our representation of a task for the `TaskController` interface.

    The user should create instances of this class to represent a task that
    needs to be done.

    :Parameters:
        expression : str
            A str that is valid python code that is the task.
        pull : str or list of str
            The names of objects to be pulled as results.  A single name
            is wrapped in a one-element list.  If not specified, nothing
            is pulled back.
        push : dict
            A dict of objects to be pushed into the engines namespace before
            execution of the expression.
        clear_before : boolean
            Should the engine's namespace be cleared before the task is run.
            Default=False.
        clear_after : boolean
            Should the engine's namespace be cleared after the task is run.
            Default=False.
        retries : int
            The number of times to resubmit the task if it fails.  Default=0.
        recovery_task : Task
            This is the Task to be run when the task has exhausted its retries
            Default=None.
        depend : bool function(properties)
            This is the dependency function for the Task, which determines
            whether a task can be run on a Worker.  `depend` is called with
            one argument, the worker's properties dict, and should return
            True if the worker meets the dependencies or False if it does
            not.
            Default=None - run on any worker
        options : dict
            Any other keyword options for more elaborate uses of tasks

    Examples
    --------

    >>> t = Task('dostuff(args)')
    >>> t = Task('a=5', pull='a')
    >>> t = Task('a=5\\nb=4', pull=['a','b'])
    >>> t = Task('os.kill(os.getpid(),9)', retries=100) # this is a bad idea

    A dependency case:

    >>> def hasMPI(props):
    ...     return props.get('mpi') is not None
    >>> t = Task('mpi.send(blah,blah)', depend = hasMPI)
    """

    def __init__(self, expression, pull=None, push=None,
                 clear_before=False, clear_after=False, retries=0,
                 recovery_task=None, depend=None, **options):
        self.expression = expression
        # On Python 2 a name may arrive as `unicode`; treat every text type
        # as a single name so it is never mistaken for a sequence of names.
        try:
            text_types = basestring
        except NameError:
            text_types = str
        if isinstance(pull, text_types):
            self.pull = [pull]
        else:
            self.pull = pull
        self.push = push
        self.clear_before = clear_before
        self.clear_after = clear_after
        self.retries = retries
        self.recovery_task = recovery_task
        self.depend = depend
        self.options = options
        # Assigned by TaskController.run when the task is submitted.
        self.taskid = None
|
|
|
|
|
|
class ResultNS:
    """The result namespace object for use in TaskResult objects as tr.ns.

    It builds an object from a dictionary, such that it has attributes
    according to the key,value pairs of the dictionary.

    This works by calling setattr on ALL key,value pairs in the dict.  If the
    dict contains keys such as `__repr__` or `__getattr__`, those are set on
    the instance too.  This can be a bad idea, as it may corrupt standard
    behavior of the ns object.

    Example
    --------

    >>> ns = ResultNS({'a': 17, 'foo': [0, 1, 2]})
    >>> ns
    NS{'a': 17, 'foo': [0, 1, 2]}
    >>> ns.a
    17
    >>> ns['foo']
    [0, 1, 2]
    """

    def __init__(self, dikt):
        """Set one attribute per key,value pair of `dikt`."""
        # items() rather than iteritems(): available on every dict flavour.
        for k, v in dikt.items():
            setattr(self, k, v)

    def __repr__(self):
        """Return 'NS' followed by the repr of a dict of the public attributes."""
        d = {}
        for k in dir(self):
            # do not print private objects
            if k[:2] != '__' and k[-2:] != '__':
                d[k] = getattr(self, k)
        return "NS" + repr(d)

    def __getitem__(self, key):
        """Allow dict-style access: ns['name'] is ns.name."""
        return getattr(self, key)
|
|
|
|
|
|
class TaskResult(object):
    """
    An object for returning task results.

    This object encapsulates the results of a task.  On task
    success it will have a keys attribute holding the names of
    the variables that have been pulled back.  The values are
    available through item access (tr['a']) and through the ns
    member (tr.ns.a).  On success the failure attribute will be None.

    In task failure, keys will be empty, but failure will contain
    the failure object that encapsulates the remote exception.
    One can also simply call the raiseException() method of
    this class to re-raise any remote exception in the local
    session.

    The TaskResult has a .ns member, which is a property for access
    to the results.  If the Task had pull=['a', 'b'], then the
    Task Result will have attributes tr.ns.a, tr.ns.b for those values.
    Accessing tr.ns will raise the remote failure if the task failed.

    The engineid attribute should have the engineid of the engine
    that ran the task.  But, because engines can come and go in
    the ipython task system, the engineid may not continue to be
    valid or accurate.

    The taskid attribute simply gives the taskid that the task
    is tracked under.
    """

    # Class-level default; overwritten per instance by whoever tracks the task.
    taskid = None

    def _getNS(self):
        """Getter for `ns`: the result namespace, or re-raise the failure."""
        if not isinstance(self.failure, failure.Failure):
            return self._ns
        return self.failure.raiseException()

    def _setNS(self, v):
        """Setter for `ns`: always refuses, the namespace is read-only."""
        raise Exception("I am protected!")

    ns = property(_getNS, _setNS)

    def __init__(self, results, engineid):
        """Wrap `results`, either a dict of pulled values or a `Failure`."""
        self.engineid = engineid
        # Assume success, then switch over if we were handed a Failure.
        self.failure = None
        self.results = results
        if isinstance(results, failure.Failure):
            self.failure = results
            self.results = {}

        self._ns = ResultNS(self.results)

        self.keys = self.results.keys()

    def __repr__(self):
        """Show the task id together with the failure or the results dict."""
        contents = self.results
        if self.failure is not None:
            contents = self.failure
        return "TaskResult[ID:%r]:%r"%(self.taskid, contents)

    def __getitem__(self, key):
        """Return the pulled value named `key`, re-raising any failure first."""
        if self.failure is not None:
            self.raiseException()
        return self.results[key]

    def raiseException(self):
        """Re-raise any remote exceptions in the local python session."""
        if self.failure is None:
            return
        self.failure.raiseException()
|
|
|
|
|
|
|
|
|
class IWorker(zi.Interface):
    """The Basic Worker Interface.

    A worker is a representation of an Engine that is ready to run tasks.
    """

    # The Attribute must be bound to a name in the class body; a bare call
    # builds the object and throws it away, leaving the interface without it.
    workerid = zi.Attribute("workerid", "the id of the worker")

    def run(task):
        """Run task in worker's namespace.

        :Parameters:
            task : a `Task` object

        :Returns: `Deferred` to a `TaskResult` object.
        """
|
|
|
|
|
|
|
|
|
class WorkerFromQueuedEngine(object):
    """Adapt an `IEngineQueued` provider to an `IWorker` object."""

    zi.implements(IWorker)

    def __init__(self, qe):
        """Wrap the queued engine `qe`; `workerid` is filled in by the caller."""
        self.queuedEngine = qe
        self.workerid = None

    def _get_properties(self):
        """Return the wrapped engine's properties."""
        return self.queuedEngine.properties

    # Read-through property; assignments are silently ignored.
    properties = property(_get_properties, lambda self, _:None)

    def run(self, task):
        """Run task in worker's namespace.

        The steps are chained on one `Deferred`: optional reset, optional
        push, execute, optional pull, optional reset, and finally the
        packaging of the outcome into a `TaskResult`.

        :Parameters:
            task : a `Task` object

        :Returns: `Deferred` to a `TaskResult` object.
        """
        def push_namespace(_):
            return self.queuedEngine.push(task.push)

        def execute_expression(_):
            return self.queuedEngine.execute(task.expression)

        def pull_names(_):
            return self.queuedEngine.pull(task.pull)

        def drop_result(_):
            return None

        def reseter(result):
            # Reset the engine but pass the incoming result/failure through.
            self.queuedEngine.reset()
            return result

        if task.clear_before:
            d = self.queuedEngine.reset()
        else:
            d = defer.succeed(None)

        if task.push is not None:
            d.addCallback(push_namespace)

        d.addCallback(execute_expression)

        if task.pull is None:
            d.addCallback(drop_result)
        else:
            d.addCallback(pull_names)

        if task.clear_after:
            d.addBoth(reseter)

        # The start times are taken here, when the chain is finalized.
        started = time.time()
        started_struct = time.localtime()
        return d.addBoth(self._zipResults, task.pull, started, started_struct)

    def _zipResults(self, result, names, start, start_struct):
        """Callback for constructing the TaskResult object.

        `result` is either a `Failure` or the pulled value(s); `names` is the
        task's pull list (or None); `start`/`start_struct` are the
        `time.time()` / `time.localtime()` values taken in `run`.
        """
        if isinstance(result, failure.Failure):
            payload = result
        elif names is None:
            payload = {}
        elif len(names) == 1:
            # A single pulled name yields the bare value, not a sequence.
            payload = {names[0]:result}
        else:
            payload = dict(zip(names, result))
        tr = TaskResult(payload, self.queuedEngine.id)
        # the time info
        tr.submitted = time.strftime(time_format, start_struct)
        tr.completed = time.strftime(time_format)
        tr.duration = time.time()-start
        return tr
|
|
|
|
|
|
|
|
|
# Register WorkerFromQueuedEngine as the adapter from es.IEngineQueued to
# IWorker, so that IWorker(engine) yields a worker wrapping that engine.
components.registerAdapter(WorkerFromQueuedEngine, es.IEngineQueued, IWorker)
|
|
|
|
|
|
class IScheduler(zi.Interface):
    """The interface for a Scheduler.
    """

    # Each Attribute is bound to its name; a bare zi.Attribute(...) call would
    # be discarded and the interface would not declare the attribute at all.
    nworkers = zi.Attribute("nworkers", "the number of unassigned workers")
    ntasks = zi.Attribute("ntasks", "the number of unscheduled tasks")
    workerids = zi.Attribute("workerids", "a list of the worker ids")
    taskids = zi.Attribute("taskids", "a list of the task ids")

    def add_task(task, **flags):
        """Add a task to the queue of the Scheduler.

        :Parameters:
            task : a `Task` object
                The task to be queued.
            flags : dict
                General keywords for more sophisticated scheduling
        """

    def pop_task(id=None):
        """Pops a Task object.

        This gets the next task to be run.  If no `id` is requested, the highest priority
        task is returned.

        :Parameters:
            id
                The id of the task to be popped.  The default (None) is to return
                the highest priority task.

        :Returns: a `Task` object

        :Exceptions:
            IndexError : raised if no taskid in queue
        """

    def add_worker(worker, **flags):
        """Add a worker to the worker queue.

        :Parameters:
            worker : an IWorker implementing object
            flags : General keywords for more sophisticated scheduling
        """

    def pop_worker(id=None):
        """Pops an IWorker object that is ready to do work.

        This gets the next IWorker that is ready to do work.

        :Parameters:
            id : if specified, will pop worker with workerid=id, else pops
                 highest priority worker.  Defaults to None.

        :Returns:
            an IWorker object

        :Exceptions:
            IndexError : raised if no workerid in queue
        """

    def ready():
        """Returns True if there is something to do, False otherwise"""

    def schedule():
        """Returns a tuple of the worker and task pair for the next
        task to be run.
        """
|
|
|
|
|
|
|
|
|
class FIFOScheduler(object):
    """A basic First-In-First-Out (Queue) Scheduler.

    This is the default Scheduler for the TaskController.
    See the docstrings for IScheduler for interface details.
    """

    zi.implements(IScheduler)

    def __init__(self):
        self.tasks = []    # queued Task objects, oldest first
        self.workers = []  # idle workers, longest-waiting first

    def _ntasks(self):
        return len(self.tasks)

    def _nworkers(self):
        return len(self.workers)

    # Read-only views; assignments are silently ignored.
    ntasks = property(_ntasks, lambda self, _:None)
    nworkers = property(_nworkers, lambda self, _:None)

    def _taskids(self):
        return [t.taskid for t in self.tasks]

    def _workerids(self):
        return [w.workerid for w in self.workers]

    taskids = property(_taskids, lambda self,_:None)
    workerids = property(_workerids, lambda self,_:None)

    def add_task(self, task, **flags):
        """Append `task` to the end of the task queue; `flags` are ignored."""
        self.tasks.append(task)

    def pop_task(self, id=None):
        """Remove and return the oldest task, or the task whose taskid is `id`.

        Raises IndexError if the queue is empty or holds no such task.
        """
        if id is None:
            return self.tasks.pop(0)
        for i, task in enumerate(self.tasks):
            if id == task.taskid:
                return self.tasks.pop(i)
        # %s so that a non-integer id still yields the documented
        # IndexError instead of a TypeError from the formatting itself.
        raise IndexError("No task #%s" % (id,))

    def add_worker(self, worker, **flags):
        """Append `worker` to the end of the worker queue; `flags` are ignored."""
        self.workers.append(worker)

    def pop_worker(self, id=None):
        """Remove and return the first worker, or the one whose workerid is `id`.

        Raises IndexError if the queue is empty or holds no such worker.
        """
        if id is None:
            return self.workers.pop(0)
        for i, worker in enumerate(self.workers):
            if id == worker.workerid:
                return self.workers.pop(i)
        raise IndexError("No worker #%s" % (id,))

    def schedule(self):
        """Return the first runnable (worker, task) pair, removing both.

        Tasks are tried in queue order, each against the workers in queue
        order.  A task can run on a worker if it has no `depend` function or
        `depend(worker.properties)` is true.  Returns (None, None) when no
        pair matches.
        """
        for t in self.tasks:
            for w in self.workers:
                try:
                    cando = t.depend is None or t.depend(w.properties)
                except Exception:
                    # A broken dependency function means "cannot run here",
                    # but KeyboardInterrupt/SystemExit must still propagate.
                    cando = False
                if cando:
                    return self.pop_worker(w.workerid), self.pop_task(t.taskid)
        return None, None
|
|
|
|
|
|
|
|
|
|
|
|
class LIFOScheduler(FIFOScheduler):
    """A Last-In-First-Out (Stack) Scheduler.

    New tasks and workers go to the front of their queues, so the most
    recently added ones are considered first.  This scheduler should naively
    reward fast engines by giving them more jobs.  This risks starvation, but
    only in cases with low load, where starvation does not really matter.
    """

    def add_task(self, task, **flags):
        """Put `task` at the front of the task queue; `flags` are ignored."""
        self.tasks[:0] = [task]

    def add_worker(self, worker, **flags):
        """Put `worker` at the front of the worker queue; `flags` are ignored."""
        self.workers[:0] = [worker]
|
|
|
|
|
|
|
|
|
class ITaskController(cs.IControllerBase):
    """The Task based interface to a `ControllerService` object

    This adapts a `ControllerService` to the ITaskController interface.
    """

    def run(task):
        """Run a task.

        :Parameters:
            task : an IPython `Task` object

        :Returns: the integer ID of the task
        """

    def get_task_result(taskid, block=False):
        """Get the result of a task by its ID.

        :Parameters:
            taskid : int
                the id of the task whose result is requested
            block : boolean
                if True, the `Deferred` does not fire until the task is done

        :Returns: `Deferred` to the task's result if the task is done, and to
            None if it is not (and `block` is False).

        :Exceptions:
            the `Deferred` fails with an `IndexError` if no such task has
            been submitted
        """

    def abort(taskid):
        """Remove task from queue if task has not been submitted.

        If the task has already been submitted, wait for it to finish and discard
        results and prevent resubmission.

        :Parameters:
            taskid : the id of the task to be aborted

        :Returns:
            `Deferred` to abort attempt completion.  Will be None on success.

        :Exceptions:
            deferred will fail with `IndexError` if no such task has been submitted
            or the task has already completed.
        """

    def barrier(taskids):
        """Block until the list of taskids are completed.

        Returns None on success.
        """

    def spin():
        """touch the scheduler, to resume scheduling without submitting
        a task.
        """

    # Interface methods are declared without `self`, like the ones above.
    def queue_status(verbose=False):
        """Get a dictionary with the current state of the task queue.

        If verbose is True, then return lists of taskids, otherwise,
        return the number of tasks with each status.
        """
|
|
|
|
|
|
|
|
|
class TaskController(cs.ControllerAdapterBase):
    """The Task based interface to a Controller object.

    If you want to use a different scheduler, just subclass this and set
    the `SchedulerClass` member to the *class* of your chosen scheduler.
    """

    zi.implements(ITaskController)
    SchedulerClass = FIFOScheduler

    # Seconds the queue may sit with tasks that no idle worker can run
    # before those tasks are failed (see checkIdle / failIdle).
    timeout = 30

    def __init__(self, controller):
        self.controller = controller
        self.controller.on_register_engine_do(self.registerWorker, True)
        self.controller.on_unregister_engine_do(self.unregisterWorker, True)
        self.taskid = 0                 # id handed to the next submitted task
        self.failurePenalty = 1         # the time in seconds to penalize
                                        # a worker for failing a task
        self.pendingTasks = {}          # dict of {workerid:task}
        self.deferredResults = {}       # dict of {taskid:[deferred, ...]}
        self.finishedResults = {}       # dict of {taskid:actualResult}
        self.workers = {}               # dict of {workerid:worker}
        self.abortPending = []          # list of running taskids to abort
        self.idleLater = None           # delayed call object for timeout
        self.scheduler = self.SchedulerClass()

        # Adopt engines that registered before this adapter was created.
        for engineid in self.controller.engines.keys():
            self.workers[engineid] = IWorker(self.controller.engines[engineid])
            self.workers[engineid].workerid = engineid
            self.scheduler.add_worker(self.workers[engineid])

    def registerWorker(self, id):
        """Called by controller.register_engine."""
        if self.workers.get(id):
            raise RuntimeError("We already have one! This should not happen.")
        self.workers[id] = IWorker(self.controller.engines[id])
        self.workers[id].workerid = id
        if id not in self.pendingTasks:  # if not working
            self.scheduler.add_worker(self.workers[id])
            self.distributeTasks()

    def unregisterWorker(self, id):
        """Called by controller.unregister_engine"""
        if id in self.workers:
            try:
                self.scheduler.pop_worker(id)
            except IndexError:
                # The worker is busy, so it is not in the scheduler's queue.
                pass
            self.workers.pop(id)

    def _pendingTaskIDs(self):
        """Return the taskids of the tasks currently running on workers."""
        return [t.taskid for t in self.pendingTasks.values()]

    #---------------------------------------------------------------------------
    # Interface methods
    #---------------------------------------------------------------------------

    def run(self, task):
        """Run a task and return `Deferred` to its taskid."""
        task.taskid = self.taskid
        task.start = time.localtime()
        self.taskid += 1
        self.scheduler.add_task(task)
        # Deferreds of callers blocked in get_task_result(block=True).
        self.deferredResults[task.taskid] = []
        self.distributeTasks()
        return defer.succeed(task.taskid)

    def get_task_result(self, taskid, block=False):
        """Returns a `Deferred` to a TaskResult or None.

        A finished task fires with its stored result.  An unfinished task
        fires with None, unless `block` is true, in which case the `Deferred`
        fires when the task finishes.  An unknown `taskid` fails with
        `IndexError`.
        """
        if taskid in self.finishedResults:
            tr = self.finishedResults[taskid]
            return defer.succeed(tr)
        elif taskid in self.deferredResults:
            if block:
                d = defer.Deferred()
                self.deferredResults[taskid].append(d)
                return d
            else:
                return defer.succeed(None)
        else:
            return defer.fail(IndexError("task ID not registered: %r" % taskid))

    def abort(self, taskid):
        """Remove a task from the queue if it has not been run already.

        A task that is still queued is removed and finished with a
        `TaskAborted` failure.  A task that is already running is marked so
        that its result is replaced by that failure when it completes.
        Finished, already-aborted and unknown tasks fail with `IndexError`.
        """
        if not isinstance(taskid, int):
            return defer.fail(failure.Failure(TypeError("an integer task id expected: %r" % taskid)))
        try:
            self.scheduler.pop_task(taskid)
        except IndexError as e:
            # Not queued any more: work out why.
            if taskid in self.finishedResults:
                d = defer.fail(IndexError("Task Already Completed"))
            elif taskid in self.abortPending:
                d = defer.fail(IndexError("Task Already Aborted"))
            elif taskid in self._pendingTaskIDs():  # task is pending
                self.abortPending.append(taskid)
                d = defer.succeed(None)
            else:
                d = defer.fail(e)
        else:
            d = defer.execute(self._doAbort, taskid)

        return d

    def barrier(self, taskids):
        """Return a `Deferred` that fires with None once all `taskids` finish.

        `taskids` may be a single int or an iterable of ints.
        """
        dList = []
        if isinstance(taskids, int):
            taskids = [taskids]
        for id in taskids:
            d = self.get_task_result(id, block=True)
            dList.append(d)
        d = DeferredList(dList, consumeErrors=1)
        d.addCallbacks(lambda r: None)
        return d

    def spin(self):
        """Touch the scheduler; `Deferred` to distributeTasks()'s return value."""
        return defer.succeed(self.distributeTasks())

    def queue_status(self, verbose=False):
        """Return a `Deferred` to a dict describing the task queue.

        The keys are 'pending', 'failed', 'succeeded' and 'scheduled'.  The
        values are lists of taskids if `verbose` is true, else their lengths.
        """
        pending = self._pendingTaskIDs()
        failed = []
        succeeded = []
        for k, v in self.finishedResults.items():
            if not isinstance(v, failure.Failure):
                if hasattr(v, 'failure'):
                    if v.failure is None:
                        succeeded.append(k)
                    else:
                        failed.append(k)
        scheduled = self.scheduler.taskids
        if verbose:
            result = dict(pending=pending, failed=failed,
                          succeeded=succeeded, scheduled=scheduled)
        else:
            result = dict(pending=len(pending), failed=len(failed),
                          succeeded=len(succeeded), scheduled=len(scheduled))
        return defer.succeed(result)

    #---------------------------------------------------------------------------
    # Queue methods
    #---------------------------------------------------------------------------

    def _doAbort(self, taskid):
        """Helper function for aborting a pending task."""
        result = failure.Failure(error.TaskAborted())
        self._finishTask(taskid, result)
        if taskid in self.abortPending:
            self.abortPending.remove(taskid)

    def _finishTask(self, taskid, result):
        """Store `result` for `taskid` and fire every Deferred waiting on it."""
        dlist = self.deferredResults.pop(taskid)
        result.taskid = taskid   # The TaskResult should save the taskid
        self.finishedResults[taskid] = result
        for d in dlist:
            d.callback(result)

    def distributeTasks(self):
        """Distribute tasks while self.scheduler has things to do.

        Returns True if at least one task was handed to a worker, else False.
        """
        worker, task = self.scheduler.schedule()
        if not worker and not task:
            if self.idleLater and self.idleLater.called:  # we are inside failIdle
                self.idleLater = None
            else:
                self.checkIdle()
            return False
        # else something to do:
        while worker and task:
            # add to pending
            self.pendingTasks[worker.workerid] = task
            # run/link callbacks
            d = worker.run(task)
            d.addBoth(self.taskCompleted, task.taskid, worker.workerid)
            worker, task = self.scheduler.schedule()
        # check for idle timeout:
        self.checkIdle()
        return True

    def checkIdle(self):
        """(Re)arm or clear the idle timeout.

        The timeout is armed only while tasks are queued and every known
        worker is sitting idle in the scheduler.
        """
        if self.idleLater and not self.idleLater.called:
            self.idleLater.cancel()
        if self.scheduler.ntasks and self.workers and \
                    self.scheduler.nworkers == len(self.workers):
            self.idleLater = reactor.callLater(self.timeout, self.failIdle)
        else:
            self.idleLater = None

    def failIdle(self):
        """Idle timeout handler: fail every queued task that still cannot run."""
        if not self.distributeTasks():
            while self.scheduler.ntasks:
                t = self.scheduler.pop_task()
                msg = "task %i failed to execute due to unmet dependencies"%t.taskid
                msg += " for %i seconds"%self.timeout
                f = failure.Failure(error.TaskTimeout(msg))
                self._finishTask(t.taskid, f)
        self.idleLater = None

    def taskCompleted(self, result, taskid, workerid):
        """This is the err/callback for a completed task."""
        try:
            task = self.pendingTasks.pop(workerid)
        except KeyError:
            # this should not happen
            log.msg("Tried to pop bad pending task %i from worker %i"%(taskid, workerid))
            log.msg("Result: %r"%result)
            log.msg("Pending tasks: %s"%self.pendingTasks)
            return

        # Check if aborted while pending
        aborted = False
        if taskid in self.abortPending:
            self._doAbort(taskid)
            aborted = True

        failed = (result.failure is not None and
                  isinstance(result.failure, failure.Failure))

        if not aborted:
            if failed:
                log.msg("Task %i failed on worker %i"% (taskid, workerid))
                if task.retries > 0:  # resubmit
                    task.retries -= 1
                    self.scheduler.add_task(task)
                    s = "Resubmitting task %i, %i retries remaining" %(taskid, task.retries)
                    log.msg(s)
                    self.distributeTasks()
                elif isinstance(task.recovery_task, Task) and \
                                    task.recovery_task.retries > -1:
                    # retries = -1 is to prevent infinite recovery_task loop
                    task.retries = -1
                    task.recovery_task.taskid = taskid
                    task = task.recovery_task
                    self.scheduler.add_task(task)
                    s = "Recovering task %i, %i retries remaining" %(taskid, task.retries)
                    log.msg(s)
                    self.distributeTasks()
                else:  # done trying
                    self._finishTask(taskid, result)
                # wait a second before readmitting a worker that failed
                # it may have died, and not yet been unregistered
                reactor.callLater(self.failurePenalty, self.readmitWorker, workerid)
            else:  # we succeeded
                self._finishTask(taskid, result)
                self.readmitWorker(workerid)
        else:  # we aborted the task
            if failed:  # it failed, penalize worker
                reactor.callLater(self.failurePenalty, self.readmitWorker, workerid)
            else:
                self.readmitWorker(workerid)

    def readmitWorker(self, workerid):
        """Readmit a worker to the scheduler.

        This is outside `taskCompleted` because of the `failurePenalty` being
        implemented through `reactor.callLater`.
        """
        # Only workers that are still registered and not busy again.
        if workerid in self.workers and workerid not in self.pendingTasks:
            self.scheduler.add_worker(self.workers[workerid])
            self.distributeTasks()
|
|
|
|
|
|
|
|
|
# Register TaskController as the adapter from cs.IControllerBase to
# ITaskController.
components.registerAdapter(TaskController, cs.IControllerBase, ITaskController)
|
|
|
|