thread.py
162 lines
| 5.5 KiB
| text/x-python
|
PythonLexer
Gregory Szorc
|
r37641 | # Copyright 2009 Brian Quinlan. All Rights Reserved. | ||
# Licensed to PSF under a Contributor Agreement. | ||||
"""Implements ThreadPoolExecutor.""" | ||||
Gregory Szorc
|
r37644 | from __future__ import absolute_import | ||
Gregory Szorc
|
r37641 | import atexit | ||
Gregory Szorc
|
r37644 | from . import _base | ||
Gregory Szorc
|
r37641 | import itertools | ||
import Queue as queue | ||||
import threading | ||||
import weakref | ||||
import sys | ||||
try:
    from multiprocessing import cpu_count
except ImportError:
    # The multiprocessing module is missing on some platforms. Substitute a
    # function that reports the CPU count as unknown (None), leaving callers
    # to apply their own default.
    def cpu_count():
        """Fallback for multiprocessing.cpu_count(): count is unknown."""
        return None
__author__ = 'Brian Quinlan (brian@sweetapp.com)'

# Worker threads are daemons, so an idle pool whose shutdown() was never
# called cannot keep the interpreter alive. Simply letting daemon workers die
# with the interpreter has two drawbacks, however:
#   - the workers would still be running while the interpreter shuts down,
#     and could fail in unpredictable ways;
#   - a worker could be killed part-way through a work item, which is harmful
#     when the callable has external side effects (e.g. writing to a file).
#
# To avoid both, an exit handler tells every worker to exit once its work
# queue is empty, and then waits for the threads to finish.

# Flipped to True by _python_exit(); workers consult it when they wake up
# with nothing to do.
_shutdown = False
# Worker thread -> the work queue that thread consumes (weakly keyed on the
# thread object).
_threads_queues = weakref.WeakKeyDictionary()


def _python_exit():
    """atexit hook: signal every registered worker to stop, then join it."""
    global _shutdown
    _shutdown = True
    # Take a snapshot so both loops below see the same set of threads.
    if _threads_queues:
        pairs = list(_threads_queues.items())
    else:
        pairs = ()
    # Wake every worker first, so they can wind down concurrently ...
    for _, work_queue in pairs:
        work_queue.put(None)
    # ... then wait for each one. NOTE(review): a huge timeout is presumably
    # used instead of a bare join() so the wait stays interruptible on
    # Python 2 -- confirm.
    for worker_thread, _ in pairs:
        worker_thread.join(sys.maxint)


atexit.register(_python_exit)
class _WorkItem(object):
    """One queued call: a callable, its arguments, and the future it reports to."""

    def __init__(self, future, fn, args, kwargs):
        self.future, self.fn = future, fn
        self.args, self.kwargs = args, kwargs

    def run(self):
        """Call fn(*args, **kwargs) and publish the outcome on the future.

        fn is not invoked at all when set_running_or_notify_cancel() returns
        a false value (presumably because the future was cancelled).
        """
        if self.future.set_running_or_notify_cancel():
            try:
                outcome = self.fn(*self.args, **self.kwargs)
            except:  # noqa: E722 -- deliberately catch everything fn raises
                _, exc_value, exc_tb = sys.exc_info()
                self.future.set_exception_info(exc_value, exc_tb)
            else:
                self.future.set_result(outcome)
def _worker(executor_reference, work_queue):
    """Body of each pool thread: run queued work items until told to stop.

    Args:
        executor_reference: weak reference to the owning executor; calling it
            returns the executor, or None once it has been collected.
        work_queue: queue of work items (objects with a run() method); None
            is the wake-up sentinel.
    """
    try:
        while True:
            item = work_queue.get(block=True)
            if item is None:
                owner = executor_reference()
                # Stop when the interpreter is exiting, the owning executor
                # has been garbage-collected, or it was shut down explicitly.
                if _shutdown or owner is None or owner._shutdown:
                    # Re-post the sentinel so the next worker wakes up too.
                    work_queue.put(None)
                    return
                # Drop the strong reference before blocking on the queue
                # again, so the executor can still be collected.
                del owner
            else:
                item.run()
                # Release the finished item before blocking again.
                # See issue16284.
                del item
    except:  # noqa: E722 -- log anything escaping the loop, never re-raise
        _base.LOGGER.critical('Exception in worker', exc_info=True)
