mapper.py
232 lines
| 8.4 KiB
| text/x-python
|
PythonLexer
Brian E Granger
|
# encoding: utf-8
"""A parallelized version of Python's builtin map.

The mappers defined here expose a `map` method that behaves like the
builtin `map`, but runs the work through an IPython multiengine or
task controller.
"""
__docformat__ = "restructuredtext en"
#----------------------------------------------------------------------------
# Copyright (C) 2008 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#----------------------------------------------------------------------------

#----------------------------------------------------------------------------
# Imports
#----------------------------------------------------------------------------

from types import FunctionType | |||
from zope.interface import Interface, implements | |||
Brian E Granger
|
r1395 | from IPython.kernel.task import MapTask | |
from IPython.kernel.twistedutil import DeferredList, gatherBoth | |||
from IPython.kernel.util import printer | |||
from IPython.kernel.error import collect_exceptions | |||
#----------------------------------------------------------------------------
# Code
#----------------------------------------------------------------------------

class IMapper(Interface):
    """The basic interface for a Mapper.

    This defines a generic interface for mapping.  The idea of this is
    similar to that of Python's builtin `map` function, which applies a
    function elementwise to a sequence.
    """

    # zope.interface method declarations omit ``self``.
    def map(func, *seqs):
        """Do map in parallel.

        Equivalent to map(func, *seqs) or:

        [func(seqs[0][0], seqs[1][0],...), func(seqs[0][1], seqs[1][1],...),...]

        :Parameters:
            func : FunctionType
                The function to apply to the sequence
            seqs : tuple of iterables
                A sequence of iterables that are used for successive function
                arguments.  This works just like map
        """
class IMultiEngineMapperFactory(Interface):
    """Interface for objects that build `IMapper` instances.

    Mappers obtained through this interface are backed by a multiengine
    controller.
    """

    def mapper(dist='b', targets='all', block=True):
        """Return an `IMapper` implementer configured with these arguments.

        A mapper built on a multiengine controller does not do any load
        balancing.  See `MultiEngineMapper` for the meaning of `dist`,
        `targets` and `block`.
        """
class ITaskMapperFactory(Interface):
    """Interface for objects that build `IMapper` instances.

    Mappers obtained through this interface are backed by a task
    controller.
    """

    def mapper(clear_before=False, clear_after=False, retries=0,
               recovery_task=None, depend=None, block=True):
        """Return an `IMapper` implementer configured with these arguments.

        A mapper built on a task controller is load balanced.  The meaning
        of each argument is documented on `IPython.kernel.task.BaseTask`.
        """
class MultiEngineMapper(object):
    """
    A Mapper for `IMultiEngine` implementers.

    This mapper is not load balanced.
    """

    implements(IMapper)

    def __init__(self, multiengine, dist='b', targets='all', block=True):
        """
        Create a Mapper for a multiengine.

        The value of all arguments are used for all calls to `map`.  This
        class allows these arguments to be set for a series of map calls.

        :Parameters:
            multiengine : `IMultiEngine` implementer
                The multiengine to use for running the map commands
            dist : str
                The type of decomposition to use.  Only block ('b') is
                supported currently
            targets : (str, int, tuple of ints)
                The engines to use in the map
            block : boolean
                Whether to block when the map is applied
        """
        self.multiengine = multiengine
        self.dist = dist
        self.targets = targets
        self.block = block

    def map(self, func, *sequences):
        """
        Apply func to *sequences elementwise.  Like Python's builtin map.

        This version is not load balanced.

        :Parameters:
            func : FunctionType or str
                The function to apply.
            sequences : tuple of sized iterables
                The argument sequences; all must have the same length.

        :Returns:
            Whatever the multiengine's `raw_map` returns for these arguments.

        :Raises:
            ValueError
                If no sequences are given or their lengths differ.
            TypeError
                If `func` is neither a function nor a str.
        """
        # Without this guard, max()/len checks on an empty tuple fail with
        # an unhelpful "empty sequence" error.
        if not sequences:
            raise ValueError('at least one sequence is required')
        if len(set(len(s) for s in sequences)) != 1:
            raise ValueError('all sequences must have equal length')
        # Explicit check instead of ``assert`` so it still runs under -O.
        if not isinstance(func, (str, FunctionType)):
            raise TypeError("func must be a function or str")
        return self.multiengine.raw_map(func, sequences, dist=self.dist,
                                        targets=self.targets, block=self.block)
