##// END OF EJS Templates
stream: create a `e.preserve_volatiles` method directly on StoreEntry...
stream: create a `e.preserve_volatiles` method directly on StoreEntry. For caches, we will need an atomic operation that detects the file and preserves it, so we start by creating an API that makes such an atomic operation possible. We will actually make things atomic in the next changesets.

File last commit:

r52756:f4733654 default
r53254:a260d326 default
Show More
nodemap.py
671 lines | 21.6 KiB | text/x-python | PythonLexer
# nodemap.py - nodemap related code and utilities
#
# Copyright 2019 Pierre-Yves David <pierre-yves.david@octobus.net>
# Copyright 2019 George Racinet <georges.racinet@octobus.net>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
from __future__ import annotations
import re
import struct
from ..node import hex
from .. import (
error,
requirements,
util,
)
from . import docket as docket_mod
class NodeMap(dict):
    """Dictionary that reports a missing key as a revlog error."""

    def __missing__(self, x):
        """Raise `error.RevlogError` for the unknown key `x`."""
        message = b'unknown node: %s' % x
        raise error.RevlogError(message)
def test_race_hook_1():
    """Do nothing; hook point for the tests.

    `persisted_data` calls this between reading the docket and reading the
    raw data, so that tests can make things happen in between.
    """
    return None
def post_stream_cleanup(repo):
    """Remove leftover nodemap files after a stream clone.

    The stream clone might need to remove some files if the persistent
    nodemap was dropped while stream cloning. Nothing is done unless the
    repository uses revlog-v1 and does not have the nodemap requirement.
    """
    reqs = repo.requirements
    uses_revlogv1 = requirements.REVLOGV1_REQUIREMENT in reqs
    if not uses_revlogv1 or requirements.NODEMAP_REQUIREMENT in reqs:
        return
    unfiltered = repo.unfiltered()
    delete_nodemap(None, unfiltered, unfiltered.changelog)
    manifest_revlog = unfiltered.manifestlog._rootstore._revlog
    delete_nodemap(None, repo, manifest_revlog)
def persisted_data(revlog):
    """Read the persisted nodemap of a revlog from disk.

    Returns a `(docket, data)` pair on success, where `docket` is a
    `NodeMapDocket` and `data` holds the first `docket.data_length` bytes of
    the raw data file.

    Returns None when the nodemap cannot be used: the revlog has no nodemap
    file, the docket is missing, empty, truncated or of an unknown version,
    or the raw data file is missing or shorter than the docket announces.
    """
    if revlog._nodemap_file is None:
        return None
    pdata = revlog.opener.tryread(revlog._nodemap_file)
    if not pdata:
        return None
    offset = 0
    (version,) = S_VERSION.unpack(pdata[offset : offset + S_VERSION.size])
    if version != ONDISK_VERSION:
        return None
    offset += S_VERSION.size
    if len(pdata) < offset + S_HEADER.size:
        # truncated docket: treat it like any other unusable on-disk state
        # instead of letting `struct.error` escape
        return None
    headers = S_HEADER.unpack(pdata[offset : offset + S_HEADER.size])
    uid_size, tip_rev, data_length, data_unused, tip_node_size = headers
    offset += S_HEADER.size
    if len(pdata) < offset + uid_size + tip_node_size:
        # the variable-size tail (uid + tip node) is incomplete; slicing
        # would silently produce a wrong uid or tip node
        return None
    docket = NodeMapDocket(pdata[offset : offset + uid_size])
    offset += uid_size
    docket.tip_rev = tip_rev
    docket.tip_node = pdata[offset : offset + tip_node_size]
    docket.data_length = data_length
    docket.data_unused = data_unused

    filename = _rawdata_filepath(revlog, docket)
    use_mmap = revlog.opener.options.get(b"persistent-nodemap.mmap")

    # let tests act between the docket reading and the data reading
    test_race_hook_1()
    try:
        with revlog.opener(filename) as fd:
            if use_mmap and revlog.opener.is_mmap_safe(filename):
                try:
                    data = util.buffer(util.mmapread(fd, data_length))
                except ValueError:
                    # raised when the read file is too small
                    data = b''
            else:
                data = fd.read(data_length)
    except FileNotFoundError:
        return None
    if len(data) < data_length:
        return None
    return docket, data
