remotefunction.py
200 lines
| 6.6 KiB
| text/x-python
|
PythonLexer
MinRK
|
r3664 | """Remote Functions and decorators for Views.""" | ||
MinRK
|
r3588 | #----------------------------------------------------------------------------- | ||
# Copyright (C) 2010 The IPython Development Team | ||||
# | ||||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
#----------------------------------------------------------------------------- | ||||
#----------------------------------------------------------------------------- | ||||
# Imports | ||||
#----------------------------------------------------------------------------- | ||||
MinRK
|
r3636 | import warnings | ||
Thomas Kluyver
|
r3886 | from IPython.testing.skipdoctest import skip_doctest | ||
MinRK
|
r3641 | |||
MinRK
|
r3642 | from . import map as Map | ||
from .asyncresult import AsyncMapResult | ||||
MinRK
|
r3588 | |||
#----------------------------------------------------------------------------- | ||||
# Decorators | ||||
#----------------------------------------------------------------------------- | ||||
Thomas Kluyver
|
@skip_doctest
def remote(view, block=None, **flags):
    """Build a decorator that wraps a function in a `RemoteFunction`.

    Parameters
    ----------

    view : View instance
        The view handed to `RemoteFunction` for execution
    block : bool [default: None]
        Forwarded unchanged to `RemoteFunction`
    **flags : remaining kwargs are forwarded to `RemoteFunction`

    Example usage:

    In [1]: @remote(view, block=True)
       ...: def func(a):
       ...:    pass
    """

    def decorate(f):
        # The decorated name is bound to the RemoteFunction instance,
        # not to the plain function `f`.
        wrapped = RemoteFunction(view, f, block=block, **flags)
        return wrapped

    return decorate
Thomas Kluyver
|
@skip_doctest
def parallel(view, dist='b', block=None, **flags):
    """Build a decorator that wraps a function in a `ParallelFunction`.

    Parameters
    ----------

    view : View instance
        The view handed to `ParallelFunction` for execution
    dist : str [default: 'b']
        Forwarded unchanged to `ParallelFunction`
    block : bool [default: None]
        Forwarded unchanged to `ParallelFunction`
    **flags : remaining kwargs are forwarded to `ParallelFunction`

    The resulting object can be used for map:

    In [1]: @parallel(view, block=True)
       ...: def func(a):
       ...:    pass
    """

    def decorate(f):
        # The decorated name is bound to the ParallelFunction instance,
        # not to the plain function `f`.
        wrapped = ParallelFunction(view, f, dist=dist, block=block, **flags)
        return wrapped

    return decorate
#-------------------------------------------------------------------------- | ||||
# Classes | ||||
#-------------------------------------------------------------------------- | ||||
class RemoteFunction(object):
    """Callable wrapper that runs an existing function through a view.

    Parameters
    ----------

    view : View instance
        The view to be used for execution
    f : callable
        The function to be wrapped into a remote function
    block : bool [default: None]
        Whether to wait for results or not.  When left as None, the
        `block` attribute of `view` at call time is used instead
    **flags : remaining kwargs are passed to View.temp_flags
    """

    # Class-level defaults; each one is overwritten per instance in __init__.
    view = None   # the remote connection
    func = None   # the wrapped function
    block = None  # whether to block
    flags = None  # dict of extra kwargs for temp_flags

    def __init__(self, view, f, block=None, **flags):
        self.view, self.func = view, f
        self.block, self.flags = block, flags

    def __call__(self, *args, **kwargs):
        """Apply the wrapped function via the view and return what `apply` returns."""
        # An explicit per-function setting wins; None defers to the view.
        if self.block is None:
            should_block = self.view.block
        else:
            should_block = self.block

        with self.view.temp_flags(block=should_block, **self.flags):
            return self.view.apply(self.func, *args, **kwargs)
