asyncresult.py
204 lines
| 6.8 KiB
| text/x-python
|
PythonLexer
MinRK
|
r3589 | """AsyncResult objects for the client""" | |
#----------------------------------------------------------------------------- | |||
# Copyright (C) 2010 The IPython Development Team | |||
# | |||
# Distributed under the terms of the BSD License. The full license is in | |||
# the file COPYING, distributed as part of this software. | |||
#----------------------------------------------------------------------------- | |||
#----------------------------------------------------------------------------- | |||
# Imports | |||
#----------------------------------------------------------------------------- | |||
MinRK
|
r3601 | from IPython.external.decorator import decorator | |
MinRK
|
r3589 | import error | |
#----------------------------------------------------------------------------- | |||
# Classes | |||
#----------------------------------------------------------------------------- | |||
MinRK
|
r3601 | @decorator | |
def check_ready(f, self, *args, **kwargs): | |||
"""Call spin() to sync state prior to calling the method.""" | |||
self.wait(0) | |||
if not self._ready: | |||
raise error.TimeoutError("result not ready") | |||
return f(self, *args, **kwargs) | |||
MinRK
|
r3589 | class AsyncResult(object): | |
"""Class for representing results of non-blocking calls. | |||
Provides the same interface as :py:class:`multiprocessing.AsyncResult`. | |||
""" | |||
MinRK
|
r3596 | def __init__(self, client, msg_ids, fname=''): | |
MinRK
|
r3589 | self._client = client | |
MinRK
|
r3592 | self.msg_ids = msg_ids | |
MinRK
|
r3596 | self._fname=fname | |
MinRK
|
r3589 | self._ready = False | |
self._success = None | |||
MinRK
|
r3611 | self._single_result = len(msg_ids) == 1 | |
MinRK
|
r3589 | ||
def __repr__(self): | |||
if self._ready: | |||
return "<%s: finished>"%(self.__class__.__name__) | |||
else: | |||
MinRK
|
r3596 | return "<%s: %s>"%(self.__class__.__name__,self._fname) | |
MinRK
|
r3589 | ||
def _reconstruct_result(self, res): | |||
""" | |||
Override me in subclasses for turning a list of results | |||
into the expected form. | |||
""" | |||
MinRK
|
r3611 | if self._single_result: | |
MinRK
|
r3589 | return res[0] | |
else: | |||
return res | |||
def get(self, timeout=-1): | |||
"""Return the result when it arrives. | |||
If `timeout` is not ``None`` and the result does not arrive within | |||
`timeout` seconds then ``TimeoutError`` is raised. If the | |||
remote call raised an exception then that exception will be reraised | |||
by get(). | |||
""" | |||
if not self.ready(): | |||
self.wait(timeout) | |||
if self._ready: | |||
if self._success: | |||
return self._result | |||
else: | |||
raise self._exception | |||
else: | |||
raise error.TimeoutError("Result not ready.") | |||
def ready(self): | |||
"""Return whether the call has completed.""" | |||
if not self._ready: | |||
self.wait(0) | |||
return self._ready | |||
def wait(self, timeout=-1): | |||
"""Wait until the result is available or until `timeout` seconds pass. | |||
""" | |||
if self._ready: | |||
return | |||
MinRK
|
r3592 | self._ready = self._client.barrier(self.msg_ids, timeout) | |
MinRK
|
r3589 | if self._ready: | |
try: | |||
MinRK
|
r3592 | results = map(self._client.results.get, self.msg_ids) | |
MinRK
|
r3601 | self._result = results | |
MinRK
|
r3611 | if self._single_result: | |
r = results[0] | |||
if isinstance(r, Exception): | |||
raise r | |||
else: | |||
results = error.collect_exceptions(results, self._fname) | |||
MinRK
|
r3589 | self._result = self._reconstruct_result(results) | |
except Exception, e: | |||
self._exception = e | |||
self._success = False | |||
else: | |||
self._success = True | |||
MinRK
|
r3601 | finally: | |
self._metadata = map(self._client.metadata.get, self.msg_ids) | |||
MinRK
|
r3589 | ||
def successful(self): | |||
"""Return whether the call completed without raising an exception. | |||
Will raise ``AssertionError`` if the result is not ready. | |||
""" | |||
assert self._ready | |||
return self._success | |||
MinRK
|
r3601 | ||
#---------------------------------------------------------------- | |||
# Extra methods not in mp.pool.AsyncResult | |||
#---------------------------------------------------------------- | |||
def get_dict(self, timeout=-1): | |||
"""Get the results as a dict, keyed by engine_id.""" | |||
results = self.get(timeout) | |||
MinRK
|
r3607 | engine_ids = [ md['engine_id'] for md in self._metadata ] | |
MinRK
|
r3601 | bycount = sorted(engine_ids, key=lambda k: engine_ids.count(k)) | |
maxcount = bycount.count(bycount[-1]) | |||
if maxcount > 1: | |||
raise ValueError("Cannot build dict, %i jobs ran on engine #%i"%( | |||
maxcount, bycount[-1])) | |||
return dict(zip(engine_ids,results)) | |||
@property | |||
@check_ready | |||
def result(self): | |||
"""result property.""" | |||
return self._result | |||
MinRK
|
r3607 | # abbreviated alias: | |
r = result | |||
MinRK
|
r3601 | @property | |
@check_ready | |||
def metadata(self): | |||
"""metadata property.""" | |||
MinRK
|
r3611 | if self._single_result: | |
MinRK
|
r3607 | return self._metadata[0] | |
else: | |||
return self._metadata | |||
MinRK
|
r3601 | ||
@property | |||
def result_dict(self): | |||
"""result property as a dict.""" | |||
return self.get_dict(0) | |||
MinRK
|
r3602 | def __dict__(self): | |
return self.get_dict(0) | |||
MinRK
|
r3601 | #------------------------------------- | |
# dict-access | |||
#------------------------------------- | |||
@check_ready | |||
def __getitem__(self, key): | |||
"""getitem returns result value(s) if keyed by int/slice, or metadata if key is str. | |||
""" | |||
if isinstance(key, int): | |||
return error.collect_exceptions([self._result[key]], self._fname)[0] | |||
elif isinstance(key, slice): | |||
return error.collect_exceptions(self._result[key], self._fname) | |||
elif isinstance(key, basestring): | |||
MinRK
|
r3607 | values = [ md[key] for md in self._metadata ] | |
MinRK
|
r3611 | if self._single_result: | |
MinRK
|
r3607 | return values[0] | |
else: | |||
return values | |||
MinRK
|
r3601 | else: | |
raise TypeError("Invalid key type %r, must be 'int','slice', or 'str'"%type(key)) | |||
@check_ready | |||
def __getattr__(self, key): | |||
"""getattr maps to getitem for convenient access to metadata.""" | |||
if key not in self._metadata[0].keys(): | |||
raise AttributeError("%r object has no attribute %r"%( | |||
self.__class__.__name__, key)) | |||
return self.__getitem__(key) | |||
MinRK
|
r3589 | ||
MinRK
|
r3601 | ||
MinRK
|
r3589 | class AsyncMapResult(AsyncResult): | |
"""Class for representing results of non-blocking gathers. | |||
This will properly reconstruct the gather. | |||
""" | |||
MinRK
|
r3596 | def __init__(self, client, msg_ids, mapObject, fname=''): | |
AsyncResult.__init__(self, client, msg_ids, fname=fname) | |||
MinRK
|
r3607 | self._mapObject = mapObject | |
MinRK
|
r3611 | self._single_result = False | |
MinRK
|
r3589 | ||
def _reconstruct_result(self, res): | |||
"""Perform the gather on the actual results.""" | |||
return self._mapObject.joinPartitions(res) | |||
MinRK
|
r3601 | __all__ = ['AsyncResult', 'AsyncMapResult'] |