asyncresult.py
468 lines
| 15.9 KiB
| text/x-python
|
PythonLexer
MinRK
|
"""AsyncResult objects for the client

Authors:

* MinRK
"""
MinRK
|
#-----------------------------------------------------------------------------
#  Copyright (C) 2010-2011  The IPython Development Team
#
#  Distributed under the terms of the BSD License.  The full license is in
#  the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------

#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
MinRK
|
r6462 | import sys | |
MinRK
|
r3639 | import time | |
MinRK
|
r6462 | from datetime import datetime | |
MinRK
|
r3639 | ||
MinRK
|
r3664 | from zmq import MessageTracker | |
MinRK
|
r6462 | from IPython.core.display import clear_output | |
MinRK
|
r3601 | from IPython.external.decorator import decorator | |
MinRK
|
r3673 | from IPython.parallel import error | |
MinRK
|
r3589 | ||
MinRK
|
r6462 | ||
MinRK
|
#-----------------------------------------------------------------------------
# Classes
#-----------------------------------------------------------------------------
MinRK
|
# global empty tracker that's always done:
# (used by AsyncResult.__init__ as the default when no tracker is passed)
finished_tracker = MessageTracker()
MinRK
|
@decorator
def check_ready(f, self, *args, **kwargs):
    """Sync state with a zero-timeout ``self.wait(0)``, then call the method.

    Used to guard methods and properties that need a finished result.

    Raises
    ------
    error.TimeoutError
        If ``self._ready`` is still false after the zero-timeout wait.
    """
    self.wait(0)
    if not self._ready:
        raise error.TimeoutError("result not ready")
    return f(self, *args, **kwargs)
