task.py
1113 lines
| 38.4 KiB
| text/x-python
|
PythonLexer
Brian E Granger
|
r1234 | # encoding: utf-8 | ||
# -*- test-case-name: IPython.kernel.tests.test_task -*- | ||||
"""Task farming representation of the ControllerService.""" | ||||
__docformat__ = "restructuredtext en" | ||||
Brian E Granger
|
r1395 | #----------------------------------------------------------------------------- | ||
Brian E Granger
|
r1234 | # Copyright (C) 2008 The IPython Development Team | ||
# | ||||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
Brian E Granger
|
r1395 | #----------------------------------------------------------------------------- | ||
Brian E Granger
|
r1234 | |||
Brian E Granger
|
r1395 | #----------------------------------------------------------------------------- | ||
Brian E Granger
|
r1234 | # Imports | ||
Brian E Granger
|
r1395 | #----------------------------------------------------------------------------- | ||
Brian E Granger
|
r1234 | |||
import copy, time | ||||
Brian E Granger
|
r1344 | from types import FunctionType | ||
Brian E Granger
|
r1234 | |||
import zope.interface as zi, string | ||||
from twisted.internet import defer, reactor | ||||
from twisted.python import components, log, failure | ||||
Brian E Granger
|
r1395 | from IPython.kernel.util import printer | ||
Brian E Granger
|
r1234 | from IPython.kernel import engineservice as es, error | ||
from IPython.kernel import controllerservice as cs | ||||
from IPython.kernel.twistedutil import gatherBoth, DeferredList | ||||
Brian E Granger
|
r1395 | from IPython.kernel.pickleutil import can, uncan, CannedFunction | ||
Brian E Granger
|
r1234 | |||
Brian E Granger
|
r1395 | #----------------------------------------------------------------------------- | ||
# Definition of the Task objects | ||||
#----------------------------------------------------------------------------- | ||||
Brian E Granger
|
r1234 | |||
# strftime pattern used for the human-readable task timestamps.
time_format = '%Y/%m/%d %H:%M:%S'
Brian E Granger
|
class ITask(zi.Interface):
    """
    This interface provides a generic definition of what constitutes a task.

    There are two sides to a task.  First a task needs to take input from
    a user to determine what work is performed by the task.  Second, the
    task needs to have the logic that knows how to turn that information
    into specific calls to a worker, through the `IQueuedEngine` interface.

    Many methods in this class get two things passed to them: a Deferred
    and an IQueuedEngine implementer.  Such methods should register callbacks
    on the Deferred that use the IQueuedEngine to accomplish something.  See
    the existing task objects for examples.
    """

    # Attributes have to be bound to a name in the interface body; a bare
    # ``zi.Attribute(...)`` expression is evaluated and thrown away, so it
    # declares nothing on the interface.
    retries = zi.Attribute('How many times to retry the task')
    recovery_task = zi.Attribute('A task to try if the initial one fails')
    taskid = zi.Attribute('the id of the task')

    def start_time(result):
        """
        Do anything needed to start the timing of the task.

        Must simply return the result after starting the timers.
        """

    def stop_time(result):
        """
        Do anything needed to stop the timing of the task.

        Must simply return the result after stopping the timers.  This
        method will usually set attributes that are used by `process_result`
        in building result of the task.
        """

    def pre_task(d, queued_engine):
        """Do something with the queued_engine before the task is run.

        This method should simply add callbacks to the input Deferred
        that do something with the `queued_engine` before the task is run.

        :Parameters:
            d : Deferred
                The deferred that actions should be attached to
            queued_engine : IQueuedEngine implementer
                The worker that has been allocated to perform the task
        """

    def post_task(d, queued_engine):
        """Do something with the queued_engine after the task is run.

        This method should simply add callbacks to the input Deferred
        that do something with the `queued_engine` after the task is run.

        :Parameters:
            d : Deferred
                The deferred that actions should be attached to
            queued_engine : IQueuedEngine implementer
                The worker that has been allocated to perform the task
        """

    def submit_task(d, queued_engine):
        """Submit a task using the `queued_engine` we have been allocated.

        When a task is ready to run, this method is called.  This method
        must take the internal information of the task and make suitable
        calls on the queued_engine to have the actual work done.

        This method should simply add callbacks to the input Deferred
        that use the `queued_engine` to run the task.

        :Parameters:
            d : Deferred
                The deferred that actions should be attached to
            queued_engine : IQueuedEngine implementer
                The worker that has been allocated to perform the task
        """

    def process_result(result, engine_id):
        """Take a raw task result.

        Objects that implement `ITask` can choose how the result of running
        the task is presented.  This method takes the raw result and
        does this logic.  Two examples are the `MapTask` which simply returns
        the raw result or a `Failure` object and the `StringTask` which
        returns a `TaskResult` object.

        The signature matches how the method is attached to the worker's
        Deferred: it receives the raw result plus the engine id only.

        :Parameters:
            result : object
                The raw task result that needs to be wrapped
            engine_id : int
                The id of the engine that did the task

        :Returns:
            The result, as a tuple of the form: (success, result).
            Here, success is a boolean indicating if the task
            succeeded or failed and result is the result.
        """

    def check_depend(properties):
        """Check properties to see if the task should be run.

        :Parameters:
            properties : dict
                A dictionary of properties that an engine has set

        :Returns:
            True if the task should be run, False otherwise
        """

    def can_task():
        """Serialize (can) any functions in the task for pickling.

        Subclasses must override this method and make sure that all
        functions in the task are canned by calling `can` on the
        function.
        """

    def uncan_task():
        """Unserialize (uncan) any canned function in the task."""
