diff --git a/mercurial/help/config.txt b/mercurial/help/config.txt --- a/mercurial/help/config.txt +++ b/mercurial/help/config.txt @@ -2037,3 +2037,27 @@ helps performance. Number of CPUs to use for parallel operations. A zero or negative value is treated as ``use the default``. (default: 4 or the number of CPUs on the system, whichever is larger) + +``backgroundclose`` + Whether to enable closing file handles on background threads during certain + operations. Some platforms aren't very efficient at closing file + handles that have been written or appened to. By performing file closing + on background threads, file write rate can increase substantially. + (default: true on Windows, false elsewhere) + +``backgroundcloseminfilecount`` + Minimum number of files required to trigger background file closing. + Operations not writing this many files won't start background close + threads. + (default: 2048) + +``backgroundclosemaxqueue`` + The maximum number of opened file handles waiting to be closed in the + background. This option only has an effect if ``backgroundclose`` is + enabled. + (default: 384) + +``backgroundclosethreadcount`` + Number of threads to process background file closes. Only relevant if + ``backgroundclose`` is enabled. + (default: 4) diff --git a/mercurial/scmutil.py b/mercurial/scmutil.py --- a/mercurial/scmutil.py +++ b/mercurial/scmutil.py @@ -7,6 +7,8 @@ from __future__ import absolute_import +import Queue +import contextlib import errno import glob import os @@ -14,6 +16,7 @@ import re import shutil import stat import tempfile +import threading from .i18n import _ from .node import wdirrev @@ -254,7 +257,7 @@ class abstractvfs(object): return [] def open(self, path, mode="r", text=False, atomictemp=False, - notindexed=False): + notindexed=False, backgroundclose=False): '''Open ``path`` file, which is relative to vfs root. Newly created directories are marked as "not to be indexed by @@ -262,7 +265,8 @@ class abstractvfs(object): for "write" mode access. ''' self.open = self.__call__ - return self.__call__(path, mode, text, atomictemp, notindexed) + return self.__call__(path, mode, text, atomictemp, notindexed, + backgroundclose=backgroundclose) def read(self, path): with self(path, 'rb') as fp: @@ -436,6 +440,27 @@ class abstractvfs(object): for dirpath, dirs, files in os.walk(self.join(path), onerror=onerror): yield (dirpath[prefixlen:], dirs, files) + @contextlib.contextmanager + def backgroundclosing(self, ui, expectedcount=-1): + """Allow files to be closed asynchronously. + + When this context manager is active, ``backgroundclose`` can be passed + to ``__call__``/``open`` to result in the file possibly being closed + asynchronously, on a background thread. + """ + # This is an arbitrary restriction and could be changed if we ever + # have a use case. + vfs = getattr(self, 'vfs', self) + if getattr(vfs, '_backgroundfilecloser', None): + raise error.Abort('can only have 1 active background file closer') + + with backgroundfilecloser(ui, expectedcount=expectedcount) as bfc: + try: + vfs._backgroundfilecloser = bfc + yield bfc + finally: + vfs._backgroundfilecloser = None + class vfs(abstractvfs): '''Operate files relative to a base directory @@ -478,12 +503,25 @@ class vfs(abstractvfs): os.chmod(name, self.createmode & 0o666) def __call__(self, path, mode="r", text=False, atomictemp=False, - notindexed=False): + notindexed=False, backgroundclose=False): '''Open ``path`` file, which is relative to vfs root. Newly created directories are marked as "not to be indexed by the content indexing service", if ``notindexed`` is specified for "write" mode access. + + If ``backgroundclose`` is passed, the file may be closed asynchronously. + It can only be used if the ``self.backgroundclosing()`` context manager + is active. This should only be specified if the following criteria hold: + + 1. There is a potential for writing thousands of files. Unless you + are writing thousands of files, the performance benefits of + asynchronously closing files is not realized. + 2. Files are opened exactly once for the ``backgroundclosing`` + active duration and are therefore free of race conditions between + closing a file on a background thread and reopening it. (If the + file were opened multiple times, there could be unflushed data + because the original file handle hasn't been flushed/closed yet.) ''' if self._audit: r = util.checkosfilename(path) @@ -528,6 +566,14 @@ class vfs(abstractvfs): fp = util.posixfile(f, mode) if nlink == 0: self._fixfilemode(f) + + if backgroundclose: + if not self._backgroundfilecloser: + raise error.Abort('backgroundclose can only be used when a ' + 'backgroundclosing context manager is active') + + fp = delayclosedfile(fp, self._backgroundfilecloser) + return fp def symlink(self, src, dst): @@ -1211,3 +1257,123 @@ def gddeltaconfig(ui): """ # experimental config: format.generaldelta return ui.configbool('format', 'generaldelta', False) + +class delayclosedfile(object): + """Proxy for a file object whose close is delayed. + + Do not instantiate outside of the vfs layer. + """ + + def __init__(self, fh, closer): + object.__setattr__(self, '_origfh', fh) + object.__setattr__(self, '_closer', closer) + + def __getattr__(self, attr): + return getattr(self._origfh, attr) + + def __setattr__(self, attr, value): + return setattr(self._origfh, attr, value) + + def __delattr__(self, attr): + return delattr(self._origfh, attr) + + def __enter__(self): + return self._origfh.__enter__() + + def __exit__(self, exc_type, exc_value, exc_tb): + self._closer.close(self._origfh) + + def close(self): + self._closer.close(self._origfh) + +class backgroundfilecloser(object): + """Coordinates background closing of file handles on multiple threads.""" + def __init__(self, ui, expectedcount=-1): + self._running = False + self._entered = False + self._threads = [] + self._threadexception = None + + # Only Windows/NTFS has slow file closing. So only enable by default + # on that platform. But allow to be enabled elsewhere for testing. + defaultenabled = os.name == 'nt' + enabled = ui.configbool('worker', 'backgroundclose', defaultenabled) + + if not enabled: + return + + # There is overhead to starting and stopping the background threads. + # Don't do background processing unless the file count is large enough + # to justify it. + minfilecount = ui.configint('worker', 'backgroundcloseminfilecount', + 2048) + # FUTURE dynamically start background threads after minfilecount closes. + # (We don't currently have any callers that don't know their file count) + if expectedcount > 0 and expectedcount < minfilecount: + return + + # Windows defaults to a limit of 512 open files. A buffer of 128 + # should give us enough headway. + maxqueue = ui.configint('worker', 'backgroundclosemaxqueue', 384) + threadcount = ui.configint('worker', 'backgroundclosethreadcount', 4) + + ui.debug('starting %d threads for background file closing\n' % + threadcount) + + self._queue = Queue.Queue(maxsize=maxqueue) + self._running = True + + for i in range(threadcount): + t = threading.Thread(target=self._worker, name='backgroundcloser') + self._threads.append(t) + t.start() + + def __enter__(self): + self._entered = True + return self + + def __exit__(self, exc_type, exc_value, exc_tb): + self._running = False + + # Wait for threads to finish closing so open files don't linger for + # longer than lifetime of context manager. + for t in self._threads: + t.join() + + def _worker(self): + """Main routine for worker thread.""" + while True: + try: + fh = self._queue.get(block=True, timeout=0.100) + # Need to catch or the thread will terminate and + # we could orphan file descriptors. + try: + fh.close() + except Exception as e: + # Stash so can re-raise from main thread later. + self._threadexception = e + except Queue.Empty: + if not self._running: + break + + def close(self, fh): + """Schedule a file for closing.""" + if not self._entered: + raise error.Abort('can only call close() when context manager ' + 'active') + + # If a background thread encountered an exception, raise now so we fail + # fast. Otherwise we may potentially go on for minutes until the error + # is acted on. + if self._threadexception: + e = self._threadexception + self._threadexception = None + raise e + + # If we're not actively running, close synchronously. + if not self._running: + fh.close() + return + + self._queue.put(fh, block=True, timeout=None) +