MinRK
|
class AsyncResult(object):
    """Class for representing results of non-blocking calls.

    Provides the same interface as :py:class:`multiprocessing.pool.AsyncResult`.
    """

    msg_ids = None
    _targets = None
    _tracker = None
    _single_result = False

    def __init__(self, client, msg_ids, fname='unknown', targets=None, tracker=None):
        """Wrap one or more pending messages.

        Parameters
        ----------
        client :
            Object used for waiting and for looking up results/metadata
            (its ``wait``, ``results``, ``metadata``, ``outstanding`` and
            ``abort`` members are used by this class).
        msg_ids : str or list of str
            A single id is wrapped in a list.
        fname : str
            Name shown in ``repr`` and passed to ``error.collect_exceptions``.
        targets :
            Passed through to ``client.abort``; also decides whether a
            one-message result is unwrapped (see below).
        tracker :
            Send tracker; defaults to the module-level ``finished_tracker``.
        """
        # NOTE: `basestring` is Python 2 only, like the rest of this module.
        if isinstance(msg_ids, basestring):
            # always a list
            msg_ids = [msg_ids]
        if tracker is None:
            # default to always done
            tracker = finished_tracker
        self._client = client
        self.msg_ids = msg_ids
        self._fname = fname
        self._targets = targets
        self._tracker = tracker
        self._ready = False
        self._success = None
        self._metadata = None
        if len(msg_ids) == 1:
            # one message is a "single result" unless targets was given
            # explicitly as a list/tuple
            self._single_result = not isinstance(targets, (list, tuple))
        else:
            self._single_result = False

    def __repr__(self):
        if self._ready:
            return "<%s: finished>" % (self.__class__.__name__)
        else:
            return "<%s: %s>" % (self.__class__.__name__, self._fname)

    def _reconstruct_result(self, res):
        """Reconstruct our result from actual result list (always a list)

        Override me in subclasses for turning a list of results
        into the expected form.
        """
        if self._single_result:
            return res[0]
        else:
            return res

    def get(self, timeout=-1):
        """Return the result when it arrives.

        `timeout` is handed to :meth:`wait` (and from there to
        ``client.wait``).  If the result is still not ready after waiting,
        ``TimeoutError`` is raised.  If the remote call raised an exception
        then that exception will be reraised by get() inside a `RemoteError`.

        NOTE(review): the default of -1 presumably means "no deadline", as it
        does in ``AsyncHubResult.wait`` -- confirm against ``client.wait``.
        """
        if not self.ready():
            self.wait(timeout)

        if self._ready:
            if self._success:
                return self._result
            else:
                raise self._exception
        else:
            raise error.TimeoutError("Result not ready.")

    def ready(self):
        """Return whether the call has completed."""
        if not self._ready:
            self.wait(0)
        return self._ready

    def wait(self, timeout=-1):
        """Wait until the result is available or until `timeout` seconds pass.

        This method always returns None.

        Once the client reports the messages as done, the results are
        collected and ``_success`` / ``_result`` / ``_exception`` and
        ``_metadata`` are filled in.
        """
        if self._ready:
            return
        self._ready = self._client.wait(self.msg_ids, timeout)
        if self._ready:
            try:
                results = [self._client.results.get(msg_id) for msg_id in self.msg_ids]
                # keep the raw list around; on failure this is what
                # __getitem__ indexes into
                self._result = results
                if self._single_result:
                    r = results[0]
                    if isinstance(r, Exception):
                        raise r
                else:
                    results = error.collect_exceptions(results, self._fname)
                self._result = self._reconstruct_result(results)
            except Exception as e:
                self._exception = e
                self._success = False
            else:
                self._success = True
            finally:
                # metadata is recorded whether or not the call succeeded
                self._metadata = [self._client.metadata.get(msg_id) for msg_id in self.msg_ids]

    def successful(self):
        """Return whether the call completed without raising an exception.

        Will raise ``AssertionError`` if the result is not ready.
        """
        assert self.ready()
        return self._success

    #----------------------------------------------------------------
    # Extra methods not in mp.pool.AsyncResult
    #----------------------------------------------------------------

    def get_dict(self, timeout=-1):
        """Get the results as a dict, keyed by engine_id.

        timeout behavior is described in `get()`.

        Raises ``ValueError`` if more than one job ran on the same engine,
        since the results could then not be keyed uniquely.
        """
        results = self.get(timeout)
        if self._single_result:
            # get() unwrapped the one result; re-wrap it so it pairs with
            # the one engine id instead of being iterated element-wise
            results = [results]
        engine_ids = [md['engine_id'] for md in self._metadata]

        # single counting pass instead of calling list.count() per element
        counts = {}
        for engine_id in engine_ids:
            counts[engine_id] = counts.get(engine_id, 0) + 1
        if counts:
            busiest = max(counts, key=counts.get)
            maxcount = counts[busiest]
            if maxcount > 1:
                raise ValueError("Cannot build dict, %i jobs ran on engine #%i" % (
                    maxcount, busiest))

        return dict(zip(engine_ids, results))

    @property
    def result(self):
        """result property wrapper for `get()` (default timeout)."""
        return self.get()

    # abbreviated alias:
    r = result

    @property
    @check_ready
    def metadata(self):
        """property for accessing execution metadata."""
        if self._single_result:
            return self._metadata[0]
        else:
            return self._metadata

    @property
    def result_dict(self):
        """result property as a dict."""
        return self.get_dict()

    # NOTE(review): this defines a *method* named ``__dict__`` on the class.
    # Kept unchanged for interface compatibility -- confirm it is intentional.
    def __dict__(self):
        return self.get_dict(0)

    def abort(self):
        """abort my tasks."""
        assert not self.ready(), "Can't abort, I am already done!"
        return self._client.abort(self.msg_ids, targets=self._targets, block=True)

    @property
    def sent(self):
        """check whether my messages have been sent."""
        return self._tracker.done

    def wait_for_send(self, timeout=-1):
        """wait for pyzmq send to complete.

        This is necessary when sending arrays that you intend to edit in-place.
        `timeout` is in seconds, and will raise TimeoutError if it is reached
        before the send completes.
        """
        return self._tracker.wait(timeout)

    #-------------------------------------
    # dict-access
    #-------------------------------------

    @check_ready
    def __getitem__(self, key):
        """getitem returns result value(s) if keyed by int/slice, or metadata if key is str.
        """
        if isinstance(key, int):
            return error.collect_exceptions([self._result[key]], self._fname)[0]
        elif isinstance(key, slice):
            return error.collect_exceptions(self._result[key], self._fname)
        elif isinstance(key, basestring):
            values = [md[key] for md in self._metadata]
            if self._single_result:
                return values[0]
            else:
                return values
        else:
            raise TypeError("Invalid key type %r, must be 'int','slice', or 'str'" % type(key))

    def __getattr__(self, key):
        """getattr maps to getitem for convenient attr access to metadata."""
        try:
            return self.__getitem__(key)
        except (error.TimeoutError, KeyError):
            # not ready yet, or no such metadata key: behave like a
            # normal missing attribute
            raise AttributeError("%r object has no attribute %r" % (
                self.__class__.__name__, key))

    # asynchronous iterator:
    def __iter__(self):
        if self._single_result:
            raise TypeError("AsyncResults with a single result are not iterable.")
        try:
            rlist = self.get(0)
        except error.TimeoutError:
            # wait for each result individually
            for msg_id in self.msg_ids:
                ar = AsyncResult(self._client, msg_id, self._fname)
                yield ar.get()
        else:
            # already done
            for r in rlist:
                yield r

    def __len__(self):
        return len(self.msg_ids)

    #-------------------------------------
    # Sugar methods and attributes
    #-------------------------------------

    @property
    def progress(self):
        """the number of tasks which have been completed at this point.

        Fractional progress would be given by 1.0 * ar.progress / len(ar)
        """
        self.wait(0)
        return len(self) - len(set(self.msg_ids).intersection(self._client.outstanding))

    @property
    def elapsed(self):
        """elapsed time since initial submission"""
        if self.ready():
            return self.wall_time

        now = submitted = datetime.now()
        # find the earliest 'submitted' stamp among our messages
        for msg_id in self.msg_ids:
            if msg_id in self._client.metadata:
                stamp = self._client.metadata[msg_id]['submitted']
                if stamp and stamp < submitted:
                    submitted = stamp
        return (now - submitted).total_seconds()

    @property
    @check_ready
    def serial_time(self):
        """serial computation time of a parallel calculation

        Computed as the sum of (completed-started) of each task
        """
        t = 0
        for md in self._metadata:
            t += (md['completed'] - md['started']).total_seconds()
        return t

    @property
    @check_ready
    def wall_time(self):
        """actual computation time of a parallel calculation

        Computed as the time between the latest `received` stamp
        and the earliest `submitted`.

        Only reliable if Client was spinning/waiting when the task finished, because
        the `received` timestamp is created when a result is pulled off of the zmq queue,
        which happens as a result of `client.spin()`.
        """
        received = max([md['received'] for md in self._metadata])
        submitted = min([md['submitted'] for md in self._metadata])
        return (received - submitted).total_seconds()

    def wait_interactive(self, interval=1., timeout=None):
        """interactive wait, printing progress at regular intervals

        `interval` is the per-iteration wait in seconds; `timeout` (seconds,
        or None for no limit) bounds the total time spent in the loop.
        """
        N = len(self)
        tic = time.time()
        while not self.ready() and (timeout is None or time.time() - tic <= timeout):
            self.wait(interval)
            clear_output()
            # written without a newline so the next update overwrites it; the
            # trailing space stands in for the soft space of `print ...,`
            sys.stdout.write("%4i/%i tasks finished after %4i s " % (self.progress, N, self.elapsed))
            sys.stdout.flush()
        print("done")
