nodemap.py
554 lines
| 17.5 KiB
| text/x-python
|
PythonLexer
r44486 | # nodemap.py - nodemap related code and utilities | |||
# | ||||
# Copyright 2019 Pierre-Yves David <pierre-yves.david@octobus.net> | ||||
# Copyright 2019 George Racinet <georges.racinet@octobus.net> | ||||
# | ||||
# This software may be used and distributed according to the terms of the | ||||
# GNU General Public License version 2 or any later version. | ||||
from __future__ import absolute_import | ||||
r44788 | ||||
r44843 | import errno | |||
r44792 | import os | |||
r44793 | import re | |||
r44788 | import struct | |||
from .. import ( | ||||
error, | ||||
node as nodemod, | ||||
r44795 | util, | |||
r44788 | ) | |||
r44486 | ||||
class NodeMap(dict): | ||||
def __missing__(self, x): | ||||
raise error.RevlogError(b'unknown node: %s' % x) | ||||
r44788 | ||||
r44790 | def persisted_data(revlog): | |||
"""read the nodemap for a revlog from disk""" | ||||
if revlog.nodemap_file is None: | ||||
return None | ||||
r44792 | pdata = revlog.opener.tryread(revlog.nodemap_file) | |||
if not pdata: | ||||
return None | ||||
offset = 0 | ||||
(version,) = S_VERSION.unpack(pdata[offset : offset + S_VERSION.size]) | ||||
if version != ONDISK_VERSION: | ||||
return None | ||||
offset += S_VERSION.size | ||||
r44807 | headers = S_HEADER.unpack(pdata[offset : offset + S_HEADER.size]) | |||
r44808 | uid_size, tip_rev, data_length, data_unused = headers | |||
r44792 | offset += S_HEADER.size | |||
r44803 | docket = NodeMapDocket(pdata[offset : offset + uid_size]) | |||
r44807 | docket.tip_rev = tip_rev | |||
r44808 | docket.data_length = data_length | |||
docket.data_unused = data_unused | ||||
r44792 | ||||
r44803 | filename = _rawdata_filepath(revlog, docket) | |||
r44843 | use_mmap = revlog.opener.options.get("exp-persistent-nodemap.mmap") | |||
try: | ||||
with revlog.opener(filename) as fd: | ||||
if use_mmap: | ||||
data = util.buffer(util.mmapread(fd, data_length)) | ||||
else: | ||||
data = fd.read(data_length) | ||||
except OSError as e: | ||||
if e.errno != errno.ENOENT: | ||||
raise | ||||
r44811 | if len(data) < data_length: | |||
return None | ||||
return docket, data | ||||
r44790 | ||||
r44789 | def setup_persistent_nodemap(tr, revlog): | |||
"""Install whatever is needed transaction side to persist a nodemap on disk | ||||
(only actually persist the nodemap if this is relevant for this revlog) | ||||
""" | ||||
r44791 | if revlog._inline: | |||
return # inlined revlog are too small for this to be relevant | ||||
r44789 | if revlog.nodemap_file is None: | |||
return # we do not use persistent_nodemap on this revlog | ||||
callback_id = b"revlog-persistent-nodemap-%s" % revlog.nodemap_file | ||||
if tr.hasfinalize(callback_id): | ||||
return # no need to register again | ||||
r44932 | tr.addfinalize( | |||
callback_id, lambda tr: _persist_nodemap(tr.addpostclose, revlog) | ||||
) | ||||
r44789 | ||||
r44932 | def update_persistent_nodemap(revlog): | |||
"""update the persistent nodemap right now | ||||
To be used for updating the nodemap on disk outside of a normal transaction | ||||
setup (eg, `debugupdatecache`). | ||||
""" | ||||
cleanups = [] | ||||
_persist_nodemap((lambda x, y: cleanups.append(y)), revlog) | ||||
for c in cleanups: | ||||
c(None) | ||||
def _persist_nodemap(cleaner, revlog): | ||||
r44789 | """Write nodemap data on disk for a given revlog | |||
""" | ||||
if getattr(revlog, 'filteredrevs', ()): | ||||
raise error.ProgrammingError( | ||||
"cannot persist nodemap of a filtered changelog" | ||||
) | ||||
if revlog.nodemap_file is None: | ||||
msg = "calling persist nodemap on a revlog without the feature enableb" | ||||
raise error.ProgrammingError(msg) | ||||
r44805 | ||||
can_incremental = util.safehasattr(revlog.index, "nodemap_data_incremental") | ||||
ondisk_docket = revlog._nodemap_docket | ||||
r44843 | feed_data = util.safehasattr(revlog.index, "update_nodemap_data") | |||
use_mmap = revlog.opener.options.get("exp-persistent-nodemap.mmap") | ||||
r44805 | ||||
r44809 | data = None | |||
r44805 | # first attemp an incremental update of the data | |||
if can_incremental and ondisk_docket is not None: | ||||
target_docket = revlog._nodemap_docket.copy() | ||||
r44809 | ( | |||
src_docket, | ||||
data_changed_count, | ||||
data, | ||||
) = revlog.index.nodemap_data_incremental() | ||||
if src_docket != target_docket: | ||||
data = None | ||||
else: | ||||
datafile = _rawdata_filepath(revlog, target_docket) | ||||
# EXP-TODO: if this is a cache, this should use a cache vfs, not a | ||||
# store vfs | ||||
r44843 | new_length = target_docket.data_length + len(data) | |||
r44810 | with revlog.opener(datafile, b'r+') as fd: | |||
fd.seek(target_docket.data_length) | ||||
r44809 | fd.write(data) | |||
r44843 | if feed_data: | |||
if use_mmap: | ||||
fd.seek(0) | ||||
new_data = fd.read(new_length) | ||||
else: | ||||
fd.flush() | ||||
new_data = util.buffer(util.mmapread(fd, new_length)) | ||||
target_docket.data_length = new_length | ||||
r44809 | target_docket.data_unused += data_changed_count | |||
if data is None: | ||||
r44805 | # otherwise fallback to a full new export | |||
target_docket = NodeMapDocket() | ||||
datafile = _rawdata_filepath(revlog, target_docket) | ||||
if util.safehasattr(revlog.index, "nodemap_data_all"): | ||||
data = revlog.index.nodemap_data_all() | ||||
else: | ||||
data = persistent_data(revlog.index) | ||||
# EXP-TODO: if this is a cache, this should use a cache vfs, not a | ||||
# store vfs | ||||
r44843 | with revlog.opener(datafile, b'w+') as fd: | |||
r44805 | fd.write(data) | |||
r44843 | if feed_data: | |||
if use_mmap: | ||||
new_data = data | ||||
else: | ||||
fd.flush() | ||||
new_data = util.buffer(util.mmapread(fd, len(data))) | ||||
r44808 | target_docket.data_length = len(data) | |||
r44807 | target_docket.tip_rev = revlog.tiprev() | |||
r44805 | # EXP-TODO: if this is a cache, this should use a cache vfs, not a | |||
# store vfs | ||||
with revlog.opener(revlog.nodemap_file, b'w', atomictemp=True) as fp: | ||||
fp.write(target_docket.serialize()) | ||||
revlog._nodemap_docket = target_docket | ||||
r44843 | if feed_data: | |||
r44812 | revlog.index.update_nodemap_data(target_docket, new_data) | |||
r44805 | # EXP-TODO: if the transaction abort, we should remove the new data and | |||
# reinstall the old one. | ||||
# search for old index file in all cases, some older process might have | ||||
# left one behind. | ||||
r44803 | olds = _other_rawdata_filepath(revlog, target_docket) | |||
r44793 | if olds: | |||
realvfs = getattr(revlog, '_realopener', revlog.opener) | ||||
def cleanup(tr): | ||||
for oldfile in olds: | ||||
realvfs.tryunlink(oldfile) | ||||
callback_id = b"revlog-cleanup-nodemap-%s" % revlog.nodemap_file | ||||
r44932 | cleaner(callback_id, cleanup) | |||
r44792 | ||||
### Nodemap docket file | ||||
# | ||||
# The nodemap data are stored on disk using 2 files: | ||||
# | ||||
# * a raw data files containing a persistent nodemap | ||||
# (see `Nodemap Trie` section) | ||||
# | ||||
# * a small "docket" file containing medatadata | ||||
# | ||||
# While the nodemap data can be multiple tens of megabytes, the "docket" is | ||||
# small, it is easy to update it automatically or to duplicated its content | ||||
# during a transaction. | ||||
# | ||||
# Multiple raw data can exist at the same time (The currently valid one and a | ||||
# new one beind used by an in progress transaction). To accomodate this, the | ||||
# filename hosting the raw data has a variable parts. The exact filename is | ||||
# specified inside the "docket" file. | ||||
# | ||||
# The docket file contains information to find, qualify and validate the raw | ||||
# data. Its content is currently very light, but it will expand as the on disk | ||||
# nodemap gains the necessary features to be used in production. | ||||
# version 0 is experimental, no BC garantee, do no use outside of tests. | ||||
ONDISK_VERSION = 0 | ||||
S_VERSION = struct.Struct(">B") | ||||
r44808 | S_HEADER = struct.Struct(">BQQQ") | |||
r44792 | ||||
ID_SIZE = 8 | ||||
def _make_uid(): | ||||
"""return a new unique identifier. | ||||
The identifier is random and composed of ascii characters.""" | ||||
return nodemod.hex(os.urandom(ID_SIZE)) | ||||
r44803 | class NodeMapDocket(object): | |||
"""metadata associated with persistent nodemap data | ||||
The persistent data may come from disk or be on their way to disk. | ||||
""" | ||||
def __init__(self, uid=None): | ||||
if uid is None: | ||||
uid = _make_uid() | ||||
self.uid = uid | ||||
r44807 | self.tip_rev = None | |||
r44808 | self.data_length = None | |||
self.data_unused = 0 | ||||
r44803 | ||||
def copy(self): | ||||
r44807 | new = NodeMapDocket(uid=self.uid) | |||
new.tip_rev = self.tip_rev | ||||
r44808 | new.data_length = self.data_length | |||
new.data_unused = self.data_unused | ||||
r44807 | return new | |||
r44803 | ||||
r44809 | def __cmp__(self, other): | |||
if self.uid < other.uid: | ||||
return -1 | ||||
if self.uid > other.uid: | ||||
return 1 | ||||
elif self.data_length < other.data_length: | ||||
return -1 | ||||
elif self.data_length > other.data_length: | ||||
return 1 | ||||
return 0 | ||||
def __eq__(self, other): | ||||
return self.uid == other.uid and self.data_length == other.data_length | ||||
r44803 | def serialize(self): | |||
"""return serialized bytes for a docket using the passed uid""" | ||||
data = [] | ||||
data.append(S_VERSION.pack(ONDISK_VERSION)) | ||||
r44808 | headers = ( | |||
len(self.uid), | ||||
self.tip_rev, | ||||
self.data_length, | ||||
self.data_unused, | ||||
) | ||||
r44807 | data.append(S_HEADER.pack(*headers)) | |||
r44803 | data.append(self.uid) | |||
return b''.join(data) | ||||
r44792 | ||||
r44803 | def _rawdata_filepath(revlog, docket): | |||
r44792 | """The (vfs relative) nodemap's rawdata file for a given uid""" | |||
prefix = revlog.nodemap_file[:-2] | ||||
r44803 | return b"%s-%s.nd" % (prefix, docket.uid) | |||
r44789 | ||||
r44803 | def _other_rawdata_filepath(revlog, docket): | |||
r44793 | prefix = revlog.nodemap_file[:-2] | |||
Augie Fackler
|
r44952 | pattern = re.compile(br"(^|/)%s-[0-9a-f]+\.nd$" % prefix) | ||
r44803 | new_file_path = _rawdata_filepath(revlog, docket) | |||
r44793 | new_file_name = revlog.opener.basename(new_file_path) | |||
dirpath = revlog.opener.dirname(new_file_path) | ||||
others = [] | ||||
for f in revlog.opener.listdir(dirpath): | ||||
if pattern.match(f) and f != new_file_name: | ||||
others.append(f) | ||||
return others | ||||
r44788 | ### Nodemap Trie | |||
# | ||||
# This is a simple reference implementation to compute and persist a nodemap | ||||
# trie. This reference implementation is write only. The python version of this | ||||
# is not expected to be actually used, since it wont provide performance | ||||
# improvement over existing non-persistent C implementation. | ||||
# | ||||
# The nodemap is persisted as Trie using 4bits-address/16-entries block. each | ||||
# revision can be adressed using its node shortest prefix. | ||||
# | ||||
# The trie is stored as a sequence of block. Each block contains 16 entries | ||||
# (signed 64bit integer, big endian). Each entry can be one of the following: | ||||
# | ||||
# * value >= 0 -> index of sub-block | ||||
# * value == -1 -> no value | ||||
# * value < -1 -> a revision value: rev = -(value+10) | ||||
# | ||||
# The implementation focus on simplicity, not on performance. A Rust | ||||
# implementation should provide a efficient version of the same binary | ||||
# persistence. This reference python implementation is never meant to be | ||||
# extensively use in production. | ||||
def persistent_data(index): | ||||
"""return the persistent binary form for a nodemap for a given index | ||||
""" | ||||
trie = _build_trie(index) | ||||
return _persist_trie(trie) | ||||
r44805 | def update_persistent_data(index, root, max_idx, last_rev): | |||
"""return the incremental update for persistent nodemap from a given index | ||||
""" | ||||
r44808 | changed_block, trie = _update_trie(index, root, last_rev) | |||
return ( | ||||
changed_block * S_BLOCK.size, | ||||
_persist_trie(trie, existing_idx=max_idx), | ||||
) | ||||
r44805 | ||||
r44788 | S_BLOCK = struct.Struct(">" + ("l" * 16)) | |||
NO_ENTRY = -1 | ||||
# rev 0 need to be -2 because 0 is used by block, -1 is a special value. | ||||
REV_OFFSET = 2 | ||||
def _transform_rev(rev): | ||||
"""Return the number used to represent the rev in the tree. | ||||
(or retrieve a rev number from such representation) | ||||
Note that this is an involution, a function equal to its inverse (i.e. | ||||
which gives the identity when applied to itself). | ||||
""" | ||||
return -(rev + REV_OFFSET) | ||||
def _to_int(hex_digit): | ||||
"""turn an hexadecimal digit into a proper integer""" | ||||
return int(hex_digit, 16) | ||||
r44796 | class Block(dict): | |||
"""represent a block of the Trie | ||||
contains up to 16 entry indexed from 0 to 15""" | ||||
r44802 | def __init__(self): | |||
super(Block, self).__init__() | ||||
# If this block exist on disk, here is its ID | ||||
self.ondisk_id = None | ||||
r44797 | def __iter__(self): | |||
return iter(self.get(i) for i in range(16)) | ||||
r44796 | ||||
r44788 | def _build_trie(index): | |||
"""build a nodemap trie | ||||
The nodemap stores revision number for each unique prefix. | ||||
Each block is a dictionary with keys in `[0, 15]`. Values are either | ||||
another block or a revision number. | ||||
""" | ||||
r44796 | root = Block() | |||
r44788 | for rev in range(len(index)): | |||
hex = nodemod.hex(index[rev][7]) | ||||
_insert_into_block(index, 0, root, rev, hex) | ||||
return root | ||||
r44805 | def _update_trie(index, root, last_rev): | |||
"""consume""" | ||||
r44808 | changed = 0 | |||
r44805 | for rev in range(last_rev + 1, len(index)): | |||
hex = nodemod.hex(index[rev][7]) | ||||
r44808 | changed += _insert_into_block(index, 0, root, rev, hex) | |||
return changed, root | ||||
r44805 | ||||
r44788 | def _insert_into_block(index, level, block, current_rev, current_hex): | |||
"""insert a new revision in a block | ||||
index: the index we are adding revision for | ||||
level: the depth of the current block in the trie | ||||
block: the block currently being considered | ||||
current_rev: the revision number we are adding | ||||
current_hex: the hexadecimal representation of the of that revision | ||||
""" | ||||
r44808 | changed = 1 | |||
r44805 | if block.ondisk_id is not None: | |||
block.ondisk_id = None | ||||
r44788 | hex_digit = _to_int(current_hex[level : level + 1]) | |||
entry = block.get(hex_digit) | ||||
if entry is None: | ||||
# no entry, simply store the revision number | ||||
block[hex_digit] = current_rev | ||||
elif isinstance(entry, dict): | ||||
# need to recurse to an underlying block | ||||
r44808 | changed += _insert_into_block( | |||
index, level + 1, entry, current_rev, current_hex | ||||
) | ||||
r44788 | else: | |||
# collision with a previously unique prefix, inserting new | ||||
# vertices to fit both entry. | ||||
other_hex = nodemod.hex(index[entry][7]) | ||||
other_rev = entry | ||||
r44796 | new = Block() | |||
r44788 | block[hex_digit] = new | |||
_insert_into_block(index, level + 1, new, other_rev, other_hex) | ||||
_insert_into_block(index, level + 1, new, current_rev, current_hex) | ||||
r44808 | return changed | |||
r44788 | ||||
r44805 | def _persist_trie(root, existing_idx=None): | |||
r44788 | """turn a nodemap trie into persistent binary data | |||
See `_build_trie` for nodemap trie structure""" | ||||
block_map = {} | ||||
r44805 | if existing_idx is not None: | |||
base_idx = existing_idx + 1 | ||||
else: | ||||
base_idx = 0 | ||||
r44788 | chunks = [] | |||
for tn in _walk_trie(root): | ||||
r44805 | if tn.ondisk_id is not None: | |||
block_map[id(tn)] = tn.ondisk_id | ||||
else: | ||||
block_map[id(tn)] = len(chunks) + base_idx | ||||
chunks.append(_persist_block(tn, block_map)) | ||||
r44788 | return b''.join(chunks) | |||
def _walk_trie(block): | ||||
"""yield all the block in a trie | ||||
Children blocks are always yield before their parent block. | ||||
""" | ||||
for (_, item) in sorted(block.items()): | ||||
if isinstance(item, dict): | ||||
for sub_block in _walk_trie(item): | ||||
yield sub_block | ||||
yield block | ||||
def _persist_block(block_node, block_map): | ||||
"""produce persistent binary data for a single block | ||||
Children block are assumed to be already persisted and present in | ||||
block_map. | ||||
""" | ||||
r44797 | data = tuple(_to_value(v, block_map) for v in block_node) | |||
r44788 | return S_BLOCK.pack(*data) | |||
def _to_value(item, block_map): | ||||
"""persist any value as an integer""" | ||||
if item is None: | ||||
return NO_ENTRY | ||||
elif isinstance(item, dict): | ||||
return block_map[id(item)] | ||||
else: | ||||
return _transform_rev(item) | ||||
r44798 | ||||
def parse_data(data): | ||||
"""parse parse nodemap data into a nodemap Trie""" | ||||
if (len(data) % S_BLOCK.size) != 0: | ||||
msg = "nodemap data size is not a multiple of block size (%d): %d" | ||||
raise error.Abort(msg % (S_BLOCK.size, len(data))) | ||||
if not data: | ||||
r44805 | return Block(), None | |||
r44798 | block_map = {} | |||
new_blocks = [] | ||||
for i in range(0, len(data), S_BLOCK.size): | ||||
block = Block() | ||||
r44802 | block.ondisk_id = len(block_map) | |||
block_map[block.ondisk_id] = block | ||||
r44798 | block_data = data[i : i + S_BLOCK.size] | |||
values = S_BLOCK.unpack(block_data) | ||||
new_blocks.append((block, values)) | ||||
for b, values in new_blocks: | ||||
for idx, v in enumerate(values): | ||||
if v == NO_ENTRY: | ||||
continue | ||||
elif v >= 0: | ||||
b[idx] = block_map[v] | ||||
else: | ||||
b[idx] = _transform_rev(v) | ||||
r44805 | return block, i // S_BLOCK.size | |||
r44799 | ||||
# debug utility | ||||
def check_data(ui, index, data): | ||||
"""verify that the provided nodemap data are valid for the given idex""" | ||||
ret = 0 | ||||
ui.status((b"revision in index: %d\n") % len(index)) | ||||
r44805 | root, __ = parse_data(data) | |||
r44799 | all_revs = set(_all_revisions(root)) | |||
ui.status((b"revision in nodemap: %d\n") % len(all_revs)) | ||||
for r in range(len(index)): | ||||
if r not in all_revs: | ||||
msg = b" revision missing from nodemap: %d\n" % r | ||||
ui.write_err(msg) | ||||
ret = 1 | ||||
else: | ||||
all_revs.remove(r) | ||||
r44800 | nm_rev = _find_node(root, nodemod.hex(index[r][7])) | |||
if nm_rev is None: | ||||
msg = b" revision node does not match any entries: %d\n" % r | ||||
ui.write_err(msg) | ||||
ret = 1 | ||||
elif nm_rev != r: | ||||
msg = ( | ||||
b" revision node does not match the expected revision: " | ||||
b"%d != %d\n" % (r, nm_rev) | ||||
) | ||||
ui.write_err(msg) | ||||
ret = 1 | ||||
r44799 | if all_revs: | |||
for r in sorted(all_revs): | ||||
msg = b" extra revision in nodemap: %d\n" % r | ||||
ui.write_err(msg) | ||||
ret = 1 | ||||
return ret | ||||
def _all_revisions(root): | ||||
"""return all revisions stored in a Trie""" | ||||
for block in _walk_trie(root): | ||||
for v in block: | ||||
if v is None or isinstance(v, Block): | ||||
continue | ||||
yield v | ||||
r44800 | ||||
def _find_node(block, node): | ||||
"""find the revision associated with a given node""" | ||||
entry = block.get(_to_int(node[0:1])) | ||||
if isinstance(entry, dict): | ||||
return _find_node(entry, node[1:]) | ||||
return entry | ||||