remotefunction.py
145 lines
| 4.9 KiB
| text/x-python
|
PythonLexer
MinRK
|
r3588 | """Remote Functions and decorators for the client.""" | ||
#----------------------------------------------------------------------------- | ||||
# Copyright (C) 2010 The IPython Development Team | ||||
# | ||||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
#----------------------------------------------------------------------------- | ||||
#----------------------------------------------------------------------------- | ||||
# Imports | ||||
#----------------------------------------------------------------------------- | ||||
import map as Map | ||||
MinRK
|
r3589 | from asyncresult import AsyncMapResult | ||
MinRK
|
r3588 | |||
#----------------------------------------------------------------------------- | ||||
# Decorators | ||||
#----------------------------------------------------------------------------- | ||||
def remote(client, bound=False, block=None, targets=None): | ||||
"""Turn a function into a remote function. | ||||
This method can be used for map: | ||||
>>> @remote(client,block=True) | ||||
def func(a) | ||||
""" | ||||
def remote_function(f): | ||||
return RemoteFunction(client, f, bound, block, targets) | ||||
return remote_function | ||||
def parallel(client, dist='b', bound=False, block=None, targets='all'): | ||||
"""Turn a function into a parallel remote function. | ||||
This method can be used for map: | ||||
>>> @parallel(client,block=True) | ||||
def func(a) | ||||
""" | ||||
def parallel_function(f): | ||||
return ParallelFunction(client, f, dist, bound, block, targets) | ||||
return parallel_function | ||||
#-------------------------------------------------------------------------- | ||||
# Classes | ||||
#-------------------------------------------------------------------------- | ||||
class RemoteFunction(object): | ||||
"""Turn an existing function into a remote function. | ||||
Parameters | ||||
---------- | ||||
client : Client instance | ||||
The client to be used to connect to engines | ||||
f : callable | ||||
The function to be wrapped into a remote function | ||||
bound : bool [default: False] | ||||
Whether the affect the remote namespace when called | ||||
block : bool [default: None] | ||||
Whether to wait for results or not. The default behavior is | ||||
to use the current `block` attribute of `client` | ||||
targets : valid target list [default: all] | ||||
The targets on which to execute. | ||||
""" | ||||
client = None # the remote connection | ||||
func = None # the wrapped function | ||||
block = None # whether to block | ||||
bound = None # whether to affect the namespace | ||||
targets = None # where to execute | ||||
def __init__(self, client, f, bound=False, block=None, targets=None): | ||||
self.client = client | ||||
self.func = f | ||||
self.block=block | ||||
self.bound=bound | ||||
self.targets=targets | ||||
def __call__(self, *args, **kwargs): | ||||
return self.client.apply(self.func, args=args, kwargs=kwargs, | ||||
block=self.block, targets=self.targets, bound=self.bound) | ||||
class ParallelFunction(RemoteFunction): | ||||
"""Class for mapping a function to sequences.""" | ||||
def __init__(self, client, f, dist='b', bound=False, block=None, targets='all'): | ||||
super(ParallelFunction, self).__init__(client,f,bound,block,targets) | ||||
mapClass = Map.dists[dist] | ||||
self.mapObject = mapClass() | ||||
def __call__(self, *sequences): | ||||
len_0 = len(sequences[0]) | ||||
for s in sequences: | ||||
if len(s)!=len_0: | ||||
raise ValueError('all sequences must have equal length') | ||||
if self.targets is None: | ||||
# load-balanced: | ||||
engines = [None]*len_0 | ||||
elif isinstance(self.targets, int): | ||||
engines = [None]*self.targets | ||||
else: | ||||
# multiplexed: | ||||
engines = self.client._build_targets(self.targets)[-1] | ||||
nparts = len(engines) | ||||
msg_ids = [] | ||||
# my_f = lambda *a: map(self.func, *a) | ||||
for index, engineid in enumerate(engines): | ||||
args = [] | ||||
for seq in sequences: | ||||
part = self.mapObject.getPartition(seq, index, nparts) | ||||
if not part: | ||||
continue | ||||
else: | ||||
args.append(part) | ||||
if not args: | ||||
continue | ||||
# print (args) | ||||
if hasattr(self, '_map'): | ||||
f = map | ||||
args = [self.func]+args | ||||
else: | ||||
f=self.func | ||||
mid = self.client.apply(f, args=args, block=False, | ||||
bound=self.bound, | ||||
MinRK
|
r3589 | targets=engineid)._msg_ids[0] | ||
MinRK
|
r3588 | msg_ids.append(mid) | ||
MinRK
|
r3589 | r = AsyncMapResult(self.client, msg_ids, self.mapObject) | ||
MinRK
|
r3588 | if self.block: | ||
r.wait() | ||||
return r.result | ||||
else: | ||||
return r | ||||
def map(self, *sequences): | ||||
"""call a function on each element of a sequence remotely.""" | ||||
self._map = True | ||||
ret = self.__call__(*sequences) | ||||
del self._map | ||||
return ret | ||||