remotefunction.py
174 lines
| 5.9 KiB
| text/x-python
|
PythonLexer
MinRK
|
"""Remote Functions and decorators for the client."""
#-----------------------------------------------------------------------------
#  Copyright (C) 2010 The IPython Development Team
#
#  Distributed under the terms of the BSD License.  The full license is in
#  the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------

#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
MinRK
|
r3636 | import warnings | ||
MinRK
|
r3641 | from IPython.testing import decorators as testdec | ||
MinRK
|
r3642 | from . import map as Map | ||
from .asyncresult import AsyncMapResult | ||||
MinRK
|
r3588 | |||
#-----------------------------------------------------------------------------
# Decorators
#-----------------------------------------------------------------------------
MinRK
|
@testdec.skip_doctest
def remote(client, bound=True, block=None, targets=None, balanced=None):
    """Build a decorator that wraps a function in a RemoteFunction.

    The decorator returned here takes a callable `f` and hands it, together
    with `client` and the options given to `remote`, to the RemoteFunction
    constructor.  Note that `bound` defaults to True here.

    Usage:

    In [1]: @remote(client,block=True)
       ...: def func(a):
       ...:     pass
    """

    def remote_function(f):
        # Forward the options captured by the enclosing call by keyword.
        return RemoteFunction(client, f, bound=bound, block=block,
                              targets=targets, balanced=balanced)

    return remote_function
MinRK
|
@testdec.skip_doctest
def parallel(client, dist='b', bound=True, block=None, targets='all', balanced=None):
    """Build a decorator that wraps a function in a ParallelFunction.

    The decorator returned here takes a callable `f` and hands it, together
    with `client` and the options given to `parallel`, to the
    ParallelFunction constructor.  The resulting object can be used for map:

    In [1]: @parallel(client,block=True)
       ...: def func(a):
       ...:     pass
    """

    def parallel_function(f):
        # Forward the options captured by the enclosing call by keyword.
        return ParallelFunction(client, f, dist=dist, bound=bound, block=block,
                                targets=targets, balanced=balanced)

    return parallel_function
#--------------------------------------------------------------------------
# Classes
#--------------------------------------------------------------------------
class RemoteFunction(object):
    """Callable wrapper that runs an existing function through a client.

    Calling an instance forwards the call to ``client.apply`` with the
    wrapped function, the call's arguments and the options stored here.

    Parameters
    ----------

    client : Client instance
        The client whose ``apply`` method is used to submit the call
    f : callable
        The function to be wrapped into a remote function
    bound : bool [default: False]
        Whether to affect the remote namespace when called
    block : bool [default: None]
        Whether to wait for results or not.  The default behavior is
        to use the current `block` attribute of `client`
    targets : valid target list [default: None]
        The targets on which to execute.
    balanced : bool [default: None]
        Whether to load-balance with the Task scheduler or not.  When left
        as None it becomes True if `targets` is None and False otherwise.
    """

    client = None    # the remote connection
    func = None      # the wrapped function
    block = None     # whether to block
    bound = None     # whether to affect the namespace
    targets = None   # where to execute
    balanced = None  # whether to load-balance

    def __init__(self, client, f, bound=False, block=None, targets=None, balanced=None):
        # An explicit `balanced` wins; otherwise load-balance exactly when
        # the caller did not name any targets.
        self.balanced = (targets is None) if balanced is None else balanced
        self.targets = targets
        self.bound = bound
        self.block = block
        self.func = f
        self.client = client

    def __call__(self, *args, **kwargs):
        # Per-instance submission options, passed alongside the call's own
        # positional and keyword arguments.
        options = dict(block=self.block, targets=self.targets,
                       bound=self.bound, balanced=self.balanced)
        return self.client.apply(self.func, args=args, kwargs=kwargs, **options)
MinRK
|
r3588 | |||
class ParallelFunction(RemoteFunction):
    """Class for mapping a function to sequences.

    Calling an instance splits every input sequence into partitions, submits
    one ``client.apply`` request per non-empty partition and collects the
    resulting message ids into a single AsyncMapResult.

    Parameters
    ----------

    client : Client instance
        The client whose ``apply`` method is used to submit the work
    f : callable
        The function to be applied to the partitions
    dist : str [default: 'b']
        Key into ``Map.dists`` selecting the class whose instance partitions
        the sequences (via its ``getPartition`` method)
    bound, block, targets, balanced :
        Passed unchanged to ``RemoteFunction.__init__``.  With the default
        ``targets='all'`` and ``balanced=None`` the instance ends up
        with ``balanced=False``.
    chunk_size : int [default: None]
        Number of elements per partition when `balanced` is true; when it is
        unset each element becomes its own partition.  Ignored, with a
        UserWarning, when `balanced` is false.
    """

    def __init__(self, client, f, dist='b', bound=False, block=None, targets='all', balanced=None, chunk_size=None):
        super(ParallelFunction, self).__init__(client, f, bound, block, targets, balanced)
        self.chunk_size = chunk_size

        mapClass = Map.dists[dist]
        self.mapObject = mapClass()

    def __call__(self, *sequences):
        """Partition `sequences`, submit one request per partition, gather results.

        Returns the value of ``AsyncMapResult.get()`` when ``self.block`` is
        true, otherwise the AsyncMapResult itself.  If a blocking wait is
        interrupted with KeyboardInterrupt, the AsyncMapResult is returned so
        the caller can still retrieve the results later.

        Raises ValueError if the sequences do not all have the same length.
        """
        len_0 = len(sequences[0])
        for s in sequences:
            if len(s) != len_0:
                msg = 'all sequences must have equal length, but %i!=%i' % (len_0, len(s))
                raise ValueError(msg)

        if self.balanced:
            if self.chunk_size:
                # Ceiling division.  `//` keeps nparts an int under true
                # division as well; a float here would make the list
                # repetition below raise TypeError.
                nparts = len_0 // self.chunk_size + int(len_0 % self.chunk_size > 0)
            else:
                # no chunk size: one partition per element
                nparts = len_0
            # every partition is submitted with the same targets value
            targets = [self.targets] * nparts
        else:
            if self.chunk_size:
                warnings.warn("`chunk_size` is ignored when `balanced=False", UserWarning)
            # multiplexed: one partition per resolved target
            targets = self.client._build_targets(self.targets)[-1]
            nparts = len(targets)

        msg_ids = []
        for index, t in enumerate(targets):
            args = []
            for seq in sequences:
                part = self.mapObject.getPartition(seq, index, nparts)
                # explicit len() test: works for any sized partition type
                if len(part) == 0:
                    continue
                else:
                    args.append(part)
            if not args:
                # nothing to send for this partition index
                continue

            if hasattr(self, '_map'):
                # map() mode: ship the builtin `map` with the wrapped function
                # prepended, so it is applied element-wise to the partitions.
                f = map
                args = [self.func] + args
            else:
                # call mode: the wrapped function receives the partitions
                # themselves as its positional arguments.
                f = self.func
            ar = self.client.apply(f, args=args, block=False, bound=self.bound,
                                   targets=t, balanced=self.balanced)

            msg_ids.append(ar.msg_ids[0])

        r = AsyncMapResult(self.client, msg_ids, self.mapObject, fname=self.func.__name__)
        if self.block:
            try:
                return r.get()
            except KeyboardInterrupt:
                return r
        else:
            return r

    def map(self, *sequences):
        """call a function on each element of a sequence remotely."""
        # The temporary `_map` attribute switches __call__ into map() mode;
        # it is always removed again, even if the call raises.
        self._map = True
        try:
            ret = self.__call__(*sequences)
        finally:
            del self._map
        return ret