remotefunction.py
202 lines
| 7.1 KiB
| text/x-python
|
PythonLexer
MinRK
|
r3588 | """Remote Functions and decorators for the client.""" | |
#----------------------------------------------------------------------------- | |||
# Copyright (C) 2010 The IPython Development Team | |||
# | |||
# Distributed under the terms of the BSD License. The full license is in | |||
# the file COPYING, distributed as part of this software. | |||
#----------------------------------------------------------------------------- | |||
#----------------------------------------------------------------------------- | |||
# Imports | |||
#----------------------------------------------------------------------------- | |||
MinRK
|
r3636 | import warnings | |
MinRK
|
r3641 | from IPython.testing import decorators as testdec | |
MinRK
|
r3642 | from . import map as Map | |
from .asyncresult import AsyncMapResult | |||
MinRK
|
r3588 | ||
#----------------------------------------------------------------------------- | |||
# Decorators | |||
#----------------------------------------------------------------------------- | |||
MinRK
|
r3641 | @testdec.skip_doctest | |
MinRK
|
r3655 | def remote(client, bound=False, block=None, targets=None, balanced=None): | |
MinRK
|
r3588 | """Turn a function into a remote function. | |
This method can be used for map: | |||
MinRK
|
r3641 | In [1]: @remote(client,block=True) | |
...: def func(a): | |||
...: pass | |||
MinRK
|
r3588 | """ | |
MinRK
|
r3641 | ||
MinRK
|
r3588 | def remote_function(f): | |
MinRK
|
r3635 | return RemoteFunction(client, f, bound, block, targets, balanced) | |
MinRK
|
r3588 | return remote_function | |
MinRK
|
r3641 | @testdec.skip_doctest | |
MinRK
|
r3655 | def parallel(client, dist='b', bound=False, block=None, targets='all', balanced=None): | |
MinRK
|
r3588 | """Turn a function into a parallel remote function. | |
This method can be used for map: | |||
MinRK
|
r3641 | In [1]: @parallel(client,block=True) | |
...: def func(a): | |||
...: pass | |||
MinRK
|
r3588 | """ | |
MinRK
|
r3641 | ||
MinRK
|
r3588 | def parallel_function(f): | |
MinRK
|
r3635 | return ParallelFunction(client, f, dist, bound, block, targets, balanced) | |
MinRK
|
r3588 | return parallel_function | |
#-------------------------------------------------------------------------- | |||
# Classes | |||
#-------------------------------------------------------------------------- | |||
class RemoteFunction(object): | |||
"""Turn an existing function into a remote function. | |||
Parameters | |||
---------- | |||
client : Client instance | |||
The client to be used to connect to engines | |||
f : callable | |||
The function to be wrapped into a remote function | |||
bound : bool [default: False] | |||
Whether the affect the remote namespace when called | |||
block : bool [default: None] | |||
Whether to wait for results or not. The default behavior is | |||
to use the current `block` attribute of `client` | |||
targets : valid target list [default: all] | |||
The targets on which to execute. | |||
MinRK
|
r3635 | balanced : bool | |
Whether to load-balance with the Task scheduler or not | |||
MinRK
|
r3588 | """ | |
client = None # the remote connection | |||
func = None # the wrapped function | |||
block = None # whether to block | |||
bound = None # whether to affect the namespace | |||
targets = None # where to execute | |||
MinRK
|
r3635 | balanced = None # whether to load-balance | |
MinRK
|
r3588 | ||
MinRK
|
r3635 | def __init__(self, client, f, bound=False, block=None, targets=None, balanced=None): | |
MinRK
|
r3588 | self.client = client | |
self.func = f | |||
self.block=block | |||
self.bound=bound | |||
self.targets=targets | |||
MinRK
|
r3635 | if balanced is None: | |
if targets is None: | |||
balanced = True | |||
else: | |||
balanced = False | |||
self.balanced = balanced | |||
MinRK
|
r3588 | ||
def __call__(self, *args, **kwargs): | |||
return self.client.apply(self.func, args=args, kwargs=kwargs, | |||
MinRK
|
r3635 | block=self.block, targets=self.targets, bound=self.bound, balanced=self.balanced) | |
MinRK
|
r3588 | ||
class ParallelFunction(RemoteFunction): | |||
MinRK
|
r3644 | """Class for mapping a function to sequences. | |
This will distribute the sequences according the a mapper, and call | |||
the function on each sub-sequence. If called via map, then the function | |||
will be called once on each element, rather that each sub-sequence. | |||
Parameters | |||
---------- | |||
client : Client instance | |||
The client to be used to connect to engines | |||
f : callable | |||
The function to be wrapped into a remote function | |||
bound : bool [default: False] | |||
Whether the affect the remote namespace when called | |||
block : bool [default: None] | |||
Whether to wait for results or not. The default behavior is | |||
to use the current `block` attribute of `client` | |||
targets : valid target list [default: all] | |||
The targets on which to execute. | |||
balanced : bool | |||
Whether to load-balance with the Task scheduler or not | |||
chunk_size : int or None | |||
The size of chunk to use when breaking up sequences in a load-balanced manner | |||
""" | |||
MinRK
|
r3636 | def __init__(self, client, f, dist='b', bound=False, block=None, targets='all', balanced=None, chunk_size=None): | |
MinRK
|
r3635 | super(ParallelFunction, self).__init__(client,f,bound,block,targets,balanced) | |
MinRK
|
r3636 | self.chunk_size = chunk_size | |
MinRK
|
r3588 | mapClass = Map.dists[dist] | |
self.mapObject = mapClass() | |||
def __call__(self, *sequences): | |||
len_0 = len(sequences[0]) | |||
for s in sequences: | |||
if len(s)!=len_0: | |||
MinRK
|
r3635 | msg = 'all sequences must have equal length, but %i!=%i'%(len_0,len(s)) | |
raise ValueError(msg) | |||
MinRK
|
r3588 | ||
MinRK
|
r3635 | if self.balanced: | |
MinRK
|
r3636 | if self.chunk_size: | |
nparts = len_0/self.chunk_size + int(len_0%self.chunk_size > 0) | |||
else: | |||
nparts = len_0 | |||
targets = [self.targets]*nparts | |||
MinRK
|
r3588 | else: | |
MinRK
|
r3636 | if self.chunk_size: | |
warnings.warn("`chunk_size` is ignored when `balanced=False", UserWarning) | |||
MinRK
|
r3588 | # multiplexed: | |
MinRK
|
r3635 | targets = self.client._build_targets(self.targets)[-1] | |
MinRK
|
r3636 | nparts = len(targets) | |
MinRK
|
r3588 | ||
msg_ids = [] | |||
# my_f = lambda *a: map(self.func, *a) | |||
MinRK
|
r3635 | for index, t in enumerate(targets): | |
MinRK
|
r3588 | args = [] | |
for seq in sequences: | |||
part = self.mapObject.getPartition(seq, index, nparts) | |||
MinRK
|
r3639 | if len(part) == 0: | |
MinRK
|
r3588 | continue | |
else: | |||
args.append(part) | |||
if not args: | |||
continue | |||
# print (args) | |||
if hasattr(self, '_map'): | |||
f = map | |||
args = [self.func]+args | |||
else: | |||
f=self.func | |||
MinRK
|
r3635 | ar = self.client.apply(f, args=args, block=False, bound=self.bound, | |
MinRK
|
r3636 | targets=t, balanced=self.balanced) | |
MinRK
|
r3635 | ||
msg_ids.append(ar.msg_ids[0]) | |||
MinRK
|
r3588 | ||
MinRK
|
r3598 | r = AsyncMapResult(self.client, msg_ids, self.mapObject, fname=self.func.__name__) | |
MinRK
|
r3588 | if self.block: | |
MinRK
|
r3635 | try: | |
return r.get() | |||
except KeyboardInterrupt: | |||
return r | |||
MinRK
|
r3588 | else: | |
return r | |||
def map(self, *sequences): | |||
MinRK
|
r3644 | """call a function on each element of a sequence remotely. | |
This should behave very much like the builtin map, but return an AsyncMapResult | |||
if self.block is False. | |||
""" | |||
# set _map as a flag for use inside self.__call__ | |||
MinRK
|
r3588 | self._map = True | |
MinRK
|
r3635 | try: | |
ret = self.__call__(*sequences) | |||
finally: | |||
del self._map | |||
MinRK
|
r3588 | return ret | |
MinRK
|
r3644 | __all__ = ['remote', 'parallel', 'RemoteFunction', 'ParallelFunction'] |