asyncresult.py
344 lines
| 11.4 KiB
| text/x-python
|
PythonLexer
MinRK
|
"""AsyncResult objects for the client

Authors:

* MinRK
"""
MinRK
|
r3589 | #----------------------------------------------------------------------------- | |
MinRK
|
r3660 | # Copyright (C) 2010-2011 The IPython Development Team | |
MinRK
|
r3589 | # | |
# Distributed under the terms of the BSD License. The full license is in | |||
# the file COPYING, distributed as part of this software. | |||
#----------------------------------------------------------------------------- | |||
#----------------------------------------------------------------------------- | |||
# Imports | |||
#----------------------------------------------------------------------------- | |||
MinRK
|
r3639 | import time | |
MinRK
|
r3664 | from zmq import MessageTracker | |
MinRK
|
r3601 | from IPython.external.decorator import decorator | |
MinRK
|
r3673 | from IPython.parallel import error | |
MinRK
|
r3589 | ||
#----------------------------------------------------------------------------- | |||
# Classes | |||
#----------------------------------------------------------------------------- | |||
MinRK
|
# Module-wide default tracker, shared by every AsyncResult that is created
# without an explicit tracker (an empty tracker that's always done).
finished_tracker = MessageTracker()
MinRK
|
@decorator
def check_ready(f, self, *args, **kwargs):
    """Refresh readiness with a non-blocking ``wait(0)`` before calling `f`.

    Used to decorate methods that need the result to be available.
    Raises ``error.TimeoutError`` when the result is still not ready after
    the refresh; otherwise returns whatever the wrapped method returns.
    """
    # wait(0) only polls; it updates self._ready as a side effect.
    self.wait(0)
    if self._ready:
        return f(self, *args, **kwargs)
    raise error.TimeoutError("result not ready")
