##// END OF EJS Templates
Merging vvatsa's ipcluster-dev branch....
Merging vvatsa's ipcluster-dev branch. This merge brings in a new ssh mode for ipcluster. Thanks to Vishal Vatsa for this.

File last commit:

r1395:1feaf0a3
r1833:e4b173fe merge
Show More
mapper.py
232 lines | 8.4 KiB | text/x-python | PythonLexer
Brian E Granger
A new version and API for map is now working. We also now have an...
r1346 # encoding: utf-8
"""A parallelized version of Python's builtin map."""
__docformat__ = "restructuredtext en"
Brian E Granger
The refactoring of the Task system is nearly complete. Now there are...
r1395 #----------------------------------------------------------------------------
Brian E Granger
A new version and API for map is now working. We also now have an...
r1346 # Copyright (C) 2008 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
Brian E Granger
The refactoring of the Task system is nearly complete. Now there are...
r1395 #----------------------------------------------------------------------------
Brian E Granger
A new version and API for map is now working. We also now have an...
r1346
Brian E Granger
The refactoring of the Task system is nearly complete. Now there are...
r1395 #----------------------------------------------------------------------------
Brian E Granger
A new version and API for map is now working. We also now have an...
r1346 # Imports
Brian E Granger
The refactoring of the Task system is nearly complete. Now there are...
r1395 #----------------------------------------------------------------------------
Brian E Granger
A new version and API for map is now working. We also now have an...
r1346
from types import FunctionType
from zope.interface import Interface, implements
Brian E Granger
The refactoring of the Task system is nearly complete. Now there are...
r1395 from IPython.kernel.task import MapTask
from IPython.kernel.twistedutil import DeferredList, gatherBoth
from IPython.kernel.util import printer
from IPython.kernel.error import collect_exceptions
#----------------------------------------------------------------------------
# Code
#----------------------------------------------------------------------------
Brian E Granger
A new version and API for map is now working. We also now have an...
r1346
class IMapper(Interface):
Brian E Granger
The refactoring of the Task system is nearly complete. Now there are...
r1395 """The basic interface for a Mapper.
This defines a generic interface for mapping. The idea of this is
similar to that of Python's builtin `map` function, which applies a function
elementwise to a sequence.
"""
def map(func, *seqs):
"""Do map in parallel.
Equivalent to map(func, *seqs) or:
[func(seqs[0][0], seqs[1][0],...), func(seqs[0][1], seqs[1][1],...),...]
:Parameters:
func : FunctionType
The function to apply to the sequence
sequences : tuple of iterables
A sequence of iterables that are used for sucessive function
arguments. This work just like map
"""
class IMultiEngineMapperFactory(Interface):
"""
An interface for something that creates `IMapper` instances.
"""
def mapper(dist='b', targets='all', block=True):
"""
Create an `IMapper` implementer with a given set of arguments.
The `IMapper` created using a multiengine controller is
not load balanced.
"""
class ITaskMapperFactory(Interface):
"""
An interface for something that creates `IMapper` instances.
"""
Brian E Granger
A new version and API for map is now working. We also now have an...
r1346
Brian E Granger
The refactoring of the Task system is nearly complete. Now there are...
r1395 def mapper(clear_before=False, clear_after=False, retries=0,
recovery_task=None, depend=None, block=True):
"""
Create an `IMapper` implementer with a given set of arguments.
The `IMapper` created using a task controller is load balanced.
See the documentation for `IPython.kernel.task.BaseTask` for
documentation on the arguments to this method.
"""
Brian E Granger
A new version and API for map is now working. We also now have an...
r1346
Brian E Granger
The refactoring of the Task system is nearly complete. Now there are...
r1395
class MultiEngineMapper(object):
"""
A Mapper for `IMultiEngine` implementers.
"""
Brian E Granger
A new version and API for map is now working. We also now have an...
r1346
implements(IMapper)
def __init__(self, multiengine, dist='b', targets='all', block=True):
Brian E Granger
The refactoring of the Task system is nearly complete. Now there are...
r1395 """
Create a Mapper for a multiengine.
The value of all arguments are used for all calls to `map`. This
class allows these arguemnts to be set for a series of map calls.
:Parameters:
multiengine : `IMultiEngine` implementer
The multiengine to use for running the map commands
dist : str
The type of decomposition to use. Only block ('b') is
supported currently
targets : (str, int, tuple of ints)
The engines to use in the map
block : boolean
Whether to block when the map is applied
"""
Brian E Granger
A new version and API for map is now working. We also now have an...
r1346 self.multiengine = multiengine
self.dist = dist
self.targets = targets
self.block = block
def map(self, func, *sequences):
Brian E Granger
The refactoring of the Task system is nearly complete. Now there are...
r1395 """
Apply func to *sequences elementwise. Like Python's builtin map.
This version is not load balanced.
"""
max_len = max(len(s) for s in sequences)
for s in sequences:
if len(s)!=max_len:
raise ValueError('all sequences must have equal length')
Brian E Granger
A new version and API for map is now working. We also now have an...
r1346 assert isinstance(func, (str, FunctionType)), "func must be a fuction or str"
Brian E Granger
The refactoring of the Task system is nearly complete. Now there are...
r1395 return self.multiengine.raw_map(func, sequences, dist=self.dist,
targets=self.targets, block=self.block)
class TaskMapper(object):
"""
Make an `ITaskController` look like an `IMapper`.
This class provides a load balanced version of `map`.
"""
def __init__(self, task_controller, clear_before=False, clear_after=False, retries=0,
recovery_task=None, depend=None, block=True):
"""
Create a `IMapper` given a `TaskController` and arguments.
The additional arguments are those that are common to all types of
tasks and are described in the documentation for
`IPython.kernel.task.BaseTask`.
:Parameters:
task_controller : an `IBlockingTaskClient` implementer
The `TaskController` to use for calls to `map`
"""
self.task_controller = task_controller
self.clear_before = clear_before
self.clear_after = clear_after
self.retries = retries
self.recovery_task = recovery_task
self.depend = depend
self.block = block
def map(self, func, *sequences):
"""
Apply func to *sequences elementwise. Like Python's builtin map.
This version is load balanced.
"""
max_len = max(len(s) for s in sequences)
for s in sequences:
if len(s)!=max_len:
raise ValueError('all sequences must have equal length')
task_args = zip(*sequences)
task_ids = []
dlist = []
for ta in task_args:
task = MapTask(func, ta, clear_before=self.clear_before,
clear_after=self.clear_after, retries=self.retries,
recovery_task=self.recovery_task, depend=self.depend)
dlist.append(self.task_controller.run(task))
dlist = gatherBoth(dlist, consumeErrors=1)
dlist.addCallback(collect_exceptions,'map')
if self.block:
def get_results(task_ids):
d = self.task_controller.barrier(task_ids)
d.addCallback(lambda _: gatherBoth([self.task_controller.get_task_result(tid) for tid in task_ids], consumeErrors=1))
d.addCallback(collect_exceptions, 'map')
return d
dlist.addCallback(get_results)
return dlist
class SynchronousTaskMapper(object):
"""
Make an `IBlockingTaskClient` look like an `IMapper`.
This class provides a load balanced version of `map`.
"""
def __init__(self, task_controller, clear_before=False, clear_after=False, retries=0,
recovery_task=None, depend=None, block=True):
"""
Create a `IMapper` given a `IBlockingTaskClient` and arguments.
The additional arguments are those that are common to all types of
tasks and are described in the documentation for
`IPython.kernel.task.BaseTask`.
:Parameters:
task_controller : an `IBlockingTaskClient` implementer
The `TaskController` to use for calls to `map`
"""
self.task_controller = task_controller
self.clear_before = clear_before
self.clear_after = clear_after
self.retries = retries
self.recovery_task = recovery_task
self.depend = depend
self.block = block
def map(self, func, *sequences):
"""
Apply func to *sequences elementwise. Like Python's builtin map.
This version is load balanced.
"""
max_len = max(len(s) for s in sequences)
for s in sequences:
if len(s)!=max_len:
raise ValueError('all sequences must have equal length')
task_args = zip(*sequences)
task_ids = []
for ta in task_args:
task = MapTask(func, ta, clear_before=self.clear_before,
clear_after=self.clear_after, retries=self.retries,
recovery_task=self.recovery_task, depend=self.depend)
task_ids.append(self.task_controller.run(task))
if self.block:
self.task_controller.barrier(task_ids)
task_results = [self.task_controller.get_task_result(tid) for tid in task_ids]
return task_results
else:
return task_ids