class BaseTask(object):
    """
    Common functionality for all objects implementing `ITask`.
    """

    zi.implements(ITask)

    def __init__(self, clear_before=False, clear_after=False, retries=0,
                 recovery_task=None, depend=None):
        """
        Make a generic task.

        :Parameters:
            clear_before : boolean
                Should the engines namespace be cleared before the task
                is run
            clear_after : boolean
                Should the engines namespace be clear after the task is run
            retries : int
                The number of times a task should be retried upon failure
            recovery_task : any task object
                If a task fails and it has a recovery_task, that is run
                upon a retry
            depend : FunctionType
                A function that is called to test for properties.  This
                function must take one argument, the properties dict and
                return a boolean
        """
        self.clear_before = clear_before
        self.clear_after = clear_after
        self.retries = retries
        self.recovery_task = recovery_task
        self.depend = depend
        self.taskid = None

    def start_time(self, result):
        """
        Start the basic timers and pass `result` through unchanged.
        """
        self.start = time.time()
        self.start_struct = time.localtime()
        return result

    def stop_time(self, result):
        """
        Stop the basic timers and pass `result` through unchanged.

        Sets `stop`, `stop_struct`, `duration`, `submitted` and `completed`.
        """
        self.stop = time.time()
        self.stop_struct = time.localtime()
        self.duration = self.stop - self.start
        self.submitted = time.strftime(time_format, self.start_struct)
        # Format the struct captured above rather than reading the clock a
        # second time, so `completed` always agrees with `stop_struct`.
        self.completed = time.strftime(time_format, self.stop_struct)
        return result

    def pre_task(self, d, queued_engine):
        """
        Clear the engine before running the task if clear_before is set.
        """
        if self.clear_before:
            d.addCallback(lambda r: queued_engine.reset())

    def post_task(self, d, queued_engine):
        """
        Clear the engine after running the task if clear_after is set.
        """
        def reseter(result):
            # The reset is issued on success and on failure alike; the task
            # result (or Failure) is handed on untouched.
            queued_engine.reset()
            return result
        if self.clear_after:
            d.addBoth(reseter)

    def submit_task(self, d, queued_engine):
        """Subclasses attach the callbacks that actually run the task."""
        raise NotImplementedError('submit_task must be implemented in a subclass')

    def process_result(self, result, engine_id):
        """
        Process a task result.

        This is the default `process_result` that just returns the raw
        result or a `Failure`, as a (success, result) tuple.
        """
        if isinstance(result, failure.Failure):
            return (False, result)
        else:
            return (True, result)

    def check_depend(self, properties):
        """
        Calls self.depend(properties) to see if a task should be run.

        A task without a `depend` function can always run.
        """
        if self.depend is not None:
            return self.depend(properties)
        else:
            return True

    def can_task(self):
        """Can the `depend` function and, recursively, the recovery task."""
        self.depend = can(self.depend)
        if isinstance(self.recovery_task, BaseTask):
            self.recovery_task.can_task()

    def uncan_task(self):
        """Uncan the `depend` function and, recursively, the recovery task."""
        self.depend = uncan(self.depend)
        if isinstance(self.recovery_task, BaseTask):
            self.recovery_task.uncan_task()
class MapTask(BaseTask):
    """
    A task made of a function together with its arguments.
    """

    zi.implements(ITask)

    def __init__(self, function, args=None, kwargs=None, clear_before=False,
                 clear_after=False, retries=0, recovery_task=None, depend=None):
        """
        Create a task based on a function, args and kwargs.

        This is a simple type of task that consists of calling:
        function(*args, **kwargs).  The return value of the function, or a
        `Failure` wrapping an exception, is the task result for this type
        of task.
        """
        BaseTask.__init__(self, clear_before, clear_after, retries,
                          recovery_task, depend)

        if not isinstance(function, FunctionType):
            raise TypeError('a task function must be a FunctionType')
        self.function = function

        # Positional arguments default to an empty tuple.
        self.args = args
        if self.args is None:
            self.args = ()
        if not isinstance(self.args, (list, tuple)):
            raise TypeError('a task args must be a list or tuple')

        # Keyword arguments default to an empty dict.
        self.kwargs = kwargs
        if self.kwargs is None:
            self.kwargs = {}
        if not isinstance(self.kwargs, dict):
            raise TypeError('a task kwargs must be a dict')

    def submit_task(self, d, queued_engine):
        """Chain push-function, push-args, execute and pull onto `d`."""
        def send_function(_):
            return queued_engine.push_function(
                dict(_ipython_task_function=self.function))

        def send_arguments(_):
            return queued_engine.push(
                dict(_ipython_task_args=self.args,
                     _ipython_task_kwargs=self.kwargs))

        def call_function(_):
            return queued_engine.execute(
                '_ipython_task_result = _ipython_task_function(*_ipython_task_args,**_ipython_task_kwargs)')

        def fetch_result(_):
            return queued_engine.pull('_ipython_task_result')

        d.addCallback(send_function)
        d.addCallback(send_arguments)
        d.addCallback(call_function)
        d.addCallback(fetch_result)

    def can_task(self):
        """Can the task function, then whatever `BaseTask` cans."""
        self.function = can(self.function)
        BaseTask.can_task(self)

    def uncan_task(self):
        """Uncan the task function, then whatever `BaseTask` uncans."""
        self.function = uncan(self.function)
        BaseTask.uncan_task(self)