def setup_persistent_nodemap(tr, revlog):
    """Register the transaction callbacks that persist a nodemap on disk.

    Nothing is registered when the revlog is inlined (too small for this to
    be relevant), when it does not use a persistent nodemap, or when the
    callbacks are already registered on `tr`.
    """
    if revlog._inline or revlog._nodemap_file is None:
        return

    # The original note says this must happen after the changelog
    # finalization (which uses "cl-"), hence the "nm-" prefix; presumably the
    # transaction runs callbacks in identifier order.
    callback_id = b"nm-revlog-persistent-nodemap-%s" % revlog._nodemap_file
    if tr.hasfinalize(callback_id):
        return  # no need to register again

    def write_pending(tr):
        return persist_nodemap(tr, revlog, pending=True)

    def write_final(tr):
        return persist_nodemap(tr, revlog)

    tr.addpending(callback_id, write_pending)
    tr.addfinalize(callback_id, write_final)
class _NoTransaction:
"""transaction like object to update the nodemap outside a transaction"""
def __init__(self):
self._postclose = {}
def addpostclose(self, callback_id, callback_func):
self._postclose[callback_id] = callback_func
def registertmp(self, *args, **kwargs):
pass
def addbackup(self, *args, **kwargs):
pass
def add(self, *args, **kwargs):
pass
def addabort(self, *args, **kwargs):
pass
def _report(self, *args):
pass
def update_persistent_nodemap(revlog):
    """Write the persistent nodemap of `revlog` right now.

    To be used for updating the nodemap on disk outside of a normal
    transaction setup (eg, `debugupdatecache`). Nothing is done for inlined
    revlogs (too small for this to be relevant) or for revlogs that do not
    use a persistent nodemap.
    """
    if revlog._inline or revlog._nodemap_file is None:
        return
    fake_tr = _NoTransaction()
    persist_nodemap(fake_tr, revlog)
    # run the recorded post-close callbacks, in identifier order
    callbacks = fake_tr._postclose
    for callback_id in sorted(callbacks):
        callbacks[callback_id](None)
def delete_nodemap(tr, repo, revlog):
    """Delete nodemap data on disk for a given revlog.

    Every entry in the directory of the revlog's index file is checked. The
    ones named after the revlog radix and looking like a docket (`.n`), a
    pending docket (`.n.a`) or a raw data file (`-<hex>.nd`) are unlinked
    through `repo.svfs`.

    `tr` is accepted but not used by this function.
    """
    # The radix is a file name, not a regular expression: escape it so that
    # characters such as "." only match themselves and unrelated files are
    # never deleted.
    prefix = re.escape(revlog.radix)
    pattern = re.compile(br"(^|/)%s(-[0-9a-f]+\.nd|\.n(\.a)?)$" % prefix)
    dirpath = revlog.opener.dirname(revlog._indexfile)
    for f in revlog.opener.listdir(dirpath):
        if pattern.match(f):
            repo.svfs.tryunlink(f)
