"""Remote Functions and decorators for the client.""" #----------------------------------------------------------------------------- # Copyright (C) 2010 The IPython Development Team # # Distributed under the terms of the BSD License. The full license is in # the file COPYING, distributed as part of this software. #----------------------------------------------------------------------------- #----------------------------------------------------------------------------- # Imports #----------------------------------------------------------------------------- import warnings from IPython.testing import decorators as testdec from . import map as Map from .asyncresult import AsyncMapResult #----------------------------------------------------------------------------- # Decorators #----------------------------------------------------------------------------- @testdec.skip_doctest def remote(client, bound=True, block=None, targets=None, balanced=None): """Turn a function into a remote function. This method can be used for map: In [1]: @remote(client,block=True) ...: def func(a): ...: pass """ def remote_function(f): return RemoteFunction(client, f, bound, block, targets, balanced) return remote_function @testdec.skip_doctest def parallel(client, dist='b', bound=True, block=None, targets='all', balanced=None): """Turn a function into a parallel remote function. This method can be used for map: In [1]: @parallel(client,block=True) ...: def func(a): ...: pass """ def parallel_function(f): return ParallelFunction(client, f, dist, bound, block, targets, balanced) return parallel_function #-------------------------------------------------------------------------- # Classes #-------------------------------------------------------------------------- class RemoteFunction(object): """Turn an existing function into a remote function. Parameters ---------- client : Client instance The client to be used to connect to engines f : callable The function to be wrapped into a remote function bound : bool [default: False] Whether the affect the remote namespace when called block : bool [default: None] Whether to wait for results or not. The default behavior is to use the current `block` attribute of `client` targets : valid target list [default: all] The targets on which to execute. balanced : bool Whether to load-balance with the Task scheduler or not """ client = None # the remote connection func = None # the wrapped function block = None # whether to block bound = None # whether to affect the namespace targets = None # where to execute balanced = None # whether to load-balance def __init__(self, client, f, bound=False, block=None, targets=None, balanced=None): self.client = client self.func = f self.block=block self.bound=bound self.targets=targets if balanced is None: if targets is None: balanced = True else: balanced = False self.balanced = balanced def __call__(self, *args, **kwargs): return self.client.apply(self.func, args=args, kwargs=kwargs, block=self.block, targets=self.targets, bound=self.bound, balanced=self.balanced) class ParallelFunction(RemoteFunction): """Class for mapping a function to sequences. This will distribute the sequences according the a mapper, and call the function on each sub-sequence. If called via map, then the function will be called once on each element, rather that each sub-sequence. Parameters ---------- client : Client instance The client to be used to connect to engines f : callable The function to be wrapped into a remote function bound : bool [default: False] Whether the affect the remote namespace when called block : bool [default: None] Whether to wait for results or not. The default behavior is to use the current `block` attribute of `client` targets : valid target list [default: all] The targets on which to execute. balanced : bool Whether to load-balance with the Task scheduler or not chunk_size : int or None The size of chunk to use when breaking up sequences in a load-balanced manner """ def __init__(self, client, f, dist='b', bound=False, block=None, targets='all', balanced=None, chunk_size=None): super(ParallelFunction, self).__init__(client,f,bound,block,targets,balanced) self.chunk_size = chunk_size mapClass = Map.dists[dist] self.mapObject = mapClass() def __call__(self, *sequences): len_0 = len(sequences[0]) for s in sequences: if len(s)!=len_0: msg = 'all sequences must have equal length, but %i!=%i'%(len_0,len(s)) raise ValueError(msg) if self.balanced: if self.chunk_size: nparts = len_0/self.chunk_size + int(len_0%self.chunk_size > 0) else: nparts = len_0 targets = [self.targets]*nparts else: if self.chunk_size: warnings.warn("`chunk_size` is ignored when `balanced=False", UserWarning) # multiplexed: targets = self.client._build_targets(self.targets)[-1] nparts = len(targets) msg_ids = [] # my_f = lambda *a: map(self.func, *a) for index, t in enumerate(targets): args = [] for seq in sequences: part = self.mapObject.getPartition(seq, index, nparts) if len(part) == 0: continue else: args.append(part) if not args: continue # print (args) if hasattr(self, '_map'): f = map args = [self.func]+args else: f=self.func ar = self.client.apply(f, args=args, block=False, bound=self.bound, targets=t, balanced=self.balanced) msg_ids.append(ar.msg_ids[0]) r = AsyncMapResult(self.client, msg_ids, self.mapObject, fname=self.func.__name__) if self.block: try: return r.get() except KeyboardInterrupt: return r else: return r def map(self, *sequences): """call a function on each element of a sequence remotely. This should behave very much like the builtin map, but return an AsyncMapResult if self.block is False. """ # set _map as a flag for use inside self.__call__ self._map = True try: ret = self.__call__(*sequences) finally: del self._map return ret __all__ = ['remote', 'parallel', 'RemoteFunction', 'ParallelFunction']