##// END OF EJS Templates
Merge my main trunk work. Lots of testing and documentation work....
Merge my main trunk work. Lots of testing and documentation work. See the individual log messages for details, this is a branch that added a lot of tests, as well as improving the testing system quite a bit so it is more robust and identifies many more tests. Various API cleanups, bug fixes and improvements went in as well.

File last commit:

r1395:1feaf0a3
r1878:56332962 merge
Show More
mapper.py
232 lines | 8.4 KiB | text/x-python | PythonLexer
# encoding: utf-8
"""A parallelized version of Python's builtin map."""
__docformat__ = "restructuredtext en"
#----------------------------------------------------------------------------
# Copyright (C) 2008 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#----------------------------------------------------------------------------
#----------------------------------------------------------------------------
# Imports
#----------------------------------------------------------------------------
from types import FunctionType
from zope.interface import Interface, implements
from IPython.kernel.task import MapTask
from IPython.kernel.twistedutil import DeferredList, gatherBoth
from IPython.kernel.util import printer
from IPython.kernel.error import collect_exceptions
#----------------------------------------------------------------------------
# Code
#----------------------------------------------------------------------------
class IMapper(Interface):
    """Generic contract for objects that can perform a map.

    The concept mirrors Python's builtin `map` function: a function is
    applied elementwise across one or more sequences.
    """

    def map(func, *seqs):
        """Run a map in parallel.

        Gives the same result as map(func, *seqs), that is:

        [func(seqs[0][0], seqs[1][0],...), func(seqs[0][1], seqs[1][1],...),...]

        :Parameters:
            func : FunctionType
                The function applied to the elements of the sequences
            seqs : tuple of iterables
                The iterables that supply the successive positional
                arguments of `func`, just as with the builtin map
        """
class IMultiEngineMapperFactory(Interface):
    """
    Contract for objects able to build `IMapper` instances on top of a
    multiengine controller.
    """

    def mapper(dist='b', targets='all', block=True):
        """
        Build an `IMapper` implementer configured with the given arguments.

        Mappers obtained through a multiengine controller are not load
        balanced.
        """
class ITaskMapperFactory(Interface):
    """
    Contract for objects able to build `IMapper` instances on top of a
    task controller.
    """

    def mapper(clear_before=False, clear_after=False, retries=0,
               recovery_task=None, depend=None, block=True):
        """
        Build an `IMapper` implementer configured with the given arguments.

        Mappers obtained through a task controller are load balanced.

        The meaning of the arguments is documented in
        `IPython.kernel.task.BaseTask`.
        """
class MultiEngineMapper(object):
    """
    A Mapper for `IMultiEngine` implementers.
    """

    implements(IMapper)

    def __init__(self, multiengine, dist='b', targets='all', block=True):
        """
        Create a Mapper for a multiengine.

        The values of all arguments are used for all calls to `map`.  This
        class allows these arguments to be set once for a series of map
        calls.

        :Parameters:
            multiengine : `IMultiEngine` implementer
                The multiengine to use for running the map commands
            dist : str
                The type of decomposition to use. Only block ('b') is
                supported currently
            targets : (str, int, tuple of ints)
                The engines to use in the map
            block : boolean
                Whether to block when the map is applied
        """
        self.multiengine = multiengine
        self.dist = dist
        self.targets = targets
        self.block = block

    def map(self, func, *sequences):
        """
        Apply func to *sequences elementwise.  Like Python's builtin map.

        This version is not load balanced.

        :Parameters:
            func : str or FunctionType
                The function to apply
            sequences : tuple of sized sequences
                The argument sequences; they must all have the same length

        Returns whatever `self.multiengine.raw_map` returns for the stored
        `dist`, `targets` and `block` settings.

        Raises ValueError if no sequence is given or if the sequences do
        not all have the same length, and AssertionError if `func` is
        neither a str nor a plain function.
        """
        max_len = max(len(s) for s in sequences)
        for s in sequences:
            if len(s) != max_len:
                raise ValueError('all sequences must have equal length')
        # Raised explicitly rather than through an ``assert`` statement so
        # the check still runs under ``python -O``.  The exception type is
        # kept as AssertionError so existing callers see no change.
        if not isinstance(func, (str, FunctionType)):
            raise AssertionError("func must be a function or str")
        return self.multiengine.raw_map(func, sequences, dist=self.dist,
                                        targets=self.targets, block=self.block)