def persist_nodemap(tr, revlog, pending=False, force=False):
    """Write nodemap data on disk for a given revlog

    The raw data file is either extended in place (incremental update) or
    rewritten from scratch under a new uid, then a docket describing it is
    written and installed as `revlog._nodemap_docket`.

    tr:      transaction (or `_NoTransaction`) used to register the written
             files and the abort/post-close callbacks
    revlog:  the revlog whose nodemap must be persisted
    pending: if True, the docket is written to the `.a` variant of the
             nodemap file and registered as a temporary file
    force:   if True, a revlog without a nodemap file gets one assigned from
             `get_nodemap_file` instead of raising

    Nothing is done for an empty index. Raises `error.ProgrammingError` when
    the revlog is filtered, or when it has no nodemap file and `force` is
    not set.
    """
    if len(revlog.index) <= 0:
        return
    if getattr(revlog, 'filteredrevs', ()):
        raise error.ProgrammingError(
            "cannot persist nodemap of a filtered changelog"
        )
    if revlog._nodemap_file is None:
        if force:
            revlog._nodemap_file = get_nodemap_file(revlog)
        else:
            msg = "calling persist nodemap on a revlog without the feature enabled"
            raise error.ProgrammingError(msg)

    # capabilities are probed on the index object since not every index
    # implementation provides these methods
    can_incremental = hasattr(revlog.index, "nodemap_data_incremental")
    ondisk_docket = revlog._nodemap_docket
    feed_data = hasattr(revlog.index, "update_nodemap_data")
    use_mmap = revlog.opener.options.get(b"persistent-nodemap.mmap")

    # `data` staying None after this section means "do a full export"
    data = None
    # first attempt an incremental update of the data
    if can_incremental and ondisk_docket is not None:
        target_docket = revlog._nodemap_docket.copy()
        (
            src_docket,
            data_changed_count,
            data,
        ) = revlog.index.nodemap_data_incremental()
        new_length = target_docket.data_length + len(data)
        new_unused = target_docket.data_unused + data_changed_count
        if src_docket != target_docket:
            # the index data are not based on the docket we know about
            data = None
        elif new_length <= (new_unused * 10):
            # at least 10% of the file would be unused data: rewrite it
            data = None
        else:
            datafile = _rawdata_filepath(revlog, target_docket)
            # EXP-TODO: if this is a cache, this should use a cache vfs, not a
            # store vfs
            tr.add(datafile, target_docket.data_length)
            with revlog.opener(datafile, b'r+') as fd:
                # append the new blocks after the currently valid data
                fd.seek(target_docket.data_length)
                fd.write(data)
                if feed_data:
                    if use_mmap and revlog.opener.is_mmap_safe(datafile):
                        fd.flush()
                        new_data = util.buffer(util.mmapread(fd, new_length))
                    else:
                        fd.seek(0)
                        new_data = fd.read(new_length)
            target_docket.data_length = new_length
            target_docket.data_unused = new_unused

    if data is None:
        # otherwise fallback to a full new export
        # (a docket created without uid gets a fresh one, hence a new file)
        target_docket = NodeMapDocket()
        datafile = _rawdata_filepath(revlog, target_docket)
        if hasattr(revlog.index, "nodemap_data_all"):
            data = revlog.index.nodemap_data_all()
        else:
            data = persistent_data(revlog.index)
        # EXP-TODO: if this is a cache, this should use a cache vfs, not a
        # store vfs

        tryunlink = revlog.opener.tryunlink

        def abortck(tr):
            tryunlink(datafile)

        callback_id = b"delete-%s" % datafile

        # some flavor of the transaction abort does not cleanup new file, it
        # simply empty them.
        tr.addabort(callback_id, abortck)
        with revlog.opener(datafile, b'w+') as fd:
            fd.write(data)
            if feed_data:
                if use_mmap and revlog.opener.is_mmap_safe(datafile):
                    fd.flush()
                    new_data = util.buffer(util.mmapread(fd, len(data)))
                else:
                    new_data = data
        target_docket.data_length = len(data)
    target_docket.tip_rev = revlog.tiprev()
    target_docket.tip_node = revlog.node(target_docket.tip_rev)
    # EXP-TODO: if this is a cache, this should use a cache vfs, not a
    # store vfs
    file_path = revlog._nodemap_file
    if pending:
        file_path += b'.a'
        tr.registertmp(file_path)
    else:
        tr.addbackup(file_path)

    # the docket is written through an atomic temporary file
    with revlog.opener(file_path, b'w', atomictemp=True) as fp:
        fp.write(target_docket.serialize())
    revlog._nodemap_docket = target_docket
    if feed_data:
        # `new_data` is always bound here: both write paths above set it
        # whenever `feed_data` is true
        revlog.index.update_nodemap_data(target_docket, new_data)

    # search for old index file in all cases, some older process might have
    # left one behind.
    olds = _other_rawdata_filepath(revlog, target_docket)
    if olds:
        realvfs = getattr(revlog, '_realopener', revlog.opener)

        def cleanup(tr):
            for oldfile in olds:
                realvfs.tryunlink(oldfile)

        callback_id = b"revlog-cleanup-nodemap-%s" % revlog._nodemap_file
        tr.addpostclose(callback_id, cleanup)