MinRK
|
r3588 | |||
class ParallelFunction(RemoteFunction):
    """Class for mapping a function to sequences.

    This will distribute the sequences according to a mapper, and call
    the function on each sub-sequence.  If called via map, then the function
    will be called once on each element, rather than each sub-sequence.

    Parameters
    ----------

    view : View instance
        The view to be used for execution
    f : callable
        The function to be wrapped into a remote function
    dist : str [default: 'b']
        The key for which mapObject to use to distribute sequences
        options are:
          * 'b' : use contiguous chunks in order
          * 'r' : use round-robin striping
    block : bool [default: None]
        Whether to wait for results or not.  The default behavior is
        to use the current `block` attribute of `view`
    chunksize : int or None
        The size of chunk to use when breaking up sequences in a
        load-balanced manner
    **flags : remaining kwargs are passed to View.temp_flags
    """

    chunksize = None  # chunk size for load-balanced partitioning
    mapObject = None  # instance of the Map class selected by `dist`

    def __init__(self, view, f, dist='b', block=None, chunksize=None, **flags):
        super(ParallelFunction, self).__init__(view, f, block=block, **flags)
        self.chunksize = chunksize

        mapClass = Map.dists[dist]
        self.mapObject = mapClass()

    def __call__(self, *sequences):
        """Partition `sequences`, submit one task per partition, gather results.

        Parameters
        ----------

        *sequences : sequences of equal length
            Each one is split with `self.mapObject.getPartition`; the
            matching pieces are passed together to one `apply` call.

        Returns
        -------

        The value of `AsyncMapResult.get()` when blocking, otherwise the
        `AsyncMapResult` itself.  If the blocking wait is interrupted with
        KeyboardInterrupt, the `AsyncMapResult` is returned instead.

        Raises
        ------

        ValueError
            If the sequences do not all have the same length.
        """
        # check that the length of sequences match
        len_0 = len(sequences[0])
        for s in sequences:
            if len(s) != len_0:
                msg = 'all sequences must have equal length, but %i!=%i' % (len_0, len(s))
                raise ValueError(msg)

        # Load-balanced views are recognised by their class name.
        balanced = 'Balanced' in self.view.__class__.__name__
        if balanced:
            if self.chunksize:
                # Ceiling division done in integers: true division (`/`)
                # yields a float on Python 3, which breaks `[None]*nparts`.
                nparts, leftover = divmod(len_0, self.chunksize)
                if leftover:
                    nparts += 1
            else:
                nparts = len_0
            targets = [None] * nparts
        else:
            if self.chunksize:
                warnings.warn("`chunksize` is ignored unless load balancing", UserWarning)
            # multiplexed:
            targets = self.view.targets
            nparts = len(targets)

        msg_ids = []
        client = self.view.client
        for index, t in enumerate(targets):
            args = []
            for seq in sequences:
                part = self.mapObject.getPartition(seq, index, nparts)
                # Explicit length test: `part` may be a type whose truth
                # value is not its emptiness.
                if len(part) > 0:
                    args.append(part)
            if not args:
                # nothing fell into this partition; submit no task for it
                continue

            if hasattr(self, '_map'):
                # Called through self.map(): apply func element-wise remotely.
                f = map
                args = [self.func] + args
            else:
                f = self.func

            view = self.view if balanced else client[t]
            # Always submit without blocking; waiting happens once, below.
            with view.temp_flags(block=False, **self.flags):
                ar = view.apply(f, *args)

            msg_ids.append(ar.msg_ids[0])

        r = AsyncMapResult(self.view.client, msg_ids, self.mapObject, fname=self.func.__name__)

        # block=None defers to the view, as documented on the class and as
        # RemoteFunction.__call__ does.
        block = self.view.block if self.block is None else self.block
        if block:
            try:
                return r.get()
            except KeyboardInterrupt:
                # hand back the pending result rather than propagating
                return r
        else:
            return r

    def map(self, *sequences):
        """call a function on each element of a sequence remotely.

        This should behave very much like the builtin map, but return an
        AsyncMapResult when not blocking.
        """
        # set _map as a flag for use inside self.__call__
        self._map = True
        try:
            ret = self.__call__(*sequences)
        finally:
            # always clear the flag so later direct calls are not affected
            del self._map
        return ret
Thomas Kluyver
|
# Names exported by a star-import of this module.
__all__ = [
    'remote',
    'parallel',
    'RemoteFunction',
    'ParallelFunction',
]