diff --git a/IPython/frontend/tests/test_frontendbase.py b/IPython/frontend/tests/test_frontendbase.py index f74c43e..54c470d 100644 --- a/IPython/frontend/tests/test_frontendbase.py +++ b/IPython/frontend/tests/test_frontendbase.py @@ -83,7 +83,7 @@ class TestAsyncFrontendBase(unittest.TestCase): d.addCallback(self.checkBlockID, expected='TEST_ID') def test_blockID_added_to_failure(self): - block = "raise Exception()" + block = "raise Exception()" d = self.fb.execute(block,blockID='TEST_ID') d.addErrback(self.checkFailureID, expected='TEST_ID') diff --git a/IPython/kernel/asyncclient.py b/IPython/kernel/asyncclient.py index a1542e2..586202b 100644 --- a/IPython/kernel/asyncclient.py +++ b/IPython/kernel/asyncclient.py @@ -27,7 +27,7 @@ from IPython.kernel import codeutil from IPython.kernel.clientconnector import ClientConnector # Other things that the user will need -from IPython.kernel.task import Task +from IPython.kernel.task import MapTask, StringTask from IPython.kernel.error import CompositeError #------------------------------------------------------------------------------- diff --git a/IPython/kernel/client.py b/IPython/kernel/client.py index 85d677b..efbc160 100644 --- a/IPython/kernel/client.py +++ b/IPython/kernel/client.py @@ -44,7 +44,7 @@ from IPython.kernel import codeutil import IPython.kernel.magic # Other things that the user will need -from IPython.kernel.task import Task +from IPython.kernel.task import MapTask, StringTask from IPython.kernel.error import CompositeError #------------------------------------------------------------------------------- diff --git a/IPython/kernel/contexts.py b/IPython/kernel/contexts.py index ce9055c..553f140 100644 --- a/IPython/kernel/contexts.py +++ b/IPython/kernel/contexts.py @@ -141,46 +141,3 @@ class RemoteMultiEngine(RemoteContextBase): def __enter__(self): src = self.findsource(sys._getframe(1)) return self.mec.execute(src) - - -# XXX - Temporary hackish testing, we'll move this into proper tests right -# away - -if __name__ == '__main__': - - # XXX - for now, we need a running cluster to be started separately. The - # daemon work is almost finished, and will make much of this unnecessary. - from IPython.kernel import client - mec = client.MultiEngineClient(('127.0.0.1',10105)) - - try: - mec.get_ids() - except ConnectionRefusedError: - import os, time - os.system('ipcluster -n 2 &') - time.sleep(2) - mec = client.MultiEngineClient(('127.0.0.1',10105)) - - mec.block = False - - import itertools - c = itertools.count() - - parallel = RemoteMultiEngine(mec) - - mec.pushAll() - - with parallel as pr: - # A comment - remote() # this means the code below only runs remotely - print 'Hello remote world' - x = range(10) - # Comments are OK - # Even misindented. 
-        y = x+1
-
-
-    with pfor('i',sequence) as pr:
-        print x[i]
-
-    print pr.x + pr.y
diff --git a/IPython/kernel/magic.py b/IPython/kernel/magic.py
index cefca8b..980c63b 100644
--- a/IPython/kernel/magic.py
+++ b/IPython/kernel/magic.py
@@ -79,7 +79,7 @@ def magic_px(self,parameter_s=''):
     except AttributeError:
         print NO_ACTIVE_CONTROLLER
     else:
-        print "Executing command on Controller"
+        print "Parallel execution on engines: %s" % activeController.targets
         result = activeController.execute(parameter_s)
     return result
diff --git a/IPython/kernel/map.py b/IPython/kernel/map.py
index 2e0b932..6183176 100644
--- a/IPython/kernel/map.py
+++ b/IPython/kernel/map.py
@@ -115,7 +115,7 @@ class RoundRobinMap(Map):
             # result.append(concat[i:totalLength:maxPartitionLength])
         return self.concatenate(listOfPartitions)
-styles = {'basic':Map}
+dists = {'b':Map}
diff --git a/IPython/kernel/mapper.py b/IPython/kernel/mapper.py
new file mode 100644
index 0000000..e732b53
--- /dev/null
+++ b/IPython/kernel/mapper.py
@@ -0,0 +1,233 @@
+# encoding: utf-8
+
+"""A parallelized version of Python's builtin map."""
+
+__docformat__ = "restructuredtext en"
+
+#----------------------------------------------------------------------------
+# Copyright (C) 2008 The IPython Development Team
+#
+# Distributed under the terms of the BSD License. The full license is in
+# the file COPYING, distributed as part of this software.
+#----------------------------------------------------------------------------
+
+#----------------------------------------------------------------------------
+# Imports
+#----------------------------------------------------------------------------
+
+from types import FunctionType
+from zope.interface import Interface, implements
+from IPython.kernel.task import MapTask
+from IPython.kernel.twistedutil import DeferredList, gatherBoth
+from IPython.kernel.util import printer
+from IPython.kernel.error import collect_exceptions
+
+#----------------------------------------------------------------------------
+# Code
+#----------------------------------------------------------------------------
+
+class IMapper(Interface):
+    """The basic interface for a Mapper.
+
+    This defines a generic interface for mapping. The idea of this is
+    similar to that of Python's builtin `map` function, which applies a
+    function elementwise to a sequence.
+    """
+
+    def map(func, *seqs):
+        """Do map in parallel.
+
+        Equivalent to map(func, *seqs) or:
+
+        [func(seqs[0][0], seqs[1][0],...), func(seqs[0][1], seqs[1][1],...),...]
+
+        :Parameters:
+            func : FunctionType
+                The function to apply to the sequence
+            sequences : tuple of iterables
+                A sequence of iterables that are used for successive function
+                arguments. This works just like map.
+        """
+
+class IMultiEngineMapperFactory(Interface):
+    """
+    An interface for something that creates `IMapper` instances.
+    """
+
+    def mapper(dist='b', targets='all', block=True):
+        """
+        Create an `IMapper` implementer with a given set of arguments.
+
+        The `IMapper` created using a multiengine controller is
+        not load balanced.
+        """
+
+class ITaskMapperFactory(Interface):
+    """
+    An interface for something that creates `IMapper` instances.
+    """
+
+    def mapper(clear_before=False, clear_after=False, retries=0,
+        recovery_task=None, depend=None, block=True):
+        """
+        Create an `IMapper` implementer with a given set of arguments.
+
+        The `IMapper` created using a task controller is load balanced.
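# Usage sketch for the mapper-factory API above (illustrative, not part of
# the patch). It assumes a controller is already running; the address
# matches the example used elsewhere in this changeset and may differ.
from IPython.kernel import client

mec = client.MultiEngineClient(('127.0.0.1', 10105))
m = mec.mapper(dist='b', targets='all', block=True)

def square(x):
    return x*x

print m.map(square, range(16))    # square() runs elementwise on the engines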
+
+        See the documentation for `IPython.kernel.task.BaseTask` for
+        documentation on the arguments to this method.
+        """
+
+
+class MultiEngineMapper(object):
+    """
+    A Mapper for `IMultiEngine` implementers.
+    """
+
+    implements(IMapper)
+
+    def __init__(self, multiengine, dist='b', targets='all', block=True):
+        """
+        Create a Mapper for a multiengine.
+
+        The values of all arguments are used for all calls to `map`. This
+        class allows these arguments to be set for a series of map calls.
+
+        :Parameters:
+            multiengine : `IMultiEngine` implementer
+                The multiengine to use for running the map commands
+            dist : str
+                The type of decomposition to use. Only block ('b') is
+                supported currently
+            targets : (str, int, tuple of ints)
+                The engines to use in the map
+            block : boolean
+                Whether to block when the map is applied
+        """
+        self.multiengine = multiengine
+        self.dist = dist
+        self.targets = targets
+        self.block = block
+
+    def map(self, func, *sequences):
+        """
+        Apply func to *sequences elementwise. Like Python's builtin map.
+
+        This version is not load balanced.
+        """
+        max_len = max(len(s) for s in sequences)
+        for s in sequences:
+            if len(s)!=max_len:
+                raise ValueError('all sequences must have equal length')
+        assert isinstance(func, (str, FunctionType)), "func must be a function or str"
+        return self.multiengine.raw_map(func, sequences, dist=self.dist,
+            targets=self.targets, block=self.block)
+
+class TaskMapper(object):
+    """
+    Make an `ITaskController` look like an `IMapper`.
+
+    This class provides a load balanced version of `map`.
+    """
+
+    def __init__(self, task_controller, clear_before=False, clear_after=False, retries=0,
+        recovery_task=None, depend=None, block=True):
+        """
+        Create an `IMapper` given a `TaskController` and arguments.
+
+        The additional arguments are those that are common to all types of
+        tasks and are described in the documentation for
+        `IPython.kernel.task.BaseTask`.
+
+        :Parameters:
+            task_controller : an `ITaskController` implementer
+                The `TaskController` to use for calls to `map`
+        """
+        self.task_controller = task_controller
+        self.clear_before = clear_before
+        self.clear_after = clear_after
+        self.retries = retries
+        self.recovery_task = recovery_task
+        self.depend = depend
+        self.block = block
+
+    def map(self, func, *sequences):
+        """
+        Apply func to *sequences elementwise. Like Python's builtin map.
+
+        This version is load balanced.
+        """
+        max_len = max(len(s) for s in sequences)
+        for s in sequences:
+            if len(s)!=max_len:
+                raise ValueError('all sequences must have equal length')
+        task_args = zip(*sequences)
+        task_ids = []
+        dlist = []
+        for ta in task_args:
+            task = MapTask(func, ta, clear_before=self.clear_before,
+                clear_after=self.clear_after, retries=self.retries,
+                recovery_task=self.recovery_task, depend=self.depend)
+            dlist.append(self.task_controller.run(task))
+        dlist = gatherBoth(dlist, consumeErrors=1)
+        dlist.addCallback(collect_exceptions,'map')
+        if self.block:
+            def get_results(task_ids):
+                d = self.task_controller.barrier(task_ids)
+                d.addCallback(lambda _: gatherBoth([self.task_controller.get_task_result(tid) for tid in task_ids], consumeErrors=1))
+                d.addCallback(collect_exceptions, 'map')
+                return d
+            dlist.addCallback(get_results)
+        return dlist
+
+class SynchronousTaskMapper(object):
+    """
+    Make an `IBlockingTaskClient` look like an `IMapper`.
+
+    This class provides a load balanced version of `map`.
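# Load-balanced counterpart, sketched against the task client (assumed to
# be IPython.kernel.client.TaskClient; the address here is hypothetical).
from IPython.kernel import client

tc = client.TaskClient(('127.0.0.1', 10113))
lb = tc.mapper(clear_after=True, retries=1)

def double(x):
    return 2*x

print lb.map(double, range(10))    # each element becomes one MapTask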
+ """ + + def __init__(self, task_controller, clear_before=False, clear_after=False, retries=0, + recovery_task=None, depend=None, block=True): + """ + Create a `IMapper` given a `IBlockingTaskClient` and arguments. + + The additional arguments are those that are common to all types of + tasks and are described in the documentation for + `IPython.kernel.task.BaseTask`. + + :Parameters: + task_controller : an `IBlockingTaskClient` implementer + The `TaskController` to use for calls to `map` + """ + self.task_controller = task_controller + self.clear_before = clear_before + self.clear_after = clear_after + self.retries = retries + self.recovery_task = recovery_task + self.depend = depend + self.block = block + + def map(self, func, *sequences): + """ + Apply func to *sequences elementwise. Like Python's builtin map. + + This version is load balanced. + """ + max_len = max(len(s) for s in sequences) + for s in sequences: + if len(s)!=max_len: + raise ValueError('all sequences must have equal length') + task_args = zip(*sequences) + task_ids = [] + for ta in task_args: + task = MapTask(func, ta, clear_before=self.clear_before, + clear_after=self.clear_after, retries=self.retries, + recovery_task=self.recovery_task, depend=self.depend) + task_ids.append(self.task_controller.run(task)) + if self.block: + self.task_controller.barrier(task_ids) + task_results = [self.task_controller.get_task_result(tid) for tid in task_ids] + return task_results + else: + return task_ids \ No newline at end of file diff --git a/IPython/kernel/multiengine.py b/IPython/kernel/multiengine.py index 930f05f..6468a04 100644 --- a/IPython/kernel/multiengine.py +++ b/IPython/kernel/multiengine.py @@ -653,67 +653,55 @@ components.registerAdapter(SynchronousMultiEngine, IMultiEngine, ISynchronousMul class IMultiEngineCoordinator(Interface): """Methods that work on multiple engines explicitly.""" - def scatter(key, seq, style='basic', flatten=False, targets='all'): - """Partition and distribute a sequence to targets. + def scatter(key, seq, dist='b', flatten=False, targets='all'): + """Partition and distribute a sequence to targets.""" - :Parameters: - key : str - The variable name to call the scattered sequence. - seq : list, tuple, array - The sequence to scatter. The type should be preserved. - style : string - A specification of how the sequence is partitioned. Currently - only 'basic' is implemented. - flatten : boolean - Should single element sequences be converted to scalars. - """ - - def gather(key, style='basic', targets='all'): - """Gather object key from targets. + def gather(key, dist='b', targets='all'): + """Gather object key from targets.""" - :Parameters: - key : string - The name of a sequence on the targets to gather. - style : string - A specification of how the sequence is partitioned. Currently - only 'basic' is implemented. + def raw_map(func, seqs, dist='b', targets='all'): """ - - def map(func, seq, style='basic', targets='all'): - """A parallelized version of Python's builtin map. + A parallelized version of Python's builtin `map` function. - This function implements the following pattern: + This has a slightly different syntax than the builtin `map`. + This is needed because we need to have keyword arguments and thus + can't use *args to capture all the sequences. Instead, they must + be passed in a list or tuple. - 1. The sequence seq is scattered to the given targets. - 2. map(functionSource, seq) is called on each engine. - 3. The resulting sequences are gathered back to the local machine. 
- - :Parameters: - targets : int, list or 'all' - The engine ids the action will apply to. Call `get_ids` to see - a list of currently available engines. - func : str, function - An actual function object or a Python string that names a - callable defined on the engines. - seq : list, tuple or numpy array - The local sequence to be scattered. - style : str - Only 'basic' is supported for now. - - :Returns: A list of len(seq) with functionSource called on each element - of seq. - - Example - ======= + The equivalence is: - >>> rc.mapAll('lambda x: x*x', range(10000)) - [0,2,4,9,25,36,...] + raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...) + + Most users will want to use parallel functions or the `mapper` + and `map` methods for an API that follows that of the builtin + `map`. """ class ISynchronousMultiEngineCoordinator(IMultiEngineCoordinator): """Methods that work on multiple engines explicitly.""" - pass + + def scatter(key, seq, dist='b', flatten=False, targets='all', block=True): + """Partition and distribute a sequence to targets.""" + + def gather(key, dist='b', targets='all', block=True): + """Gather object key from targets""" + + def raw_map(func, seqs, dist='b', targets='all', block=True): + """ + A parallelized version of Python's builtin map. + + This has a slightly different syntax than the builtin `map`. + This is needed because we need to have keyword arguments and thus + can't use *args to capture all the sequences. Instead, they must + be passed in a list or tuple. + + raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...) + + Most users will want to use parallel functions or the `mapper` + and `map` methods for an API that follows that of the builtin + `map`. + """ #------------------------------------------------------------------------------- @@ -722,46 +710,31 @@ class ISynchronousMultiEngineCoordinator(IMultiEngineCoordinator): class IMultiEngineExtras(Interface): - def zip_pull(targets, *keys): - """Pull, but return results in a different format from `pull`. + def zip_pull(targets, keys): + """ + Pull, but return results in a different format from `pull`. This method basically returns zip(pull(targets, *keys)), with a few edge cases handled differently. Users of chainsaw will find this format familiar. - - :Parameters: - targets : int, list or 'all' - The engine ids the action will apply to. Call `get_ids` to see - a list of currently available engines. - keys: list or tuple of str - A list of variable names as string of the Python objects to be pulled - back to the client. - - :Returns: A list of pulled Python objects for each target. """ def run(targets, fname): - """Run a .py file on targets. - - :Parameters: - targets : int, list or 'all' - The engine ids the action will apply to. Call `get_ids` to see - a list of currently available engines. - fname : str - The filename of a .py file on the local system to be sent to and run - on the engines. - block : boolean - Should I block or not. If block=True, wait for the action to - complete and return the result. If block=False, return a - `PendingResult` object that can be used to later get the - result. If block is not specified, the block attribute - will be used instead. - """ + """Run a .py file on targets.""" class ISynchronousMultiEngineExtras(IMultiEngineExtras): - pass - + def zip_pull(targets, keys, block=True): + """ + Pull, but return results in a different format from `pull`. + + This method basically returns zip(pull(targets, *keys)), with a few + edge cases handled differently. 
Users of chainsaw will find this format + familiar. + """ + + def run(targets, fname, block=True): + """Run a .py file on targets.""" #------------------------------------------------------------------------------- # The full MultiEngine interface diff --git a/IPython/kernel/multiengineclient.py b/IPython/kernel/multiengineclient.py index 1cee1bc..1d27037 100644 --- a/IPython/kernel/multiengineclient.py +++ b/IPython/kernel/multiengineclient.py @@ -31,6 +31,11 @@ from IPython.ColorANSI import TermColors from IPython.kernel.twistedutil import blockingCallFromThread from IPython.kernel import error from IPython.kernel.parallelfunction import ParallelFunction +from IPython.kernel.mapper import ( + MultiEngineMapper, + IMultiEngineMapperFactory, + IMapper +) from IPython.kernel import map as Map from IPython.kernel import multiengine as me from IPython.kernel.multiengine import (IFullMultiEngine, @@ -186,10 +191,14 @@ class ResultList(list): def __repr__(self): output = [] - blue = TermColors.Blue - normal = TermColors.Normal - red = TermColors.Red - green = TermColors.Green + # These colored prompts were not working on Windows + if sys.platform == 'win32': + blue = normal = red = green = '' + else: + blue = TermColors.Blue + normal = TermColors.Normal + red = TermColors.Red + green = TermColors.Green output.append("\n") for cmd in self: if isinstance(cmd, Failure): @@ -294,28 +303,7 @@ class InteractiveMultiEngineClient(object): def __len__(self): """Return the number of available engines.""" return len(self.get_ids()) - - def parallelize(self, func, targets=None, block=None): - """Build a `ParallelFunction` object for functionName on engines. - - The returned object will implement a parallel version of functionName - that takes a local sequence as its only argument and calls (in - parallel) functionName on each element of that sequence. The - `ParallelFunction` object has a `targets` attribute that controls - which engines the function is run on. - - :Parameters: - targets : int, list or 'all' - The engine ids the action will apply to. Call `get_ids` to see - a list of currently available engines. - functionName : str - A Python string that names a callable defined on the engines. - - :Returns: A `ParallelFunction` object. - """ - targets, block = self._findTargetsAndBlock(targets, block) - return ParallelFunction(func, self, targets, block) - + #--------------------------------------------------------------------------- # Make this a context manager for with #--------------------------------------------------------------------------- @@ -415,7 +403,11 @@ class FullBlockingMultiEngineClient(InteractiveMultiEngineClient): engine, run code on it, etc. """ - implements(IFullBlockingMultiEngineClient) + implements( + IFullBlockingMultiEngineClient, + IMultiEngineMapperFactory, + IMapper + ) def __init__(self, smultiengine): self.smultiengine = smultiengine @@ -772,29 +764,100 @@ class FullBlockingMultiEngineClient(InteractiveMultiEngineClient): # IMultiEngineCoordinator #--------------------------------------------------------------------------- - def scatter(self, key, seq, style='basic', flatten=False, targets=None, block=None): + def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None): """ Partition a Python sequence and send the partitions to a set of engines. 
""" targets, block = self._findTargetsAndBlock(targets, block) return self._blockFromThread(self.smultiengine.scatter, key, seq, - style, flatten, targets=targets, block=block) + dist, flatten, targets=targets, block=block) - def gather(self, key, style='basic', targets=None, block=None): + def gather(self, key, dist='b', targets=None, block=None): """ Gather a partitioned sequence on a set of engines as a single local seq. """ targets, block = self._findTargetsAndBlock(targets, block) - return self._blockFromThread(self.smultiengine.gather, key, style, + return self._blockFromThread(self.smultiengine.gather, key, dist, targets=targets, block=block) - def map(self, func, seq, style='basic', targets=None, block=None): + def raw_map(self, func, seq, dist='b', targets=None, block=None): """ - A parallelized version of Python's builtin map + A parallelized version of Python's builtin map. + + This has a slightly different syntax than the builtin `map`. + This is needed because we need to have keyword arguments and thus + can't use *args to capture all the sequences. Instead, they must + be passed in a list or tuple. + + raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...) + + Most users will want to use parallel functions or the `mapper` + and `map` methods for an API that follows that of the builtin + `map`. """ targets, block = self._findTargetsAndBlock(targets, block) - return self._blockFromThread(self.smultiengine.map, func, seq, - style, targets=targets, block=block) + return self._blockFromThread(self.smultiengine.raw_map, func, seq, + dist, targets=targets, block=block) + + def map(self, func, *sequences): + """ + A parallel version of Python's builtin `map` function. + + This method applies a function to sequences of arguments. It + follows the same syntax as the builtin `map`. + + This method creates a mapper objects by calling `self.mapper` with + no arguments and then uses that mapper to do the mapping. See + the documentation of `mapper` for more details. + """ + return self.mapper().map(func, *sequences) + + def mapper(self, dist='b', targets='all', block=None): + """ + Create a mapper object that has a `map` method. + + This method returns an object that implements the `IMapper` + interface. This method is a factory that is used to control how + the map happens. + + :Parameters: + dist : str + What decomposition to use, 'b' is the only one supported + currently + targets : str, int, sequence of ints + Which engines to use for the map + block : boolean + Should calls to `map` block or not + """ + return MultiEngineMapper(self, dist, targets, block) + + def parallel(self, dist='b', targets=None, block=None): + """ + A decorator that turns a function into a parallel function. + + This can be used as: + + @parallel() + def f(x, y) + ... + + f(range(10), range(10)) + + This causes f(0,0), f(1,1), ... to be called in parallel. 
+ + :Parameters: + dist : str + What decomposition to use, 'b' is the only one supported + currently + targets : str, int, sequence of ints + Which engines to use for the map + block : boolean + Should calls to `map` block or not + """ + targets, block = self._findTargetsAndBlock(targets, block) + mapper = self.mapper(dist, targets, block) + pf = ParallelFunction(mapper) + return pf #--------------------------------------------------------------------------- # IMultiEngineExtras diff --git a/IPython/kernel/multienginefc.py b/IPython/kernel/multienginefc.py index df53f22..ec51e47 100644 --- a/IPython/kernel/multienginefc.py +++ b/IPython/kernel/multienginefc.py @@ -29,6 +29,12 @@ from foolscap import Referenceable from IPython.kernel import error from IPython.kernel.util import printer from IPython.kernel import map as Map +from IPython.kernel.parallelfunction import ParallelFunction +from IPython.kernel.mapper import ( + MultiEngineMapper, + IMultiEngineMapperFactory, + IMapper +) from IPython.kernel.twistedutil import gatherBoth from IPython.kernel.multiengine import (MultiEngine, IMultiEngine, @@ -280,7 +286,12 @@ components.registerAdapter(FCSynchronousMultiEngineFromMultiEngine, class FCFullSynchronousMultiEngineClient(object): - implements(IFullSynchronousMultiEngine, IBlockingClientAdaptor) + implements( + IFullSynchronousMultiEngine, + IBlockingClientAdaptor, + IMultiEngineMapperFactory, + IMapper + ) def __init__(self, remote_reference): self.remote_reference = remote_reference @@ -475,7 +486,7 @@ class FCFullSynchronousMultiEngineClient(object): d.addCallback(create_targets) return d - def scatter(self, key, seq, style='basic', flatten=False, targets='all', block=True): + def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=True): # Note: scatter and gather handle pending deferreds locally through self.pdm. # This enables us to collect a bunch fo deferred ids and make a secondary @@ -483,7 +494,7 @@ class FCFullSynchronousMultiEngineClient(object): # difficult to get right though. def do_scatter(engines): nEngines = len(engines) - mapClass = Map.styles[style] + mapClass = Map.dists[dist] mapObject = mapClass() d_list = [] # Loop through and push to each engine in non-blocking mode. @@ -541,7 +552,7 @@ class FCFullSynchronousMultiEngineClient(object): d.addCallback(do_scatter) return d - def gather(self, key, style='basic', targets='all', block=True): + def gather(self, key, dist='b', targets='all', block=True): # Note: scatter and gather handle pending deferreds locally through self.pdm. # This enables us to collect a bunch fo deferred ids and make a secondary @@ -549,7 +560,7 @@ class FCFullSynchronousMultiEngineClient(object): # difficult to get right though. def do_gather(engines): nEngines = len(engines) - mapClass = Map.styles[style] + mapClass = Map.dists[dist] mapObject = mapClass() d_list = [] # Loop through and push to each engine in non-blocking mode. @@ -604,25 +615,103 @@ class FCFullSynchronousMultiEngineClient(object): d.addCallback(do_gather) return d - def map(self, func, seq, style='basic', targets='all', block=True): - d_list = [] + def raw_map(self, func, sequences, dist='b', targets='all', block=True): + """ + A parallelized version of Python's builtin map. + + This has a slightly different syntax than the builtin `map`. + This is needed because we need to have keyword arguments and thus + can't use *args to capture all the sequences. Instead, they must + be passed in a list or tuple. 
+ + raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...) + + Most users will want to use parallel functions or the `mapper` + and `map` methods for an API that follows that of the builtin + `map`. + """ + if not isinstance(sequences, (list, tuple)): + raise TypeError('sequences must be a list or tuple') + max_len = max(len(s) for s in sequences) + for s in sequences: + if len(s)!=max_len: + raise ValueError('all sequences must have equal length') if isinstance(func, FunctionType): d = self.push_function(dict(_ipython_map_func=func), targets=targets, block=False) d.addCallback(lambda did: self.get_pending_deferred(did, True)) - sourceToRun = '_ipython_map_seq_result = map(_ipython_map_func, _ipython_map_seq)' + sourceToRun = '_ipython_map_seq_result = map(_ipython_map_func, *zip(*_ipython_map_seq))' elif isinstance(func, str): d = defer.succeed(None) sourceToRun = \ - '_ipython_map_seq_result = map(%s, _ipython_map_seq)' % func + '_ipython_map_seq_result = map(%s, *zip(*_ipython_map_seq))' % func else: raise TypeError("func must be a function or str") - d.addCallback(lambda _: self.scatter('_ipython_map_seq', seq, style, targets=targets)) + d.addCallback(lambda _: self.scatter('_ipython_map_seq', zip(*sequences), dist, targets=targets)) d.addCallback(lambda _: self.execute(sourceToRun, targets=targets, block=False)) d.addCallback(lambda did: self.get_pending_deferred(did, True)) - d.addCallback(lambda _: self.gather('_ipython_map_seq_result', style, targets=targets, block=block)) + d.addCallback(lambda _: self.gather('_ipython_map_seq_result', dist, targets=targets, block=block)) return d + def map(self, func, *sequences): + """ + A parallel version of Python's builtin `map` function. + + This method applies a function to sequences of arguments. It + follows the same syntax as the builtin `map`. + + This method creates a mapper objects by calling `self.mapper` with + no arguments and then uses that mapper to do the mapping. See + the documentation of `mapper` for more details. + """ + return self.mapper().map(func, *sequences) + + def mapper(self, dist='b', targets='all', block=True): + """ + Create a mapper object that has a `map` method. + + This method returns an object that implements the `IMapper` + interface. This method is a factory that is used to control how + the map happens. + + :Parameters: + dist : str + What decomposition to use, 'b' is the only one supported + currently + targets : str, int, sequence of ints + Which engines to use for the map + block : boolean + Should calls to `map` block or not + """ + return MultiEngineMapper(self, dist, targets, block) + + def parallel(self, dist='b', targets='all', block=True): + """ + A decorator that turns a function into a parallel function. + + This can be used as: + + @parallel() + def f(x, y) + ... + + f(range(10), range(10)) + + This causes f(0,0), f(1,1), ... to be called in parallel. 
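# The raw_map/map correspondence documented above, as a sketch (`mec` is an
# assumed connected client; on the blocking client both calls return lists,
# and the two results agree when mec.block is True):
def add(x, y):
    return x + y

r1 = mec.map(add, range(8), range(8))          # builtin-map style signature
r2 = mec.raw_map(add, [range(8), range(8)])    # sequences packed in a list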
+ + :Parameters: + dist : str + What decomposition to use, 'b' is the only one supported + currently + targets : str, int, sequence of ints + Which engines to use for the map + block : boolean + Should calls to `map` block or not + """ + mapper = self.mapper(dist, targets, block) + pf = ParallelFunction(mapper) + return pf + #--------------------------------------------------------------------------- # ISynchronousMultiEngineExtras related methods #--------------------------------------------------------------------------- diff --git a/IPython/kernel/parallelfunction.py b/IPython/kernel/parallelfunction.py index 129b369..077081a 100644 --- a/IPython/kernel/parallelfunction.py +++ b/IPython/kernel/parallelfunction.py @@ -16,17 +16,92 @@ __docformat__ = "restructuredtext en" #------------------------------------------------------------------------------- from types import FunctionType +from zope.interface import Interface, implements -class ParallelFunction: - """A function that operates in parallel on sequences.""" - def __init__(self, func, multiengine, targets, block): - """Create a `ParallelFunction`. + +class IMultiEngineParallelDecorator(Interface): + """A decorator that creates a parallel function.""" + + def parallel(dist='b', targets=None, block=None): + """ + A decorator that turns a function into a parallel function. + + This can be used as: + + @parallel() + def f(x, y) + ... + + f(range(10), range(10)) + + This causes f(0,0), f(1,1), ... to be called in parallel. + + :Parameters: + dist : str + What decomposition to use, 'b' is the only one supported + currently + targets : str, int, sequence of ints + Which engines to use for the map + block : boolean + Should calls to `map` block or not + """ + +class ITaskParallelDecorator(Interface): + """A decorator that creates a parallel function.""" + + def parallel(clear_before=False, clear_after=False, retries=0, + recovery_task=None, depend=None, block=True): + """ + A decorator that turns a function into a parallel function. + + This can be used as: + + @parallel() + def f(x, y) + ... + + f(range(10), range(10)) + + This causes f(0,0), f(1,1), ... to be called in parallel. + + See the documentation for `IPython.kernel.task.BaseTask` for + documentation on the arguments to this method. + """ + +class IParallelFunction(Interface): + pass + +class ParallelFunction(object): + """ + The implementation of a parallel function. + + A parallel function is similar to Python's map function: + + map(func, *sequences) -> pfunc(*sequences) + + Parallel functions should be created by using the @parallel decorator. + """ + + implements(IParallelFunction) + + def __init__(self, mapper): + """ + Create a parallel function from an `IMapper`. + + :Parameters: + mapper : an `IMapper` implementer. + The mapper to use for the parallel function + """ + self.mapper = mapper + + def __call__(self, func): + """ + Decorate a function to make it run in parallel. 
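# A ParallelFunction can also be built directly from any IMapper, which is
# exactly what the parallel() factories do (sketch; `mapper` is assumed to
# come from mec.mapper() or a task-client mapper()):
pf = ParallelFunction(mapper)

def triple(x):
    return 3*x

parallel_triple = pf(triple)       # __call__ decorates the function
print parallel_triple(range(10))   # dispatched through mapper.map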
""" assert isinstance(func, (str, FunctionType)), "func must be a fuction or str" self.func = func - self.multiengine = multiengine - self.targets = targets - self.block = block - - def __call__(self, sequence): - return self.multiengine.map(self.func, sequence, targets=self.targets, block=self.block) \ No newline at end of file + def call_function(*sequences): + return self.mapper.map(self.func, *sequences) + return call_function + + \ No newline at end of file diff --git a/IPython/kernel/task.py b/IPython/kernel/task.py index 1d8ba53..9b6f0ae 100644 --- a/IPython/kernel/task.py +++ b/IPython/kernel/task.py @@ -5,117 +5,404 @@ __docformat__ = "restructuredtext en" -#------------------------------------------------------------------------------- +#----------------------------------------------------------------------------- # Copyright (C) 2008 The IPython Development Team # # Distributed under the terms of the BSD License. The full license is in # the file COPYING, distributed as part of this software. -#------------------------------------------------------------------------------- +#----------------------------------------------------------------------------- -#------------------------------------------------------------------------------- +#----------------------------------------------------------------------------- # Imports -#------------------------------------------------------------------------------- +#----------------------------------------------------------------------------- import copy, time -from types import FunctionType as function +from types import FunctionType import zope.interface as zi, string from twisted.internet import defer, reactor from twisted.python import components, log, failure -# from IPython.genutils import time - +from IPython.kernel.util import printer from IPython.kernel import engineservice as es, error from IPython.kernel import controllerservice as cs from IPython.kernel.twistedutil import gatherBoth, DeferredList -from IPython.kernel.pickleutil import can,uncan, CannedFunction - -def canTask(task): - t = copy.copy(task) - t.depend = can(t.depend) - if t.recovery_task: - t.recovery_task = canTask(t.recovery_task) - return t +from IPython.kernel.pickleutil import can, uncan, CannedFunction -def uncanTask(task): - t = copy.copy(task) - t.depend = uncan(t.depend) - if t.recovery_task and t.recovery_task is not task: - t.recovery_task = uncanTask(t.recovery_task) - return t +#----------------------------------------------------------------------------- +# Definition of the Task objects +#----------------------------------------------------------------------------- time_format = '%Y/%m/%d %H:%M:%S' -class Task(object): - r"""Our representation of a task for the `TaskController` interface. - - The user should create instances of this class to represent a task that - needs to be done. - - :Parameters: - expression : str - A str that is valid python code that is the task. - pull : str or list of str - The names of objects to be pulled as results. If not specified, - will return {'result', None} - push : dict - A dict of objects to be pushed into the engines namespace before - execution of the expression. - clear_before : boolean - Should the engine's namespace be cleared before the task is run. - Default=False. - clear_after : boolean - Should the engine's namespace be cleared after the task is run. - Default=False. - retries : int - The number of times to resumbit the task if it fails. Default=0. 
-    recovery_task : Task
-        This is the Task to be run when the task has exhausted its retries
-        Default=None.
-    depend : bool function(properties)
-        This is the dependency function for the Task, which determines
-        whether a task can be run on a Worker.  `depend` is called with
-        one argument, the worker's properties dict, and should return
-        True if the worker meets the dependencies or False if it does
-        not.
-        Default=None - run on any worker
-    options : dict
-        Any other keyword options for more elaborate uses of tasks
-
-    Examples
-    --------
+class ITask(zi.Interface):
+    """
+    This interface provides a generic definition of what constitutes a task.
+
+    There are two sides to a task.  First, a task needs to take input from
+    a user to determine what work is performed by the task.  Second, the
+    task needs to have the logic that knows how to turn that information
+    into specific calls to a worker, through the `IQueuedEngine` interface.
+
+    Many methods in this class get two things passed to them: a Deferred
+    and an IQueuedEngine implementer.  Such methods should register callbacks
+    on the Deferred that use the IQueuedEngine to accomplish something.  See
+    the existing task objects for examples.
+    """
+
+    zi.Attribute('retries','How many times to retry the task')
+    zi.Attribute('recovery_task','A task to try if the initial one fails')
+    zi.Attribute('taskid','The id of the task')
+
+    def start_time(result):
+        """
+        Do anything needed to start the timing of the task.
+
+        Must simply return the result after starting the timers.
+        """
-
-    >>> t = Task('dostuff(args)')
-    >>> t = Task('a=5', pull='a')
-    >>> t = Task('a=5\nb=4', pull=['a','b'])
-    >>> t = Task('os.kill(os.getpid(),9)', retries=100) # this is a bad idea
+    def stop_time(result):
+        """
+        Do anything needed to stop the timing of the task.
+
+        Must simply return the result after stopping the timers.  This
+        method will usually set attributes that are used by `process_result`
+        in building the result of the task.
+        """
+
+    def pre_task(d, queued_engine):
+        """Do something with the queued_engine before the task is run.
+
+        This method should simply add callbacks to the input Deferred
+        that do something with the `queued_engine` before the task is run.
+
+        :Parameters:
+            d : Deferred
+                The deferred that actions should be attached to
+            queued_engine : IQueuedEngine implementer
+                The worker that has been allocated to perform the task
+        """
+
+    def post_task(d, queued_engine):
+        """Do something with the queued_engine after the task is run.
+
+        This method should simply add callbacks to the input Deferred
+        that do something with the `queued_engine` after the task is run.
+
+        :Parameters:
+            d : Deferred
+                The deferred that actions should be attached to
+            queued_engine : IQueuedEngine implementer
+                The worker that has been allocated to perform the task
+        """
+
+    def submit_task(d, queued_engine):
+        """Submit a task using the `queued_engine` we have been allocated.
+
+        When a task is ready to run, this method is called.  This method
+        must take the internal information of the task and make suitable
+        calls on the queued_engine to have the actual work done.
+
+        This method should simply add callbacks to the input Deferred
+        that use the `queued_engine` to perform the actual work of the task.
+
+        :Parameters:
+            d : Deferred
+                The deferred that actions should be attached to
+            queued_engine : IQueuedEngine implementer
+                The worker that has been allocated to perform the task
+        """
-
-    A dependency case:
-    >>> def hasMPI(props):
-    ...     return props.get('mpi') is not None
-    >>> t = Task('mpi.send(blah,blah)', depend = hasMPI)
+    def process_result(d, result, engine_id):
+        """Take a raw task result.
+
+        Objects that implement `ITask` can choose how the result of running
+        the task is presented.  This method takes the raw result and
+        does this logic.  Two examples are the `MapTask`, which simply returns
+        the raw result or a `Failure` object, and the `StringTask`, which
+        returns a `TaskResult` object.
+
+        :Parameters:
+            d : Deferred
+                The deferred that actions should be attached to
+            result : object
+                The raw task result that needs to be wrapped
+            engine_id : int
+                The id of the engine that did the task
+
+        :Returns:
+            The result, as a tuple of the form: (success, result).
+            Here, success is a boolean indicating if the task
+            succeeded or failed and result is the result.
+        """
+
+    def check_depend(properties):
+        """Check properties to see if the task should be run.
+
+        :Parameters:
+            properties : dict
+                A dictionary of properties that an engine has set
+
+        :Returns:
+            True if the task should be run, False otherwise
+        """
+
+    def can_task(self):
+        """Serialize (can) any functions in the task for pickling.
+
+        Subclasses must override this method and make sure that all
+        functions in the task are canned by calling `can` on the
+        function.
+        """
+
+    def uncan_task(self):
+        """Unserialize (uncan) any canned function in the task."""
+
+class BaseTask(object):
+    """
+    Common functionality for all objects implementing `ITask`.
     """
-    def __init__(self, expression, pull=None, push=None,
-                 clear_before=False, clear_after=False, retries=0,
-                 recovery_task=None, depend=None, **options):
-        self.expression = expression
-        if isinstance(pull, str):
-            self.pull = [pull]
-        else:
-            self.pull = pull
-        self.push = push
+    zi.implements(ITask)
+
+    def __init__(self, clear_before=False, clear_after=False, retries=0,
+        recovery_task=None, depend=None):
+        """
+        Make a generic task.
+
+        :Parameters:
+            clear_before : boolean
+                Should the engines namespace be cleared before the task
+                is run
+            clear_after : boolean
+                Should the engines namespace be cleared after the task is run
+            retries : int
+                The number of times a task should be retried upon failure
+            recovery_task : any task object
+                If a task fails and it has a recovery_task, that is run
+                upon a retry
+            depend : FunctionType
+                A function that is called to test for properties.  This function
+                must take one argument, the properties dict, and return a boolean
+        """
         self.clear_before = clear_before
         self.clear_after = clear_after
-        self.retries=retries
+        self.retries = retries
         self.recovery_task = recovery_task
         self.depend = depend
-        self.options = options
         self.taskid = None
+
+    def start_time(self, result):
+        """
+        Start the basic timers.
+        """
+        self.start = time.time()
+        self.start_struct = time.localtime()
+        return result
+
+    def stop_time(self, result):
+        """
+        Stop the basic timers.
+        """
+        self.stop = time.time()
+        self.stop_struct = time.localtime()
+        self.duration = self.stop - self.start
+        self.submitted = time.strftime(time_format, self.start_struct)
+        self.completed = time.strftime(time_format)
+        return result
+
+    def pre_task(self, d, queued_engine):
+        """
+        Clear the engine before running the task if clear_before is set.
+        """
+        if self.clear_before:
+            d.addCallback(lambda r: queued_engine.reset())
+
+    def post_task(self, d, queued_engine):
+        """
+        Clear the engine after running the task if clear_after is set.
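# A dependency function of the kind check_depend() consults, modeled on the
# hasMPI example from the old docstring above (sketch; pass it as the
# depend argument of any task):
def has_numpy(properties):
    # run only on workers whose properties dict advertises numpy
    return properties.get('numpy') is not None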
+ """ + def reseter(result): + queued_engine.reset() + return result + if self.clear_after: + d.addBoth(reseter) + + def submit_task(self, d, queued_engine): + raise NotImplementedError('submit_task must be implemented in a subclass') + + def process_result(self, result, engine_id): + """ + Process a task result. + + This is the default `process_result` that just returns the raw + result or a `Failure`. + """ + if isinstance(result, failure.Failure): + return (False, result) + else: + return (True, result) + + def check_depend(self, properties): + """ + Calls self.depend(properties) to see if a task should be run. + """ + if self.depend is not None: + return self.depend(properties) + else: + return True + + def can_task(self): + self.depend = can(self.depend) + if isinstance(self.recovery_task, BaseTask): + self.recovery_task.can_task() + + def uncan_task(self): + self.depend = uncan(self.depend) + if isinstance(self.recovery_task, BaseTask): + self.recovery_task.uncan_task() + +class MapTask(BaseTask): + """ + A task that consists of a function and arguments. + """ + + zi.implements(ITask) + + def __init__(self, function, args=None, kwargs=None, clear_before=False, + clear_after=False, retries=0, recovery_task=None, depend=None): + """ + Create a task based on a function, args and kwargs. + + This is a simple type of task that consists of calling: + function(*args, **kwargs) and wrapping the result in a `TaskResult`. + + The return value of the function, or a `Failure` wrapping an + exception is the task result for this type of task. + """ + BaseTask.__init__(self, clear_before, clear_after, retries, + recovery_task, depend) + if not isinstance(function, FunctionType): + raise TypeError('a task function must be a FunctionType') + self.function = function + if args is None: + self.args = () + else: + self.args = args + if not isinstance(self.args, (list, tuple)): + raise TypeError('a task args must be a list or tuple') + if kwargs is None: + self.kwargs = {} + else: + self.kwargs = kwargs + if not isinstance(self.kwargs, dict): + raise TypeError('a task kwargs must be a dict') + + def submit_task(self, d, queued_engine): + d.addCallback(lambda r: queued_engine.push_function( + dict(_ipython_task_function=self.function)) + ) + d.addCallback(lambda r: queued_engine.push( + dict(_ipython_task_args=self.args,_ipython_task_kwargs=self.kwargs)) + ) + d.addCallback(lambda r: queued_engine.execute( + '_ipython_task_result = _ipython_task_function(*_ipython_task_args,**_ipython_task_kwargs)') + ) + d.addCallback(lambda r: queued_engine.pull('_ipython_task_result')) + + def can_task(self): + self.function = can(self.function) + BaseTask.can_task(self) + + def uncan_task(self): + self.function = uncan(self.function) + BaseTask.uncan_task(self) + + +class StringTask(BaseTask): + """ + A task that consists of a string of Python code to run. + """ + + def __init__(self, expression, pull=None, push=None, + clear_before=False, clear_after=False, retries=0, + recovery_task=None, depend=None): + """ + Create a task based on a Python expression and variables + + This type of task lets you push a set of variables to the engines + namespace, run a Python string in that namespace and then bring back + a different set of Python variables as the result. + + Because this type of task can return many results (through the + `pull` keyword argument) it returns a special `TaskResult` object + that wraps the pulled variables, statistics about the run and + any exceptions raised. 
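# The two task types side by side (sketch; `tc` is an assumed blocking
# task client as in IPython.kernel.client):
def area(w, h):
    return w*h

t1 = MapTask(area, args=(3, 4))                      # result is area(3, 4)
t2 = StringTask('b = 2*a', push=dict(a=21), pull='b')
tid1 = tc.run(t1)
tid2 = tc.run(t2)
tc.barrier([tid1, tid2])
print tc.get_task_result(tid1)         # raw return value, or a Failure
print tc.get_task_result(tid2).ns.b    # TaskResult exposing pulled names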
+ """ + if not isinstance(expression, str): + raise TypeError('a task expression must be a string') + self.expression = expression + + if pull==None: + self.pull = () + elif isinstance(pull, str): + self.pull = (pull,) + elif isinstance(pull, (list, tuple)): + self.pull = pull + else: + raise TypeError('pull must be str or a sequence of strs') + + if push==None: + self.push = {} + elif isinstance(push, dict): + self.push = push + else: + raise TypeError('push must be a dict') + + BaseTask.__init__(self, clear_before, clear_after, retries, + recovery_task, depend) -class ResultNS: - """The result namespace object for use in TaskResult objects as tr.ns. + def submit_task(self, d, queued_engine): + if self.push is not None: + d.addCallback(lambda r: queued_engine.push(self.push)) + + d.addCallback(lambda r: queued_engine.execute(self.expression)) + + if self.pull is not None: + d.addCallback(lambda r: queued_engine.pull(self.pull)) + else: + d.addCallback(lambda r: None) + + def process_result(self, result, engine_id): + if isinstance(result, failure.Failure): + tr = TaskResult(result, engine_id) + else: + if self.pull is None: + resultDict = {} + elif len(self.pull) == 1: + resultDict = {self.pull[0]:result} + else: + resultDict = dict(zip(self.pull, result)) + tr = TaskResult(resultDict, engine_id) + # Assign task attributes + tr.submitted = self.submitted + tr.completed = self.completed + tr.duration = self.duration + if hasattr(self,'taskid'): + tr.taskid = self.taskid + else: + tr.taskid = None + if isinstance(result, failure.Failure): + return (False, tr) + else: + return (True, tr) + +class ResultNS(object): + """ + A dict like object for holding the results of a task. + + The result namespace object for use in `TaskResult` objects as tr.ns. It builds an object from a dictionary, such that it has attributes according to the key,value pairs of the dictionary. @@ -128,15 +415,12 @@ class ResultNS: -------- >>> ns = ResultNS({'a':17,'foo':range(3)}) - >>> print ns - NS{'a': 17, 'foo': [0, 1, 2]} - + NS{'a':17,'foo':range(3)} >>> ns.a - 17 - + 17 >>> ns['foo'] - [0, 1, 2] + [0,1,2] """ def __init__(self, dikt): for k,v in dikt.iteritems(): @@ -156,7 +440,7 @@ class ResultNS: class TaskResult(object): """ - An object for returning task results. + An object for returning task results for certain types of tasks. This object encapsulates the results of a task. On task success it will have a keys attribute that will have a list @@ -166,21 +450,21 @@ class TaskResult(object): In task failure, keys will be empty, but failure will contain the failure object that encapsulates the remote exception. - One can also simply call the raiseException() method of + One can also simply call the `raise_exception` method of this class to re-raise any remote exception in the local session. - The TaskResult has a .ns member, which is a property for access + The `TaskResult` has a `.ns` member, which is a property for access to the results. If the Task had pull=['a', 'b'], then the - Task Result will have attributes tr.ns.a, tr.ns.b for those values. - Accessing tr.ns will raise the remote failure if the task failed. + Task Result will have attributes `tr.ns.a`, `tr.ns.b` for those values. + Accessing `tr.ns` will raise the remote failure if the task failed. - The engineid attribute should have the engineid of the engine - that ran the task. 
But, because engines can come and go in - the ipython task system, the engineid may not continue to be + The `engineid` attribute should have the `engineid` of the engine + that ran the task. But, because engines can come and go, + the `engineid` may not continue to be valid or accurate. - The taskid attribute simply gives the taskid that the task + The `taskid` attribute simply gives the `taskid` that the task is tracked under. """ taskid = None @@ -192,7 +476,7 @@ class TaskResult(object): return self._ns def _setNS(self, v): - raise Exception("I am protected!") + raise Exception("the ns attribute cannot be changed") ns = property(_getNS, _setNS) @@ -218,15 +502,19 @@ class TaskResult(object): def __getitem__(self, key): if self.failure is not None: - self.raiseException() + self.raise_exception() return self.results[key] - def raiseException(self): + def raise_exception(self): """Re-raise any remote exceptions in the local python session.""" if self.failure is not None: self.failure.raiseException() +#----------------------------------------------------------------------------- +# The controller side of things +#----------------------------------------------------------------------------- + class IWorker(zi.Interface): """The Basic Worker Interface. @@ -241,12 +529,15 @@ class IWorker(zi.Interface): :Parameters: task : a `Task` object - :Returns: `Deferred` to a `TaskResult` object. + :Returns: `Deferred` to a tuple of (success, result) where + success if a boolean that signifies success or failure + and result is the task result. """ class WorkerFromQueuedEngine(object): """Adapt an `IQueuedEngine` to an `IWorker` object""" + zi.implements(IWorker) def __init__(self, qe): @@ -261,53 +552,27 @@ class WorkerFromQueuedEngine(object): def run(self, task): """Run task in worker's namespace. + This takes a task and calls methods on the task that actually + cause `self.queuedEngine` to do the task. See the methods of + `ITask` for more information about how these methods are called. + :Parameters: task : a `Task` object - :Returns: `Deferred` to a `TaskResult` object. + :Returns: `Deferred` to a tuple of (success, result) where + success if a boolean that signifies success or failure + and result is the task result. 
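# Working with the TaskResult described above (sketch; `tr` is assumed to
# come from tc.get_task_result(taskid) for a StringTask with pull=['a','b']):
if tr.failure is None:
    print tr.ns.a, tr.ns.b    # pulled values exposed as attributes
else:
    tr.raise_exception()      # re-raise the remote error locally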
""" - if task.clear_before: - d = self.queuedEngine.reset() - else: - d = defer.succeed(None) - - if task.push is not None: - d.addCallback(lambda r: self.queuedEngine.push(task.push)) - - d.addCallback(lambda r: self.queuedEngine.execute(task.expression)) - - if task.pull is not None: - d.addCallback(lambda r: self.queuedEngine.pull(task.pull)) - else: - d.addCallback(lambda r: None) - - def reseter(result): - self.queuedEngine.reset() - return result - - if task.clear_after: - d.addBoth(reseter) - - return d.addBoth(self._zipResults, task.pull, time.time(), time.localtime()) - - def _zipResults(self, result, names, start, start_struct): - """Callback for construting the TaskResult object.""" - if isinstance(result, failure.Failure): - tr = TaskResult(result, self.queuedEngine.id) - else: - if names is None: - resultDict = {} - elif len(names) == 1: - resultDict = {names[0]:result} - else: - resultDict = dict(zip(names, result)) - tr = TaskResult(resultDict, self.queuedEngine.id) - # the time info - tr.submitted = time.strftime(time_format, start_struct) - tr.completed = time.strftime(time_format) - tr.duration = time.time()-start - return tr - + d = defer.succeed(None) + d.addCallback(task.start_time) + task.pre_task(d, self.queuedEngine) + task.submit_task(d, self.queuedEngine) + task.post_task(d, self.queuedEngine) + d.addBoth(task.stop_time) + d.addBoth(task.process_result, self.queuedEngine.id) + # At this point, there will be (success, result) coming down the line + return d + components.registerAdapter(WorkerFromQueuedEngine, es.IEngineQueued, IWorker) @@ -323,14 +588,14 @@ class IScheduler(zi.Interface): """Add a task to the queue of the Scheduler. :Parameters: - task : a `Task` object + task : an `ITask` implementer The task to be queued. flags : dict General keywords for more sophisticated scheduling """ def pop_task(id=None): - """Pops a Task object. + """Pops a task object from the queue. This gets the next task to be run. If no `id` is requested, the highest priority task is returned. @@ -340,7 +605,7 @@ class IScheduler(zi.Interface): The id of the task to be popped. The default (None) is to return the highest priority task. - :Returns: a `Task` object + :Returns: an `ITask` implementer :Exceptions: IndexError : raised if no taskid in queue @@ -350,8 +615,9 @@ class IScheduler(zi.Interface): """Add a worker to the worker queue. :Parameters: - worker : an IWorker implementing object - flags : General keywords for more sophisticated scheduling + worker : an `IWorker` implementer + flags : dict + General keywords for more sophisticated scheduling """ def pop_worker(id=None): @@ -374,15 +640,15 @@ class IScheduler(zi.Interface): """Returns True if there is something to do, False otherwise""" def schedule(): - """Returns a tuple of the worker and task pair for the next - task to be run. - """ + """Returns (worker,task) pair for the next task to be run.""" class FIFOScheduler(object): - """A basic First-In-First-Out (Queue) Scheduler. - This is the default Scheduler for the TaskController. - See the docstrings for IScheduler for interface details. + """ + A basic First-In-First-Out (Queue) Scheduler. + + This is the default Scheduler for the `TaskController`. + See the docstrings for `IScheduler` for interface details. 
""" zi.implements(IScheduler) @@ -439,7 +705,9 @@ class FIFOScheduler(object): for t in self.tasks: for w in self.workers: try:# do not allow exceptions to break this - cando = t.depend is None or t.depend(w.properties) + # Allow the task to check itself using its + # check_depend method. + cando = t.check_depend(w.properties) except: cando = False if cando: @@ -449,9 +717,12 @@ class FIFOScheduler(object): class LIFOScheduler(FIFOScheduler): - """A Last-In-First-Out (Stack) Scheduler. This scheduler should naively - reward fast engines by giving them more jobs. This risks starvation, but - only in cases with low load, where starvation does not really matter. + """ + A Last-In-First-Out (Stack) Scheduler. + + This scheduler should naively reward fast engines by giving + them more jobs. This risks starvation, but only in cases with + low load, where starvation does not really matter. """ def add_task(self, task, **flags): @@ -466,13 +737,15 @@ class LIFOScheduler(FIFOScheduler): class ITaskController(cs.IControllerBase): - """The Task based interface to a `ControllerService` object + """ + The Task based interface to a `ControllerService` object This adapts a `ControllerService` to the ITaskController interface. """ def run(task): - """Run a task. + """ + Run a task. :Parameters: task : an IPython `Task` object @@ -481,13 +754,14 @@ class ITaskController(cs.IControllerBase): """ def get_task_result(taskid, block=False): - """Get the result of a task by its ID. + """ + Get the result of a task by its ID. :Parameters: taskid : int the id of the task whose result is requested - :Returns: `Deferred` to (taskid, actualResult) if the task is done, and None + :Returns: `Deferred` to the task result if the task is done, and None if not. :Exceptions: @@ -512,23 +786,35 @@ class ITaskController(cs.IControllerBase): """ def barrier(taskids): - """Block until the list of taskids are completed. + """ + Block until the list of taskids are completed. Returns None on success. """ def spin(): - """touch the scheduler, to resume scheduling without submitting - a task. + """ + Touch the scheduler, to resume scheduling without submitting a task. """ - def queue_status(self, verbose=False): - """Get a dictionary with the current state of the task queue. + def queue_status(verbose=False): + """ + Get a dictionary with the current state of the task queue. If verbose is True, then return lists of taskids, otherwise, return the number of tasks with each status. """ + def clear(): + """ + Clear all previously run tasks from the task controller. + + This is needed because the task controller keep all task results + in memory. This can be a problem is there are many completed + tasks. Users should call this periodically to clean out these + cached task results. + """ + class TaskController(cs.ControllerAdapterBase): """The Task based interface to a Controller object. @@ -565,7 +851,7 @@ class TaskController(cs.ControllerAdapterBase): def registerWorker(self, id): """Called by controller.register_engine.""" if self.workers.get(id): - raise "We already have one! This should not happen." + raise ValueError("worker with id %s already exists. This should not happen." 
% id) self.workers[id] = IWorker(self.controller.engines[id]) self.workers[id].workerid = id if not self.pendingTasks.has_key(id):# if not working @@ -590,21 +876,25 @@ class TaskController(cs.ControllerAdapterBase): #--------------------------------------------------------------------------- def run(self, task): - """Run a task and return `Deferred` to its taskid.""" + """ + Run a task and return `Deferred` to its taskid. + """ task.taskid = self.taskid task.start = time.localtime() self.taskid += 1 d = defer.Deferred() self.scheduler.add_task(task) - # log.msg('Queuing task: %i' % task.taskid) + log.msg('Queuing task: %i' % task.taskid) self.deferredResults[task.taskid] = [] self.distributeTasks() return defer.succeed(task.taskid) def get_task_result(self, taskid, block=False): - """Returns a `Deferred` to a TaskResult tuple or None.""" - # log.msg("Getting task result: %i" % taskid) + """ + Returns a `Deferred` to the task result, or None. + """ + log.msg("Getting task result: %i" % taskid) if self.finishedResults.has_key(taskid): tr = self.finishedResults[taskid] return defer.succeed(tr) @@ -619,7 +909,9 @@ class TaskController(cs.ControllerAdapterBase): return defer.fail(IndexError("task ID not registered: %r" % taskid)) def abort(self, taskid): - """Remove a task from the queue if it has not been run already.""" + """ + Remove a task from the queue if it has not been run already. + """ if not isinstance(taskid, int): return defer.fail(failure.Failure(TypeError("an integer task id expected: %r" % taskid))) try: @@ -678,8 +970,10 @@ class TaskController(cs.ControllerAdapterBase): #--------------------------------------------------------------------------- def _doAbort(self, taskid): - """Helper function for aborting a pending task.""" - # log.msg("Task aborted: %i" % taskid) + """ + Helper function for aborting a pending task. + """ + log.msg("Task aborted: %i" % taskid) result = failure.Failure(error.TaskAborted()) self._finishTask(taskid, result) if taskid in self.abortPending: @@ -687,14 +981,16 @@ class TaskController(cs.ControllerAdapterBase): def _finishTask(self, taskid, result): dlist = self.deferredResults.pop(taskid) - result.taskid = taskid # The TaskResult should save the taskid + # result.taskid = taskid # The TaskResult should save the taskid self.finishedResults[taskid] = result for d in dlist: d.callback(result) def distributeTasks(self): - """Distribute tasks while self.scheduler has things to do.""" - # log.msg("distributing Tasks") + """ + Distribute tasks while self.scheduler has things to do. 
+ """ + log.msg("distributing Tasks") worker, task = self.scheduler.schedule() if not worker and not task: if self.idleLater and self.idleLater.called:# we are inside failIdle @@ -709,7 +1005,7 @@ class TaskController(cs.ControllerAdapterBase): self.pendingTasks[worker.workerid] = task # run/link callbacks d = worker.run(task) - # log.msg("Running task %i on worker %i" %(task.taskid, worker.workerid)) + log.msg("Running task %i on worker %i" %(task.taskid, worker.workerid)) d.addBoth(self.taskCompleted, task.taskid, worker.workerid) worker, task = self.scheduler.schedule() # check for idle timeout: @@ -731,14 +1027,15 @@ class TaskController(cs.ControllerAdapterBase): t = self.scheduler.pop_task() msg = "task %i failed to execute due to unmet dependencies"%t.taskid msg += " for %i seconds"%self.timeout - # log.msg("Task aborted by timeout: %i" % t.taskid) + log.msg("Task aborted by timeout: %i" % t.taskid) f = failure.Failure(error.TaskTimeout(msg)) self._finishTask(t.taskid, f) self.idleLater = None - def taskCompleted(self, result, taskid, workerid): + def taskCompleted(self, success_and_result, taskid, workerid): """This is the err/callback for a completed task.""" + success, result = success_and_result try: task = self.pendingTasks.pop(workerid) except: @@ -755,7 +1052,7 @@ class TaskController(cs.ControllerAdapterBase): aborted = True if not aborted: - if result.failure is not None and isinstance(result.failure, failure.Failure): # we failed + if not success: log.msg("Task %i failed on worker %i"% (taskid, workerid)) if task.retries > 0: # resubmit task.retries -= 1 @@ -763,7 +1060,7 @@ class TaskController(cs.ControllerAdapterBase): s = "Resubmitting task %i, %i retries remaining" %(taskid, task.retries) log.msg(s) self.distributeTasks() - elif isinstance(task.recovery_task, Task) and \ + elif isinstance(task.recovery_task, BaseTask) and \ task.recovery_task.retries > -1: # retries = -1 is to prevent infinite recovery_task loop task.retries = -1 @@ -779,17 +1076,18 @@ class TaskController(cs.ControllerAdapterBase): # it may have died, and not yet been unregistered reactor.callLater(self.failurePenalty, self.readmitWorker, workerid) else: # we succeeded - # log.msg("Task completed: %i"% taskid) + log.msg("Task completed: %i"% taskid) self._finishTask(taskid, result) self.readmitWorker(workerid) - else:# we aborted the task - if result.failure is not None and isinstance(result.failure, failure.Failure): # it failed, penalize worker + else: # we aborted the task + if not success: reactor.callLater(self.failurePenalty, self.readmitWorker, workerid) else: self.readmitWorker(workerid) def readmitWorker(self, workerid): - """Readmit a worker to the scheduler. + """ + Readmit a worker to the scheduler. This is outside `taskCompleted` because of the `failurePenalty` being implemented through `reactor.callLater`. @@ -798,6 +1096,18 @@ class TaskController(cs.ControllerAdapterBase): if workerid in self.workers.keys() and workerid not in self.pendingTasks.keys(): self.scheduler.add_worker(self.workers[workerid]) self.distributeTasks() + + def clear(self): + """ + Clear all previously run tasks from the task controller. + + This is needed because the task controller keep all task results + in memory. This can be a problem is there are many completed + tasks. Users should call this periodically to clean out these + cached task results. 
+ """ + self.finishedResults = {} + return defer.succeed(None) components.registerAdapter(TaskController, cs.IControllerBase, ITaskController) diff --git a/IPython/kernel/taskclient.py b/IPython/kernel/taskclient.py index 405407a..dc95418 100644 --- a/IPython/kernel/taskclient.py +++ b/IPython/kernel/taskclient.py @@ -1,9 +1,8 @@ # encoding: utf-8 # -*- test-case-name: IPython.kernel.tests.test_taskcontrollerxmlrpc -*- -"""The Generic Task Client object. - -This must be subclassed based on your connection method. +""" +A blocking version of the task client. """ __docformat__ = "restructuredtext en" @@ -24,119 +23,100 @@ from twisted.python import components, log from IPython.kernel.twistedutil import blockingCallFromThread from IPython.kernel import task, error +from IPython.kernel.mapper import ( + SynchronousTaskMapper, + ITaskMapperFactory, + IMapper +) +from IPython.kernel.parallelfunction import ( + ParallelFunction, + ITaskParallelDecorator +) #------------------------------------------------------------------------------- -# Connecting Task Client +# The task client #------------------------------------------------------------------------------- -class InteractiveTaskClient(object): - - def irun(self, *args, **kwargs): - """Run a task on the `TaskController`. - - This method is a shorthand for run(task) and its arguments are simply - passed onto a `Task` object: - - irun(*args, **kwargs) -> run(Task(*args, **kwargs)) - - :Parameters: - expression : str - A str that is valid python code that is the task. - pull : str or list of str - The names of objects to be pulled as results. - push : dict - A dict of objects to be pushed into the engines namespace before - execution of the expression. - clear_before : boolean - Should the engine's namespace be cleared before the task is run. - Default=False. - clear_after : boolean - Should the engine's namespace be cleared after the task is run. - Default=False. - retries : int - The number of times to resumbit the task if it fails. Default=0. - options : dict - Any other keyword options for more elaborate uses of tasks - - :Returns: A `TaskResult` object. - """ - block = kwargs.pop('block', False) - if len(args) == 1 and isinstance(args[0], task.Task): - t = args[0] - else: - t = task.Task(*args, **kwargs) - taskid = self.run(t) - print "TaskID = %i"%taskid - if block: - return self.get_task_result(taskid, block) - else: - return taskid - class IBlockingTaskClient(Interface): """ - An interface for blocking task clients. + A vague interface of the blocking task client """ pass - -class BlockingTaskClient(InteractiveTaskClient): +class BlockingTaskClient(object): """ - This class provides a blocking task client. + A blocking task client that adapts a non-blocking one. """ - implements(IBlockingTaskClient) + implements( + IBlockingTaskClient, + ITaskMapperFactory, + IMapper, + ITaskParallelDecorator + ) def __init__(self, task_controller): self.task_controller = task_controller self.block = True - def run(self, task): - """ - Run a task and return a task id that can be used to get the task result. + def run(self, task, block=False): + """Run a task on the `TaskController`. + + See the documentation of the `MapTask` and `StringTask` classes for + details on how to build a task of different types. :Parameters: - task : `Task` - The `Task` object to run + task : an `ITask` implementer + + :Returns: The int taskid of the submitted task. Pass this to + `get_task_result` to get the `TaskResult` object. 
""" - return blockingCallFromThread(self.task_controller.run, task) + tid = blockingCallFromThread(self.task_controller.run, task) + if block: + return self.get_task_result(tid, block=True) + else: + return tid def get_task_result(self, taskid, block=False): """ - Get or poll for a task result. + Get a task result by taskid. :Parameters: taskid : int - The id of the task whose result to get + The taskid of the task to be retrieved. block : boolean - If True, wait until the task is done and then result the - `TaskResult` object. If False, just poll for the result and - return None if the task is not done. + Should I block until the task is done? + + :Returns: A `TaskResult` object that encapsulates the task result. """ return blockingCallFromThread(self.task_controller.get_task_result, taskid, block) def abort(self, taskid): """ - Abort a task by task id if it has not been started. + Abort a task by taskid. + + :Parameters: + taskid : int + The taskid of the task to be aborted. """ return blockingCallFromThread(self.task_controller.abort, taskid) def barrier(self, taskids): - """ - Wait for a set of tasks to finish. + """Block until a set of tasks are completed. :Parameters: - taskids : list of ints - A list of task ids to wait for. + taskids : list, tuple + A sequence of taskids to block on. """ return blockingCallFromThread(self.task_controller.barrier, taskids) def spin(self): """ - Cause the scheduler to schedule tasks. + Touch the scheduler, to resume scheduling without submitting a task. This method only needs to be called in unusual situations where the - scheduler is idle for some reason. + scheduler is idle for some reason. """ return blockingCallFromThread(self.task_controller.spin) @@ -153,7 +133,46 @@ class BlockingTaskClient(InteractiveTaskClient): A dict with the queue status. """ return blockingCallFromThread(self.task_controller.queue_status, verbose) + + def clear(self): + """ + Clear all previously run tasks from the task controller. + + This is needed because the task controller keep all task results + in memory. This can be a problem is there are many completed + tasks. Users should call this periodically to clean out these + cached task results. + """ + return blockingCallFromThread(self.task_controller.clear) + + def map(self, func, *sequences): + """ + Apply func to *sequences elementwise. Like Python's builtin map. + + This version is load balanced. + """ + return self.mapper().map(func, *sequences) + def mapper(self, clear_before=False, clear_after=False, retries=0, + recovery_task=None, depend=None, block=True): + """ + Create an `IMapper` implementer with a given set of arguments. + + The `IMapper` created using a task controller is load balanced. + + See the documentation for `IPython.kernel.task.BaseTask` for + documentation on the arguments to this method. 
+ """ + return SynchronousTaskMapper(self, clear_before=clear_before, + clear_after=clear_after, retries=retries, + recovery_task=recovery_task, depend=depend, block=block) + + def parallel(self, clear_before=False, clear_after=False, retries=0, + recovery_task=None, depend=None, block=True): + mapper = self.mapper(clear_before, clear_after, retries, + recovery_task, depend, block) + pf = ParallelFunction(mapper) + return pf components.registerAdapter(BlockingTaskClient, task.ITaskController, IBlockingTaskClient) diff --git a/IPython/kernel/taskfc.py b/IPython/kernel/taskfc.py index b4096e7..c559f64 100644 --- a/IPython/kernel/taskfc.py +++ b/IPython/kernel/taskfc.py @@ -34,6 +34,15 @@ from IPython.kernel.clientinterfaces import ( IFCClientInterfaceProvider, IBlockingClientAdaptor ) +from IPython.kernel.mapper import ( + TaskMapper, + ITaskMapperFactory, + IMapper +) +from IPython.kernel.parallelfunction import ( + ParallelFunction, + ITaskParallelDecorator +) #------------------------------------------------------------------------------- # The Controller side of things @@ -43,32 +52,38 @@ from IPython.kernel.clientinterfaces import ( class IFCTaskController(Interface): """Foolscap interface to task controller. - See the documentation of ITaskController for documentation about the methods. + See the documentation of `ITaskController` for more information. """ - def remote_run(request, binTask): + def remote_run(binTask): """""" - def remote_abort(request, taskid): + def remote_abort(taskid): """""" - def remote_get_task_result(request, taskid, block=False): + def remote_get_task_result(taskid, block=False): """""" - def remote_barrier(request, taskids): + def remote_barrier(taskids): + """""" + + def remote_spin(): """""" - def remote_spin(request): + def remote_queue_status(verbose): """""" - def remote_queue_status(request, verbose): + def remote_clear(): """""" class FCTaskControllerFromTaskController(Referenceable): - """XML-RPC attachmeot for controller. - - See IXMLRPCTaskController and ITaskController (and its children) for documentation. """ + Adapt a `TaskController` to an `IFCTaskController` + + This class is used to expose a `TaskController` over the wire using + the Foolscap network protocol. + """ + implements(IFCTaskController, IFCClientInterfaceProvider) def __init__(self, taskController): @@ -92,8 +107,8 @@ class FCTaskControllerFromTaskController(Referenceable): def remote_run(self, ptask): try: - ctask = pickle.loads(ptask) - task = taskmodule.uncanTask(ctask) + task = pickle.loads(ptask) + task.uncan_task() except: d = defer.fail(pickle.UnpickleableError("Could not unmarshal task")) else: @@ -132,6 +147,9 @@ class FCTaskControllerFromTaskController(Referenceable): d.addErrback(self.packageFailure) return d + def remote_clear(self): + return self.taskController.clear() + def remote_get_client_name(self): return 'IPython.kernel.taskfc.FCTaskClient' @@ -144,13 +162,23 @@ components.registerAdapter(FCTaskControllerFromTaskController, #------------------------------------------------------------------------------- class FCTaskClient(object): - """XML-RPC based TaskController client that implements ITaskController. - - :Parameters: - addr : (ip, port) - The ip (str) and port (int) tuple of the `TaskController`. """ - implements(taskmodule.ITaskController, IBlockingClientAdaptor) + Client class for Foolscap exposed `TaskController`. + + This class is an adapter that makes a `RemoteReference` to a + `TaskController` look like an actual `ITaskController` on the client side. 
+ + This class also implements `IBlockingClientAdaptor` so that clients can + automatically get a blocking version of this class. + """ + + implements( + taskmodule.ITaskController, + IBlockingClientAdaptor, + ITaskMapperFactory, + IMapper, + ITaskParallelDecorator + ) def __init__(self, remote_reference): self.remote_reference = remote_reference @@ -168,48 +196,26 @@ class FCTaskClient(object): def run(self, task): """Run a task on the `TaskController`. - :Parameters: - task : a `Task` object - - The Task object is created using the following signature: - - Task(expression, pull=None, push={}, clear_before=False, - clear_after=False, retries=0, **options):) + See the documentation of the `MapTask` and `StringTask` classes for + details on how to build a task of different types. - The meaning of the arguments is as follows: + :Parameters: + task : an `ITask` implementer - :Task Parameters: - expression : str - A str that is valid python code that is the task. - pull : str or list of str - The names of objects to be pulled as results. - push : dict - A dict of objects to be pushed into the engines namespace before - execution of the expression. - clear_before : boolean - Should the engine's namespace be cleared before the task is run. - Default=False. - clear_after : boolean - Should the engine's namespace be cleared after the task is run. - Default=False. - retries : int - The number of times to resumbit the task if it fails. Default=0. - options : dict - Any other keyword options for more elaborate uses of tasks - :Returns: The int taskid of the submitted task. Pass this to `get_task_result` to get the `TaskResult` object. """ - assert isinstance(task, taskmodule.Task), "task must be a Task object!" - ctask = taskmodule.canTask(task) # handles arbitrary function in .depend - # as well as arbitrary recovery_task chains - ptask = pickle.dumps(ctask, 2) + assert isinstance(task, taskmodule.BaseTask), "task must be a Task object!" + task.can_task() + ptask = pickle.dumps(task, 2) + task.uncan_task() d = self.remote_reference.callRemote('run', ptask) d.addCallback(self.unpackage) return d def get_task_result(self, taskid, block=False): - """The task result by taskid. + """ + Get a task result by taskid. :Parameters: taskid : int @@ -224,20 +230,19 @@ class FCTaskClient(object): return d def abort(self, taskid): - """Abort a task by taskid. + """ + Abort a task by taskid. :Parameters: taskid : int The taskid of the task to be aborted. - block : boolean - Should I block until the task is aborted. """ d = self.remote_reference.callRemote('abort', taskid) d.addCallback(self.unpackage) return d def barrier(self, taskids): - """Block until all tasks are completed. + """Block until a set of tasks are completed. :Parameters: taskids : list, tuple @@ -248,20 +253,77 @@ class FCTaskClient(object): return d def spin(self): - """touch the scheduler, to resume scheduling without submitting - a task. + """ + Touch the scheduler, to resume scheduling without submitting a task. + + This method only needs to be called in unusual situations where the + scheduler is idle for some reason. """ d = self.remote_reference.callRemote('spin') d.addCallback(self.unpackage) return d def queue_status(self, verbose=False): - """Return a dict with the status of the task queue.""" + """ + Get a dictionary with the current state of the task queue. + + :Parameters: + verbose : boolean + If True, return a list of taskids. If False, simply give + the number of tasks with each status. + + :Returns: + A dict with the queue status. 
+ """ d = self.remote_reference.callRemote('queue_status', verbose) d.addCallback(self.unpackage) return d + def clear(self): + """ + Clear all previously run tasks from the task controller. + + This is needed because the task controller keep all task results + in memory. This can be a problem is there are many completed + tasks. Users should call this periodically to clean out these + cached task results. + """ + d = self.remote_reference.callRemote('clear') + return d + def adapt_to_blocking_client(self): + """ + Wrap self in a blocking version that implements `IBlockingTaskClient. + """ from IPython.kernel.taskclient import IBlockingTaskClient return IBlockingTaskClient(self) + + def map(self, func, *sequences): + """ + Apply func to *sequences elementwise. Like Python's builtin map. + + This version is load balanced. + """ + return self.mapper().map(func, *sequences) + + def mapper(self, clear_before=False, clear_after=False, retries=0, + recovery_task=None, depend=None, block=True): + """ + Create an `IMapper` implementer with a given set of arguments. + + The `IMapper` created using a task controller is load balanced. + + See the documentation for `IPython.kernel.task.BaseTask` for + documentation on the arguments to this method. + """ + return TaskMapper(self, clear_before=clear_before, + clear_after=clear_after, retries=retries, + recovery_task=recovery_task, depend=depend, block=block) + + def parallel(self, clear_before=False, clear_after=False, retries=0, + recovery_task=None, depend=None, block=True): + mapper = self.mapper(clear_before, clear_after, retries, + recovery_task, depend, block) + pf = ParallelFunction(mapper) + return pf diff --git a/IPython/kernel/tests/engineservicetest.py b/IPython/kernel/tests/engineservicetest.py index b4a6d21..7336acd 100644 --- a/IPython/kernel/tests/engineservicetest.py +++ b/IPython/kernel/tests/engineservicetest.py @@ -163,7 +163,6 @@ class IEngineCoreTestCase(object): try: import numpy except: - print 'no numpy, ', return a = numpy.random.random(1000) d = self.engine.push(dict(a=a)) diff --git a/IPython/kernel/tests/multienginetest.py b/IPython/kernel/tests/multienginetest.py index 30a2df7..10b690e 100644 --- a/IPython/kernel/tests/multienginetest.py +++ b/IPython/kernel/tests/multienginetest.py @@ -733,7 +733,7 @@ class ISynchronousMultiEngineCoordinatorTestCase(IMultiEngineCoordinatorTestCase d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True)) d.addCallback(lambda r: self.assertEquals(r, range(16))) return d - + def testScatterGatherNumpyNonblocking(self): try: import numpy @@ -749,17 +749,7 @@ class ISynchronousMultiEngineCoordinatorTestCase(IMultiEngineCoordinatorTestCase d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True)) d.addCallback(lambda r: assert_array_equal(r, a)) return d - - def testMapNonblocking(self): - self.addEngine(4) - def f(x): - return x**2 - data = range(16) - d= self.multiengine.map(f, data, block=False) - d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True)) - d.addCallback(lambda r: self.assertEquals(r,[f(x) for x in data])) - return d - + def test_clear_pending_deferreds(self): self.addEngine(4) did_list = [] diff --git a/IPython/kernel/tests/tasktest.py b/IPython/kernel/tests/tasktest.py index 10a0a35..bddc8aa 100644 --- a/IPython/kernel/tests/tasktest.py +++ b/IPython/kernel/tests/tasktest.py @@ -43,23 +43,23 @@ class TaskTestBase(object): class ITaskControllerTestCase(TaskTestBase): - def testTaskIDs(self): + def test_task_ids(self): 
self.addEngine(1) - d = self.tc.run(task.Task('a=5')) + d = self.tc.run(task.StringTask('a=5')) d.addCallback(lambda r: self.assertEquals(r, 0)) - d.addCallback(lambda r: self.tc.run(task.Task('a=5'))) + d.addCallback(lambda r: self.tc.run(task.StringTask('a=5'))) d.addCallback(lambda r: self.assertEquals(r, 1)) - d.addCallback(lambda r: self.tc.run(task.Task('a=5'))) + d.addCallback(lambda r: self.tc.run(task.StringTask('a=5'))) d.addCallback(lambda r: self.assertEquals(r, 2)) - d.addCallback(lambda r: self.tc.run(task.Task('a=5'))) + d.addCallback(lambda r: self.tc.run(task.StringTask('a=5'))) d.addCallback(lambda r: self.assertEquals(r, 3)) return d - def testAbort(self): + def test_abort(self): """Cannot do a proper abort test, because blocking execution prevents abort from being called before task completes""" self.addEngine(1) - t = task.Task('a=5') + t = task.StringTask('a=5') d = self.tc.abort(0) d.addErrback(lambda f: self.assertRaises(IndexError, f.raiseException)) d.addCallback(lambda _:self.tc.run(t)) @@ -67,15 +67,15 @@ class ITaskControllerTestCase(TaskTestBase): d.addErrback(lambda f: self.assertRaises(IndexError, f.raiseException)) return d - def testAbortType(self): + def test_abort_type(self): self.addEngine(1) d = self.tc.abort('asdfadsf') d.addErrback(lambda f: self.assertRaises(TypeError, f.raiseException)) return d - def testClears(self): + def test_clear_before_and_after(self): self.addEngine(1) - t = task.Task('a=1', clear_before=True, pull='b', clear_after=True) + t = task.StringTask('a=1', clear_before=True, pull='b', clear_after=True) d = self.multiengine.execute('b=1', targets=0) d.addCallback(lambda _: self.tc.run(t)) d.addCallback(lambda tid: self.tc.get_task_result(tid,block=True)) @@ -85,10 +85,10 @@ class ITaskControllerTestCase(TaskTestBase): d.addErrback(lambda f: self.assertRaises(NameError, _raise_it, f)) return d - def testSimpleRetries(self): + def test_simple_retries(self): self.addEngine(1) - t = task.Task("i += 1\nassert i == 16", pull='i',retries=10) - t2 = task.Task("i += 1\nassert i == 16", pull='i',retries=10) + t = task.StringTask("i += 1\nassert i == 16", pull='i',retries=10) + t2 = task.StringTask("i += 1\nassert i == 16", pull='i',retries=10) d = self.multiengine.execute('i=0', targets=0) d.addCallback(lambda r: self.tc.run(t)) d.addCallback(self.tc.get_task_result, block=True) @@ -101,10 +101,10 @@ class ITaskControllerTestCase(TaskTestBase): d.addCallback(lambda r: self.assertEquals(r, 16)) return d - def testRecoveryTasks(self): + def test_recovery_tasks(self): self.addEngine(1) - t = task.Task("i=16", pull='i') - t2 = task.Task("raise Exception", recovery_task=t, retries = 2) + t = task.StringTask("i=16", pull='i') + t2 = task.StringTask("raise Exception", recovery_task=t, retries = 2) d = self.tc.run(t2) d.addCallback(self.tc.get_task_result, block=True) @@ -112,47 +112,76 @@ class ITaskControllerTestCase(TaskTestBase): d.addCallback(lambda r: self.assertEquals(r, 16)) return d - # def testInfiniteRecoveryLoop(self): - # self.addEngine(1) - # t = task.Task("raise Exception", retries = 5) - # t2 = task.Task("assert True", retries = 2, recovery_task = t) - # t.recovery_task = t2 - # - # d = self.tc.run(t) - # d.addCallback(self.tc.get_task_result, block=True) - # d.addCallback(lambda tr: tr.ns.i) - # d.addBoth(printer) - # d.addErrback(lambda f: self.assertRaises(AssertionError, f.raiseException)) - # return d - # - def testSetupNS(self): + def test_setup_ns(self): self.addEngine(1) d = self.multiengine.execute('a=0', targets=0) ns = 
dict(a=1, b=0) - t = task.Task("", push=ns, pull=['a','b']) + t = task.StringTask("", push=ns, pull=['a','b']) d.addCallback(lambda r: self.tc.run(t)) d.addCallback(self.tc.get_task_result, block=True) d.addCallback(lambda tr: {'a':tr.ns.a, 'b':tr['b']}) d.addCallback(lambda r: self.assertEquals(r, ns)) return d - def testTaskResults(self): + def test_string_task_results(self): self.addEngine(1) - t1 = task.Task('a=5', pull='a') + t1 = task.StringTask('a=5', pull='a') d = self.tc.run(t1) d.addCallback(self.tc.get_task_result, block=True) - d.addCallback(lambda tr: (tr.ns.a,tr['a'],tr.failure, tr.raiseException())) + d.addCallback(lambda tr: (tr.ns.a,tr['a'],tr.failure, tr.raise_exception())) d.addCallback(lambda r: self.assertEquals(r, (5,5,None,None))) - t2 = task.Task('7=5') + t2 = task.StringTask('7=5') d.addCallback(lambda r: self.tc.run(t2)) d.addCallback(self.tc.get_task_result, block=True) d.addCallback(lambda tr: tr.ns) d.addErrback(lambda f: self.assertRaises(SyntaxError, f.raiseException)) - t3 = task.Task('', pull='b') + t3 = task.StringTask('', pull='b') d.addCallback(lambda r: self.tc.run(t3)) d.addCallback(self.tc.get_task_result, block=True) d.addCallback(lambda tr: tr.ns) d.addErrback(lambda f: self.assertRaises(NameError, f.raiseException)) return d + + def test_map_task(self): + self.addEngine(1) + t1 = task.MapTask(lambda x: 2*x,(10,)) + d = self.tc.run(t1) + d.addCallback(self.tc.get_task_result, block=True) + d.addCallback(lambda r: self.assertEquals(r,20)) + + t2 = task.MapTask(lambda : 20) + d.addCallback(lambda _: self.tc.run(t2)) + d.addCallback(self.tc.get_task_result, block=True) + d.addCallback(lambda r: self.assertEquals(r,20)) + + t3 = task.MapTask(lambda x: x,(),{'x':20}) + d.addCallback(lambda _: self.tc.run(t3)) + d.addCallback(self.tc.get_task_result, block=True) + d.addCallback(lambda r: self.assertEquals(r,20)) + return d + + def test_map_task_failure(self): + self.addEngine(1) + t1 = task.MapTask(lambda x: 1/0,(10,)) + d = self.tc.run(t1) + d.addCallback(self.tc.get_task_result, block=True) + d.addErrback(lambda f: self.assertRaises(ZeroDivisionError, f.raiseException)) + return d + + def test_map_task_args(self): + self.assertRaises(TypeError, task.MapTask, 'asdfasdf') + self.assertRaises(TypeError, task.MapTask, lambda x: x, 10) + self.assertRaises(TypeError, task.MapTask, lambda x: x, (10,),30) + + def test_clear(self): + self.addEngine(1) + t1 = task.MapTask(lambda x: 2*x,(10,)) + d = self.tc.run(t1) + d.addCallback(lambda _: self.tc.get_task_result(0, block=True)) + d.addCallback(lambda r: self.assertEquals(r,20)) + d.addCallback(lambda _: self.tc.clear()) + d.addCallback(lambda _: self.tc.get_task_result(0, block=True)) + d.addErrback(lambda f: self.assertRaises(IndexError, f.raiseException)) + return d diff --git a/IPython/kernel/tests/test_enginefc.py b/IPython/kernel/tests/test_enginefc.py index be99c4c..7f482cf 100644 --- a/IPython/kernel/tests/test_enginefc.py +++ b/IPython/kernel/tests/test_enginefc.py @@ -38,7 +38,7 @@ try: IEngineQueuedTestCase except ImportError: print "we got an error!!!" 
- pass + raise else: class EngineFCTest(DeferredTestCase, IEngineCoreTestCase, diff --git a/IPython/kernel/tests/test_multienginefc.py b/IPython/kernel/tests/test_multienginefc.py index 610ada1..f390992 100644 --- a/IPython/kernel/tests/test_multienginefc.py +++ b/IPython/kernel/tests/test_multienginefc.py @@ -26,9 +26,20 @@ try: from IPython.kernel.multienginefc import IFCSynchronousMultiEngine from IPython.kernel import multiengine as me from IPython.kernel.clientconnector import ClientConnector + from IPython.kernel.parallelfunction import ParallelFunction + from IPython.kernel.error import CompositeError + from IPython.kernel.util import printer except ImportError: pass else: + + def _raise_it(f): + try: + f.raiseException() + except CompositeError, e: + e.raise_exception() + + class FullSynchronousMultiEngineTestCase(DeferredTestCase, IFullSynchronousMultiEngineTestCase): def setUp(self): @@ -68,3 +79,66 @@ else: d.addBoth(lambda _: self.controller.stopService()) dlist.append(d) return defer.DeferredList(dlist) + + def test_mapper(self): + self.addEngine(4) + m = self.multiengine.mapper() + self.assertEquals(m.multiengine,self.multiengine) + self.assertEquals(m.dist,'b') + self.assertEquals(m.targets,'all') + self.assertEquals(m.block,True) + + def test_map_default(self): + self.addEngine(4) + m = self.multiengine.mapper() + d = m.map(lambda x: 2*x, range(10)) + d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)])) + d.addCallback(lambda _: self.multiengine.map(lambda x: 2*x, range(10))) + d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)])) + return d + + def test_map_noblock(self): + self.addEngine(4) + m = self.multiengine.mapper(block=False) + d = m.map(lambda x: 2*x, range(10)) + d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True)) + d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)])) + return d + + def test_mapper_fail(self): + self.addEngine(4) + m = self.multiengine.mapper() + d = m.map(lambda x: 1/0, range(10)) + d.addBoth(lambda f: self.assertRaises(ZeroDivisionError, _raise_it, f)) + return d + + def test_parallel(self): + self.addEngine(4) + p = self.multiengine.parallel() + self.assert_(isinstance(p, ParallelFunction)) + @p + def f(x): return 2*x + d = f(range(10)) + d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)])) + return d + + def test_parallel_noblock(self): + self.addEngine(1) + p = self.multiengine.parallel(block=False) + self.assert_(isinstance(p, ParallelFunction)) + @p + def f(x): return 2*x + d = f(range(10)) + d.addCallback(lambda did: self.multiengine.get_pending_deferred(did, True)) + d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)])) + return d + + def test_parallel_fail(self): + self.addEngine(4) + p = self.multiengine.parallel() + self.assert_(isinstance(p, ParallelFunction)) + @p + def f(x): return 1/0 + d = f(range(10)) + d.addBoth(lambda f: self.assertRaises(ZeroDivisionError, _raise_it, f)) + return d \ No newline at end of file diff --git a/IPython/kernel/tests/test_pendingdeferred.py b/IPython/kernel/tests/test_pendingdeferred.py old mode 100755 new mode 100644 index 2e14968..2ac9bda --- a/IPython/kernel/tests/test_pendingdeferred.py +++ b/IPython/kernel/tests/test_pendingdeferred.py @@ -20,8 +20,6 @@ try: from twisted.internet import defer from twisted.python import failure - from IPython.testing import tcommon - #from IPython.testing.tcommon import * from IPython.testing.util import DeferredTestCase import 
IPython.kernel.pendingdeferred as pd from IPython.kernel import error @@ -29,6 +27,11 @@ try: except ImportError: pass else: + + class Foo(object): + + def bar(self, bahz): + return defer.succeed('blahblah: %s' % bahz) class TwoPhaseFoo(pd.PendingDeferredManager): @@ -181,6 +184,3 @@ else: d3 = self.pdm.get_pending_deferred(did,False) d3.addCallback(lambda r: self.assertEquals(r,'bar')) - -# Global object expected by Twisted's trial -testSuite = lambda : makeTestSuite(__name__,dt_files,dt_modules) diff --git a/IPython/kernel/tests/test_taskfc.py b/IPython/kernel/tests/test_taskfc.py index 1e15317..e2b4122 100644 --- a/IPython/kernel/tests/test_taskfc.py +++ b/IPython/kernel/tests/test_taskfc.py @@ -30,6 +30,8 @@ try: from IPython.kernel.util import printer from IPython.kernel.tests.tasktest import ITaskControllerTestCase from IPython.kernel.clientconnector import ClientConnector + from IPython.kernel.error import CompositeError + from IPython.kernel.parallelfunction import ParallelFunction except ImportError: pass else: @@ -38,6 +40,12 @@ else: # Tests #------------------------------------------------------------------------------- + def _raise_it(f): + try: + f.raiseException() + except CompositeError, e: + e.raise_exception() + class TaskTest(DeferredTestCase, ITaskControllerTestCase): def setUp(self): @@ -87,4 +95,67 @@ else: d.addBoth(lambda _: self.controller.stopService()) dlist.append(d) return defer.DeferredList(dlist) - + + def test_mapper(self): + self.addEngine(1) + m = self.tc.mapper() + self.assertEquals(m.task_controller,self.tc) + self.assertEquals(m.clear_before,False) + self.assertEquals(m.clear_after,False) + self.assertEquals(m.retries,0) + self.assertEquals(m.recovery_task,None) + self.assertEquals(m.depend,None) + self.assertEquals(m.block,True) + + def test_map_default(self): + self.addEngine(1) + m = self.tc.mapper() + d = m.map(lambda x: 2*x, range(10)) + d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)])) + d.addCallback(lambda _: self.tc.map(lambda x: 2*x, range(10))) + d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)])) + return d + + def test_map_noblock(self): + self.addEngine(1) + m = self.tc.mapper(block=False) + d = m.map(lambda x: 2*x, range(10)) + d.addCallback(lambda r: self.assertEquals(r,[x for x in range(10)])) + return d + + def test_mapper_fail(self): + self.addEngine(1) + m = self.tc.mapper() + d = m.map(lambda x: 1/0, range(10)) + d.addBoth(lambda f: self.assertRaises(ZeroDivisionError, _raise_it, f)) + return d + + def test_parallel(self): + self.addEngine(1) + p = self.tc.parallel() + self.assert_(isinstance(p, ParallelFunction)) + @p + def f(x): return 2*x + d = f(range(10)) + d.addCallback(lambda r: self.assertEquals(r,[2*x for x in range(10)])) + return d + + def test_parallel_noblock(self): + self.addEngine(1) + p = self.tc.parallel(block=False) + self.assert_(isinstance(p, ParallelFunction)) + @p + def f(x): return 2*x + d = f(range(10)) + d.addCallback(lambda r: self.assertEquals(r,[x for x in range(10)])) + return d + + def test_parallel_fail(self): + self.addEngine(1) + p = self.tc.parallel() + self.assert_(isinstance(p, ParallelFunction)) + @p + def f(x): return 1/0 + d = f(range(10)) + d.addBoth(lambda f: self.assertRaises(ZeroDivisionError, _raise_it, f)) + return d \ No newline at end of file diff --git a/IPython/testing/tests/test_testutils.py b/IPython/testing/tests/test_testutils.py deleted file mode 100755 index 683661b..0000000 --- a/IPython/testing/tests/test_testutils.py +++ /dev/null @@ 
-1,66 +0,0 @@ -#!/usr/bin/env python -# encoding: utf-8 -"""Simple template for unit tests. - -This file should be renamed to - -test_FEATURE.py - -so that it is recognized by the overall test driver (Twisted's 'trial'), which -looks for all test_*.py files in the current directory to extract tests from -them. -""" -__docformat__ = "restructuredtext en" - -#------------------------------------------------------------------------------- -# Copyright (C) 2005 Fernando Perez -# Brian E Granger -# Benjamin Ragan-Kelley -# -# Distributed under the terms of the BSD License. The full license is in -# the file COPYING, distributed as part of this software. -#------------------------------------------------------------------------------- - -#------------------------------------------------------------------------------- -# Imports -#------------------------------------------------------------------------------- - -from IPython.testing import tcommon -from IPython.testing.tcommon import * - -#------------------------------------------------------------------------------- -# Setup for inline and standalone doctests -#------------------------------------------------------------------------------- - - -# If you have standalone doctests in a separate file, set their names in the -# dt_files variable (as a single string or a list thereof). The mkPath call -# forms an absolute path based on the current file, it is not needed if you -# provide the full pahts. -dt_files = fullPath(__file__,[]) - - -# If you have any modules whose docstrings should be scanned for embedded tests -# as examples accorging to standard doctest practice, set them here (as a -# single string or a list thereof): -dt_modules = ['IPython.testing.tutils'] - -#------------------------------------------------------------------------------- -# Regular Unittests -#------------------------------------------------------------------------------- - -## class FooTestCase(unittest.TestCase): -## def test_foo(self): -## pass - -#------------------------------------------------------------------------------- -# Regular Unittests -#------------------------------------------------------------------------------- - -# This ensures that the code will run either standalone as a script, or that it -# can be picked up by Twisted's `trial` test wrapper to run all the tests. 
-if tcommon.pexpect is not None: - if __name__ == '__main__': - unittest.main(testLoader=IPDocTestLoader(dt_files,dt_modules)) - else: - testSuite = lambda : makeTestSuite(__name__,dt_files,dt_modules) diff --git a/IPython/tools/tests/__init__.py b/IPython/tools/tests/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/IPython/tools/tests/__init__.py diff --git a/docs/examples/kernel/fetchparse.py b/docs/examples/kernel/fetchparse.py index 4c5d489..421bb25 100644 --- a/docs/examples/kernel/fetchparse.py +++ b/docs/examples/kernel/fetchparse.py @@ -53,7 +53,7 @@ class DistributedSpider(object): self.allLinks.append(url) if url.startswith(self.site): print ' ', url - self.linksWorking[url] = self.tc.run(client.Task('links = fetchAndParse(url)', pull=['links'], push={'url': url})) + self.linksWorking[url] = self.tc.run(client.StringTask('links = fetchAndParse(url)', pull=['links'], push={'url': url})) def onVisitDone(self, result, url): print url, ':' diff --git a/docs/examples/kernel/helloworld.py b/docs/examples/kernel/helloworld.py index a96aa34..ce18244 100644 --- a/docs/examples/kernel/helloworld.py +++ b/docs/examples/kernel/helloworld.py @@ -8,7 +8,7 @@ tc = client.TaskClient() mec = client.MultiEngineClient() mec.execute('import time') -hello_taskid = tc.run(client.Task('time.sleep(3) ; word = "Hello,"', pull=('word'))) -world_taskid = tc.run(client.Task('time.sleep(3) ; word = "World!"', pull=('word'))) +hello_taskid = tc.run(client.StringTask('time.sleep(3) ; word = "Hello,"', pull=('word'))) +world_taskid = tc.run(client.StringTask('time.sleep(3) ; word = "World!"', pull=('word'))) print "Submitted tasks:", hello_taskid, world_taskid print tc.get_task_result(hello_taskid,block=True).ns.word, tc.get_task_result(world_taskid,block=True).ns.word diff --git a/docs/examples/kernel/mcdriver.py b/docs/examples/kernel/mcdriver.py index d85fac9..2a5a8e1 100644 --- a/docs/examples/kernel/mcdriver.py +++ b/docs/examples/kernel/mcdriver.py @@ -31,7 +31,7 @@ sigma_vals = N.linspace(0.0, 0.2,5) taskids = [] for K in K_vals: for sigma in sigma_vals: - t = client.Task(task_string, + t = client.StringTask(task_string, push=dict(sigma=sigma,K=K), pull=('vp','ap','vc','ac','sigma','K')) taskids.append(tc.run(t)) diff --git a/docs/examples/kernel/multienginemap.py b/docs/examples/kernel/multienginemap.py new file mode 100644 index 0000000..016e0a8 --- /dev/null +++ b/docs/examples/kernel/multienginemap.py @@ -0,0 +1,18 @@ +from IPython.kernel import client + +mec = client.MultiEngineClient() + +result = mec.map(lambda x: 2*x, range(10)) +print "Simple, default map: ", result + +m = mec.mapper(block=False) +pr = m.map(lambda x: 2*x, range(10)) +print "Submitted map, got PendingResult: ", pr +result = pr.r +print "Using a mapper: ", result + +@mec.parallel() +def f(x): return 2*x + +result = f(range(10)) +print "Using a parallel function: ", result \ No newline at end of file diff --git a/docs/examples/kernel/task1.py b/docs/examples/kernel/task1.py index d233ef1..34d989d 100644 --- a/docs/examples/kernel/task1.py +++ b/docs/examples/kernel/task1.py @@ -11,8 +11,8 @@ b = 10*d c = a*b*d """ -t1 = client.Task(cmd1, clear_before=False, clear_after=True, pull=['a','b','c']) +t1 = client.StringTask(cmd1, clear_before=False, clear_after=True, pull=['a','b','c']) tid1 = tc.run(t1) tr1 = tc.get_task_result(tid1,block=True) -tr1.raiseException() +tr1.raise_exception() print "a, b: ", tr1.ns.a, tr1.ns.b \ No newline at end of file diff --git a/docs/examples/kernel/task2.py 
b/docs/examples/kernel/task2.py index 7a3c03d..661ad22 100644 --- a/docs/examples/kernel/task2.py +++ b/docs/examples/kernel/task2.py @@ -10,7 +10,7 @@ mec = client.MultiEngineClient() mec.execute('import time') for i in range(24): - tc.irun('time.sleep(1)') + tc.run(client.StringTask('time.sleep(1)')) for i in range(6): time.sleep(1.0) @@ -18,7 +18,7 @@ for i in range(6): print tc.queue_status() for i in range(24): - tc.irun('time.sleep(1)') + tc.run(client.StringTask('time.sleep(1)')) for i in range(6): time.sleep(1.0) @@ -26,7 +26,7 @@ for i in range(6): print tc.queue_status(True) for i in range(12): - tc.irun('time.sleep(2)') + tc.run(client.StringTask('time.sleep(2)')) print "Queue status (vebose=True)" print tc.queue_status(True) diff --git a/docs/examples/kernel/task_profiler.py b/docs/examples/kernel/task_profiler.py index 23118d9..3078e09 100644 --- a/docs/examples/kernel/task_profiler.py +++ b/docs/examples/kernel/task_profiler.py @@ -55,7 +55,7 @@ def main(): # the jobs should take a random time within a range times = [random.random()*(opts.tmax-opts.tmin)+opts.tmin for i in range(opts.n)] - tasks = [client.Task("time.sleep(%f)"%t) for t in times] + tasks = [client.StringTask("time.sleep(%f)"%t) for t in times] stime = sum(times) print "executing %i tasks, totalling %.1f secs on %i engines"%(opts.n, stime, nengines) diff --git a/docs/examples/kernel/taskmap.py b/docs/examples/kernel/taskmap.py new file mode 100644 index 0000000..44c311b --- /dev/null +++ b/docs/examples/kernel/taskmap.py @@ -0,0 +1,19 @@ +from IPython.kernel import client + +tc = client.TaskClient() + +result = tc.map(lambda x: 2*x, range(10)) +print "Simple, default map: ", result + +m = tc.mapper(block=False, clear_after=True, clear_before=True) +tids = m.map(lambda x: 2*x, range(10)) +print "Submitted tasks, got ids: ", tids +tc.barrier(tids) +result = [tc.get_task_result(tid) for tid in tids] +print "Using a mapper: ", result + +@tc.parallel() +def f(x): return 2*x + +result = f(range(10)) +print "Using a parallel function: ", result \ No newline at end of file diff --git a/docs/source/changes.txt b/docs/source/changes.txt index 25f1fa7..03bbf91 100644 --- a/docs/source/changes.txt +++ b/docs/source/changes.txt @@ -12,6 +12,22 @@ Release 0.9 New features ------------ + * The notion of a task has been completely reworked. An `ITask` interface has + been created. This interface defines the methods that tasks need to implement. + These methods are now responsible for things like submitting tasks and processing + results. There are two basic task types: :class:`IPython.kernel.task.StringTask` + (this is the old `Task` object, but renamed) and the new + :class:`IPython.kernel.task.MapTask`, which is based on a function. + * A new interface, :class:`IPython.kernel.mapper.IMapper` has been defined to + standardize the idea of a `map` method. This interface has a single + `map` method that has the same syntax as the built-in `map`. We have also defined + a `mapper` factory interface that creates objects that implement + :class:`IPython.kernel.mapper.IMapper` for different controllers. Both + the multiengine and task controller now have mapping capabilities. + * The parallel function capabilities have been reworked. The major changes are that + i) there is now an `@parallel` magic that creates parallel functions, ii) + the syntax for multiple variables follows that of `map`, iii) both the + multiengine and task controller now have a parallel function implementation.
* All of the parallel computing capabilities from `ipython1-dev` have been merged into IPython proper. This resulted in the following new subpackages: :mod:`IPython.kernel`, :mod:`IPython.kernel.core`, :mod:`IPython.config`, @@ -38,11 +54,11 @@ New features when ipcluster is able to start things on other hosts, we will put security back. - - Bug fixes --------- + * The color escapes in the multiengine client are now turned off on win32 as they + don't print correctly. * The :mod:`IPython.kernel.scripts.ipengine` script was exec'ing mpi_import_statement incorrectly, which was leading the engine to crash when mpi was enabled. * A few subpackages has missing `__init__.py` files. @@ -52,6 +68,12 @@ Bug fixes Backwards incompatible changes ------------------------------ + * :class:`IPython.kernel.client.Task` has been renamed + :class:`IPython.kernel.client.StringTask` to make way for new task types. + * The keyword argument `style` has been renamed `dist` in `scatter`, `gather` + and `map`. + * Renamed the values that the renamed `dist` keyword argument can have from + `'basic'` to `'b'`. * IPython has a larger set of dependencies if you want all of its capabilities. See the `setup.py` script for details. * The constructors for :class:`IPython.kernel.client.MultiEngineClient` and diff --git a/docs/source/install/index.txt b/docs/source/install/index.txt index 537d42c..63dfae4 100644 --- a/docs/source/install/index.txt +++ b/docs/source/install/index.txt @@ -1,3 +1,5 @@ +.. _install_index: + ================== Installation ================== diff --git a/docs/source/overview.txt b/docs/source/overview.txt index 3877240..2b3320d 100644 --- a/docs/source/overview.txt +++ b/docs/source/overview.txt @@ -4,18 +4,6 @@ Introduction ============ -This is the official documentation for IPython 0.x series (i.e. what -we are used to refer to just as "IPython"). The original text of the -manual (most of which is still in place) has been authored by Fernando -Perez, but as recommended usage patterns and new features have -emerged, this manual has been updated to reflect that fact. Most of -the additions have been authored by Ville M. Vainio. - -The manual has been generated from reStructuredText source markup with -Sphinx, which should make it much easier to keep it up-to-date in the -future. Some reST artifacts and bugs may still be apparent in the -documentation, but this should improve as the toolchain matures. - Overview ======== @@ -25,8 +13,19 @@ creating test files as is typical in most programming languages. However, the interpreter supplied with the standard Python distribution is somewhat limited for extended interactive use. -IPython is a free software project (released under the BSD license) -which tries to: +The goal of IPython is to create a comprehensive environment for +interactive and exploratory computing. To support this goal, IPython +has two main components: + + * An enhanced interactive Python shell. + * An architecture for interactive parallel computing. + +All of IPython is open source (released under the revised BSD license). + +Enhanced interactive Python shell +================================= + +IPython's interactive shell (`ipython`) has the following goals: 1. Provide an interactive shell superior to Python's default. IPython has many features for object introspection, system shell access, @@ -50,140 +49,126 @@ which tries to: WX applications via special threading flags. The normal Python shell can only do this for Tkinter applications.
- -Main features -------------- - -* Dynamic object introspection. One can access docstrings, function - definition prototypes, source code, source files and other details - of any object accessible to the interpreter with a single - keystroke ('?', and using '??' provides additional detail). -* Searching through modules and namespaces with '*' wildcards, both - when using the '?' system and via the %psearch command. -* Completion in the local namespace, by typing TAB at the prompt. - This works for keywords, modules, methods, variables and files in the - current directory. This is supported via the readline library, and - full access to configuring readline's behavior is provided. - Custom completers can be implemented easily for different purposes - (system commands, magic arguments etc.) -* Numbered input/output prompts with command history (persistent - across sessions and tied to each profile), full searching in this - history and caching of all input and output. -* User-extensible 'magic' commands. A set of commands prefixed with - % is available for controlling IPython itself and provides - directory control, namespace information and many aliases to - common system shell commands. -* Alias facility for defining your own system aliases. -* Complete system shell access. Lines starting with ! are passed - directly to the system shell, and using !! or var = !cmd - captures shell output into python variables for further use. -* Background execution of Python commands in a separate thread. - IPython has an internal job manager called jobs, and a - conveninence backgrounding magic function called %bg. -* The ability to expand python variables when calling the system - shell. In a shell command, any python variable prefixed with $ is - expanded. A double $$ allows passing a literal $ to the shell (for - access to shell and environment variables like $PATH). -* Filesystem navigation, via a magic %cd command, along with a - persistent bookmark system (using %bookmark) for fast access to - frequently visited directories. -* A lightweight persistence framework via the %store command, which - allows you to save arbitrary Python variables. These get restored - automatically when your session restarts. -* Automatic indentation (optional) of code as you type (through the - readline library). -* Macro system for quickly re-executing multiple lines of previous - input with a single name. Macros can be stored persistently via - %store and edited via %edit. -* Session logging (you can then later use these logs as code in your - programs). Logs can optionally timestamp all input, and also store - session output (marked as comments, so the log remains valid - Python source code). -* Session restoring: logs can be replayed to restore a previous - session to the state where you left it. -* Verbose and colored exception traceback printouts. Easier to parse - visually, and in verbose mode they produce a lot of useful - debugging information (basically a terminal version of the cgitb - module). -* Auto-parentheses: callable objects can be executed without - parentheses: 'sin 3' is automatically converted to 'sin(3)'. -* Auto-quoting: using ',' or ';' as the first character forces - auto-quoting of the rest of the line: ',my_function a b' becomes - automatically 'my_function("a","b")', while ';my_function a b' - becomes 'my_function("a b")'. -* Extensible input syntax. You can define filters that pre-process - user input to simplify input in special situations. 
This allows - for example pasting multi-line code fragments which start with - '>>>' or '...' such as those from other python sessions or the - standard Python documentation. -* Flexible configuration system. It uses a configuration file which - allows permanent setting of all command-line options, module - loading, code and file execution. The system allows recursive file - inclusion, so you can have a base file with defaults and layers - which load other customizations for particular projects. -* Embeddable. You can call IPython as a python shell inside your own - python programs. This can be used both for debugging code or for - providing interactive abilities to your programs with knowledge - about the local namespaces (very useful in debugging and data - analysis situations). -* Easy debugger access. You can set IPython to call up an enhanced - version of the Python debugger (pdb) every time there is an - uncaught exception. This drops you inside the code which triggered - the exception with all the data live and it is possible to - navigate the stack to rapidly isolate the source of a bug. The - %run magic command -with the -d option- can run any script under - pdb's control, automatically setting initial breakpoints for you. - This version of pdb has IPython-specific improvements, including - tab-completion and traceback coloring support. For even easier - debugger access, try %debug after seeing an exception. winpdb is - also supported, see ipy_winpdb extension. -* Profiler support. You can run single statements (similar to - profile.run()) or complete programs under the profiler's control. - While this is possible with standard cProfile or profile modules, - IPython wraps this functionality with magic commands (see '%prun' - and '%run -p') convenient for rapid interactive work. -* Doctest support. The special %doctest_mode command toggles a mode - that allows you to paste existing doctests (with leading '>>>' - prompts and whitespace) and uses doctest-compatible prompts and - output, so you can use IPython sessions as doctest code. - +Main features of the interactive shell +-------------------------------------- + + * Dynamic object introspection. One can access docstrings, function + definition prototypes, source code, source files and other details + of any object accessible to the interpreter with a single + keystroke (:samp:`?`, and using :samp:`??` provides additional detail). + * Searching through modules and namespaces with :samp:`*` wildcards, both + when using the :samp:`?` system and via the :samp:`%psearch` command. + * Completion in the local namespace, by typing :kbd:`TAB` at the prompt. + This works for keywords, modules, methods, variables and files in the + current directory. This is supported via the readline library, and + full access to configuring readline's behavior is provided. + Custom completers can be implemented easily for different purposes + (system commands, magic arguments etc.) + * Numbered input/output prompts with command history (persistent + across sessions and tied to each profile), full searching in this + history and caching of all input and output. + * User-extensible 'magic' commands. A set of commands prefixed with + :samp:`%` is available for controlling IPython itself and provides + directory control, namespace information and many aliases to + common system shell commands. + * Alias facility for defining your own system aliases. + * Complete system shell access. 
Lines starting with :samp:`!` are passed + directly to the system shell, and using :samp:`!!` or :samp:`var = !cmd` + captures shell output into python variables for further use. + * Background execution of Python commands in a separate thread. + IPython has an internal job manager called jobs, and a + conveninence backgrounding magic function called :samp:`%bg`. + * The ability to expand python variables when calling the system + shell. In a shell command, any python variable prefixed with :samp:`$` is + expanded. A double :samp:`$$` allows passing a literal :samp:`$` to the shell (for + access to shell and environment variables like :envvar:`PATH`). + * Filesystem navigation, via a magic :samp:`%cd` command, along with a + persistent bookmark system (using :samp:`%bookmark`) for fast access to + frequently visited directories. + * A lightweight persistence framework via the :samp:`%store` command, which + allows you to save arbitrary Python variables. These get restored + automatically when your session restarts. + * Automatic indentation (optional) of code as you type (through the + readline library). + * Macro system for quickly re-executing multiple lines of previous + input with a single name. Macros can be stored persistently via + :samp:`%store` and edited via :samp:`%edit`. + * Session logging (you can then later use these logs as code in your + programs). Logs can optionally timestamp all input, and also store + session output (marked as comments, so the log remains valid + Python source code). + * Session restoring: logs can be replayed to restore a previous + session to the state where you left it. + * Verbose and colored exception traceback printouts. Easier to parse + visually, and in verbose mode they produce a lot of useful + debugging information (basically a terminal version of the cgitb + module). + * Auto-parentheses: callable objects can be executed without + parentheses: :samp:`sin 3` is automatically converted to :samp:`sin(3)`. + * Auto-quoting: using :samp:`,`, or :samp:`;` as the first character forces + auto-quoting of the rest of the line: :samp:`,my_function a b` becomes + automatically :samp:`my_function("a","b")`, while :samp:`;my_function a b` + becomes :samp:`my_function("a b")`. + * Extensible input syntax. You can define filters that pre-process + user input to simplify input in special situations. This allows + for example pasting multi-line code fragments which start with + :samp:`>>>` or :samp:`...` such as those from other python sessions or the + standard Python documentation. + * Flexible configuration system. It uses a configuration file which + allows permanent setting of all command-line options, module + loading, code and file execution. The system allows recursive file + inclusion, so you can have a base file with defaults and layers + which load other customizations for particular projects. + * Embeddable. You can call IPython as a python shell inside your own + python programs. This can be used both for debugging code or for + providing interactive abilities to your programs with knowledge + about the local namespaces (very useful in debugging and data + analysis situations). + * Easy debugger access. You can set IPython to call up an enhanced + version of the Python debugger (pdb) every time there is an + uncaught exception. This drops you inside the code which triggered + the exception with all the data live and it is possible to + navigate the stack to rapidly isolate the source of a bug. 
+    * Profiler support. You can run single statements (similar to
+      :samp:`profile.run()`) or complete programs under the profiler's control.
+      While this is possible with standard cProfile or profile modules,
+      IPython wraps this functionality with magic commands (see :samp:`%prun`
+      and :samp:`%run -p`) convenient for rapid interactive work.
+    * Doctest support. The special :samp:`%doctest_mode` command toggles a mode
+      that allows you to paste existing doctests (with leading :samp:`>>>`
+      prompts and whitespace) and uses doctest-compatible prompts and
+      output, so you can use IPython sessions as doctest code.
+
+Interactive parallel computing
+==============================
+
+Increasingly, parallel computer hardware, such as multicore CPUs, clusters and
+supercomputers, is becoming ubiquitous. Over the last 3 years, we have
+developed an architecture within IPython that allows such hardware to be used
+quickly and easily from Python. Moreover, this architecture is designed to
+support interactive and collaborative parallel computing.
+
+For more information, see our :ref:`overview <parallel_index>` of using IPython for
+parallel computing.
+
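+A minimal sketch of such a session, assuming a local cluster has already been
+started with :samp:`ipcluster` (results illustrative)::
+
+    In [1]: from IPython.kernel import client
+
+    In [2]: mec = client.MultiEngineClient()
+
+    In [3]: mec.map(lambda x: x**2, range(8))   # apply the function on the engines
+    Out[3]: [0, 1, 4, 9, 16, 25, 36, 49]
+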
 Portability and Python requirements
 -----------------------------------
 
-Python requirements: IPython requires with Python version 2.3 or newer.
-If you are still using Python 2.2 and can not upgrade, the last version
-of IPython which worked with Python 2.2 was 0.6.15, so you will have to
-use that.
-
-IPython is developed under Linux, but it should work in any reasonable
-Unix-type system (tested OK under Solaris and the BSD family, for which
-a port exists thanks to Dryice Liu).
-
-Mac OS X: it works, apparently without any problems (thanks to Jim Boyle
-at Lawrence Livermore for the information). Thanks to Andrea Riciputi,
-Fink support is available.
-
-CygWin: it works mostly OK, though some users have reported problems
-with prompt coloring. No satisfactory solution to this has been found so
-far, you may want to disable colors permanently in the ipythonrc
-configuration file if you experience problems. If you have proper color
-support under cygwin, please post to the IPython mailing list so this
-issue can be resolved for all users.
-
-Windows: it works well under Windows Vista/XP/2k, and I suspect NT should
-behave similarly. Section "Installation under windows" describes
-installation details for Windows, including some additional tools needed
-on this platform.
-
-Windows 9x support is present, and has been reported to work fine (at
-least on WinME).
-
-Location
---------
-
-IPython is generously hosted at http://ipython.scipy.org by the
-Enthought, Inc and the SciPy project. This site offers downloads,
-subversion access, mailing lists and a bug tracking system. I am very
-grateful to Enthought (http://www.enthought.com) and all of the SciPy
-team for their contribution.
\ No newline at end of file
+As of the 0.9 release, IPython requires Python 2.4 or greater. We have
+not begun to test IPython on Python 2.6 or 3.0, but we expect it will
+work with some minor changes.
+
+IPython is known to work on the following operating systems:
+
+    * Linux
+    * AIX
+    * Most other Unix-like OSs (Solaris, BSD, etc.)
+    * Mac OS X
+    * Windows (CygWin, XP, Vista, etc.)
+
+See :ref:`here <install_index>` for instructions on how to install IPython.
\ No newline at end of file
diff --git a/docs/source/parallel/index.txt b/docs/source/parallel/index.txt
index da5fe16..cc31f75 100644
--- a/docs/source/parallel/index.txt
+++ b/docs/source/parallel/index.txt
@@ -1,3 +1,5 @@
+.. _parallel_index:
+
 ====================================
 Using IPython for Parallel computing
 ====================================