class VolatileManager:
    """Manage temporary backups of volatile files during stream clone.

    This class will keep open file handles for the volatile files, writing the
    smaller ones on disk if the number of open file handles grows too large.

    This should be used as a Python context, the file handles and copies will
    be discarded when exiting the context. The context can be entered multiple
    times; resources are only released when the outermost level exits.

    The preservation can be done by calling the object on the real path
    (encoded full path).

    Valid filehandles for any file should be retrieved by calling `open(path)`.
    """

    # arbitrarily picked as "it seemed fine" and much higher than the current
    # usage.
    MAX_OPEN = 100

    def __init__(self):
        # nesting depth of the context manager
        self._counter = 0
        # {path: (size, open file object)} for files preserved in memory
        self._volatile_fps = None
        # {path: backup path} for files that were flushed to disk
        self._copies = None
        # temporary directory holding the on-disk backups (lazily created)
        self._dst_dir = None

    def __enter__(self):
        if self._counter == 0:
            assert self._volatile_fps is None
            self._volatile_fps = {}
        self._counter += 1
        return self

    def __exit__(self, *args, **kwargs):
        """discard all backups"""
        self._counter -= 1
        if self._counter == 0:
            for _size, fp in self._volatile_fps.values():
                fp.close()
            self._volatile_fps = None
            if self._copies is not None:
                for tmp in self._copies.values():
                    util.tryunlink(tmp)
                util.tryrmdir(self._dst_dir)
            self._copies = None
            self._dst_dir = None

    def _init_tmp_copies(self):
        """prepare a temporary directory to save volatile files

        This will be used as backup if we have too many files open"""
        assert 0 < self._counter
        assert self._copies is None
        assert self._dst_dir is None
        self._copies = {}
        self._dst_dir = pycompat.mkdtemp(prefix=b'hg-clone-')

    def _flush_candidates(self, count):
        """return up to `count` preserved paths, smallest files first

        Entries whose size is not known yet (None) are ordered as if they were
        empty. Ties are broken on the path so the result is deterministic and
        file objects never get compared.
        """

        def sort_key(item):
            src, (size, _fp) = item
            return (0 if size is None else size, src)

        ordered = sorted(self._volatile_fps.items(), key=sort_key)
        return [src for src, _entry in ordered[:count]]

    def _flush_some_on_disk(self):
        """move some of the open files to temporary files on disk

        Half of `MAX_OPEN` handles are released, picking the smallest files
        so the amount of data copied stays low.
        """
        if self._copies is None:
            self._init_tmp_copies()
        flush_count = self.MAX_OPEN // 2
        for src in self._flush_candidates(flush_count):
            _size, fp = self._volatile_fps[src]
            prefix = os.path.basename(src)
            fd, dst = pycompat.mkstemp(prefix=prefix, dir=self._dst_dir)
            # register the copy right away so it gets cleaned up on exit even
            # if the write below fails
            self._copies[src] = dst
            os.close(fd)
            # we no longer hardlink, but on the other hand we rarely do this,
            # and we do it for the smallest file only and not at all in the
            # common case.
            with open(dst, 'wb') as bck:
                fp.seek(0)
                bck.write(fp.read())
            del self._volatile_fps[src]
            fp.close()

    def _keep_one(self, src):
        """preserve an open file handle for a given path"""
        # store the file quickly to ensure we close it if any error happens
        _, fp = self._volatile_fps[src] = (None, open(src, 'rb'))
        fp.seek(0, os.SEEK_END)
        size = fp.tell()
        self._volatile_fps[src] = (size, fp)

    def __call__(self, src):
        """preserve the volatile file at src"""
        assert 0 < self._counter
        if len(self._volatile_fps) >= (self.MAX_OPEN - 1):
            self._flush_some_on_disk()
        self._keep_one(src)

    @contextlib.contextmanager
    def open(self, src):
        """yield a binary file object positioned at the start of `src`

        A preserved handle is reused when there is one (and is left open when
        the `with` block ends). Otherwise the on-disk backup, or the original
        path when no backup exists, is opened and closed on exit.
        """
        assert 0 < self._counter
        entry = self._volatile_fps.get(src)
        if entry is not None:
            _size, fp = entry
            fp.seek(0)
            yield fp
        else:
            if self._copies is None:
                actual_path = src
            else:
                actual_path = self._copies.get(src, src)
            with open(actual_path, 'rb') as fp:
                yield fp