From 1feaf0a360e3cef8ca6927207cb02b13911f6546 2008-07-22 02:39:52 From: Brian E Granger Date: 2008-07-22 02:39:52 Subject: [PATCH] The refactoring of the Task system is nearly complete. Now there are multiple types of tasks including `StringTask` and `MapTask`. Each task type is responsible for running itself and processing its own result. This makes it much easier for people to create new task types. Also, the map and parallel function support has been completely refactored and improved. This includes a map and parallel function implementation for the task controller as well as a @parallel decorator. --- diff --git a/IPython/kernel/asyncclient.py b/IPython/kernel/asyncclient.py index a1542e2..586202b 100644 --- a/IPython/kernel/asyncclient.py +++ b/IPython/kernel/asyncclient.py @@ -27,7 +27,7 @@ from IPython.kernel import codeutil from IPython.kernel.clientconnector import ClientConnector # Other things that the user will need -from IPython.kernel.task import Task +from IPython.kernel.task import MapTask, StringTask from IPython.kernel.error import CompositeError #------------------------------------------------------------------------------- diff --git a/IPython/kernel/client.py b/IPython/kernel/client.py index 85d677b..efbc160 100644 --- a/IPython/kernel/client.py +++ b/IPython/kernel/client.py @@ -44,7 +44,7 @@ from IPython.kernel import codeutil import IPython.kernel.magic # Other things that the user will need -from IPython.kernel.task import Task +from IPython.kernel.task import MapTask, StringTask from IPython.kernel.error import CompositeError #------------------------------------------------------------------------------- diff --git a/IPython/kernel/magic.py b/IPython/kernel/magic.py index cefca8b..980c63b 100644 --- a/IPython/kernel/magic.py +++ b/IPython/kernel/magic.py @@ -79,7 +79,7 @@ def magic_px(self,parameter_s=''): except AttributeError: print NO_ACTIVE_CONTROLLER else: - print "Executing command on Controller" + print "Parallel execution on engines: %s" % activeController.targets result = activeController.execute(parameter_s) return result diff --git a/IPython/kernel/mapper.py b/IPython/kernel/mapper.py index b12121c..e732b53 100644 --- a/IPython/kernel/mapper.py +++ b/IPython/kernel/mapper.py @@ -4,39 +4,230 @@ __docformat__ = "restructuredtext en" -#------------------------------------------------------------------------------- +#---------------------------------------------------------------------------- # Copyright (C) 2008 The IPython Development Team # # Distributed under the terms of the BSD License. The full license is in # the file COPYING, distributed as part of this software. 
-#-------------------------------------------------------------------------------
+#----------------------------------------------------------------------------

-#-------------------------------------------------------------------------------
+#----------------------------------------------------------------------------
 # Imports
-#-------------------------------------------------------------------------------
+#----------------------------------------------------------------------------

 from types import FunctionType

 from zope.interface import Interface, implements
+from IPython.kernel.task import MapTask
+from IPython.kernel.twistedutil import DeferredList, gatherBoth
+from IPython.kernel.util import printer
+from IPython.kernel.error import collect_exceptions
+
+#----------------------------------------------------------------------------
+# Code
+#----------------------------------------------------------------------------

 class IMapper(Interface):
+    """The basic interface for a Mapper.
+
+    This defines a generic interface for mapping. The idea of this is
+    similar to that of Python's builtin `map` function, which applies a
+    function elementwise to a sequence.
+    """
+
+    def map(func, *seqs):
+        """Do map in parallel.
+
+        Equivalent to map(func, *seqs) or:
+
+        [func(seqs[0][0], seqs[1][0],...), func(seqs[0][1], seqs[1][1],...),...]
+
+        :Parameters:
+            func : FunctionType
+                The function to apply to the sequences
+            seqs : tuple of iterables
+                A sequence of iterables that are used for successive function
+                arguments.  This works just like the builtin `map`.
+        """
+
+class IMultiEngineMapperFactory(Interface):
+    """
+    An interface for something that creates `IMapper` instances.
+    """
+
+    def mapper(dist='b', targets='all', block=True):
+        """
+        Create an `IMapper` implementer with a given set of arguments.
+
+        The `IMapper` created using a multiengine controller is
+        not load balanced.
+        """
+
+class ITaskMapperFactory(Interface):
+    """
+    An interface for something that creates `IMapper` instances.
+    """

-    def __call__(func, *sequences):
-        """Do map in parallel."""
+    def mapper(clear_before=False, clear_after=False, retries=0,
+        recovery_task=None, depend=None, block=True):
+        """
+        Create an `IMapper` implementer with a given set of arguments.
+
+        The `IMapper` created using a task controller is load balanced.
+
+        See the documentation for `IPython.kernel.task.BaseTask` for
+        documentation on the arguments to this method.
+        """

-class Mapper(object):
+
+class MultiEngineMapper(object):
+    """
+    A Mapper for `IMultiEngine` implementers.
+    """

     implements(IMapper)

     def __init__(self, multiengine, dist='b', targets='all', block=True):
+        """
+        Create a Mapper for a multiengine.
+
+        The values of all arguments are used for all calls to `map`. This
+        class allows these arguments to be set for a series of map calls.
+
+        :Parameters:
+            multiengine : `IMultiEngine` implementer
+                The multiengine to use for running the map commands
+            dist : str
+                The type of decomposition to use. Only block ('b') is
+                supported currently
+            targets : (str, int, tuple of ints)
+                The engines to use in the map
+            block : boolean
+                Whether to block when the map is applied
+        """
         self.multiengine = multiengine
         self.dist = dist
         self.targets = targets
         self.block = block
-
-    def __call__(self, func, *sequences):
-        return self.map(func, *sequences)

     def map(self, func, *sequences):
+        """
+        Apply func to *sequences elementwise.  Like Python's builtin map.
+
+        This version is not load balanced.
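+
+        A hedged sketch of typical use (assumes `mec` is an already
+        connected multiengine client; the names here are illustrative,
+        not part of this changeset):
+
+        >>> m = MultiEngineMapper(mec)
+        >>> m.map(lambda x: 2*x, range(4))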
+        """
+        max_len = max(len(s) for s in sequences)
+        for s in sequences:
+            if len(s)!=max_len:
+                raise ValueError('all sequences must have equal length')
         assert isinstance(func, (str, FunctionType)), "func must be a function or str"
-        return self.multiengine._map(func, sequences, dist=self.dist,
-            targets=self.targets, block=self.block)
\ No newline at end of file
+        return self.multiengine.raw_map(func, sequences, dist=self.dist,
+            targets=self.targets, block=self.block)
+
+class TaskMapper(object):
+    """
+    Make an `ITaskController` look like an `IMapper`.
+
+    This class provides a load balanced version of `map`.
+    """
+
+    def __init__(self, task_controller, clear_before=False, clear_after=False, retries=0,
+        recovery_task=None, depend=None, block=True):
+        """
+        Create an `IMapper` given a `TaskController` and arguments.
+
+        The additional arguments are those that are common to all types of
+        tasks and are described in the documentation for
+        `IPython.kernel.task.BaseTask`.
+
+        :Parameters:
+            task_controller : an `ITaskController` implementer
+                The `TaskController` to use for calls to `map`
+        """
+        self.task_controller = task_controller
+        self.clear_before = clear_before
+        self.clear_after = clear_after
+        self.retries = retries
+        self.recovery_task = recovery_task
+        self.depend = depend
+        self.block = block
+
+    def map(self, func, *sequences):
+        """
+        Apply func to *sequences elementwise.  Like Python's builtin map.
+
+        This version is load balanced.
+        """
+        max_len = max(len(s) for s in sequences)
+        for s in sequences:
+            if len(s)!=max_len:
+                raise ValueError('all sequences must have equal length')
+        task_args = zip(*sequences)
+        dlist = []
+        for ta in task_args:
+            task = MapTask(func, ta, clear_before=self.clear_before,
+                clear_after=self.clear_after, retries=self.retries,
+                recovery_task=self.recovery_task, depend=self.depend)
+            dlist.append(self.task_controller.run(task))
+        dlist = gatherBoth(dlist, consumeErrors=1)
+        dlist.addCallback(collect_exceptions,'map')
+        if self.block:
+            def get_results(task_ids):
+                d = self.task_controller.barrier(task_ids)
+                d.addCallback(lambda _: gatherBoth([self.task_controller.get_task_result(tid) for tid in task_ids], consumeErrors=1))
+                d.addCallback(collect_exceptions, 'map')
+                return d
+            dlist.addCallback(get_results)
+        return dlist
+
+class SynchronousTaskMapper(object):
+    """
+    Make an `IBlockingTaskClient` look like an `IMapper`.
+
+    This class provides a load balanced version of `map`.
+    """
+
+    def __init__(self, task_controller, clear_before=False, clear_after=False, retries=0,
+        recovery_task=None, depend=None, block=True):
+        """
+        Create an `IMapper` given an `IBlockingTaskClient` and arguments.
+
+        The additional arguments are those that are common to all types of
+        tasks and are described in the documentation for
+        `IPython.kernel.task.BaseTask`.
+
+        :Parameters:
+            task_controller : an `IBlockingTaskClient` implementer
+                The `TaskController` to use for calls to `map`
+        """
+        self.task_controller = task_controller
+        self.clear_before = clear_before
+        self.clear_after = clear_after
+        self.retries = retries
+        self.recovery_task = recovery_task
+        self.depend = depend
+        self.block = block
+
+    def map(self, func, *sequences):
+        """
+        Apply func to *sequences elementwise.  Like Python's builtin map.
+
+        This version is load balanced.
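+
+        A brief illustrative sketch (assumes `tc` is a connected blocking
+        task client; the names and values are hypothetical):
+
+        >>> m = SynchronousTaskMapper(tc, retries=1)
+        >>> m.map(lambda x, y: x + y, range(8), range(8))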
+ """ + max_len = max(len(s) for s in sequences) + for s in sequences: + if len(s)!=max_len: + raise ValueError('all sequences must have equal length') + task_args = zip(*sequences) + task_ids = [] + for ta in task_args: + task = MapTask(func, ta, clear_before=self.clear_before, + clear_after=self.clear_after, retries=self.retries, + recovery_task=self.recovery_task, depend=self.depend) + task_ids.append(self.task_controller.run(task)) + if self.block: + self.task_controller.barrier(task_ids) + task_results = [self.task_controller.get_task_result(tid) for tid in task_ids] + return task_results + else: + return task_ids \ No newline at end of file diff --git a/IPython/kernel/multiengine.py b/IPython/kernel/multiengine.py index 4dc675c..6468a04 100644 --- a/IPython/kernel/multiengine.py +++ b/IPython/kernel/multiengine.py @@ -659,17 +659,23 @@ class IMultiEngineCoordinator(Interface): def gather(key, dist='b', targets='all'): """Gather object key from targets.""" - def _map(func, seq, dist='b', targets='all'): - """A parallelized version of Python's builtin map.""" - - def map(func, *sequences): - """Do a basic map with default for dist and targets.""" - - def mapper(dist='b', targets='all'): - """Create a mapper with dist and targets.""" - - def parallel(dist='b', targets='all'): - """A decorator that build a parallel function.""" + def raw_map(func, seqs, dist='b', targets='all'): + """ + A parallelized version of Python's builtin `map` function. + + This has a slightly different syntax than the builtin `map`. + This is needed because we need to have keyword arguments and thus + can't use *args to capture all the sequences. Instead, they must + be passed in a list or tuple. + + The equivalence is: + + raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...) + + Most users will want to use parallel functions or the `mapper` + and `map` methods for an API that follows that of the builtin + `map`. + """ class ISynchronousMultiEngineCoordinator(IMultiEngineCoordinator): @@ -681,17 +687,21 @@ class ISynchronousMultiEngineCoordinator(IMultiEngineCoordinator): def gather(key, dist='b', targets='all', block=True): """Gather object key from targets""" - def _map(func, sequences, dist='b', targets='all', block=True): - """Perform an actual map.""" - - def map(func, *sequences): - """Do a basic map with default for dist and targets.""" - - def mapper(dist='b', targets='all', block=True): - """Create a mapper with dist, targets and block.""" - - def parallel(dist='b', targets='all', block=True): - """A decorator that build a parallel function.""" + def raw_map(func, seqs, dist='b', targets='all', block=True): + """ + A parallelized version of Python's builtin map. + + This has a slightly different syntax than the builtin `map`. + This is needed because we need to have keyword arguments and thus + can't use *args to capture all the sequences. Instead, they must + be passed in a list or tuple. + + raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...) + + Most users will want to use parallel functions or the `mapper` + and `map` methods for an API that follows that of the builtin + `map`. 
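+        
+        For example, the following two calls compute the same thing
+        (a sketch only; `me` stands for any implementer of this
+        interface):
+        
+        >>> me.raw_map(lambda x, y: x + y, [range(4), range(4)])
+        >>> map(lambda x, y: x + y, range(4), range(4))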
+ """ #------------------------------------------------------------------------------- diff --git a/IPython/kernel/multiengineclient.py b/IPython/kernel/multiengineclient.py index 1db6fba..1d27037 100644 --- a/IPython/kernel/multiengineclient.py +++ b/IPython/kernel/multiengineclient.py @@ -31,7 +31,11 @@ from IPython.ColorANSI import TermColors from IPython.kernel.twistedutil import blockingCallFromThread from IPython.kernel import error from IPython.kernel.parallelfunction import ParallelFunction -from IPython.kernel.mapper import Mapper +from IPython.kernel.mapper import ( + MultiEngineMapper, + IMultiEngineMapperFactory, + IMapper +) from IPython.kernel import map as Map from IPython.kernel import multiengine as me from IPython.kernel.multiengine import (IFullMultiEngine, @@ -187,10 +191,14 @@ class ResultList(list): def __repr__(self): output = [] - blue = TermColors.Blue - normal = TermColors.Normal - red = TermColors.Red - green = TermColors.Green + # These colored prompts were not working on Windows + if sys.platform == 'win32': + blue = normal = red = green = '' + else: + blue = TermColors.Blue + normal = TermColors.Normal + red = TermColors.Red + green = TermColors.Green output.append("\n") for cmd in self: if isinstance(cmd, Failure): @@ -295,35 +303,7 @@ class InteractiveMultiEngineClient(object): def __len__(self): """Return the number of available engines.""" return len(self.get_ids()) - - def parallelize(self, func, targets=None, block=None): - """Build a `ParallelFunction` object for functionName on engines. - - The returned object will implement a parallel version of functionName - that takes a local sequence as its only argument and calls (in - parallel) functionName on each element of that sequence. The - `ParallelFunction` object has a `targets` attribute that controls - which engines the function is run on. - - :Parameters: - targets : int, list or 'all' - The engine ids the action will apply to. Call `get_ids` to see - a list of currently available engines. - functionName : str - A Python string that names a callable defined on the engines. - - :Returns: A `ParallelFunction` object. - - Examples - ======== - - >>> psin = rc.parallelize('all','lambda x:sin(x)') - >>> psin(range(10000)) - [0,2,4,9,25,36,...] - """ - targets, block = self._findTargetsAndBlock(targets, block) - return ParallelFunction(func, self, targets, block) - + #--------------------------------------------------------------------------- # Make this a context manager for with #--------------------------------------------------------------------------- @@ -423,7 +403,11 @@ class FullBlockingMultiEngineClient(InteractiveMultiEngineClient): engine, run code on it, etc. """ - implements(IFullBlockingMultiEngineClient) + implements( + IFullBlockingMultiEngineClient, + IMultiEngineMapperFactory, + IMapper + ) def __init__(self, smultiengine): self.smultiengine = smultiengine @@ -796,23 +780,83 @@ class FullBlockingMultiEngineClient(InteractiveMultiEngineClient): return self._blockFromThread(self.smultiengine.gather, key, dist, targets=targets, block=block) - def _map(self, func, seq, dist='b', targets=None, block=None): + def raw_map(self, func, seq, dist='b', targets=None, block=None): """ - A parallelized version of Python's builtin map + A parallelized version of Python's builtin map. + + This has a slightly different syntax than the builtin `map`. + This is needed because we need to have keyword arguments and thus + can't use *args to capture all the sequences. 
Instead, they must
+        be passed in a list or tuple.
+        
+        raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)
+        
+        Most users will want to use parallel functions or the `mapper`
+        and `map` methods for an API that follows that of the builtin
+        `map`.
         """
         targets, block = self._findTargetsAndBlock(targets, block)
-        return self._blockFromThread(self.smultiengine._map, func, seq,
+        return self._blockFromThread(self.smultiengine.raw_map, func, seq,
             dist, targets=targets, block=block)

     def map(self, func, *sequences):
-        return self.mapper()(func, *sequences)
+        """
+        A parallel version of Python's builtin `map` function.
+        
+        This method applies a function to sequences of arguments.  It
+        follows the same syntax as the builtin `map`.
+        
+        This method creates a mapper object by calling `self.mapper` with
+        no arguments and then uses that mapper to do the mapping.  See
+        the documentation of `mapper` for more details.
+        """
+        return self.mapper().map(func, *sequences)

     def mapper(self, dist='b', targets='all', block=None):
-        return Mapper(self, dist, targets, block)
+        """
+        Create a mapper object that has a `map` method.
+        
+        This method returns an object that implements the `IMapper`
+        interface.  This method is a factory that is used to control how
+        the map happens.
+        
+        :Parameters:
+            dist : str
+                What decomposition to use, 'b' is the only one supported
+                currently
+            targets : str, int, sequence of ints
+                Which engines to use for the map
+            block : boolean
+                Should calls to `map` block or not
+        """
+        return MultiEngineMapper(self, dist, targets, block)

     def parallel(self, dist='b', targets=None, block=None):
+        """
+        A decorator that turns a function into a parallel function.
+        
+        This can be used as:
+        
+        @parallel()
+        def f(x, y):
+            ...
+        
+        f(range(10), range(10))
+        
+        This causes f(0,0), f(1,1), ... to be called in parallel.
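+        
+        A fuller sketch of the same pattern (assumes `rc` is a connected
+        blocking multiengine client; the function body is illustrative):
+        
+        @rc.parallel(dist='b', targets='all')
+        def g(x, y):
+            return x + y
+        
+        g(range(10), range(10))    # [g(0,0), g(1,1), ..., g(9,9)]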
+
+        :Parameters:
+            dist : str
+                What decomposition to use, 'b' is the only one supported
+                currently
+            targets : str, int, sequence of ints
+                Which engines to use for the map
+            block : boolean
+                Should calls to `map` block or not
+        """
         targets, block = self._findTargetsAndBlock(targets, block)
-        pf = ParallelFunction(self, dist=dist, targets=targets, block=block)
+        mapper = self.mapper(dist, targets, block)
+        pf = ParallelFunction(mapper)
         return pf

     #---------------------------------------------------------------------------
diff --git a/IPython/kernel/multienginefc.py b/IPython/kernel/multienginefc.py
index c0dc939..ec51e47 100644
--- a/IPython/kernel/multienginefc.py
+++ b/IPython/kernel/multienginefc.py
@@ -30,7 +30,11 @@ from IPython.kernel import error
 from IPython.kernel.util import printer
 from IPython.kernel import map as Map
 from IPython.kernel.parallelfunction import ParallelFunction
-from IPython.kernel.mapper import Mapper
+from IPython.kernel.mapper import (
+    MultiEngineMapper,
+    IMultiEngineMapperFactory,
+    IMapper
+)
 from IPython.kernel.twistedutil import gatherBoth
 from IPython.kernel.multiengine import (MultiEngine,
     IMultiEngine,
@@ -282,7 +286,12 @@ components.registerAdapter(FCSynchronousMultiEngineFromMultiEngine,

 class FCFullSynchronousMultiEngineClient(object):

-    implements(IFullSynchronousMultiEngine, IBlockingClientAdaptor)
+    implements(
+        IFullSynchronousMultiEngine,
+        IBlockingClientAdaptor,
+        IMultiEngineMapperFactory,
+        IMapper
+    )

     def __init__(self, remote_reference):
         self.remote_reference = remote_reference
@@ -606,9 +615,20 @@ class FCFullSynchronousMultiEngineClient(object):
         d.addCallback(do_gather)
         return d

-    def _map(self, func, sequences, dist='b', targets='all', block=True):
+    def raw_map(self, func, sequences, dist='b', targets='all', block=True):
         """
-        Call a callable on elements of a sequence.
+        A parallelized version of Python's builtin map.
+        
+        This has a slightly different syntax than the builtin `map`.
+        This is needed because we need to have keyword arguments and thus
+        can't use *args to capture all the sequences.  Instead, they must
+        be passed in a list or tuple.
+        
+        raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)
+        
+        Most users will want to use parallel functions or the `mapper`
+        and `map` methods for an API that follows that of the builtin
+        `map`.
         """
         if not isinstance(sequences, (list, tuple)):
             raise TypeError('sequences must be a list or tuple')
@@ -634,13 +654,62 @@ class FCFullSynchronousMultiEngineClient(object):
         return d

     def map(self, func, *sequences):
-        return self.mapper()(func, *sequences)
+        """
+        A parallel version of Python's builtin `map` function.
+        
+        This method applies a function to sequences of arguments.  It
+        follows the same syntax as the builtin `map`.
+        
+        This method creates a mapper object by calling `self.mapper` with
+        no arguments and then uses that mapper to do the mapping.  See
+        the documentation of `mapper` for more details.
+        """
+        return self.mapper().map(func, *sequences)

     def mapper(self, dist='b', targets='all', block=True):
-        return Mapper(self, dist, targets, block)
+        """
+        Create a mapper object that has a `map` method.
+        
+        This method returns an object that implements the `IMapper`
+        interface.  This method is a factory that is used to control how
+        the map happens.
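+        
+        A hedged usage sketch (assumes `client` is a connected instance
+        of this class; the values are illustrative):
+        
+        m = client.mapper(dist='b', targets='all', block=True)
+        squares = m.map(lambda x: x**2, range(16))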
+
+        :Parameters:
+            dist : str
+                What decomposition to use, 'b' is the only one supported
+                currently
+            targets : str, int, sequence of ints
+                Which engines to use for the map
+            block : boolean
+                Should calls to `map` block or not
+        """
+        return MultiEngineMapper(self, dist, targets, block)

     def parallel(self, dist='b', targets='all', block=True):
-        pf = ParallelFunction(self, dist=dist, targets=targets, block=True)
+        """
+        A decorator that turns a function into a parallel function.
+        
+        This can be used as:
+        
+        @parallel()
+        def f(x, y):
+            ...
+        
+        f(range(10), range(10))
+        
+        This causes f(0,0), f(1,1), ... to be called in parallel.
+        
+        :Parameters:
+            dist : str
+                What decomposition to use, 'b' is the only one supported
+                currently
+            targets : str, int, sequence of ints
+                Which engines to use for the map
+            block : boolean
+                Should calls to `map` block or not
+        """
+        mapper = self.mapper(dist, targets, block)
+        pf = ParallelFunction(mapper)
         return pf

 #---------------------------------------------------------------------------
diff --git a/IPython/kernel/parallelfunction.py b/IPython/kernel/parallelfunction.py
index 28fdc36..077081a 100644
--- a/IPython/kernel/parallelfunction.py
+++ b/IPython/kernel/parallelfunction.py
@@ -19,29 +19,89 @@ from types import FunctionType

 from zope.interface import Interface, implements

+class IMultiEngineParallelDecorator(Interface):
+    """A decorator that creates a parallel function."""
+
+    def parallel(dist='b', targets=None, block=None):
+        """
+        A decorator that turns a function into a parallel function.
+        
+        This can be used as:
+        
+        @parallel()
+        def f(x, y):
+            ...
+        
+        f(range(10), range(10))
+        
+        This causes f(0,0), f(1,1), ... to be called in parallel.
+        
+        :Parameters:
+            dist : str
+                What decomposition to use, 'b' is the only one supported
+                currently
+            targets : str, int, sequence of ints
+                Which engines to use for the map
+            block : boolean
+                Should calls to `map` block or not
+        """
+
+class ITaskParallelDecorator(Interface):
+    """A decorator that creates a parallel function."""
+
+    def parallel(clear_before=False, clear_after=False, retries=0,
+        recovery_task=None, depend=None, block=True):
+        """
+        A decorator that turns a function into a parallel function.
+        
+        This can be used as:
+        
+        @parallel()
+        def f(x, y):
+            ...
+        
+        f(range(10), range(10))
+        
+        This causes f(0,0), f(1,1), ... to be called in parallel.
+        
+        See the documentation for `IPython.kernel.task.BaseTask` for
+        documentation on the arguments to this method.
+        """
+
+class IParallelFunction(Interface):
+    pass
+
 class ParallelFunction(object):
     """
-    A decorator for building parallel functions.
+    The implementation of a parallel function.
+    
+    A parallel function is similar to Python's map function:
+    
+    map(func, *sequences) -> pfunc(*sequences)
+    
+    Parallel functions should be created by using the @parallel decorator.
     """
-    def __init__(self, multiengine, dist='b', targets='all', block=True):
+    
+    implements(IParallelFunction)
+    
+    def __init__(self, mapper):
         """
-        Create a `ParallelFunction decorator`.
+        Create a parallel function from an `IMapper`.
+        
+        :Parameters:
+            mapper : an `IMapper` implementer
+                The mapper to use for the parallel function
         """
-        self.multiengine = multiengine
-        self.dist = dist
-        self.targets = targets
-        self.block = block
+        self.mapper = mapper

     def __call__(self, func):
         """
-        Decorate the function to make it run in parallel.
+        Decorate a function to make it run in parallel.
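+        
+        A minimal sketch of how the pieces fit together (here `mapper`
+        is any `IMapper` implementer; the names are illustrative):
+        
+        pf = ParallelFunction(mapper)
+        pfunc = pf(lambda x: x + 1)    # decorate the function
+        results = pfunc(range(10))     # map it over the sequence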
""" assert isinstance(func, (str, FunctionType)), "func must be a fuction or str" self.func = func def call_function(*sequences): - return self.multiengine._map(self.func, sequences, dist=self.dist, - targets=self.targets, block=self.block) + return self.mapper.map(self.func, *sequences) return call_function \ No newline at end of file diff --git a/IPython/kernel/task.py b/IPython/kernel/task.py index 31e5c3f..fc2ebea 100644 --- a/IPython/kernel/task.py +++ b/IPython/kernel/task.py @@ -5,16 +5,16 @@ __docformat__ = "restructuredtext en" -#------------------------------------------------------------------------------- +#----------------------------------------------------------------------------- # Copyright (C) 2008 The IPython Development Team # # Distributed under the terms of the BSD License. The full license is in # the file COPYING, distributed as part of this software. -#------------------------------------------------------------------------------- +#----------------------------------------------------------------------------- -#------------------------------------------------------------------------------- +#----------------------------------------------------------------------------- # Imports -#------------------------------------------------------------------------------- +#----------------------------------------------------------------------------- import copy, time from types import FunctionType @@ -23,108 +23,386 @@ import zope.interface as zi, string from twisted.internet import defer, reactor from twisted.python import components, log, failure -# from IPython.genutils import time - +from IPython.kernel.util import printer from IPython.kernel import engineservice as es, error from IPython.kernel import controllerservice as cs from IPython.kernel.twistedutil import gatherBoth, DeferredList -from IPython.kernel.pickleutil import can,uncan, CannedFunction - -def can_task(task): - t = copy.copy(task) - t.depend = can(t.depend) - t.expression = can(t.expression) - if t.recovery_task: - t.recovery_task = can_task(t.recovery_task) - return t +from IPython.kernel.pickleutil import can, uncan, CannedFunction -def uncan_task(task): - t = copy.copy(task) - t.depend = uncan(t.depend) - t.expression = uncan(t.expression) - if t.recovery_task and t.recovery_task is not task: - t.recovery_task = uncan_task(t.recovery_task) - return t +#----------------------------------------------------------------------------- +# Definition of the Task objects +#----------------------------------------------------------------------------- time_format = '%Y/%m/%d %H:%M:%S' -class Task(object): - """Our representation of a task for the `TaskController` interface. - - The user should create instances of this class to represent a task that - needs to be done. - - :Parameters: - expression : str - A str that is valid python code that is the task. - pull : str or list of str - The names of objects to be pulled as results. If not specified, - will return {'result', None} - push : dict - A dict of objects to be pushed into the engines namespace before - execution of the expression. - clear_before : boolean - Should the engine's namespace be cleared before the task is run. - Default=False. - clear_after : boolean - Should the engine's namespace be cleared after the task is run. - Default=False. - retries : int - The number of times to resumbit the task if it fails. Default=0. - recovery_task : Task - This is the Task to be run when the task has exhausted its retries - Default=None. 
-        depend : bool function(properties)
-            This is the dependency function for the Task, which determines
-            whether a task can be run on a Worker.  `depend` is called with
-            one argument, the worker's properties dict, and should return
-            True if the worker meets the dependencies or False if it does
-            not.
-            Default=None - run on any worker
-        options : dict
-            Any other keyword options for more elaborate uses of tasks
-
-    Examples
-    --------
+class ITask(zi.Interface):
+    """
+    This interface provides a generic definition of what constitutes a task.
+
+    There are two sides to a task.  First, a task needs to take input from
+    a user to determine what work is performed by the task.  Second, the
+    task needs to have the logic that knows how to turn that information
+    into specific calls to a worker, through the `IQueuedEngine` interface.

-    >>> t = Task('dostuff(args)')
-    >>> t = Task('a=5', pull='a')
-    >>> t = Task('a=5\nb=4', pull=['a','b'])
-    >>> t = Task('os.kill(os.getpid(),9)', retries=100) # this is a bad idea
-    # A dependency case:
-    >>> def hasMPI(props):
-    ...     return props.get('mpi') is not None
-    >>> t = Task('mpi.send(blah,blah)', depend = hasMPI)
+    Many methods in this class get two things passed to them: a Deferred
+    and an IQueuedEngine implementer.  Such methods should register callbacks
+    on the Deferred that use the IQueuedEngine to accomplish something.  See
+    the existing task objects for examples.
     """

-    def __init__(self, expression, args=None, kwargs=None, pull=None, push=None,
-            clear_before=False, clear_after=False, retries=0,
-            recovery_task=None, depend=None, **options):
-        self.expression = expression
+    zi.Attribute('retries','How many times to retry the task')
+    zi.Attribute('recovery_task','A task to try if the initial one fails')
+    zi.Attribute('taskid','the id of the task')
+
+    def start_time(result):
+        """
+        Do anything needed to start the timing of the task.
+
+        Must simply return the result after starting the timers.
+        """
+
+    def stop_time(result):
+        """
+        Do anything needed to stop the timing of the task.
+
+        Must simply return the result after stopping the timers.  This
+        method will usually set attributes that are used by `process_result`
+        in building the result of the task.
+        """
+
+    def pre_task(d, queued_engine):
+        """Do something with the queued_engine before the task is run.
+
+        This method should simply add callbacks to the input Deferred
+        that do something with the `queued_engine` before the task is run.
+
+        :Parameters:
+            d : Deferred
+                The deferred that actions should be attached to
+            queued_engine : IQueuedEngine implementer
+                The worker that has been allocated to perform the task
+        """
+
+    def post_task(d, queued_engine):
+        """Do something with the queued_engine after the task is run.
+
+        This method should simply add callbacks to the input Deferred
+        that do something with the `queued_engine` after the task is run.
+
+        :Parameters:
+            d : Deferred
+                The deferred that actions should be attached to
+            queued_engine : IQueuedEngine implementer
+                The worker that has been allocated to perform the task
+        """
+
+    def submit_task(d, queued_engine):
+        """Submit a task using the `queued_engine` we have been allocated.
+
+        When a task is ready to run, this method is called.  This method
+        must take the internal information of the task and make suitable
+        calls on the queued_engine to have the actual work done.
+
+        This method should simply add callbacks to the input Deferred
+        that use the `queued_engine` to do the actual work of the task.
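+
+        A minimal sketch of what an implementation might look like
+        (hypothetical; real tasks such as `MapTask` and `StringTask`
+        below do more):
+
+        def submit_task(self, d, queued_engine):
+            d.addCallback(lambda r: queued_engine.execute(self.expression))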
+
+        :Parameters:
+            d : Deferred
+                The deferred that actions should be attached to
+            queued_engine : IQueuedEngine implementer
+                The worker that has been allocated to perform the task
+        """
+
+    def process_result(result, engine_id):
+        """Take a raw task result.
+
+        Objects that implement `ITask` can choose how the result of running
+        the task is presented.  This method takes the raw result and
+        does this logic.  Two examples are the `MapTask`, which simply returns
+        the raw result or a `Failure` object, and the `StringTask`, which
+        returns a `TaskResult` object.
+
+        :Parameters:
+            result : object
+                The raw task result that needs to be wrapped
+            engine_id : int
+                The id of the engine that did the task
+
+        :Returns:
+            The result, as a tuple of the form: (success, result).
+            Here, success is a boolean indicating if the task
+            succeeded or failed and result is the result.
+        """
+
+    def check_depend(properties):
+        """Check properties to see if the task should be run.
+
+        :Parameters:
+            properties : dict
+                A dictionary of properties that an engine has set
+
+        :Returns:
+            True if the task should be run, False otherwise
+        """
+
+    def can_task(self):
+        """Serialize (can) any functions in the task for pickling.
+
+        Subclasses must override this method and make sure that all
+        functions in the task are canned by calling `can` on the
+        function.
+        """
+
+    def uncan_task(self):
+        """Unserialize (uncan) any canned function in the task."""
+
+class BaseTask(object):
+    """
+    Common functionality for all objects implementing `ITask`.
+    """
+
+    zi.implements(ITask)
+
+    def __init__(self, clear_before=False, clear_after=False, retries=0,
+            recovery_task=None, depend=None):
+        """
+        Make a generic task.
+
+        :Parameters:
+            clear_before : boolean
+                Should the engine's namespace be cleared before the task
+                is run
+            clear_after : boolean
+                Should the engine's namespace be cleared after the task is run
+            retries : int
+                The number of times a task should be retried upon failure
+            recovery_task : any task object
+                If a task fails and it has a recovery_task, that is run
+                upon a retry
+            depend : FunctionType
+                A function that is called to test for properties.  This function
+                must take one argument, the properties dict, and return a boolean
+        """
+        self.clear_before = clear_before
+        self.clear_after = clear_after
+        self.retries = retries
+        self.recovery_task = recovery_task
+        self.depend = depend
+        self.taskid = None
+
+    def start_time(self, result):
+        """
+        Start the basic timers.
+        """
+        self.start = time.time()
+        self.start_struct = time.localtime()
+        return result
+
+    def stop_time(self, result):
+        """
+        Stop the basic timers.
+        """
+        self.stop = time.time()
+        self.stop_struct = time.localtime()
+        self.duration = self.stop - self.start
+        self.submitted = time.strftime(time_format, self.start_struct)
+        self.completed = time.strftime(time_format, self.stop_struct)
+        return result
+
+    def pre_task(self, d, queued_engine):
+        """
+        Clear the engine before running the task if clear_before is set.
+        """
+        if self.clear_before:
+            d.addCallback(lambda r: queued_engine.reset())
+
+    def post_task(self, d, queued_engine):
+        """
+        Clear the engine after running the task if clear_after is set.
+        """
+        def reseter(result):
+            queued_engine.reset()
+            return result
+        if self.clear_after:
+            d.addBoth(reseter)
+
+    def submit_task(self, d, queued_engine):
+        raise NotImplementedError('submit_task must be implemented in a subclass')
+
+    def process_result(self, result, engine_id):
+        """
+        Process a task result.
+
+        This is the default `process_result` that just returns the raw
+        result or a `Failure`.
+        """
+        if isinstance(result, failure.Failure):
+            return (False, result)
+        else:
+            return (True, result)
+
+    def check_depend(self, properties):
+        """
+        Calls self.depend(properties) to see if a task should be run.
+        """
+        if self.depend is not None:
+            return self.depend(properties)
+        else:
+            return True
+
+    def can_task(self):
+        self.depend = can(self.depend)
+        if isinstance(self.recovery_task, BaseTask):
+            self.recovery_task.can_task()
+
+    def uncan_task(self):
+        self.depend = uncan(self.depend)
+        if isinstance(self.recovery_task, BaseTask):
+            self.recovery_task.uncan_task()
+
+class MapTask(BaseTask):
+    """
+    A task that consists of a function and arguments.
+    """
+
+    zi.implements(ITask)
+
+    def __init__(self, function, args=None, kwargs=None, clear_before=False,
+            clear_after=False, retries=0, recovery_task=None, depend=None):
+        """
+        Create a task based on a function, args and kwargs.
+
+        This is a simple type of task that consists of calling:
+        function(*args, **kwargs).
+
+        The return value of the function, or a `Failure` wrapping an
+        exception, is the task result for this type of task.
+        """
+        BaseTask.__init__(self, clear_before, clear_after, retries,
+            recovery_task, depend)
+        if not isinstance(function, FunctionType):
+            raise TypeError('a task function must be a FunctionType')
+        self.function = function
         if args is None:
             self.args = ()
         else:
             self.args = args
+        if not isinstance(self.args, (list, tuple)):
+            raise TypeError('task args must be a list or tuple')
         if kwargs is None:
             self.kwargs = {}
         else:
             self.kwargs = kwargs
-        if isinstance(pull, str):
-            self.pull = [pull]
-        else:
+        if not isinstance(self.kwargs, dict):
+            raise TypeError('task kwargs must be a dict')
+
+    def submit_task(self, d, queued_engine):
+        d.addCallback(lambda r: queued_engine.push_function(
+            dict(_ipython_task_function=self.function))
+        )
+        d.addCallback(lambda r: queued_engine.push(
+            dict(_ipython_task_args=self.args,_ipython_task_kwargs=self.kwargs))
+        )
+        d.addCallback(lambda r: queued_engine.execute(
+            '_ipython_task_result = _ipython_task_function(*_ipython_task_args,**_ipython_task_kwargs)')
+        )
+        d.addCallback(lambda r: queued_engine.pull('_ipython_task_result'))
+
+    def can_task(self):
+        self.function = can(self.function)
+        BaseTask.can_task(self)
+
+    def uncan_task(self):
+        self.function = uncan(self.function)
+        BaseTask.uncan_task(self)
+
+
+class StringTask(BaseTask):
+    """
+    A task that consists of a string of Python code to run.
+    """
+
+    def __init__(self, expression, pull=None, push=None,
+            clear_before=False, clear_after=False, retries=0,
+            recovery_task=None, depend=None):
+        """
+        Create a task based on a Python expression and variables.
+
+        This type of task lets you push a set of variables to the engine's
+        namespace, run a Python string in that namespace, and then bring back
+        a different set of Python variables as the result.
+
+        Because this type of task can return many results (through the
+        `pull` keyword argument) it returns a special `TaskResult` object
+        that wraps the pulled variables, statistics about the run and
+        any exceptions raised.
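+
+        For example (a sketch; the variable names are illustrative):
+
+        >>> t = StringTask('a=5', pull='a')
+        >>> t = StringTask('c = a + b', push=dict(a=5, b=10), pull=('a','c'))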
+        """
+        if not isinstance(expression, str):
+            raise TypeError('a task expression must be a string')
+        self.expression = expression
+
+        if pull is None:
+            self.pull = ()
+        elif isinstance(pull, str):
+            self.pull = (pull,)
+        elif isinstance(pull, (list, tuple)):
             self.pull = pull
-        self.push = push
-        self.clear_before = clear_before
-        self.clear_after = clear_after
-        self.retries=retries
-        self.recovery_task = recovery_task
-        self.depend = depend
-        self.options = options
-        self.taskid = None
+        else:
+            raise TypeError('pull must be str or a sequence of strs')
+
+        if push is None:
+            self.push = {}
+        elif isinstance(push, dict):
+            self.push = push
+        else:
+            raise TypeError('push must be a dict')
+
+        BaseTask.__init__(self, clear_before, clear_after, retries,
+            recovery_task, depend)
+
+    def submit_task(self, d, queued_engine):
+        if self.push:
+            d.addCallback(lambda r: queued_engine.push(self.push))
+
+        d.addCallback(lambda r: queued_engine.execute(self.expression))
+
+        if self.pull:
+            d.addCallback(lambda r: queued_engine.pull(self.pull))
+        else:
+            d.addCallback(lambda r: None)
+
+    def process_result(self, result, engine_id):
+        if isinstance(result, failure.Failure):
+            tr = TaskResult(result, engine_id)
+        else:
+            if not self.pull:
+                resultDict = {}
+            elif len(self.pull) == 1:
+                resultDict = {self.pull[0]:result}
+            else:
+                resultDict = dict(zip(self.pull, result))
+            tr = TaskResult(resultDict, engine_id)
+        # Assign task attributes
+        tr.submitted = self.submitted
+        tr.completed = self.completed
+        tr.duration = self.duration
+        if hasattr(self,'taskid'):
+            tr.taskid = self.taskid
+        else:
+            tr.taskid = None
+        if isinstance(result, failure.Failure):
+            return (False, tr)
+        else:
+            return (True, tr)

-class ResultNS:
-    """The result namespace object for use in TaskResult objects as tr.ns.
+class ResultNS(object):
+    """
+    A dict-like object for holding the results of a task.
+
+    The result namespace object for use in `TaskResult` objects as tr.ns.
     It builds an object from a dictionary, such that it has attributes
     according to the key,value pairs of the dictionary.
@@ -162,7 +440,7 @@ class ResultNS:

 class TaskResult(object):
     """
-    An object for returning task results.
+    An object for returning task results for certain types of tasks.

     This object encapsulates the results of a task.  On task
     success it will have a keys attribute that will have a list
@@ -172,21 +450,21 @@
     In task failure, keys will be empty, but failure will contain
     the failure object that encapsulates the remote exception.

-    One can also simply call the raiseException() method of
+    One can also simply call the `raise_exception` method of
     this class to re-raise any remote exception in the local
     session.

-    The TaskResult has a .ns member, which is a property for access
+    The `TaskResult` has a `.ns` member, which is a property for access
     to the results.  If the Task had pull=['a', 'b'], then the
-    Task Result will have attributes tr.ns.a, tr.ns.b for those values.
-    Accessing tr.ns will raise the remote failure if the task failed.
+    Task Result will have attributes `tr.ns.a`, `tr.ns.b` for those values.
+    Accessing `tr.ns` will raise the remote failure if the task failed.

-    The engineid attribute should have the engineid of the engine
-    that ran the task.  But, because engines can come and go in
-    the ipython task system, the engineid may not continue to be
+    The `engineid` attribute should have the `engineid` of the engine
+    that ran the task.  But, because engines can come and go,
+    the `engineid` may not continue to be
     valid or accurate.

-    The taskid attribute simply gives the taskid that the task
+    The `taskid` attribute simply gives the `taskid` that the task
     is tracked under.
     """
     taskid = None
@@ -224,15 +502,19 @@ class TaskResult(object):

     def __getitem__(self, key):
         if self.failure is not None:
-            self.raiseException()
+            self.raise_exception()
         return self.results[key]

-    def raiseException(self):
+    def raise_exception(self):
         """Re-raise any remote exceptions in the local python session."""
         if self.failure is not None:
             self.failure.raiseException()

+#-----------------------------------------------------------------------------
+# The controller side of things
+#-----------------------------------------------------------------------------
+
 class IWorker(zi.Interface):
     """The Basic Worker Interface.
@@ -247,12 +529,15 @@ class IWorker(zi.Interface):
         :Parameters:
             task : a `Task` object

-        :Returns: `Deferred` to a `TaskResult` object.
+        :Returns: `Deferred` to a tuple of (success, result) where
+            success is a boolean that signifies success or failure
+            and result is the task result.
         """

 class WorkerFromQueuedEngine(object):
     """Adapt an `IQueuedEngine` to an `IWorker` object"""
+
     zi.implements(IWorker)

     def __init__(self, qe):
@@ -267,74 +552,27 @@ class WorkerFromQueuedEngine(object):

     def run(self, task):
         """Run task in worker's namespace.

+        This takes a task and calls methods on the task that actually
+        cause `self.queuedEngine` to do the task.  See the methods of
+        `ITask` for more information about how these methods are called.
+
         :Parameters:
             task : a `Task` object

-        :Returns: `Deferred` to a `TaskResult` object.
+        :Returns: `Deferred` to a tuple of (success, result) where
+            success is a boolean that signifies success or failure
+            and result is the task result.
""" - if task.clear_before: - d = self.queuedEngine.reset() - else: - d = defer.succeed(None) - - if isinstance(task.expression, FunctionType): - d.addCallback(lambda r: self.queuedEngine.push_function( - dict(_ipython_task_function=task.expression)) - ) - d.addCallback(lambda r: self.queuedEngine.push( - dict(_ipython_task_args=task.args,_ipython_task_kwargs=task.kwargs)) - ) - d.addCallback(lambda r: self.queuedEngine.execute( - '_ipython_task_result = _ipython_task_function(*_ipython_task_args,**_ipython_task_kwargs)') - ) - d.addCallback(lambda r: self.queuedEngine.pull('_ipython_task_result')) - elif isinstance(task.expression, str): - if task.push is not None: - d.addCallback(lambda r: self.queuedEngine.push(task.push)) - - d.addCallback(lambda r: self.queuedEngine.execute(task.expression)) - - if task.pull is not None: - d.addCallback(lambda r: self.queuedEngine.pull(task.pull)) - else: - d.addCallback(lambda r: None) - else: - raise TypeError("task expression must be a str or function") - - def reseter(result): - self.queuedEngine.reset() - return result - - if task.clear_after: - d.addBoth(reseter) - - if isinstance(task.expression, FunctionType): - return d.addBoth(self._zipResults, None, time.time(), time.localtime()) - else: - return d.addBoth(self._zipResults, task.pull, time.time(), time.localtime()) - - def _zipResults(self, result, names, start, start_struct): - """Callback for construting the TaskResult object.""" - if isinstance(result, failure.Failure): - tr = TaskResult(result, self.queuedEngine.id) - else: - if names is None: - resultDict = {} - elif len(names) == 1: - resultDict = {names[0]:result} - else: - resultDict = dict(zip(names, result)) - tr = TaskResult(resultDict, self.queuedEngine.id) - if names is None: - tr.result = result - else: - tr.result = None - # the time info - tr.submitted = time.strftime(time_format, start_struct) - tr.completed = time.strftime(time_format) - tr.duration = time.time()-start - return tr - + d = defer.succeed(None) + d.addCallback(task.start_time) + task.pre_task(d, self.queuedEngine) + task.submit_task(d, self.queuedEngine) + task.post_task(d, self.queuedEngine) + d.addBoth(task.stop_time) + d.addBoth(task.process_result, self.queuedEngine.id) + # At this point, there will be (success, result) coming down the line + return d + components.registerAdapter(WorkerFromQueuedEngine, es.IEngineQueued, IWorker) @@ -350,14 +588,14 @@ class IScheduler(zi.Interface): """Add a task to the queue of the Scheduler. :Parameters: - task : a `Task` object + task : an `ITask` implementer The task to be queued. flags : dict General keywords for more sophisticated scheduling """ def pop_task(id=None): - """Pops a Task object. + """Pops a task object from the queue. This gets the next task to be run. If no `id` is requested, the highest priority task is returned. @@ -367,7 +605,7 @@ class IScheduler(zi.Interface): The id of the task to be popped. The default (None) is to return the highest priority task. - :Returns: a `Task` object + :Returns: an `ITask` implementer :Exceptions: IndexError : raised if no taskid in queue @@ -377,8 +615,9 @@ class IScheduler(zi.Interface): """Add a worker to the worker queue. 
         :Parameters:
-            worker : an IWorker implementing object
-            flags : General keywords for more sophisticated scheduling
+            worker : an `IWorker` implementer
+            flags : dict
+                General keywords for more sophisticated scheduling
         """

     def pop_worker(id=None):
@@ -401,15 +640,15 @@ class IScheduler(zi.Interface):
         """Returns True if there is something to do, False otherwise"""

     def schedule():
-        """Returns a tuple of the worker and task pair for the next
-        task to be run.
-        """
+        """Returns (worker,task) pair for the next task to be run."""


 class FIFOScheduler(object):
-    """A basic First-In-First-Out (Queue) Scheduler.
-    This is the default Scheduler for the TaskController.
-    See the docstrings for IScheduler for interface details.
+    """
+    A basic First-In-First-Out (Queue) Scheduler.
+
+    This is the default Scheduler for the `TaskController`.
+    See the docstrings for `IScheduler` for interface details.
     """

     zi.implements(IScheduler)
@@ -466,7 +705,9 @@ class FIFOScheduler(object):
         for t in self.tasks:
             for w in self.workers:
                 try:# do not allow exceptions to break this
-                    cando = t.depend is None or t.depend(w.properties)
+                    # Allow the task to check itself using its
+                    # check_depend method.
+                    cando = t.check_depend(w.properties)
                 except:
                     cando = False
                 if cando:
@@ -476,9 +717,12 @@ class FIFOScheduler(object):


 class LIFOScheduler(FIFOScheduler):
-    """A Last-In-First-Out (Stack) Scheduler.  This scheduler should naively
-    reward fast engines by giving them more jobs.  This risks starvation, but
-    only in cases with low load, where starvation does not really matter.
+    """
+    A Last-In-First-Out (Stack) Scheduler.
+
+    This scheduler should naively reward fast engines by giving
+    them more jobs.  This risks starvation, but only in cases with
+    low load, where starvation does not really matter.
     """

     def add_task(self, task, **flags):
@@ -493,13 +737,15 @@ class LIFOScheduler(FIFOScheduler):


 class ITaskController(cs.IControllerBase):
-    """The Task based interface to a `ControllerService` object
+    """
+    The Task based interface to a `ControllerService` object

     This adapts a `ControllerService` to the ITaskController interface.
     """

     def run(task):
-        """Run a task.
+        """
+        Run a task.

         :Parameters:
             task : an `ITask` implementer

         :Returns:
         """

     def get_task_result(taskid, block=False):
-        """Get the result of a task by its ID.
+        """
+        Get the result of a task by its ID.

         :Parameters:
             taskid : int
                 the id of the task whose result is requested

-        :Returns: `Deferred` to (taskid, actualResult) if the task is done, and None
+        :Returns: `Deferred` to the task result if the task is done, and None
             if not.

         :Exceptions:
@@ -539,23 +786,35 @@
         """

     def barrier(taskids):
-        """Block until the list of taskids are completed.
+        """
+        Block until all tasks in the list of taskids are completed.

         Returns None on success.
         """

     def spin():
-        """touch the scheduler, to resume scheduling without submitting
-        a task.
+        """
+        Touch the scheduler, to resume scheduling without submitting a task.
         """

-    def queue_status(self, verbose=False):
-        """Get a dictionary with the current state of the task queue.
+    def queue_status(verbose=False):
+        """
+        Get a dictionary with the current state of the task queue.

         If verbose is True, then return lists of taskids, otherwise,
         return the number of tasks with each status.
         """

+    def clear():
+        """
+        Clear all previously run tasks from the task controller.
+
+        This is needed because the task controller keeps all task results
+        in memory.  This can be a problem if there are many completed
+        tasks.  Users should call this periodically to clean out these
+        cached task results.
+        """
+

class TaskController(cs.ControllerAdapterBase):
    """The Task based interface to a Controller object.
@@ -592,7 +851,7 @@ class TaskController(cs.ControllerAdapterBase):
    def registerWorker(self, id):
        """Called by controller.register_engine."""
        if self.workers.get(id):
-            raise "We already have one! This should not happen."
+            raise ValueError("worker with id %s already exists.  This should not happen." % id)
        self.workers[id] = IWorker(self.controller.engines[id])
        self.workers[id].workerid = id
        if not self.pendingTasks.has_key(id):# if not working
@@ -617,21 +876,25 @@ class TaskController(cs.ControllerAdapterBase):
    #---------------------------------------------------------------------------

    def run(self, task):
-        """Run a task and return `Deferred` to its taskid."""
+        """
+        Run a task and return `Deferred` to its taskid.
+        """
        task.taskid = self.taskid
        task.start = time.localtime()
        self.taskid += 1
        d = defer.Deferred()
        self.scheduler.add_task(task)
-        # log.msg('Queuing task: %i' % task.taskid)
+        log.msg('Queuing task: %i' % task.taskid)
        self.deferredResults[task.taskid] = []
        self.distributeTasks()
        return defer.succeed(task.taskid)

    def get_task_result(self, taskid, block=False):
-        """Returns a `Deferred` to a TaskResult tuple or None."""
-        # log.msg("Getting task result: %i" % taskid)
+        """
+        Returns a `Deferred` to the task result, or None.
+        """
+        log.msg("Getting task result: %i" % taskid)
        if self.finishedResults.has_key(taskid):
            tr = self.finishedResults[taskid]
            return defer.succeed(tr)
@@ -646,7 +909,9 @@ class TaskController(cs.ControllerAdapterBase):
            return defer.fail(IndexError("task ID not registered: %r" % taskid))

    def abort(self, taskid):
-        """Remove a task from the queue if it has not been run already."""
+        """
+        Remove a task from the queue if it has not been run already.
+        """
        if not isinstance(taskid, int):
            return defer.fail(failure.Failure(TypeError("an integer task id expected: %r" % taskid)))
        try:
@@ -705,8 +970,10 @@ class TaskController(cs.ControllerAdapterBase):
    #---------------------------------------------------------------------------

    def _doAbort(self, taskid):
-        """Helper function for aborting a pending task."""
-        # log.msg("Task aborted: %i" % taskid)
+        """
+        Helper function for aborting a pending task.
+        """
+        log.msg("Task aborted: %i" % taskid)
        result = failure.Failure(error.TaskAborted())
        self._finishTask(taskid, result)
        if taskid in self.abortPending:
@@ -714,14 +981,16 @@ class TaskController(cs.ControllerAdapterBase):

    def _finishTask(self, taskid, result):
        dlist = self.deferredResults.pop(taskid)
-        result.taskid = taskid   # The TaskResult should save the taskid
+        # result.taskid = taskid   # The TaskResult should save the taskid
        self.finishedResults[taskid] = result
        for d in dlist:
            d.callback(result)

    def distributeTasks(self):
-        """Distribute tasks while self.scheduler has things to do."""
-        # log.msg("distributing Tasks")
+        """
+        Distribute tasks while self.scheduler has things to do.
+        """
+        log.msg("distributing Tasks")
        worker, task = self.scheduler.schedule()
        if not worker and not task:
            if self.idleLater and self.idleLater.called:# we are inside failIdle
@@ -736,7 +1005,7 @@ class TaskController(cs.ControllerAdapterBase):
            self.pendingTasks[worker.workerid] = task
            # run/link callbacks
            d = worker.run(task)
-            # log.msg("Running task %i on worker %i" %(task.taskid, worker.workerid))
+            log.msg("Running task %i on worker %i" %(task.taskid, worker.workerid))
            d.addBoth(self.taskCompleted, task.taskid, worker.workerid)
            worker, task = self.scheduler.schedule()
        # check for idle timeout:
@@ -758,14 +1027,15 @@ class TaskController(cs.ControllerAdapterBase):
            t = self.scheduler.pop_task()
            msg = "task %i failed to execute due to unmet dependencies"%t.taskid
            msg += " for %i seconds"%self.timeout
-            # log.msg("Task aborted by timeout: %i" % t.taskid)
+            log.msg("Task aborted by timeout: %i" % t.taskid)
            f = failure.Failure(error.TaskTimeout(msg))
            self._finishTask(t.taskid, f)
        self.idleLater = None

-    def taskCompleted(self, result, taskid, workerid):
+    def taskCompleted(self, success_and_result, taskid, workerid):
        """This is the err/callback for a completed task."""
+        success, result = success_and_result
        try:
            task = self.pendingTasks.pop(workerid)
        except:
@@ -782,7 +1052,7 @@ class TaskController(cs.ControllerAdapterBase):
            aborted = True

        if not aborted:
-            if result.failure is not None and isinstance(result.failure, failure.Failure): # we failed
+            if not success:
                log.msg("Task %i failed on worker %i"% (taskid, workerid))
                if task.retries > 0: # resubmit
                    task.retries -= 1
@@ -790,7 +1060,7 @@ class TaskController(cs.ControllerAdapterBase):
                    s = "Resubmitting task %i, %i retries remaining" %(taskid, task.retries)
                    log.msg(s)
                    self.distributeTasks()
-                elif isinstance(task.recovery_task, Task) and \
+                elif isinstance(task.recovery_task, BaseTask) and \
                    task.recovery_task.retries > -1:
                    # retries = -1 is to prevent infinite recovery_task loop
                    task.retries = -1
@@ -806,17 +1076,18 @@ class TaskController(cs.ControllerAdapterBase):
                    # it may have died, and not yet been unregistered
                    reactor.callLater(self.failurePenalty, self.readmitWorker, workerid)
            else: # we succeeded
-                # log.msg("Task completed: %i"% taskid)
+                log.msg("Task completed: %i"% taskid)
                self._finishTask(taskid, result)
                self.readmitWorker(workerid)
-        else:# we aborted the task
-            if result.failure is not None and isinstance(result.failure, failure.Failure): # it failed, penalize worker
+        else: # we aborted the task
+            if not success:
                reactor.callLater(self.failurePenalty, self.readmitWorker, workerid)
            else:
                self.readmitWorker(workerid)

    def readmitWorker(self, workerid):
-        """Readmit a worker to the scheduler.
+        """
+        Readmit a worker to the scheduler.

        This is outside `taskCompleted` because of the `failurePenalty`
        being implemented through `reactor.callLater`.
@@ -825,6 +1096,18 @@ class TaskController(cs.ControllerAdapterBase):
        if workerid in self.workers.keys() and workerid not in self.pendingTasks.keys():
            self.scheduler.add_worker(self.workers[workerid])
            self.distributeTasks()
+
+    def clear(self):
+        """
+        Clear all previously run tasks from the task controller.
+
+        This is needed because the task controller keeps all task results
+        in memory.  This can be a problem if there are many completed
+        tasks.  Users should call this periodically to clean out these
+        cached task results.
+ """ + self.finishedResults = {} + return defer.succeed(None) components.registerAdapter(TaskController, cs.IControllerBase, ITaskController) diff --git a/IPython/kernel/taskclient.py b/IPython/kernel/taskclient.py index 405407a..dc95418 100644 --- a/IPython/kernel/taskclient.py +++ b/IPython/kernel/taskclient.py @@ -1,9 +1,8 @@ # encoding: utf-8 # -*- test-case-name: IPython.kernel.tests.test_taskcontrollerxmlrpc -*- -"""The Generic Task Client object. - -This must be subclassed based on your connection method. +""" +A blocking version of the task client. """ __docformat__ = "restructuredtext en" @@ -24,119 +23,100 @@ from twisted.python import components, log from IPython.kernel.twistedutil import blockingCallFromThread from IPython.kernel import task, error +from IPython.kernel.mapper import ( + SynchronousTaskMapper, + ITaskMapperFactory, + IMapper +) +from IPython.kernel.parallelfunction import ( + ParallelFunction, + ITaskParallelDecorator +) #------------------------------------------------------------------------------- -# Connecting Task Client +# The task client #------------------------------------------------------------------------------- -class InteractiveTaskClient(object): - - def irun(self, *args, **kwargs): - """Run a task on the `TaskController`. - - This method is a shorthand for run(task) and its arguments are simply - passed onto a `Task` object: - - irun(*args, **kwargs) -> run(Task(*args, **kwargs)) - - :Parameters: - expression : str - A str that is valid python code that is the task. - pull : str or list of str - The names of objects to be pulled as results. - push : dict - A dict of objects to be pushed into the engines namespace before - execution of the expression. - clear_before : boolean - Should the engine's namespace be cleared before the task is run. - Default=False. - clear_after : boolean - Should the engine's namespace be cleared after the task is run. - Default=False. - retries : int - The number of times to resumbit the task if it fails. Default=0. - options : dict - Any other keyword options for more elaborate uses of tasks - - :Returns: A `TaskResult` object. - """ - block = kwargs.pop('block', False) - if len(args) == 1 and isinstance(args[0], task.Task): - t = args[0] - else: - t = task.Task(*args, **kwargs) - taskid = self.run(t) - print "TaskID = %i"%taskid - if block: - return self.get_task_result(taskid, block) - else: - return taskid - class IBlockingTaskClient(Interface): """ - An interface for blocking task clients. + A vague interface of the blocking task client """ pass - -class BlockingTaskClient(InteractiveTaskClient): +class BlockingTaskClient(object): """ - This class provides a blocking task client. + A blocking task client that adapts a non-blocking one. """ - implements(IBlockingTaskClient) + implements( + IBlockingTaskClient, + ITaskMapperFactory, + IMapper, + ITaskParallelDecorator + ) def __init__(self, task_controller): self.task_controller = task_controller self.block = True - def run(self, task): - """ - Run a task and return a task id that can be used to get the task result. + def run(self, task, block=False): + """Run a task on the `TaskController`. + + See the documentation of the `MapTask` and `StringTask` classes for + details on how to build a task of different types. :Parameters: - task : `Task` - The `Task` object to run + task : an `ITask` implementer + + :Returns: The int taskid of the submitted task. Pass this to + `get_task_result` to get the `TaskResult` object. 
""" - return blockingCallFromThread(self.task_controller.run, task) + tid = blockingCallFromThread(self.task_controller.run, task) + if block: + return self.get_task_result(tid, block=True) + else: + return tid def get_task_result(self, taskid, block=False): """ - Get or poll for a task result. + Get a task result by taskid. :Parameters: taskid : int - The id of the task whose result to get + The taskid of the task to be retrieved. block : boolean - If True, wait until the task is done and then result the - `TaskResult` object. If False, just poll for the result and - return None if the task is not done. + Should I block until the task is done? + + :Returns: A `TaskResult` object that encapsulates the task result. """ return blockingCallFromThread(self.task_controller.get_task_result, taskid, block) def abort(self, taskid): """ - Abort a task by task id if it has not been started. + Abort a task by taskid. + + :Parameters: + taskid : int + The taskid of the task to be aborted. """ return blockingCallFromThread(self.task_controller.abort, taskid) def barrier(self, taskids): - """ - Wait for a set of tasks to finish. + """Block until a set of tasks are completed. :Parameters: - taskids : list of ints - A list of task ids to wait for. + taskids : list, tuple + A sequence of taskids to block on. """ return blockingCallFromThread(self.task_controller.barrier, taskids) def spin(self): """ - Cause the scheduler to schedule tasks. + Touch the scheduler, to resume scheduling without submitting a task. This method only needs to be called in unusual situations where the - scheduler is idle for some reason. + scheduler is idle for some reason. """ return blockingCallFromThread(self.task_controller.spin) @@ -153,7 +133,46 @@ class BlockingTaskClient(InteractiveTaskClient): A dict with the queue status. """ return blockingCallFromThread(self.task_controller.queue_status, verbose) + + def clear(self): + """ + Clear all previously run tasks from the task controller. + + This is needed because the task controller keep all task results + in memory. This can be a problem is there are many completed + tasks. Users should call this periodically to clean out these + cached task results. + """ + return blockingCallFromThread(self.task_controller.clear) + + def map(self, func, *sequences): + """ + Apply func to *sequences elementwise. Like Python's builtin map. + + This version is load balanced. + """ + return self.mapper().map(func, *sequences) + def mapper(self, clear_before=False, clear_after=False, retries=0, + recovery_task=None, depend=None, block=True): + """ + Create an `IMapper` implementer with a given set of arguments. + + The `IMapper` created using a task controller is load balanced. + + See the documentation for `IPython.kernel.task.BaseTask` for + documentation on the arguments to this method. 
+ """ + return SynchronousTaskMapper(self, clear_before=clear_before, + clear_after=clear_after, retries=retries, + recovery_task=recovery_task, depend=depend, block=block) + + def parallel(self, clear_before=False, clear_after=False, retries=0, + recovery_task=None, depend=None, block=True): + mapper = self.mapper(clear_before, clear_after, retries, + recovery_task, depend, block) + pf = ParallelFunction(mapper) + return pf components.registerAdapter(BlockingTaskClient, task.ITaskController, IBlockingTaskClient) diff --git a/IPython/kernel/taskfc.py b/IPython/kernel/taskfc.py index 9d21723..c559f64 100644 --- a/IPython/kernel/taskfc.py +++ b/IPython/kernel/taskfc.py @@ -34,6 +34,15 @@ from IPython.kernel.clientinterfaces import ( IFCClientInterfaceProvider, IBlockingClientAdaptor ) +from IPython.kernel.mapper import ( + TaskMapper, + ITaskMapperFactory, + IMapper +) +from IPython.kernel.parallelfunction import ( + ParallelFunction, + ITaskParallelDecorator +) #------------------------------------------------------------------------------- # The Controller side of things @@ -43,32 +52,38 @@ from IPython.kernel.clientinterfaces import ( class IFCTaskController(Interface): """Foolscap interface to task controller. - See the documentation of ITaskController for documentation about the methods. + See the documentation of `ITaskController` for more information. """ - def remote_run(request, binTask): + def remote_run(binTask): """""" - def remote_abort(request, taskid): + def remote_abort(taskid): """""" - def remote_get_task_result(request, taskid, block=False): + def remote_get_task_result(taskid, block=False): """""" - def remote_barrier(request, taskids): + def remote_barrier(taskids): + """""" + + def remote_spin(): """""" - def remote_spin(request): + def remote_queue_status(verbose): """""" - def remote_queue_status(request, verbose): + def remote_clear(): """""" class FCTaskControllerFromTaskController(Referenceable): - """XML-RPC attachmeot for controller. - - See IXMLRPCTaskController and ITaskController (and its children) for documentation. """ + Adapt a `TaskController` to an `IFCTaskController` + + This class is used to expose a `TaskController` over the wire using + the Foolscap network protocol. + """ + implements(IFCTaskController, IFCClientInterfaceProvider) def __init__(self, taskController): @@ -92,8 +107,8 @@ class FCTaskControllerFromTaskController(Referenceable): def remote_run(self, ptask): try: - ctask = pickle.loads(ptask) - task = taskmodule.uncan_task(ctask) + task = pickle.loads(ptask) + task.uncan_task() except: d = defer.fail(pickle.UnpickleableError("Could not unmarshal task")) else: @@ -132,6 +147,9 @@ class FCTaskControllerFromTaskController(Referenceable): d.addErrback(self.packageFailure) return d + def remote_clear(self): + return self.taskController.clear() + def remote_get_client_name(self): return 'IPython.kernel.taskfc.FCTaskClient' @@ -144,13 +162,23 @@ components.registerAdapter(FCTaskControllerFromTaskController, #------------------------------------------------------------------------------- class FCTaskClient(object): - """XML-RPC based TaskController client that implements ITaskController. - - :Parameters: - addr : (ip, port) - The ip (str) and port (int) tuple of the `TaskController`. """ - implements(taskmodule.ITaskController, IBlockingClientAdaptor) + Client class for Foolscap exposed `TaskController`. + + This class is an adapter that makes a `RemoteReference` to a + `TaskController` look like an actual `ITaskController` on the client side. 
diff --git a/IPython/kernel/taskfc.py b/IPython/kernel/taskfc.py index 9d21723..c559f64 100644 --- a/IPython/kernel/taskfc.py +++ b/IPython/kernel/taskfc.py 
@@ -34,6 +34,15 @@ from IPython.kernel.clientinterfaces import ( IFCClientInterfaceProvider, IBlockingClientAdaptor ) +from IPython.kernel.mapper import ( + TaskMapper, + ITaskMapperFactory, + IMapper +) +from IPython.kernel.parallelfunction import ( + ParallelFunction, + ITaskParallelDecorator +) #------------------------------------------------------------------------------- # The Controller side of things #------------------------------------------------------------------------------- 
@@ -43,32 +52,38 @@ from IPython.kernel.clientinterfaces import ( class IFCTaskController(Interface): """Foolscap interface to task controller. - See the documentation of ITaskController for documentation about the methods. + See the documentation of `ITaskController` for more information. """ - def remote_run(request, binTask): + def remote_run(binTask): """""" - def remote_abort(request, taskid): + def remote_abort(taskid): """""" - def remote_get_task_result(request, taskid, block=False): + def remote_get_task_result(taskid, block=False): """""" - def remote_barrier(request, taskids): + def remote_barrier(taskids): + """""" + + def remote_spin(): """""" - def remote_spin(request): + def remote_queue_status(verbose): """""" - def remote_queue_status(request, verbose): + def remote_clear(): """""" 
class FCTaskControllerFromTaskController(Referenceable): - """XML-RPC attachmeot for controller. - - See IXMLRPCTaskController and ITaskController (and its children) for documentation. """ + Adapt a `TaskController` to an `IFCTaskController`. + + This class is used to expose a `TaskController` over the wire using + the Foolscap network protocol. + """ + implements(IFCTaskController, IFCClientInterfaceProvider) def __init__(self, taskController): 
@@ -92,8 +107,8 @@ class FCTaskControllerFromTaskController(Referenceable): def remote_run(self, ptask): try: - ctask = pickle.loads(ptask) - task = taskmodule.uncan_task(ctask) + task = pickle.loads(ptask) + task.uncan_task() except: d = defer.fail(pickle.UnpickleableError("Could not unmarshal task")) else: 
@@ -132,6 +147,9 @@ class FCTaskControllerFromTaskController(Referenceable): d.addErrback(self.packageFailure) return d + def remote_clear(self): + return self.taskController.clear() + def remote_get_client_name(self): return 'IPython.kernel.taskfc.FCTaskClient' 
@@ -144,13 +162,23 @@ components.registerAdapter(FCTaskControllerFromTaskController, #------------------------------------------------------------------------------- 
class FCTaskClient(object): - """XML-RPC based TaskController client that implements ITaskController. - - :Parameters: - addr : (ip, port) - The ip (str) and port (int) tuple of the `TaskController`. """ - implements(taskmodule.ITaskController, IBlockingClientAdaptor) + """ + Client class for Foolscap exposed `TaskController`. + + This class is an adapter that makes a `RemoteReference` to a + `TaskController` look like an actual `ITaskController` on the client side. 
+ + This class also implements `IBlockingClientAdaptor` so that clients can + automatically get a blocking version of this class. + """ + + implements( + taskmodule.ITaskController, + IBlockingClientAdaptor, + ITaskMapperFactory, + IMapper, + ITaskParallelDecorator ) def __init__(self, remote_reference): self.remote_reference = remote_reference 
@@ -168,48 +196,26 @@ class FCTaskClient(object): def run(self, task): """Run a task on the `TaskController`. - :Parameters: - task : a `Task` object - - The Task object is created using the following signature: - - Task(expression, pull=None, push={}, clear_before=False, - clear_after=False, retries=0, **options):) + See the documentation of the `MapTask` and `StringTask` classes for + details on how to build a task of different types. - The meaning of the arguments is as follows: + :Parameters: + task : an `ITask` implementer - :Task Parameters: - expression : str - A str that is valid python code that is the task. - pull : str or list of str - The names of objects to be pulled as results. - push : dict - A dict of objects to be pushed into the engines namespace before - execution of the expression. - clear_before : boolean - Should the engine's namespace be cleared before the task is run. - Default=False. - clear_after : boolean - Should the engine's namespace be cleared after the task is run. - Default=False. - retries : int - The number of times to resumbit the task if it fails. Default=0. - options : dict - Any other keyword options for more elaborate uses of tasks - :Returns: The int taskid of the submitted task. Pass this to `get_task_result` to get the `TaskResult` object. """ - assert isinstance(task, taskmodule.Task), "task must be a Task object!" - ctask = taskmodule.can_task(task) # handles arbitrary function in .depend - # as well as arbitrary recovery_task chains - ptask = pickle.dumps(ctask, 2) + assert isinstance(task, taskmodule.BaseTask), "task must be a BaseTask instance!" + task.can_task() + ptask = pickle.dumps(task, 2) + task.uncan_task() d = self.remote_reference.callRemote('run', ptask) d.addCallback(self.unpackage) return d def get_task_result(self, taskid, block=False): - """The task result by taskid. + """ + Get a task result by taskid. :Parameters: taskid : int 
@@ -224,20 +230,19 @@ class FCTaskClient(object): return d def abort(self, taskid): - """Abort a task by taskid. + """ + Abort a task by taskid. :Parameters: taskid : int The taskid of the task to be aborted. - block : boolean - Should I block until the task is aborted. """ d = self.remote_reference.callRemote('abort', taskid) d.addCallback(self.unpackage) return d def barrier(self, taskids): - """Block until all tasks are completed. + """Block until a set of tasks are completed. :Parameters: taskids : list, tuple 
@@ -248,20 +253,77 @@ class FCTaskClient(object): return d def spin(self): - """touch the scheduler, to resume scheduling without submitting - a task. + """ + Touch the scheduler, to resume scheduling without submitting a task. + + This method only needs to be called in unusual situations where the + scheduler is idle for some reason. """ d = self.remote_reference.callRemote('spin') d.addCallback(self.unpackage) return d def queue_status(self, verbose=False): - """Return a dict with the status of the task queue.""" + """ + Get a dictionary with the current state of the task queue. + + :Parameters: + verbose : boolean + If True, return a list of taskids. If False, simply give + the number of tasks with each status.
+ + :Returns: + A dict with the queue status. + """ d = self.remote_reference.callRemote('queue_status', verbose) d.addCallback(self.unpackage) return d + def clear(self): + """ + Clear all previously run tasks from the task controller. + + This is needed because the task controller keeps all task results + in memory. This can be a problem if there are many completed + tasks. Users should call this periodically to clean out these + cached task results. + """ + d = self.remote_reference.callRemote('clear') + return d 
 def adapt_to_blocking_client(self): + """ + Wrap self in a blocking version that implements `IBlockingTaskClient`. + """ from IPython.kernel.taskclient import IBlockingTaskClient return IBlockingTaskClient(self) + + def map(self, func, *sequences): + """ + Apply func to *sequences elementwise. Like Python's builtin map. + + This version is load balanced. + """ + return self.mapper().map(func, *sequences) + + def mapper(self, clear_before=False, clear_after=False, retries=0, + recovery_task=None, depend=None, block=True): + """ + Create an `IMapper` implementer with a given set of arguments. + + The `IMapper` created using a task controller is load balanced. + + See the documentation for `IPython.kernel.task.BaseTask` for + documentation on the arguments to this method. + """ + return TaskMapper(self, clear_before=clear_before, + clear_after=clear_after, retries=retries, + recovery_task=recovery_task, depend=depend, block=block) + + def parallel(self, clear_before=False, clear_after=False, retries=0, + recovery_task=None, depend=None, block=True): + mapper = self.mapper(clear_before, clear_after, retries, + recovery_task, depend, block) + pf = ParallelFunction(mapper) + return pf
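The tests that follow exercise the new `MapTask` type: a callable plus a tuple of positional arguments and an optional dict of keyword arguments, whose return value becomes the task result. A hedged sketch of the constructor usage, assuming a running controller and engine:

from IPython.kernel import client

tc = client.TaskClient()
# MapTask(function, args=(), kwargs={}); no push/pull is involved, the
# call's return value is the task result itself.
t = client.MapTask(lambda x, y: x + y, (10,), {'y': 5})
tid = tc.run(t)
print tc.get_task_result(tid, block=True)    # prints 15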
diff --git a/IPython/kernel/tests/tasktest.py b/IPython/kernel/tests/tasktest.py index 10a0a35..bddc8aa 100644 --- a/IPython/kernel/tests/tasktest.py +++ b/IPython/kernel/tests/tasktest.py 
@@ -43,23 +43,23 @@ class TaskTestBase(object): class ITaskControllerTestCase(TaskTestBase): - def testTaskIDs(self): + def test_task_ids(self): self.addEngine(1) - d = self.tc.run(task.Task('a=5')) + d = self.tc.run(task.StringTask('a=5')) d.addCallback(lambda r: self.assertEquals(r, 0)) - d.addCallback(lambda r: self.tc.run(task.Task('a=5'))) + d.addCallback(lambda r: self.tc.run(task.StringTask('a=5'))) d.addCallback(lambda r: self.assertEquals(r, 1)) - d.addCallback(lambda r: self.tc.run(task.Task('a=5'))) + d.addCallback(lambda r: self.tc.run(task.StringTask('a=5'))) d.addCallback(lambda r: self.assertEquals(r, 2)) - d.addCallback(lambda r: self.tc.run(task.Task('a=5'))) + d.addCallback(lambda r: self.tc.run(task.StringTask('a=5'))) d.addCallback(lambda r: self.assertEquals(r, 3)) return d - def testAbort(self): + def test_abort(self): """Cannot do a proper abort test, because blocking execution prevents abort from being called before task completes""" self.addEngine(1) - t = task.Task('a=5') + t = task.StringTask('a=5') d = self.tc.abort(0) d.addErrback(lambda f: self.assertRaises(IndexError, f.raiseException)) d.addCallback(lambda _:self.tc.run(t)) 
@@ -67,15 +67,15 @@ class ITaskControllerTestCase(TaskTestBase): d.addErrback(lambda f: self.assertRaises(IndexError, f.raiseException)) return d - def testAbortType(self): + def test_abort_type(self): self.addEngine(1) d = self.tc.abort('asdfadsf') d.addErrback(lambda f: self.assertRaises(TypeError, f.raiseException)) return d - def testClears(self): + def test_clear_before_and_after(self): self.addEngine(1) - t = task.Task('a=1', clear_before=True, pull='b', 
 clear_after=True) + t = task.StringTask('a=1', clear_before=True, pull='b', clear_after=True) d = self.multiengine.execute('b=1', targets=0) d.addCallback(lambda _: self.tc.run(t)) d.addCallback(lambda tid: self.tc.get_task_result(tid,block=True)) 
@@ -85,10 +85,10 @@ class ITaskControllerTestCase(TaskTestBase): d.addErrback(lambda f: self.assertRaises(NameError, _raise_it, f)) return d - def testSimpleRetries(self): + def test_simple_retries(self): self.addEngine(1) - t = task.Task("i += 1\nassert i == 16", pull='i',retries=10) - t2 = task.Task("i += 1\nassert i == 16", pull='i',retries=10) + t = task.StringTask("i += 1\nassert i == 16", pull='i',retries=10) + t2 = task.StringTask("i += 1\nassert i == 16", pull='i',retries=10) d = self.multiengine.execute('i=0', targets=0) d.addCallback(lambda r: self.tc.run(t)) d.addCallback(self.tc.get_task_result, block=True) 
@@ -101,10 +101,10 @@ class ITaskControllerTestCase(TaskTestBase): d.addCallback(lambda r: self.assertEquals(r, 16)) return d - def testRecoveryTasks(self): + def test_recovery_tasks(self): self.addEngine(1) - t = task.Task("i=16", pull='i') - t2 = task.Task("raise Exception", recovery_task=t, retries = 2) + t = task.StringTask("i=16", pull='i') + t2 = task.StringTask("raise Exception", recovery_task=t, retries = 2) d = self.tc.run(t2) d.addCallback(self.tc.get_task_result, block=True) 
@@ -112,47 +112,76 @@ class ITaskControllerTestCase(TaskTestBase): d.addCallback(lambda r: self.assertEquals(r, 16)) return d - # def testInfiniteRecoveryLoop(self): - # self.addEngine(1) - # t = task.Task("raise Exception", retries = 5) - # t2 = task.Task("assert True", retries = 2, recovery_task = t) - # t.recovery_task = t2 - # - # d = self.tc.run(t) - # d.addCallback(self.tc.get_task_result, block=True) - # d.addCallback(lambda tr: tr.ns.i) - # d.addBoth(printer) - # d.addErrback(lambda f: self.assertRaises(AssertionError, f.raiseException)) - # return d - # - def testSetupNS(self): + def test_setup_ns(self): self.addEngine(1) d = self.multiengine.execute('a=0', targets=0) ns = dict(a=1, b=0) - t = task.Task("", push=ns, pull=['a','b']) + t = task.StringTask("", push=ns, pull=['a','b']) d.addCallback(lambda r: self.tc.run(t)) d.addCallback(self.tc.get_task_result, block=True) d.addCallback(lambda tr: {'a':tr.ns.a, 'b':tr['b']}) d.addCallback(lambda r: self.assertEquals(r, ns)) return d - def testTaskResults(self): + def test_string_task_results(self): self.addEngine(1) - t1 = task.Task('a=5', pull='a') + t1 = task.StringTask('a=5', pull='a') d = self.tc.run(t1) d.addCallback(self.tc.get_task_result, block=True) - d.addCallback(lambda tr: (tr.ns.a,tr['a'],tr.failure, tr.raiseException())) + d.addCallback(lambda tr: (tr.ns.a,tr['a'],tr.failure, tr.raise_exception())) d.addCallback(lambda r: self.assertEquals(r, (5,5,None,None))) - t2 = task.Task('7=5') + t2 = task.StringTask('7=5') d.addCallback(lambda r: self.tc.run(t2)) d.addCallback(self.tc.get_task_result, block=True) d.addCallback(lambda tr: tr.ns) d.addErrback(lambda f: self.assertRaises(SyntaxError, f.raiseException)) - t3 = task.Task('', pull='b') + t3 = task.StringTask('', pull='b') d.addCallback(lambda r: self.tc.run(t3)) d.addCallback(self.tc.get_task_result, block=True) d.addCallback(lambda tr: tr.ns) d.addErrback(lambda f: self.assertRaises(NameError, f.raiseException)) return d + + def test_map_task(self): + self.addEngine(1) + t1 = task.MapTask(lambda x: 2*x,(10,)) + d = self.tc.run(t1) + d.addCallback(self.tc.get_task_result, block=True) + d.addCallback(lambda r:
self.assertEquals(r,20)) + + t2 = task.MapTask(lambda : 20) + d.addCallback(lambda _: self.tc.run(t2)) + d.addCallback(self.tc.get_task_result, block=True) + d.addCallback(lambda r: self.assertEquals(r,20)) + + t3 = task.MapTask(lambda x: x,(),{'x':20}) + d.addCallback(lambda _: self.tc.run(t3)) + d.addCallback(self.tc.get_task_result, block=True) + d.addCallback(lambda r: self.assertEquals(r,20)) + return d + + def test_map_task_failure(self): + self.addEngine(1) + t1 = task.MapTask(lambda x: 1/0,(10,)) + d = self.tc.run(t1) + d.addCallback(self.tc.get_task_result, block=True) + d.addErrback(lambda f: self.assertRaises(ZeroDivisionError, f.raiseException)) + return d + + def test_map_task_args(self): + self.assertRaises(TypeError, task.MapTask, 'asdfasdf') + self.assertRaises(TypeError, task.MapTask, lambda x: x, 10) + self.assertRaises(TypeError, task.MapTask, lambda x: x, (10,),30) + + def test_clear(self): + self.addEngine(1) + t1 = task.MapTask(lambda x: 2*x,(10,)) + d = self.tc.run(t1) + d.addCallback(lambda _: self.tc.get_task_result(0, block=True)) + d.addCallback(lambda r: self.assertEquals(r,20)) + d.addCallback(lambda _: self.tc.clear()) + d.addCallback(lambda _: self.tc.get_task_result(0, block=True)) + d.addErrback(lambda f: self.assertRaises(IndexError, f.raiseException)) + return d diff --git a/IPython/kernel/tests/test_taskfc.py b/IPython/kernel/tests/test_taskfc.py index 1e15317..e2b4122 100644 --- a/IPython/kernel/tests/test_taskfc.py +++ b/IPython/kernel/tests/test_taskfc.py @@ -30,6 +30,8 @@ try: from IPython.kernel.util import printer from IPython.kernel.tests.tasktest import ITaskControllerTestCase from IPython.kernel.clientconnector import ClientConnector + from IPython.kernel.error import CompositeError + from IPython.kernel.parallelfunction import ParallelFunction except ImportError: pass else: @@ -38,6 +40,12 @@ else: # Tests #------------------------------------------------------------------------------- + def _raise_it(f): + try: + f.raiseException() + except CompositeError, e: + e.raise_exception() + class TaskTest(DeferredTestCase, ITaskControllerTestCase): def setUp(self): @@ -87,4 +95,67 @@ else: d.addBoth(lambda _: self.controller.stopService()) dlist.append(d) return defer.DeferredList(dlist) - + + def test_mapper(self): + self.addEngine(1) + m = self.tc.mapper() + self.assertEquals(m.task_controller,self.tc) + self.assertEquals(m.clear_before,False) + self.assertEquals(m.clear_after,False) + self.assertEquals(m.retries,0) + self.assertEquals(m.recovery_task,None) + self.assertEquals(m.depend,None) + self.assertEquals(m.block,True) + + def test_map_default(self): + self.addEngine(1) + m = self.tc.mapper() + d = m.map(lambda x: 2*x, range(10)) + d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)])) + d.addCallback(lambda _: self.tc.map(lambda x: 2*x, range(10))) + d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)])) + return d + + def test_map_noblock(self): + self.addEngine(1) + m = self.tc.mapper(block=False) + d = m.map(lambda x: 2*x, range(10)) + d.addCallback(lambda r: self.assertEquals(r,[x for x in range(10)])) + return d + + def test_mapper_fail(self): + self.addEngine(1) + m = self.tc.mapper() + d = m.map(lambda x: 1/0, range(10)) + d.addBoth(lambda f: self.assertRaises(ZeroDivisionError, _raise_it, f)) + return d + + def test_parallel(self): + self.addEngine(1) + p = self.tc.parallel() + self.assert_(isinstance(p, ParallelFunction)) + @p + def f(x): return 2*x + d = f(range(10)) + d.addCallback(lambda r: 
self.assertEquals(r,[2*x for x in range(10)])) + return d + + def test_parallel_noblock(self): + self.addEngine(1) + p = self.tc.parallel(block=False) + self.assert_(isinstance(p, ParallelFunction)) + @p + def f(x): return 2*x + d = f(range(10)) + d.addCallback(lambda r: self.assertEquals(r,[x for x in range(10)])) + return d + + def test_parallel_fail(self): + self.addEngine(1) + p = self.tc.parallel() + self.assert_(isinstance(p, ParallelFunction)) + @p + def f(x): return 1/0 + d = f(range(10)) + d.addBoth(lambda f: self.assertRaises(ZeroDivisionError, _raise_it, f)) + return d \ No newline at end of file diff --git a/docs/examples/kernel/mcdriver.py b/docs/examples/kernel/mcdriver.py index d85fac9..2a5a8e1 100644 --- a/docs/examples/kernel/mcdriver.py +++ b/docs/examples/kernel/mcdriver.py @@ -31,7 +31,7 @@ sigma_vals = N.linspace(0.0, 0.2,5) taskids = [] for K in K_vals: for sigma in sigma_vals: - t = client.Task(task_string, + t = client.StringTask(task_string, push=dict(sigma=sigma,K=K), pull=('vp','ap','vc','ac','sigma','K')) taskids.append(tc.run(t)) diff --git a/docs/examples/kernel/multienginemap.py b/docs/examples/kernel/multienginemap.py new file mode 100644 index 0000000..016e0a8 --- /dev/null +++ b/docs/examples/kernel/multienginemap.py @@ -0,0 +1,18 @@ +from IPython.kernel import client + +mec = client.MultiEngineClient() + +result = mec.map(lambda x: 2*x, range(10)) +print "Simple, default map: ", result + +m = mec.mapper(block=False) +pr = m.map(lambda x: 2*x, range(10)) +print "Submitted map, got PendingResult: ", pr +result = pr.r +print "Using a mapper: ", result + +@mec.parallel() +def f(x): return 2*x + +result = f(range(10)) +print "Using a parallel function: ", result \ No newline at end of file diff --git a/docs/examples/kernel/task1.py b/docs/examples/kernel/task1.py index d233ef1..7206faf 100644 --- a/docs/examples/kernel/task1.py +++ b/docs/examples/kernel/task1.py @@ -11,7 +11,7 @@ b = 10*d c = a*b*d """ -t1 = client.Task(cmd1, clear_before=False, clear_after=True, pull=['a','b','c']) +t1 = client.StringTask(cmd1, clear_before=False, clear_after=True, pull=['a','b','c']) tid1 = tc.run(t1) tr1 = tc.get_task_result(tid1,block=True) tr1.raiseException() diff --git a/docs/examples/kernel/task2.py b/docs/examples/kernel/task2.py index 7a3c03d..661ad22 100644 --- a/docs/examples/kernel/task2.py +++ b/docs/examples/kernel/task2.py @@ -10,7 +10,7 @@ mec = client.MultiEngineClient() mec.execute('import time') for i in range(24): - tc.irun('time.sleep(1)') + tc.run(client.StringTask('time.sleep(1)')) for i in range(6): time.sleep(1.0) @@ -18,7 +18,7 @@ for i in range(6): print tc.queue_status() for i in range(24): - tc.irun('time.sleep(1)') + tc.run(client.StringTask('time.sleep(1)')) for i in range(6): time.sleep(1.0) @@ -26,7 +26,7 @@ for i in range(6): print tc.queue_status(True) for i in range(12): - tc.irun('time.sleep(2)') + tc.run(client.StringTask('time.sleep(2)')) print "Queue status (vebose=True)" print tc.queue_status(True) diff --git a/docs/examples/kernel/task_profiler.py b/docs/examples/kernel/task_profiler.py index 23118d9..3078e09 100644 --- a/docs/examples/kernel/task_profiler.py +++ b/docs/examples/kernel/task_profiler.py @@ -55,7 +55,7 @@ def main(): # the jobs should take a random time within a range times = [random.random()*(opts.tmax-opts.tmin)+opts.tmin for i in range(opts.n)] - tasks = [client.Task("time.sleep(%f)"%t) for t in times] + tasks = [client.StringTask("time.sleep(%f)"%t) for t in times] stime = sum(times) print "executing %i tasks, 
totalling %.1f secs on %i engines"%(opts.n, stime, nengines) diff --git a/docs/examples/kernel/taskmap.py b/docs/examples/kernel/taskmap.py new file mode 100644 index 0000000..44c311b --- /dev/null +++ b/docs/examples/kernel/taskmap.py @@ -0,0 +1,19 @@ +from IPython.kernel import client + +tc = client.TaskClient() + +result = tc.map(lambda x: 2*x, range(10)) +print "Simple, default map: ", result + +m = tc.mapper(block=False, clear_after=True, clear_before=True) +tids = m.map(lambda x: 2*x, range(10)) +print "Submitted tasks, got ids: ", tids +tc.barrier(tids) +result = [tc.get_task_result(tid) for tid in tids] +print "Using a mapper: ", result + +@tc.parallel() +def f(x): return 2*x + +result = f(range(10)) +print "Using a parallel function: ", result \ No newline at end of file
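One combination the examples above do not show is the mapper's fault-tolerance arguments. A speculative sketch (only `retries` is shown; `clear_before`, `clear_after`, `depend` and `recovery_task` pass through the same way):

from IPython.kernel import client

tc = client.TaskClient()

# Each element of the map becomes a MapTask; with retries=2, a failing
# element is resubmitted up to two times before its failure is returned.
m = tc.mapper(retries=2)
result = m.map(lambda x: x**2, range(10))
print "Map with retries: ", result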