|
|
"""AsyncResult objects for the client
|
|
|
|
|
|
Authors:
|
|
|
|
|
|
* MinRK
|
|
|
"""
|
|
|
#-----------------------------------------------------------------------------
|
|
|
# Copyright (C) 2010-2011 The IPython Development Team
|
|
|
#
|
|
|
# Distributed under the terms of the BSD License. The full license is in
|
|
|
# the file COPYING, distributed as part of this software.
|
|
|
#-----------------------------------------------------------------------------
|
|
|
|
|
|
#-----------------------------------------------------------------------------
|
|
|
# Imports
|
|
|
#-----------------------------------------------------------------------------
|
|
|
|
|
|
import sys
|
|
|
import time
|
|
|
from datetime import datetime
|
|
|
|
|
|
from zmq import MessageTracker
|
|
|
|
|
|
from IPython.core.display import clear_output
|
|
|
from IPython.external.decorator import decorator
|
|
|
from IPython.parallel import error
|
|
|
|
|
|
|
|
|
#-----------------------------------------------------------------------------
|
|
|
# Classes
|
|
|
#-----------------------------------------------------------------------------
|
|
|
|
|
|
# global empty tracker that's always done.
# AsyncResult.__init__ falls back to this shared instance when the caller
# passes no tracker, so `sent` / `wait_for_send` work on every AsyncResult:
|
|
|
finished_tracker = MessageTracker()
|
|
|
|
|
|
@decorator
def check_ready(f, self, *args, **kwargs):
    """Guard a method so that it only runs once the result is ready.

    Does a non-blocking ``self.wait(0)`` to refresh ``self._ready`` and then
    either forwards the call to the wrapped method `f` or raises
    ``error.TimeoutError("result not ready")``.
    """
    # zero-timeout wait: refresh the ready flag without blocking
    self.wait(0)
    if self._ready:
        return f(self, *args, **kwargs)
    raise error.TimeoutError("result not ready")
