r4018 | """AsyncResult objects for the client | ||
Authors: | ||||
* MinRK | ||||
""" | ||||
MinRK
|
r3589 | #----------------------------------------------------------------------------- | ||
MinRK
|
r3660 | # Copyright (C) 2010-2011 The IPython Development Team | ||
MinRK
|
r3589 | # | ||
# Distributed under the terms of the BSD License. The full license is in | ||||
# the file COPYING, distributed as part of this software. | ||||
#----------------------------------------------------------------------------- | ||||
#----------------------------------------------------------------------------- | ||||
# Imports | ||||
#----------------------------------------------------------------------------- | ||||
MinRK
|

from __future__ import print_function

import sys
import time
from datetime import datetime

from zmq import MessageTracker

from IPython.core.display import clear_output, display, display_pretty
from IPython.external.decorator import decorator
from IPython.parallel import error

#-----------------------------------------------------------------------------
# Functions
#-----------------------------------------------------------------------------

def _total_seconds(td):
    """timedelta.total_seconds was added in 2.7"""
    try:
        # Python >= 2.7
        return td.total_seconds()
    except AttributeError:
        # Python 2.6
        return 1e-6 * (td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6)


def _raw_text(s):
    display_pretty(s, raw=True)

#-----------------------------------------------------------------------------
# Classes
#-----------------------------------------------------------------------------

# global empty tracker that's always done:
finished_tracker = MessageTracker()

@decorator
def check_ready(f, self, *args, **kwargs):
    """Call spin() to sync state prior to calling the method."""
    self.wait(0)
    if not self._ready:
        raise error.TimeoutError("result not ready")
    return f(self, *args, **kwargs)
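
# `check_ready` is applied below (e.g. to the `serial_time`, `wall_time`, and
# `display_outputs` members) so that using them before the result has arrived
# raises TimeoutError instead of reading incomplete state.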


class AsyncResult(object):
    """Class for representing results of non-blocking calls.

    Provides the same interface as :py:class:`multiprocessing.pool.AsyncResult`.
    """

    msg_ids = None
    _targets = None
    _tracker = None
    _single_result = False

    def __init__(self, client, msg_ids, fname='unknown', targets=None, tracker=None):
        if isinstance(msg_ids, basestring):
            # always a list
            msg_ids = [msg_ids]
        if tracker is None:
            # default to always done
            tracker = finished_tracker
        self._client = client
        self.msg_ids = msg_ids
        self._fname = fname
        self._targets = targets
        self._tracker = tracker

        self._ready = False
        self._outputs_ready = False
        self._success = None
        self._metadata = [ self._client.metadata.get(id) for id in self.msg_ids ]
        if len(msg_ids) == 1:
            self._single_result = not isinstance(targets, (list, tuple))
        else:
            self._single_result = False

    def __repr__(self):
        if self._ready:
            return "<%s: finished>"%(self.__class__.__name__)
        else:
            return "<%s: %s>"%(self.__class__.__name__,self._fname)

    def _reconstruct_result(self, res):
        """Reconstruct our result from actual result list (always a list)

        Override me in subclasses for turning a list of results
        into the expected form.
        """
        if self._single_result:
            return res[0]
        else:
            return res

    def get(self, timeout=-1):
        """Return the result when it arrives.

        If `timeout` is not ``None`` and the result does not arrive within
        `timeout` seconds then ``TimeoutError`` is raised. If the
        remote call raised an exception then that exception will be reraised
        by get() inside a `RemoteError`.
        """
        if not self.ready():
            self.wait(timeout)

        if self._ready:
            if self._success:
                return self._result
            else:
                raise self._exception
        else:
            raise error.TimeoutError("Result not ready.")

    def _check_ready(self):
        if not self.ready():
            raise error.TimeoutError("Result not ready.")

    def ready(self):
        """Return whether the call has completed."""
        if not self._ready:
            self.wait(0)
        elif not self._outputs_ready:
            self._wait_for_outputs(0)

        return self._ready

    def wait(self, timeout=-1):
        """Wait until the result is available or until `timeout` seconds pass.

        This method always returns None.
        """
        if self._ready:
            self._wait_for_outputs(timeout)
            return
        self._ready = self._client.wait(self.msg_ids, timeout)
        if self._ready:
            try:
                results = map(self._client.results.get, self.msg_ids)
                self._result = results
                if self._single_result:
                    r = results[0]
                    if isinstance(r, Exception):
                        raise r
                else:
                    results = error.collect_exceptions(results, self._fname)
                self._result = self._reconstruct_result(results)
            except Exception as e:
                self._exception = e
                self._success = False
            else:
                self._success = True
            finally:
                if timeout is None or timeout < 0:
                    # cutoff infinite wait at 10s
                    timeout = 10
                self._wait_for_outputs(timeout)

    def successful(self):
        """Return whether the call completed without raising an exception.

        Will raise ``AssertionError`` if the result is not ready.
        """
        assert self.ready()
        return self._success

    #----------------------------------------------------------------
    # Extra methods not in mp.pool.AsyncResult
    #----------------------------------------------------------------

    def get_dict(self, timeout=-1):
        """Get the results as a dict, keyed by engine_id.

        timeout behavior is described in `get()`.
        """

        results = self.get(timeout)
        engine_ids = [ md['engine_id'] for md in self._metadata ]
        bycount = sorted(engine_ids, key=lambda k: engine_ids.count(k))
        maxcount = bycount.count(bycount[-1])
        if maxcount > 1:
            raise ValueError("Cannot build dict, %i jobs ran on engine #%i"%(
                    maxcount, bycount[-1]))

        return dict(zip(engine_ids,results))
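
    # Illustrative sketch, assuming one task per engine (ids/pids hypothetical):
    #
    #     rc[:].apply_async(os.getpid).get_dict()    # -> {0: 1212, 1: 1213, ...}
    #
    # If any engine ran more than one of the tasks, the ValueError above is raised.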

    @property
    def result(self):
        """result property wrapper for `get(timeout=-1)`."""
        return self.get()

    # abbreviated alias:
    r = result

    @property
    def metadata(self):
        """property for accessing execution metadata."""
        if self._single_result:
            return self._metadata[0]
        else:
            return self._metadata

    @property
    def result_dict(self):
        """result property as a dict."""
        return self.get_dict()

    def __dict__(self):
        return self.get_dict(0)

    def abort(self):
        """abort my tasks."""
        assert not self.ready(), "Can't abort, I am already done!"
        return self._client.abort(self.msg_ids, targets=self._targets, block=True)

    @property
    def sent(self):
        """check whether my messages have been sent."""
        return self._tracker.done

    def wait_for_send(self, timeout=-1):
        """wait for pyzmq send to complete.

        This is necessary when sending arrays that you intend to edit in-place.
        `timeout` is in seconds, and will raise TimeoutError if it is reached
        before the send completes.
        """
        return self._tracker.wait(timeout)

    #-------------------------------------
    # dict-access
    #-------------------------------------

    def __getitem__(self, key):
        """getitem returns result value(s) if keyed by int/slice, or metadata if key is str.
        """
        if isinstance(key, int):
            self._check_ready()
            return error.collect_exceptions([self._result[key]], self._fname)[0]
        elif isinstance(key, slice):
            self._check_ready()
            return error.collect_exceptions(self._result[key], self._fname)
        elif isinstance(key, basestring):
            # metadata proxy *does not* require that results are done
            self.wait(0)
            values = [ md[key] for md in self._metadata ]
            if self._single_result:
                return values[0]
            else:
                return values
        else:
            raise TypeError("Invalid key type %r, must be 'int','slice', or 'str'"%type(key))

    def __getattr__(self, key):
        """getattr maps to getitem for convenient attr access to metadata."""
        try:
            return self.__getitem__(key)
        except (error.TimeoutError, KeyError):
            raise AttributeError("%r object has no attribute %r"%(
                    self.__class__.__name__, key))

    # asynchronous iterator:
    def __iter__(self):
        if self._single_result:
            raise TypeError("AsyncResults with a single result are not iterable.")
        try:
            rlist = self.get(0)
        except error.TimeoutError:
            # wait for each result individually
            for msg_id in self.msg_ids:
                ar = AsyncResult(self._client, msg_id, self._fname)
                yield ar.get()
        else:
            # already done
            for r in rlist:
                yield r

    def __len__(self):
        return len(self.msg_ids)

    #-------------------------------------
    # Sugar methods and attributes
    #-------------------------------------

    def timedelta(self, start, end, start_key=min, end_key=max):
        """compute the difference between two sets of timestamps

        The default behavior is to use the earliest of the first
        and the latest of the second list, but this can be changed
        by passing a different start_key or end_key.

        Parameters
        ----------

        start : one or more datetime objects (e.g. ar.submitted)
        end : one or more datetime objects (e.g. ar.received)
        start_key : callable
            Function to call on `start` to extract the relevant
            entry [default: min]
        end_key : callable
            Function to call on `end` to extract the relevant
            entry [default: max]

        Returns
        -------

        dt : float
            The time elapsed (in seconds) between the two selected timestamps.
        """
        if not isinstance(start, datetime):
            # handle single_result AsyncResults, where ar.stamp is single object,
            # not a list
            start = start_key(start)
        if not isinstance(end, datetime):
            # handle single_result AsyncResults, where ar.stamp is single object,
            # not a list
            end = end_key(end)
        return _total_seconds(end - start)
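
    # Illustrative sketch: seconds from the earliest submission to the earliest
    # completion, using the metadata timestamps exposed via attribute access:
    #
    #     ar.timedelta(ar.submitted, ar.completed, end_key=min)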

    @property
    def progress(self):
        """the number of tasks which have been completed at this point.

        Fractional progress would be given by 1.0 * ar.progress / len(ar)
        """
        self.wait(0)
        return len(self) - len(set(self.msg_ids).intersection(self._client.outstanding))

    @property
    def elapsed(self):
        """elapsed time since initial submission"""
        if self.ready():
            return self.wall_time

        now = submitted = datetime.now()
        for msg_id in self.msg_ids:
            if msg_id in self._client.metadata:
                stamp = self._client.metadata[msg_id]['submitted']
                if stamp and stamp < submitted:
                    submitted = stamp
        return _total_seconds(now-submitted)

    @property
    @check_ready
    def serial_time(self):
        """serial computation time of a parallel calculation

        Computed as the sum of (completed-started) of each task
        """
        t = 0
        for md in self._metadata:
            t += _total_seconds(md['completed'] - md['started'])
        return t

    @property
    @check_ready
    def wall_time(self):
        """actual computation time of a parallel calculation

        Computed as the time between the latest `received` stamp
        and the earliest `submitted`.

        Only reliable if Client was spinning/waiting when the task finished, because
        the `received` timestamp is created when a result is pulled off of the zmq queue,
        which happens as a result of `client.spin()`.

        For similar comparison of other timestamp pairs, check out AsyncResult.timedelta.
        """
        return self.timedelta(self.submitted, self.received)
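
    # Illustrative sketch (once the result is ready): a rough parallel speedup
    # estimate can be derived from the two timing properties above:
    #
    #     speedup = ar.serial_time / ar.wall_time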

    def wait_interactive(self, interval=1., timeout=-1):
        """interactive wait, printing progress at regular intervals"""
        if timeout is None:
            timeout = -1
        N = len(self)
        tic = time.time()
        while not self.ready() and (timeout < 0 or time.time() - tic <= timeout):
            self.wait(interval)
            clear_output()
            print("%4i/%i tasks finished after %4i s" % (self.progress, N, self.elapsed), end="")
            sys.stdout.flush()
        print()
        print("done")

    def _republish_displaypub(self, content, eid):
        """republish individual displaypub content dicts"""
        try:
            ip = get_ipython()
        except NameError:
            # displaypub is meaningless outside IPython
            return
        md = content['metadata'] or {}
        md['engine'] = eid
        ip.display_pub.publish(content['source'], content['data'], md)

    def _display_stream(self, text, prefix='', file=None):
        if not text:
            # nothing to display
            return
        if file is None:
            file = sys.stdout
        end = '' if text.endswith('\n') else '\n'

        multiline = text.count('\n') > int(text.endswith('\n'))
        if prefix and multiline and not text.startswith('\n'):
            prefix = prefix + '\n'
        print("%s%s" % (prefix, text), file=file, end=end)

    def _display_single_result(self):
        self._display_stream(self.stdout)
        self._display_stream(self.stderr, file=sys.stderr)

        try:
            get_ipython()
        except NameError:
            # displaypub is meaningless outside IPython
            return

        for output in self.outputs:
            self._republish_displaypub(output, self.engine_id)

        if self.pyout is not None:
            display(self.get())

    def _wait_for_outputs(self, timeout=-1):
        """wait for the 'status=idle' message that indicates we have all outputs
        """
        if self._outputs_ready or not self._success:
            # don't wait on errors
            return

        # cast None to -1 for infinite timeout
        if timeout is None:
            timeout = -1

        tic = time.time()
        self._client._flush_iopub(self._client._iopub_socket)
        self._outputs_ready = all(md['outputs_ready'] for md in self._metadata)
        while not self._outputs_ready:
            time.sleep(0.01)
            self._client._flush_iopub(self._client._iopub_socket)
            self._outputs_ready = all(md['outputs_ready'] for md in self._metadata)
            if timeout >= 0 and time.time() > tic + timeout:
                break

    @check_ready
    def display_outputs(self, groupby="type"):
        """republish the outputs of the computation

        Parameters
        ----------

        groupby : str [default: type]
            if 'type':
                Group outputs by type (show all stdout, then all stderr, etc.):

                [stdout:1] foo
                [stdout:2] foo
                [stderr:1] bar
                [stderr:2] bar

            if 'engine':
                Display outputs for each engine before moving on to the next:

                [stdout:1] foo
                [stderr:1] bar
                [stdout:2] foo
                [stderr:2] bar

            if 'order':
                Like 'type', but further collate individual displaypub
                outputs.  This is meant for cases of each command producing
                several plots, and you would like to see all of the first
                plots together, then all of the second plots, and so on.
        """
        if self._single_result:
            self._display_single_result()
            return

        stdouts = self.stdout
        stderrs = self.stderr
        pyouts = self.pyout
        output_lists = self.outputs
        results = self.get()

        targets = self.engine_id

        if groupby == "engine":
            for eid,stdout,stderr,outputs,r,pyout in zip(
                    targets, stdouts, stderrs, output_lists, results, pyouts
                ):
                self._display_stream(stdout, '[stdout:%i] ' % eid)
                self._display_stream(stderr, '[stderr:%i] ' % eid, file=sys.stderr)

                try:
                    get_ipython()
                except NameError:
                    # displaypub is meaningless outside IPython
                    return

                if outputs or pyout is not None:
                    _raw_text('[output:%i]' % eid)

                for output in outputs:
                    self._republish_displaypub(output, eid)

                if pyout is not None:
                    display(r)

        elif groupby in ('type', 'order'):
            # republish stdout:
            for eid,stdout in zip(targets, stdouts):
                self._display_stream(stdout, '[stdout:%i] ' % eid)

            # republish stderr:
            for eid,stderr in zip(targets, stderrs):
                self._display_stream(stderr, '[stderr:%i] ' % eid, file=sys.stderr)

            try:
                get_ipython()
            except NameError:
                # displaypub is meaningless outside IPython
                return

            if groupby == 'order':
                output_dict = dict((eid, outputs) for eid,outputs in zip(targets, output_lists))
                N = max(len(outputs) for outputs in output_lists)
                for i in range(N):
                    for eid in targets:
                        outputs = output_dict[eid]
                        if len(outputs) >= N:
                            _raw_text('[output:%i]' % eid)
                            self._republish_displaypub(outputs[i], eid)
            else:
                # republish displaypub output
                for eid,outputs in zip(targets, output_lists):
                    if outputs:
                        _raw_text('[output:%i]' % eid)
                    for output in outputs:
                        self._republish_displaypub(output, eid)

            # finally, add pyout:
            for eid,r,pyout in zip(targets, results, pyouts):
                if pyout is not None:
                    display(r)

        else:
            raise ValueError("groupby must be one of 'type', 'engine', 'order', not %r" % groupby)
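
# Illustrative usage of AsyncResult.display_outputs (a sketch, assuming an
# interactive IPython session and a DirectView `dv`; names are examples only):
#
#     ar = dv.execute("print('hello')", block=False)
#     ar.get()
#     ar.display_outputs(groupby='engine')   # prints [stdout:N] blocks per engine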


class AsyncMapResult(AsyncResult):
    """Class for representing results of non-blocking gathers.

    This will properly reconstruct the gather.

    This class is iterable at any time, and will wait on results as they come.

    If ordered=False, then the first results to arrive will come first, otherwise
    results will be yielded in the order they were submitted.
    """

    def __init__(self, client, msg_ids, mapObject, fname='', ordered=True):
        AsyncResult.__init__(self, client, msg_ids, fname=fname)
        self._mapObject = mapObject
        self._single_result = False
        self.ordered = ordered

    def _reconstruct_result(self, res):
        """Perform the gather on the actual results."""
        return self._mapObject.joinPartitions(res)

    # asynchronous iterator:
    def __iter__(self):
        it = self._ordered_iter if self.ordered else self._unordered_iter
        for r in it():
            yield r

    # asynchronous ordered iterator:
    def _ordered_iter(self):
        """iterator for results *as they arrive*, preserving submission order."""
        try:
            rlist = self.get(0)
        except error.TimeoutError:
            # wait for each result individually
            for msg_id in self.msg_ids:
                ar = AsyncResult(self._client, msg_id, self._fname)
                rlist = ar.get()
                try:
                    for r in rlist:
                        yield r
                except TypeError:
                    # flattened, not a list
                    # this could get broken by flattened data that returns iterables
                    # but most calls to map do not expose the `flatten` argument
                    yield rlist
        else:
            # already done
            for r in rlist:
                yield r

    # asynchronous unordered iterator:
    def _unordered_iter(self):
        """iterator for results *as they arrive*, on FCFS basis, ignoring submission order."""
        try:
            rlist = self.get(0)
        except error.TimeoutError:
            pending = set(self.msg_ids)
            while pending:
                try:
                    self._client.wait(pending, 1e-3)
                except error.TimeoutError:
                    # ignore timeout error, because that only means
                    # *some* jobs are outstanding
                    pass
                # update ready set with those no longer outstanding:
                ready = pending.difference(self._client.outstanding)
                # update pending to exclude those that are finished
                pending = pending.difference(ready)
                while ready:
                    msg_id = ready.pop()
                    ar = AsyncResult(self._client, msg_id, self._fname)
                    rlist = ar.get()
                    try:
                        for r in rlist:
                            yield r
                    except TypeError:
                        # flattened, not a list
                        # this could get broken by flattened data that returns iterables
                        # but most calls to map do not expose the `flatten` argument
                        yield rlist
        else:
            # already done
            for r in rlist:
                yield r
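
# Illustrative iteration over an AsyncMapResult (a sketch, assuming a
# LoadBalancedView `view`; `ordered` support depends on the view's map):
#
#     amr = view.map_async(abs, range(-4, 4), ordered=False)
#     for r in amr:    # results are yielded as they arrive, not in submit order
#         print(r)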


class AsyncHubResult(AsyncResult):
    """Class to wrap pending results that must be requested from the Hub.

    Note that waiting/polling on these objects requires polling the Hub over the network,
    so use `AsyncHubResult.wait()` sparingly.
    """

    def _wait_for_outputs(self, timeout=-1):
        """no-op, because HubResults are never incomplete"""
        self._outputs_ready = True

    def wait(self, timeout=-1):
        """wait for result to complete."""
        start = time.time()
        if self._ready:
            return
        local_ids = filter(lambda msg_id: msg_id in self._client.outstanding, self.msg_ids)
        local_ready = self._client.wait(local_ids, timeout)
        if local_ready:
            remote_ids = filter(lambda msg_id: msg_id not in self._client.results, self.msg_ids)
            if not remote_ids:
                self._ready = True
            else:
                rdict = self._client.result_status(remote_ids, status_only=False)
                pending = rdict['pending']
                while pending and (timeout < 0 or time.time() < start+timeout):
                    rdict = self._client.result_status(remote_ids, status_only=False)
                    pending = rdict['pending']
                    if pending:
                        time.sleep(0.1)
                if not pending:
                    self._ready = True
        if self._ready:
            try:
                results = map(self._client.results.get, self.msg_ids)
                self._result = results
                if self._single_result:
                    r = results[0]
                    if isinstance(r, Exception):
                        raise r
                else:
                    results = error.collect_exceptions(results, self._fname)
                self._result = self._reconstruct_result(results)
            except Exception as e:
                self._exception = e
                self._success = False
            else:
                self._success = True
            finally:
                self._metadata = map(self._client.metadata.get, self.msg_ids)


__all__ = ['AsyncResult', 'AsyncMapResult', 'AsyncHubResult']