class StringTask(BaseTask):
    """
    A task that consists of a string of Python code to run.
    """

    def __init__(self, expression, pull=None, push=None,
                 clear_before=False, clear_after=False, retries=0,
                 recovery_task=None, depend=None):
        """
        Create a task based on a Python expression and variables

        This type of task lets you push a set of variables to the engines
        namespace, run a Python string in that namespace and then bring back
        a different set of Python variables as the result.

        Because this type of task can return many results (through the
        `pull` keyword argument) it returns a special `TaskResult` object
        that wraps the pulled variables, statistics about the run and
        any exceptions raised.

        :Raises:
            TypeError if `expression` is not a str, `pull` is not a str or
            a list/tuple, or `push` is not a dict.
        """
        if not isinstance(expression, str):
            raise TypeError('a task expression must be a string')
        self.expression = expression

        # Identity tests: ``== None`` would invoke a custom ``__eq__`` on
        # whatever object the caller passed in.
        if pull is None:
            self.pull = ()
        elif isinstance(pull, str):
            self.pull = (pull,)
        elif isinstance(pull, (list, tuple)):
            self.pull = pull
        else:
            raise TypeError('pull must be str or a sequence of strs')

        if push is None:
            self.push = {}
        elif isinstance(push, dict):
            self.push = push
        else:
            raise TypeError('push must be a dict')

        BaseTask.__init__(self, clear_before, clear_after, retries,
                          recovery_task, depend)

    def submit_task(self, d, queued_engine):
        """Chain push, execute and pull onto `d`."""
        if self.push is not None:
            d.addCallback(lambda r: queued_engine.push(self.push))

        d.addCallback(lambda r: queued_engine.execute(self.expression))

        if self.pull is not None:
            d.addCallback(lambda r: queued_engine.pull(self.pull))
        else:
            d.addCallback(lambda r: None)

    def process_result(self, result, engine_id):
        """
        Wrap the raw result (or `Failure`) in a `TaskResult`.

        :Returns: (success, TaskResult)
        """
        failed = isinstance(result, failure.Failure)
        if failed:
            tr = TaskResult(result, engine_id)
        else:
            if not self.pull:
                # Nothing was requested (None or an empty sequence), so
                # there are no variables to report whatever `result` is.
                resultDict = {}
            elif len(self.pull) == 1:
                resultDict = {self.pull[0]: result}
            else:
                resultDict = dict(zip(self.pull, result))
            tr = TaskResult(resultDict, engine_id)

        # Assign task attributes
        tr.submitted = self.submitted
        tr.completed = self.completed
        tr.duration = self.duration
        tr.taskid = getattr(self, 'taskid', None)

        return (not failed, tr)
Brian E Granger
|
r1234 | |||
Brian E Granger
|
class ResultNS(object):
    """
    A dict like object for holding the results of a task.

    The result namespace object for use in `TaskResult` objects as tr.ns.
    It builds an object from a dictionary, such that it has attributes
    according to the key,value pairs of the dictionary.

    This works by calling setattr on ALL key,value pairs in the dict.  If a
    user chooses to overwrite the `__repr__` or `__getattr__` attributes,
    they can.  This can be a bad idea, as it may corrupt standard behavior
    of the ns object.

    Example
    --------

    >>> ns = ResultNS({'a': 17, 'foo': [0, 1, 2]})
    >>> ns.a
    17
    >>> ns['foo']
    [0, 1, 2]
    >>> ResultNS({'a': 17})
    NS{'a': 17}
    """

    def __init__(self, dikt):
        # ``items()`` exists on both Python 2 and Python 3 dicts, unlike
        # ``iteritems()``.
        for k, v in dikt.items():
            setattr(self, k, v)

    def __repr__(self):
        d = {}
        for k in dir(self):
            # do not print private objects
            if k[:2] != '__' and k[-2:] != '__':
                d[k] = getattr(self, k)
        return "NS" + repr(d)

    def __getitem__(self, key):
        """Item access is attribute access; a missing key raises AttributeError."""
        return getattr(self, key)
