class VolatileManager:
    """Manage temporary backups of volatile files during stream clone.

    Volatile files may change while the stream clone runs, so their current
    content must be preserved up front.  This class keeps an open file handle
    for each preserved file, spilling the smaller ones to temporary copies on
    disk if the number of open file handles grows too large.

    This should be used as a (re-entrant) Python context manager; the file
    handles and on-disk copies are discarded when the outermost context
    exits.

    Preservation is requested by calling the instance on the real path
    (encoded full path).  A valid read handle for any file should then be
    retrieved by calling `open(path)`.
    """

    # Maximum number of file handles kept open at once; arbitrarily picked as
    # "it seemed fine" and much higher than the number of volatile files in
    # typical usage (currently fewer than 10).
    MAX_OPEN = 100

    def __init__(self):
        # nesting depth of the context manager (supports re-entrant use)
        self._counter = 0
        # path -> (size, open file object); None outside of any context
        self._volatile_fps = None
        # path -> on-disk backup path, for entries spilled out of memory
        self._copies = None
        # temporary directory holding the spilled copies
        self._dst_dir = None

    def __enter__(self):
        if self._counter == 0:
            assert self._volatile_fps is None
            self._volatile_fps = {}
        self._counter += 1
        return self

    def __exit__(self, *args, **kwargs):
        """discard all backups when the outermost context exits"""
        self._counter -= 1
        if self._counter == 0:
            for _size, fp in self._volatile_fps.values():
                fp.close()
            self._volatile_fps = None
            if self._copies is not None:
                for tmp in self._copies.values():
                    util.tryunlink(tmp)
                util.tryrmdir(self._dst_dir)
                self._copies = None
                self._dst_dir = None
            assert self._volatile_fps is None
            assert self._copies is None
            assert self._dst_dir is None

    def _init_tmp_copies(self):
        """prepare a temporary directory to save volatile files

        This will be used as backup if we have too many files open"""
        assert 0 < self._counter
        assert self._copies is None
        assert self._dst_dir is None
        self._copies = {}
        self._dst_dir = pycompat.mkdtemp(prefix=b'hg-clone-')

    def _flush_some_on_disk(self):
        """move some of the open files to temporary files on disk"""
        if self._copies is None:
            self._init_tmp_copies()
        flush_count = self.MAX_OPEN // 2
        # Spill the smallest files first: copying them is cheapest, and it
        # keeps handles open for the larger files that would be the most
        # expensive ones to duplicate.  (This is what the tracked size is
        # for; sorting the raw items() would order by path instead.)
        by_size = sorted(
            self._volatile_fps.items(), key=lambda entry: entry[1][0]
        )
        for src, (size, fp) in by_size[:flush_count]:
            prefix = os.path.basename(src)
            fd, dst = pycompat.mkstemp(prefix=prefix, dir=self._dst_dir)
            self._copies[src] = dst
            os.close(fd)
            # we no longer hardlink, but on the other hand we rarely do this,
            # and we do it for the smallest files only and not at all in the
            # common case.
            with open(dst, 'wb') as bck:
                fp.seek(0)
                bck.write(fp.read())
            del self._volatile_fps[src]
            fp.close()

    def _keep_one(self, src):
        """preserve an open file handle for a given path"""
        # store the file object right away (with a placeholder size) so that
        # __exit__ still closes it if the size computation below raises
        _, fp = self._volatile_fps[src] = (None, open(src, 'rb'))
        fp.seek(0, os.SEEK_END)
        size = fp.tell()
        self._volatile_fps[src] = (size, fp)

    def __call__(self, src):
        """preserve the volatile file at src"""
        assert 0 < self._counter
        # keep at least one slot free for the handle we are about to open
        if len(self._volatile_fps) >= (self.MAX_OPEN - 1):
            self._flush_some_on_disk()
        self._keep_one(src)

    @contextlib.contextmanager
    def open(self, src):
        """context manager yielding a read handle for `src`

        Uses the preserved in-memory handle or on-disk copy when one exists,
        and falls back to opening `src` directly otherwise."""
        assert 0 < self._counter
        entry = self._volatile_fps.get(src)
        if entry is not None:
            # reuse the still-open handle, rewound to the start
            _size, fp = entry
            fp.seek(0)
            yield fp
        else:
            if self._copies is None:
                actual_path = src
            else:
                actual_path = self._copies.get(src, src)
            # `open` here resolves to the builtin, not this method: the class
            # scope is not part of the name lookup chain inside methods
            with open(actual_path, 'rb') as fp:
                yield fp