# nodemap.py - nodemap related code and utilities # # Copyright 2019 Pierre-Yves David # Copyright 2019 George Racinet # # This software may be used and distributed according to the terms of the # GNU General Public License version 2 or any later version. from __future__ import absolute_import import errno import os import re import struct from .. import ( error, node as nodemod, util, ) class NodeMap(dict): def __missing__(self, x): raise error.RevlogError(b'unknown node: %s' % x) def persisted_data(revlog): """read the nodemap for a revlog from disk""" if revlog.nodemap_file is None: return None pdata = revlog.opener.tryread(revlog.nodemap_file) if not pdata: return None offset = 0 (version,) = S_VERSION.unpack(pdata[offset : offset + S_VERSION.size]) if version != ONDISK_VERSION: return None offset += S_VERSION.size headers = S_HEADER.unpack(pdata[offset : offset + S_HEADER.size]) uid_size, tip_rev, data_length, data_unused = headers offset += S_HEADER.size docket = NodeMapDocket(pdata[offset : offset + uid_size]) docket.tip_rev = tip_rev docket.data_length = data_length docket.data_unused = data_unused filename = _rawdata_filepath(revlog, docket) use_mmap = revlog.opener.options.get("exp-persistent-nodemap.mmap") try: with revlog.opener(filename) as fd: if use_mmap: data = util.buffer(util.mmapread(fd, data_length)) else: data = fd.read(data_length) except OSError as e: if e.errno != errno.ENOENT: raise if len(data) < data_length: return None return docket, data def setup_persistent_nodemap(tr, revlog): """Install whatever is needed transaction side to persist a nodemap on disk (only actually persist the nodemap if this is relevant for this revlog) """ if revlog._inline: return # inlined revlog are too small for this to be relevant if revlog.nodemap_file is None: return # we do not use persistent_nodemap on this revlog callback_id = b"revlog-persistent-nodemap-%s" % revlog.nodemap_file if tr.hasfinalize(callback_id): return # no need to register again tr.addfinalize( callback_id, lambda tr: _persist_nodemap(tr.addpostclose, revlog) ) def update_persistent_nodemap(revlog): """update the persistent nodemap right now To be used for updating the nodemap on disk outside of a normal transaction setup (eg, `debugupdatecache`). """ cleanups = [] _persist_nodemap((lambda x, y: cleanups.append(y)), revlog) for c in cleanups: c(None) def _persist_nodemap(cleaner, revlog): """Write nodemap data on disk for a given revlog """ if getattr(revlog, 'filteredrevs', ()): raise error.ProgrammingError( "cannot persist nodemap of a filtered changelog" ) if revlog.nodemap_file is None: msg = "calling persist nodemap on a revlog without the feature enableb" raise error.ProgrammingError(msg) can_incremental = util.safehasattr(revlog.index, "nodemap_data_incremental") ondisk_docket = revlog._nodemap_docket feed_data = util.safehasattr(revlog.index, "update_nodemap_data") use_mmap = revlog.opener.options.get("exp-persistent-nodemap.mmap") data = None # first attemp an incremental update of the data if can_incremental and ondisk_docket is not None: target_docket = revlog._nodemap_docket.copy() ( src_docket, data_changed_count, data, ) = revlog.index.nodemap_data_incremental() if src_docket != target_docket: data = None else: datafile = _rawdata_filepath(revlog, target_docket) # EXP-TODO: if this is a cache, this should use a cache vfs, not a # store vfs new_length = target_docket.data_length + len(data) with revlog.opener(datafile, b'r+') as fd: fd.seek(target_docket.data_length) fd.write(data) if feed_data: if use_mmap: fd.seek(0) new_data = fd.read(new_length) else: fd.flush() new_data = util.buffer(util.mmapread(fd, new_length)) target_docket.data_length = new_length target_docket.data_unused += data_changed_count if data is None: # otherwise fallback to a full new export target_docket = NodeMapDocket() datafile = _rawdata_filepath(revlog, target_docket) if util.safehasattr(revlog.index, "nodemap_data_all"): data = revlog.index.nodemap_data_all() else: data = persistent_data(revlog.index) # EXP-TODO: if this is a cache, this should use a cache vfs, not a # store vfs with revlog.opener(datafile, b'w+') as fd: fd.write(data) if feed_data: if use_mmap: new_data = data else: fd.flush() new_data = util.buffer(util.mmapread(fd, len(data))) target_docket.data_length = len(data) target_docket.tip_rev = revlog.tiprev() # EXP-TODO: if this is a cache, this should use a cache vfs, not a # store vfs with revlog.opener(revlog.nodemap_file, b'w', atomictemp=True) as fp: fp.write(target_docket.serialize()) revlog._nodemap_docket = target_docket if feed_data: revlog.index.update_nodemap_data(target_docket, new_data) # EXP-TODO: if the transaction abort, we should remove the new data and # reinstall the old one. # search for old index file in all cases, some older process might have # left one behind. olds = _other_rawdata_filepath(revlog, target_docket) if olds: realvfs = getattr(revlog, '_realopener', revlog.opener) def cleanup(tr): for oldfile in olds: realvfs.tryunlink(oldfile) callback_id = b"revlog-cleanup-nodemap-%s" % revlog.nodemap_file cleaner(callback_id, cleanup) ### Nodemap docket file # # The nodemap data are stored on disk using 2 files: # # * a raw data files containing a persistent nodemap # (see `Nodemap Trie` section) # # * a small "docket" file containing medatadata # # While the nodemap data can be multiple tens of megabytes, the "docket" is # small, it is easy to update it automatically or to duplicated its content # during a transaction. # # Multiple raw data can exist at the same time (The currently valid one and a # new one beind used by an in progress transaction). To accomodate this, the # filename hosting the raw data has a variable parts. The exact filename is # specified inside the "docket" file. # # The docket file contains information to find, qualify and validate the raw # data. Its content is currently very light, but it will expand as the on disk # nodemap gains the necessary features to be used in production. # version 0 is experimental, no BC garantee, do no use outside of tests. ONDISK_VERSION = 0 S_VERSION = struct.Struct(">B") S_HEADER = struct.Struct(">BQQQ") ID_SIZE = 8 def _make_uid(): """return a new unique identifier. The identifier is random and composed of ascii characters.""" return nodemod.hex(os.urandom(ID_SIZE)) class NodeMapDocket(object): """metadata associated with persistent nodemap data The persistent data may come from disk or be on their way to disk. """ def __init__(self, uid=None): if uid is None: uid = _make_uid() self.uid = uid self.tip_rev = None self.data_length = None self.data_unused = 0 def copy(self): new = NodeMapDocket(uid=self.uid) new.tip_rev = self.tip_rev new.data_length = self.data_length new.data_unused = self.data_unused return new def __cmp__(self, other): if self.uid < other.uid: return -1 if self.uid > other.uid: return 1 elif self.data_length < other.data_length: return -1 elif self.data_length > other.data_length: return 1 return 0 def __eq__(self, other): return self.uid == other.uid and self.data_length == other.data_length def serialize(self): """return serialized bytes for a docket using the passed uid""" data = [] data.append(S_VERSION.pack(ONDISK_VERSION)) headers = ( len(self.uid), self.tip_rev, self.data_length, self.data_unused, ) data.append(S_HEADER.pack(*headers)) data.append(self.uid) return b''.join(data) def _rawdata_filepath(revlog, docket): """The (vfs relative) nodemap's rawdata file for a given uid""" prefix = revlog.nodemap_file[:-2] return b"%s-%s.nd" % (prefix, docket.uid) def _other_rawdata_filepath(revlog, docket): prefix = revlog.nodemap_file[:-2] pattern = re.compile(br"(^|/)%s-[0-9a-f]+\.nd$" % prefix) new_file_path = _rawdata_filepath(revlog, docket) new_file_name = revlog.opener.basename(new_file_path) dirpath = revlog.opener.dirname(new_file_path) others = [] for f in revlog.opener.listdir(dirpath): if pattern.match(f) and f != new_file_name: others.append(f) return others ### Nodemap Trie # # This is a simple reference implementation to compute and persist a nodemap # trie. This reference implementation is write only. The python version of this # is not expected to be actually used, since it wont provide performance # improvement over existing non-persistent C implementation. # # The nodemap is persisted as Trie using 4bits-address/16-entries block. each # revision can be adressed using its node shortest prefix. # # The trie is stored as a sequence of block. Each block contains 16 entries # (signed 64bit integer, big endian). Each entry can be one of the following: # # * value >= 0 -> index of sub-block # * value == -1 -> no value # * value < -1 -> a revision value: rev = -(value+10) # # The implementation focus on simplicity, not on performance. A Rust # implementation should provide a efficient version of the same binary # persistence. This reference python implementation is never meant to be # extensively use in production. def persistent_data(index): """return the persistent binary form for a nodemap for a given index """ trie = _build_trie(index) return _persist_trie(trie) def update_persistent_data(index, root, max_idx, last_rev): """return the incremental update for persistent nodemap from a given index """ changed_block, trie = _update_trie(index, root, last_rev) return ( changed_block * S_BLOCK.size, _persist_trie(trie, existing_idx=max_idx), ) S_BLOCK = struct.Struct(">" + ("l" * 16)) NO_ENTRY = -1 # rev 0 need to be -2 because 0 is used by block, -1 is a special value. REV_OFFSET = 2 def _transform_rev(rev): """Return the number used to represent the rev in the tree. (or retrieve a rev number from such representation) Note that this is an involution, a function equal to its inverse (i.e. which gives the identity when applied to itself). """ return -(rev + REV_OFFSET) def _to_int(hex_digit): """turn an hexadecimal digit into a proper integer""" return int(hex_digit, 16) class Block(dict): """represent a block of the Trie contains up to 16 entry indexed from 0 to 15""" def __init__(self): super(Block, self).__init__() # If this block exist on disk, here is its ID self.ondisk_id = None def __iter__(self): return iter(self.get(i) for i in range(16)) def _build_trie(index): """build a nodemap trie The nodemap stores revision number for each unique prefix. Each block is a dictionary with keys in `[0, 15]`. Values are either another block or a revision number. """ root = Block() for rev in range(len(index)): hex = nodemod.hex(index[rev][7]) _insert_into_block(index, 0, root, rev, hex) return root def _update_trie(index, root, last_rev): """consume""" changed = 0 for rev in range(last_rev + 1, len(index)): hex = nodemod.hex(index[rev][7]) changed += _insert_into_block(index, 0, root, rev, hex) return changed, root def _insert_into_block(index, level, block, current_rev, current_hex): """insert a new revision in a block index: the index we are adding revision for level: the depth of the current block in the trie block: the block currently being considered current_rev: the revision number we are adding current_hex: the hexadecimal representation of the of that revision """ changed = 1 if block.ondisk_id is not None: block.ondisk_id = None hex_digit = _to_int(current_hex[level : level + 1]) entry = block.get(hex_digit) if entry is None: # no entry, simply store the revision number block[hex_digit] = current_rev elif isinstance(entry, dict): # need to recurse to an underlying block changed += _insert_into_block( index, level + 1, entry, current_rev, current_hex ) else: # collision with a previously unique prefix, inserting new # vertices to fit both entry. other_hex = nodemod.hex(index[entry][7]) other_rev = entry new = Block() block[hex_digit] = new _insert_into_block(index, level + 1, new, other_rev, other_hex) _insert_into_block(index, level + 1, new, current_rev, current_hex) return changed def _persist_trie(root, existing_idx=None): """turn a nodemap trie into persistent binary data See `_build_trie` for nodemap trie structure""" block_map = {} if existing_idx is not None: base_idx = existing_idx + 1 else: base_idx = 0 chunks = [] for tn in _walk_trie(root): if tn.ondisk_id is not None: block_map[id(tn)] = tn.ondisk_id else: block_map[id(tn)] = len(chunks) + base_idx chunks.append(_persist_block(tn, block_map)) return b''.join(chunks) def _walk_trie(block): """yield all the block in a trie Children blocks are always yield before their parent block. """ for (_, item) in sorted(block.items()): if isinstance(item, dict): for sub_block in _walk_trie(item): yield sub_block yield block def _persist_block(block_node, block_map): """produce persistent binary data for a single block Children block are assumed to be already persisted and present in block_map. """ data = tuple(_to_value(v, block_map) for v in block_node) return S_BLOCK.pack(*data) def _to_value(item, block_map): """persist any value as an integer""" if item is None: return NO_ENTRY elif isinstance(item, dict): return block_map[id(item)] else: return _transform_rev(item) def parse_data(data): """parse parse nodemap data into a nodemap Trie""" if (len(data) % S_BLOCK.size) != 0: msg = "nodemap data size is not a multiple of block size (%d): %d" raise error.Abort(msg % (S_BLOCK.size, len(data))) if not data: return Block(), None block_map = {} new_blocks = [] for i in range(0, len(data), S_BLOCK.size): block = Block() block.ondisk_id = len(block_map) block_map[block.ondisk_id] = block block_data = data[i : i + S_BLOCK.size] values = S_BLOCK.unpack(block_data) new_blocks.append((block, values)) for b, values in new_blocks: for idx, v in enumerate(values): if v == NO_ENTRY: continue elif v >= 0: b[idx] = block_map[v] else: b[idx] = _transform_rev(v) return block, i // S_BLOCK.size # debug utility def check_data(ui, index, data): """verify that the provided nodemap data are valid for the given idex""" ret = 0 ui.status((b"revision in index: %d\n") % len(index)) root, __ = parse_data(data) all_revs = set(_all_revisions(root)) ui.status((b"revision in nodemap: %d\n") % len(all_revs)) for r in range(len(index)): if r not in all_revs: msg = b" revision missing from nodemap: %d\n" % r ui.write_err(msg) ret = 1 else: all_revs.remove(r) nm_rev = _find_node(root, nodemod.hex(index[r][7])) if nm_rev is None: msg = b" revision node does not match any entries: %d\n" % r ui.write_err(msg) ret = 1 elif nm_rev != r: msg = ( b" revision node does not match the expected revision: " b"%d != %d\n" % (r, nm_rev) ) ui.write_err(msg) ret = 1 if all_revs: for r in sorted(all_revs): msg = b" extra revision in nodemap: %d\n" % r ui.write_err(msg) ret = 1 return ret def _all_revisions(root): """return all revisions stored in a Trie""" for block in _walk_trie(root): for v in block: if v is None or isinstance(v, Block): continue yield v def _find_node(block, node): """find the revision associated with a given node""" entry = block.get(_to_int(node[0:1])) if isinstance(entry, dict): return _find_node(entry, node[1:]) return entry