class TaskResult(object):
    """
    An object for returning task results for certain types of tasks.

    This object encapsulates the results of a task.  On task success it has
    a `keys` attribute listing the variables that were pulled back, the
    values are available through item access (``tr['a']``), and the
    `failure` attribute is None.

    On task failure, `keys` is empty and `failure` holds the failure object
    that encapsulates the remote exception.  Call `raise_exception` to
    re-raise any remote exception in the local session.

    The `.ns` member is a read-only property for access to the results.  If
    the task had pull=['a', 'b'], the values are `tr.ns.a` and `tr.ns.b`.
    Accessing `tr.ns` raises the remote failure if the task failed.

    The `engineid` attribute holds the id of the engine that ran the task.
    Because engines can come and go, it may not continue to be valid or
    accurate.

    The `taskid` attribute simply gives the id the task is tracked under.
    """

    taskid = None

    def _getNS(self):
        if not isinstance(self.failure, failure.Failure):
            return self._ns
        return self.failure.raiseException()

    def _setNS(self, v):
        raise Exception("the ns attribute cannot be changed")

    ns = property(_getNS, _setNS)

    def __init__(self, results, engineid):
        self.engineid = engineid
        failed = isinstance(results, failure.Failure)
        if failed:
            self.failure = results
            self.results = {}
        else:
            self.results = results
            self.failure = None

        self._ns = ResultNS(self.results)
        self.keys = self.results.keys()

    def __repr__(self):
        contents = self.results
        if self.failure is not None:
            contents = self.failure
        return "TaskResult[ID:%r]:%r"%(self.taskid, contents)

    def __getitem__(self, key):
        # raise_exception() is a no-op when there is no failure.
        if self.failure is not None:
            self.raise_exception()
        return self.results[key]

    def raise_exception(self):
        """Re-raise any remote exceptions in the local python session."""
        if self.failure is None:
            return
        self.failure.raiseException()
Brian E Granger
|
r1395 | #----------------------------------------------------------------------------- | ||
# The controller side of things | ||||
#----------------------------------------------------------------------------- | ||||
Brian E Granger
|
class IWorker(zi.Interface):
    """The Basic Worker Interface.

    A worker is a representation of an Engine that is ready to run tasks.
    """

    # Bound to a name so the attribute is really declared on the interface;
    # a bare ``zi.Attribute(...)`` expression would be discarded.
    workerid = zi.Attribute("the id of the worker")

    def run(task):
        """Run task in worker's namespace.

        :Parameters:
            task : a `Task` object

        :Returns: `Deferred` to a tuple of (success, result) where
            success is a boolean that signifies success or failure
            and result is the task result.
        """
class WorkerFromQueuedEngine(object):
    """Adapter that presents an `IQueuedEngine` as an `IWorker`."""

    zi.implements(IWorker)

    def __init__(self, qe):
        self.queuedEngine = qe
        self.workerid = None

    def _get_properties(self):
        return self.queuedEngine.properties

    # Reads are forwarded to the wrapped engine; assignments are ignored.
    properties = property(_get_properties, lambda self, _: None)

    def run(self, task):
        """Run task in worker's namespace.

        The task's own hooks are asked, in order, to attach their callbacks
        to a fresh Deferred: timing start, `pre_task`, `submit_task`,
        `post_task`, timing stop and finally `process_result`.  See `ITask`
        for what each hook is expected to do.

        :Parameters:
            task : a `Task` object

        :Returns: `Deferred` to a tuple of (success, result) where
            success is a boolean that signifies success or failure
            and result is the task result.
        """
        engine = self.queuedEngine
        chain = defer.succeed(None)
        chain.addCallback(task.start_time)
        task.pre_task(chain, engine)
        task.submit_task(chain, engine)
        task.post_task(chain, engine)
        chain.addBoth(task.stop_time)
        chain.addBoth(task.process_result, engine.id)
        # From here on (success, result) travels down the chain.
        return chain


components.registerAdapter(WorkerFromQueuedEngine, es.IEngineQueued, IWorker)
class IScheduler(zi.Interface):
    """The interface for a Scheduler.
    """

    # Bound to names so the attributes are really declared on the interface;
    # bare ``zi.Attribute(...)`` expressions would be discarded.
    nworkers = zi.Attribute("the number of unassigned workers")
    ntasks = zi.Attribute("the number of unscheduled tasks")
    workerids = zi.Attribute("a list of the worker ids")
    taskids = zi.Attribute("a list of the task ids")

    def add_task(task, **flags):
        """Add a task to the queue of the Scheduler.

        :Parameters:
            task : an `ITask` implementer
                The task to be queued.
            flags : dict
                General keywords for more sophisticated scheduling
        """

    def pop_task(id=None):
        """Pops a task object from the queue.

        This gets the next task to be run.  If no `id` is requested, the
        highest priority task is returned.

        :Parameters:
            id
                The id of the task to be popped.  The default (None) is to
                return the highest priority task.

        :Returns: an `ITask` implementer

        :Exceptions:
            IndexError : raised if no taskid in queue
        """

    def add_worker(worker, **flags):
        """Add a worker to the worker queue.

        :Parameters:
            worker : an `IWorker` implementer
            flags : dict
                General keywords for more sophisticated scheduling
        """

    def pop_worker(id=None):
        """Pops an IWorker object that is ready to do work.

        This gets the next IWorker that is ready to do work.

        :Parameters:
            id : if specified, will pop worker with workerid=id, else pops
                 highest priority worker.  Defaults to None.

        :Returns:
            an IWorker object

        :Exceptions:
            IndexError : raised if no workerid in queue
        """

    def ready():
        """Returns True if there is something to do, False otherwise"""

    def schedule():
        """Returns (worker,task) pair for the next task to be run."""