### Nodemap docket file
#
# The nodemap data are stored on disk using 2 files:
#
# * a raw data files containing a persistent nodemap
# (see `Nodemap Trie` section)
#
# * a small "docket" file containing metadata
#
# While the nodemap data can be multiple tens of megabytes, the "docket" is
# small, so it is easy to update it atomically or to duplicate its content
# during a transaction.
#
# Multiple raw data files can exist at the same time (the currently valid one
# and a new one being used by an in-progress transaction). To accommodate this,
# the filename hosting the raw data has a variable part. The exact filename is
# specified inside the "docket" file.
#
# The docket file contains information to find, qualify and validate the raw
# data. Its content is currently very light, but it will expand as the on disk
# nodemap gains the necessary features to be used in production.
# version of the docket format; dockets with another version are ignored
ONDISK_VERSION = 1
# the docket starts with its version, stored as a single unsigned byte
S_VERSION = struct.Struct(">B")
# fixed-size header following the version (big endian): uid size (1 byte),
# then tip_rev, data_length, data_unused and tip_node size (8 bytes each).
# The uid and the tip node themselves follow the header.
S_HEADER = struct.Struct(">BQQQQ")
class NodeMapDocket:
    """metadata associated with persistent nodemap data

    The persistent data may come from disk or be on their way to disk.

    Two dockets are equal when they have the same `uid` and the same
    `data_length`. Since `__eq__` is defined without `__hash__`, instances
    are not hashable.
    """

    def __init__(self, uid=None):
        """Create a docket for the data file identified by `uid`.

        When `uid` is None, a new unique identifier is generated.
        """
        if uid is None:
            uid = docket_mod.make_uid()
        # a unique identifier for the data file:
        #   - When new data are appended, it is preserved.
        #   - When a new data file is created, a new identifier is generated.
        self.uid = uid
        # the tipmost revision stored in the data file. This revision and all
        # revision before it are expected to be encoded in the data file.
        self.tip_rev = None
        # the node of that tipmost revision, if it mismatch the current index
        # data the docket is not valid for the current index and should be
        # discarded.
        #
        # note: this method is not perfect as some destructive operation could
        # preserve the same tip_rev + tip_node while altering lower revision.
        # However multiple other caches have the same vulnerability (eg:
        # branchmap cache).
        self.tip_node = None
        # the size (in bytes) of the persisted data to encode the nodemap valid
        # for `tip_rev`.
        #   - data file shorter than this are corrupted,
        #   - any extra data should be ignored.
        self.data_length = None
        # the amount (in bytes) of "dead" data, still in the data file but no
        # longer used for the nodemap.
        self.data_unused = 0

    def copy(self):
        """Return a new docket with the same uid and the same field values."""
        new = NodeMapDocket(uid=self.uid)
        new.tip_rev = self.tip_rev
        new.tip_node = self.tip_node
        new.data_length = self.data_length
        new.data_unused = self.data_unused
        return new

    def __cmp__(self, other):
        """Three-way comparison on `uid`, then on `data_length`.

        Python 3 never calls `__cmp__` implicitly; this is only kept for
        code that would call it explicitly.
        """
        if self.uid < other.uid:
            return -1
        if self.uid > other.uid:
            return 1
        elif self.data_length < other.data_length:
            return -1
        elif self.data_length > other.data_length:
            return 1
        return 0

    def __eq__(self, other):
        """Compare `uid` and `data_length` with another docket.

        Anything that is not a `NodeMapDocket` (None included) is simply
        "not equal" instead of triggering an `AttributeError`.
        """
        if not isinstance(other, NodeMapDocket):
            return NotImplemented
        return self.uid == other.uid and self.data_length == other.data_length

    def serialize(self):
        """return the serialized bytes of this docket

        Layout: version (`S_VERSION`), header (`S_HEADER`), uid, tip node.
        """
        data = []
        data.append(S_VERSION.pack(ONDISK_VERSION))
        headers = (
            len(self.uid),
            self.tip_rev,
            self.data_length,
            self.data_unused,
            len(self.tip_node),
        )
        data.append(S_HEADER.pack(*headers))
        data.append(self.uid)
        data.append(self.tip_node)
        return b''.join(data)