MinRK
|
r3589 | ||
Bernardo B. Marques
|
r4872 | ||
MinRK
|
class AsyncMapResult(AsyncResult):
    """Class for representing results of non-blocking gathers.

    This will properly reconstruct the gather.

    This class is iterable at any time, and will wait on results as they come.

    If ordered=False, then the first results to arrive will come first, otherwise
    results will be yielded in the order they were submitted.
    """

    def __init__(self, client, msg_ids, mapObject, fname='', ordered=True):
        """Wrap the messages of a map call.

        `mapObject` must provide ``joinPartitions(res)``, which is used to
        combine the per-message results; `ordered` selects the iteration
        strategy used by ``__iter__``.
        """
        AsyncResult.__init__(self, client, msg_ids, fname=fname)
        self._mapObject = mapObject
        # a map result is never unwrapped, even for a single message
        self._single_result = False
        self.ordered = ordered

    def _reconstruct_result(self, res):
        """Perform the gather on the actual results."""
        return self._mapObject.joinPartitions(res)

    # asynchronous iterator:
    def __iter__(self):
        it = self._ordered_iter if self.ordered else self._unordered_iter
        for r in it():
            yield r

    def _iter_partition(self, msg_id):
        """Wait for one message and yield the element(s) of its result."""
        ar = AsyncResult(self._client, msg_id, self._fname)
        rlist = ar.get()
        try:
            for r in rlist:
                yield r
        except TypeError:
            # flattened, not a list
            # this could get broken by flattened data that returns iterables
            # but most calls to map do not expose the `flatten` argument
            yield rlist

    # asynchronous ordered iterator:
    def _ordered_iter(self):
        """iterator for results *as they arrive*, preserving submission order."""
        try:
            rlist = self.get(0)
        except error.TimeoutError:
            # wait for each result individually
            for msg_id in self.msg_ids:
                for r in self._iter_partition(msg_id):
                    yield r
        else:
            # already done
            for r in rlist:
                yield r

    # asynchronous unordered iterator:
    def _unordered_iter(self):
        """iterator for results *as they arrive*, on FCFS basis, ignoring submission order."""
        try:
            rlist = self.get(0)
        except error.TimeoutError:
            pending = set(self.msg_ids)
            while pending:
                try:
                    self._client.wait(pending, 1e-3)
                except error.TimeoutError:
                    # ignore timeout error, because that only means
                    # *some* jobs are outstanding
                    pass
                # update ready set with those no longer outstanding:
                ready = pending.difference(self._client.outstanding)
                # update pending to exclude those that are finished
                pending = pending.difference(ready)
                while ready:
                    msg_id = ready.pop()
                    for r in self._iter_partition(msg_id):
                        yield r
        else:
            # already done
            for r in rlist:
                yield r