Brian E Granger
|
r1234 | |||
class FIFOScheduler(object):
    """
    A basic First-In-First-Out (Queue) Scheduler.

    This is the default Scheduler for the `TaskController`.
    See the docstrings for `IScheduler` for interface details.
    """

    zi.implements(IScheduler)

    def __init__(self):
        self.tasks = []
        self.workers = []

    def _ntasks(self):
        return len(self.tasks)

    def _nworkers(self):
        return len(self.workers)

    # Read-only views: the setters silently ignore assignment.
    ntasks = property(_ntasks, lambda self, _: None)
    nworkers = property(_nworkers, lambda self, _: None)

    def _taskids(self):
        return [t.taskid for t in self.tasks]

    def _workerids(self):
        return [w.workerid for w in self.workers]

    taskids = property(_taskids, lambda self, _: None)
    workerids = property(_workerids, lambda self, _: None)

    def add_task(self, task, **flags):
        """Queue `task` behind the tasks already waiting."""
        self.tasks.append(task)

    def pop_task(self, id=None):
        """
        Remove and return the oldest task, or the task with taskid `id`.

        :Exceptions:
            IndexError : the queue is empty, or no queued task has that id
        """
        if id is None:
            return self.tasks.pop(0)
        for i, task in enumerate(self.tasks):
            if id == task.taskid:
                return self.tasks.pop(i)
        # %r keeps this an IndexError even for a non-integer id; for an
        # integer the message is the same as with %i.
        raise IndexError("No task #%r" % (id,))

    def add_worker(self, worker, **flags):
        """Queue `worker` behind the workers already waiting."""
        self.workers.append(worker)

    def pop_worker(self, id=None):
        """
        Remove and return the oldest worker, or the one with workerid `id`.

        :Exceptions:
            IndexError : the queue is empty, or no queued worker has that id
        """
        if id is None:
            return self.workers.pop(0)
        for i, worker in enumerate(self.workers):
            if id == worker.workerid:
                return self.workers.pop(i)
        raise IndexError("No worker #%r" % (id,))

    def schedule(self):
        """
        Return the first (worker, task) pair whose dependency check passes.

        Tasks are tried in queue order and, for each task, workers in queue
        order.  The chosen pair is removed from both queues.  Returns
        (None, None) when no pair can be formed.
        """
        for t in self.tasks:
            for w in self.workers:
                try:
                    # Allow the task to check itself using its
                    # check_depend method.
                    cando = t.check_depend(w.properties)
                except Exception:
                    # A broken dependency check must not stop scheduling,
                    # but KeyboardInterrupt/SystemExit still propagate.
                    cando = False
                if cando:
                    return self.pop_worker(w.workerid), self.pop_task(t.taskid)
        return None, None
class LIFOScheduler(FIFOScheduler):
    """
    A Last-In-First-Out (Stack) Scheduler.

    This scheduler should naively reward fast engines by giving
    them more jobs.  This risks starvation, but only in cases with
    low load, where starvation does not really matter.
    """

    def add_task(self, task, **flags):
        """Put `task` at the front of the queue so it is popped first."""
        self.tasks[0:0] = [task]

    def add_worker(self, worker, **flags):
        """Put `worker` at the front of the queue so it is popped first."""
        self.workers[0:0] = [worker]
class ITaskController(cs.IControllerBase):
    """
    The Task based interface to a `ControllerService` object

    This adapts a `ControllerService` to the ITaskController interface.
    """

    def run(task):
        """
        Run a task.

        :Parameters:
            task : an IPython `Task` object

        :Returns: the integer ID of the task
        """

    def get_task_result(taskid, block=False):
        """
        Get the result of a task by its ID.

        :Parameters:
            taskid : int
                the id of the task whose result is requested

        :Returns: `Deferred` to the task result if the task is done, and
            None if not.

        :Exceptions:
            actualResult will be an `IndexError` if no such task has been
            submitted
        """

    def abort(taskid):
        """Remove task from queue if the task has not been submitted.

        If the task has already been submitted, wait for it to finish and
        discard results and prevent resubmission.

        :Parameters:
            taskid : the id of the task to be aborted

        :Returns:
            `Deferred` to abort attempt completion.  Will be None on success.

        :Exceptions:
            deferred will fail with `IndexError` if no such task has been
            submitted or the task has already completed.
        """

    def barrier(taskids):
        """
        Block until the list of taskids are completed.

        Returns None on success.
        """

    def spin():
        """
        Touch the scheduler, to resume scheduling without submitting a task.
        """

    def queue_status(verbose=False):
        """
        Get a dictionary with the current state of the task queue.

        If verbose is True, then return lists of taskids, otherwise,
        return the number of tasks with each status.
        """

    def clear():
        """
        Clear all previously run tasks from the task controller.

        This is needed because the task controller keeps all task results
        in memory.  This can be a problem if there are many completed
        tasks.  Users should call this periodically to clean out these
        cached task results.
        """
