##// END OF EJS Templates
discovery: fix embarrassing typo in slice definition...
discovery: fix embarrassing typo in slice definition The code introduced in e514799e4e07 ended up having a silly bug. The indexing selected a single item slice picking only p1. The discovery result was still correct, but the sampling was hampered, sometime leading to much more round trips being performed. Fixing this issue restore the previous sampling behavior. This fix has a negative performance impact on the pathological case the previous test has been built. # parent of this changesets ! wall 5.313884 comb 5.310000 user 5.260000 sys 0.050000 (best of 5) ! wall 6.711860 comb 6.710000 user 6.670000 sys 0.040000 (max of 5) ! wall 5.844016 comb 5.842000 user 5.784000 sys 0.058000 (avg of 5) ! wall 5.778635 comb 5.780000 user 5.740000 sys 0.040000 (median of 5) # With this changesets. ! wall 6.350879 comb 6.350000 user 6.300000 sys 0.050000 (best of 5) ! wall 6.653647 comb 6.660000 user 6.480000 sys 0.180000 (max of 5) ! wall 6.492762 comb 6.494000 user 6.414000 sys 0.080000 (avg of 5) ! wall 6.547577 comb 6.550000 user 6.490000 sys 0.060000 (median of 5) Changeset e514799e4e07 raised the question of using the "_uncheckedparentrevs" instead of the current code. So I ran comparative timing: # old code: 55919b96c02a (e514799e4e07 parent) ! wall 64.078708 comb 64.080000 user 63.160000 sys 0.920000 (best of 5) ! wall 68.296300 comb 68.290000 user 67.410000 sys 0.880000 (max of 5) ! wall 65.899075 comb 65.894000 user 65.082000 sys 0.812000 (avg of 5) ! wall 66.140286 comb 66.130000 user 65.330000 sys 0.800000 (median of 5) # buggy code: e514799e4e07 ! wall 46.605362 comb 46.610000 user 45.880000 sys 0.730000 (best of 5) ! wall 48.619659 comb 48.620000 user 47.890000 sys 0.730000 (max of 5) ! wall 47.350247 comb 47.350000 user 46.672000 sys 0.678000 (avg of 5) ! wall 46.983224 comb 46.980000 user 46.350000 sys 0.630000 (median of 5) # fixed code: e514799e4e07 with this fix ! wall 55.858460 comb 55.850000 user 55.090000 sys 0.760000 (best of 5) ! wall 59.048805 comb 59.060000 user 58.110000 sys 0.950000 (max of 5) ! wall 57.192639 comb 57.192000 user 56.350000 sys 0.842000 (avg of 5) ! wall 57.056373 comb 57.060000 user 56.160000 sys 0.900000 (median of 5) # version using uncheckedparents ! wall 56.471916 comb 56.470000 user 55.630000 sys 0.840000 (best of 5) ! wall 58.228793 comb 58.230000 user 57.600000 sys 0.630000 (max of 5) ! wall 57.377583 comb 57.378000 user 56.674000 sys 0.704000 (avg of 5) ! wall 57.008843 comb 57.010000 user 56.330000 sys 0.680000 (median of 5) So it looks like the overhead from `_uncheckedparentrevs` is not that impactful. I'll investigate this shortly. I'm almost done updating our benchmark suite with more meaningful discovery cases.

File last commit:

