_base.py
669 lines
| 23.3 KiB
| text/x-python
|
PythonLexer
Gregory Szorc
|
r37641 | # Copyright 2009 Brian Quinlan. All Rights Reserved. | ||
# Licensed to PSF under a Contributor Agreement. | ||||
Gregory Szorc
|
r37644 | from __future__ import absolute_import | ||
Gregory Szorc
|
r37641 | import collections | ||
import logging | ||||
import threading | ||||
import itertools | ||||
import time | ||||
import types | ||||
__author__ = 'Brian Quinlan (brian@sweetapp.com)' | ||||
FIRST_COMPLETED = 'FIRST_COMPLETED' | ||||
FIRST_EXCEPTION = 'FIRST_EXCEPTION' | ||||
ALL_COMPLETED = 'ALL_COMPLETED' | ||||
_AS_COMPLETED = '_AS_COMPLETED' | ||||
# Possible future states (for internal use by the futures package). | ||||
PENDING = 'PENDING' | ||||
RUNNING = 'RUNNING' | ||||
# The future was cancelled by the user... | ||||
CANCELLED = 'CANCELLED' | ||||
# ...and _Waiter.add_cancelled() was called by a worker. | ||||
CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED' | ||||
FINISHED = 'FINISHED' | ||||
_FUTURE_STATES = [ | ||||
PENDING, | ||||
RUNNING, | ||||
CANCELLED, | ||||
CANCELLED_AND_NOTIFIED, | ||||
FINISHED | ||||
] | ||||
_STATE_TO_DESCRIPTION_MAP = { | ||||
PENDING: "pending", | ||||
RUNNING: "running", | ||||
CANCELLED: "cancelled", | ||||
CANCELLED_AND_NOTIFIED: "cancelled", | ||||
FINISHED: "finished" | ||||
} | ||||
# Logger for internal use by the futures package. | ||||
LOGGER = logging.getLogger("concurrent.futures") | ||||
class Error(Exception): | ||||
"""Base class for all future-related exceptions.""" | ||||
pass | ||||
class CancelledError(Error): | ||||
"""The Future was cancelled.""" | ||||
pass | ||||
class TimeoutError(Error): | ||||
"""The operation exceeded the given deadline.""" | ||||
pass | ||||
class _Waiter(object): | ||||
"""Provides the event that wait() and as_completed() block on.""" | ||||
def __init__(self): | ||||
self.event = threading.Event() | ||||
self.finished_futures = [] | ||||
def add_result(self, future): | ||||
self.finished_futures.append(future) | ||||
def add_exception(self, future): | ||||
self.finished_futures.append(future) | ||||
def add_cancelled(self, future): | ||||
self.finished_futures.append(future) | ||||
class _AsCompletedWaiter(_Waiter): | ||||
"""Used by as_completed().""" | ||||
def __init__(self): | ||||
super(_AsCompletedWaiter, self).__init__() | ||||
self.lock = threading.Lock() | ||||
def add_result(self, future): | ||||
with self.lock: | ||||
super(_AsCompletedWaiter, self).add_result(future) | ||||
self.event.set() | ||||
def add_exception(self, future): | ||||
with self.lock: | ||||
super(_AsCompletedWaiter, self).add_exception(future) | ||||
self.event.set() | ||||
def add_cancelled(self, future): | ||||
with self.lock: | ||||
super(_AsCompletedWaiter, self).add_cancelled(future) | ||||
self.event.set() | ||||
class _FirstCompletedWaiter(_Waiter): | ||||
"""Used by wait(return_when=FIRST_COMPLETED).""" | ||||
def add_result(self, future): | ||||
super(_FirstCompletedWaiter, self).add_result(future) | ||||
self.event.set() | ||||
def add_exception(self, future): | ||||
super(_FirstCompletedWaiter, self).add_exception(future) | ||||
self.event.set() | ||||
def add_cancelled(self, future): | ||||
super(_FirstCompletedWaiter, self).add_cancelled(future) | ||||
self.event.set() | ||||
class _AllCompletedWaiter(_Waiter): | ||||
"""Used by wait(return_when=FIRST_EXCEPTION and ALL_COMPLETED).""" | ||||
def __init__(self, num_pending_calls, stop_on_exception): | ||||
self.num_pending_calls = num_pending_calls | ||||
self.stop_on_exception = stop_on_exception | ||||
self.lock = threading.Lock() | ||||
super(_AllCompletedWaiter, self).__init__() | ||||
def _decrement_pending_calls(self): | ||||
with self.lock: | ||||
self.num_pending_calls -= 1 | ||||
if not self.num_pending_calls: | ||||
self.event.set() | ||||
def add_result(self, future): | ||||
super(_AllCompletedWaiter, self).add_result(future) | ||||
self._decrement_pending_calls() | ||||
def add_exception(self, future): | ||||
super(_AllCompletedWaiter, self).add_exception(future) | ||||
if self.stop_on_exception: | ||||
self.event.set() | ||||
else: | ||||
self._decrement_pending_calls() | ||||
def add_cancelled(self, future): | ||||
super(_AllCompletedWaiter, self).add_cancelled(future) | ||||
self._decrement_pending_calls() | ||||
class _AcquireFutures(object): | ||||
"""A context manager that does an ordered acquire of Future conditions.""" | ||||
def __init__(self, futures): | ||||
self.futures = sorted(futures, key=id) | ||||
def __enter__(self): | ||||
for future in self.futures: | ||||
future._condition.acquire() | ||||
def __exit__(self, *args): | ||||
for future in self.futures: | ||||
future._condition.release() | ||||
def _create_and_install_waiters(fs, return_when): | ||||
if return_when == _AS_COMPLETED: | ||||
waiter = _AsCompletedWaiter() | ||||
elif return_when == FIRST_COMPLETED: | ||||
waiter = _FirstCompletedWaiter() | ||||
else: | ||||
pending_count = sum( | ||||
f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] for f in fs) | ||||
if return_when == FIRST_EXCEPTION: | ||||
waiter = _AllCompletedWaiter(pending_count, stop_on_exception=True) | ||||
elif return_when == ALL_COMPLETED: | ||||
waiter = _AllCompletedWaiter(pending_count, stop_on_exception=False) | ||||
else: | ||||
raise ValueError("Invalid return condition: %r" % return_when) | ||||
for f in fs: | ||||
f._waiters.append(waiter) | ||||
return waiter | ||||
def _yield_finished_futures(fs, waiter, ref_collect): | ||||
""" | ||||
Iterate on the list *fs*, yielding finished futures one by one in | ||||
reverse order. | ||||
Before yielding a future, *waiter* is removed from its waiters | ||||
and the future is removed from each set in the collection of sets | ||||
*ref_collect*. | ||||
The aim of this function is to avoid keeping stale references after | ||||
the future is yielded and before the iterator resumes. | ||||
""" | ||||
while fs: | ||||
f = fs[-1] | ||||
for futures_set in ref_collect: | ||||
futures_set.remove(f) | ||||
with f._condition: | ||||
f._waiters.remove(waiter) | ||||
del f | ||||
# Careful not to keep a reference to the popped value | ||||
yield fs.pop() | ||||
def as_completed(fs, timeout=None): | ||||
"""An iterator over the given futures that yields each as it completes. | ||||
Args: | ||||
fs: The sequence of Futures (possibly created by different Executors) to | ||||
iterate over. | ||||
timeout: The maximum number of seconds to wait. If None, then there | ||||
is no limit on the wait time. | ||||
Returns: | ||||
An iterator that yields the given Futures as they complete (finished or | ||||
cancelled). If any given Futures are duplicated, they will be returned | ||||
once. | ||||
Raises: | ||||
TimeoutError: If the entire result iterator could not be generated | ||||
before the given timeout. | ||||
""" | ||||
if timeout is not None: | ||||
end_time = timeout + time.time() | ||||
fs = set(fs) | ||||
total_futures = len(fs) | ||||
with _AcquireFutures(fs): | ||||
finished = set( | ||||
f for f in fs | ||||
if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]) | ||||
pending = fs - finished | ||||
waiter = _create_and_install_waiters(fs, _AS_COMPLETED) | ||||
finished = list(finished) | ||||
try: | ||||
for f in _yield_finished_futures(finished, waiter, | ||||
ref_collect=(fs,)): | ||||
f = [f] | ||||
yield f.pop() | ||||
while pending: | ||||
if timeout is None: | ||||
wait_timeout = None | ||||
else: | ||||
wait_timeout = end_time - time.time() | ||||
if wait_timeout < 0: | ||||
raise TimeoutError( | ||||
'%d (of %d) futures unfinished' % ( | ||||
len(pending), total_futures)) | ||||
waiter.event.wait(wait_timeout) | ||||
with waiter.lock: | ||||
finished = waiter.finished_futures | ||||
waiter.finished_futures = [] | ||||
waiter.event.clear() | ||||
# reverse to keep finishing order | ||||
finished.reverse() | ||||
for f in _yield_finished_futures(finished, waiter, | ||||
ref_collect=(fs, pending)): | ||||
f = [f] | ||||
yield f.pop() | ||||
finally: | ||||
# Remove waiter from unfinished futures | ||||
for f in fs: | ||||
with f._condition: | ||||
f._waiters.remove(waiter) | ||||
DoneAndNotDoneFutures = collections.namedtuple( | ||||
'DoneAndNotDoneFutures', 'done not_done') | ||||
def wait(fs, timeout=None, return_when=ALL_COMPLETED): | ||||
"""Wait for the futures in the given sequence to complete. | ||||
Args: | ||||
fs: The sequence of Futures (possibly created by different Executors) to | ||||
wait upon. | ||||
timeout: The maximum number of seconds to wait. If None, then there | ||||
is no limit on the wait time. | ||||
return_when: Indicates when this function should return. The options | ||||
are: | ||||
FIRST_COMPLETED - Return when any future finishes or is | ||||
cancelled. | ||||
FIRST_EXCEPTION - Return when any future finishes by raising an | ||||
exception. If no future raises an exception | ||||
then it is equivalent to ALL_COMPLETED. | ||||
ALL_COMPLETED - Return when all futures finish or are cancelled. | ||||
Returns: | ||||
A named 2-tuple of sets. The first set, named 'done', contains the | ||||
futures that completed (is finished or cancelled) before the wait | ||||
completed. The second set, named 'not_done', contains uncompleted | ||||
futures. | ||||
""" | ||||
with _AcquireFutures(fs): | ||||
done = set(f for f in fs | ||||
if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]) | ||||
not_done = set(fs) - done | ||||
if (return_when == FIRST_COMPLETED) and done: | ||||
return DoneAndNotDoneFutures(done, not_done) | ||||
elif (return_when == FIRST_EXCEPTION) and done: | ||||
if any(f for f in done | ||||
if not f.cancelled() and f.exception() is not None): | ||||
return DoneAndNotDoneFutures(done, not_done) | ||||
if len(done) == len(fs): | ||||
return DoneAndNotDoneFutures(done, not_done) | ||||
waiter = _create_and_install_waiters(fs, return_when) | ||||
waiter.event.wait(timeout) | ||||
for f in fs: | ||||
with f._condition: | ||||
f._waiters.remove(waiter) | ||||
done.update(waiter.finished_futures) | ||||
return DoneAndNotDoneFutures(done, set(fs) - done) | ||||
class Future(object): | ||||
"""Represents the result of an asynchronous computation.""" | ||||
def __init__(self): | ||||
"""Initializes the future. Should not be called by clients.""" | ||||
self._condition = threading.Condition() | ||||
self._state = PENDING | ||||
self._result = None | ||||
self._exception = None | ||||
self._traceback = None | ||||
self._waiters = [] | ||||
self._done_callbacks = [] | ||||
def _invoke_callbacks(self): | ||||
for callback in self._done_callbacks: | ||||
try: | ||||
callback(self) | ||||
except Exception: | ||||
LOGGER.exception('exception calling callback for %r', self) | ||||
except BaseException: | ||||
# Explicitly let all other new-style exceptions through so | ||||
# that we can catch all old-style exceptions with a simple | ||||
# "except:" clause below. | ||||
# | ||||
# All old-style exception objects are instances of | ||||
# types.InstanceType, but "except types.InstanceType:" does | ||||
# not catch old-style exceptions for some reason. Thus, the | ||||
# only way to catch all old-style exceptions without catching | ||||
# any new-style exceptions is to filter out the new-style | ||||
# exceptions, which all derive from BaseException. | ||||
raise | ||||
except: | ||||
# Because of the BaseException clause above, this handler only | ||||
# executes for old-style exception objects. | ||||
LOGGER.exception('exception calling callback for %r', self) | ||||
def __repr__(self): | ||||
with self._condition: | ||||
if self._state == FINISHED: | ||||
if self._exception: | ||||
return '<%s at %#x state=%s raised %s>' % ( | ||||
self.__class__.__name__, | ||||
id(self), | ||||
_STATE_TO_DESCRIPTION_MAP[self._state], | ||||
self._exception.__class__.__name__) | ||||
else: | ||||
return '<%s at %#x state=%s returned %s>' % ( | ||||
self.__class__.__name__, | ||||
id(self), | ||||
_STATE_TO_DESCRIPTION_MAP[self._state], | ||||
self._result.__class__.__name__) | ||||
return '<%s at %#x state=%s>' % ( | ||||
self.__class__.__name__, | ||||
id(self), | ||||
_STATE_TO_DESCRIPTION_MAP[self._state]) | ||||
def cancel(self): | ||||
"""Cancel the future if possible. | ||||
Returns True if the future was cancelled, False otherwise. A future | ||||
cannot be cancelled if it is running or has already completed. | ||||
""" | ||||
with self._condition: | ||||
if self._state in [RUNNING, FINISHED]: | ||||
return False | ||||
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: | ||||
return True | ||||
self._state = CANCELLED | ||||
self._condition.notify_all() | ||||
self._invoke_callbacks() | ||||
return True | ||||
def cancelled(self): | ||||
"""Return True if the future was cancelled.""" | ||||
with self._condition: | ||||
return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED] | ||||
def running(self): | ||||
"""Return True if the future is currently executing.""" | ||||
with self._condition: | ||||
return self._state == RUNNING | ||||
def done(self): | ||||
"""Return True of the future was cancelled or finished executing.""" | ||||
with self._condition: | ||||
return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED] | ||||
def __get_result(self): | ||||
if self._exception: | ||||
if isinstance(self._exception, types.InstanceType): | ||||
# The exception is an instance of an old-style class, which | ||||
# means type(self._exception) returns types.ClassType instead | ||||
# of the exception's actual class type. | ||||
exception_type = self._exception.__class__ | ||||
else: | ||||
exception_type = type(self._exception) | ||||
raise exception_type, self._exception, self._traceback | ||||
else: | ||||
return self._result | ||||
def add_done_callback(self, fn): | ||||
"""Attaches a callable that will be called when the future finishes. | ||||
Args: | ||||
fn: A callable that will be called with this future as its only | ||||
argument when the future completes or is cancelled. The callable | ||||
will always be called by a thread in the same process in which | ||||
it was added. If the future has already completed or been | ||||
cancelled then the callable will be called immediately. These | ||||
callables are called in the order that they were added. | ||||
""" | ||||
with self._condition: | ||||
if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]: | ||||
self._done_callbacks.append(fn) | ||||
return | ||||
fn(self) | ||||
def result(self, timeout=None): | ||||
"""Return the result of the call that the future represents. | ||||
Args: | ||||
timeout: The number of seconds to wait for the result if the future | ||||
isn't done. If None, then there is no limit on the wait time. | ||||
Returns: | ||||
The result of the call that the future represents. | ||||
Raises: | ||||
CancelledError: If the future was cancelled. | ||||
TimeoutError: If the future didn't finish executing before the given | ||||
timeout. | ||||
Exception: If the call raised then that exception will be raised. | ||||
""" | ||||
with self._condition: | ||||
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: | ||||
raise CancelledError() | ||||
elif self._state == FINISHED: | ||||
return self.__get_result() | ||||
self._condition.wait(timeout) | ||||
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: | ||||
raise CancelledError() | ||||
elif self._state == FINISHED: | ||||
return self.__get_result() | ||||
else: | ||||
raise TimeoutError() | ||||
def exception_info(self, timeout=None): | ||||
"""Return a tuple of (exception, traceback) raised by the call that the | ||||
future represents. | ||||
Args: | ||||
timeout: The number of seconds to wait for the exception if the | ||||
future isn't done. If None, then there is no limit on the wait | ||||
time. | ||||
Returns: | ||||
The exception raised by the call that the future represents or None | ||||
if the call completed without raising. | ||||
Raises: | ||||
CancelledError: If the future was cancelled. | ||||
TimeoutError: If the future didn't finish executing before the given | ||||
timeout. | ||||
""" | ||||
with self._condition: | ||||
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: | ||||
raise CancelledError() | ||||
elif self._state == FINISHED: | ||||
return self._exception, self._traceback | ||||
self._condition.wait(timeout) | ||||
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: | ||||
raise CancelledError() | ||||
elif self._state == FINISHED: | ||||
return self._exception, self._traceback | ||||
else: | ||||
raise TimeoutError() | ||||
def exception(self, timeout=None): | ||||
"""Return the exception raised by the call that the future represents. | ||||
Args: | ||||
timeout: The number of seconds to wait for the exception if the | ||||
future isn't done. If None, then there is no limit on the wait | ||||
time. | ||||
Returns: | ||||
The exception raised by the call that the future represents or None | ||||
if the call completed without raising. | ||||
Raises: | ||||
CancelledError: If the future was cancelled. | ||||
TimeoutError: If the future didn't finish executing before the given | ||||
timeout. | ||||
""" | ||||
return self.exception_info(timeout)[0] | ||||
# The following methods should only be used by Executors and in tests. | ||||
def set_running_or_notify_cancel(self): | ||||
"""Mark the future as running or process any cancel notifications. | ||||
Should only be used by Executor implementations and unit tests. | ||||
If the future has been cancelled (cancel() was called and returned | ||||
True) then any threads waiting on the future completing (though calls | ||||
to as_completed() or wait()) are notified and False is returned. | ||||
If the future was not cancelled then it is put in the running state | ||||
(future calls to running() will return True) and True is returned. | ||||
This method should be called by Executor implementations before | ||||
executing the work associated with this future. If this method returns | ||||
False then the work should not be executed. | ||||
Returns: | ||||
False if the Future was cancelled, True otherwise. | ||||
Raises: | ||||
RuntimeError: if this method was already called or if set_result() | ||||
or set_exception() was called. | ||||
""" | ||||
with self._condition: | ||||
if self._state == CANCELLED: | ||||
self._state = CANCELLED_AND_NOTIFIED | ||||
for waiter in self._waiters: | ||||
waiter.add_cancelled(self) | ||||
# self._condition.notify_all() is not necessary because | ||||
# self.cancel() triggers a notification. | ||||
return False | ||||
elif self._state == PENDING: | ||||
self._state = RUNNING | ||||
return True | ||||
else: | ||||
LOGGER.critical('Future %s in unexpected state: %s', | ||||
id(self), | ||||
self._state) | ||||
raise RuntimeError('Future in unexpected state') | ||||
def set_result(self, result): | ||||
"""Sets the return value of work associated with the future. | ||||
Should only be used by Executor implementations and unit tests. | ||||
""" | ||||
with self._condition: | ||||
self._result = result | ||||
self._state = FINISHED | ||||
for waiter in self._waiters: | ||||
waiter.add_result(self) | ||||
self._condition.notify_all() | ||||
self._invoke_callbacks() | ||||
def set_exception_info(self, exception, traceback): | ||||
"""Sets the result of the future as being the given exception | ||||
and traceback. | ||||
Should only be used by Executor implementations and unit tests. | ||||
""" | ||||
with self._condition: | ||||
self._exception = exception | ||||
self._traceback = traceback | ||||
self._state = FINISHED | ||||
for waiter in self._waiters: | ||||
waiter.add_exception(self) | ||||
self._condition.notify_all() | ||||
self._invoke_callbacks() | ||||
def set_exception(self, exception): | ||||
"""Sets the result of the future as being the given exception. | ||||
Should only be used by Executor implementations and unit tests. | ||||
""" | ||||
self.set_exception_info(exception, None) | ||||
class Executor(object): | ||||
"""This is an abstract base class for concrete asynchronous executors.""" | ||||
def submit(self, fn, *args, **kwargs): | ||||
"""Submits a callable to be executed with the given arguments. | ||||
Schedules the callable to be executed as fn(*args, **kwargs) and returns | ||||
a Future instance representing the execution of the callable. | ||||
Returns: | ||||
A Future representing the given call. | ||||
""" | ||||
raise NotImplementedError() | ||||
def map(self, fn, *iterables, **kwargs): | ||||
"""Returns an iterator equivalent to map(fn, iter). | ||||
Args: | ||||
fn: A callable that will take as many arguments as there are | ||||
passed iterables. | ||||
timeout: The maximum number of seconds to wait. If None, then there | ||||
is no limit on the wait time. | ||||
Returns: | ||||
An iterator equivalent to: map(func, *iterables) but the calls may | ||||
be evaluated out-of-order. | ||||
Raises: | ||||
TimeoutError: If the entire result iterator could not be generated | ||||
before the given timeout. | ||||
Exception: If fn(*args) raises for any values. | ||||
""" | ||||
timeout = kwargs.get('timeout') | ||||
if timeout is not None: | ||||
end_time = timeout + time.time() | ||||
fs = [self.submit(fn, *args) for args in itertools.izip(*iterables)] | ||||
# Yield must be hidden in closure so that the futures are submitted | ||||
# before the first iterator value is required. | ||||
def result_iterator(): | ||||
try: | ||||
# reverse to keep finishing order | ||||
fs.reverse() | ||||
while fs: | ||||
# Careful not to keep a reference to the popped future | ||||
if timeout is None: | ||||
yield fs.pop().result() | ||||
else: | ||||
yield fs.pop().result(end_time - time.time()) | ||||
finally: | ||||
for future in fs: | ||||
future.cancel() | ||||
return result_iterator() | ||||
def shutdown(self, wait=True): | ||||
"""Clean-up the resources associated with the Executor. | ||||
It is safe to call this method several times. Otherwise, no other | ||||
methods can be called after this one. | ||||
Args: | ||||
wait: If True then shutdown will not return until all running | ||||
futures have finished executing and the resources used by the | ||||
executor have been reclaimed. | ||||
""" | ||||
pass | ||||
def __enter__(self): | ||||
return self | ||||
def __exit__(self, exc_type, exc_val, exc_tb): | ||||
self.shutdown(wait=True) | ||||
return False | ||||