##// END OF EJS Templates
git: key off `git` in .hg/requires rather than separate file...
git: key off `git` in .hg/requires rather than separate file Differential Revision: https://phab.mercurial-scm.org/D8265

File last commit:

r44952:6aee0647 default
r44977:02c47b74 default
Show More
nodemap.py
554 lines | 17.5 KiB | text/x-python | PythonLexer
# nodemap.py - nodemap related code and utilities
#
# Copyright 2019 Pierre-Yves David <pierre-yves.david@octobus.net>
# Copyright 2019 George Racinet <georges.racinet@octobus.net>
#
# This software may be used and distributed according to the terms of the
# GNU General Public License version 2 or any later version.
from __future__ import absolute_import
import errno
import os
import re
import struct
from .. import (
error,
node as nodemod,
util,
)
class NodeMap(dict):
def __missing__(self, x):
raise error.RevlogError(b'unknown node: %s' % x)
def persisted_data(revlog):
"""read the nodemap for a revlog from disk"""
if revlog.nodemap_file is None:
return None
pdata = revlog.opener.tryread(revlog.nodemap_file)
if not pdata:
return None
offset = 0
(version,) = S_VERSION.unpack(pdata[offset : offset + S_VERSION.size])
if version != ONDISK_VERSION:
return None
offset += S_VERSION.size
headers = S_HEADER.unpack(pdata[offset : offset + S_HEADER.size])
uid_size, tip_rev, data_length, data_unused = headers
offset += S_HEADER.size
docket = NodeMapDocket(pdata[offset : offset + uid_size])
docket.tip_rev = tip_rev
docket.data_length = data_length
docket.data_unused = data_unused
filename = _rawdata_filepath(revlog, docket)
use_mmap = revlog.opener.options.get("exp-persistent-nodemap.mmap")
try:
with revlog.opener(filename) as fd:
if use_mmap:
data = util.buffer(util.mmapread(fd, data_length))
else:
data = fd.read(data_length)
except OSError as e:
if e.errno != errno.ENOENT:
raise
if len(data) < data_length:
return None
return docket, data
def setup_persistent_nodemap(tr, revlog):
"""Install whatever is needed transaction side to persist a nodemap on disk
(only actually persist the nodemap if this is relevant for this revlog)
"""
if revlog._inline:
return # inlined revlog are too small for this to be relevant
if revlog.nodemap_file is None:
return # we do not use persistent_nodemap on this revlog
callback_id = b"revlog-persistent-nodemap-%s" % revlog.nodemap_file
if tr.hasfinalize(callback_id):
return # no need to register again
tr.addfinalize(
callback_id, lambda tr: _persist_nodemap(tr.addpostclose, revlog)
)
def update_persistent_nodemap(revlog):
"""update the persistent nodemap right now
To be used for updating the nodemap on disk outside of a normal transaction
setup (eg, `debugupdatecache`).
"""
cleanups = []
_persist_nodemap((lambda x, y: cleanups.append(y)), revlog)
for c in cleanups:
c(None)
def _persist_nodemap(cleaner, revlog):
"""Write nodemap data on disk for a given revlog
"""
if getattr(revlog, 'filteredrevs', ()):
raise error.ProgrammingError(
"cannot persist nodemap of a filtered changelog"
)
if revlog.nodemap_file is None:
msg = "calling persist nodemap on a revlog without the feature enableb"
raise error.ProgrammingError(msg)
can_incremental = util.safehasattr(revlog.index, "nodemap_data_incremental")
ondisk_docket = revlog._nodemap_docket
feed_data = util.safehasattr(revlog.index, "update_nodemap_data")
use_mmap = revlog.opener.options.get("exp-persistent-nodemap.mmap")
data = None
# first attemp an incremental update of the data
if can_incremental and ondisk_docket is not None:
target_docket = revlog._nodemap_docket.copy()
(
src_docket,
data_changed_count,
data,
) = revlog.index.nodemap_data_incremental()
if src_docket != target_docket:
data = None
else:
datafile = _rawdata_filepath(revlog, target_docket)
# EXP-TODO: if this is a cache, this should use a cache vfs, not a
# store vfs
new_length = target_docket.data_length + len(data)
with revlog.opener(datafile, b'r+') as fd:
fd.seek(target_docket.data_length)
fd.write(data)
if feed_data:
if use_mmap:
fd.seek(0)
new_data = fd.read(new_length)
else:
fd.flush()
new_data = util.buffer(util.mmapread(fd, new_length))
target_docket.data_length = new_length
target_docket.data_unused += data_changed_count
if data is None:
# otherwise fallback to a full new export
target_docket = NodeMapDocket()
datafile = _rawdata_filepath(revlog, target_docket)
if util.safehasattr(revlog.index, "nodemap_data_all"):
data = revlog.index.nodemap_data_all()
else:
data = persistent_data(revlog.index)
# EXP-TODO: if this is a cache, this should use a cache vfs, not a
# store vfs
with revlog.opener(datafile, b'w+') as fd:
fd.write(data)
if feed_data:
if use_mmap:
new_data = data
else:
fd.flush()
new_data = util.buffer(util.mmapread(fd, len(data)))
target_docket.data_length = len(data)
target_docket.tip_rev = revlog.tiprev()
# EXP-TODO: if this is a cache, this should use a cache vfs, not a
# store vfs
with revlog.opener(revlog.nodemap_file, b'w', atomictemp=True) as fp:
fp.write(target_docket.serialize())
revlog._nodemap_docket = target_docket
if feed_data:
revlog.index.update_nodemap_data(target_docket, new_data)
# EXP-TODO: if the transaction abort, we should remove the new data and
# reinstall the old one.
# search for old index file in all cases, some older process might have
# left one behind.
olds = _other_rawdata_filepath(revlog, target_docket)
if olds:
realvfs = getattr(revlog, '_realopener', revlog.opener)
def cleanup(tr):
for oldfile in olds:
realvfs.tryunlink(oldfile)
callback_id = b"revlog-cleanup-nodemap-%s" % revlog.nodemap_file
cleaner(callback_id, cleanup)
### Nodemap docket file
#
# The nodemap data are stored on disk using 2 files:
#
# * a raw data files containing a persistent nodemap
# (see `Nodemap Trie` section)
#
# * a small "docket" file containing medatadata
#
# While the nodemap data can be multiple tens of megabytes, the "docket" is
# small, it is easy to update it automatically or to duplicated its content
# during a transaction.
#
# Multiple raw data can exist at the same time (The currently valid one and a
# new one beind used by an in progress transaction). To accomodate this, the
# filename hosting the raw data has a variable parts. The exact filename is
# specified inside the "docket" file.
#
# The docket file contains information to find, qualify and validate the raw
# data. Its content is currently very light, but it will expand as the on disk
# nodemap gains the necessary features to be used in production.
# version 0 is experimental, no BC garantee, do no use outside of tests.
ONDISK_VERSION = 0
S_VERSION = struct.Struct(">B")
S_HEADER = struct.Struct(">BQQQ")
ID_SIZE = 8
def _make_uid():
"""return a new unique identifier.
The identifier is random and composed of ascii characters."""
return nodemod.hex(os.urandom(ID_SIZE))
class NodeMapDocket(object):
"""metadata associated with persistent nodemap data
The persistent data may come from disk or be on their way to disk.
"""
def __init__(self, uid=None):
if uid is None:
uid = _make_uid()
self.uid = uid
self.tip_rev = None
self.data_length = None
self.data_unused = 0
def copy(self):
new = NodeMapDocket(uid=self.uid)
new.tip_rev = self.tip_rev
new.data_length = self.data_length
new.data_unused = self.data_unused
return new
def __cmp__(self, other):
if self.uid < other.uid:
return -1
if self.uid > other.uid:
return 1
elif self.data_length < other.data_length:
return -1
elif self.data_length > other.data_length:
return 1
return 0
def __eq__(self, other):
return self.uid == other.uid and self.data_length == other.data_length
def serialize(self):
"""return serialized bytes for a docket using the passed uid"""
data = []
data.append(S_VERSION.pack(ONDISK_VERSION))
headers = (
len(self.uid),
self.tip_rev,
self.data_length,
self.data_unused,
)
data.append(S_HEADER.pack(*headers))
data.append(self.uid)
return b''.join(data)
def _rawdata_filepath(revlog, docket):
"""The (vfs relative) nodemap's rawdata file for a given uid"""
prefix = revlog.nodemap_file[:-2]
return b"%s-%s.nd" % (prefix, docket.uid)
def _other_rawdata_filepath(revlog, docket):
prefix = revlog.nodemap_file[:-2]
pattern = re.compile(br"(^|/)%s-[0-9a-f]+\.nd$" % prefix)
new_file_path = _rawdata_filepath(revlog, docket)
new_file_name = revlog.opener.basename(new_file_path)
dirpath = revlog.opener.dirname(new_file_path)
others = []
for f in revlog.opener.listdir(dirpath):
if pattern.match(f) and f != new_file_name:
others.append(f)
return others
### Nodemap Trie
#
# This is a simple reference implementation to compute and persist a nodemap
# trie. This reference implementation is write only. The python version of this
# is not expected to be actually used, since it wont provide performance
# improvement over existing non-persistent C implementation.
#
# The nodemap is persisted as Trie using 4bits-address/16-entries block. each
# revision can be adressed using its node shortest prefix.
#
# The trie is stored as a sequence of block. Each block contains 16 entries
# (signed 64bit integer, big endian). Each entry can be one of the following:
#
# * value >= 0 -> index of sub-block
# * value == -1 -> no value
# * value < -1 -> a revision value: rev = -(value+10)
#
# The implementation focus on simplicity, not on performance. A Rust
# implementation should provide a efficient version of the same binary
# persistence. This reference python implementation is never meant to be
# extensively use in production.
def persistent_data(index):
"""return the persistent binary form for a nodemap for a given index
"""
trie = _build_trie(index)
return _persist_trie(trie)
def update_persistent_data(index, root, max_idx, last_rev):
"""return the incremental update for persistent nodemap from a given index
"""
changed_block, trie = _update_trie(index, root, last_rev)
return (
changed_block * S_BLOCK.size,
_persist_trie(trie, existing_idx=max_idx),
)
S_BLOCK = struct.Struct(">" + ("l" * 16))
NO_ENTRY = -1
# rev 0 need to be -2 because 0 is used by block, -1 is a special value.
REV_OFFSET = 2
def _transform_rev(rev):
"""Return the number used to represent the rev in the tree.
(or retrieve a rev number from such representation)
Note that this is an involution, a function equal to its inverse (i.e.
which gives the identity when applied to itself).
"""
return -(rev + REV_OFFSET)
def _to_int(hex_digit):
"""turn an hexadecimal digit into a proper integer"""
return int(hex_digit, 16)
class Block(dict):
"""represent a block of the Trie
contains up to 16 entry indexed from 0 to 15"""
def __init__(self):
super(Block, self).__init__()
# If this block exist on disk, here is its ID
self.ondisk_id = None
def __iter__(self):
return iter(self.get(i) for i in range(16))
def _build_trie(index):
"""build a nodemap trie
The nodemap stores revision number for each unique prefix.
Each block is a dictionary with keys in `[0, 15]`. Values are either
another block or a revision number.
"""
root = Block()
for rev in range(len(index)):
hex = nodemod.hex(index[rev][7])
_insert_into_block(index, 0, root, rev, hex)
return root
def _update_trie(index, root, last_rev):
"""consume"""
changed = 0
for rev in range(last_rev + 1, len(index)):
hex = nodemod.hex(index[rev][7])
changed += _insert_into_block(index, 0, root, rev, hex)
return changed, root
def _insert_into_block(index, level, block, current_rev, current_hex):
"""insert a new revision in a block
index: the index we are adding revision for
level: the depth of the current block in the trie
block: the block currently being considered
current_rev: the revision number we are adding
current_hex: the hexadecimal representation of the of that revision
"""
changed = 1
if block.ondisk_id is not None:
block.ondisk_id = None
hex_digit = _to_int(current_hex[level : level + 1])
entry = block.get(hex_digit)
if entry is None:
# no entry, simply store the revision number
block[hex_digit] = current_rev
elif isinstance(entry, dict):
# need to recurse to an underlying block
changed += _insert_into_block(
index, level + 1, entry, current_rev, current_hex
)
else:
# collision with a previously unique prefix, inserting new
# vertices to fit both entry.
other_hex = nodemod.hex(index[entry][7])
other_rev = entry
new = Block()
block[hex_digit] = new
_insert_into_block(index, level + 1, new, other_rev, other_hex)
_insert_into_block(index, level + 1, new, current_rev, current_hex)
return changed
def _persist_trie(root, existing_idx=None):
"""turn a nodemap trie into persistent binary data
See `_build_trie` for nodemap trie structure"""
block_map = {}
if existing_idx is not None:
base_idx = existing_idx + 1
else:
base_idx = 0
chunks = []
for tn in _walk_trie(root):
if tn.ondisk_id is not None:
block_map[id(tn)] = tn.ondisk_id
else:
block_map[id(tn)] = len(chunks) + base_idx
chunks.append(_persist_block(tn, block_map))
return b''.join(chunks)
def _walk_trie(block):
"""yield all the block in a trie
Children blocks are always yield before their parent block.
"""
for (_, item) in sorted(block.items()):
if isinstance(item, dict):
for sub_block in _walk_trie(item):
yield sub_block
yield block
def _persist_block(block_node, block_map):
"""produce persistent binary data for a single block
Children block are assumed to be already persisted and present in
block_map.
"""
data = tuple(_to_value(v, block_map) for v in block_node)
return S_BLOCK.pack(*data)
def _to_value(item, block_map):
"""persist any value as an integer"""
if item is None:
return NO_ENTRY
elif isinstance(item, dict):
return block_map[id(item)]
else:
return _transform_rev(item)
def parse_data(data):
"""parse parse nodemap data into a nodemap Trie"""
if (len(data) % S_BLOCK.size) != 0:
msg = "nodemap data size is not a multiple of block size (%d): %d"
raise error.Abort(msg % (S_BLOCK.size, len(data)))
if not data:
return Block(), None
block_map = {}
new_blocks = []
for i in range(0, len(data), S_BLOCK.size):
block = Block()
block.ondisk_id = len(block_map)
block_map[block.ondisk_id] = block
block_data = data[i : i + S_BLOCK.size]
values = S_BLOCK.unpack(block_data)
new_blocks.append((block, values))
for b, values in new_blocks:
for idx, v in enumerate(values):
if v == NO_ENTRY:
continue
elif v >= 0:
b[idx] = block_map[v]
else:
b[idx] = _transform_rev(v)
return block, i // S_BLOCK.size
# debug utility
def check_data(ui, index, data):
"""verify that the provided nodemap data are valid for the given idex"""
ret = 0
ui.status((b"revision in index: %d\n") % len(index))
root, __ = parse_data(data)
all_revs = set(_all_revisions(root))
ui.status((b"revision in nodemap: %d\n") % len(all_revs))
for r in range(len(index)):
if r not in all_revs:
msg = b" revision missing from nodemap: %d\n" % r
ui.write_err(msg)
ret = 1
else:
all_revs.remove(r)
nm_rev = _find_node(root, nodemod.hex(index[r][7]))
if nm_rev is None:
msg = b" revision node does not match any entries: %d\n" % r
ui.write_err(msg)
ret = 1
elif nm_rev != r:
msg = (
b" revision node does not match the expected revision: "
b"%d != %d\n" % (r, nm_rev)
)
ui.write_err(msg)
ret = 1
if all_revs:
for r in sorted(all_revs):
msg = b" extra revision in nodemap: %d\n" % r
ui.write_err(msg)
ret = 1
return ret
def _all_revisions(root):
"""return all revisions stored in a Trie"""
for block in _walk_trie(root):
for v in block:
if v is None or isinstance(v, Block):
continue
yield v
def _find_node(block, node):
"""find the revision associated with a given node"""
entry = block.get(_to_int(node[0:1]))
if isinstance(entry, dict):
return _find_node(entry, node[1:])
return entry