class TaskMapper(object):
    """
    Make an `ITaskController` look like an `IMapper`.

    This class provides a load balanced version of `map`.
    """

    def __init__(self, task_controller, clear_before=False, clear_after=False, retries=0,
                 recovery_task=None, depend=None, block=True):
        """
        Create a `IMapper` given a `TaskController` and arguments.

        The additional arguments are those that are common to all types of
        tasks and are described in the documentation for
        `IPython.kernel.task.BaseTask`.

        :Parameters:
            task_controller : an `ITaskController` implementer
                The `TaskController` to use for calls to `map`
            block : boolean
                If true, the Deferred returned by `map` is chained to wait
                on the controller's `barrier` and then fetch the task
                results; otherwise it only covers task submission
        """
        self.task_controller = task_controller
        self.clear_before = clear_before
        self.clear_after = clear_after
        self.retries = retries
        self.recovery_task = recovery_task
        self.depend = depend
        self.block = block

    def map(self, func, *sequences):
        """
        Apply func to *sequences elementwise.  Like Python's builtin map.

        This version is load balanced.

        One `MapTask` is created and handed to `task_controller.run` for
        each tuple of elementwise arguments.  The return value is the
        object produced by `gatherBoth` over the `run` results, with
        `collect_exceptions` attached as a callback.  When `self.block` is
        true, a further callback waits on `task_controller.barrier` and
        then gathers `get_task_result` for every task.

        Raises ValueError if no sequence is given or if the sequences do
        not all have the same length.
        """
        max_len = max(len(s) for s in sequences)
        for s in sequences:
            if len(s) != max_len:
                raise ValueError('all sequences must have equal length')
        # Each task is submitted as soon as it has been built.
        submissions = [
            self.task_controller.run(
                MapTask(func, ta, clear_before=self.clear_before,
                        clear_after=self.clear_after, retries=self.retries,
                        recovery_task=self.recovery_task, depend=self.depend))
            for ta in zip(*sequences)
        ]
        d = gatherBoth(submissions, consumeErrors=1)
        d.addCallback(collect_exceptions, 'map')
        if self.block:
            def get_results(task_ids):
                # `task_ids` is the value passed down the callback chain,
                # presumably the ids returned by `run` -- confirm against
                # `collect_exceptions`.
                results = self.task_controller.barrier(task_ids)
                results.addCallback(
                    lambda _: gatherBoth(
                        [self.task_controller.get_task_result(tid)
                         for tid in task_ids],
                        consumeErrors=1))
                results.addCallback(collect_exceptions, 'map')
                return results
            d.addCallback(get_results)
        return d
class SynchronousTaskMapper(object):
    """
    Adapter that exposes an `IBlockingTaskClient` through the `IMapper` API.

    The `map` offered here is load balanced.
    """

    def __init__(self, task_controller, clear_before=False, clear_after=False, retries=0,
                 recovery_task=None, depend=None, block=True):
        """
        Build an `IMapper` from an `IBlockingTaskClient` plus task options.

        The options other than `task_controller` and `block` are the ones
        shared by every kind of task; see the documentation of
        `IPython.kernel.task.BaseTask` for their meaning.

        :Parameters:
            task_controller : an `IBlockingTaskClient` implementer
                The `TaskController` used by calls to `map`
        """
        self.task_controller = task_controller
        self.block = block
        # Options forwarded unchanged to every MapTask created by `map`.
        self.clear_before = clear_before
        self.clear_after = clear_after
        self.retries = retries
        self.recovery_task = recovery_task
        self.depend = depend

    def map(self, func, *sequences):
        """
        Apply func to *sequences elementwise, as the builtin map does.

        This version is load balanced.

        Returns the list of task results when `self.block` is true, and
        the list of values returned by `task_controller.run` otherwise.
        Raises ValueError when the sequences differ in length.
        """
        expected = max(len(seq) for seq in sequences)
        if any(len(seq) != expected for seq in sequences):
            raise ValueError('all sequences must have equal length')

        controller = self.task_controller
        # One task per tuple of elementwise arguments, submitted right away.
        submitted = [
            controller.run(
                MapTask(func, args, clear_before=self.clear_before,
                        clear_after=self.clear_after, retries=self.retries,
                        recovery_task=self.recovery_task, depend=self.depend))
            for args in zip(*sequences)
        ]
        if not self.block:
            return submitted

        # Blocking mode: wait on the barrier, then collect every result.
        controller.barrier(submitted)
        return [controller.get_task_result(tid) for tid in submitted]