def _rawdata_filepath(revlog, docket):
"""The (vfs relative) nodemap's rawdata file for a given uid"""
prefix = revlog.radix
return b"%s-%s.nd" % (prefix, docket.uid)
def _other_rawdata_filepath(revlog, docket):
    """Return the raw data files of `revlog` other than the one of `docket`.

    The directory holding the docket's raw data file is listed, and every
    entry named `<radix>-<hex>.nd` is returned (as listed by the opener),
    except the file that belongs to `docket` itself.
    """
    # The radix is a file name, not a regular expression: escape it so that
    # characters such as "." only match themselves and files of another
    # revlog are never reported as stale.
    prefix = re.escape(revlog.radix)
    pattern = re.compile(br"(^|/)%s-[0-9a-f]+\.nd$" % prefix)
    new_file_path = _rawdata_filepath(revlog, docket)
    new_file_name = revlog.opener.basename(new_file_path)
    dirpath = revlog.opener.dirname(new_file_path)
    return [
        f
        for f in revlog.opener.listdir(dirpath)
        if pattern.match(f) and f != new_file_name
    ]
### Nodemap Trie
#
# This is a simple reference implementation to compute and persist a nodemap
# trie. This reference implementation is write only. The python version of this
# is not expected to be actually used, since it won't provide any performance
# improvement over the existing non-persistent C implementation.
#
# The nodemap is persisted as a Trie using 4bits-address/16-entries blocks. Each
# revision can be addressed using the shortest unique prefix of its node.
#
# The trie is stored as a sequence of blocks. Each block contains 16 entries
# (signed 32bit integer, big endian). Each entry can be one of the following:
#
# * value >= 0 -> index of sub-block
# * value == -1 -> no value
# * value < -1 -> encoded revision: rev = -(value+2)
#
# See REV_OFFSET and _transform_rev below.
#
# The implementation focuses on simplicity, not on performance. A Rust
# implementation should provide an efficient version of the same binary
# persistence. This reference python implementation is never meant to be
# extensively used in production.
def persistent_data(index):
    """Build the nodemap trie of `index` and return its binary form."""
    return _persist_trie(_build_trie(index))
def update_persistent_data(index, root, max_idx, last_rev):
    """Return the incremental update of a persistent nodemap.

    The revisions after `last_rev` are inserted in the trie `root`. The
    result is a pair: the size in bytes of the blocks counted as changed,
    and the binary data of the blocks to append, numbered after `max_idx`.
    """
    changed_block, trie = _update_trie(index, root, last_rev)
    changed_bytes = changed_block * S_BLOCK.size
    appended = _persist_trie(trie, existing_idx=max_idx)
    return changed_bytes, appended
# binary layout of one trie block: 16 big endian signed 32-bit integers
S_BLOCK = struct.Struct(">" + ("l" * 16))
# value stored in a block slot that holds nothing
NO_ENTRY = -1
# rev 0 need to be -2 because 0 is used by block, -1 is a special value.
REV_OFFSET = 2
def _transform_rev(rev):
    """Convert a revision number to its representation in the tree.

    The same function converts such a representation back to the revision
    number: it is an involution, i.e. applying it twice gives back the
    original value.
    """
    shifted = rev + REV_OFFSET
    return -shifted
