##// END OF EJS Templates
revlog: add the glue to use the Rust `InnerRevlog` from Python...
revlog: add the glue to use the Rust `InnerRevlog` from Python. The performance of this has been looked at for quite some time, and some workflows are actually quite a bit faster than with the Python + C code. However, we are still (up to 20%) slower in some crucial places like cloning certain repos, log, cat, which makes this an incomplete rewrite. This is mostly due to the high amount of overhead in Python <-> Rust FFI, especially around the VFS code. A future patch series will rewrite the VFS code in pure Rust, which should hopefully get us up to par with current performance, if not better in all important cases. This is a "save state" of sorts, as this is a ton of code, and I don't want to pile up even more things in a single review. Continuing to try to match the current performance will take an extremely long time, if it is possible at all, without the aforementioned VFS work.

File last commit:

r53057:e01e84e5 default
r53060:7346f93b default
Show More
inner_revlog.rs
1353 lines | 46.9 KiB | application/rls-services+xml | RustLexer
//! A layer of lower-level revlog functionality to encapsulate most of the
//! IO work and expensive operations.
use std::{
borrow::Cow,
cell::RefCell,
io::{ErrorKind, Seek, SeekFrom, Write},
ops::Deref,
path::PathBuf,
sync::{Arc, Mutex},
};
use schnellru::{ByMemoryUsage, LruMap};
use sha1::{Digest, Sha1};
use crate::{
errors::{HgError, IoResultExt},
exit_codes,
transaction::Transaction,
vfs::Vfs,
};
use super::{
compression::{
uncompressed_zstd_data, CompressionConfig, Compressor, NoneCompressor,
ZlibCompressor, ZstdCompressor, ZLIB_BYTE, ZSTD_BYTE,
},
file_io::{DelayedBuffer, FileHandle, RandomAccessFile, WriteHandles},
index::{Index, IndexHeader, INDEX_ENTRY_SIZE},
node::{NODE_BYTES_LENGTH, NULL_NODE},
options::{RevlogDataConfig, RevlogDeltaConfig, RevlogFeatureConfig},
BaseRevision, Node, Revision, RevlogEntry, RevlogError, RevlogIndex,
UncheckedRevision, NULL_REVISION, NULL_REVLOG_ENTRY_FLAGS,
};
/// Matches the `_InnerRevlog` class in the Python code, as an arbitrary
/// boundary to incrementally rewrite higher-level revlog functionality in
/// Rust.
///
/// Holds the revlog index, the VFS used to reach the index and data files,
/// the revlog configuration, the state used for reading (segment file,
/// caches) and the state used for writing (write handles, delayed or
/// diverted index writes).
pub struct InnerRevlog {
/// When index and data are not interleaved: bytes of the revlog index.
/// When index and data are interleaved (inline revlog): bytes of the
/// revlog index and data.
pub index: Index,
/// The store vfs that is used to interact with the filesystem
vfs: Box<dyn Vfs>,
/// The index file path, relative to the vfs root (asserted in
/// [`Self::new`]). Temporarily points to the diverted index while writes
/// are diverted (see `original_index_file`).
pub index_file: PathBuf,
/// The data file path, relative to the vfs root (asserted in
/// [`Self::new`]). For inline revlogs, reads go to `index_file` instead;
/// this is the path [`Self::split_inline`] writes the data to.
data_file: PathBuf,
/// Data config that applies to this revlog
data_config: RevlogDataConfig,
/// Delta config that applies to this revlog
delta_config: RevlogDeltaConfig,
/// Feature config that applies to this revlog
feature_config: RevlogFeatureConfig,
/// A view into the file holding this revlog's revision data: the index
/// file when inline, the data file otherwise (see [`Self::new`]).
segment_file: RandomAccessFile,
/// A cache of uncompressed chunks that have previously been restored.
/// Only present when `uncompressed_cache_factor` is configured. Its
/// initial memory budget is set in [`Self::new`] and grown in
/// [`Self::raw_text`].
uncompressed_chunk_cache: Option<UncompressedChunkCache>,
/// Used to keep track of the actual target during diverted writes
/// for the changelog. `Some` while `index_file` points to the diverted
/// index (set by [`Self::delay`] and [`Self::write_pending`]).
original_index_file: Option<PathBuf>,
/// Write handles to the index and data files. `Some` only inside a write
/// context (see [`Self::is_writing`]).
/// XXX why duplicate from `index` and `segment_file`?
writing_handles: Option<WriteHandles>,
/// See [`DelayedBuffer`]. `Some` while new index entries are held in
/// memory instead of being written to the index file (set by
/// [`Self::delay`]).
delayed_buffer: Option<Arc<Mutex<DelayedBuffer>>>,
/// Whether this revlog is inline. XXX why duplicate from `index`?
pub inline: bool,
/// A cache of the last revision, which is usually accessed multiple
/// times. [`Self::raw_text`] empties it after use.
pub last_revision_cache: Mutex<Option<SingleRevisionCache>>,
}
impl InnerRevlog {
pub fn new(
    vfs: Box<dyn Vfs>,
    index: Index,
    index_file: PathBuf,
    data_file: PathBuf,
    data_config: RevlogDataConfig,
    delta_config: RevlogDeltaConfig,
    feature_config: RevlogFeatureConfig,
) -> Self {
    assert!(index_file.is_relative());
    assert!(data_file.is_relative());
    let inline = index.is_inline();
    // Inline revlogs interleave revision data with the index entries, so
    // revision data must be read from the index file in that case.
    let segment_path = if inline { &index_file } else { &data_file };
    let segment_file = RandomAccessFile::new(
        dyn_clone::clone_box(&*vfs),
        segment_path.clone(),
    );
    // The chunk cache only exists when a size factor is configured. Its
    // memory budget starts at an arbitrary value and is grown later.
    // TODO check if using a hasher specific to integers is useful
    let uncompressed_chunk_cache = data_config
        .uncompressed_cache_factor
        .is_some()
        .then(|| RefCell::new(LruMap::with_memory_budget(65536)));
    Self {
        index,
        vfs,
        index_file,
        data_file,
        data_config,
        delta_config,
        feature_config,
        segment_file,
        uncompressed_chunk_cache,
        original_index_file: None,
        writing_handles: None,
        delayed_buffer: None,
        inline,
        last_revision_cache: Mutex::new(None),
    }
}
/// Number of revisions (entries) recorded in the revlog index.
pub fn len(&self) -> usize {
    let index = &self.index;
    index.len()
}
/// Whether the revlog index holds no entry at all.
pub fn is_empty(&self) -> bool {
    let entry_count = self.len();
    entry_count == 0
}
/// Whether index entries and revision data share the index file.
///
/// Initialized from the index in [`Self::new`] and cleared by
/// [`Self::split_inline`].
pub fn is_inline(&self) -> bool {
    let inline = self.inline;
    inline
}
/// Clear all caches from this revlog: the uncompressed chunk cache (when
/// enabled) and the last-revision cache.
///
/// # Panics
///
/// Panics if writes are currently delayed or diverted
/// (see [`Self::is_delaying`]).
pub fn clear_cache(&mut self) {
    assert!(!self.is_delaying());
    if let Some(cache) = self.uncompressed_chunk_cache.as_ref() {
        // We don't clear the allocation here because it's probably faster.
        // We could change our minds later if this ends up being a problem
        // with regards to memory consumption.
        cache.borrow_mut().clear();
    }
    // The last-revision cache is a cache too, so "all caches" includes it.
    // A poisoned lock only means another thread panicked while holding
    // it; emptying the cached value is still the right thing to do.
    *self
        .last_revision_cache
        .get_mut()
        .unwrap_or_else(|poisoned| poisoned.into_inner()) = None;
}
/// Build the [`RevlogEntry`] standing for [`NULL_REVISION`]. It has null
/// parents, an uncompressed length of 0, null flags and the null node as
/// its hash.
pub fn make_null_entry(&self) -> RevlogEntry {
    let (p1, p2) = (NULL_REVISION, NULL_REVISION);
    RevlogEntry {
        revlog: self,
        rev: NULL_REVISION,
        hash: NULL_NODE,
        flags: NULL_REVLOG_ENTRY_FLAGS,
        uncompressed_len: 0,
        p1,
        p2,
    }
}
/// Return the [`RevlogEntry`] for a [`Revision`] that is known to exist.
///
/// The null revision yields [`Self::make_null_entry`]. A revision without
/// an index entry is reported as [`RevlogError::InvalidRevision`]. A
/// parent that fails to check is reported as corruption.
pub fn get_entry_for_checked_rev(
    &self,
    rev: Revision,
) -> Result<RevlogEntry, RevlogError> {
    if rev == NULL_REVISION {
        return Ok(self.make_null_entry());
    }
    let index_entry = match self.index.get_entry(rev) {
        Some(found) => found,
        None => return Err(RevlogError::InvalidRevision(rev.to_string())),
    };
    let invalid_p1 =
        || RevlogError::corrupted(format!("p1 for rev {} is invalid", rev));
    let invalid_p2 =
        || RevlogError::corrupted(format!("p2 for rev {} is invalid", rev));
    let p1 = self
        .index
        .check_revision(index_entry.p1())
        .ok_or_else(invalid_p1)?;
    let p2 = self
        .index
        .check_revision(index_entry.p2())
        .ok_or_else(invalid_p2)?;
    Ok(RevlogEntry {
        revlog: self,
        rev,
        uncompressed_len: index_entry.uncompressed_len(),
        p1,
        p2,
        flags: index_entry.flags(),
        hash: *index_entry.hash(),
    })
}
/// Return the [`RevlogEntry`] for the unchecked `rev`.
///
/// The null revision yields [`Self::make_null_entry`]. Any other revision
/// is checked against the index first; one that fails the check is
/// reported as a (corruption) [`RevlogError`].
/// TODO normalize naming across the index and all revlogs
/// (changelog, etc.) so that `get_entry` is always on an unchecked rev and
/// `get_entry_for_checked_rev` is for checked rev
pub fn get_entry(
    &self,
    rev: UncheckedRevision,
) -> Result<RevlogEntry, RevlogError> {
    if rev == NULL_REVISION.into() {
        return Ok(self.make_null_entry());
    }
    match self.index.check_revision(rev) {
        Some(checked) => self.get_entry_for_checked_rev(checked),
        None => {
            Err(RevlogError::corrupted(format!("rev {} is invalid", rev)))
        }
    }
}
/// Whether the visibility of written data is currently being delayed.
///
/// Delaying happens either in memory (a delayed buffer is set) or on disk
/// in a side-file (writes are diverted and the original index path is
/// remembered).
pub fn is_delaying(&self) -> bool {
    let delayed_in_memory = self.delayed_buffer.is_some();
    let diverted_to_file = self.original_index_file.is_some();
    delayed_in_memory || diverted_to_file
}
/// The offset of the data chunk for `rev`, as computed by [`Index::start`].
///
/// A revision without an index entry falls back to the index's null entry.
#[inline(always)]
pub fn start(&self, rev: Revision) -> usize {
    let entry = match self.index.get_entry(rev) {
        Some(entry) => entry,
        None => self.index.make_null_entry(),
    };
    self.index.start(rev, &entry)
}
/// The length of the (compressed) data chunk stored for `rev`.
///
/// A revision without an index entry falls back to the index's null entry.
/// TODO rename this method and others to more explicit names than the
/// existing ones that were copied over from Python
#[inline(always)]
pub fn length(&self, rev: Revision) -> usize {
    let entry = match self.index.get_entry(rev) {
        Some(entry) => entry,
        None => self.index.make_null_entry(),
    };
    entry.compressed_len() as usize
}
/// The offset just past the end of `rev`'s data chunk
/// ([`Self::start`] + [`Self::length`]).
#[inline(always)]
pub fn end(&self, rev: Revision) -> usize {
    let chunk_start = self.start(rev);
    let chunk_length = self.length(rev);
    chunk_start + chunk_length
}
/// Return the revision `rev` is stored as a delta against.
///
/// A revision that is its own base has no delta parent
/// ([`NULL_REVISION`]). Otherwise, with general delta the recorded base is
/// the delta parent; without it, the previous revision is.
///
/// # Panics
///
/// Panics if `rev` has no index entry.
pub fn delta_parent(&self, rev: Revision) -> Revision {
    let base = self
        .index
        .get_entry(rev)
        .unwrap()
        .base_revision_or_base_of_delta_chain();
    let is_own_base = base.0 == rev.0;
    match (is_own_base, self.delta_config.general_delta) {
        (true, _) => NULL_REVISION,
        (false, true) => Revision(base.0),
        (false, false) => Revision(rev.0 - 1),
    }
}
/// Return whether `rev` points to a snapshot revision (i.e. does not have
/// a delta base).
///
/// For sparse revlogs the index decides. Otherwise a revision is a
/// snapshot exactly when it has no delta parent.
pub fn is_snapshot(&self, rev: Revision) -> Result<bool, RevlogError> {
    if self.delta_config.sparse_revlog {
        self.index.is_snapshot_unchecked(rev)
    } else {
        Ok(self.delta_parent(rev) == NULL_REVISION)
    }
}
/// Return the delta chain for `rev`, honoring this revlog's general-delta
/// setting. See [`Index::delta_chain`] for more information.
pub fn delta_chain(
    &self,
    rev: Revision,
    stop_rev: Option<Revision>,
) -> Result<(Vec<Revision>, bool), HgError> {
    let general_delta = self.delta_config.general_delta;
    self.index.delta_chain(rev, stop_rev, general_delta.into())
}
/// Build a compressor for the compression engine configured for this
/// revlog.
fn compressor(&self) -> Result<Box<dyn Compressor>, HgError> {
    // TODO cache the compressor?
    let compressor: Box<dyn Compressor> =
        match self.feature_config.compression_engine {
            CompressionConfig::Zlib { level } => {
                Box::new(ZlibCompressor::new(level))
            }
            CompressionConfig::Zstd { level, threads } => {
                Box::new(ZstdCompressor::new(level, threads))
            }
            CompressionConfig::None => Box::new(NoneCompressor),
        };
    Ok(compressor)
}
/// Generate a possibly-compressed representation of `data`.
///
/// - `Some(bytes)`: `bytes` can be stored as is. It is either the
///   compressor's output (which carries its own header), `data` itself when
///   empty, or `data` itself when it starts with `\0` (a header that
///   [`Self::decompress`] passes through unchanged).
/// - `None`: the compressor produced no output and `data` needs an explicit
///   header before being stored. Presumably this is `u`, which
///   [`Self::decompress`] strips.
pub fn compress<'data>(
    &self,
    data: &'data [u8],
) -> Result<Option<Cow<'data, [u8]>>, RevlogError> {
    if data.is_empty() {
        return Ok(Some(Cow::Borrowed(data)));
    }
    let compressed = self.compressor()?.compress(data)?;
    match compressed {
        // The revlog compressor added the header in the returned data.
        Some(bytes) => Ok(Some(bytes.into())),
        None if data[0] == b'\0' => Ok(Some(Cow::Borrowed(data))),
        None => Ok(None),
    }
}
/// Decompress a revlog chunk.
///
/// The first byte of the chunk identifies its format:
/// - zlib or zstd headers are routed to the matching decompressor,
/// - `\0` means the chunk is returned unchanged (header included),
/// - `u` means uncompressed data follows the header.
///
/// Any other header is an unsupported feature. An empty chunk decompresses
/// to itself.
pub fn decompress<'a>(
    &'a self,
    data: &'a [u8],
) -> Result<Cow<[u8]>, RevlogError> {
    // Revlogs are read much more frequently than they are written and many
    // chunks only take microseconds to decompress, so performance is
    // important here.
    let header = match data.first() {
        Some(&byte) => byte,
        None => return Ok(Cow::Borrowed(data)),
    };
    let decoded: Cow<[u8]> = match header {
        // Settings don't matter as they only affect compression
        ZLIB_BYTE => ZlibCompressor::new(0).decompress(data)?.into(),
        // Settings don't matter as they only affect compression
        ZSTD_BYTE => ZstdCompressor::new(0, 0).decompress(data)?.into(),
        b'\0' => data.into(),
        b'u' => (&data[1..]).into(),
        other => {
            return Err(HgError::UnsupportedFeature(format!(
                "unknown compression header '{}'",
                other
            ))
            .into())
        }
    };
    Ok(decoded)
}
/// Obtain a segment of raw data corresponding to a range of revisions.
///
/// Requests for data may be satisfied by a cache.
///
/// Returns `(offset, data)`:
/// - `offset` is where the segment starts in the segment file (the index
///   file for inline revlogs, the data file otherwise),
/// - `data` holds the bytes from the start of `start_rev`'s chunk to the
///   end of `end_rev`'s chunk.
///
/// A null `start_rev` starts the segment at offset 0.
///
/// Callers will need to call `self.start(rev)` and `self.length(rev)`
/// to determine where each revision's data begins and ends.
///
/// # Panics
///
/// Panics if `end_rev`, or a non-null `start_rev`, has no index entry.
pub fn get_segment_for_revs(
    &self,
    start_rev: Revision,
    end_rev: Revision,
) -> Result<(usize, Vec<u8>), HgError> {
    let segment_start = if start_rev == NULL_REVISION {
        0
    } else {
        let first_entry = self
            .index
            .get_entry(start_rev)
            .expect("null revision segment");
        self.index.start(start_rev, &first_entry)
    };
    let last_entry = self
        .index
        .get_entry(end_rev)
        .expect("null revision segment");
    // `last_entry` is known to exist here, so its compressed length is
    // exactly what `self.length(end_rev)` would report.
    let segment_end = self.index.start(end_rev, &last_entry)
        + last_entry.compressed_len() as usize;
    // XXX should we use mmap instead of doing this for platforms that
    // support madvise/populate?
    let bytes = self
        .segment_file
        .read_chunk(segment_start, segment_end - segment_start)?;
    Ok((segment_start, bytes))
}
/// Return the uncompressed raw data for `rev`.
///
/// Served from the uncompressed chunk cache when possible. Otherwise the
/// chunk is read, decompressed and, when the cache is enabled, stored
/// there. Decompression failures are reported as an abort.
pub fn chunk_for_rev(&self, rev: Revision) -> Result<Arc<[u8]>, HgError> {
    let cache = self.uncompressed_chunk_cache.as_ref();
    if let Some(cached) =
        cache.and_then(|lru| lru.borrow_mut().get(&rev).cloned())
    {
        return Ok(cached);
    }
    // TODO revlogv2 should check the compression mode
    let (_offset, raw) = self.get_segment_for_revs(rev, rev)?;
    let decompressed = match self.decompress(&raw) {
        Ok(bytes) => bytes,
        Err(e) => {
            return Err(HgError::abort(
                format!("revlog decompression error: {}", e),
                exit_codes::ABORT,
                None,
            ))
        }
    };
    let chunk: Arc<[u8]> = Arc::from(decompressed.into_owned());
    if let Some(lru) = cache {
        lru.borrow_mut().insert(rev, Arc::clone(&chunk));
    }
    Ok(chunk)
}
/// Run `func` inside a read context for the data file.
///
/// The read handle is acquired before `func` runs and released afterwards,
/// whether `func` succeeded or not. If acquiring the handle fails, `func`
/// is not run.
pub fn with_read<R>(
    &self,
    func: impl FnOnce() -> Result<R, RevlogError>,
) -> Result<R, RevlogError> {
    self.enter_reading_context()?;
    let outcome = func();
    self.exit_reading_context();
    outcome
}
/// `pub` only for use in hg-cpython
///
/// Acquire the segment file's read handle. This does nothing for an empty
/// revlog and aborts when an inline revlog has delayed writes.
#[doc(hidden)]
pub fn enter_reading_context(&self) -> Result<(), HgError> {
    if self.is_empty() {
        // Nothing to be read
        return Ok(());
    }
    let inline_with_delayed_writes =
        self.is_inline() && self.delayed_buffer.is_some();
    if inline_with_delayed_writes {
        return Err(HgError::abort(
            "revlog with delayed write should not be inline",
            exit_codes::ABORT,
            None,
        ));
    }
    self.segment_file.get_read_handle()?;
    Ok(())
}
/// `pub` only for use in hg-cpython
///
/// Leave the read context opened by [`Self::enter_reading_context`] by
/// delegating to the segment file.
#[doc(hidden)]
pub fn exit_reading_context(&self) {
    let segment_file = &self.segment_file;
    segment_file.exit_reading_context()
}
/// Fill the buffer returned by `get_buffer` with the possibly un-validated
/// raw text for a revision. It can be already validated if it comes
/// from the cache.
///
/// `get_buffer` is handed to [`RevlogEntry::rawdata`] together with the
/// revision currently held in `last_revision_cache`, if any.
///
/// The `last_revision_cache` lock is held for the whole call, including
/// `rawdata` and therefore `get_buffer`. Locking it again from there would
/// not return, because `std::sync::Mutex` is not reentrant.
///
/// Before restoring the text, the chunk cache's memory budget may be
/// raised. This happens when the raw size of `rev` is known and the
/// uncompressed chunk cache is enabled. The new budget is
/// `raw size * uncompressed_cache_factor`, applied only if it exceeds the
/// current budget.
///
/// On success, `last_revision_cache` is emptied; the caller is expected to
/// refill it after validating the text. On error it is left untouched.
///
/// # Panics
///
/// Panics if the `last_revision_cache` lock is poisoned. Also panics if
/// the chunk cache exists while `uncompressed_cache_factor` is `None`.
pub fn raw_text<G, T>(
&self,
rev: Revision,
get_buffer: G,
) -> Result<(), RevlogError>
where
G: FnOnce(
usize,
&mut dyn FnMut(
&mut dyn RevisionBuffer<Target = T>,
) -> Result<(), RevlogError>,
) -> Result<(), RevlogError>,
{
let entry = &self.get_entry_for_checked_rev(rev)?;
// `None` when the index does not record the uncompressed size.
let raw_size = entry.uncompressed_len();
let mut mutex_guard = self
.last_revision_cache
.lock()
.expect("lock should not be held");
// Borrow the cached (rev, text) pair, if any, straight from the guard.
// It is passed to `rawdata` below, presumably as a stopping point for
// the delta chain (see `delta_chain`'s `stop_rev`).
let cached_rev = if let Some((_node, rev, data)) = &*mutex_guard {
Some((*rev, data.deref().as_ref()))
} else {
None
};
if let Some(cache) = &self.uncompressed_chunk_cache {
// This `RefMut` is dropped at the end of this `if let`, before
// `rawdata` runs. That matters if `rawdata` reads chunks through the
// same cache, which presumably it does (via `chunks`).
let cache = &mut cache.borrow_mut();
if let Some(size) = raw_size {
// Dynamically update the uncompressed_chunk_cache size to the
// largest revision we've seen in this revlog.
// Do it *before* restoration in case the current revision
// is the largest.
let factor = self
.data_config
.uncompressed_cache_factor
.expect("cache should not exist without factor");
let candidate_size = (size as f64 * factor) as usize;
let limiter_mut = cache.limiter_mut();
if candidate_size > limiter_mut.max_memory_usage() {
// Install a limiter with the larger budget; the old one ends
// up in the temporary and is dropped.
std::mem::swap(
limiter_mut,
&mut ByMemoryUsage::new(candidate_size),
);
}
}
}
entry.rawdata(cached_rev, get_buffer)?;
// drop cache to save memory, the caller is expected to update
// the revision cache after validating the text
mutex_guard.take();
Ok(())
}
/// Only `pub` for `hg-cpython`.
/// Obtain decompressed raw data for the specified revisions that are
/// assumed to be in ascending order.
///
/// Returns a list with decompressed data for each requested revision, in
/// ascending revision order. A revision whose stored chunk is empty gets
/// an empty entry, so the result always has one entry per requested
/// revision.
///
/// Chunks already in the uncompressed chunk cache are taken from it. The
/// others are read from disk, decompressed, and then added to the cache
/// when it is enabled. When sparse-read is enabled and the revlog is not
/// inline, the reads are grouped with [`Self::slice_chunk`].
#[doc(hidden)]
pub fn chunks(
    &self,
    revs: Vec<Revision>,
    target_size: Option<u64>,
) -> Result<Vec<Arc<[u8]>>, RevlogError> {
    if revs.is_empty() {
        return Ok(vec![]);
    }
    let mut fetched_revs = vec![];
    let mut chunks = Vec::with_capacity(revs.len());
    match self.uncompressed_chunk_cache.as_ref() {
        Some(cache) => {
            let mut cache = cache.borrow_mut();
            for rev in revs.iter() {
                match cache.get(rev) {
                    Some(hit) => chunks.push((*rev, hit.to_owned())),
                    None => fetched_revs.push(*rev),
                }
            }
        }
        None => fetched_revs = revs,
    }
    let already_cached = chunks.len();
    let sliced_chunks = if fetched_revs.is_empty() {
        vec![]
    } else if !self.data_config.with_sparse_read || self.is_inline() {
        vec![fetched_revs]
    } else {
        self.slice_chunk(&fetched_revs, target_size)?
    };
    self.with_read(|| {
        for revs_chunk in sliced_chunks {
            let first_rev = revs_chunk[0];
            // Skip trailing revisions with empty diff when choosing the
            // segment to read: there is no data to fetch for them.
            let last_rev_idx = revs_chunk
                .iter()
                .rposition(|r| self.length(*r) != 0)
                .unwrap_or(revs_chunk.len() - 1);
            let last_rev = revs_chunk[last_rev_idx];
            let (offset, data) =
                self.get_segment_for_revs(first_rev, last_rev)?;
            let (read_revs, trailing_empty_revs) =
                revs_chunk.split_at(last_rev_idx + 1);
            for rev in read_revs {
                let chunk_start = self.start(*rev);
                let chunk_length = self.length(*rev);
                // TODO revlogv2 should check the compression mode
                let bytes = &data[chunk_start - offset..][..chunk_length];
                let chunk = if !bytes.is_empty() && bytes[0] == ZSTD_BYTE {
                    // If we're using `zstd`, we want to try a more
                    // specialized decompression
                    let entry = self.index.get_entry(*rev).unwrap();
                    let is_delta = entry
                        .base_revision_or_base_of_delta_chain()
                        != (*rev).into();
                    let uncompressed = uncompressed_zstd_data(
                        bytes,
                        is_delta,
                        entry.uncompressed_len(),
                    )?;
                    Cow::Owned(uncompressed)
                } else {
                    // Otherwise just fallback to generic decompression.
                    self.decompress(bytes)?
                };
                chunks.push((*rev, chunk.into()));
            }
            // The skipped trailing revisions have zero-length chunks, which
            // decompress to nothing. They are not sliced out of `data`,
            // because the requested revisions need not be contiguous and
            // their offset can lie past its end. They still get an (empty)
            // entry so the result covers every requested revision.
            for rev in trailing_empty_revs {
                chunks.push((*rev, Arc::<[u8]>::from(&b""[..])));
            }
        }
        Ok(())
    })?;
    if let Some(cache) = self.uncompressed_chunk_cache.as_ref() {
        let mut cache = cache.borrow_mut();
        for (rev, chunk) in chunks.iter().skip(already_cached) {
            cache.insert(*rev, chunk.clone());
        }
    }
    // Use stable sort here since it's *mostly* sorted
    chunks.sort_by(|a, b| a.0.cmp(&b.0));
    Ok(chunks.into_iter().map(|(_r, chunk)| chunk).collect())
}
/// Slice revs to reduce the amount of unrelated data to be read from disk.
///
/// `revs` is sliced into groups that should each be read in one go, and
/// is assumed to be sorted.
///
/// First, the revisions are sliced by density (via the index) until the
/// overall density (payload/chunks-span ratio) is above
/// `revlog.data_config.sr_density_threshold`. No gap smaller than
/// `revlog.data_config.sr_min_gap_size` is skipped.
///
/// Then, if `target_size` is set, each dense group is further sliced so
/// that no chunk is larger than `target_size`. For consistency with other
/// slicing choices, this limit never goes below
/// `revlog.data_config.sr_min_gap_size`. Individual revision chunks larger
/// than the limit are still returned, on their own.
pub fn slice_chunk(
    &self,
    revs: &[Revision],
    target_size: Option<u64>,
) -> Result<Vec<Vec<Revision>>, RevlogError> {
    let min_gap_size = self.data_config.sr_min_gap_size;
    let target_size = target_size.map(|size| size.max(min_gap_size));
    let dense_chunks = self.index.slice_chunk_to_density(
        revs,
        self.data_config.sr_density_threshold,
        min_gap_size as usize,
    );
    let mut sliced = Vec::new();
    for dense_chunk in dense_chunks {
        for sized_chunk in self.slice_chunk_to_size(&dense_chunk, target_size)?
        {
            sliced.push(sized_chunk.to_vec());
        }
    }
    Ok(sliced)
}
/// Slice revs to match the target size
///
/// This is intended to be used on chunks that density slicing selected,
/// but that are still too large compared to the read guarantee of revlogs.
/// This might happen when the "minimal gap size" interrupted the slicing
/// or when chains are built in a way that create large blocks next to
/// each other.
///
/// `revs` must be non-empty (`revs[0]` is indexed unconditionally) and
/// sorted. Without a `target_size`, or when the whole span already fits,
/// `revs` is returned as a single chunk. Otherwise the result is a list of
/// sub-slices of `revs`. Each sub-slice is trimmed of trailing empty
/// revisions by [`Self::trim_chunk`], and empty sub-slices are dropped. A
/// single revision larger than `target_size` still forms its own chunk.
fn slice_chunk_to_size<'a>(
&self,
revs: &'a [Revision],
target_size: Option<u64>,
) -> Result<Vec<&'a [Revision]>, RevlogError> {
let mut start_data = self.start(revs[0]);
let end_data = self.end(revs[revs.len() - 1]);
let full_span = end_data - start_data;
let nothing_to_do = target_size
.map(|size| full_span <= size as usize)
.unwrap_or(true);
if nothing_to_do {
return Ok(vec![revs]);
}
let target_size = target_size.expect("target_size is set") as usize;
let mut start_rev_idx = 0;
let mut end_rev_idx = 1;
let mut chunks = vec![];
// First pass: walk the leading snapshots. Extend the current chunk while
// it fits, and start a new chunk at a revision that does not fit. Stop
// at the first non-snapshot revision.
for (idx, rev) in revs.iter().enumerate().skip(1) {
let span = self.end(*rev) - start_data;
let is_snapshot = self.is_snapshot(*rev)?;
if span <= target_size && is_snapshot {
end_rev_idx = idx + 1;
} else {
let chunk =
self.trim_chunk(revs, start_rev_idx, Some(end_rev_idx));
if !chunk.is_empty() {
chunks.push(chunk);
}
start_rev_idx = idx;
start_data = self.start(*rev);
end_rev_idx = idx + 1;
}
if !is_snapshot {
break;
}
}
// For the others, we use binary slicing to quickly converge towards
// valid chunks (otherwise, we might end up looking for the start/end
// of many revisions). This logic is not looking for the perfect
// slicing point, it quickly converges towards valid chunks.
let number_of_items = revs.len();
while (end_data - start_data) > target_size {
end_rev_idx = number_of_items;
if number_of_items - start_rev_idx <= 1 {
// Protect against individual chunks larger than the limit
break;
}
let mut local_end_data = self.end(revs[end_rev_idx - 1]);
let mut span = local_end_data - start_data;
// Halve the distance to `start_rev_idx` until the chunk fits.
while span > target_size {
if end_rev_idx - start_rev_idx <= 1 {
// Protect against individual chunks larger than the limit
break;
}
end_rev_idx -= (end_rev_idx - start_rev_idx) / 2;
local_end_data = self.end(revs[end_rev_idx - 1]);
span = local_end_data - start_data;
}
let chunk =
self.trim_chunk(revs, start_rev_idx, Some(end_rev_idx));
if !chunk.is_empty() {
chunks.push(chunk);
}
start_rev_idx = end_rev_idx;
start_data = self.start(revs[start_rev_idx]);
}
// Whatever remains (from `start_rev_idx` to the end) is the last chunk.
let chunk = self.trim_chunk(revs, start_rev_idx, None);
if !chunk.is_empty() {
chunks.push(chunk);
}
Ok(chunks)
}
/// Returns `revs[start_rev_idx..end_rev_idx]` (`end_rev_idx` defaults to
/// `revs.len()`) without its trailing zero-length revisions.
///
/// Trimming never removes `revs[0]` and never goes below `start_rev_idx`.
/// It is skipped entirely when the last revision is not in the index yet
/// (at or beyond [`Self::len`]), since its length cannot be inspected.
fn trim_chunk<'a>(
    &self,
    revs: &'a [Revision],
    start_rev_idx: usize,
    end_rev_idx: Option<usize>,
) -> &'a [Revision] {
    let mut end = end_rev_idx.unwrap_or(revs.len());
    let last_is_indexed = revs[end - 1].0 < self.len() as BaseRevision;
    if last_is_indexed {
        // Trim empty revs at the end, except the very first rev of a chain
        while end > 1 && end > start_rev_idx && self.length(revs[end - 1]) == 0
        {
            end -= 1;
        }
    }
    &revs[start_rev_idx..end]
}
/// Check the hash of some given data against the recorded hash.
pub fn check_hash(
&self,
p1: Revision,
p2: Revision,
expected: &[u8],
data: &[u8],
) -> bool {
let e1 = self.index.get_entry(p1);
let h1 = match e1 {
Some(ref entry) => entry.hash(),
None => &NULL_NODE,
};
let e2 = self.index.get_entry(p2);
let h2 = match e2 {
Some(ref entry) => entry.hash(),
None => &NULL_NODE,
};
hash(data, h1.as_bytes(), h2.as_bytes()) == expected
}
/// Whether write handles are currently open, i.e. whether we are inside a
/// [`Self::with_write`] context.
pub fn is_writing(&self) -> bool {
    let handles = &self.writing_handles;
    handles.is_some()
}
/// Open the revlog files for writing and run `func` with them open.
///
/// Adding content to a revlog should be done within this context. When
/// already inside a write context, `func` simply runs with the existing
/// handles. Otherwise the handles are opened before `func` and closed
/// after it. If opening them fails, whatever was opened is released and
/// the error is returned without running `func`.
/// TODO try using `BufRead` and `BufWrite` and see if performance improves
pub fn with_write<R>(
    &mut self,
    transaction: &mut impl Transaction,
    data_end: Option<usize>,
    func: impl FnOnce() -> R,
) -> Result<R, HgError> {
    if self.is_writing() {
        return Ok(func());
    }
    if let Err(error) = self.enter_writing_context(data_end, transaction) {
        self.exit_writing_context();
        return Err(error);
    }
    let result = func();
    self.exit_writing_context();
    Ok(result)
}
/// `pub` only for use in hg-cpython
///
/// Drop the write handles, then the segment file's writing and reading
/// handles, in that order.
#[doc(hidden)]
pub fn exit_writing_context(&mut self) {
    self.writing_handles = None;
    *self.segment_file.writing_handle.borrow_mut() = None;
    *self.segment_file.reading_handle.borrow_mut() = None;
}
/// `pub` only for use in hg-cpython
///
/// Borrow the current write handles, if a write context is open.
#[doc(hidden)]
pub fn python_writing_handles(&self) -> Option<&WriteHandles> {
    let handles = &self.writing_handles;
    handles.as_ref()
}
/// `pub` only for use in hg-cpython
///
/// Open the write handles used by [`Self::write_entry`] and register the
/// current file sizes with `transaction`.
///
/// - When not inline, the data file is opened and positioned at
///   `data_end` if given, at its end otherwise. If it does not exist yet,
///   it is created; a freshly created file is not repositioned.
/// - The index is opened through [`Self::index_write_handle`].
/// - The handle holding the revision data becomes the segment file's
///   reading handle. That is the index when inline and the data file
///   otherwise.
///
/// If opening the index fails, the data file has already been registered
/// with `transaction`.
#[doc(hidden)]
pub fn enter_writing_context(
&mut self,
data_end: Option<usize>,
transaction: &mut impl Transaction,
) -> Result<(), HgError> {
// End of the last revision's chunk (0 for an empty revlog), as an offset
// in the segment file. For inline revlogs that file is the index file
// (see `new`).
let data_size = if self.is_empty() {
0
} else {
self.end(Revision((self.len() - 1) as BaseRevision))
};
let data_handle = if !self.is_inline() {
let data_handle = match self.vfs.open(&self.data_file) {
Ok(mut f) => {
if let Some(end) = data_end {
f.seek(SeekFrom::Start(end as u64))
.when_reading_file(&self.data_file)?;
} else {
f.seek(SeekFrom::End(0))
.when_reading_file(&self.data_file)?;
}
f
}
Err(e) => match e {
HgError::IoError { error, context } => {
// Only a missing data file is recoverable: create it.
if error.kind() != ErrorKind::NotFound {
return Err(HgError::IoError { error, context });
}
self.vfs.create(&self.data_file)?
}
e => return Err(e),
},
};
transaction.add(&self.data_file, data_size);
Some(FileHandle::from_file(
data_handle,
dyn_clone::clone_box(&*self.vfs),
&self.data_file,
))
} else {
None
};
let index_size = self.len() * INDEX_ENTRY_SIZE;
let index_handle = self.index_write_handle()?;
// Inline revlogs register the segment offset computed above (the index
// file holds the data too); otherwise only the index entries are counted.
if self.is_inline() {
transaction.add(&self.index_file, data_size);
} else {
transaction.add(&self.index_file, index_size);
}
self.writing_handles = Some(WriteHandles {
index_handle: index_handle.clone(),
data_handle: data_handle.clone(),
});
*self.segment_file.reading_handle.borrow_mut() = if self.is_inline() {
Some(index_handle)
} else {
data_handle
};
Ok(())
}
/// Get a write handle to the index, sought to the end of its data.
fn index_write_handle(&self) -> Result<FileHandle, HgError> {
let res = if self.delayed_buffer.is_none() {
if self.data_config.check_ambig {
self.vfs.open_check_ambig(&self.index_file)
} else {
self.vfs.open(&self.index_file)
}
} else {
self.vfs.open(&self.index_file)
};
match res {
Ok(mut handle) => {
handle
.seek(SeekFrom::End(0))
.when_reading_file(&self.index_file)?;
Ok(
if let Some(delayed_buffer) = self.delayed_buffer.as_ref()
{
FileHandle::from_file_delayed(
handle,
dyn_clone::clone_box(&*self.vfs),
&self.index_file,
delayed_buffer.clone(),
)?
} else {
FileHandle::from_file(
handle,
dyn_clone::clone_box(&*self.vfs),
&self.index_file,
)
},
)
}
Err(e) => match e {
HgError::IoError { error, context } => {
if error.kind() != ErrorKind::NotFound {
return Err(HgError::IoError { error, context });
};
if let Some(delayed_buffer) = self.delayed_buffer.as_ref()
{
FileHandle::new_delayed(
dyn_clone::clone_box(&*self.vfs),
&self.index_file,
true,
delayed_buffer.clone(),
)
} else {
FileHandle::new(
dyn_clone::clone_box(&*self.vfs),
&self.index_file,
true,
true,
)
}
}
e => Err(e),
},
}
}
/// Split the data of an inline revlog into an index and a data file
///
/// Copies every revision's data segment, in revision order, to
/// `self.data_file`, truncating it first. It then writes the index entries,
/// prefixed by `header`, to the index file: `new_index_file_path` if given,
/// the current index path otherwise. After that, the in-memory index is
/// rebuilt from those bytes, the revlog is marked non-inline, and reads
/// are redirected to the data file. If write handles were open on entry,
/// they are reopened for the new layout.
///
/// Returns the path of the index file that was written.
///
/// NOTE(review): this does not check that the revlog is inline. It
/// presumably must only be called on inline revlogs, since the data file
/// is truncated before the segments are read.
///
/// # Panics
///
/// Panics if writes are being delayed (`delayed_buffer` is set) or if an
/// index entry is missing.
pub fn split_inline(
&mut self,
header: IndexHeader,
new_index_file_path: Option<PathBuf>,
) -> Result<PathBuf, RevlogError> {
assert!(self.delayed_buffer.is_none());
let existing_handles = self.writing_handles.is_some();
// Flush pending index writes, then drop the handles: they point to the
// inline layout that is about to be replaced.
if let Some(handles) = &mut self.writing_handles {
handles.index_handle.flush()?;
self.writing_handles.take();
self.segment_file.writing_handle.take();
}
let mut new_data_file_handle = self.vfs.create(&self.data_file)?;
// Drop any potential data, possibly redundant with the VFS impl.
new_data_file_handle
.set_len(0)
.when_writing_file(&self.data_file)?;
// Copy the revision data out of the segment file (the index file while
// inline).
self.with_read(|| -> Result<(), RevlogError> {
for r in 0..self.index.len() {
let rev = Revision(r as BaseRevision);
let rev_segment = self.get_segment_for_revs(rev, rev)?.1;
new_data_file_handle
.write_all(&rev_segment)
.when_writing_file(&self.data_file)?;
}
new_data_file_handle
.flush()
.when_writing_file(&self.data_file)?;
Ok(())
})?;
if let Some(index_path) = new_index_file_path {
self.index_file = index_path
}
let mut new_index_handle = self.vfs.create(&self.index_file)?;
// Rebuild the index bytes without the interleaved data. The header bytes
// go right before entry 0. NOTE(review): this presumes `entry_binary(0)`
// omits the header slot; confirm.
let mut new_data = Vec::with_capacity(self.len() * INDEX_ENTRY_SIZE);
for r in 0..self.len() {
let rev = Revision(r as BaseRevision);
let entry = self.index.entry_binary(rev).unwrap_or_else(|| {
panic!(
"entry {} should exist in {}",
r,
self.index_file.display()
)
});
if r == 0 {
new_data.extend(header.header_bytes);
}
new_data.extend(entry);
}
new_index_handle
.write_all(&new_data)
.when_writing_file(&self.index_file)?;
// Replace the index with a new one because the buffer contains inline
// data
self.index = Index::new(Box::new(new_data), header)?;
self.inline = false;
// Revision data now lives in the data file.
self.segment_file = RandomAccessFile::new(
dyn_clone::clone_box(&*self.vfs),
self.data_file.to_owned(),
);
if existing_handles {
// Switched from inline to conventional, reopen the index
let new_data_handle = Some(FileHandle::from_file(
new_data_file_handle,
dyn_clone::clone_box(&*self.vfs),
&self.data_file,
));
self.writing_handles = Some(WriteHandles {
index_handle: self.index_write_handle()?,
data_handle: new_data_handle.clone(),
});
*self.segment_file.writing_handle.borrow_mut() = new_data_handle;
}
Ok(self.index_file.to_owned())
}
/// Write a new entry to this revlog.
/// - `entry` is the index bytes
/// - `header_and_data` is the compression header and the revision data
/// - `offset` is the position in the data file to write to
/// - `index_end` is the overwritten position in the index in revlog-v2,
/// since the format may allow a rewrite of garbage data at the end.
/// - `data_end` is the overwritten position in the data-file in revlog-v2,
/// since the format may allow a rewrite of garbage data at the end.
///
/// Must be called inside [`Self::with_write`]. Each handle is first
/// positioned at `index_end`/`data_end` if given, at its end otherwise.
/// - Not inline: header and data go to the data file. The index entry goes
///   to the index file, or to the delayed buffer while writes are delayed.
/// - Inline: the entry, header and data are written back to back to the
///   index file. Delayed writes are refused.
///
/// Returns the positions of the index handle and, if any, of the data
/// handle after writing.
///
/// NOTE(review): `self.len() - 1` is used as the current revision. The new
/// entry is presumably already in `self.index` when this is called, and an
/// empty index would underflow here.
///
/// XXX Why do we have `data_end` *and* `offset`? Same question in Python
pub fn write_entry(
&mut self,
mut transaction: impl Transaction,
entry: &[u8],
header_and_data: (&[u8], &[u8]),
mut offset: usize,
index_end: Option<u64>,
data_end: Option<u64>,
) -> Result<(u64, Option<u64>), HgError> {
let current_revision = self.len() - 1;
let canonical_index_file = self.canonical_index_file();
let is_inline = self.is_inline();
let handles = match &mut self.writing_handles {
None => {
return Err(HgError::abort(
"adding revision outside of the `with_write` context",
exit_codes::ABORT,
None,
));
}
Some(handles) => handles,
};
let index_handle = &mut handles.index_handle;
let data_handle = &mut handles.data_handle;
if let Some(end) = index_end {
index_handle
.seek(SeekFrom::Start(end))
.when_reading_file(&self.index_file)?;
} else {
index_handle
.seek(SeekFrom::End(0))
.when_reading_file(&self.index_file)?;
}
if let Some(data_handle) = data_handle {
if let Some(end) = data_end {
data_handle
.seek(SeekFrom::Start(end))
.when_reading_file(&self.data_file)?;
} else {
data_handle
.seek(SeekFrom::End(0))
.when_reading_file(&self.data_file)?;
}
}
let (header, data) = header_and_data;
if !is_inline {
// Register the sizes before this write: `offset` for the data file
// and `current_revision` entries for the canonical index file.
transaction.add(&self.data_file, offset);
transaction
.add(&canonical_index_file, current_revision * entry.len());
let data_handle = data_handle
.as_mut()
.expect("data handle should exist when not inline");
if !header.is_empty() {
data_handle.write_all(header)?;
}
data_handle.write_all(data)?;
match &mut self.delayed_buffer {
Some(buf) => {
buf.lock()
.expect("propagate the panic")
.buffer
.write_all(entry)
.expect("write to delay buffer should succeed");
}
None => index_handle.write_all(entry)?,
}
} else if self.delayed_buffer.is_some() {
return Err(HgError::abort(
"invalid delayed write on inline revlog",
exit_codes::ABORT,
None,
));
} else {
// Inline: the index file also holds the previous entries, so they
// are added to the data offset registered with the transaction.
offset += current_revision * entry.len();
transaction.add(&canonical_index_file, offset);
index_handle.write_all(entry)?;
index_handle.write_all(header)?;
index_handle.write_all(data)?;
}
let data_position = match data_handle {
Some(h) => Some(h.position()?),
None => None,
};
Ok((index_handle.position()?, data_position))
}
/// Return the real target index file, not the temporary one used while
/// writes are diverted.
pub fn canonical_index_file(&self) -> PathBuf {
    match &self.original_index_file {
        Some(original) => original.clone(),
        None => self.index_file.clone(),
    }
}
/// Return the path to the diverted index: the index path with `.a`
/// appended after its `.i` extension (`00changelog.i` ->
/// `00changelog.i.a`).
///
/// A trailing `.s` extension is dropped first, so `00changelog.i.s` also
/// diverts to `00changelog.i.a` rather than `00changelog.i.i.a`.
/// NOTE(review): `.s` is presumably the suffix of an index being split
/// (see `split_inline`'s `new_index_file_path`). Pending content has to
/// keep the `<radix>.i.a` name for readers of the pending transaction;
/// confirm against the Python `_divert_index`.
fn diverted_index(&self) -> PathBuf {
    let has_split_suffix =
        matches!(self.index_file.extension(), Some(ext) if ext == "s");
    if has_split_suffix {
        // `00changelog.i.s` -> `00changelog.i` -> `00changelog.i.a`
        self.index_file.with_extension("").with_extension("i.a")
    } else {
        self.index_file.with_extension("i.a")
    }
}
/// Whether the segment file is open, i.e. whether we are inside a
/// [`Self::with_write`] or [`Self::with_read`] context.
pub fn is_open(&self) -> bool {
    let segment_file = &self.segment_file;
    segment_file.is_open()
}
/// Set this revlog to delay the visibility of its writes.
///
/// - If a delay or divert is already in place, this does nothing and
///   returns `Ok(None)`.
/// - An empty revlog is diverted: the index path switches to the diverted
///   index, and any stale file there is removed. Returns that path.
/// - Otherwise new index entries are buffered in memory. Returns
///   `Ok(None)`.
///
/// # Errors
///
/// Aborts on inline revlogs; propagates VFS errors.
///
/// # Panics
///
/// Panics if the revlog is open (see [`Self::is_open`]).
pub fn delay(&mut self) -> Result<Option<PathBuf>, HgError> {
    assert!(!self.is_open());
    if self.is_inline() {
        return Err(HgError::abort(
            "revlog with delayed write should not be inline",
            exit_codes::ABORT,
            None,
        ));
    }
    if self.is_delaying() {
        // Delay or divert already happening
        return Ok(None);
    }
    if !self.is_empty() {
        self.delayed_buffer =
            Some(Arc::new(Mutex::new(DelayedBuffer::default())));
        return Ok(None);
    }
    let diverted = self.diverted_index();
    let original = std::mem::replace(&mut self.index_file, diverted);
    self.original_index_file = Some(original);
    if self.vfs.exists(&self.index_file) {
        self.vfs.unlink(&self.index_file)?;
    }
    Ok(Some(self.index_file.clone()))
}
/// Write the pending data (in memory) if any to the diverted index file
/// (on disk temporary file).
///
/// The current index file is copied to the diverted path, and the delayed
/// index entries (if any) are appended to that copy. The revlog then
/// points at the copy and remembers the original path.
///
/// Returns the diverted index path and whether any delayed data was
/// written. If writes were already diverted, returns `(None, true)`
/// without doing anything.
///
/// # Errors
///
/// Aborts on inline revlogs; propagates VFS and I/O errors.
///
/// # Panics
///
/// Panics if the revlog is open (see [`Self::is_open`]) or if the delayed
/// buffer's lock is poisoned.
pub fn write_pending(
    &mut self,
) -> Result<(Option<PathBuf>, bool), HgError> {
    assert!(!self.is_open());
    if self.is_inline() {
        return Err(HgError::abort(
            "revlog with delayed write should not be inline",
            exit_codes::ABORT,
            None,
        ));
    }
    if self.original_index_file.is_some() {
        return Ok((None, true));
    }
    let mut any_pending = false;
    let pending_index_file = self.diverted_index();
    if self.vfs.exists(&pending_index_file) {
        self.vfs.unlink(&pending_index_file)?;
    }
    self.vfs.copy(&self.index_file, &pending_index_file)?;
    if let Some(delayed_buffer) = self.delayed_buffer.take() {
        let guard = delayed_buffer.lock().expect("propagate the panic");
        let delayed_data = &guard.buffer;
        // An empty buffer means no entry was written since `delay`. Nothing
        // is pending, so nothing is appended or reported.
        if !delayed_data.is_empty() {
            let mut index_file_handle =
                self.vfs.open(&pending_index_file)?;
            index_file_handle
                .seek(SeekFrom::End(0))
                .when_writing_file(&pending_index_file)?;
            index_file_handle
                .write_all(delayed_data)
                .when_writing_file(&pending_index_file)?;
            any_pending = true;
        }
    }
    self.original_index_file = Some(self.index_file.to_owned());
    self.index_file = pending_index_file;
    Ok((Some(self.index_file.to_owned()), any_pending))
}
/// Make pending writes permanent, then return the canonical index path.
///
/// - When delayed, the in-memory buffer is appended to the end of the index
///   file and the buffer is dropped.
/// - When diverted, the diverted file (if it exists) is renamed onto the
///   canonical path, and the revlog points at the canonical path again.
///
/// # Errors
/// Aborts if the revlog is inline or is neither delayed nor diverted;
/// propagates VFS and I/O errors.
///
/// # Panics
/// If the revlog is open, or is somehow both delayed and diverted.
pub fn finalize_pending(&mut self) -> Result<PathBuf, HgError> {
    assert!(!self.is_open());
    if self.is_inline() {
        return Err(HgError::abort(
            "revlog with delayed write should not be inline",
            exit_codes::ABORT,
            None,
        ));
    }
    match (&self.delayed_buffer, &self.original_index_file) {
        (Some(_), Some(_)) => unreachable!(
            "{} is in an inconsistent state of both delay and divert",
            self.canonical_index_file().display(),
        ),
        (None, None) => {
            return Err(HgError::abort(
                "neither delay nor divert found on this revlog",
                exit_codes::ABORT,
                None,
            ));
        }
        (Some(pending), None) => {
            let target = &self.index_file;
            let mut handle = self.vfs.open(target)?;
            handle.seek(SeekFrom::End(0)).when_writing_file(target)?;
            {
                // Hold the lock only for the duration of the write.
                let guard = pending.lock().expect("propagate the panic");
                handle
                    .write_all(&guard.buffer)
                    .when_writing_file(target)?;
            }
            self.delayed_buffer = None;
        }
        (None, Some(original)) => {
            if self.vfs.exists(&self.index_file) {
                self.vfs.rename(&self.index_file, original, true)?;
            }
            self.index_file.clone_from(original);
            self.original_index_file = None;
        }
    }
    Ok(self.canonical_index_file())
}
/// `pub` only for `hg-cpython`. This is made a different method than
/// [`Revlog::index`] in case there is a different invariant that pops up
/// later.
///
/// Returns a shared reference to this revlog's [`Index`].
#[doc(hidden)]
pub fn shared_index(&self) -> &Index {
    &self.index
}
}
/// LRU cache of uncompressed chunk data keyed by [`Revision`]. Its size is
/// bounded by schnellru's [`ByMemoryUsage`] limiter.
///
/// The use of a [`RefCell`] assumes that a given revlog will only
/// be accessed (read or write) by a single thread.
type UncompressedChunkCache =
    RefCell<LruMap<Revision, Arc<[u8]>, ByMemoryUsage>>;