Brian E Granger
|
r1234 | |||
class TaskController(cs.ControllerAdapterBase): | ||||
"""The Task based interface to a Controller object. | ||||
If you want to use a different scheduler, just subclass this and set | ||||
the `SchedulerClass` member to the *class* of your chosen scheduler. | ||||
""" | ||||
zi.implements(ITaskController) | ||||
SchedulerClass = FIFOScheduler | ||||
timeout = 30 | ||||
def __init__(self, controller):
    """
    Attach to `controller` and enlist its already-registered engines.

    Registers callbacks for engine (un)registration, creates the scheduler
    from `SchedulerClass` and wraps every engine the controller currently
    has in an `IWorker` that is handed to the scheduler.
    """
    self.controller = controller
    self.controller.on_register_engine_do(self.registerWorker, True)
    self.controller.on_unregister_engine_do(self.unregisterWorker, True)
    self.taskid = 0
    self.failurePenalty = 1 # the time in seconds to penalize
                            # a worker for failing a task
    self.pendingTasks = {} # in-flight work, keyed by workerid
    self.deferredResults = {} # dict of {taskid:[deferreds awaiting result]}
    self.finishedResults = {} # dict of {taskid:actualResult}
    self.workers = {} # dict of {workerid:worker}
    self.abortPending = [] # list of taskids whose abort is pending
    self.idleLater = None # delayed call object for timeout
    self.scheduler = self.SchedulerClass()

    for id in self.controller.engines.keys():
        self.workers[id] = IWorker(self.controller.engines[id])
        self.workers[id].workerid = id
        # The attribute is `scheduler`; `self.schedule` does not exist and
        # raised AttributeError whenever engines were already registered.
        self.scheduler.add_worker(self.workers[id])
def registerWorker(self, id):
    """Called by controller.register_engine."""
    if self.workers.get(id):
        raise ValueError("worker with id %s already exists. This should not happen." % id)
    worker = IWorker(self.controller.engines[id])
    self.workers[id] = worker
    worker.workerid = id
    # Only hand the worker to the scheduler if it is not busy already.
    if id not in self.pendingTasks:
        self.scheduler.add_worker(worker)
    self.distributeTasks()
def unregisterWorker(self, id):
    """Called by controller.unregister_engine"""
    if id not in self.workers:
        return
    try:
        self.scheduler.pop_worker(id)
    except IndexError:
        # The worker was not waiting in the scheduler's queue.
        pass
    self.workers.pop(id)
def _pendingTaskIDs(self):
    """Return the `taskid` of every value stored in `pendingTasks`."""
    ids = []
    for pending in self.pendingTasks.values():
        ids.append(pending.taskid)
    return ids
#--------------------------------------------------------------------------- | ||||
# Interface methods | ||||
#--------------------------------------------------------------------------- | ||||
def run(self, task):
    """
    Run a task and return `Deferred` to its taskid.

    The task gets the next integer id, is queued on the scheduler and an
    empty list of result waiters is created for it before the scheduler
    is asked to distribute work.
    """
    task.taskid = self.taskid
    task.start = time.localtime()
    self.taskid += 1
    # An unused ``defer.Deferred()`` was created here and never fired or
    # returned; it has been dropped.
    self.scheduler.add_task(task)
    log.msg('Queuing task: %i' % task.taskid)

    self.deferredResults[task.taskid] = []
    self.distributeTasks()
    return defer.succeed(task.taskid)
def get_task_result(self, taskid, block=False):
    """
    Returns a `Deferred` to the task result, or None.

    A finished task yields its stored result.  A known but unfinished task
    yields None, unless `block` is true, in which case a new Deferred is
    added to that task's waiter list and returned.  An unknown id fails
    with `IndexError`.
    """
    log.msg("Getting task result: %i" % taskid)
    if taskid in self.finishedResults:
        return defer.succeed(self.finishedResults[taskid])
    if taskid not in self.deferredResults:
        return defer.fail(IndexError("task ID not registered: %r" % taskid))
    if not block:
        return defer.succeed(None)
    waiter = defer.Deferred()
    self.deferredResults[taskid].append(waiter)
    return waiter