|
|
|
|
|
|
class AsyncResult(object):
    """Class for representing results of non-blocking calls.

    Provides the same interface as :py:class:`multiprocessing.pool.AsyncResult`.

    State kept on the instance:

    * ``_ready``    -- True once ``client.wait`` reported all msg_ids done
    * ``_success``  -- None until ready, then True/False
    * ``_result`` / ``_exception`` -- set by `wait()` once ready
    * ``_metadata`` -- list of per-message metadata, set by `wait()` once ready
    """

    msg_ids = None
    _targets = None
    _tracker = None
    _single_result = False

    def __init__(self, client, msg_ids, fname='unknown', targets=None, tracker=None):
        """Wrap one or more pending messages.

        Parameters
        ----------
        client :
            object used to wait for and look up results; this class uses its
            ``wait``, ``abort``, ``results``, ``metadata`` and ``outstanding``
            members.
        msg_ids : str or sequence of str
            a single id is wrapped into a one-element list.
        fname : str
            name used in `repr` and handed to ``error.collect_exceptions``.
        targets :
            forwarded to ``client.abort``; a single message whose `targets`
            is not a list/tuple makes this a "single result".
        tracker :
            send tracker; defaults to the module-level ``finished_tracker``.
        """
        if isinstance(msg_ids, basestring):
            # always a list
            msg_ids = [msg_ids]
        if tracker is None:
            # default to always done
            tracker = finished_tracker
        self._client = client
        self.msg_ids = msg_ids
        self._fname = fname
        self._targets = targets
        self._tracker = tracker
        self._ready = False
        self._success = None
        self._metadata = None
        if len(msg_ids) == 1:
            # one message sent to a scalar target: unwrap the result for the user
            self._single_result = not isinstance(targets, (list, tuple))
        else:
            self._single_result = False

    def __repr__(self):
        if self._ready:
            return "<%s: finished>"%(self.__class__.__name__)
        else:
            return "<%s: %s>"%(self.__class__.__name__,self._fname)

    def _reconstruct_result(self, res):
        """Reconstruct our result from actual result list (always a list)

        Override me in subclasses for turning a list of results
        into the expected form.
        """
        if self._single_result:
            return res[0]
        else:
            return res

    def get(self, timeout=-1):
        """Return the result when it arrives.

        Waits (via `wait(timeout)`) if the result is not ready yet.  If it is
        still not ready afterwards, ``TimeoutError`` is raised.  If the
        remote call raised an exception then that exception will be reraised
        by get() inside a `RemoteError`.

        `timeout` is handed unchanged to `wait()`.
        """
        if not self.ready():
            self.wait(timeout)

        if self._ready:
            if self._success:
                return self._result
            else:
                raise self._exception
        else:
            raise error.TimeoutError("Result not ready.")

    def ready(self):
        """Return whether the call has completed (polls with a zero timeout)."""
        if not self._ready:
            self.wait(0)
        return self._ready

    def wait(self, timeout=-1):
        """Wait until the result is available or until `timeout` seconds pass.

        `timeout` is passed straight to ``client.wait``.  Once the client
        reports completion, the results and metadata are pulled from the
        client and cached on this object.

        This method always returns None.
        """
        if self._ready:
            return
        self._ready = self._client.wait(self.msg_ids, timeout)
        if self._ready:
            try:
                fetch = self._client.results.get
                results = [fetch(msg_id) for msg_id in self.msg_ids]
                # keep the raw list around first: if an exception is raised
                # below, __getitem__ still needs per-task access to it
                self._result = results
                if self._single_result:
                    r = results[0]
                    if isinstance(r, Exception):
                        raise r
                else:
                    results = error.collect_exceptions(results, self._fname)
                self._result = self._reconstruct_result(results)
            except Exception as e:
                # stored, not propagated: get() re-raises it on demand
                self._exception = e
                self._success = False
            else:
                self._success = True
            finally:
                # metadata is recorded for failed calls as well
                lookup = self._client.metadata.get
                self._metadata = [lookup(msg_id) for msg_id in self.msg_ids]

    def successful(self):
        """Return whether the call completed without raising an exception.

        Will raise ``AssertionError`` if the result is not ready.
        """
        assert self.ready()
        return self._success

    #----------------------------------------------------------------
    # Extra methods not in mp.pool.AsyncResult
    #----------------------------------------------------------------

    def get_dict(self, timeout=-1):
        """Get the results as a dict, keyed by engine_id.

        timeout behavior is described in `get()`.

        Raises ``ValueError`` if more than one task ran on the same engine,
        because the results could then not be keyed unambiguously.
        """

        results = self.get(timeout)
        if self._single_result:
            # get() handed back the bare value; re-wrap it so that zip() below
            # pairs the engine id with the value rather than with its items
            results = [results]
        engine_ids = [ md['engine_id'] for md in self._metadata ]
        if engine_ids:
            # the most frequent engine id sorts last; more than one occurrence
            # means two tasks would collide on the same dict key
            bycount = sorted(engine_ids, key=lambda k: engine_ids.count(k))
            maxcount = bycount.count(bycount[-1])
            if maxcount > 1:
                raise ValueError("Cannot build dict, %i jobs ran on engine #%i"%(
                        maxcount, bycount[-1]))

        return dict(zip(engine_ids,results))

    @property
    def result(self):
        """result property wrapper for `get()` with its default timeout."""
        return self.get()

    # abbreviated alias:
    r = result

    @property
    @check_ready
    def metadata(self):
        """property for accessing execution metadata.

        A single dict for a single result, otherwise a list with one entry per
        message.  Raises ``TimeoutError`` if the result is not ready.
        """
        if self._single_result:
            return self._metadata[0]
        else:
            return self._metadata

    @property
    def result_dict(self):
        """result property as a dict."""
        return self.get_dict()

    # NOTE(review): a *method* named __dict__ replaces the usual instance-dict
    # attribute on this class; kept unchanged for backward compatibility --
    # confirm this is intended.
    def __dict__(self):
        return self.get_dict(0)

    def abort(self):
        """abort my tasks.

        Raises ``AssertionError`` if the tasks have already finished.
        """
        assert not self.ready(), "Can't abort, I am already done!"
        return self._client.abort(self.msg_ids, targets=self._targets, block=True)

    @property
    def sent(self):
        """check whether my messages have been sent."""
        return self._tracker.done

    def wait_for_send(self, timeout=-1):
        """wait for pyzmq send to complete.

        This is necessary when sending arrays that you intend to edit in-place.
        `timeout` is in seconds, and will raise TimeoutError if it is reached
        before the send completes.
        """
        return self._tracker.wait(timeout)

    #-------------------------------------
    # dict-access
    #-------------------------------------

    @check_ready
    def __getitem__(self, key):
        """getitem returns result value(s) if keyed by int/slice, or metadata if key is str.

        Raises ``TimeoutError`` if the result is not ready, and ``TypeError``
        for any other key type.
        """
        if isinstance(key, int):
            return error.collect_exceptions([self._result[key]], self._fname)[0]
        elif isinstance(key, slice):
            return error.collect_exceptions(self._result[key], self._fname)
        elif isinstance(key, basestring):
            values = [ md[key] for md in self._metadata ]
            if self._single_result:
                return values[0]
            else:
                return values
        else:
            raise TypeError("Invalid key type %r, must be 'int','slice', or 'str'"%type(key))

    def __getattr__(self, key):
        """getattr maps to getitem for convenient attr access to metadata."""
        try:
            return self.__getitem__(key)
        except (error.TimeoutError, KeyError):
            # not ready yet, or no such metadata key: behave like a normal
            # missing attribute so hasattr()/getattr(default) keep working
            raise AttributeError("%r object has no attribute %r"%(
                    self.__class__.__name__, key))

    # asynchronous iterator:
    def __iter__(self):
        """Yield results one by one, waiting on each task in submission order."""
        if self._single_result:
            raise TypeError("AsyncResults with a single result are not iterable.")
        try:
            rlist = self.get(0)
        except error.TimeoutError:
            # wait for each result individually
            for msg_id in self.msg_ids:
                ar = AsyncResult(self._client, msg_id, self._fname)
                yield ar.get()
        else:
            # already done
            for r in rlist:
                yield r

    def __len__(self):
        return len(self.msg_ids)

    #-------------------------------------
    # Sugar methods and attributes
    #-------------------------------------

    @property
    def progress(self):
        """the number of tasks which have been completed at this point.

        Fractional progress would be given by 1.0 * ar.progress / len(ar)
        """
        self.wait(0)
        return len(self) - len(set(self.msg_ids).intersection(self._client.outstanding))

    @property
    def elapsed(self):
        """elapsed time since initial submission, in seconds.

        Once the result is ready this is `wall_time`; before that it is the
        time from the earliest known `submitted` stamp until now.
        """
        if self.ready():
            return self.wall_time

        now = submitted = datetime.now()
        for msg_id in self.msg_ids:
            if msg_id in self._client.metadata:
                stamp = self._client.metadata[msg_id]['submitted']
                # stamp may be unset (falsy) for a message
                if stamp and stamp < submitted:
                    submitted = stamp
        return (now-submitted).total_seconds()

    @property
    @check_ready
    def serial_time(self):
        """serial computation time of a parallel calculation

        Computed as the sum of (completed-started) of each task
        """
        t = 0
        for md in self._metadata:
            t += (md['completed'] - md['started']).total_seconds()
        return t

    @property
    @check_ready
    def wall_time(self):
        """actual computation time of a parallel calculation

        Computed as the time between the latest `received` stamp
        and the earliest `submitted`.

        Only reliable if Client was spinning/waiting when the task finished, because
        the `received` timestamp is created when a result is pulled off of the zmq queue,
        which happens as a result of `client.spin()`.
        """
        received = max([ md['received'] for md in self._metadata ])
        submitted = min([ md['submitted'] for md in self._metadata ])
        return (received - submitted).total_seconds()

    def wait_interactive(self, interval=1., timeout=None):
        """interactive wait, printing progress at regular intervals

        `interval` is the time in seconds between progress updates; `timeout`
        (seconds, None for no limit) bounds the total wait.
        """
        N = len(self)
        tic = time.time()
        while not self.ready() and (timeout is None or time.time() - tic <= timeout):
            self.wait(interval)
            clear_output()
            # written without a newline so each update stays on one line
            sys.stdout.write("%4i/%i tasks finished after %4i s" % (self.progress, N, self.elapsed))
            sys.stdout.flush()
        sys.stdout.write("\n")
        sys.stdout.write("done\n")
