remotefunction.py
199 lines
| 6.6 KiB
| text/x-python
|
PythonLexer
MinRK
|
r3664 | """Remote Functions and decorators for Views.""" | |
MinRK
|
r3588 | #----------------------------------------------------------------------------- | |
# Copyright (C) 2010 The IPython Development Team | |||
# | |||
# Distributed under the terms of the BSD License. The full license is in | |||
# the file COPYING, distributed as part of this software. | |||
#----------------------------------------------------------------------------- | |||
#----------------------------------------------------------------------------- | |||
# Imports | |||
#----------------------------------------------------------------------------- | |||
MinRK
|
r3636 | import warnings | |
MinRK
|
r3641 | from IPython.testing import decorators as testdec | |
MinRK
|
r3642 | from . import map as Map | |
from .asyncresult import AsyncMapResult | |||
MinRK
|
r3588 | ||
#----------------------------------------------------------------------------- | |||
# Decorators | |||
#----------------------------------------------------------------------------- | |||
MinRK
|
r3641 | @testdec.skip_doctest | |
MinRK
|
r3664 | def remote(view, block=None, **flags): | |
MinRK
|
r3588 | """Turn a function into a remote function. | |
This method can be used for map: | |||
MinRK
|
r3664 | In [1]: @remote(view,block=True) | |
MinRK
|
r3641 | ...: def func(a): | |
...: pass | |||
MinRK
|
r3588 | """ | |
MinRK
|
r3641 | ||
MinRK
|
r3588 | def remote_function(f): | |
MinRK
|
r3664 | return RemoteFunction(view, f, block=block, **flags) | |
MinRK
|
r3588 | return remote_function | |
MinRK
|
r3641 | @testdec.skip_doctest | |
MinRK
|
r3664 | def parallel(view, dist='b', block=None, **flags): | |
MinRK
|
r3588 | """Turn a function into a parallel remote function. | |
This method can be used for map: | |||
MinRK
|
r3664 | In [1]: @parallel(view, block=True) | |
MinRK
|
r3641 | ...: def func(a): | |
...: pass | |||
MinRK
|
r3588 | """ | |
MinRK
|
r3641 | ||
MinRK
|
r3588 | def parallel_function(f): | |
MinRK
|
r3664 | return ParallelFunction(view, f, dist=dist, block=block, **flags) | |
MinRK
|
r3588 | return parallel_function | |
#-------------------------------------------------------------------------- | |||
# Classes | |||
#-------------------------------------------------------------------------- | |||
class RemoteFunction(object): | |||
"""Turn an existing function into a remote function. | |||
Parameters | |||
---------- | |||
MinRK
|
r3664 | view : View instance | |
The view to be used for execution | |||
MinRK
|
r3588 | f : callable | |
The function to be wrapped into a remote function | |||
block : bool [default: None] | |||
Whether to wait for results or not. The default behavior is | |||
MinRK
|
r3664 | to use the current `block` attribute of `view` | |
**flags : remaining kwargs are passed to View.temp_flags | |||
MinRK
|
r3588 | """ | |
MinRK
|
r3664 | view = None # the remote connection | |
MinRK
|
r3588 | func = None # the wrapped function | |
block = None # whether to block | |||
MinRK
|
r3664 | flags = None # dict of extra kwargs for temp_flags | |
MinRK
|
r3588 | ||
MinRK
|
r3664 | def __init__(self, view, f, block=None, **flags): | |
self.view = view | |||
MinRK
|
r3588 | self.func = f | |
self.block=block | |||
MinRK
|
r3664 | self.flags=flags | |
MinRK
|
r3588 | ||
def __call__(self, *args, **kwargs): | |||
MinRK
|
r3664 | block = self.view.block if self.block is None else self.block | |
with self.view.temp_flags(block=block, **self.flags): | |||
return self.view.apply(self.func, *args, **kwargs) | |||
MinRK
|
r3588 | ||
class ParallelFunction(RemoteFunction): | |||
MinRK
|
r3644 | """Class for mapping a function to sequences. | |
This will distribute the sequences according the a mapper, and call | |||
the function on each sub-sequence. If called via map, then the function | |||
will be called once on each element, rather that each sub-sequence. | |||
Parameters | |||
---------- | |||
MinRK
|
r3664 | view : View instance | |
The view to be used for execution | |||
MinRK
|
r3644 | f : callable | |
The function to be wrapped into a remote function | |||
MinRK
|
r3664 | dist : str [default: 'b'] | |
The key for which mapObject to use to distribute sequences | |||
options are: | |||
* 'b' : use contiguous chunks in order | |||
* 'r' : use round-robin striping | |||
MinRK
|
r3644 | block : bool [default: None] | |
Whether to wait for results or not. The default behavior is | |||
MinRK
|
r3664 | to use the current `block` attribute of `view` | |
chunksize : int or None | |||
MinRK
|
r3644 | The size of chunk to use when breaking up sequences in a load-balanced manner | |
MinRK
|
r3664 | **flags : remaining kwargs are passed to View.temp_flags | |
MinRK
|
r3644 | """ | |
MinRK
|
r3664 | ||
chunksize=None | |||
mapObject=None | |||
def __init__(self, view, f, dist='b', block=None, chunksize=None, **flags): | |||
super(ParallelFunction, self).__init__(view, f, block=block, **flags) | |||
self.chunksize = chunksize | |||
MinRK
|
r3636 | ||
MinRK
|
r3588 | mapClass = Map.dists[dist] | |
self.mapObject = mapClass() | |||
def __call__(self, *sequences): | |||
MinRK
|
r3664 | # check that the length of sequences match | |
MinRK
|
r3588 | len_0 = len(sequences[0]) | |
for s in sequences: | |||
if len(s)!=len_0: | |||
MinRK
|
r3635 | msg = 'all sequences must have equal length, but %i!=%i'%(len_0,len(s)) | |
raise ValueError(msg) | |||
MinRK
|
r3664 | balanced = 'Balanced' in self.view.__class__.__name__ | |
if balanced: | |||
if self.chunksize: | |||
nparts = len_0/self.chunksize + int(len_0%self.chunksize > 0) | |||
MinRK
|
r3636 | else: | |
nparts = len_0 | |||
MinRK
|
r3664 | targets = [None]*nparts | |
MinRK
|
r3588 | else: | |
MinRK
|
r3664 | if self.chunksize: | |
warnings.warn("`chunksize` is ignored unless load balancing", UserWarning) | |||
MinRK
|
r3588 | # multiplexed: | |
MinRK
|
r3664 | targets = self.view.targets | |
MinRK
|
r3636 | nparts = len(targets) | |
MinRK
|
r3588 | ||
msg_ids = [] | |||
# my_f = lambda *a: map(self.func, *a) | |||
MinRK
|
r3664 | client = self.view.client | |
MinRK
|
r3635 | for index, t in enumerate(targets): | |
MinRK
|
r3588 | args = [] | |
for seq in sequences: | |||
part = self.mapObject.getPartition(seq, index, nparts) | |||
MinRK
|
r3639 | if len(part) == 0: | |
MinRK
|
r3588 | continue | |
else: | |||
args.append(part) | |||
if not args: | |||
continue | |||
# print (args) | |||
if hasattr(self, '_map'): | |||
f = map | |||
args = [self.func]+args | |||
else: | |||
f=self.func | |||
MinRK
|
r3664 | ||
view = self.view if balanced else client[t] | |||
with view.temp_flags(block=False, **self.flags): | |||
ar = view.apply(f, *args) | |||
MinRK
|
r3635 | ||
msg_ids.append(ar.msg_ids[0]) | |||
MinRK
|
r3588 | ||
MinRK
|
r3664 | r = AsyncMapResult(self.view.client, msg_ids, self.mapObject, fname=self.func.__name__) | |
MinRK
|
r3588 | if self.block: | |
MinRK
|
r3635 | try: | |
return r.get() | |||
except KeyboardInterrupt: | |||
return r | |||
MinRK
|
r3588 | else: | |
return r | |||
def map(self, *sequences): | |||
MinRK
|
r3644 | """call a function on each element of a sequence remotely. | |
This should behave very much like the builtin map, but return an AsyncMapResult | |||
if self.block is False. | |||
""" | |||
# set _map as a flag for use inside self.__call__ | |||
MinRK
|
r3588 | self._map = True | |
MinRK
|
r3635 | try: | |
ret = self.__call__(*sequences) | |||
finally: | |||
del self._map | |||
MinRK
|
r3588 | return ret | |
MinRK
|
r3644 | __all__ = ['remote', 'parallel', 'RemoteFunction', 'ParallelFunction'] |