##// END OF EJS Templates
update connections and diagrams for reduced sockets
update connections and diagrams for reduced sockets

File last commit:

r3654:62f8971b
r3658:8fb951e7
Show More
asyncresult.py
321 lines | 10.9 KiB | text/x-python | PythonLexer
MinRK
PendingResult->AsyncResult; match multiprocessing.AsyncResult api
r3589 """AsyncResult objects for the client"""
#-----------------------------------------------------------------------------
# Copyright (C) 2010 The IPython Development Team
#
# Distributed under the terms of the BSD License. The full license is in
# the file COPYING, distributed as part of this software.
#-----------------------------------------------------------------------------
#-----------------------------------------------------------------------------
# Imports
#-----------------------------------------------------------------------------
MinRK
split get_results into get_result/result_status, add AsyncHubResult
r3639 import time
MinRK
add rich AsyncResult behavior
r3601 from IPython.external.decorator import decorator
MinRK
eliminate relative imports
r3642 from . import error
MinRK
PendingResult->AsyncResult; match multiprocessing.AsyncResult api
r3589
#-----------------------------------------------------------------------------
# Classes
#-----------------------------------------------------------------------------
MinRK
add rich AsyncResult behavior
r3601 @decorator
def check_ready(f, self, *args, **kwargs):
"""Call spin() to sync state prior to calling the method."""
self.wait(0)
if not self._ready:
raise error.TimeoutError("result not ready")
return f(self, *args, **kwargs)
MinRK
PendingResult->AsyncResult; match multiprocessing.AsyncResult api
r3589 class AsyncResult(object):
"""Class for representing results of non-blocking calls.
MinRK
cleanup pass
r3644 Provides the same interface as :py:class:`multiprocessing.pool.AsyncResult`.
MinRK
PendingResult->AsyncResult; match multiprocessing.AsyncResult api
r3589 """
MinRK
parallelz doc updates, metadata bug fixed.
r3618
msg_ids = None
MinRK
add message tracking to client, add/improve tests
r3654 _targets = None
_tracker = None
MinRK
parallelz doc updates, metadata bug fixed.
r3618
MinRK
add message tracking to client, add/improve tests
r3654 def __init__(self, client, msg_ids, fname='unknown', targets=None, tracker=None):
MinRK
PendingResult->AsyncResult; match multiprocessing.AsyncResult api
r3589 self._client = client
MinRK
support iterating through map results as they arrive
r3627 if isinstance(msg_ids, basestring):
msg_ids = [msg_ids]
MinRK
protect LBView.targets, AsyncResult._msg_ids -> .msg_ds
r3592 self.msg_ids = msg_ids
MinRK
multitarget returns list instead of dict
r3596 self._fname=fname
MinRK
add message tracking to client, add/improve tests
r3654 self._targets = targets
self._tracker = tracker
MinRK
PendingResult->AsyncResult; match multiprocessing.AsyncResult api
r3589 self._ready = False
self._success = None
MinRK
add timeout for unmet dependencies in task scheduler
r3611 self._single_result = len(msg_ids) == 1
MinRK
PendingResult->AsyncResult; match multiprocessing.AsyncResult api
r3589
def __repr__(self):
if self._ready:
return "<%s: finished>"%(self.__class__.__name__)
else:
MinRK
multitarget returns list instead of dict
r3596 return "<%s: %s>"%(self.__class__.__name__,self._fname)
MinRK
PendingResult->AsyncResult; match multiprocessing.AsyncResult api
r3589
def _reconstruct_result(self, res):
MinRK
cleanup pass
r3644 """Reconstruct our result from actual result list (always a list)
MinRK
PendingResult->AsyncResult; match multiprocessing.AsyncResult api
r3589 Override me in subclasses for turning a list of results
into the expected form.
"""
MinRK
add timeout for unmet dependencies in task scheduler
r3611 if self._single_result:
MinRK
PendingResult->AsyncResult; match multiprocessing.AsyncResult api
r3589 return res[0]
else:
return res
def get(self, timeout=-1):
"""Return the result when it arrives.
If `timeout` is not ``None`` and the result does not arrive within
`timeout` seconds then ``TimeoutError`` is raised. If the
remote call raised an exception then that exception will be reraised
MinRK
cleanup pass
r3644 by get() inside a `RemoteError`.
MinRK
PendingResult->AsyncResult; match multiprocessing.AsyncResult api
r3589 """
if not self.ready():
self.wait(timeout)
if self._ready:
if self._success:
return self._result
else:
raise self._exception
else:
raise error.TimeoutError("Result not ready.")
def ready(self):
"""Return whether the call has completed."""
if not self._ready:
self.wait(0)
return self._ready
def wait(self, timeout=-1):
"""Wait until the result is available or until `timeout` seconds pass.
MinRK
cleanup pass
r3644
This method always returns None.
MinRK
PendingResult->AsyncResult; match multiprocessing.AsyncResult api
r3589 """
if self._ready:
return
MinRK
protect LBView.targets, AsyncResult._msg_ids -> .msg_ds
r3592 self._ready = self._client.barrier(self.msg_ids, timeout)
MinRK
PendingResult->AsyncResult; match multiprocessing.AsyncResult api
r3589 if self._ready:
try:
MinRK
protect LBView.targets, AsyncResult._msg_ids -> .msg_ds
r3592 results = map(self._client.results.get, self.msg_ids)
MinRK
add rich AsyncResult behavior
r3601 self._result = results
MinRK
add timeout for unmet dependencies in task scheduler
r3611 if self._single_result:
r = results[0]
if isinstance(r, Exception):
raise r
else:
results = error.collect_exceptions(results, self._fname)
MinRK
PendingResult->AsyncResult; match multiprocessing.AsyncResult api
r3589 self._result = self._reconstruct_result(results)
except Exception, e:
self._exception = e
self._success = False
else:
self._success = True
MinRK
add rich AsyncResult behavior
r3601 finally:
self._metadata = map(self._client.metadata.get, self.msg_ids)
MinRK
PendingResult->AsyncResult; match multiprocessing.AsyncResult api
r3589
def successful(self):
"""Return whether the call completed without raising an exception.
Will raise ``AssertionError`` if the result is not ready.
"""
MinRK
cleanup pass
r3644 assert self.ready()
MinRK
PendingResult->AsyncResult; match multiprocessing.AsyncResult api
r3589 return self._success
MinRK
add rich AsyncResult behavior
r3601
#----------------------------------------------------------------
# Extra methods not in mp.pool.AsyncResult
#----------------------------------------------------------------
def get_dict(self, timeout=-1):
MinRK
cleanup pass
r3644 """Get the results as a dict, keyed by engine_id.
timeout behavior is described in `get()`.
"""
MinRK
add rich AsyncResult behavior
r3601 results = self.get(timeout)
MinRK
Improvements to dependency handling...
r3607 engine_ids = [ md['engine_id'] for md in self._metadata ]
MinRK
add rich AsyncResult behavior
r3601 bycount = sorted(engine_ids, key=lambda k: engine_ids.count(k))
maxcount = bycount.count(bycount[-1])
if maxcount > 1:
raise ValueError("Cannot build dict, %i jobs ran on engine #%i"%(
maxcount, bycount[-1]))
return dict(zip(engine_ids,results))
@property
@check_ready
def result(self):
MinRK
cleanup pass
r3644 """result property wrapper for `get(timeout=0)`."""
MinRK
add rich AsyncResult behavior
r3601 return self._result
MinRK
Improvements to dependency handling...
r3607 # abbreviated alias:
r = result
MinRK
add rich AsyncResult behavior
r3601 @property
@check_ready
def metadata(self):
MinRK
cleanup pass
r3644 """property for accessing execution metadata."""
MinRK
add timeout for unmet dependencies in task scheduler
r3611 if self._single_result:
MinRK
Improvements to dependency handling...
r3607 return self._metadata[0]
else:
return self._metadata
MinRK
add rich AsyncResult behavior
r3601
@property
def result_dict(self):
"""result property as a dict."""
return self.get_dict(0)
MinRK
propagate iopub to clients
r3602 def __dict__(self):
return self.get_dict(0)
MinRK
add message tracking to client, add/improve tests
r3654
def abort(self):
"""abort my tasks."""
assert not self.ready(), "Can't abort, I am already done!"
return self.client.abort(self.msg_ids, targets=self._targets, block=True)
@property
def sent(self):
"""check whether my messages have been sent"""
if self._tracker is None:
return True
else:
return self._tracker.done
MinRK
propagate iopub to clients
r3602
MinRK
add rich AsyncResult behavior
r3601 #-------------------------------------
# dict-access
#-------------------------------------
@check_ready
def __getitem__(self, key):
"""getitem returns result value(s) if keyed by int/slice, or metadata if key is str.
"""
if isinstance(key, int):
return error.collect_exceptions([self._result[key]], self._fname)[0]
elif isinstance(key, slice):
return error.collect_exceptions(self._result[key], self._fname)
elif isinstance(key, basestring):
MinRK
Improvements to dependency handling...
r3607 values = [ md[key] for md in self._metadata ]
MinRK
add timeout for unmet dependencies in task scheduler
r3611 if self._single_result:
MinRK
Improvements to dependency handling...
r3607 return values[0]
else:
return values
MinRK
add rich AsyncResult behavior
r3601 else:
raise TypeError("Invalid key type %r, must be 'int','slice', or 'str'"%type(key))
@check_ready
def __getattr__(self, key):
MinRK
cleanup pass
r3644 """getattr maps to getitem for convenient attr access to metadata."""
MinRK
add rich AsyncResult behavior
r3601 if key not in self._metadata[0].keys():
raise AttributeError("%r object has no attribute %r"%(
self.__class__.__name__, key))
return self.__getitem__(key)
MinRK
split get_results into get_result/result_status, add AsyncHubResult
r3639
# asynchronous iterator:
def __iter__(self):
if self._single_result:
raise TypeError("AsyncResults with a single result are not iterable.")
try:
rlist = self.get(0)
except error.TimeoutError:
# wait for each result individually
for msg_id in self.msg_ids:
ar = AsyncResult(self._client, msg_id, self._fname)
yield ar.get()
else:
# already done
for r in rlist:
yield r
MinRK
PendingResult->AsyncResult; match multiprocessing.AsyncResult api
r3589
MinRK
add rich AsyncResult behavior
r3601
MinRK
PendingResult->AsyncResult; match multiprocessing.AsyncResult api
r3589 class AsyncMapResult(AsyncResult):
"""Class for representing results of non-blocking gathers.
This will properly reconstruct the gather.
"""
MinRK
multitarget returns list instead of dict
r3596 def __init__(self, client, msg_ids, mapObject, fname=''):
AsyncResult.__init__(self, client, msg_ids, fname=fname)
MinRK
Improvements to dependency handling...
r3607 self._mapObject = mapObject
MinRK
add timeout for unmet dependencies in task scheduler
r3611 self._single_result = False
MinRK
PendingResult->AsyncResult; match multiprocessing.AsyncResult api
r3589
def _reconstruct_result(self, res):
"""Perform the gather on the actual results."""
return self._mapObject.joinPartitions(res)
MinRK
support iterating through map results as they arrive
r3627 # asynchronous iterator:
def __iter__(self):
try:
rlist = self.get(0)
except error.TimeoutError:
# wait for each result individually
for msg_id in self.msg_ids:
ar = AsyncResult(self._client, msg_id, self._fname)
rlist = ar.get()
try:
for r in rlist:
yield r
except TypeError:
# flattened, not a list
# this could get broken by flattened data that returns iterables
# but most calls to map do not expose the `flatten` argument
yield rlist
else:
# already done
for r in rlist:
yield r
MinRK
split get_results into get_result/result_status, add AsyncHubResult
r3639
class AsyncHubResult(AsyncResult):
MinRK
cleanup pass
r3644 """Class to wrap pending results that must be requested from the Hub.
Note that waiting/polling on these objects requires polling the Hubover the network,
so use `AsyncHubResult.wait()` sparingly.
"""
MinRK
support iterating through map results as they arrive
r3627
MinRK
split get_results into get_result/result_status, add AsyncHubResult
r3639 def wait(self, timeout=-1):
"""wait for result to complete."""
start = time.time()
if self._ready:
return
local_ids = filter(lambda msg_id: msg_id in self._client.outstanding, self.msg_ids)
local_ready = self._client.barrier(local_ids, timeout)
if local_ready:
remote_ids = filter(lambda msg_id: msg_id not in self._client.results, self.msg_ids)
if not remote_ids:
self._ready = True
else:
rdict = self._client.result_status(remote_ids, status_only=False)
pending = rdict['pending']
MinRK
testing fixes
r3641 while pending and (timeout < 0 or time.time() < start+timeout):
MinRK
split get_results into get_result/result_status, add AsyncHubResult
r3639 rdict = self._client.result_status(remote_ids, status_only=False)
pending = rdict['pending']
if pending:
time.sleep(0.1)
if not pending:
self._ready = True
if self._ready:
try:
results = map(self._client.results.get, self.msg_ids)
self._result = results
if self._single_result:
r = results[0]
if isinstance(r, Exception):
raise r
else:
results = error.collect_exceptions(results, self._fname)
self._result = self._reconstruct_result(results)
except Exception, e:
self._exception = e
self._success = False
else:
self._success = True
finally:
self._metadata = map(self._client.metadata.get, self.msg_ids)
MinRK
PendingResult->AsyncResult; match multiprocessing.AsyncResult api
r3589
MinRK
split get_results into get_result/result_status, add AsyncHubResult
r3639 __all__ = ['AsyncResult', 'AsyncMapResult', 'AsyncHubResult']