|
|
# encoding: utf-8
|
|
|
|
|
|
"""A parallelized version of Python's builtin map."""
|
|
|
|
|
|
__docformat__ = "restructuredtext en"
|
|
|
|
|
|
#----------------------------------------------------------------------------
|
|
|
# Copyright (C) 2008 The IPython Development Team
|
|
|
#
|
|
|
# Distributed under the terms of the BSD License. The full license is in
|
|
|
# the file COPYING, distributed as part of this software.
|
|
|
#----------------------------------------------------------------------------
|
|
|
|
|
|
#----------------------------------------------------------------------------
|
|
|
# Imports
|
|
|
#----------------------------------------------------------------------------
|
|
|
|
|
|
from types import FunctionType
|
|
|
from zope.interface import Interface, implements
|
|
|
from IPython.kernel.task import MapTask
|
|
|
from IPython.kernel.twistedutil import gatherBoth
|
|
|
from IPython.kernel.error import collect_exceptions
|
|
|
|
|
|
#----------------------------------------------------------------------------
|
|
|
# Code
|
|
|
#----------------------------------------------------------------------------
|
|
|
|
|
|
class IMapper(Interface):
|
|
|
"""The basic interface for a Mapper.
|
|
|
|
|
|
This defines a generic interface for mapping. The idea of this is
|
|
|
similar to that of Python's builtin `map` function, which applies a function
|
|
|
elementwise to a sequence.
|
|
|
"""
|
|
|
|
|
|
def map(func, *seqs):
|
|
|
"""Do map in parallel.
|
|
|
|
|
|
Equivalent to map(func, *seqs) or:
|
|
|
|
|
|
[func(seqs[0][0], seqs[1][0],...), func(seqs[0][1], seqs[1][1],...),...]
|
|
|
|
|
|
:Parameters:
|
|
|
func : FunctionType
|
|
|
The function to apply to the sequence
|
|
|
sequences : tuple of iterables
|
|
|
A sequence of iterables that are used for sucessive function
|
|
|
arguments. This work just like map
|
|
|
"""
|
|
|
|
|
|
class IMultiEngineMapperFactory(Interface):
|
|
|
"""
|
|
|
An interface for something that creates `IMapper` instances.
|
|
|
"""
|
|
|
|
|
|
def mapper(dist='b', targets='all', block=True):
|
|
|
"""
|
|
|
Create an `IMapper` implementer with a given set of arguments.
|
|
|
|
|
|
The `IMapper` created using a multiengine controller is
|
|
|
not load balanced.
|
|
|
"""
|
|
|
|
|
|
class ITaskMapperFactory(Interface):
|
|
|
"""
|
|
|
An interface for something that creates `IMapper` instances.
|
|
|
"""
|
|
|
|
|
|
def mapper(clear_before=False, clear_after=False, retries=0,
|
|
|
recovery_task=None, depend=None, block=True):
|
|
|
"""
|
|
|
Create an `IMapper` implementer with a given set of arguments.
|
|
|
|
|
|
The `IMapper` created using a task controller is load balanced.
|
|
|
|
|
|
See the documentation for `IPython.kernel.task.BaseTask` for
|
|
|
documentation on the arguments to this method.
|
|
|
"""
|
|
|
|
|
|
|
|
|
class MultiEngineMapper(object):
|
|
|
"""
|
|
|
A Mapper for `IMultiEngine` implementers.
|
|
|
"""
|
|
|
|
|
|
implements(IMapper)
|
|
|
|
|
|
def __init__(self, multiengine, dist='b', targets='all', block=True):
|
|
|
"""
|
|
|
Create a Mapper for a multiengine.
|
|
|
|
|
|
The value of all arguments are used for all calls to `map`. This
|
|
|
class allows these arguemnts to be set for a series of map calls.
|
|
|
|
|
|
:Parameters:
|
|
|
multiengine : `IMultiEngine` implementer
|
|
|
The multiengine to use for running the map commands
|
|
|
dist : str
|
|
|
The type of decomposition to use. Only block ('b') is
|
|
|
supported currently
|
|
|
targets : (str, int, tuple of ints)
|
|
|
The engines to use in the map
|
|
|
block : boolean
|
|
|
Whether to block when the map is applied
|
|
|
"""
|
|
|
self.multiengine = multiengine
|
|
|
self.dist = dist
|
|
|
self.targets = targets
|
|
|
self.block = block
|
|
|
|
|
|
def map(self, func, *sequences):
|
|
|
"""
|
|
|
Apply func to *sequences elementwise. Like Python's builtin map.
|
|
|
|
|
|
This version is not load balanced.
|
|
|
"""
|
|
|
max_len = max(len(s) for s in sequences)
|
|
|
for s in sequences:
|
|
|
if len(s)!=max_len:
|
|
|
raise ValueError('all sequences must have equal length')
|
|
|
assert isinstance(func, (str, FunctionType)), "func must be a fuction or str"
|
|
|
return self.multiengine.raw_map(func, sequences, dist=self.dist,
|
|
|
targets=self.targets, block=self.block)
|
|
|
|
|
|
class TaskMapper(object):
|
|
|
"""
|
|
|
Make an `ITaskController` look like an `IMapper`.
|
|
|
|
|
|
This class provides a load balanced version of `map`.
|
|
|
"""
|
|
|
|
|
|
def __init__(self, task_controller, clear_before=False, clear_after=False, retries=0,
|
|
|
recovery_task=None, depend=None, block=True):
|
|
|
"""
|
|
|
Create a `IMapper` given a `TaskController` and arguments.
|
|
|
|
|
|
The additional arguments are those that are common to all types of
|
|
|
tasks and are described in the documentation for
|
|
|
`IPython.kernel.task.BaseTask`.
|
|
|
|
|
|
:Parameters:
|
|
|
task_controller : an `IBlockingTaskClient` implementer
|
|
|
The `TaskController` to use for calls to `map`
|
|
|
"""
|
|
|
self.task_controller = task_controller
|
|
|
self.clear_before = clear_before
|
|
|
self.clear_after = clear_after
|
|
|
self.retries = retries
|
|
|
self.recovery_task = recovery_task
|
|
|
self.depend = depend
|
|
|
self.block = block
|
|
|
|
|
|
def map(self, func, *sequences):
|
|
|
"""
|
|
|
Apply func to *sequences elementwise. Like Python's builtin map.
|
|
|
|
|
|
This version is load balanced.
|
|
|
"""
|
|
|
max_len = max(len(s) for s in sequences)
|
|
|
for s in sequences:
|
|
|
if len(s)!=max_len:
|
|
|
raise ValueError('all sequences must have equal length')
|
|
|
task_args = zip(*sequences)
|
|
|
task_ids = []
|
|
|
dlist = []
|
|
|
for ta in task_args:
|
|
|
task = MapTask(func, ta, clear_before=self.clear_before,
|
|
|
clear_after=self.clear_after, retries=self.retries,
|
|
|
recovery_task=self.recovery_task, depend=self.depend)
|
|
|
dlist.append(self.task_controller.run(task))
|
|
|
dlist = gatherBoth(dlist, consumeErrors=1)
|
|
|
dlist.addCallback(collect_exceptions,'map')
|
|
|
if self.block:
|
|
|
def get_results(task_ids):
|
|
|
d = self.task_controller.barrier(task_ids)
|
|
|
d.addCallback(lambda _: gatherBoth([self.task_controller.get_task_result(tid) for tid in task_ids], consumeErrors=1))
|
|
|
d.addCallback(collect_exceptions, 'map')
|
|
|
return d
|
|
|
dlist.addCallback(get_results)
|
|
|
return dlist
|
|
|
|
|
|
class SynchronousTaskMapper(object):
|
|
|
"""
|
|
|
Make an `IBlockingTaskClient` look like an `IMapper`.
|
|
|
|
|
|
This class provides a load balanced version of `map`.
|
|
|
"""
|
|
|
|
|
|
def __init__(self, task_controller, clear_before=False, clear_after=False, retries=0,
|
|
|
recovery_task=None, depend=None, block=True):
|
|
|
"""
|
|
|
Create a `IMapper` given a `IBlockingTaskClient` and arguments.
|
|
|
|
|
|
The additional arguments are those that are common to all types of
|
|
|
tasks and are described in the documentation for
|
|
|
`IPython.kernel.task.BaseTask`.
|
|
|
|
|
|
:Parameters:
|
|
|
task_controller : an `IBlockingTaskClient` implementer
|
|
|
The `TaskController` to use for calls to `map`
|
|
|
"""
|
|
|
self.task_controller = task_controller
|
|
|
self.clear_before = clear_before
|
|
|
self.clear_after = clear_after
|
|
|
self.retries = retries
|
|
|
self.recovery_task = recovery_task
|
|
|
self.depend = depend
|
|
|
self.block = block
|
|
|
|
|
|
def map(self, func, *sequences):
|
|
|
"""
|
|
|
Apply func to *sequences elementwise. Like Python's builtin map.
|
|
|
|
|
|
This version is load balanced.
|
|
|
"""
|
|
|
max_len = max(len(s) for s in sequences)
|
|
|
for s in sequences:
|
|
|
if len(s)!=max_len:
|
|
|
raise ValueError('all sequences must have equal length')
|
|
|
task_args = zip(*sequences)
|
|
|
task_ids = []
|
|
|
for ta in task_args:
|
|
|
task = MapTask(func, ta, clear_before=self.clear_before,
|
|
|
clear_after=self.clear_after, retries=self.retries,
|
|
|
recovery_task=self.recovery_task, depend=self.depend)
|
|
|
task_ids.append(self.task_controller.run(task))
|
|
|
if self.block:
|
|
|
self.task_controller.barrier(task_ids)
|
|
|
task_results = [self.task_controller.get_task_result(tid) for tid in task_ids]
|
|
|
return task_results
|
|
|
else:
|
|
|
return task_ids
|