|
|
|
|
|
|
|
|
|
class AsyncMapResult(AsyncResult):
    """Class for representing results of non-blocking gathers.

    This will properly reconstruct the gather.

    This class is iterable at any time, and will wait on results as they come.

    If ordered=False, then the first results to arrive will come first, otherwise
    results will be yielded in the order they were submitted.

    """

    def __init__(self, client, msg_ids, mapObject, fname='', ordered=True):
        """Wrap the messages of one map call.

        `mapObject` must provide ``joinPartitions(list_of_results)``, which is
        used to assemble the final result; `ordered` selects the iteration
        order (see the class docstring).
        """
        AsyncResult.__init__(self, client, msg_ids, fname=fname)
        self._mapObject = mapObject
        # a map result is never unwrapped, even with a single message
        self._single_result = False
        self.ordered = ordered

    def _reconstruct_result(self, res):
        """Perform the gather on the actual results."""
        return self._mapObject.joinPartitions(res)

    # asynchronous iterator:
    def __iter__(self):
        it = self._ordered_iter if self.ordered else self._unordered_iter
        for r in it():
            yield r

    def _iter_task(self, msg_id):
        """Wait for the single task `msg_id` and yield each of its results."""
        ar = AsyncResult(self._client, msg_id, self._fname)
        rlist = ar.get()
        try:
            items = iter(rlist)
        except TypeError:
            # flattened, not a list
            # this could get broken by flattened data that returns iterables
            # but most calls to map do not expose the `flatten` argument
            yield rlist
        else:
            for r in items:
                yield r

    # asynchronous ordered iterator:
    def _ordered_iter(self):
        """iterator for results *as they arrive*, preserving submission order."""
        try:
            rlist = self.get(0)
        except error.TimeoutError:
            # wait for each result individually
            for msg_id in self.msg_ids:
                for r in self._iter_task(msg_id):
                    yield r
        else:
            # already done
            for r in rlist:
                yield r

    # asynchronous unordered iterator:
    def _unordered_iter(self):
        """iterator for results *as they arrive*, on FCFS basis, ignoring submission order."""
        try:
            rlist = self.get(0)
        except error.TimeoutError:
            pending = set(self.msg_ids)
            while pending:
                try:
                    self._client.wait(pending, 1e-3)
                except error.TimeoutError:
                    # ignore timeout error, because that only means
                    # *some* jobs are outstanding
                    pass
                # update ready set with those no longer outstanding:
                ready = pending.difference(self._client.outstanding)
                # update pending to exclude those that are finished
                pending = pending.difference(ready)
                while ready:
                    msg_id = ready.pop()
                    for r in self._iter_task(msg_id):
                        yield r
        else:
            # already done
            for r in rlist:
                yield r
