# HG changeset patch # User Simon Sapin # Date 2021-06-08 17:55:00 # Node ID e0a314bcbc9daa5325aa45b0864bf9631a617165 # Parent f77404040776c9cf9f2146b83ab69534f84b309a revlog: Extract low-level random-access file read caching logic The `revlog` class does many things, among which fulfilling requests for arbitrary byte slices from the revlog "data file" by reading a larger chunk and caching it in memory, in order to reduce the number of system calls. This extracts that logic into a new class, so that it may later also be used for the side-data file (with another instance of that class). The copyright notice of the new file does not include a date or author name since such information tends not to be kept up-to-date: https://www.linuxfoundation.org/en/blog/copyright-notices-in-open-source-software-projects/ Differential Revision: https://phab.mercurial-scm.org/D10878 diff --git a/mercurial/changelog.py b/mercurial/changelog.py --- a/mercurial/changelog.py +++ b/mercurial/changelog.py @@ -454,6 +454,7 @@ class changelog(revlog.revlog): self.opener = _delayopener( self._realopener, self._indexfile, self._delaybuf ) + self._segmentfile.opener = self.opener self._delayed = True tr.addpending(b'cl-%i' % id(self), self._writepending) tr.addfinalize(b'cl-%i' % id(self), self._finalize) @@ -462,6 +463,7 @@ class changelog(revlog.revlog): """finalize index updates""" self._delayed = False self.opener = self._realopener + self._segmentfile.opener = self.opener # move redirected index data back into place if self._docket is not None: self._write_docket(tr) @@ -501,6 +503,7 @@ class changelog(revlog.revlog): self._delaybuf = None self._divert = True self.opener = _divertopener(self._realopener, self._indexfile) + self._segmentfile.opener = self.opener if self._divert: return True diff --git a/mercurial/revlog.py b/mercurial/revlog.py --- a/mercurial/revlog.py +++ b/mercurial/revlog.py @@ -86,6 +86,7 @@ from .revlogutils import ( docket as docketutil, flagutil, nodemap as 
nodemaputil, + randomaccessfile, revlogv0, sidedata as sidedatautil, ) @@ -125,7 +126,6 @@ rustrevlog = policy.importrust('revlog') # max size of revlog with inline data _maxinline = 131072 -_chunksize = 1048576 # Flag processors for REVIDX_ELLIPSIS. def ellipsisreadprocessor(rl, text): @@ -232,10 +232,6 @@ def parse_index_v1_mixed(data, inline): # signed integer) _maxentrysize = 0x7FFFFFFF -PARTIAL_READ_MSG = _( - b'partial read of revlog %s; expected %d bytes from offset %d, got %d' -) - FILE_TOO_SHORT_MSG = _( b'cannot read from revlog %s;' b' expected %d bytes from offset %d, data size is %d' @@ -605,7 +601,7 @@ class revlog(object): self._parse_index = parse_index_v1_mixed try: d = self._parse_index(index_data, self._inline) - index, _chunkcache = d + index, chunkcache = d use_nodemap = ( not self._inline and self._nodemap_file is not None @@ -626,9 +622,13 @@ class revlog(object): raise error.RevlogError( _(b"index %s is corrupted") % self.display_id ) - self.index, self._chunkcache = d - if not self._chunkcache: - self._chunkclear() + self.index = index + self._segmentfile = randomaccessfile.randomaccessfile( + self.opener, + (self._indexfile if self._inline else self._datafile), + self._chunkcachesize, + chunkcache, + ) # revnum -> (chain-length, sum-delta-length) self._chaininfocache = util.lrucachedict(500) # revlog header -> revlog compressor @@ -709,32 +709,6 @@ class revlog(object): return self.opener(self._datafile, mode=mode) @contextlib.contextmanager - def _datareadfp(self, existingfp=None): - """file object suitable to read data""" - # Use explicit file handle, if given. - if existingfp is not None: - yield existingfp - - # Use a file handle being actively used for writes, if available. - # There is some danger to doing this because reads will seek the - # file. However, _writeentry() performs a SEEK_END before all writes, - # so we should be safe. 
- elif self._writinghandles: - if self._inline: - yield self._writinghandles[0] - else: - yield self._writinghandles[1] - - # Otherwise open a new file handle. - else: - if self._inline: - func = self._indexfp - else: - func = self._datafp - with func() as fp: - yield fp - - @contextlib.contextmanager def _sidedatareadfp(self): """file object suitable to read sidedata""" if self._writinghandles: @@ -807,7 +781,7 @@ class revlog(object): def clearcaches(self): self._revisioncache = None self._chainbasecache.clear() - self._chunkcache = (0, b'') + self._segmentfile.clear_cache() self._pcache = {} self._nodemap_docket = None self.index.clearcaches() @@ -1629,85 +1603,6 @@ class revlog(object): p1, p2 = self.parents(node) return storageutil.hashrevisionsha1(text, p1, p2) != node - def _cachesegment(self, offset, data): - """Add a segment to the revlog cache. - - Accepts an absolute offset and the data that is at that location. - """ - o, d = self._chunkcache - # try to add to existing cache - if o + len(d) == offset and len(d) + len(data) < _chunksize: - self._chunkcache = o, d + data - else: - self._chunkcache = offset, data - - def _readsegment(self, offset, length, df=None): - """Load a segment of raw data from the revlog. - - Accepts an absolute offset, length to read, and an optional existing - file handle to read from. - - If an existing file handle is passed, it will be seeked and the - original seek position will NOT be restored. - - Returns a str or buffer of raw byte data. - - Raises if the requested number of bytes could not be read. - """ - # Cache data both forward and backward around the requested - # data, in a fixed size window. This helps speed up operations - # involving reading the revlog backwards. 
- cachesize = self._chunkcachesize - realoffset = offset & ~(cachesize - 1) - reallength = ( - (offset + length + cachesize) & ~(cachesize - 1) - ) - realoffset - with self._datareadfp(df) as df: - df.seek(realoffset) - d = df.read(reallength) - - self._cachesegment(realoffset, d) - if offset != realoffset or reallength != length: - startoffset = offset - realoffset - if len(d) - startoffset < length: - filename = self._indexfile if self._inline else self._datafile - got = len(d) - startoffset - m = PARTIAL_READ_MSG % (filename, length, offset, got) - raise error.RevlogError(m) - return util.buffer(d, startoffset, length) - - if len(d) < length: - filename = self._indexfile if self._inline else self._datafile - got = len(d) - startoffset - m = PARTIAL_READ_MSG % (filename, length, offset, got) - raise error.RevlogError(m) - - return d - - def _getsegment(self, offset, length, df=None): - """Obtain a segment of raw data from the revlog. - - Accepts an absolute offset, length of bytes to obtain, and an - optional file handle to the already-opened revlog. If the file - handle is used, it's original seek position will not be preserved. - - Requests for data may be returned from a cache. - - Returns a str or a buffer instance of raw byte data. - """ - o, d = self._chunkcache - l = len(d) - - # is it in the cache? - cachestart = offset - o - cacheend = cachestart + length - if cachestart >= 0 and cacheend <= l: - if cachestart == 0 and cacheend == l: - return d # avoid a copy - return util.buffer(d, cachestart, cacheend - cachestart) - - return self._readsegment(offset, length, df=df) - def _getsegmentforrevs(self, startrev, endrev, df=None): """Obtain a segment of raw data corresponding to a range of revisions. 
@@ -1740,7 +1635,7 @@ class revlog(object): end += (endrev + 1) * self.index.entry_size length = end - start - return start, self._getsegment(start, length, df=df) + return start, self._segmentfile.read_chunk(start, length, df) def _chunk(self, rev, df=None): """Obtain a single decompressed chunk for a revision. @@ -1832,10 +1727,6 @@ class revlog(object): return l - def _chunkclear(self): - """Clear the raw chunk cache.""" - self._chunkcache = (0, b'') - def deltaparent(self, rev): """return deltaparent of the given revision""" base = self.index[rev][3] @@ -2043,7 +1934,12 @@ class revlog(object): length = sidedata_size offset = sidedata_offset got = len(comp_segment) - m = PARTIAL_READ_MSG % (filename, length, offset, got) + m = randomaccessfile.PARTIAL_READ_MSG % ( + filename, + length, + offset, + got, + ) raise error.RevlogError(m) comp = self.index[rev][11] @@ -2136,6 +2032,7 @@ class revlog(object): # We can't use the cached file handle after close(). So prevent # its usage. self._writinghandles = None + self._segmentfile.writing_handle = None new_dfh = self._datafp(b'w+') new_dfh.truncate(0) # drop any potentially existing data @@ -2171,12 +2068,17 @@ class revlog(object): tr.replace(self._indexfile, trindex * self.index.entry_size) nodemaputil.setup_persistent_nodemap(tr, self) - self._chunkclear() + self._segmentfile = randomaccessfile.randomaccessfile( + self.opener, + self._datafile, + self._chunkcachesize, + ) if existing_handles: # switched from inline to conventional reopen the index ifh = self.__index_write_fp() self._writinghandles = (ifh, new_dfh, None) + self._segmentfile.writing_handle = new_dfh new_dfh = None finally: if new_dfh is not None: @@ -2235,11 +2137,13 @@ class revlog(object): transaction.add(self._indexfile, isize) # exposing all file handle for writing. 
self._writinghandles = (ifh, dfh, sdfh) + self._segmentfile.writing_handle = ifh if self._inline else dfh yield if self._docket is not None: self._write_docket(transaction) finally: self._writinghandles = None + self._segmentfile.writing_handle = None if dfh is not None: dfh.close() if sdfh is not None: @@ -2873,7 +2777,7 @@ class revlog(object): # then reset internal state in memory to forget those revisions self._revisioncache = None self._chaininfocache = util.lrucachedict(500) - self._chunkclear() + self._segmentfile.clear_cache() del self.index[rev:-1] diff --git a/mercurial/revlogutils/randomaccessfile.py b/mercurial/revlogutils/randomaccessfile.py new file mode 100644 --- /dev/null +++ b/mercurial/revlogutils/randomaccessfile.py @@ -0,0 +1,138 @@ +# Copyright Mercurial Contributors +# +# This software may be used and distributed according to the terms of the +# GNU General Public License version 2 or any later version. + +import contextlib + +from ..i18n import _ +from .. import ( + error, + util, +) + + +_MAX_CACHED_CHUNK_SIZE = 1048576 # 1 MiB + +PARTIAL_READ_MSG = _( + b'partial read of revlog %s; expected %d bytes from offset %d, got %d' +) + + +def _is_power_of_two(n): + return (n & (n - 1) == 0) and n != 0 + + +class randomaccessfile(object): + """Accessing arbitrary chunks of data within a file, with some caching""" + + def __init__( + self, + opener, + filename, + default_cached_chunk_size, + initial_cache=None, + ): + # Required by bitwise manipulation below + assert _is_power_of_two(default_cached_chunk_size) + + self.opener = opener + self.filename = filename + self.default_cached_chunk_size = default_cached_chunk_size + self.writing_handle = None # This is set from revlog.py + self._cached_chunk = b'' + self._cached_chunk_position = 0 # Offset from the start of the file + if initial_cache: + self._cached_chunk_position, self._cached_chunk = initial_cache + + def clear_cache(self): + self._cached_chunk = b'' + self._cached_chunk_position = 0 + + 
def _open(self, mode=b'r'): + """Return a file object""" + return self.opener(self.filename, mode=mode) + + @contextlib.contextmanager + def _open_read(self, existing_file_obj=None): + """File object suitable for reading data""" + # Use explicit file handle, if given. + if existing_file_obj is not None: + yield existing_file_obj + + # Use a file handle being actively used for writes, if available. + # There is some danger to doing this because reads will seek the + # file. However, revlog._writeentry performs a SEEK_END before all + # writes, so we should be safe. + elif self.writing_handle: + yield self.writing_handle + + # Otherwise open a new file handle. + else: + with self._open() as fp: + yield fp + + def read_chunk(self, offset, length, existing_file_obj=None): + """Read a chunk of bytes from the file. + + Accepts an absolute offset, length to read, and an optional existing + file handle to read from. + + If an existing file handle is passed, it will be seeked and the + original seek position will NOT be restored. + + Returns a str or buffer of raw byte data. + + Raises if the requested number of bytes could not be read. + """ + end = offset + length + cache_start = self._cached_chunk_position + cache_end = cache_start + len(self._cached_chunk) + # Is the requested chunk within the cache? + if cache_start <= offset and end <= cache_end: + if cache_start == offset and end == cache_end: + return self._cached_chunk # avoid a copy + relative_start = offset - cache_start + return util.buffer(self._cached_chunk, relative_start, length) + + return self._read_and_update_cache(offset, length, existing_file_obj) + + def _read_and_update_cache(self, offset, length, existing_file_obj=None): + # Cache data both forward and backward around the requested + # data, in a fixed size window. This helps speed up operations + # involving reading the revlog backwards. 
+ real_offset = offset & ~(self.default_cached_chunk_size - 1) + real_length = ( + (offset + length + self.default_cached_chunk_size) + & ~(self.default_cached_chunk_size - 1) + ) - real_offset + with self._open_read(existing_file_obj) as file_obj: + file_obj.seek(real_offset) + data = file_obj.read(real_length) + + self._add_cached_chunk(real_offset, data) + + relative_offset = offset - real_offset + got = len(data) - relative_offset + if got < length: + message = PARTIAL_READ_MSG % (self.filename, length, offset, got) + raise error.RevlogError(message) + + if offset != real_offset or real_length != length: + return util.buffer(data, relative_offset, length) + return data + + def _add_cached_chunk(self, offset, data): + """Add to or replace the cached data chunk. + + Accepts an absolute offset and the data that is at that location. + """ + if ( + self._cached_chunk_position + len(self._cached_chunk) == offset + and len(self._cached_chunk) + len(data) < _MAX_CACHED_CHUNK_SIZE + ): + # add to existing cache + self._cached_chunk += data + else: + self._cached_chunk = data + self._cached_chunk_position = offset