"""Remote Functions and decorators for the client.""" #----------------------------------------------------------------------------- # Copyright (C) 2010 The IPython Development Team # # Distributed under the terms of the BSD License. The full license is in # the file COPYING, distributed as part of this software. #----------------------------------------------------------------------------- #----------------------------------------------------------------------------- # Imports #----------------------------------------------------------------------------- import map as Map from asyncresult import AsyncMapResult #----------------------------------------------------------------------------- # Decorators #----------------------------------------------------------------------------- def remote(client, bound=False, block=None, targets=None): """Turn a function into a remote function. This method can be used for map: >>> @remote(client,block=True) def func(a) """ def remote_function(f): return RemoteFunction(client, f, bound, block, targets) return remote_function def parallel(client, dist='b', bound=False, block=None, targets='all'): """Turn a function into a parallel remote function. This method can be used for map: >>> @parallel(client,block=True) def func(a) """ def parallel_function(f): return ParallelFunction(client, f, dist, bound, block, targets) return parallel_function #-------------------------------------------------------------------------- # Classes #-------------------------------------------------------------------------- class RemoteFunction(object): """Turn an existing function into a remote function. Parameters ---------- client : Client instance The client to be used to connect to engines f : callable The function to be wrapped into a remote function bound : bool [default: False] Whether the affect the remote namespace when called block : bool [default: None] Whether to wait for results or not. The default behavior is to use the current `block` attribute of `client` targets : valid target list [default: all] The targets on which to execute. """ client = None # the remote connection func = None # the wrapped function block = None # whether to block bound = None # whether to affect the namespace targets = None # where to execute def __init__(self, client, f, bound=False, block=None, targets=None): self.client = client self.func = f self.block=block self.bound=bound self.targets=targets def __call__(self, *args, **kwargs): return self.client.apply(self.func, args=args, kwargs=kwargs, block=self.block, targets=self.targets, bound=self.bound) class ParallelFunction(RemoteFunction): """Class for mapping a function to sequences.""" def __init__(self, client, f, dist='b', bound=False, block=None, targets='all'): super(ParallelFunction, self).__init__(client,f,bound,block,targets) mapClass = Map.dists[dist] self.mapObject = mapClass() def __call__(self, *sequences): len_0 = len(sequences[0]) for s in sequences: if len(s)!=len_0: raise ValueError('all sequences must have equal length') if self.targets is None: # load-balanced: engines = [None]*len_0 elif isinstance(self.targets, int): engines = [None]*self.targets else: # multiplexed: engines = self.client._build_targets(self.targets)[-1] nparts = len(engines) msg_ids = [] # my_f = lambda *a: map(self.func, *a) for index, engineid in enumerate(engines): args = [] for seq in sequences: part = self.mapObject.getPartition(seq, index, nparts) if not part: continue else: args.append(part) if not args: continue # print (args) if hasattr(self, '_map'): f = map args = [self.func]+args else: f=self.func mid = self.client.apply(f, args=args, block=False, bound=self.bound, targets=engineid)._msg_ids[0] msg_ids.append(mid) r = AsyncMapResult(self.client, msg_ids, self.mapObject) if self.block: r.wait() return r.result else: return r def map(self, *sequences): """call a function on each element of a sequence remotely.""" self._map = True ret = self.__call__(*sequences) del self._map return ret