##// END OF EJS Templates
String in test for #822 was meant to be a bytestring, fixes it.
String in test for #822 was meant to be a bytestring, fixes it.

File last commit:

r5222:d1c1aaa7
r5360:ddb3a0fb
Show More
asyncresult.py
395 lines | 13.4 KiB | text/x-python | PythonLexer
MinRK
update recently changed modules with Authors in docstring
r4018 """AsyncResult objects for the client
Authors:
* MinRK
"""
MinRK
PendingResult->AsyncResult; match multiprocessing.AsyncResult api
r3589 #-----------------------------------------------------------------------------
MinRK
copyright statements
r3660 # Copyright (C) 2010-2011 The IPython Development Team
MinRK
PendingResult->AsyncResult; match multiprocessing.AsyncResult api
r3589 #
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
MinRK
split get_results into get_result/result_status, add AsyncHubResult
r3639 import time
MinRK
update API after sagedays29...
r3664 from zmq import MessageTracker
MinRK
add rich AsyncResult behavior
r3601 from IPython.external.decorator import decorator
MinRK
organize IPython.parallel into subpackages
r3673 from IPython.parallel import error
MinRK
PendingResult->AsyncResult; match multiprocessing.AsyncResult api
r3589
#-----------------------------------------------------------------------------
# Classes
#-----------------------------------------------------------------------------
MinRK
update API after sagedays29...
r3664 # global empty tracker that's always done:
finished_tracker = MessageTracker()
MinRK
add rich AsyncResult behavior
r3601 @decorator
def check_ready(f, self, *args, **kwargs):
"""Call spin() to sync state prior to calling the method."""
self.wait(0)
if not self._ready:
raise error.TimeoutError("result not ready")
return f(self, *args, **kwargs)
MinRK
PendingResult->AsyncResult; match multiprocessing.AsyncResult api
r3589 class AsyncResult(object):
"""Class for representing results of non-blocking calls.
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
cleanup pass
r3644 Provides the same interface as :py:class:`multiprocessing.pool.AsyncResult`.
MinRK
PendingResult->AsyncResult; match multiprocessing.AsyncResult api
r3589 """
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
parallelz doc updates, metadata bug fixed.
r3618 msg_ids = None
MinRK
add message tracking to client, add/improve tests
r3654 _targets = None
_tracker = None
MinRK
update API after sagedays29...
r3664 _single_result = False
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
add message tracking to client, add/improve tests
r3654 def __init__(self, client, msg_ids, fname='unknown', targets=None, tracker=None):
MinRK
support iterating through map results as they arrive
r3627 if isinstance(msg_ids, basestring):
MinRK
update API after sagedays29...
r3664 # always a list
MinRK
support iterating through map results as they arrive
r3627 msg_ids = [msg_ids]
MinRK
update API after sagedays29...
r3664 if tracker is None:
# default to always done
tracker = finished_tracker
self._client = client
MinRK
protect LBView.targets, AsyncResult._msg_ids -> .msg_ds
r3592 self.msg_ids = msg_ids
MinRK
multitarget returns list instead of dict
r3596 self._fname=fname
MinRK
add message tracking to client, add/improve tests
r3654 self._targets = targets
self._tracker = tracker
MinRK
PendingResult->AsyncResult; match multiprocessing.AsyncResult api
r3589 self._ready = False
self._success = None
MinRK
AsyncResult.__getattr__ shouldn't raise TimeoutError...
r5222 self._metadata = None
MinRK
update API after sagedays29...
r3664 if len(msg_ids) == 1:
self._single_result = not isinstance(targets, (list, tuple))
else:
self._single_result = False
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
PendingResult->AsyncResult; match multiprocessing.AsyncResult api
r3589 def __repr__(self):
if self._ready:
return "<%s: finished>"%(self.__class__.__name__)
else:
MinRK
multitarget returns list instead of dict
r3596 return "<%s: %s>"%(self.__class__.__name__,self._fname)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
PendingResult->AsyncResult; match multiprocessing.AsyncResult api
r3589 def _reconstruct_result(self, res):
MinRK
cleanup pass
r3644 """Reconstruct our result from actual result list (always a list)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
PendingResult->AsyncResult; match multiprocessing.AsyncResult api
r3589 Override me in subclasses for turning a list of results
into the expected form.
"""
MinRK
add timeout for unmet dependencies in task scheduler
r3611 if self._single_result:
MinRK
PendingResult->AsyncResult; match multiprocessing.AsyncResult api
r3589 return res[0]
else:
return res
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
PendingResult->AsyncResult; match multiprocessing.AsyncResult api
r3589 def get(self, timeout=-1):
Bernardo B. Marques
remove all trailling spaces
r4872 """Return the result when it arrives.
MinRK
PendingResult->AsyncResult; match multiprocessing.AsyncResult api
r3589 If `timeout` is not ``None`` and the result does not arrive within
`timeout` seconds then ``TimeoutError`` is raised. If the
remote call raised an exception then that exception will be reraised
MinRK
cleanup pass
r3644 by get() inside a `RemoteError`.
MinRK
PendingResult->AsyncResult; match multiprocessing.AsyncResult api
r3589 """
if not self.ready():
self.wait(timeout)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
PendingResult->AsyncResult; match multiprocessing.AsyncResult api
r3589 if self._ready:
if self._success:
return self._result
else:
raise self._exception
else:
raise error.TimeoutError("Result not ready.")
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
PendingResult->AsyncResult; match multiprocessing.AsyncResult api
r3589 def ready(self):
"""Return whether the call has completed."""
if not self._ready:
self.wait(0)
return self._ready
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
PendingResult->AsyncResult; match multiprocessing.AsyncResult api
r3589 def wait(self, timeout=-1):
"""Wait until the result is available or until `timeout` seconds pass.
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
cleanup pass
r3644 This method always returns None.
MinRK
PendingResult->AsyncResult; match multiprocessing.AsyncResult api
r3589 """
if self._ready:
return
MinRK
update API after sagedays29...
r3664 self._ready = self._client.wait(self.msg_ids, timeout)
MinRK
PendingResult->AsyncResult; match multiprocessing.AsyncResult api
r3589 if self._ready:
try:
MinRK
protect LBView.targets, AsyncResult._msg_ids -> .msg_ds
r3592 results = map(self._client.results.get, self.msg_ids)
MinRK
add rich AsyncResult behavior
r3601 self._result = results
MinRK
add timeout for unmet dependencies in task scheduler
r3611 if self._single_result:
r = results[0]
if isinstance(r, Exception):
raise r
else:
results = error.collect_exceptions(results, self._fname)
MinRK
PendingResult->AsyncResult; match multiprocessing.AsyncResult api
r3589 self._result = self._reconstruct_result(results)
except Exception, e:
self._exception = e
self._success = False
else:
self._success = True
MinRK
add rich AsyncResult behavior
r3601 finally:
self._metadata = map(self._client.metadata.get, self.msg_ids)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
PendingResult->AsyncResult; match multiprocessing.AsyncResult api
r3589 def successful(self):
Bernardo B. Marques
remove all trailling spaces
r4872 """Return whether the call completed without raising an exception.
MinRK
PendingResult->AsyncResult; match multiprocessing.AsyncResult api
r3589 Will raise ``AssertionError`` if the result is not ready.
"""
MinRK
cleanup pass
r3644 assert self.ready()
MinRK
PendingResult->AsyncResult; match multiprocessing.AsyncResult api
r3589 return self._success
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
add rich AsyncResult behavior
r3601 #----------------------------------------------------------------
# Extra methods not in mp.pool.AsyncResult
#----------------------------------------------------------------
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
add rich AsyncResult behavior
r3601 def get_dict(self, timeout=-1):
MinRK
cleanup pass
r3644 """Get the results as a dict, keyed by engine_id.
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
cleanup pass
r3644 timeout behavior is described in `get()`.
"""
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
add rich AsyncResult behavior
r3601 results = self.get(timeout)
MinRK
Improvements to dependency handling...
r3607 engine_ids = [ md['engine_id'] for md in self._metadata ]
MinRK
add rich AsyncResult behavior
r3601 bycount = sorted(engine_ids, key=lambda k: engine_ids.count(k))
maxcount = bycount.count(bycount[-1])
if maxcount > 1:
raise ValueError("Cannot build dict, %i jobs ran on engine #%i"%(
maxcount, bycount[-1]))
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
add rich AsyncResult behavior
r3601 return dict(zip(engine_ids,results))
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
add rich AsyncResult behavior
r3601 @property
def result(self):
MinRK
cleanup pass
r3644 """result property wrapper for `get(timeout=0)`."""
MinRK
update API after sagedays29...
r3664 return self.get()
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
Improvements to dependency handling...
r3607 # abbreviated alias:
r = result
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
add rich AsyncResult behavior
r3601 @property
@check_ready
def metadata(self):
MinRK
cleanup pass
r3644 """property for accessing execution metadata."""
MinRK
add timeout for unmet dependencies in task scheduler
r3611 if self._single_result:
MinRK
Improvements to dependency handling...
r3607 return self._metadata[0]
else:
return self._metadata
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
add rich AsyncResult behavior
r3601 @property
def result_dict(self):
"""result property as a dict."""
MinRK
update API after sagedays29...
r3664 return self.get_dict()
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
propagate iopub to clients
r3602 def __dict__(self):
return self.get_dict(0)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
add message tracking to client, add/improve tests
r3654 def abort(self):
"""abort my tasks."""
assert not self.ready(), "Can't abort, I am already done!"
return self.client.abort(self.msg_ids, targets=self._targets, block=True)
@property
def sent(self):
MinRK
update API after sagedays29...
r3664 """check whether my messages have been sent."""
return self._tracker.done
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
update API after sagedays29...
r3664 def wait_for_send(self, timeout=-1):
"""wait for pyzmq send to complete.
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
update API after sagedays29...
r3664 This is necessary when sending arrays that you intend to edit in-place.
`timeout` is in seconds, and will raise TimeoutError if it is reached
before the send completes.
"""
return self._tracker.wait(timeout)
MinRK
propagate iopub to clients
r3602
MinRK
add rich AsyncResult behavior
r3601 #-------------------------------------
# dict-access
#-------------------------------------
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
add rich AsyncResult behavior
r3601 @check_ready
def __getitem__(self, key):
"""getitem returns result value(s) if keyed by int/slice, or metadata if key is str.
"""
if isinstance(key, int):
return error.collect_exceptions([self._result[key]], self._fname)[0]
elif isinstance(key, slice):
return error.collect_exceptions(self._result[key], self._fname)
elif isinstance(key, basestring):
MinRK
Improvements to dependency handling...
r3607 values = [ md[key] for md in self._metadata ]
MinRK
add timeout for unmet dependencies in task scheduler
r3611 if self._single_result:
MinRK
Improvements to dependency handling...
r3607 return values[0]
else:
return values
MinRK
add rich AsyncResult behavior
r3601 else:
raise TypeError("Invalid key type %r, must be 'int','slice', or 'str'"%type(key))
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
add rich AsyncResult behavior
r3601 def __getattr__(self, key):
MinRK
cleanup pass
r3644 """getattr maps to getitem for convenient attr access to metadata."""
MinRK
AsyncResult.__getattr__ shouldn't raise TimeoutError...
r5222 try:
return self.__getitem__(key)
except (error.TimeoutError, KeyError):
MinRK
add rich AsyncResult behavior
r3601 raise AttributeError("%r object has no attribute %r"%(
self.__class__.__name__, key))
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
split get_results into get_result/result_status, add AsyncHubResult
r3639 # asynchronous iterator:
def __iter__(self):
if self._single_result:
raise TypeError("AsyncResults with a single result are not iterable.")
try:
rlist = self.get(0)
except error.TimeoutError:
# wait for each result individually
for msg_id in self.msg_ids:
ar = AsyncResult(self._client, msg_id, self._fname)
yield ar.get()
else:
# already done
for r in rlist:
yield r
MinRK
PendingResult->AsyncResult; match multiprocessing.AsyncResult api
r3589
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
PendingResult->AsyncResult; match multiprocessing.AsyncResult api
r3589 class AsyncMapResult(AsyncResult):
"""Class for representing results of non-blocking gathers.
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
PendingResult->AsyncResult; match multiprocessing.AsyncResult api
r3589 This will properly reconstruct the gather.
MinRK
add unordered iteration to AsyncMapResults...
r5171
This class is iterable at any time, and will wait on results as they come.
If ordered=False, then the first results to arrive will come first, otherwise
results will be yielded in the order they were submitted.
MinRK
PendingResult->AsyncResult; match multiprocessing.AsyncResult api
r3589 """
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
add unordered iteration to AsyncMapResults...
r5171 def __init__(self, client, msg_ids, mapObject, fname='', ordered=True):
MinRK
multitarget returns list instead of dict
r3596 AsyncResult.__init__(self, client, msg_ids, fname=fname)
MinRK
Improvements to dependency handling...
r3607 self._mapObject = mapObject
MinRK
add timeout for unmet dependencies in task scheduler
r3611 self._single_result = False
MinRK
add unordered iteration to AsyncMapResults...
r5171 self.ordered = ordered
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
PendingResult->AsyncResult; match multiprocessing.AsyncResult api
r3589 def _reconstruct_result(self, res):
"""Perform the gather on the actual results."""
return self._mapObject.joinPartitions(res)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
support iterating through map results as they arrive
r3627 # asynchronous iterator:
def __iter__(self):
MinRK
add unordered iteration to AsyncMapResults...
r5171 it = self._ordered_iter if self.ordered else self._unordered_iter
for r in it():
yield r
# asynchronous ordered iterator:
def _ordered_iter(self):
"""iterator for results *as they arrive*, preserving submission order."""
MinRK
support iterating through map results as they arrive
r3627 try:
rlist = self.get(0)
except error.TimeoutError:
# wait for each result individually
for msg_id in self.msg_ids:
ar = AsyncResult(self._client, msg_id, self._fname)
rlist = ar.get()
try:
for r in rlist:
yield r
except TypeError:
# flattened, not a list
# this could get broken by flattened data that returns iterables
# but most calls to map do not expose the `flatten` argument
yield rlist
else:
# already done
for r in rlist:
yield r
MinRK
split get_results into get_result/result_status, add AsyncHubResult
r3639
MinRK
add unordered iteration to AsyncMapResults...
r5171 # asynchronous unordered iterator:
def _unordered_iter(self):
"""iterator for results *as they arrive*, on FCFS basis, ignoring submission order."""
try:
rlist = self.get(0)
except error.TimeoutError:
pending = set(self.msg_ids)
while pending:
try:
self._client.wait(pending, 1e-3)
except error.TimeoutError:
# ignore timeout error, because that only means
# *some* jobs are outstanding
pass
# update ready set with those no longer outstanding:
ready = pending.difference(self._client.outstanding)
# update pending to exclude those that are finished
pending = pending.difference(ready)
while ready:
msg_id = ready.pop()
ar = AsyncResult(self._client, msg_id, self._fname)
rlist = ar.get()
try:
for r in rlist:
yield r
except TypeError:
# flattened, not a list
# this could get broken by flattened data that returns iterables
# but most calls to map do not expose the `flatten` argument
yield rlist
else:
# already done
for r in rlist:
yield r
MinRK
split get_results into get_result/result_status, add AsyncHubResult
r3639
class AsyncHubResult(AsyncResult):
MinRK
cleanup pass
r3644 """Class to wrap pending results that must be requested from the Hub.
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
cleanup pass
r3644 Note that waiting/polling on these objects requires polling the Hubover the network,
so use `AsyncHubResult.wait()` sparingly.
"""
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
split get_results into get_result/result_status, add AsyncHubResult
r3639 def wait(self, timeout=-1):
"""wait for result to complete."""
start = time.time()
if self._ready:
return
local_ids = filter(lambda msg_id: msg_id in self._client.outstanding, self.msg_ids)
MinRK
update API after sagedays29...
r3664 local_ready = self._client.wait(local_ids, timeout)
MinRK
split get_results into get_result/result_status, add AsyncHubResult
r3639 if local_ready:
remote_ids = filter(lambda msg_id: msg_id not in self._client.results, self.msg_ids)
if not remote_ids:
self._ready = True
else:
rdict = self._client.result_status(remote_ids, status_only=False)
pending = rdict['pending']
MinRK
testing fixes
r3641 while pending and (timeout < 0 or time.time() < start+timeout):
MinRK
split get_results into get_result/result_status, add AsyncHubResult
r3639 rdict = self._client.result_status(remote_ids, status_only=False)
pending = rdict['pending']
if pending:
time.sleep(0.1)
if not pending:
self._ready = True
if self._ready:
try:
results = map(self._client.results.get, self.msg_ids)
self._result = results
if self._single_result:
r = results[0]
if isinstance(r, Exception):
raise r
else:
results = error.collect_exceptions(results, self._fname)
self._result = self._reconstruct_result(results)
except Exception, e:
self._exception = e
self._success = False
else:
self._success = True
finally:
self._metadata = map(self._client.metadata.get, self.msg_ids)
Bernardo B. Marques
remove all trailling spaces
r4872
MinRK
split get_results into get_result/result_status, add AsyncHubResult
r3639 __all__ = ['AsyncResult', 'AsyncMapResult', 'AsyncHubResult']