diff --git a/IPython/zmq/parallel/client.py b/IPython/zmq/parallel/client.py index 2d40547..41acc15 100644 --- a/IPython/zmq/parallel/client.py +++ b/IPython/zmq/parallel/client.py @@ -29,6 +29,8 @@ from view import DirectView, LoadBalancedView from dependency import Dependency, depend, require import error import map as Map +from pendingresult import PendingResult,PendingMapResult +from remotefunction import remote,parallel,ParallelFunction,RemoteFunction #-------------------------------------------------------------------------- # helpers for implementing old MEC API via client.apply @@ -81,167 +83,6 @@ def defaultblock(f, self, *args, **kwargs): self.block = saveblock return ret -def remote(client, bound=False, block=None, targets=None): - """Turn a function into a remote function. - - This method can be used for map: - - >>> @remote(client,block=True) - def func(a) - """ - def remote_function(f): - return RemoteFunction(client, f, bound, block, targets) - return remote_function - -def parallel(client, dist='b', bound=False, block=None, targets='all'): - """Turn a function into a parallel remote function. - - This method can be used for map: - - >>> @parallel(client,block=True) - def func(a) - """ - def parallel_function(f): - return ParallelFunction(client, f, dist, bound, block, targets) - return parallel_function - -#-------------------------------------------------------------------------- -# Classes -#-------------------------------------------------------------------------- - -class RemoteFunction(object): - """Turn an existing function into a remote function. - - Parameters - ---------- - - client : Client instance - The client to be used to connect to engines - f : callable - The function to be wrapped into a remote function - bound : bool [default: False] - Whether the affect the remote namespace when called - block : bool [default: None] - Whether to wait for results or not.
The default behavior is - to use the current `block` attribute of `client` - targets : valid target list [default: all] - The targets on which to execute. - """ - - client = None # the remote connection - func = None # the wrapped function - block = None # whether to block - bound = None # whether to affect the namespace - targets = None # where to execute - - def __init__(self, client, f, bound=False, block=None, targets=None): - self.client = client - self.func = f - self.block=block - self.bound=bound - self.targets=targets - - def __call__(self, *args, **kwargs): - return self.client.apply(self.func, args=args, kwargs=kwargs, - block=self.block, targets=self.targets, bound=self.bound) - - -class ParallelFunction(RemoteFunction): - """Class for mapping a function to sequences.""" - def __init__(self, client, f, dist='b', bound=False, block=None, targets='all'): - super(ParallelFunction, self).__init__(client,f,bound,block,targets) - mapClass = Map.dists[dist] - self.mapObject = mapClass() - - def __call__(self, *sequences): - len_0 = len(sequences[0]) - for s in sequences: - if len(s)!=len_0: - raise ValueError('all sequences must have equal length') - - if self.targets is None: - # load-balanced: - engines = [None]*len_0 - else: - # multiplexed: - engines = self.client._build_targets(self.targets)[-1] - - nparts = len(engines) - msg_ids = [] - for index, engineid in enumerate(engines): - args = [] - for seq in sequences: - args.append(self.mapObject.getPartition(seq, index, nparts)) - mid = self.client.apply(self.func, args=args, block=False, - bound=self.bound, - targets=engineid) - msg_ids.append(mid) - - if self.block: - dg = PendingMapResult(self.client, msg_ids, self.mapObject) - dg.wait() - return dg.result - else: - return dg - - -class PendingResult(object): - """Class for representing results of non-blocking calls.""" - def __init__(self, client, msg_ids): - self.client = client - self.msg_ids = msg_ids - self._result = None - self.done = False - - def
__repr__(self): - if self.done: - return "<%s: finished>"%(self.__class__.__name__) - else: - return "<%s: %r>"%(self.__class__.__name__,self.msg_ids) - - @property - def result(self): - if self._result is not None: - return self._result - if not self.done: - self.wait(0) - if self.done: - results = map(self.client.results.get, self.msg_ids) - results = error.collect_exceptions(results, 'get_result') - self._result = self.reconstruct_result(results) - return self._result - else: - raise error.ResultNotCompleted - - def reconstruct_result(self, res): - """ - Override me in subclasses for turning a list of results - into the expected form. - """ - if len(res) == 1: - return res[0] - else: - return res - - def wait(self, timout=-1): - self.done = self.client.barrier(self.msg_ids) - return self.done - -class PendingMapResult(PendingResult): - """Class for representing results of non-blocking gathers. - - This will properly reconstruct the gather. - """ - - def __init__(self, client, msg_ids, mapObject): - self.mapObject = mapObject - PendingResult.__init__(self, client, msg_ids) - - def reconstruct_result(self, res): - """Perform the gather on the actual results.""" - return self.mapObject.joinPartitions(res) - - class AbortedTask(object): """A basic wrapper object describing an aborted task.""" @@ -944,10 +785,11 @@ class Client(object): result[target] = self.results[mid] return error.collect_exceptions(result, f.__name__) - @defaultblock - def map(self, f, sequences, targets=None, block=None, bound=False): - pf = ParallelFunction(self,f,block=block,bound=bound,targets=targets) - return pf(*sequences) + def map(self, f, *sequences): + """Parallel builtin `map` on all engines; returns the results if self.block, else a PendingMapResult.""" + pf = ParallelFunction(self, f, block=self.block, + bound=True, targets='all') + return pf.map(*sequences) #-------------------------------------------------------------------------- # Data movement diff --git a/IPython/zmq/parallel/pendingresult.py
b/IPython/zmq/parallel/pendingresult.py new file mode 100644 index 0000000..8f3dfd1 --- /dev/null +++ b/IPython/zmq/parallel/pendingresult.py @@ -0,0 +1,75 @@ +"""PendingResult objects for the client""" +#----------------------------------------------------------------------------- +# Copyright (C) 2010 The IPython Development Team +# +# Distributed under the terms of the BSD License. The full license is in +# the file COPYING, distributed as part of this software. +#----------------------------------------------------------------------------- + +#----------------------------------------------------------------------------- +# Imports +#----------------------------------------------------------------------------- + +import error + +#----------------------------------------------------------------------------- +# Classes +#----------------------------------------------------------------------------- + +class PendingResult(object): + """Class for representing results of non-blocking calls.""" + def __init__(self, client, msg_ids): + self.client = client + self.msg_ids = msg_ids + self._result = None + self.done = False + + def __repr__(self): + if self.done: + return "<%s: finished>"%(self.__class__.__name__) + else: + return "<%s: %r>"%(self.__class__.__name__,self.msg_ids) + + @property + def result(self): + if self._result is not None: + return self._result + if not self.done: + self.wait(0) + if self.done: + results = map(self.client.results.get, self.msg_ids) + results = error.collect_exceptions(results, 'get_result') + self._result = self.reconstruct_result(results) + return self._result + else: + raise error.ResultNotCompleted + + def reconstruct_result(self, res): + """ + Turn the list of collected results into the value that `result` returns. + The default unwraps a single result; subclasses override this.
+ """ + if len(res) == 1: + return res[0] + else: + return res + + def wait(self, timout=-1): # NOTE(review): timout is never passed to client.barrier, so result's wait(0) may not be a quick poll -- confirm + self.done = self.client.barrier(self.msg_ids) + return self.done + +class PendingMapResult(PendingResult): + """Pending result assembled from per-partition pieces (used by ParallelFunction). + + reconstruct_result rejoins those pieces via mapObject. + """ + + def __init__(self, client, msg_ids, mapObject): + self.mapObject = mapObject + PendingResult.__init__(self, client, msg_ids) + + def reconstruct_result(self, res): + """Join the per-partition results with mapObject.joinPartitions.""" + return self.mapObject.joinPartitions(res) + + diff --git a/IPython/zmq/parallel/remotefunction.py b/IPython/zmq/parallel/remotefunction.py new file mode 100644 index 0000000..d085b58 --- /dev/null +++ b/IPython/zmq/parallel/remotefunction.py @@ -0,0 +1,145 @@ +"""Remote Functions and decorators for the client.""" +#----------------------------------------------------------------------------- +# Copyright (C) 2010 The IPython Development Team +# +# Distributed under the terms of the BSD License. The full license is in +# the file COPYING, distributed as part of this software. +#----------------------------------------------------------------------------- + +#----------------------------------------------------------------------------- +# Imports +#----------------------------------------------------------------------------- + +import map as Map +from pendingresult import PendingMapResult + +#----------------------------------------------------------------------------- +# Decorators +#----------------------------------------------------------------------------- + +def remote(client, bound=False, block=None, targets=None): + """Turn a function into a remote function.
+ + Use it as a decorator; calling the result runs `f` via client.apply: + + >>> @remote(client,block=True) + ... def func(a): return a + """ + def remote_function(f): + return RemoteFunction(client, f, bound, block, targets) + return remote_function + +def parallel(client, dist='b', bound=False, block=None, targets='all'): + """Turn a function into a parallel remote function. + + Use it as a decorator; each call partitions its sequence arguments and sends one apply per partition: + + >>> @parallel(client,block=True) + ... def func(a): return a + """ + def parallel_function(f): + return ParallelFunction(client, f, dist, bound, block, targets) + return parallel_function + +#-------------------------------------------------------------------------- +# Classes +#-------------------------------------------------------------------------- + +class RemoteFunction(object): + """Turn an existing function into a remote function. + + Parameters + ---------- + + client : Client instance + The client to be used to connect to engines + f : callable + The function to be wrapped into a remote function + bound : bool [default: False] + Whether to affect the remote namespace when called + block : bool [default: None] + Whether to wait for results or not. The default behavior is + to use the current `block` attribute of `client` + targets : valid target list [default: None] + The targets on which to execute.
+ """ + + client = None # the remote connection + func = None # the wrapped function + block = None # whether to block + bound = None # whether to affect the namespace + targets = None # where to execute + + def __init__(self, client, f, bound=False, block=None, targets=None): + self.client = client + self.func = f + self.block=block + self.bound=bound + self.targets=targets + + def __call__(self, *args, **kwargs): + return self.client.apply(self.func, args=args, kwargs=kwargs, + block=self.block, targets=self.targets, bound=self.bound) + + +class ParallelFunction(RemoteFunction): + """Class for mapping a function to sequences.""" + def __init__(self, client, f, dist='b', bound=False, block=None, targets='all'): + super(ParallelFunction, self).__init__(client,f,bound,block,targets) + mapClass = Map.dists[dist] + self.mapObject = mapClass() + + def __call__(self, *sequences): + len_0 = len(sequences[0]) + for s in sequences: + if len(s)!=len_0: + raise ValueError('all sequences must have equal length') + + if self.targets is None: + # load-balanced: + engines = [None]*len_0 + elif isinstance(self.targets, int): # an int is a count of load-balanced partitions, not an engine id + engines = [None]*self.targets + else: + # multiplexed: + engines = self.client._build_targets(self.targets)[-1] + + nparts = len(engines) + msg_ids = [] + # one non-blocking apply per engine, on that engine's slice of every sequence + for index, engineid in enumerate(engines): + args = [] + for seq in sequences: + part = self.mapObject.getPartition(seq, index, nparts) + if not part: + continue + else: + args.append(part) + if not args: + continue + + if hasattr(self, '_map'): # set only while map() is running + f = map + args = [self.func]+args + else: + f=self.func + mid = self.client.apply(f, args=args, block=False, + bound=self.bound, + targets=engineid).msg_ids[0] + msg_ids.append(mid) + + r = PendingMapResult(self.client, msg_ids, self.mapObject) + if self.block: + r.wait() + return r.result + else: + return r + + def map(self, 
*sequences): + """Like builtin map: call the function on each element of the sequences remotely.""" +
self._map = True + try: + return self.__call__(*sequences) + finally: + del self._map # always clear the flag, even if __call__ raises + diff --git a/IPython/zmq/parallel/streamkernel.py b/IPython/zmq/parallel/streamkernel.py index 8a75254..1024b66 100755 --- a/IPython/zmq/parallel/streamkernel.py +++ b/IPython/zmq/parallel/streamkernel.py @@ -277,7 +277,12 @@ class Kernel(HasTraits): suffix = prefix = "_" # prevent keyword collisions with lambda f,args,kwargs = unpack_apply_message(bufs, working, copy=False) # if f.fun - fname = prefix+f.func_name.strip('<>')+suffix + if hasattr(f, 'func_name'): + fname = f.func_name + else: + fname = f.__name__ + + fname = prefix+fname.strip('<>')+suffix argname = prefix+"args"+suffix kwargname = prefix+"kwargs"+suffix resultname = prefix+"result"+suffix diff --git a/IPython/zmq/parallel/view.py b/IPython/zmq/parallel/view.py index 382991c..c2c1f72 100644 --- a/IPython/zmq/parallel/view.py +++ b/IPython/zmq/parallel/view.py @@ -11,6 +11,7 @@ #----------------------------------------------------------------------------- from IPython.external.decorator import decorator +from IPython.zmq.parallel.remotefunction import ParallelFunction #----------------------------------------------------------------------------- # Decorators @@ -28,8 +29,10 @@ def myblock(f, self, *args, **kwargs): @decorator def save_ids(f, self, *args, **kwargs): """Keep our history and outstanding attributes up to date after a method call.""" + n_previous = len(self.client.history) ret = f(self, *args, **kwargs) - msg_ids = self.client.history[-self._ntargets:] + # slice from n_previous: history[-0:] would return the whole history when nothing was sent + msg_ids = self.client.history[n_previous:] self.history.extend(msg_ids) map(self.outstanding.add, msg_ids) return ret @@ -172,6 +175,16 @@ class View(object): """ return self.client.apply(f, args, kwargs, block=True, targets=self.targets, bound=True) + @spin_after + @save_ids + def map(self, f, *sequences): + """Parallel version of 
builtin `map`, using this view's engines.""" + # wrap a lone int engine id: ParallelFunction treats an int as a partition count +
targets = [self.targets] if isinstance(self.targets, int) else self.targets + pf = ParallelFunction(self.client, f, block=self.block, + bound=True, targets=targets) + return pf.map(*sequences) + def abort(self, msg_ids=None, block=None): """Abort jobs on my engines.