# nodemap.py - nodemap related code and utilities # # Copyright 2019 Pierre-Yves David # Copyright 2019 George Racinet # # This software may be used and distributed according to the terms of the # GNU General Public License version 2 or any later version. import re import struct from ..node import hex from .. import ( error, requirements, util, ) from . import docket as docket_mod class NodeMap(dict): def __missing__(self, x): raise error.RevlogError(b'unknown node: %s' % x) def test_race_hook_1(): """hook point for test This let tests to have things happens between the docket reading and the data reading""" pass def post_stream_cleanup(repo): """The stream clone might needs to remove some file if persisten nodemap was dropped while stream cloning """ if requirements.REVLOGV1_REQUIREMENT not in repo.requirements: return if requirements.NODEMAP_REQUIREMENT in repo.requirements: return unfi = repo.unfiltered() delete_nodemap(None, unfi, unfi.changelog) delete_nodemap(None, repo, unfi.manifestlog._rootstore._revlog) def persisted_data(revlog): """read the nodemap for a revlog from disk""" if revlog._nodemap_file is None: return None pdata = revlog.opener.tryread(revlog._nodemap_file) if not pdata: return None offset = 0 (version,) = S_VERSION.unpack(pdata[offset : offset + S_VERSION.size]) if version != ONDISK_VERSION: return None offset += S_VERSION.size headers = S_HEADER.unpack(pdata[offset : offset + S_HEADER.size]) uid_size, tip_rev, data_length, data_unused, tip_node_size = headers offset += S_HEADER.size docket = NodeMapDocket(pdata[offset : offset + uid_size]) offset += uid_size docket.tip_rev = tip_rev docket.tip_node = pdata[offset : offset + tip_node_size] docket.data_length = data_length docket.data_unused = data_unused filename = _rawdata_filepath(revlog, docket) use_mmap = revlog.opener.options.get(b"persistent-nodemap.mmap") test_race_hook_1() try: with revlog.opener(filename) as fd: if use_mmap: try: data = util.buffer(util.mmapread(fd, data_length)) except ValueError: # raised when the read file is too small data = b'' else: data = fd.read(data_length) except FileNotFoundError: return None if len(data) < data_length: return None return docket, data def setup_persistent_nodemap(tr, revlog): """Install whatever is needed transaction side to persist a nodemap on disk (only actually persist the nodemap if this is relevant for this revlog) """ if revlog._inline: return # inlined revlog are too small for this to be relevant if revlog._nodemap_file is None: return # we do not use persistent_nodemap on this revlog # we need to happen after the changelog finalization, in that use "cl-" callback_id = b"nm-revlog-persistent-nodemap-%s" % revlog._nodemap_file if tr.hasfinalize(callback_id): return # no need to register again tr.addpending( callback_id, lambda tr: persist_nodemap(tr, revlog, pending=True) ) tr.addfinalize(callback_id, lambda tr: persist_nodemap(tr, revlog)) class _NoTransaction: """transaction like object to update the nodemap outside a transaction""" def __init__(self): self._postclose = {} def addpostclose(self, callback_id, callback_func): self._postclose[callback_id] = callback_func def registertmp(self, *args, **kwargs): pass def addbackup(self, *args, **kwargs): pass def add(self, *args, **kwargs): pass def addabort(self, *args, **kwargs): pass def _report(self, *args): pass def update_persistent_nodemap(revlog): """update the persistent nodemap right now To be used for updating the nodemap on disk outside of a normal transaction setup (eg, `debugupdatecache`). """ if revlog._inline: return # inlined revlog are too small for this to be relevant if revlog._nodemap_file is None: return # we do not use persistent_nodemap on this revlog notr = _NoTransaction() persist_nodemap(notr, revlog) for k in sorted(notr._postclose): notr._postclose[k](None) def delete_nodemap(tr, repo, revlog): """Delete nodemap data on disk for a given revlog""" prefix = revlog.radix pattern = re.compile(br"(^|/)%s(-[0-9a-f]+\.nd|\.n(\.a)?)$" % prefix) dirpath = revlog.opener.dirname(revlog._indexfile) for f in revlog.opener.listdir(dirpath): if pattern.match(f): repo.svfs.tryunlink(f) def persist_nodemap(tr, revlog, pending=False, force=False): """Write nodemap data on disk for a given revlog""" if len(revlog.index) <= 0: return if getattr(revlog, 'filteredrevs', ()): raise error.ProgrammingError( "cannot persist nodemap of a filtered changelog" ) if revlog._nodemap_file is None: if force: revlog._nodemap_file = get_nodemap_file(revlog) else: msg = "calling persist nodemap on a revlog without the feature enabled" raise error.ProgrammingError(msg) can_incremental = hasattr(revlog.index, "nodemap_data_incremental") ondisk_docket = revlog._nodemap_docket feed_data = hasattr(revlog.index, "update_nodemap_data") use_mmap = revlog.opener.options.get(b"persistent-nodemap.mmap") data = None # first attemp an incremental update of the data if can_incremental and ondisk_docket is not None: target_docket = revlog._nodemap_docket.copy() ( src_docket, data_changed_count, data, ) = revlog.index.nodemap_data_incremental() new_length = target_docket.data_length + len(data) new_unused = target_docket.data_unused + data_changed_count if src_docket != target_docket: data = None elif new_length <= (new_unused * 10): # under 10% of unused data data = None else: datafile = _rawdata_filepath(revlog, target_docket) # EXP-TODO: if this is a cache, this should use a cache vfs, not a # store vfs tr.add(datafile, target_docket.data_length) with revlog.opener(datafile, b'r+') as fd: fd.seek(target_docket.data_length) fd.write(data) if feed_data: if use_mmap: fd.flush() new_data = util.buffer(util.mmapread(fd, new_length)) else: fd.seek(0) new_data = fd.read(new_length) target_docket.data_length = new_length target_docket.data_unused = new_unused if data is None: # otherwise fallback to a full new export target_docket = NodeMapDocket() datafile = _rawdata_filepath(revlog, target_docket) if hasattr(revlog.index, "nodemap_data_all"): data = revlog.index.nodemap_data_all() else: data = persistent_data(revlog.index) # EXP-TODO: if this is a cache, this should use a cache vfs, not a # store vfs tryunlink = revlog.opener.tryunlink def abortck(tr): tryunlink(datafile) callback_id = b"delete-%s" % datafile # some flavor of the transaction abort does not cleanup new file, it # simply empty them. tr.addabort(callback_id, abortck) with revlog.opener(datafile, b'w+') as fd: fd.write(data) if feed_data: if use_mmap: fd.flush() new_data = util.buffer(util.mmapread(fd, len(data))) else: new_data = data target_docket.data_length = len(data) target_docket.tip_rev = revlog.tiprev() target_docket.tip_node = revlog.node(target_docket.tip_rev) # EXP-TODO: if this is a cache, this should use a cache vfs, not a # store vfs file_path = revlog._nodemap_file if pending: file_path += b'.a' tr.registertmp(file_path) else: tr.addbackup(file_path) with revlog.opener(file_path, b'w', atomictemp=True) as fp: fp.write(target_docket.serialize()) revlog._nodemap_docket = target_docket if feed_data: revlog.index.update_nodemap_data(target_docket, new_data) # search for old index file in all cases, some older process might have # left one behind. olds = _other_rawdata_filepath(revlog, target_docket) if olds: realvfs = getattr(revlog, '_realopener', revlog.opener) def cleanup(tr): for oldfile in olds: realvfs.tryunlink(oldfile) callback_id = b"revlog-cleanup-nodemap-%s" % revlog._nodemap_file tr.addpostclose(callback_id, cleanup) ### Nodemap docket file # # The nodemap data are stored on disk using 2 files: # # * a raw data files containing a persistent nodemap # (see `Nodemap Trie` section) # # * a small "docket" file containing medatadata # # While the nodemap data can be multiple tens of megabytes, the "docket" is # small, it is easy to update it automatically or to duplicated its content # during a transaction. # # Multiple raw data can exist at the same time (The currently valid one and a # new one beind used by an in progress transaction). To accomodate this, the # filename hosting the raw data has a variable parts. The exact filename is # specified inside the "docket" file. # # The docket file contains information to find, qualify and validate the raw # data. Its content is currently very light, but it will expand as the on disk # nodemap gains the necessary features to be used in production. ONDISK_VERSION = 1 S_VERSION = struct.Struct(">B") S_HEADER = struct.Struct(">BQQQQ") class NodeMapDocket: """metadata associated with persistent nodemap data The persistent data may come from disk or be on their way to disk. """ def __init__(self, uid=None): if uid is None: uid = docket_mod.make_uid() # a unique identifier for the data file: # - When new data are appended, it is preserved. # - When a new data file is created, a new identifier is generated. self.uid = uid # the tipmost revision stored in the data file. This revision and all # revision before it are expected to be encoded in the data file. self.tip_rev = None # the node of that tipmost revision, if it mismatch the current index # data the docket is not valid for the current index and should be # discarded. # # note: this method is not perfect as some destructive operation could # preserve the same tip_rev + tip_node while altering lower revision. # However this multiple other caches have the same vulnerability (eg: # brancmap cache). self.tip_node = None # the size (in bytes) of the persisted data to encode the nodemap valid # for `tip_rev`. # - data file shorter than this are corrupted, # - any extra data should be ignored. self.data_length = None # the amount (in bytes) of "dead" data, still in the data file but no # longer used for the nodemap. self.data_unused = 0 def copy(self): new = NodeMapDocket(uid=self.uid) new.tip_rev = self.tip_rev new.tip_node = self.tip_node new.data_length = self.data_length new.data_unused = self.data_unused return new def __cmp__(self, other): if self.uid < other.uid: return -1 if self.uid > other.uid: return 1 elif self.data_length < other.data_length: return -1 elif self.data_length > other.data_length: return 1 return 0 def __eq__(self, other): return self.uid == other.uid and self.data_length == other.data_length def serialize(self): """return serialized bytes for a docket using the passed uid""" data = [] data.append(S_VERSION.pack(ONDISK_VERSION)) headers = ( len(self.uid), self.tip_rev, self.data_length, self.data_unused, len(self.tip_node), ) data.append(S_HEADER.pack(*headers)) data.append(self.uid) data.append(self.tip_node) return b''.join(data) def _rawdata_filepath(revlog, docket): """The (vfs relative) nodemap's rawdata file for a given uid""" prefix = revlog.radix return b"%s-%s.nd" % (prefix, docket.uid) def _other_rawdata_filepath(revlog, docket): prefix = revlog.radix pattern = re.compile(br"(^|/)%s-[0-9a-f]+\.nd$" % prefix) new_file_path = _rawdata_filepath(revlog, docket) new_file_name = revlog.opener.basename(new_file_path) dirpath = revlog.opener.dirname(new_file_path) others = [] for f in revlog.opener.listdir(dirpath): if pattern.match(f) and f != new_file_name: others.append(f) return others ### Nodemap Trie # # This is a simple reference implementation to compute and persist a nodemap # trie. This reference implementation is write only. The python version of this # is not expected to be actually used, since it wont provide performance # improvement over existing non-persistent C implementation. # # The nodemap is persisted as Trie using 4bits-address/16-entries block. each # revision can be adressed using its node shortest prefix. # # The trie is stored as a sequence of block. Each block contains 16 entries # (signed 64bit integer, big endian). Each entry can be one of the following: # # * value >= 0 -> index of sub-block # * value == -1 -> no value # * value < -1 -> encoded revision: rev = -(value+2) # # See REV_OFFSET and _transform_rev below. # # The implementation focus on simplicity, not on performance. A Rust # implementation should provide a efficient version of the same binary # persistence. This reference python implementation is never meant to be # extensively use in production. def persistent_data(index): """return the persistent binary form for a nodemap for a given index""" trie = _build_trie(index) return _persist_trie(trie) def update_persistent_data(index, root, max_idx, last_rev): """return the incremental update for persistent nodemap from a given index""" changed_block, trie = _update_trie(index, root, last_rev) return ( changed_block * S_BLOCK.size, _persist_trie(trie, existing_idx=max_idx), ) S_BLOCK = struct.Struct(">" + ("l" * 16)) NO_ENTRY = -1 # rev 0 need to be -2 because 0 is used by block, -1 is a special value. REV_OFFSET = 2 def _transform_rev(rev): """Return the number used to represent the rev in the tree. (or retrieve a rev number from such representation) Note that this is an involution, a function equal to its inverse (i.e. which gives the identity when applied to itself). """ return -(rev + REV_OFFSET) def _to_int(hex_digit): """turn an hexadecimal digit into a proper integer""" return int(hex_digit, 16) class Block(dict): """represent a block of the Trie contains up to 16 entry indexed from 0 to 15""" def __init__(self): super(Block, self).__init__() # If this block exist on disk, here is its ID self.ondisk_id = None def __iter__(self): return iter(self.get(i) for i in range(16)) def _build_trie(index): """build a nodemap trie The nodemap stores revision number for each unique prefix. Each block is a dictionary with keys in `[0, 15]`. Values are either another block or a revision number. """ root = Block() for rev in range(len(index)): current_hex = hex(index[rev][7]) _insert_into_block(index, 0, root, rev, current_hex) return root def _update_trie(index, root, last_rev): """consume""" changed = 0 for rev in range(last_rev + 1, len(index)): current_hex = hex(index[rev][7]) changed += _insert_into_block(index, 0, root, rev, current_hex) return changed, root def _insert_into_block(index, level, block, current_rev, current_hex): """insert a new revision in a block index: the index we are adding revision for level: the depth of the current block in the trie block: the block currently being considered current_rev: the revision number we are adding current_hex: the hexadecimal representation of the of that revision """ changed = 1 if block.ondisk_id is not None: block.ondisk_id = None hex_digit = _to_int(current_hex[level : level + 1]) entry = block.get(hex_digit) if entry is None: # no entry, simply store the revision number block[hex_digit] = current_rev elif isinstance(entry, dict): # need to recurse to an underlying block changed += _insert_into_block( index, level + 1, entry, current_rev, current_hex ) else: # collision with a previously unique prefix, inserting new # vertices to fit both entry. other_hex = hex(index[entry][7]) other_rev = entry new = Block() block[hex_digit] = new _insert_into_block(index, level + 1, new, other_rev, other_hex) _insert_into_block(index, level + 1, new, current_rev, current_hex) return changed def _persist_trie(root, existing_idx=None): """turn a nodemap trie into persistent binary data See `_build_trie` for nodemap trie structure""" block_map = {} if existing_idx is not None: base_idx = existing_idx + 1 else: base_idx = 0 chunks = [] for tn in _walk_trie(root): if tn.ondisk_id is not None: block_map[id(tn)] = tn.ondisk_id else: block_map[id(tn)] = len(chunks) + base_idx chunks.append(_persist_block(tn, block_map)) return b''.join(chunks) def _walk_trie(block): """yield all the block in a trie Children blocks are always yield before their parent block. """ for (__, item) in sorted(block.items()): if isinstance(item, dict): for sub_block in _walk_trie(item): yield sub_block yield block def _persist_block(block_node, block_map): """produce persistent binary data for a single block Children block are assumed to be already persisted and present in block_map. """ data = tuple(_to_value(v, block_map) for v in block_node) return S_BLOCK.pack(*data) def _to_value(item, block_map): """persist any value as an integer""" if item is None: return NO_ENTRY elif isinstance(item, dict): return block_map[id(item)] else: return _transform_rev(item) def parse_data(data): """parse parse nodemap data into a nodemap Trie""" if (len(data) % S_BLOCK.size) != 0: msg = b"nodemap data size is not a multiple of block size (%d): %d" raise error.Abort(msg % (S_BLOCK.size, len(data))) if not data: return Block(), None block_map = {} new_blocks = [] for i in range(0, len(data), S_BLOCK.size): block = Block() block.ondisk_id = len(block_map) block_map[block.ondisk_id] = block block_data = data[i : i + S_BLOCK.size] values = S_BLOCK.unpack(block_data) new_blocks.append((block, values)) for b, values in new_blocks: for idx, v in enumerate(values): if v == NO_ENTRY: continue elif v >= 0: b[idx] = block_map[v] else: b[idx] = _transform_rev(v) return block, i // S_BLOCK.size # debug utility def check_data(ui, index, data): """verify that the provided nodemap data are valid for the given idex""" ret = 0 ui.status((b"revisions in index: %d\n") % len(index)) root, __ = parse_data(data) all_revs = set(_all_revisions(root)) ui.status((b"revisions in nodemap: %d\n") % len(all_revs)) for r in range(len(index)): if r not in all_revs: msg = b" revision missing from nodemap: %d\n" % r ui.write_err(msg) ret = 1 else: all_revs.remove(r) nm_rev = _find_node(root, hex(index[r][7])) if nm_rev is None: msg = b" revision node does not match any entries: %d\n" % r ui.write_err(msg) ret = 1 elif nm_rev != r: msg = ( b" revision node does not match the expected revision: " b"%d != %d\n" % (r, nm_rev) ) ui.write_err(msg) ret = 1 if all_revs: for r in sorted(all_revs): msg = b" extra revisions in nodemap: %d\n" % r ui.write_err(msg) ret = 1 return ret def _all_revisions(root): """return all revisions stored in a Trie""" for block in _walk_trie(root): for v in block: if v is None or isinstance(v, Block): continue yield v def _find_node(block, node): """find the revision associated with a given node""" entry = block.get(_to_int(node[0:1])) if isinstance(entry, dict): return _find_node(entry, node[1:]) return entry def get_nodemap_file(revlog): if revlog._trypending: pending_path = revlog.radix + b".n.a" if revlog.opener.exists(pending_path): return pending_path return revlog.radix + b".n"