MinRK
|
class AsyncResult(object):
    """Class for representing results of non-blocking calls.

    Provides the same interface as :py:class:`multiprocessing.pool.AsyncResult`.
    """

    msg_ids = None
    _targets = None
    _tracker = None
    _single_result = False

    def __init__(self, client, msg_ids, fname='unknown', targets=None, tracker=None):
        """Wrap the pending result(s) of one or more submitted messages.

        Parameters
        ----------
        client : the client object used to wait on and look up results
        msg_ids : a single message id (string) or a list of message ids
        fname : name of the called function, used in repr and error reports
        targets : the targets the request was sent to; a list/tuple marks
            the result as multi-valued even for a single message id
        tracker : object exposing `done` and `wait(timeout)` for the send;
            defaults to the module-level `finished_tracker`
        """
        if isinstance(msg_ids, basestring):
            # always a list
            msg_ids = [msg_ids]
        if tracker is None:
            # default to always done
            tracker = finished_tracker
        self._client = client
        self.msg_ids = msg_ids
        self._fname = fname
        self._targets = targets
        self._tracker = tracker
        self._ready = False
        self._success = None
        # A lone message id yields a bare value unless the caller explicitly
        # addressed a list/tuple of targets.
        self._single_result = (len(msg_ids) == 1
                               and not isinstance(targets, (list, tuple)))

    def __repr__(self):
        if self._ready:
            return "<%s: finished>" % (self.__class__.__name__)
        else:
            return "<%s: %s>" % (self.__class__.__name__, self._fname)

    def _reconstruct_result(self, res):
        """Reconstruct our result from actual result list (always a list)

        Override me in subclasses for turning a list of results
        into the expected form.
        """
        if self._single_result:
            return res[0]
        else:
            return res

    def get(self, timeout=-1):
        """Return the result when it arrives.

        `timeout` is forwarded to `wait()`. If the result has not arrived
        once `wait()` returns, ``TimeoutError`` is raised. If the remote
        call raised an exception then that exception will be reraised by
        get() inside a `RemoteError`.

        NOTE(review): the default of -1 is presumably "no time limit";
        confirm against the client's `wait()` implementation.
        """
        if not self.ready():
            self.wait(timeout)

        if not self._ready:
            raise error.TimeoutError("Result not ready.")
        if self._success:
            return self._result
        raise self._exception

    def ready(self):
        """Return whether the call has completed."""
        if not self._ready:
            # non-blocking poll to refresh the flag
            self.wait(0)
        return self._ready

    def wait(self, timeout=-1):
        """Wait until the result is available or until `timeout` seconds pass.

        This method always returns None.
        """
        if self._ready:
            return
        self._ready = self._client.wait(self.msg_ids, timeout)
        if not self._ready:
            return

        try:
            lookup = self._client.results.get
            # An explicit list (rather than map) keeps the result indexable
            # regardless of interpreter version.
            results = [lookup(msg_id) for msg_id in self.msg_ids]
            # Keep the raw list around in case reconstruction below fails.
            self._result = results
            if self._single_result:
                r = results[0]
                if isinstance(r, Exception):
                    raise r
            else:
                results = error.collect_exceptions(results, self._fname)
            self._result = self._reconstruct_result(results)
        except Exception as e:
            self._exception = e
            self._success = False
        else:
            self._success = True
        finally:
            # Metadata is recorded whether or not the call succeeded.
            md_lookup = self._client.metadata.get
            self._metadata = [md_lookup(msg_id) for msg_id in self.msg_ids]

    def successful(self):
        """Return whether the call completed without raising an exception.

        Will raise ``AssertionError`` if the result is not ready.
        """
        assert self.ready()
        return self._success

    #----------------------------------------------------------------
    # Extra methods not in mp.pool.AsyncResult
    #----------------------------------------------------------------

    def get_dict(self, timeout=-1):
        """Get the results as a dict, keyed by engine_id.

        timeout behavior is described in `get()`.

        Raises ``ValueError`` if more than one job ran on the same engine,
        since the results could not then be keyed uniquely.
        """
        results = self.get(timeout)
        if self._single_result:
            # get() returned the bare value; wrap it so that it pairs with
            # the one-element metadata list instead of being iterated.
            results = [results]
        engine_ids = [md['engine_id'] for md in self._metadata]

        # Single pass to count jobs per engine.
        counts = {}
        for engine_id in engine_ids:
            counts[engine_id] = counts.get(engine_id, 0) + 1
        if counts:
            busiest = max(counts, key=counts.get)
            maxcount = counts[busiest]
            if maxcount > 1:
                raise ValueError("Cannot build dict, %i jobs ran on engine #%i" % (
                    maxcount, busiest))

        return dict(zip(engine_ids, results))

    @property
    def result(self):
        """result property wrapper for `get()` with its default timeout."""
        return self.get()

    # abbreviated alias:
    r = result

    @property
    @check_ready
    def metadata(self):
        """property for accessing execution metadata."""
        if self._single_result:
            return self._metadata[0]
        else:
            return self._metadata

    @property
    def result_dict(self):
        """result property as a dict."""
        return self.get_dict()

    # NOTE(review): `__dict__` normally names the instance attribute
    # dictionary; defining a method with that name is unusual. Left
    # unchanged for backward compatibility -- confirm it is intentional.
    def __dict__(self):
        return self.get_dict(0)

    def abort(self):
        """abort my tasks."""
        assert not self.ready(), "Can't abort, I am already done!"
        # The client is stored as `_client`; there is no `client` attribute,
        # so `self.client` would fall through to __getattr__ and fail.
        return self._client.abort(self.msg_ids, targets=self._targets, block=True)

    @property
    def sent(self):
        """check whether my messages have been sent."""
        return self._tracker.done

    def wait_for_send(self, timeout=-1):
        """wait for pyzmq send to complete.

        This is necessary when sending arrays that you intend to edit in-place.
        `timeout` is in seconds, and will raise TimeoutError if it is reached
        before the send completes.
        """
        return self._tracker.wait(timeout)

    #-------------------------------------
    # dict-access
    #-------------------------------------

    @check_ready
    def __getitem__(self, key):
        """getitem returns result value(s) if keyed by int/slice, or metadata if key is str.
        """
        if isinstance(key, int):
            return error.collect_exceptions([self._result[key]], self._fname)[0]
        elif isinstance(key, slice):
            return error.collect_exceptions(self._result[key], self._fname)
        elif isinstance(key, basestring):
            values = [md[key] for md in self._metadata]
            if self._single_result:
                return values[0]
            else:
                return values
        else:
            raise TypeError("Invalid key type %r, must be 'int','slice', or 'str'" % type(key))

    def __getattr__(self, key):
        """getattr maps to getitem for convenient attr access to metadata."""
        # __getattr__ only runs when normal lookup fails. Private and dunder
        # names are rejected without polling: otherwise a missing `_ready`
        # (e.g. before __init__ has run, during copy/unpickle) would
        # re-enter wait() -> __getattr__ without end. Metadata stored under
        # such a key stays reachable through item access (`ar[key]`).
        if key.startswith('_'):
            raise AttributeError("%r object has no attribute %r" % (
                self.__class__.__name__, key))
        return self._metadata_attr(key)

    @check_ready
    def _metadata_attr(self, key):
        """Look up `key` in the metadata once the result is ready."""
        if not self._metadata or key not in self._metadata[0]:
            raise AttributeError("%r object has no attribute %r" % (
                self.__class__.__name__, key))
        return self.__getitem__(key)

    # asynchronous iterator:
    def __iter__(self):
        """Iterate over results, waiting for each in turn if still pending."""
        if self._single_result:
            raise TypeError("AsyncResults with a single result are not iterable.")
        try:
            rlist = self.get(0)
        except error.TimeoutError:
            # wait for each result individually
            for msg_id in self.msg_ids:
                ar = AsyncResult(self._client, msg_id, self._fname)
                yield ar.get()
        else:
            # already done
            for r in rlist:
                yield r