MinRK
|
r3639 | ||
class AsyncHubResult(AsyncResult):
    """Class to wrap pending results that must be requested from the Hub.

    Note that waiting/polling on these objects requires polling the Hub over the network,
    so use `AsyncHubResult.wait()` sparingly.
    """

    def wait(self, timeout=-1):
        """wait for result to complete.

        First waits on the messages the client still lists as outstanding,
        then polls ``client.result_status`` for any ids whose results the
        client does not hold, until nothing is pending or `timeout` seconds
        have passed since the call started.  A negative `timeout` means no
        deadline for the polling loop.  Always returns None.
        """
        start = time.time()
        if self._ready:
            return
        local_ids = [msg_id for msg_id in self.msg_ids
                     if msg_id in self._client.outstanding]
        local_ready = self._client.wait(local_ids, timeout)
        if local_ready:
            remote_ids = [msg_id for msg_id in self.msg_ids
                          if msg_id not in self._client.results]
            if not remote_ids:
                self._ready = True
            else:
                rdict = self._client.result_status(remote_ids, status_only=False)
                pending = rdict['pending']
                while pending and (timeout < 0 or time.time() < start + timeout):
                    rdict = self._client.result_status(remote_ids, status_only=False)
                    pending = rdict['pending']
                    if pending:
                        # back off briefly between polls
                        time.sleep(0.1)
                if not pending:
                    self._ready = True
        if self._ready:
            try:
                results = [self._client.results.get(msg_id) for msg_id in self.msg_ids]
                # keep the raw list around in case collection fails below
                self._result = results
                if self._single_result:
                    r = results[0]
                    if isinstance(r, Exception):
                        raise r
                else:
                    results = error.collect_exceptions(results, self._fname)
                self._result = self._reconstruct_result(results)
            except Exception as e:
                self._exception = e
                self._success = False
            else:
                self._success = True
            finally:
                # metadata is recorded whether or not the call succeeded
                self._metadata = [self._client.metadata.get(msg_id) for msg_id in self.msg_ids]
Bernardo B. Marques
|
r4872 | ||
MinRK
|
# public names exported by `from ... import *`
__all__ = ['AsyncResult', 'AsyncMapResult', 'AsyncHubResult']