From 7724ff799f6b5c5c016076855b586dcbbec86f8d 2011-04-08 00:38:21 From: MinRK Date: 2011-04-08 00:38:21 Subject: [PATCH] split get_results into get_result/result_status, add AsyncHubResult --- diff --git a/IPython/zmq/parallel/asyncresult.py b/IPython/zmq/parallel/asyncresult.py index d910004..0cfb16c 100644 --- a/IPython/zmq/parallel/asyncresult.py +++ b/IPython/zmq/parallel/asyncresult.py @@ -10,6 +10,8 @@ # Imports #----------------------------------------------------------------------------- +import time + from IPython.external.decorator import decorator import error @@ -189,6 +191,23 @@ class AsyncResult(object): raise AttributeError("%r object has no attribute %r"%( self.__class__.__name__, key)) return self.__getitem__(key) + + # asynchronous iterator: + def __iter__(self): + if self._single_result: + raise TypeError("AsyncResults with a single result are not iterable.") + try: + rlist = self.get(0) + except error.TimeoutError: + # wait for each result individually + for msg_id in self.msg_ids: + ar = AsyncResult(self._client, msg_id, self._fname) + yield ar.get() + else: + # already done + for r in rlist: + yield r + class AsyncMapResult(AsyncResult): @@ -227,6 +246,49 @@ class AsyncMapResult(AsyncResult): # already done for r in rlist: yield r + + +class AsyncHubResult(AsyncResult): + """Class to wrap pending results that must be requested from the Hub""" + def wait(self, timeout=-1): + """wait for result to complete.""" + start = time.time() + if self._ready: + return + local_ids = filter(lambda msg_id: msg_id in self._client.outstanding, self.msg_ids) + local_ready = self._client.barrier(local_ids, timeout) + if local_ready: + remote_ids = filter(lambda msg_id: msg_id not in self._client.results, self.msg_ids) + if not remote_ids: + self._ready = True + else: + rdict = self._client.result_status(remote_ids, status_only=False) + pending = rdict['pending'] + while pending and time.time() < start+timeout: + rdict = self._client.result_status(remote_ids, status_only=False) + pending = rdict['pending'] + if pending: + time.sleep(0.1) + if not pending: + self._ready = True + if self._ready: + try: + results = map(self._client.results.get, self.msg_ids) + self._result = results + if self._single_result: + r = results[0] + if isinstance(r, Exception): + raise r + else: + results = error.collect_exceptions(results, self._fname) + self._result = self._reconstruct_result(results) + except Exception, e: + self._exception = e + self._success = False + else: + self._success = True + finally: + self._metadata = map(self._client.metadata.get, self.msg_ids) -__all__ = ['AsyncResult', 'AsyncMapResult'] \ No newline at end of file +__all__ = ['AsyncResult', 'AsyncMapResult', 'AsyncHubResult'] \ No newline at end of file diff --git a/IPython/zmq/parallel/client.py b/IPython/zmq/parallel/client.py index 81b57a0..78734b9 100644 --- a/IPython/zmq/parallel/client.py +++ b/IPython/zmq/parallel/client.py @@ -32,7 +32,7 @@ from IPython.external.ssh import tunnel import error import map as Map import streamsession as ss -from asyncresult import AsyncResult, AsyncMapResult +from asyncresult import AsyncResult, AsyncMapResult, AsyncHubResult from clusterdir import ClusterDir, ClusterDirError from dependency import Dependency, depend, require, dependent from remotefunction import remote,parallel,ParallelFunction,RemoteFunction @@ -485,6 +485,15 @@ class Client(HasTraits): # handlers and callbacks for incoming messages #-------------------------------------------------------------------------- + def 
_unwrap_exception(self, content): + """unwrap exception, and remap engineid to int.""" + e = ss.unwrap_exception(content) + if e.engine_info: + e_uuid = e.engine_info['engineid'] + eid = self._engines[e_uuid] + e.engine_info['engineid'] = eid + return e + def _register_engine(self, msg): """Register a new engine, and update our connection info.""" content = msg['content'] @@ -537,7 +546,7 @@ class Client(HasTraits): print ("got unknown result: %s"%msg_id) else: self.outstanding.remove(msg_id) - self.results[msg_id] = ss.unwrap_exception(msg['content']) + self.results[msg_id] = self._unwrap_exception(msg['content']) def _handle_apply_reply(self, msg): """Save the reply to an apply_request into our results.""" @@ -569,12 +578,7 @@ class Client(HasTraits): # TODO: handle resubmission pass else: - e = ss.unwrap_exception(content) - if e.engine_info: - e_uuid = e.engine_info['engineid'] - eid = self._engines[e_uuid] - e.engine_info['engineid'] = eid - self.results[msg_id] = e + self.results[msg_id] = self._unwrap_exception(content) def _flush_notifications(self): """Flush notifications of engine registrations waiting @@ -641,7 +645,7 @@ class Client(HasTraits): s = md[name] or '' md[name] = s + content['data'] elif msg_type == 'pyerr': - md.update({'pyerr' : ss.unwrap_exception(content)}) + md.update({'pyerr' : self._unwrap_exception(content)}) else: md.update({msg_type : content['data']}) @@ -685,13 +689,13 @@ class Client(HasTraits): if self._iopub_socket: self._flush_iopub(self._iopub_socket) - def barrier(self, msg_ids=None, timeout=-1): - """waits on one or more `msg_ids`, for up to `timeout` seconds. + def barrier(self, jobs=None, timeout=-1): + """waits on one or more `jobs`, for up to `timeout` seconds. Parameters ---------- - msg_ids : int, str, or list of ints and/or strs, or one or more AsyncResult objects + jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects ints are indices to self.history strs are msg_ids default: wait on all outstanding messages @@ -706,19 +710,20 @@ class Client(HasTraits): False : timeout reached, some msg_ids still outstanding """ tic = time.time() - if msg_ids is None: + if jobs is None: theids = self.outstanding else: - if isinstance(msg_ids, (int, str, AsyncResult)): - msg_ids = [msg_ids] + if isinstance(jobs, (int, str, AsyncResult)): + jobs = [jobs] theids = set() - for msg_id in msg_ids: - if isinstance(msg_id, int): - msg_id = self.history[msg_id] - elif isinstance(msg_id, AsyncResult): - map(theids.add, msg_id.msg_ids) + for job in jobs: + if isinstance(job, int): + # index access + job = self.history[job] + elif isinstance(job, AsyncResult): + map(theids.add, job.msg_ids) continue - theids.add(msg_id) + theids.add(job) if not theids.intersection(self.outstanding): return True self.spin() @@ -747,18 +752,39 @@ class Client(HasTraits): if self.debug: pprint(msg) if msg['content']['status'] != 'ok': - error = ss.unwrap_exception(msg['content']) + error = self._unwrap_exception(msg['content']) if error: return error @spinfirst @defaultblock - def abort(self, msg_ids = None, targets=None, block=None): - """Abort the execution queues of target(s).""" + def abort(self, jobs=None, targets=None, block=None): + """Abort specific jobs from the execution queues of target(s). + + This is a mechanism to prevent jobs that have already been submitted + from executing. 
+ + Parameters + ---------- + + jobs : msg_id, list of msg_ids, or AsyncResult + The jobs to be aborted + + + """ targets = self._build_targets(targets)[0] - if isinstance(msg_ids, basestring): - msg_ids = [msg_ids] + msg_ids = [] + if isinstance(jobs, (basestring,AsyncResult)): + jobs = [jobs] + bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs) + if bad_ids: + raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0]) + for j in jobs: + if isinstance(j, AsyncResult): + msg_ids.extend(j.msg_ids) + else: + msg_ids.append(j) content = dict(msg_ids=msg_ids) for t in targets: self.session.send(self._control_socket, 'abort_request', @@ -770,7 +796,7 @@ class Client(HasTraits): if self.debug: pprint(msg) if msg['content']['status'] != 'ok': - error = ss.unwrap_exception(msg['content']) + error = self._unwrap_exception(msg['content']) if error: return error @@ -791,7 +817,7 @@ class Client(HasTraits): if self.debug: pprint(msg) if msg['content']['status'] != 'ok': - error = ss.unwrap_exception(msg['content']) + error = self._unwrap_exception(msg['content']) if controller: time.sleep(0.25) @@ -800,7 +826,7 @@ class Client(HasTraits): if self.debug: pprint(msg) if msg['content']['status'] != 'ok': - error = ss.unwrap_exception(msg['content']) + error = self._unwrap_exception(msg['content']) if error: raise error @@ -827,8 +853,9 @@ class Client(HasTraits): whether or not to wait until done to return default: self.block """ - result = self.apply(_execute, (code,), targets=targets, block=self.block, bound=True, balanced=False) - return result + result = self.apply(_execute, (code,), targets=targets, block=block, bound=True, balanced=False) + if not block: + return result def run(self, filename, targets='all', block=None): """Execute contents of `filename` on engine(s). @@ -1134,6 +1161,8 @@ class Client(HasTraits): targets = slice(None) if isinstance(targets, int): + if targets < 0: + targets = self.ids[targets] if targets not in self.ids: raise IndexError("No such engine: %i"%targets) return self._cache_view(targets, balanced) @@ -1159,7 +1188,8 @@ class Client(HasTraits): if not isinstance(ns, dict): raise TypeError("Must be a dict, not %s"%type(ns)) result = self.apply(_push, (ns,), targets=targets, block=block, bound=True, balanced=False) - return result + if not block: + return result @defaultblock def pull(self, keys, targets='all', block=None): @@ -1191,7 +1221,7 @@ class Client(HasTraits): msg_ids.extend(r.msg_ids) r = AsyncResult(self, msg_ids, fname='scatter') if block: - return r.get() + r.get() else: return r @@ -1218,33 +1248,104 @@ class Client(HasTraits): #-------------------------------------------------------------------------- @spinfirst - def get_results(self, msg_ids, status_only=False): - """Returns the result of the execute or task request with `msg_ids`. + @defaultblock + def get_result(self, indices_or_msg_ids=None, block=None): + """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object. + + If the client already has the results, no request to the Hub will be made. + + This is a convenient way to construct AsyncResult objects, which are wrappers + that include metadata about execution, and allow for awaiting results that + were not submitted by this Client. 
+ + It can also be a convenient way to retrieve the metadata associated with + blocking execution, since it always retrieves + + Examples + -------- + :: + + In [10]: r = client.apply() Parameters ---------- - msg_ids : list of ints or msg_ids + indices_or_msg_ids : integer history index, str msg_id, or list of either + The indices or msg_ids of indices to be retrieved + + block : bool + Whether to wait for the result to be done + + Returns + ------- + + AsyncResult + A single AsyncResult object will always be returned. + + AsyncHubResult + A subclass of AsyncResult that retrieves results from the Hub + + """ + if indices_or_msg_ids is None: + indices_or_msg_ids = -1 + + if not isinstance(indices_or_msg_ids, (list,tuple)): + indices_or_msg_ids = [indices_or_msg_ids] + + theids = [] + for id in indices_or_msg_ids: + if isinstance(id, int): + id = self.history[id] + if not isinstance(id, str): + raise TypeError("indices must be str or int, not %r"%id) + theids.append(id) + + local_ids = filter(lambda msg_id: msg_id in self.history or msg_id in self.results, theids) + remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids) + + if remote_ids: + ar = AsyncHubResult(self, msg_ids=theids) + else: + ar = AsyncResult(self, msg_ids=theids) + + if block: + ar.wait() + + return ar + + @spinfirst + def result_status(self, msg_ids, status_only=True): + """Check on the status of the result(s) of the apply request with `msg_ids`. + + If status_only is False, then the actual results will be retrieved, else + only the status of the results will be checked. + + Parameters + ---------- + + msg_ids : list of msg_ids if int: Passed as index to self.history for convenience. - status_only : bool (default: False) + status_only : bool (default: True) if False: - return the actual results + Retrieve the actual results of completed tasks. Returns ------- results : dict There will always be the keys 'pending' and 'completed', which will - be lists of msg_ids. + be lists of msg_ids that are incomplete or complete. If `status_only` + is False, then completed results will be keyed by their `msg_id`. 
""" - if not isinstance(msg_ids, (list,tuple)): - msg_ids = [msg_ids] + if not isinstance(indices_or_msg_ids, (list,tuple)): + indices_or_msg_ids = [indices_or_msg_ids] + theids = [] - for msg_id in msg_ids: + for msg_id in indices_or_msg_ids: if isinstance(msg_id, int): msg_id = self.history[msg_id] - if not isinstance(msg_id, str): + if not isinstance(msg_id, basestring): raise TypeError("msg_ids must be str, not %r"%msg_id) theids.append(msg_id) @@ -1252,7 +1353,7 @@ class Client(HasTraits): local_results = {} # comment this block out to temporarily disable local shortcut: - for msg_id in list(theids): + for msg_id in theids: if msg_id in self.results: completed.append(msg_id) local_results[msg_id] = self.results[msg_id] @@ -1267,7 +1368,7 @@ class Client(HasTraits): pprint(msg) content = msg['content'] if content['status'] != 'ok': - raise ss.unwrap_exception(content) + raise self._unwrap_exception(content) buffers = msg['buffers'] else: content = dict(completed=[],pending=[]) @@ -1298,13 +1399,17 @@ class Client(HasTraits): if rcontent['status'] == 'ok': res,buffers = ss.unserialize_object(buffers) else: - res = ss.unwrap_exception(rcontent) + print rcontent + res = self._unwrap_exception(rcontent) failures.append(res) self.results[msg_id] = res content[msg_id] = res - error.collect_exceptions(failures, "get_results") + if len(theids) == 1 and failures: + raise failures[0] + + error.collect_exceptions(failures, "result_status") return content @spinfirst @@ -1329,11 +1434,11 @@ class Client(HasTraits): content = msg['content'] status = content.pop('status') if status != 'ok': - raise ss.unwrap_exception(content) + raise self._unwrap_exception(content) return ss.rekey(content) @spinfirst - def purge_results(self, msg_ids=[], targets=[]): + def purge_results(self, jobs=[], targets=[]): """Tell the controller to forget results. Individual results can be purged by msg_id, or the entire @@ -1342,7 +1447,7 @@ class Client(HasTraits): Parameters ---------- - msg_ids : str or list of strs + jobs : str or list of strs or AsyncResult objects the msg_ids whose results should be forgotten. targets : int/str/list of ints/strs The targets, by uuid or int_id, whose entire history is to be purged. 
@@ -1350,10 +1455,24 @@ class Client(HasTraits): default : None """ - if not targets and not msg_ids: - raise ValueError + if not targets and not jobs: + raise ValueError("Must specify at least one of `targets` and `jobs`") if targets: targets = self._build_targets(targets)[1] + + # construct msg_ids from jobs + msg_ids = [] + if isinstance(jobs, (basestring,AsyncResult)): + jobs = [jobs] + bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs) + if bad_ids: + raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0]) + for j in jobs: + if isinstance(j, AsyncResult): + msg_ids.extend(j.msg_ids) + else: + msg_ids.append(j) + content = dict(targets=targets, msg_ids=msg_ids) self.session.send(self._query_socket, "purge_request", content=content) idents, msg = self.session.recv(self._query_socket, 0) @@ -1361,7 +1480,7 @@ class Client(HasTraits): pprint(msg) content = msg['content'] if content['status'] != 'ok': - raise ss.unwrap_exception(content) + raise self._unwrap_exception(content) __all__ = [ 'Client', diff --git a/IPython/zmq/parallel/remotefunction.py b/IPython/zmq/parallel/remotefunction.py index fa7283c..8169bcb 100644 --- a/IPython/zmq/parallel/remotefunction.py +++ b/IPython/zmq/parallel/remotefunction.py @@ -128,7 +128,7 @@ class ParallelFunction(RemoteFunction): args = [] for seq in sequences: part = self.mapObject.getPartition(seq, index, nparts) - if not part: + if len(part) == 0: continue else: args.append(part) diff --git a/IPython/zmq/parallel/view.py b/IPython/zmq/parallel/view.py index 1fc3f3f..9be98d2 100644 --- a/IPython/zmq/parallel/view.py +++ b/IPython/zmq/parallel/view.py @@ -15,7 +15,7 @@ from IPython.utils.traitlets import HasTraits, Bool, List, Dict, Set, Int, Insta from IPython.external.decorator import decorator from IPython.zmq.parallel.asyncresult import AsyncResult from IPython.zmq.parallel.dependency import Dependency -from IPython.zmq.parallel.remotefunction import ParallelFunction, parallel +from IPython.zmq.parallel.remotefunction import ParallelFunction, parallel, remote #----------------------------------------------------------------------------- # Decorators @@ -91,6 +91,8 @@ class View(HasTraits): for name in self._default_names: setattr(self, name, getattr(self, name, None)) + assert not self.__class__ is View, "Don't use base View objects, use subclasses" + def __repr__(self): strtargets = str(self._targets) @@ -106,9 +108,17 @@ class View(HasTraits): def targets(self, value): raise AttributeError("Cannot set View `targets` after construction!") + @property + def balanced(self): + return self._balanced + + @balanced.setter + def balanced(self, value): + raise AttributeError("Cannot set View `balanced` after construction!") + def _defaults(self, *excludes): """return dict of our default attributes, excluding names given.""" - d = dict(balanced=self._balanced, targets=self.targets) + d = dict(balanced=self._balanced, targets=self._targets) for name in self._default_names: if name not in excludes: d[name] = getattr(self, name) @@ -182,22 +192,22 @@ class View(HasTraits): d = self._defaults('block', 'bound') return self.client.apply(f,args,kwargs, block=True, bound=False, **d) - @sync_results - @save_ids - def apply_bound(self, f, *args, **kwargs): - """calls f(*args, **kwargs) bound to engine namespace(s). 
- - if self.block is False: - returns msg_id - else: - returns actual result of f(*args, **kwargs) - - This method has access to the targets' globals - - """ - d = self._defaults('bound') - return self.client.apply(f, args, kwargs, bound=True, **d) - + # @sync_results + # @save_ids + # def apply_bound(self, f, *args, **kwargs): + # """calls f(*args, **kwargs) bound to engine namespace(s). + # + # if self.block is False: + # returns msg_id + # else: + # returns actual result of f(*args, **kwargs) + # + # This method has access to the targets' namespace via globals() + # + # """ + # d = self._defaults('bound') + # return self.client.apply(f, args, kwargs, bound=True, **d) + # @sync_results @save_ids def apply_async_bound(self, f, *args, **kwargs): @@ -206,7 +216,7 @@ class View(HasTraits): returns: msg_id - This method has access to the targets' globals + This method has access to the targets' namespace via globals() """ d = self._defaults('block', 'bound') @@ -219,35 +229,54 @@ class View(HasTraits): returns: actual result of f(*args, **kwargs) - This method has access to the targets' globals + This method has access to the targets' namespace via globals() """ d = self._defaults('block', 'bound') return self.client.apply(f, args, kwargs, block=True, bound=True, **d) - def abort(self, msg_ids=None, block=None): + def abort(self, jobs=None, block=None): """Abort jobs on my engines. Parameters ---------- - msg_ids : None, str, list of strs, optional + jobs : None, str, list of strs, optional if None: abort all jobs. else: abort specific msg_id(s). """ block = block if block is not None else self.block - return self.client.abort(msg_ids=msg_ids, targets=self.targets, block=block) + return self.client.abort(jobs=jobs, targets=self._targets, block=block) def queue_status(self, verbose=False): """Fetch the Queue status of my engines""" - return self.client.queue_status(targets=self.targets, verbose=verbose) + return self.client.queue_status(targets=self._targets, verbose=verbose) - def purge_results(self, msg_ids=[], targets=[]): + def purge_results(self, jobs=[], targets=[]): """Instruct the controller to forget specific results.""" if targets is None or targets == 'all': - targets = self.targets - return self.client.purge_results(msg_ids=msg_ids, targets=targets) + targets = self._targets + return self.client.purge_results(jobs=jobs, targets=targets) + + @spin_after + def get_result(self, indices_or_msg_ids=None): + """return one or more results, specified by history index or msg_id. + See client.get_result for details. + + """ + + if indices_or_msg_ids is None: + indices_or_msg_ids = -1 + if isinstance(indices_or_msg_ids, int): + indices_or_msg_ids = self.history[indices_or_msg_ids] + elif isinstance(indices_or_msg_ids, (list,tuple,set)): + indices_or_msg_ids = list(indices_or_msg_ids) + for i,index in enumerate(indices_or_msg_ids): + if isinstance(index, int): + indices_or_msg_ids[i] = self.history[index] + return self.client.get_result(indices_or_msg_ids) + #------------------------------------------------------------------- # Map #------------------------------------------------------------------- @@ -261,7 +290,7 @@ class View(HasTraits): This is equivalent to map(...block=False) - See `map` for details. + See `self.map` for details. """ if 'block' in kwargs: raise TypeError("map_async doesn't take a `block` keyword argument.") @@ -273,25 +302,33 @@ class View(HasTraits): This is equivalent to map(...block=True) - See `map` for details. + See `self.map` for details. 
""" if 'block' in kwargs: raise TypeError("map_sync doesn't take a `block` keyword argument.") kwargs['block'] = True return self.map(f,*sequences,**kwargs) + def imap(self, f, *sequences, **kwargs): + """Parallel version of `itertools.imap`. + + See `self.map` for details. + """ + + return iter(self.map_async(f,*sequences, **kwargs)) + #------------------------------------------------------------------- # Decorators #------------------------------------------------------------------- def remote(self, bound=True, block=True): """Decorator for making a RemoteFunction""" - return remote(self.client, bound=bound, targets=self.targets, block=block, balanced=self._balanced) + return remote(self.client, bound=bound, targets=self._targets, block=block, balanced=self._balanced) def parallel(self, dist='b', bound=True, block=None): """Decorator for making a ParallelFunction""" block = self.block if block is None else block - return parallel(self.client, bound=bound, targets=self.targets, block=block, balanced=self._balanced) + return parallel(self.client, bound=bound, targets=self._targets, block=block, balanced=self._balanced) class DirectView(View): @@ -320,7 +357,9 @@ class DirectView(View): @spin_after @save_ids def map(self, f, *sequences, **kwargs): - """Parallel version of builtin `map`, using this View's `targets`. + """view.map(f, *sequences, block=self.block, bound=self.bound) => list|AsyncMapResult + + Parallel version of builtin `map`, using this View's `targets`. There will be one task per target, so work will be chunked if the sequences are longer than `targets`. @@ -337,7 +376,7 @@ class DirectView(View): block : bool whether to wait for the result or not [default self.block] bound : bool - whether to wait for the result or not [default self.bound] + whether to have access to the engines' namespaces [default self.bound] Returns ------- @@ -347,7 +386,8 @@ class DirectView(View): An object like AsyncResult, but which reassembles the sequence of results into a single list. AsyncMapResults can be iterated through before all results are complete. - else: + else: + list the result of map(f,*sequences) """ @@ -359,18 +399,18 @@ class DirectView(View): assert len(sequences) > 0, "must have some sequences to map onto!" pf = ParallelFunction(self.client, f, block=block, bound=bound, - targets=self.targets, balanced=False) + targets=self._targets, balanced=False) return pf.map(*sequences) @sync_results @save_ids def execute(self, code, block=True): """execute some code on my targets.""" - return self.client.execute(code, block=block, targets=self.targets) + return self.client.execute(code, block=block, targets=self._targets) def update(self, ns): """update remote namespace with dict `ns`""" - return self.client.push(ns, targets=self.targets, block=self.block) + return self.client.push(ns, targets=self._targets, block=self.block) push = update @@ -379,7 +419,7 @@ class DirectView(View): will return one object if it is a key. It also takes a list of keys, and will return a list of objects.""" # block = block if block is not None else self.block - return self.client.pull(key_s, block=True, targets=self.targets) + return self.client.pull(key_s, block=True, targets=self._targets) @sync_results @save_ids @@ -388,14 +428,14 @@ class DirectView(View): will return one object if it is a key. 
It also takes a list of keys, and will return a list of objects.""" block = block if block is not None else self.block - return self.client.pull(key_s, block=block, targets=self.targets) + return self.client.pull(key_s, block=block, targets=self._targets) def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None): """ Partition a Python sequence and send the partitions to a set of engines. """ block = block if block is not None else self.block - targets = targets if targets is not None else self.targets + targets = targets if targets is not None else self._targets return self.client.scatter(key, seq, dist=dist, flatten=flatten, targets=targets, block=block) @@ -407,7 +447,7 @@ class DirectView(View): Gather a partitioned sequence on a set of engines as a single local seq. """ block = block if block is not None else self.block - targets = targets if targets is not None else self.targets + targets = targets if targets is not None else self._targets return self.client.gather(key, dist=dist, targets=targets, block=block) @@ -420,12 +460,12 @@ class DirectView(View): def clear(self, block=False): """Clear the remote namespaces on my engines.""" block = block if block is not None else self.block - return self.client.clear(targets=self.targets, block=block) + return self.client.clear(targets=self._targets, block=block) def kill(self, block=True): """Kill my engines.""" block = block if block is not None else self.block - return self.client.kill(targets=self.targets, block=block) + return self.client.kill(targets=self._targets, block=block) #---------------------------------------- # activate for %px,%autopx magics @@ -504,9 +544,9 @@ class LoadBalancedView(View): def set_flags(self, **kwargs): """set my attribute flags by keyword. - A View is a wrapper for the Client's apply method, but - with attributes that specify keyword arguments, those attributes - can be set by keyword argument with this method. + A View is a wrapper for the Client's apply method, but with attributes + that specify keyword arguments, those attributes can be set by keyword + argument with this method. Parameters ---------- @@ -543,10 +583,15 @@ class LoadBalancedView(View): @spin_after @save_ids def map(self, f, *sequences, **kwargs): - """Parallel version of builtin `map`, load-balanced by this View. + """view.map(f, *sequences, block=self.block, bound=self.bound, chunk_size=1) => list|AsyncMapResult + + Parallel version of builtin `map`, load-balanced by this View. - Each element will be a separate task, and will be load-balanced. This - lets individual elements be available for iteration as soon as they arrive. + `block`, `bound`, and `chunk_size` can be specified by keyword only. + + Each `chunk_size` elements will be a separate task, and will be + load-balanced. This lets individual elements be available for iteration + as soon as they arrive. Parameters ---------- @@ -558,7 +603,9 @@ class LoadBalancedView(View): block : bool whether to wait for the result or not [default self.block] bound : bool - whether to use the engine's namespace + whether to use the engine's namespace [default self.bound] + chunk_size : int + how many elements should be in each task [default 1] Returns ------- @@ -586,7 +633,7 @@ class LoadBalancedView(View): assert len(sequences) > 0, "must have some sequences to map onto!" 
pf = ParallelFunction(self.client, f, block=block, bound=bound, - targets=self.targets, balanced=True, + targets=self._targets, balanced=True, chunk_size=chunk_size) return pf.map(*sequences) diff --git a/docs/source/parallelz/parallel_multiengine.txt b/docs/source/parallelz/parallel_multiengine.txt index 02b979b..9b91f0c 100644 --- a/docs/source/parallelz/parallel_multiengine.txt +++ b/docs/source/parallelz/parallel_multiengine.txt @@ -59,13 +59,24 @@ of engine ids: Here we see that there are four engines ready to do work for us. +For direct execution, we will make use of a :class:`DirectView` object, which can be +constructed via list-access to the client: + +.. sourcecode:: + + In [4]: dview = rc[:] # use all engines + +.. seealso:: + + For more information, see the in-depth explanation of :ref:`Views `. + + Quick and easy parallelism ========================== In many cases, you simply want to apply a Python function to a sequence of objects, but *in parallel*. The client interface provides a simple way -of accomplishing this: using the builtin :func:`map` and the ``@remote`` -function decorator, or the client's :meth:`map` method. +of accomplishing this: using the DirectView's :meth:`~DirectView.map` method. Parallel map ------------ @@ -79,44 +90,67 @@ DirectView's :meth:`map` method: .. sourcecode:: ipython In [62]: serial_result = map(lambda x:x**10, range(32)) + + In [63]: dview.block = True + + In [66]: parallel_result = dview.map(lambda x: x**10, range(32)) - In [66]: parallel_result = rc[:].map(lambda x: x**10, range(32)) - - In [67]: serial_result==parallel_result.get() + In [67]: serial_result==parallel_result Out[67]: True .. note:: The :class:`DirectView`'s version of :meth:`map` does - not do any load balancing. For a load balanced version, use a + not do dynamic load balancing. For a load balanced version, use a :class:`LoadBalancedView`, or a :class:`ParallelFunction` with `balanced=True`. .. seealso:: - :meth:`map` is implemented via :class:`.ParallelFunction`. + :meth:`map` is implemented via :class:`ParallelFunction`. -Remote function decorator -------------------------- +Remote function decorators +-------------------------- Remote functions are just like normal functions, but when they are called, they execute on one or more engines, rather than locally. IPython provides -some decorators: +two decorators: .. sourcecode:: ipython - In [10]: @rc.remote(block=True, targets=0) - ....: def f(x): - ....: return 10.0*x**4 - ....: + In [10]: @rc.remote(block=True, targets='all') + ...: def getpid(): + ...: import os + ...: return os.getpid() + ...: + + In [11]: getpid() + Out[11]: [12345, 12346, 12347, 12348] - In [11]: map(f, range(32)) # this is done on engine 0 - Out[11]: [0.0,10.0,160.0,...] +A ``@parallel`` decorator creates parallel functions, that break up an element-wise +operations and distribute them, reconstructing the result. + +.. sourcecode:: ipython + + In [12]: import numpy as np + + In [13]: A = np.random.random((64,48)) + + In [14]: @rc.parallel(block=True, targets='all') + ...: def pmul(A,B): + ...: return A*B + + In [15]: C_local = A*A + + In [16]: C_remote_partial = pmul(A,A) + + In [17]: (C_local == C_remote).all() + Out[17]: True .. seealso:: - See the docstring for the :func:`parallel` and :func:`remote` decorators for + See the docstrings for the :func:`parallel` and :func:`remote` decorators for options. Calling Python functions @@ -152,7 +186,7 @@ the extra arguments. 
For instance, performing index-access on a client creates a Out[4]: In [5]: view.apply - view.apply view.apply_async view.apply_async_bound view.apply_bound view.apply_sync view.apply_sync_bound + view.apply view.apply_async view.apply_async_bound view.apply_sync view.apply_sync_bound A :class:`DirectView` always uses its `targets` attribute, and it will use its `bound` and `block` attributes in its :meth:`apply` method, but the suffixed :meth:`apply_x` @@ -180,8 +214,8 @@ blocks until the engines are done executing the command: .. sourcecode:: ipython - In [2]: rc.block=True - In [3]: dview = rc[:] # A DirectView of all engines + In [2]: dview = rc[:] # A DirectView of all engines + In [3]: dview.block=True In [4]: dview['a'] = 5 In [5]: dview['b'] = 10 @@ -189,13 +223,13 @@ blocks until the engines are done executing the command: In [6]: dview.apply_bound(lambda x: a+b+x, 27) Out[6]: [42, 42, 42, 42] -Python commands can be executed on specific engines by calling execute using -the ``targets`` keyword argument, or creating a :class:`DirectView` instance -by index-access to the client: +Python commands can be executed on specific engines by calling execute using the ``targets`` +keyword argument in :meth:`client.execute`, or creating a :class:`DirectView` instance by +index-access to the client: .. sourcecode:: ipython - In [6]: rc[::2].execute('c=a+b') # shorthand for rc.execute('c=a+b',targets=[0,2]) + In [6]: rc.execute('c=a+b', targets=[0,2]) In [7]: rc[1::2].execute('c=a-b') # shorthand for rc.execute('c=a-b',targets=[1,3]) @@ -214,10 +248,12 @@ by index-access to the client: :meth:`View.apply(f,*args,**kwargs)`, which simply calls ``f(*args,**kwargs)`` remotely. -This example also shows one of the most important things about the IPython +Bound and unbound execution +--------------------------- + +The previous example also shows one of the most important things about the IPython engines: they have a persistent user namespaces. The :meth:`apply` method can -be run in either a bound or unbound way. The default for a View is to be -unbound, unless called by the :meth:`apply_bound` method: +be run in either a bound or unbound manner: .. sourcecode:: ipython @@ -225,10 +261,10 @@ unbound, unless called by the :meth:`apply_bound` method: In [10]: v0 = rc[0] - In [12]: v0.apply_bound(lambda : b) + In [12]: v0.apply_sync_bound(lambda : b) Out[12]: 5 - In [13]: v0.apply(lambda : b) + In [13]: v0.apply_sync(lambda : b) --------------------------------------------------------------------------- RemoteError Traceback (most recent call last) /home/you/ in () @@ -244,10 +280,10 @@ unbound, unless called by the :meth:`apply_bound` method: Specifically, `bound=True` specifies that the engine's namespace is to be used -for execution, and `bound=False` specifies that the engine's namespace is not -to be used (hence, 'b' is undefined during unbound execution, since the -function is called in an empty namespace). Unbound execution is often useful -for large numbers of atomic tasks, which prevents bloating the engine's +as the `globals` when the function is called, and `bound=False` specifies that +the engine's namespace is not to be used (hence, 'b' is undefined during unbound +execution, since the function is called in an empty namespace). Unbound execution is +often useful for large numbers of atomic tasks, which prevents bloating the engine's memory, while bound execution lets you build on your previous work. 
@@ -257,7 +293,7 @@ Non-blocking execution In non-blocking mode, :meth:`apply` submits the command to be executed and then returns a :class:`AsyncResult` object immediately. The :class:`AsyncResult` object gives you a way of getting a result at a later -time through its :meth:`get` method. +time through its :meth:`get` method. .. Note:: @@ -280,25 +316,25 @@ local Python/IPython session: ...: return time.time()-tic # In non-blocking mode - In [7]: pr = dview.apply_async(wait, 2) + In [7]: ar = dview.apply_async(wait, 2) # Now block for the result - In [8]: pr.get() + In [8]: ar.get() Out[8]: [2.0006198883056641, 1.9997570514678955, 1.9996809959411621, 2.0003249645233154] # Again in non-blocking mode - In [9]: pr = dview.apply_async(wait, 10) + In [9]: ar = dview.apply_async(wait, 10) # Poll to see if the result is ready - In [10]: pr.ready() + In [10]: ar.ready() Out[10]: False # ask for the result, but wait a maximum of 1 second: - In [45]: pr.get(1) + In [45]: ar.get(1) --------------------------------------------------------------------------- TimeoutError Traceback (most recent call last) /home/you/ in () - ----> 1 pr.get(1) + ----> 1 ar.get(1) /path/to/site-packages/IPython/zmq/parallel/asyncresult.pyc in get(self, timeout) 62 raise self._exception @@ -316,8 +352,8 @@ local Python/IPython session: Often, it is desirable to wait until a set of :class:`AsyncResult` objects are done. For this, there is a the method :meth:`barrier`. This method takes a -tuple of :class:`AsyncResult` objects (or `msg_ids`) and blocks until all of the -associated results are ready: +tuple of :class:`AsyncResult` objects (or `msg_ids` or indices to the client's History), +and blocks until all of the associated results are ready: .. sourcecode:: ipython @@ -338,15 +374,17 @@ associated results are ready: The ``block`` keyword argument and attributes --------------------------------------------- -Most methods(like :meth:`apply`) accept +Most client methods(like :meth:`apply`) accept ``block`` as a keyword argument. As we have seen above, these -keyword arguments control the blocking mode . The :class:`Client` class also has +keyword arguments control the blocking mode. The :class:`Client` class also has a :attr:`block` attribute that controls the default behavior when the keyword argument is not provided. Thus the following logic is used for :attr:`block`: * If no keyword argument is provided, the instance attributes are used. * Keyword argument, if provided override the instance attributes for the duration of a single call. + +DirectView objects also have a ``bound`` attribute, which is used in the same way. The following examples demonstrate how to use the instance attributes: @@ -365,7 +403,7 @@ The following examples demonstrate how to use the instance attributes: In [22]: rc.apply(lambda : 42, targets='all') Out[22]: [42, 42, 42, 42] -The :attr:`block` and :attr:`targets` instance attributes of the +The :attr:`block`, :attr:`bound`, and :attr:`targets` instance attributes of the :class:`.DirectView` also determine the behavior of the parallel magic commands. @@ -381,9 +419,9 @@ Parallel magic commands We provide a few IPython magic commands (``%px``, ``%autopx`` and ``%result``) that make it more pleasant to execute Python commands on the engines interactively. These are simply shortcuts to :meth:`execute` and -:meth:`get_result`. 
The ``%px`` magic executes a single Python command on the -engines specified by the :attr:`targets` attribute of the -:class:`MultiEngineClient` instance (by default this is ``'all'``): +:meth:`get_result` of the :class:`DirectView`. The ``%px`` magic executes a single +Python command on the engines specified by the :attr:`targets` attribute of the +:class:`DirectView` instance: .. sourcecode:: ipython @@ -399,7 +437,6 @@ engines specified by the :attr:`targets` attribute of the In [26]: %px import numpy Parallel execution on engines: [0, 1, 2, 3] - Out[26]:[None,None,None,None] In [27]: %px a = numpy.random.rand(2,2) Parallel execution on engines: [0, 1, 2, 3] @@ -408,36 +445,25 @@ engines specified by the :attr:`targets` attribute of the Parallel execution on engines: [0, 1, 2, 3] In [28]: dv['ev'] - Out[44]: [ array([ 1.09522024, -0.09645227]), + Out[28]: [ array([ 1.09522024, -0.09645227]), array([ 1.21435496, -0.35546712]), array([ 0.72180653, 0.07133042]), array([ 1.46384341e+00, 1.04353244e-04]) ] -.. Note:: - - ``%result`` doesn't work - -The ``%result`` magic gets and prints the stdin/stdout/stderr of the last -command executed on each engine. It is simply a shortcut to the +The ``%result`` magic gets the most recent result, or takes an argument +specifying the index of the result to be requested. It is simply a shortcut to the :meth:`get_result` method: .. sourcecode:: ipython - - In [29]: %result - Out[29]: - - [0] In [10]: print numpy.linalg.eigvals(a) - [0] Out[10]: [ 1.28167017 0.14197338] - - [1] In [9]: print numpy.linalg.eigvals(a) - [1] Out[9]: [-0.14093616 1.27877273] - - [2] In [10]: print numpy.linalg.eigvals(a) - [2] Out[10]: [-0.37023573 1.06779409] - - [3] In [9]: print numpy.linalg.eigvals(a) - [3] Out[9]: [ 0.83664764 -0.25602658] + + In [29]: dv.apply_async_bound(lambda : ev) + + In [30]: %result + Out[30]: [ [ 1.28167017 0.14197338], + [-0.14093616 1.27877273], + [-0.37023573 1.06779409], + [ 0.83664764 -0.25602658] ] The ``%autopx`` magic switches to a mode where everything you type is executed on the engines given by the :attr:`targets` attribute: @@ -477,12 +503,6 @@ on the engines given by the :attr:`targets` attribute: 'Average max eigenvalue is: 10.1158837784',] -.. Note:: - - Multiline ``%autpx`` gets fouled up by NameErrors, because IPython - currently introspects too much. 
- - Moving Python objects around ============================ @@ -524,14 +544,12 @@ In non-blocking mode :meth:`push` and :meth:`pull` also return In [47]: rc.block=False - In [48]: pr = rc.pull('a') + In [48]: ar = rc.pull('a') - In [49]: pr.get() + In [49]: ar.get() Out[49]: [1.03234, 1.03234, 1.03234, 1.03234] - - Dictionary interface -------------------- @@ -751,9 +769,9 @@ All of this same error handling magic even works in non-blocking mode: In [83]: rc.block=False - In [84]: pr = rc.execute('1/0') + In [84]: ar = rc.execute('1/0') - In [85]: pr.get() + In [85]: ar.get() --------------------------------------------------------------------------- CompositeError Traceback (most recent call last) diff --git a/docs/source/parallelz/parallel_task.txt b/docs/source/parallelz/parallel_task.txt index ec49a0f..1e502ff 100644 --- a/docs/source/parallelz/parallel_task.txt +++ b/docs/source/parallelz/parallel_task.txt @@ -33,7 +33,8 @@ Creating a ``Client`` instance ============================== The first step is to import the IPython :mod:`IPython.zmq.parallel.client` -module and then create a :class:`.Client` instance: +module and then create a :class:`.Client` instance, and we will also be using +a :class:`LoadBalancedView`, here called `lview`: .. sourcecode:: ipython @@ -41,8 +42,7 @@ module and then create a :class:`.Client` instance: In [2]: rc = client.Client() - In [3]: lview = rc.view(balanced=True) - Out[3]: + In [3]: lview = rc.view() This form assumes that the controller was started on localhost with default @@ -73,14 +73,15 @@ the task interface. Parallel map ------------ -To load-balance :meth:`map`,simply use a LoadBalancedView, created by asking -for the ``None`` element: +To load-balance :meth:`map`,simply use a LoadBalancedView: .. sourcecode:: ipython - + + In [62]: lview.block = True + In [63]: serial_result = map(lambda x:x**10, range(32)) - In [64]: parallel_result = lview.map(lambda x:x**10, range(32), block=True) + In [64]: parallel_result = lview.map(lambda x:x**10, range(32)) In [65]: serial_result==parallel_result Out[65]: True
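
Usage sketch (not part of the patch): the session below illustrates the new
result-retrieval interfaces this commit introduces (``Client.get_result``,
``Client.result_status``, ``AsyncHubResult``, and the ``jobs`` argument to
``barrier``). It assumes a controller and four engines are already running;
``rc2``, the msg_ids, and the output values are purely illustrative.

.. sourcecode:: ipython

    In [1]: from IPython.zmq.parallel import client

    In [2]: rc = client.Client()

    In [3]: ar = rc[0].apply_async(lambda x: x**2, 5)

    # barrier now accepts AsyncResults, msg_ids, or history indices as `jobs`
    In [4]: rc.barrier(jobs=ar, timeout=5)
    Out[4]: True

    # check completion without transferring results (status_only defaults to True)
    In [5]: rc.result_status(ar.msg_ids)
    Out[5]: {'completed': ['...'], 'pending': []}

    # get_result wraps a history index or msg_id in an AsyncResult; a client
    # that did not submit the job gets an AsyncHubResult, which requests the
    # result from the Hub instead of waiting on the local queues
    In [6]: rc2 = client.Client()

    In [7]: ahr = rc2.get_result(ar.msg_ids)

    In [8]: type(ahr)
    Out[8]: <class 'IPython.zmq.parallel.asyncresult.AsyncHubResult'>

    In [9]: ahr.get()
    Out[9]: 25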
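
A second sketch, under the same assumptions, of the new ``View.imap`` method
(an iterator over ``map_async``) and of negative engine indices in
client index-access; output values are again illustrative.

.. sourcecode:: ipython

    In [10]: dview = rc[:]

    # elements are yielded as each engine's chunk of the map completes
    In [11]: for n in dview.imap(lambda x: x*x, range(8)):
       ....:     print n,
       ....:
    0 1 4 9 16 25 36 49

    # engine lookup now accepts negative indices
    In [12]: last_engine = rc[-1]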