"""AsyncResult objects for the client""" #----------------------------------------------------------------------------- # Copyright (C) 2010 The IPython Development Team # # Distributed under the terms of the BSD License. The full license is in # the file COPYING, distributed as part of this software. #----------------------------------------------------------------------------- #----------------------------------------------------------------------------- # Imports #----------------------------------------------------------------------------- from IPython.external.decorator import decorator import error #----------------------------------------------------------------------------- # Classes #----------------------------------------------------------------------------- @decorator def check_ready(f, self, *args, **kwargs): """Call spin() to sync state prior to calling the method.""" self.wait(0) if not self._ready: raise error.TimeoutError("result not ready") return f(self, *args, **kwargs) class AsyncResult(object): """Class for representing results of non-blocking calls. Provides the same interface as :py:class:`multiprocessing.AsyncResult`. """ def __init__(self, client, msg_ids, fname=''): self._client = client self.msg_ids = msg_ids self._fname=fname self._ready = False self._success = None self._flatten_result = len(msg_ids) == 1 def __repr__(self): if self._ready: return "<%s: finished>"%(self.__class__.__name__) else: return "<%s: %s>"%(self.__class__.__name__,self._fname) def _reconstruct_result(self, res): """ Override me in subclasses for turning a list of results into the expected form. """ if self._flatten_result: return res[0] else: return res def get(self, timeout=-1): """Return the result when it arrives. If `timeout` is not ``None`` and the result does not arrive within `timeout` seconds then ``TimeoutError`` is raised. If the remote call raised an exception then that exception will be reraised by get(). """ if not self.ready(): self.wait(timeout) if self._ready: if self._success: return self._result else: raise self._exception else: raise error.TimeoutError("Result not ready.") def ready(self): """Return whether the call has completed.""" if not self._ready: self.wait(0) return self._ready def wait(self, timeout=-1): """Wait until the result is available or until `timeout` seconds pass. """ if self._ready: return self._ready = self._client.barrier(self.msg_ids, timeout) if self._ready: try: results = map(self._client.results.get, self.msg_ids) self._result = results results = error.collect_exceptions(results, self._fname) self._result = self._reconstruct_result(results) except Exception, e: self._exception = e self._success = False else: self._success = True finally: self._metadata = map(self._client.metadata.get, self.msg_ids) def successful(self): """Return whether the call completed without raising an exception. Will raise ``AssertionError`` if the result is not ready. """ assert self._ready return self._success #---------------------------------------------------------------- # Extra methods not in mp.pool.AsyncResult #---------------------------------------------------------------- def get_dict(self, timeout=-1): """Get the results as a dict, keyed by engine_id.""" results = self.get(timeout) engine_ids = [ md['engine_id'] for md in self._metadata ] bycount = sorted(engine_ids, key=lambda k: engine_ids.count(k)) maxcount = bycount.count(bycount[-1]) if maxcount > 1: raise ValueError("Cannot build dict, %i jobs ran on engine #%i"%( maxcount, bycount[-1])) return dict(zip(engine_ids,results)) @property @check_ready def result(self): """result property.""" return self._result # abbreviated alias: r = result @property @check_ready def metadata(self): """metadata property.""" if self._flatten_result: return self._metadata[0] else: return self._metadata @property def result_dict(self): """result property as a dict.""" return self.get_dict(0) def __dict__(self): return self.get_dict(0) #------------------------------------- # dict-access #------------------------------------- @check_ready def __getitem__(self, key): """getitem returns result value(s) if keyed by int/slice, or metadata if key is str. """ if isinstance(key, int): return error.collect_exceptions([self._result[key]], self._fname)[0] elif isinstance(key, slice): return error.collect_exceptions(self._result[key], self._fname) elif isinstance(key, basestring): values = [ md[key] for md in self._metadata ] if self._flatten_result: return values[0] else: return values else: raise TypeError("Invalid key type %r, must be 'int','slice', or 'str'"%type(key)) @check_ready def __getattr__(self, key): """getattr maps to getitem for convenient access to metadata.""" if key not in self._metadata[0].keys(): raise AttributeError("%r object has no attribute %r"%( self.__class__.__name__, key)) return self.__getitem__(key) class AsyncMapResult(AsyncResult): """Class for representing results of non-blocking gathers. This will properly reconstruct the gather. """ def __init__(self, client, msg_ids, mapObject, fname=''): AsyncResult.__init__(self, client, msg_ids, fname=fname) self._mapObject = mapObject self._flatten_result = False def _reconstruct_result(self, res): """Perform the gather on the actual results.""" return self._mapObject.joinPartitions(res) __all__ = ['AsyncResult', 'AsyncMapResult']