r37644:0a9c0d34 default
r42145:0d467e4d default
Show More
thread.py
162 lines | 5.5 KiB | text/x-python | PythonLexer
# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.
"""Implements ThreadPoolExecutor."""
from __future__ import absolute_import
import atexit
from . import _base
import itertools
import Queue as queue
import threading
import weakref
import sys
try:
from multiprocessing import cpu_count
except ImportError:
# some platforms don't have multiprocessing
def cpu_count():
return None
__author__ = 'Brian Quinlan (brian@sweetapp.com)'
# Workers are created as daemon threads. This is done to allow the interpreter
# to exit when there are still idle threads in a ThreadPoolExecutor's thread
# pool (i.e. shutdown() was not called). However, allowing workers to die with
# the interpreter has two undesirable properties:
# - The workers would still be running during interpretor shutdown,
# meaning that they would fail in unpredictable ways.
# - The workers could be killed while evaluating a work item, which could
# be bad if the callable being evaluated has external side-effects e.g.
# writing to a file.
#
# To work around this problem, an exit handler is installed which tells the
# workers to exit when their work queues are empty and then waits until the
# threads finish.
_threads_queues = weakref.WeakKeyDictionary()
_shutdown = False
def _python_exit():
global _shutdown
_shutdown = True
items = list(_threads_queues.items()) if _threads_queues else ()
for t, q in items:
q.put(None)
for t, q in items:
t.join(sys.maxint)
atexit.register(_python_exit)
class _WorkItem(object):
def __init__(self, future, fn, args, kwargs):
self.future = future
self.fn = fn
self.args = args
self.kwargs = kwargs
def run(self):
if not self.future.set_running_or_notify_cancel():
return
try:
result = self.fn(*self.args, **self.kwargs)
except:
e, tb = sys.exc_info()[1:]
self.future.set_exception_info(e, tb)
else:
self.future.set_result(result)
def _worker(executor_reference, work_queue):
try:
while True:
work_item = work_queue.get(block=True)
if work_item is not None:
work_item.run()
# Delete references to object. See issue16284
del work_item
continue
executor = executor_reference()
# Exit if:
# - The interpreter is shutting down OR
# - The executor that owns the worker has been collected OR
# - The executor that owns the worker has been shutdown.
if _shutdown or executor is None or executor._shutdown:
# Notice other workers
work_queue.put(None)
return
del executor
except:
_base.LOGGER.critical('Exception in worker', exc_info=True)
class ThreadPoolExecutor(_base.Executor):
# Used to assign unique thread names when thread_name_prefix is not supplied.
_counter = itertools.count().next
def __init__(self, max_workers=None, thread_name_prefix=''):
"""Initializes a new ThreadPoolExecutor instance.
Args:
max_workers: The maximum number of threads that can be used to
execute the given calls.
thread_name_prefix: An optional name prefix to give our threads.
"""
if max_workers is None:
# Use this number because ThreadPoolExecutor is often
# used to overlap I/O instead of CPU work.
max_workers = (cpu_count() or 1) * 5
if max_workers <= 0:
raise ValueError("max_workers must be greater than 0")
self._max_workers = max_workers
self._work_queue = queue.Queue()
self._threads = set()
self._shutdown = False
self._shutdown_lock = threading.Lock()
self._thread_name_prefix = (thread_name_prefix or
("ThreadPoolExecutor-%d" % self._counter()))
def submit(self, fn, *args, **kwargs):
with self._shutdown_lock:
if self._shutdown:
raise RuntimeError('cannot schedule new futures after shutdown')
f = _base.Future()
w = _WorkItem(f, fn, args, kwargs)
self._work_queue.put(w)
self._adjust_thread_count()
return f
submit.__doc__ = _base.Executor.submit.__doc__
def _adjust_thread_count(self):
# When the executor gets lost, the weakref callback will wake up
# the worker threads.
def weakref_cb(_, q=self._work_queue):
q.put(None)
# TODO(bquinlan): Should avoid creating new threads if there are more
# idle threads than items in the work queue.
num_threads = len(self._threads)
if num_threads < self._max_workers:
thread_name = '%s_%d' % (self._thread_name_prefix or self,
num_threads)
t = threading.Thread(name=thread_name, target=_worker,
args=(weakref.ref(self, weakref_cb),
self._work_queue))
t.daemon = True
t.start()
self._threads.add(t)
_threads_queues[t] = self._work_queue
def shutdown(self, wait=True):
with self._shutdown_lock:
self._shutdown = True
self._work_queue.put(None)
if wait:
for t in self._threads:
t.join(sys.maxint)
shutdown.__doc__ = _base.Executor.shutdown.__doc__