def _to_int(hex_digit):
"""turn an hexadecimal digit into a proper integer"""
return int(hex_digit, 16)
class Block(dict):
    """One block of the Trie.

    A block holds up to 16 entries, stored under the keys 0 to 15.
    Iterating over a block yields the 16 slots in order, with None for the
    slots that hold nothing.
    """

    def __init__(self):
        super().__init__()
        # If this block exist on disk, here is its ID
        self.ondisk_id = None

    def __iter__(self):
        return (self.get(slot) for slot in range(16))
def _build_trie(index):
    """Build the nodemap trie of a whole index.

    The nodemap stores the revision number for each unique prefix. Each
    block is a dictionary with keys in `[0, 15]`; a value is either another
    block or a revision number. The root block is returned.
    """
    trie_root = Block()
    rev_count = len(index)
    for rev in range(rev_count):
        # item 7 of an index entry is the node of the revision
        node_hex = hex(index[rev][7])
        _insert_into_block(index, 0, trie_root, rev, node_hex)
    return trie_root
def _update_trie(index, root, last_rev):
    """Insert the revisions after `last_rev` into the trie `root`.

    Returns a `(changed, root)` pair where `changed` is the sum of the
    counts reported by `_insert_into_block` for each inserted revision.
    """
    changed = sum(
        _insert_into_block(index, 0, root, rev, hex(index[rev][7]))
        for rev in range(last_rev + 1, len(index))
    )
    return changed, root
def _insert_into_block(index, level, block, current_rev, current_hex):
    """Insert a new revision in a block and return a count of visited blocks.

    index: the index we are adding revision for
    level: the depth of the current block in the trie
    block: the block currently being considered
    current_rev: the revision number we are adding
    current_hex: the hexadecimal representation of the node of that revision

    The returned count is 1 for this block, plus the count of the existing
    sub-block the insertion recursed into, if any. Blocks created to solve
    a collision are not counted.
    """
    # the block is about to change: it no longer matches its on-disk version
    if block.ondisk_id is not None:
        block.ondisk_id = None
    slot = _to_int(current_hex[level : level + 1])
    existing = block.get(slot)

    if existing is None:
        # no entry, simply store the revision number
        block[slot] = current_rev
        return 1

    deeper = level + 1
    if isinstance(existing, dict):
        # need to recurse to an underlying block
        return 1 + _insert_into_block(
            index, deeper, existing, current_rev, current_hex
        )

    # collision with a previously unique prefix: a new block is inserted to
    # hold both the previous revision and the new one
    existing_hex = hex(index[existing][7])
    child = Block()
    block[slot] = child
    _insert_into_block(index, deeper, child, existing, existing_hex)
    _insert_into_block(index, deeper, child, current_rev, current_hex)
    return 1
def _persist_trie(root, existing_idx=None):
    """Turn a nodemap trie into persistent binary data.

    Blocks that already have an `ondisk_id` are not written again; they are
    only referenced by that id. The other blocks are numbered after
    `existing_idx` (or from 0 when it is None) and their binary form is
    returned, concatenated.

    See `_build_trie` for nodemap trie structure.
    """
    first_new_idx = 0 if existing_idx is None else existing_idx + 1
    block_map = {}
    chunks = []
    for tn in _walk_trie(root):
        if tn.ondisk_id is None:
            block_map[id(tn)] = len(chunks) + first_new_idx
            chunks.append(_persist_block(tn, block_map))
        else:
            block_map[id(tn)] = tn.ondisk_id
    return b''.join(chunks)
def _walk_trie(block):
"""yield all the block in a trie
Children blocks are always yield before their parent block.
"""
for __, item in sorted(block.items()):
if isinstance(item, dict):
for sub_block in _walk_trie(item):
yield sub_block
yield block
def _persist_block(block_node, block_map):
    """Return the persistent binary data of a single block.

    Children blocks are assumed to be already persisted and present in
    block_map.
    """
    values = [_to_value(entry, block_map) for entry in block_node]
    return S_BLOCK.pack(*values)
