##// END OF EJS Templates
tweak dagdeps for new AsyncResult objects
tweak dagdeps for new AsyncResult objects

File last commit:

r3598:6f48516f
r3606:9f1a03ab
Show More
remotefunction.py
145 lines | 4.9 KiB | text/x-python | PythonLexer
"""Remote Functions and decorators for the client."""
#-----------------------------------------------------------------------------
# Copyright (C) 2010 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
import map as Map
from asyncresult import AsyncMapResult
#-----------------------------------------------------------------------------
# Decorators
#-----------------------------------------------------------------------------
def remote(client, bound=False, block=None, targets=None):
    """Build a decorator that turns a function into a `RemoteFunction`.

    The returned decorator takes a callable `f` and returns
    ``RemoteFunction(client, f, bound, block, targets)``; the arguments
    given here are simply forwarded to that constructor.

    Example (``client`` is an existing client object)::

        @remote(client, block=True)
        def func(a):
            pass
    """
    def remote_function(f):
        # Keyword form of RemoteFunction(client, f, bound, block, targets).
        return RemoteFunction(client, f, bound=bound, block=block,
                              targets=targets)

    return remote_function
def parallel(client, dist='b', bound=False, block=None, targets='all'):
    """Build a decorator that turns a function into a `ParallelFunction`.

    The returned decorator takes a callable `f` and returns
    ``ParallelFunction(client, f, dist, bound, block, targets)``; the
    resulting object can be used to map the function over sequences.

    Example (``client`` is an existing client object)::

        @parallel(client, block=True)
        def func(a):
            pass
    """
    def parallel_function(f):
        # Keyword form of
        # ParallelFunction(client, f, dist, bound, block, targets).
        return ParallelFunction(client, f, dist=dist, bound=bound,
                                block=block, targets=targets)

    return parallel_function
#--------------------------------------------------------------------------
# Classes
#--------------------------------------------------------------------------
class RemoteFunction(object):
    """Callable wrapper that runs an existing function through a client.

    Calling an instance forwards the wrapped function, the call's
    positional and keyword arguments, and the stored `block`, `targets`
    and `bound` settings to ``client.apply`` and returns whatever that
    call returns.

    Parameters
    ----------
    client : Client instance
        The client to be used to connect to engines
    f : callable
        The function to be wrapped into a remote function
    bound : bool [default: False]
        Whether to affect the remote namespace when called
    block : bool [default: None]
        Whether to wait for results or not.  The value (including None)
        is handed to ``client.apply`` unchanged.
    targets : valid target list [default: None]
        The targets on which to execute; handed to ``client.apply``
        unchanged.
    """

    # Class-level defaults; every instance overrides them in __init__.
    client = None   # the remote connection
    func = None     # the wrapped function
    block = None    # whether to block
    bound = None    # whether to affect the namespace
    targets = None  # where to execute

    def __init__(self, client, f, bound=False, block=None, targets=None):
        self.client, self.func = client, f
        self.block, self.bound, self.targets = block, bound, targets

    def __call__(self, *args, **kwargs):
        """Apply the wrapped function via the client with the given args."""
        options = dict(block=self.block, targets=self.targets,
                       bound=self.bound)
        return self.client.apply(self.func, args=args, kwargs=kwargs,
                                 **options)
class ParallelFunction(RemoteFunction):
    """Class for mapping a function to sequences.

    Calling an instance splits every argument sequence into one partition
    per engine, submits one non-blocking ``client.apply`` per non-empty
    partition and collects the resulting message ids in an
    `AsyncMapResult`.

    Parameters
    ----------
    client, f, bound, block, targets
        Stored by `RemoteFunction.__init__`.  Here `targets` defaults to
        'all' rather than None.
    dist : key of `Map.dists` [default: 'b']
        Selects the partitioning class; an instance of it is kept as
        `self.mapObject` and used to split (and later rejoin) sequences.
    """

    def __init__(self, client, f, dist='b', bound=False, block=None,
                 targets='all'):
        super(ParallelFunction, self).__init__(client, f, bound, block,
                                               targets)
        mapClass = Map.dists[dist]
        self.mapObject = mapClass()

    def __call__(self, *sequences):
        """Partition `sequences` across the engines and submit the work.

        In a plain call each engine receives the wrapped function with its
        partitions as positional arguments.  When invoked through `map`
        the builtin ``map`` is submitted instead, with the wrapped function
        and the partitions as its arguments.

        Returns
        -------
        The `AsyncMapResult` for the submitted messages, or its ``result``
        after waiting when `self.block` is true.

        Raises
        ------
        ValueError
            If the sequences do not all have the same length.
        IndexError
            If no sequence is given (from indexing ``sequences[0]``).
        """
        len_0 = len(sequences[0])
        if any(len(s) != len_0 for s in sequences):
            raise ValueError('all sequences must have equal length')

        if self.targets is None:
            # load-balanced:
            engines = [None] * len_0
        elif isinstance(self.targets, int):
            # An int is used as a partition count; every partition is
            # submitted with targets=None.
            engines = [None] * self.targets
        else:
            # multiplexed:
            # NOTE(review): presumably the last item returned by
            # _build_targets is the list of engine ids -- confirm.
            engines = self.client._build_targets(self.targets)[-1]

        nparts = len(engines)
        # `map` marks an element-wise call by setting `_map` on the instance.
        elementwise = hasattr(self, '_map')
        msg_ids = []
        for index, engineid in enumerate(engines):
            args = []
            for seq in sequences:
                part = self.mapObject.getPartition(seq, index, nparts)
                # Test emptiness by length, not truthiness: truth-testing an
                # array-like partition can raise or misreport (a NumPy array
                # holding a single 0 is falsy although it is not empty).
                if part is None or len(part) == 0:
                    continue
                args.append(part)
            if not args:
                # nothing to send to this engine
                continue
            if elementwise:
                # `map` here is the builtin: method names defined in the
                # class body are not visible as bare names inside methods.
                f = map
                args = [self.func] + args
            else:
                f = self.func
            submitted = self.client.apply(f, args=args, block=False,
                                          bound=self.bound,
                                          targets=engineid)
            msg_ids.append(submitted.msg_ids[0])

        r = AsyncMapResult(self.client, msg_ids, self.mapObject,
                           fname=self.func.__name__)
        # NOTE(review): block=None is treated as False here; the client's
        # own `block` attribute is not consulted in this method.
        if self.block:
            r.wait()
            return r.result
        return r

    def map(self, *sequences):
        """call a function on each element of a sequence remotely."""
        self._map = True
        try:
            return self.__call__(*sequences)
        finally:
            # Always clear the flag, even when __call__ raises; otherwise
            # every later plain call would silently run in map mode.
            del self._map