MinRK
|
r3589 | ||
Bernardo B. Marques
|
r4872 | ||
MinRK
|
class AsyncMapResult(AsyncResult):
    """Class for representing results of non-blocking gathers.

    This will properly reconstruct the gather.
    """

    def __init__(self, client, msg_ids, mapObject, fname=''):
        """Wrap pending map results.

        `mapObject` must provide `joinPartitions(list)`, which is used to
        reassemble the per-message results into the final value.
        """
        AsyncResult.__init__(self, client, msg_ids, fname=fname)
        self._mapObject = mapObject
        # A map result is always treated as a sequence, even for one message.
        self._single_result = False

    def _reconstruct_result(self, res):
        """Perform the gather on the actual results."""
        return self._mapObject.joinPartitions(res)

    # asynchronous iterator:
    def __iter__(self):
        """Iterate over the mapped items, waiting per partition if pending."""
        try:
            rlist = self.get(0)
        except error.TimeoutError:
            # wait for each result individually
            for msg_id in self.msg_ids:
                ar = AsyncResult(self._client, msg_id, self._fname)
                rlist = ar.get()
                # Only the iterability probe is guarded: a TypeError raised
                # while producing items (or thrown into this generator) must
                # not be mistaken for "not a list" and cause the partition
                # to be yielded again after some of its items.
                try:
                    items = iter(rlist)
                except TypeError:
                    # flattened, not a list
                    # this could get broken by flattened data that returns iterables
                    # but most calls to map do not expose the `flatten` argument
                    yield rlist
                else:
                    for r in items:
                        yield r
        else:
            # already done
            for r in rlist:
                yield r
MinRK
|
r3639 | ||
class AsyncHubResult(AsyncResult):
    """Class to wrap pending results that must be requested from the Hub.

    Note that waiting/polling on these objects requires polling the Hub over the network,
    so use `AsyncHubResult.wait()` sparingly.
    """

    def wait(self, timeout=-1):
        """wait for result to complete.

        First waits on any of our messages the client still lists as
        outstanding, then polls the Hub (every 0.1s) for the ids that are
        not in the client's results. A negative `timeout` puts no time
        limit on the Hub polling; otherwise polling stops `timeout` seconds
        after this call started. Always returns None.
        """
        if self._ready:
            return
        start = time.time()
        client = self._client

        local_ids = [msg_id for msg_id in self.msg_ids
                     if msg_id in client.outstanding]
        local_ready = client.wait(local_ids, timeout)
        if local_ready:
            remote_ids = [msg_id for msg_id in self.msg_ids
                          if msg_id not in client.results]
            if not remote_ids:
                self._ready = True
            else:
                # Poll once up front so that timeout=0 still gets one check,
                # then sleep between subsequent polls rather than issuing
                # two requests back to back.
                rdict = client.result_status(remote_ids, status_only=False)
                pending = rdict['pending']
                while pending and (timeout < 0 or time.time() < start + timeout):
                    time.sleep(0.1)
                    rdict = client.result_status(remote_ids, status_only=False)
                    pending = rdict['pending']
                if not pending:
                    self._ready = True

        if not self._ready:
            return

        try:
            lookup = client.results.get
            results = [lookup(msg_id) for msg_id in self.msg_ids]
            # Keep the raw list around in case reconstruction below fails.
            self._result = results
            if self._single_result:
                r = results[0]
                if isinstance(r, Exception):
                    raise r
            else:
                results = error.collect_exceptions(results, self._fname)
            self._result = self._reconstruct_result(results)
        except Exception as e:
            self._exception = e
            self._success = False
        else:
            self._success = True
        finally:
            # Metadata is recorded whether or not the call succeeded.
            md_lookup = client.metadata.get
            self._metadata = [md_lookup(msg_id) for msg_id in self.msg_ids]
Bernardo B. Marques
|
r4872 | ||
MinRK
|
# Public names exported by this module.
__all__ = [
    'AsyncResult',
    'AsyncMapResult',
    'AsyncHubResult',
]