def abort(self, taskid):
    """
    Remove a task from the queue if it has not been run already.

    :Parameters:
        taskid
            Integer id of the task to abort.

    :Returns: a `Deferred` that

        - fails with TypeError when `taskid` is not an int;
        - runs `_doAbort` when the scheduler still held the task;
        - fires with None when the task is pending on a worker (the id is
          recorded in `self.abortPending` and handled in `taskCompleted`);
        - fails with IndexError when the task already completed, was
          already marked for abort, or is unknown (in the last case the
          scheduler's own IndexError is passed through).
    """
    if not isinstance(taskid, int):
        return defer.fail(failure.Failure(TypeError("an integer task id expected: %r" % taskid)))
    try:
        self.scheduler.pop_task(taskid)
    except IndexError as e:
        # The scheduler did not hold the task; work out why.
        if taskid in self.finishedResults:
            d = defer.fail(IndexError("Task Already Completed"))
        elif taskid in self.abortPending:
            d = defer.fail(IndexError("Task Already Aborted"))
        elif taskid in self._pendingTaskIDs():  # task is pending
            self.abortPending.append(taskid)
            d = defer.succeed(None)
        else:
            d = defer.fail(e)
    else:
        d = defer.execute(self._doAbort, taskid)
    return d
def barrier(self, taskids):
    """
    Return a `Deferred` that waits on the results of the given tasks.

    A blocking `get_task_result` is requested for every id and the
    resulting deferreds are gathered in a `DeferredList` created with
    `consumeErrors=1`.  The callback added here replaces the gathered
    result with None.

    :Parameters:
        taskids
            A single int task id, or an iterable of task ids.
    """
    if isinstance(taskids, int):
        taskids = [taskids]
    dList = [self.get_task_result(taskid, block=True) for taskid in taskids]
    d = DeferredList(dList, consumeErrors=1)
    d.addCallbacks(lambda r: None)
    return d
def spin(self):
    """
    Trigger one round of `distributeTasks`.

    :Returns: a `Deferred` that has already fired with the value returned
        by `distributeTasks`.
    """
    distributed = self.distributeTasks()
    return defer.succeed(distributed)
def queue_status(self, verbose=False):
    """
    Report the pending, failed, succeeded and scheduled tasks.

    Finished results that are `failure.Failure` instances, or that have no
    `failure` attribute, are counted neither as failed nor as succeeded.

    :Parameters:
        verbose
            If true the report maps each category to a list of task ids,
            otherwise to the number of task ids in that category.

    :Returns: a `Deferred` that has already fired with the report dict.
    """
    pending = self._pendingTaskIDs()
    succeeded = []
    failed = []
    for tid, outcome in self.finishedResults.iteritems():
        if isinstance(outcome, failure.Failure):
            continue
        if not hasattr(outcome, 'failure'):
            continue
        if outcome.failure is None:
            succeeded.append(tid)
        else:
            failed.append(tid)
    scheduled = self.scheduler.taskids
    report = dict(pending=pending, failed=failed,
                  succeeded=succeeded, scheduled=scheduled)
    if not verbose:
        # Collapse each list of ids to its length.
        report = dict(pending=len(report['pending']),
                      failed=len(report['failed']),
                      succeeded=len(report['succeeded']),
                      scheduled=len(report['scheduled']))
    return defer.succeed(report)
#--------------------------------------------------------------------------- | ||||
# Queue methods | ||||
#--------------------------------------------------------------------------- | ||||
def _doAbort(self, taskid):
    """
    Helper function for aborting a pending task.

    Finishes the task with a `Failure` wrapping `error.TaskAborted` and
    drops the id from `self.abortPending` if it is listed there.
    """
    log.msg("Task aborted: %i" % taskid)
    aborted = failure.Failure(error.TaskAborted())
    self._finishTask(taskid, aborted)
    still_marked = taskid in self.abortPending
    if still_marked:
        self.abortPending.remove(taskid)
def _finishTask(self, taskid, result):
    """
    Record `result` as the final result of `taskid` and notify waiters.

    The task's list of waiting deferreds is removed from
    `self.deferredResults` (a missing id makes the pop raise before
    anything is stored), the result is stored in `self.finishedResults`,
    and every waiting deferred is called back with it.
    """
    waiting = self.deferredResults.pop(taskid)
    # The taskid is not written onto the result here; the TaskResult
    # should save the taskid itself.
    self.finishedResults[taskid] = result
    for waiter in waiting:
        waiter.callback(result)
def distributeTasks(self):
    """
    Distribute tasks while self.scheduler has things to do.

    :Returns: False when the scheduler's first answer contains neither a
        worker nor a task, True otherwise.
    """
    log.msg("distributing Tasks")
    engine, job = self.scheduler.schedule()
    if not (engine or job):
        # Nothing to hand out at all.
        if self.idleLater and self.idleLater.called:
            # we are inside failIdle
            self.idleLater = None
        else:
            self.checkIdle()
        return False

    # Keep pairing workers with tasks for as long as the scheduler
    # produces both.
    while engine and job:
        # Mark the worker as busy before the task is started.
        self.pendingTasks[engine.workerid] = job
        running = engine.run(job)
        log.msg("Running task %i on worker %i" %(job.taskid, engine.workerid))
        running.addBoth(self.taskCompleted, job.taskid, engine.workerid)
        engine, job = self.scheduler.schedule()

    # check for idle timeout:
    self.checkIdle()
    return True