/// The node, revision and data for the last revision we've seen. Speeds up
/// a lot of sequential operations of the revlog.
///
/// The data is not just bytes since it can come from Python and we want to
/// avoid copies if possible. Any `Send` type that dereferences to `[u8]`
/// can be stored behind the box.
type SingleRevisionCache =
    (Node, Revision, Box<dyn Deref<Target = [u8]> + Send>);
/// A way of progressively filling a buffer with revision data, then returning
/// that buffer. Used to abstract away Python-allocated buffers to reduce
/// copying for performance reasons.
///
/// Implementors are filled through [`RevisionBuffer::extend_from_slice`]
/// and consumed by value with [`RevisionBuffer::finish`].
pub trait RevisionBuffer {
    /// The owned buffer type to return
    type Target;
    /// Copies the slice into the buffer
    fn extend_from_slice(&mut self, slice: &[u8]);
    /// Returns the now finished owned buffer
    fn finish(self) -> Self::Target;
}
/// A simple vec-based buffer. This is uselessly complicated for the pure Rust
/// case, but it's the price to pay for Python compatibility.
///
/// Implements [`RevisionBuffer`] by appending to a plain `Vec<u8>`.
#[derive(Debug)]
pub(super) struct CoreRevisionBuffer {
    /// The accumulated revision bytes.
    buf: Vec<u8>,
}
impl CoreRevisionBuffer {
    /// Create an empty buffer that has not allocated yet.
    pub fn new() -> Self {
        Self { buf: vec![] }
    }
    /// Make sure the buffer can hold at least `size` bytes in total without
    /// reallocating.
    ///
    /// Despite its name, this only reserves capacity. It never changes the
    /// buffer's length or contents, and it is a no-op if `size` bytes
    /// already fit.
    #[inline]
    pub fn resize(&mut self, size: usize) {
        // `reserve_exact` counts elements *beyond the current length*, not
        // beyond the capacity. Subtracting the capacity under-reserved when
        // capacity > len, and underflowed when `size` < capacity.
        self.buf.reserve_exact(size.saturating_sub(self.buf.len()));
    }
}
impl RevisionBuffer for CoreRevisionBuffer {
    type Target = Vec<u8>;
    /// Append a copy of `slice` to the end of the underlying vector.
    #[inline]
    fn extend_from_slice(&mut self, slice: &[u8]) {
        Vec::extend_from_slice(&mut self.buf, slice);
    }
    /// Consume the buffer and hand back the underlying vector.
    #[inline]
    fn finish(self) -> Self::Target {
        let Self { buf } = self;
        buf
    }
}
/// Compute the SHA-1 node id of a revision from its data and parents.
///
/// The hasher is fed the two parent hashes in ascending byte order, then
/// `data`. Swapping `p1_hash` and `p2_hash` therefore yields the same id.
pub fn hash(
    data: &[u8],
    p1_hash: &[u8],
    p2_hash: &[u8],
) -> [u8; NODE_BYTES_LENGTH] {
    let (low, high) = if p1_hash <= p2_hash {
        (p1_hash, p2_hash)
    } else {
        (p2_hash, p1_hash)
    };
    let mut hasher = Sha1::new();
    for chunk in [low, high, data] {
        hasher.update(chunk);
    }
    *hasher.finalize().as_ref()
}