def _to_value(item, block_map):
"""persist any value as an integer"""
if item is None:
return NO_ENTRY
elif isinstance(item, dict):
return block_map[id(item)]
else:
return _transform_rev(item)
def parse_data(data):
    """Parse nodemap binary data into a nodemap Trie.

    Returns a `(root, last_index)` pair: the last block of the data, used
    as root, and its index. Empty data gives a new empty block and None.

    Raises `error.Abort` when the size of the data is not a multiple of
    the block size.
    """
    block_size = S_BLOCK.size
    if len(data) % block_size != 0:
        msg = b"nodemap data size is not a multiple of block size (%d): %d"
        raise error.Abort(msg % (block_size, len(data)))
    if not data:
        return Block(), None

    # first pass: create every block so that references can be resolved
    block_map = {}
    pending = []
    for block_id, start in enumerate(range(0, len(data), block_size)):
        blk = Block()
        blk.ondisk_id = block_id
        block_map[block_id] = blk
        raw_values = S_BLOCK.unpack(data[start : start + block_size])
        pending.append((blk, raw_values))

    # second pass: fill the blocks with sub-blocks and revisions
    for blk, raw_values in pending:
        for slot, value in enumerate(raw_values):
            if value == NO_ENTRY:
                continue
            if value >= 0:
                blk[slot] = block_map[value]
            else:
                blk[slot] = _transform_rev(value)

    last_id = len(block_map) - 1
    return block_map[last_id], last_id
# debug utility
def check_data(ui, index, data):
    """verify that the provided nodemap data are valid for the given index

    The data are parsed into a trie, then every revision of `index` must be
    stored in the trie and be found again when looking up its node. Any
    revision stored in the trie but absent from the index is also reported.

    Problems are written with `ui.write_err`. Returns 0 when the data are
    valid, 1 otherwise.
    """
    ret = 0
    ui.status((b"revisions in index: %d\n") % len(index))
    root, __ = parse_data(data)
    all_revs = set(_all_revisions(root))
    ui.status((b"revisions in nodemap: %d\n") % len(all_revs))
    for r in range(len(index)):
        if r not in all_revs:
            msg = b" revision missing from nodemap: %d\n" % r
            ui.write_err(msg)
            ret = 1
        else:
            # whatever is left in the set after the loop is unknown to the
            # index
            all_revs.remove(r)
        # the lookup by node must lead back to the same revision
        nm_rev = _find_node(root, hex(index[r][7]))
        if nm_rev is None:
            msg = b" revision node does not match any entries: %d\n" % r
            ui.write_err(msg)
            ret = 1
        elif nm_rev != r:
            msg = (
                b" revision node does not match the expected revision: "
                b"%d != %d\n" % (r, nm_rev)
            )
            ui.write_err(msg)
            ret = 1
    if all_revs:
        for r in sorted(all_revs):
            msg = b" extra revisions in nodemap: %d\n" % r
            ui.write_err(msg)
            ret = 1
    return ret
def _all_revisions(root):
    """Yield every revision stored in a Trie.

    Empty slots and sub-blocks are skipped; anything else found in a block
    is a revision.
    """
    for block in _walk_trie(root):
        for value in block:
            if value is not None and not isinstance(value, Block):
                yield value
def _find_node(block, node):
    """Find the revision associated with a given node.

    `node` is the hexadecimal form of the node. One digit is consumed per
    trie level until something that is not a block is reached: either a
    revision, or None when the slot is empty.
    """
    current, remaining = block, node
    while True:
        entry = current.get(_to_int(remaining[0:1]))
        if not isinstance(entry, dict):
            return entry
        current, remaining = entry, remaining[1:]
def get_nodemap_file(revlog):
    """Return the path of the nodemap docket file to use for `revlog`.

    The pending docket (`<radix>.n.a`) is used when the revlog is allowed
    to look at pending data and that file exists. Otherwise the regular
    docket (`<radix>.n`) is returned.
    """
    radix = revlog.radix
    if revlog._trypending:
        candidate = radix + b".n.a"
        if revlog.opener.exists(candidate):
            return candidate
    return radix + b".n"