|
|
|
|
|
|
|
|
|
|
|
|
class AsyncHubResult(AsyncResult):
    """Class to wrap pending results that must be requested from the Hub.

    Note that waiting/polling on these objects requires polling the Hub over the network,
    so use `AsyncHubResult.wait()` sparingly.
    """

    def wait(self, timeout=-1):
        """wait for result to complete.

        First waits on the messages the client itself still lists as
        outstanding, then polls the Hub (``client.result_status``) for any
        result the client does not hold yet.  A negative `timeout` polls the
        Hub without a deadline; otherwise polling stops `timeout` seconds
        after this call started.

        This method always returns None; the outcome is stored on the object.
        """
        start = time.time()
        if self._ready:
            return
        local_ids = [msg_id for msg_id in self.msg_ids
                     if msg_id in self._client.outstanding]
        local_ready = self._client.wait(local_ids, timeout)
        if local_ready:
            remote_ids = [msg_id for msg_id in self.msg_ids
                          if msg_id not in self._client.results]
            if not remote_ids:
                self._ready = True
            else:
                rdict = self._client.result_status(remote_ids, status_only=False)
                pending = rdict['pending']
                while pending and (timeout < 0 or time.time() < start+timeout):
                    # pause *before* asking again: the Hub was queried just
                    # above, so an immediate second request would be wasted
                    time.sleep(0.1)
                    rdict = self._client.result_status(remote_ids, status_only=False)
                    pending = rdict['pending']
                if not pending:
                    self._ready = True
        if self._ready:
            try:
                fetch = self._client.results.get
                results = [fetch(msg_id) for msg_id in self.msg_ids]
                # keep the raw list first; it stays in place if an exception
                # is raised below
                self._result = results
                if self._single_result:
                    r = results[0]
                    if isinstance(r, Exception):
                        raise r
                else:
                    results = error.collect_exceptions(results, self._fname)
                self._result = self._reconstruct_result(results)
            except Exception as e:
                # stored, not propagated: get() re-raises it on demand
                self._exception = e
                self._success = False
            else:
                self._success = True
            finally:
                lookup = self._client.metadata.get
                self._metadata = [lookup(msg_id) for msg_id in self.msg_ids]
|
|
|
|
|
|
# Names exported by a star-import of this module.
__all__ = ['AsyncResult', 'AsyncMapResult', 'AsyncHubResult']
|