class ThreadPoolExecutor(_base.Executor):
    """Executor that runs submitted calls on a pool of daemon worker threads.

    Workers are started lazily: each submit() starts one more thread until
    max_workers threads exist. All workers consume one shared work queue.
    """

    # Used to assign unique thread names when thread_name_prefix is not supplied.
    _counter = itertools.count().next

    def __init__(self, max_workers=None, thread_name_prefix=''):
        """Initializes a new ThreadPoolExecutor instance.

        Args:
            max_workers: The maximum number of threads that can be used to
                execute the given calls. When None, defaults to five times
                the CPU count, or 5 if the CPU count cannot be determined.
            thread_name_prefix: An optional name prefix to give our threads.

        Raises:
            ValueError: If max_workers is less than or equal to 0.
        """
        if max_workers is None:
            try:
                ncpus = cpu_count()
            except NotImplementedError:
                # multiprocessing.cpu_count() raises, rather than returning
                # None, when it cannot determine the count. Treat that as
                # "unknown" instead of failing construction.
                ncpus = None
            # Use this number because ThreadPoolExecutor is often
            # used to overlap I/O instead of CPU work.
            max_workers = (ncpus or 1) * 5
        if max_workers <= 0:
            raise ValueError("max_workers must be greater than 0")

        self._max_workers = max_workers
        self._work_queue = queue.Queue()
        # Every worker thread ever started (never pruned); used to size the
        # pool and by shutdown(wait=True).
        self._threads = set()
        self._shutdown = False
        # Serializes submit() against shutdown().
        self._shutdown_lock = threading.Lock()
        self._thread_name_prefix = (thread_name_prefix or
                                    ("ThreadPoolExecutor-%d" % self._counter()))

    def submit(self, fn, *args, **kwargs):
        with self._shutdown_lock:
            if self._shutdown:
                raise RuntimeError('cannot schedule new futures after shutdown')
            if _shutdown:
                # The module's atexit hook has already told every worker to
                # exit. Since self._threads is never pruned, a pool already
                # at max_workers would start no new thread and the future
                # would never complete. Otherwise a daemon thread would be
                # started while the interpreter is being torn down. Refuse
                # the work instead.
                raise RuntimeError('cannot schedule new futures after '
                                   'interpreter shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)

            self._work_queue.put(w)
            self._adjust_thread_count()
            return f
    submit.__doc__ = _base.Executor.submit.__doc__

    def _adjust_thread_count(self):
        # Start one more worker if the pool is below max_workers. Only called
        # from submit(), with self._shutdown_lock held.

        # When the executor gets lost, the weakref callback will wake up
        # the worker threads. The queue is bound as a default argument so the
        # callback itself holds no reference to the executor.
        def weakref_cb(_, q=self._work_queue):
            q.put(None)
        # TODO(bquinlan): Should avoid creating new threads if there are more
        # idle threads than items in the work queue.
        num_threads = len(self._threads)
        if num_threads < self._max_workers:
            thread_name = '%s_%d' % (self._thread_name_prefix or self,
                                     num_threads)
            t = threading.Thread(name=thread_name, target=_worker,
                                 args=(weakref.ref(self, weakref_cb),
                                       self._work_queue))
            t.daemon = True
            t.start()
            self._threads.add(t)
            # Register with the module-level exit handler.
            _threads_queues[t] = self._work_queue

    def shutdown(self, wait=True):
        with self._shutdown_lock:
            self._shutdown = True
            # A single sentinel suffices: each worker that sees it re-posts
            # it before exiting (see _worker), waking the next one.
            self._work_queue.put(None)
        if wait:
            for t in self._threads:
                # NOTE(review): a huge timeout rather than a bare join() is
                # presumably for Ctrl-C interruptibility on Python 2 -- confirm.
                t.join(sys.maxint)
    shutdown.__doc__ = _base.Executor.shutdown.__doc__