class TaskMapper(object):
    """
    Make an `ITaskController` look like an `IMapper`.

    This class provides a load balanced version of `map`.
    """

    def __init__(self, task_controller, clear_before=False, clear_after=False,
                 retries=0, recovery_task=None, depend=None, block=True):
        """
        Create an `IMapper` given an `ITaskController` and arguments.

        The additional arguments are those that are common to all types of
        tasks and are described in the documentation for
        `IPython.kernel.task.BaseTask`.

        :Parameters:
            task_controller : an `ITaskController` implementer
                The `TaskController` to use for calls to `map`
        """
        self.task_controller = task_controller
        self.clear_before = clear_before
        self.clear_after = clear_after
        self.retries = retries
        self.recovery_task = recovery_task
        self.depend = depend
        self.block = block

    def map(self, func, *sequences):
        """
        Apply func to *sequences elementwise.  Like Python's builtin map.

        This version is load balanced: one `MapTask` is submitted to the
        task controller for each group of corresponding elements.

        :Returns:
            A Deferred.  When `self.block` is false it fires with the values
            returned by `task_controller.run` (treated as task ids), passed
            through `collect_exceptions`.  When `self.block` is true it
            instead fires with the gathered `get_task_result` values, also
            passed through `collect_exceptions`.

        :Raises:
            ValueError
                If no sequences are given or their lengths differ.
        """
        if not sequences:
            raise ValueError('at least one sequence is required')
        if len(set(len(s) for s in sequences)) != 1:
            raise ValueError('all sequences must have equal length')

        submitted = []
        for task_args in zip(*sequences):
            task = MapTask(func, task_args, clear_before=self.clear_before,
                           clear_after=self.clear_after, retries=self.retries,
                           recovery_task=self.recovery_task, depend=self.depend)
            submitted.append(self.task_controller.run(task))

        # consumeErrors=1 is presumably forwarded to Twisted's DeferredList so
        # failures of individual tasks are not logged as unhandled errors;
        # collect_exceptions then processes the gathered results.
        gathered = gatherBoth(submitted, consumeErrors=1)
        gathered.addCallback(collect_exceptions, 'map')

        if self.block:
            def get_results(task_ids):
                # Barrier on all submitted tasks, then fetch every result.
                d = self.task_controller.barrier(task_ids)
                d.addCallback(lambda _: gatherBoth(
                    [self.task_controller.get_task_result(tid)
                     for tid in task_ids],
                    consumeErrors=1))
                d.addCallback(collect_exceptions, 'map')
                return d
            gathered.addCallback(get_results)
        return gathered
class SynchronousTaskMapper(object):
    """
    Make an `IBlockingTaskClient` look like an `IMapper`.

    This class provides a load balanced version of `map`.
    """

    def __init__(self, task_controller, clear_before=False, clear_after=False,
                 retries=0, recovery_task=None, depend=None, block=True):
        """
        Create an `IMapper` given an `IBlockingTaskClient` and arguments.

        The additional arguments are those that are common to all types of
        tasks and are described in the documentation for
        `IPython.kernel.task.BaseTask`.

        :Parameters:
            task_controller : an `IBlockingTaskClient` implementer
                The `TaskController` to use for calls to `map`
        """
        self.task_controller = task_controller
        self.clear_before = clear_before
        self.clear_after = clear_after
        self.retries = retries
        self.recovery_task = recovery_task
        self.depend = depend
        self.block = block

    def map(self, func, *sequences):
        """
        Apply func to *sequences elementwise.  Like Python's builtin map.

        This version is load balanced: one `MapTask` is submitted to the
        task controller for each group of corresponding elements.

        :Returns:
            If `self.block` is true, the list of `get_task_result` values
            (in submission order), fetched after calling `barrier` on all
            task ids.  Otherwise, the list of values returned by
            `task_controller.run` (the task ids).

        :Raises:
            ValueError
                If no sequences are given or their lengths differ.
        """
        if not sequences:
            raise ValueError('at least one sequence is required')
        if len(set(len(s) for s in sequences)) != 1:
            raise ValueError('all sequences must have equal length')

        task_ids = [
            self.task_controller.run(
                MapTask(func, task_args, clear_before=self.clear_before,
                        clear_after=self.clear_after, retries=self.retries,
                        recovery_task=self.recovery_task, depend=self.depend))
            for task_args in zip(*sequences)]

        if not self.block:
            return task_ids
        self.task_controller.barrier(task_ids)
        return [self.task_controller.get_task_result(tid) for tid in task_ids]