##// END OF EJS Templates
add rich AsyncResult behavior
add rich AsyncResult behavior

File last commit:

r3601:84a34248
r3601:84a34248
Show More
asyncresult.py
185 lines | 6.2 KiB | text/x-python | PythonLexer
"""AsyncResult objects for the client"""
#-----------------------------------------------------------------------------
# Copyright (C) 2010 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
from IPython.external.decorator import decorator
import error
#-----------------------------------------------------------------------------
# Classes
#-----------------------------------------------------------------------------
@decorator
def check_ready(f, self, *args, **kwargs):
"""Call spin() to sync state prior to calling the method."""
self.wait(0)
if not self._ready:
raise error.TimeoutError("result not ready")
return f(self, *args, **kwargs)
class AsyncResult(object):
"""Class for representing results of non-blocking calls.
Provides the same interface as :py:class:`multiprocessing.AsyncResult`.
"""
def __init__(self, client, msg_ids, fname=''):
self._client = client
self.msg_ids = msg_ids
self._fname=fname
self._ready = False
self._success = None
def __repr__(self):
if self._ready:
return "<%s: finished>"%(self.__class__.__name__)
else:
return "<%s: %s>"%(self.__class__.__name__,self._fname)
def _reconstruct_result(self, res):
"""
Override me in subclasses for turning a list of results
into the expected form.
"""
if len(self.msg_ids) == 1:
return res[0]
else:
return res
def get(self, timeout=-1):
"""Return the result when it arrives.
If `timeout` is not ``None`` and the result does not arrive within
`timeout` seconds then ``TimeoutError`` is raised. If the
remote call raised an exception then that exception will be reraised
by get().
"""
if not self.ready():
self.wait(timeout)
if self._ready:
if self._success:
return self._result
else:
raise self._exception
else:
raise error.TimeoutError("Result not ready.")
def ready(self):
"""Return whether the call has completed."""
if not self._ready:
self.wait(0)
return self._ready
def wait(self, timeout=-1):
"""Wait until the result is available or until `timeout` seconds pass.
"""
if self._ready:
return
self._ready = self._client.barrier(self.msg_ids, timeout)
if self._ready:
try:
results = map(self._client.results.get, self.msg_ids)
self._result = results
results = error.collect_exceptions(results, self._fname)
self._result = self._reconstruct_result(results)
except Exception, e:
self._exception = e
self._success = False
else:
self._success = True
finally:
self._metadata = map(self._client.metadata.get, self.msg_ids)
def successful(self):
"""Return whether the call completed without raising an exception.
Will raise ``AssertionError`` if the result is not ready.
"""
assert self._ready
return self._success
#----------------------------------------------------------------
# Extra methods not in mp.pool.AsyncResult
#----------------------------------------------------------------
def get_dict(self, timeout=-1):
"""Get the results as a dict, keyed by engine_id."""
results = self.get(timeout)
engine_ids = [md['engine_id'] for md in self._metadata ]
bycount = sorted(engine_ids, key=lambda k: engine_ids.count(k))
maxcount = bycount.count(bycount[-1])
if maxcount > 1:
raise ValueError("Cannot build dict, %i jobs ran on engine #%i"%(
maxcount, bycount[-1]))
return dict(zip(engine_ids,results))
@property
@check_ready
def result(self):
"""result property."""
return self._result
@property
@check_ready
def metadata(self):
"""metadata property."""
return self._metadata
@property
@check_ready
def result_dict(self):
"""result property as a dict."""
return self.get_dict(0)
#-------------------------------------
# dict-access
#-------------------------------------
@check_ready
def __getitem__(self, key):
"""getitem returns result value(s) if keyed by int/slice, or metadata if key is str.
"""
if isinstance(key, int):
return error.collect_exceptions([self._result[key]], self._fname)[0]
elif isinstance(key, slice):
return error.collect_exceptions(self._result[key], self._fname)
elif isinstance(key, basestring):
return [ md[key] for md in self._metadata ]
else:
raise TypeError("Invalid key type %r, must be 'int','slice', or 'str'"%type(key))
@check_ready
def __getattr__(self, key):
"""getattr maps to getitem for convenient access to metadata."""
if key not in self._metadata[0].keys():
raise AttributeError("%r object has no attribute %r"%(
self.__class__.__name__, key))
return self.__getitem__(key)
class AsyncMapResult(AsyncResult):
"""Class for representing results of non-blocking gathers.
This will properly reconstruct the gather.
"""
def __init__(self, client, msg_ids, mapObject, fname=''):
self._mapObject = mapObject
AsyncResult.__init__(self, client, msg_ids, fname=fname)
def _reconstruct_result(self, res):
"""Perform the gather on the actual results."""
return self._mapObject.joinPartitions(res)
__all__ = ['AsyncResult', 'AsyncMapResult']