def checkIdle(self):
    """
    Reset the idle timer kept in `self.idleLater`.

    A timer that has not been called yet is cancelled.  A new one, calling
    `failIdle` after `self.timeout`, is started only when the scheduler
    has tasks, there are registered workers, and the scheduler's worker
    count equals the number of registered workers; otherwise
    `self.idleLater` is set to None.
    """
    previous = self.idleLater
    if previous and not previous.called:
        previous.cancel()
    stalled = (self.scheduler.ntasks and self.workers and
               self.scheduler.nworkers == len(self.workers))
    if not stalled:
        self.idleLater = None
    else:
        self.idleLater = reactor.callLater(self.timeout, self.failIdle)
def failIdle(self):
    """
    Fail every task still held by the scheduler.

    One more round of `distributeTasks` is attempted first.  Only when it
    returns a false value is each remaining task popped from the scheduler
    and finished with a `Failure` wrapping `error.TaskTimeout`.
    `self.idleLater` is set to None at the end.
    """
    if not self.distributeTasks():
        while self.scheduler.ntasks:
            stuck = self.scheduler.pop_task()
            reason = "task %i failed to execute due to unmet dependencies"%stuck.taskid
            reason = reason + " for %i seconds"%self.timeout
            log.msg("Task aborted by timeout: %i" % stuck.taskid)
            timed_out = failure.Failure(error.TaskTimeout(reason))
            self._finishTask(stuck.taskid, timed_out)
    self.idleLater = None
Brian E Granger
|
def taskCompleted(self, success_and_result, taskid, workerid):
    """
    This is the err/callback for a completed task.

    :Parameters:
        success_and_result
            A two-item sequence `(success, result)`; `success` is tested
            for truth to tell a successful run from a failed one.
        taskid
            Id of the task that was run.
        workerid
            Id of the worker the task ran on; used as the key into
            `self.pendingTasks`.

    A failed task is resubmitted while it has retries left, then replaced
    by its `recovery_task` (if it has a usable one), and only then
    finished with the failing result.  A worker whose task failed is
    readmitted after `self.failurePenalty`; otherwise it is readmitted
    immediately.
    """
    success, result = success_and_result
    try:
        task = self.pendingTasks.pop(workerid)
    except KeyError:
        # this should not happen
        log.msg("Tried to pop bad pending task %i from worker %i"%(taskid, workerid))
        log.msg("Result: %r"%result)
        log.msg("Pending tasks: %s"%self.pendingTasks)
        return

    # Check if aborted while pending
    aborted = taskid in self.abortPending
    if aborted:
        self._doAbort(taskid)

    if not aborted:
        if not success:
            log.msg("Task %i failed on worker %i"% (taskid, workerid))
            if task.retries > 0:  # resubmit
                task.retries -= 1
                self.scheduler.add_task(task)
                s = "Resubmitting task %i, %i retries remaining" %(taskid, task.retries)
                log.msg(s)
                self.distributeTasks()
            elif isinstance(task.recovery_task, BaseTask) and \
                    task.recovery_task.retries > -1:
                # retries = -1 is to prevent infinite recovery_task loop
                task.retries = -1
                # The recovery task takes over the id of the failed task.
                task.recovery_task.taskid = taskid
                task = task.recovery_task
                self.scheduler.add_task(task)
                s = "Recovering task %i, %i retries remaining" %(taskid, task.retries)
                log.msg(s)
                self.distributeTasks()
            else:  # done trying
                self._finishTask(taskid, result)
            # wait a second before readmitting a worker that failed
            # it may have died, and not yet been unregistered
            reactor.callLater(self.failurePenalty, self.readmitWorker, workerid)
        else:  # we succeeded
            log.msg("Task completed: %i"% taskid)
            self._finishTask(taskid, result)
            self.readmitWorker(workerid)
    else:  # we aborted the task
        if not success:
            reactor.callLater(self.failurePenalty, self.readmitWorker, workerid)
        else:
            self.readmitWorker(workerid)
def readmitWorker(self, workerid):
    """
    Readmit a worker to the scheduler.

    This is outside `taskCompleted` because of the `failurePenalty` being
    implemented through `reactor.callLater`.

    The worker is added back, and a round of task distribution started,
    only when it is still registered in `self.workers` and has no entry in
    `self.pendingTasks`.

    :Parameters:
        workerid
            Key of the worker in `self.workers`.
    """
    if workerid in self.workers and workerid not in self.pendingTasks:
        self.scheduler.add_worker(self.workers[workerid])
        self.distributeTasks()
Brian E Granger
|
r1395 | |||
def clear(self):
    """
    Forget the results of all previously run tasks.

    The task controller keeps every task result in memory, which can
    become a problem if there are many completed tasks.  Users should
    call this periodically to clean out the cached task results.

    :Returns: a `Deferred` that has already fired with None.
    """
    self.finishedResults = dict()
    return defer.succeed(None)
Brian E Granger
|
r1234 | |||
# Register TaskController as the adapter factory that adapts objects
# providing cs.IControllerBase to ITaskController.
components.registerAdapter(TaskController, cs.IControllerBase, ITaskController)