##// END OF EJS Templates
revlog: add the glue to use the Rust `InnerRevlog` from Python...
revlog: add the glue to use the Rust `InnerRevlog` from Python The performance of this has been looked at for quite some time, and some workflows are actually quite a bit faster than with the Python + C code. However, we are still (up to 20%) slower in some crucial places like cloning certain repos, log, cat, which makes this an incomplete rewrite. This is mostly due to the high amount of overhead in Python <-> Rust FFI, especially around the VFS code. A future patch series will rewrite the VFS code in pure Rust, which should hopefully get us up to par with current performance, if not better in all important cases. This is a "save state" of sorts, as this is a ton of code, and I don't want to pile up even more things in a single review. Continuing to try to match the current performance will take an extremely long time, if it's not impossible, without the aforementioned VFS work.

File last commit:

r53051:0744248c default
r53060:7346f93b default
Show More
compression.rs
383 lines | 11.8 KiB | application/rls-services+xml | RustLexer
//! Helpers around revlog compression
use std::cell::RefCell;
use std::collections::HashSet;
use std::io::Read;
use flate2::bufread::ZlibEncoder;
use flate2::read::ZlibDecoder;
use crate::config::Config;
use crate::errors::HgError;
use crate::exit_codes;
use super::corrupted;
use super::RevlogError;
/// Header byte used to identify ZSTD-compressed data
pub const ZSTD_BYTE: u8 = b'\x28';
/// Header byte used to identify Zlib-compressed data
pub const ZLIB_BYTE: u8 = b'x';
/// Zstandard level used by [`CompressionConfig::zstd`] when the caller does
/// not provide one.
const ZSTD_DEFAULT_LEVEL: u8 = 3;
/// Zlib level used by the [`Default`] implementation of
/// [`CompressionConfig`].
const ZLIB_DEFAULT_LEVEL: u8 = 6;
/// The length of data below which we don't even try to compress it when using
/// Zstandard.
const MINIMUM_LENGTH_ZSTD: usize = 50;
/// The length of data below which we don't even try to compress it when using
/// Zlib.
const MINIMUM_LENGTH_ZLIB: usize = 44;
/// Defines the available compression engines and their options.
///
/// The [`Default`] value is Zlib at [`ZLIB_DEFAULT_LEVEL`]. Levels are
/// range-checked by [`CompressionConfig::set_level`].
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum CompressionConfig {
/// Zlib compression
Zlib {
/// Between 0 and 9 included
level: u8,
},
/// Zstandard compression
Zstd {
/// Between 0 and 22 included
level: u8,
/// Never used in practice for now
threads: u8,
},
/// No compression is performed
None,
}
impl CompressionConfig {
    /// Builds the compression configuration from `config` and the
    /// repository `requirements`.
    ///
    /// The engine starts out as [`CompressionConfig::default`]. Every
    /// requirement starting with `revlog-compression-` or
    /// `exp-compression-` selects the engine named by whatever follows that
    /// prefix. Only `zstd` is recognized, in which case the
    /// `revlog.zstd.level` item of the `storage` section (if set) gives its
    /// level. If the resulting engine is Zlib, the `revlog.zlib.level` item
    /// of the `storage` section (if set) gives its level.
    ///
    /// # Errors
    ///
    /// - [`HgError::UnsupportedFeature`] for any engine name but `zstd`
    /// - whatever reading the config or [`Self::set_level`] returns
    pub fn new(
        config: &Config,
        requirements: &HashSet<String>,
    ) -> Result<Self, HgError> {
        let mut engine = Self::default();
        let zlib_level = config.get_u32(b"storage", b"revlog.zlib.level")?;
        let zstd_level = config.get_u32(b"storage", b"revlog.zstd.level")?;

        for requirement in requirements {
            // Both prefixes are followed by the name of the engine
            let engine_name = requirement
                .strip_prefix("revlog-compression-")
                .or_else(|| requirement.strip_prefix("exp-compression-"));
            engine = match engine_name {
                // Not a compression requirement, nothing to do
                None => continue,
                Some("zstd") => Self::zstd(zstd_level)?,
                Some(e) => {
                    return Err(HgError::UnsupportedFeature(format!(
                        "Unsupported compression engine '{e}'"
                    )))
                }
            };
        }

        // The zlib level only matters if zlib is what we ended up with
        if let (Some(level), Self::Zlib { .. }) = (zlib_level, engine) {
            engine.set_level(level as usize)?;
        }
        Ok(engine)
    }

    /// Sets the level of the current compression engine
    ///
    /// This is a no-op for [`CompressionConfig::None`].
    ///
    /// # Errors
    ///
    /// Returns an abort error if `new_level` is above the maximum of the
    /// current engine (9 for Zlib, 22 for Zstd); the level is then left
    /// untouched.
    pub fn set_level(&mut self, new_level: usize) -> Result<(), HgError> {
        match self {
            Self::Zlib { .. } if new_level > 9 => Err(HgError::abort(
                format!(
                    "invalid compression zlib compression level {}, \
                     expected between 0 and 9 included",
                    new_level
                ),
                exit_codes::ABORT,
                None,
            )),
            Self::Zstd { .. } if new_level > 22 => Err(HgError::abort(
                format!(
                    "invalid compression zstd compression level {}, \
                     expected between 0 and 22 included",
                    new_level
                ),
                exit_codes::ABORT,
                None,
            )),
            // Out-of-range values were rejected by the guarded arms above
            Self::Zlib { level } | Self::Zstd { level, .. } => {
                *level = new_level as u8;
                Ok(())
            }
            Self::None => Ok(()),
        }
    }

    /// Return a ZSTD compression config
    ///
    /// Uses `zstd_level` if given, [`ZSTD_DEFAULT_LEVEL`] otherwise, and
    /// no threads.
    ///
    /// # Errors
    ///
    /// Returns the error of [`Self::set_level`] if `zstd_level` is out of
    /// range.
    pub fn zstd(
        zstd_level: Option<u32>,
    ) -> Result<CompressionConfig, HgError> {
        let mut zstd_config = Self::Zstd {
            threads: 0,
            level: ZSTD_DEFAULT_LEVEL,
        };
        if let Some(requested) = zstd_level {
            zstd_config.set_level(requested as usize)?;
        }
        Ok(zstd_config)
    }
}
impl Default for CompressionConfig {
fn default() -> Self {
Self::Zlib {
level: ZLIB_DEFAULT_LEVEL,
}
}
}
/// A high-level trait to define compressors that should be able to compress
/// and decompress arbitrary bytes.
pub trait Compressor {
/// Returns a new [`Vec`] with the compressed data.
/// Should return `Ok(None)` if compression does not apply (e.g. too small)
///
/// # Errors
///
/// Returns a [`RevlogError`] if the underlying engine fails.
fn compress(
&mut self,
data: &[u8],
) -> Result<Option<Vec<u8>>, RevlogError>;
/// Returns a new [`Vec`] with the decompressed data.
///
/// # Errors
///
/// Returns a [`RevlogError`] if `data` cannot be decompressed.
fn decompress(&self, data: &[u8]) -> Result<Vec<u8>, RevlogError>;
}
/// A pass-through compressor (useful in tests): it never compresses and
/// "decompresses" by copying.
pub struct NoneCompressor;

impl Compressor for NoneCompressor {
    /// Always reports that compression does not apply.
    fn compress(
        &mut self,
        _data: &[u8],
    ) -> Result<Option<Vec<u8>>, RevlogError> {
        Ok(None)
    }

    /// Returns a copy of `data`, untouched.
    fn decompress(&self, data: &[u8]) -> Result<Vec<u8>, RevlogError> {
        Ok(data.to_vec())
    }
}
/// Zstandard implementation of [`Compressor`]
pub struct ZstdCompressor {
    /// Compression level, also handed to the underlying compressor
    level: u8,
    /// Number of threads to use (anything but 0 is not implemented yet)
    threads: u8,
    /// The zstd compressor this wraps, created once in [`Self::new`]
    compressor: zstd::bulk::Compressor<'static>,
}

impl ZstdCompressor {
    /// Creates a compressor for the given `level` and number of `threads`.
    ///
    /// # Panics
    ///
    /// Panics if the underlying zstd compressor cannot be created.
    pub fn new(level: u8, threads: u8) -> Self {
        let compressor = zstd::bulk::Compressor::new(i32::from(level))
            .expect("invalid zstd arguments");
        Self {
            level,
            threads,
            compressor,
        }
    }
}
impl Compressor for ZstdCompressor {
    /// Compresses `data` with Zstandard.
    ///
    /// Returns `Ok(None)` if `data` is shorter than
    /// [`MINIMUM_LENGTH_ZSTD`], or if its compressed form is not strictly
    /// smaller than `data` itself (storing it would waste space).
    ///
    /// # Errors
    ///
    /// Returns a [`RevlogError`] if the zstd library fails to compress.
    ///
    /// # Panics
    ///
    /// Panics if this compressor was created with a non-zero thread count,
    /// since parallel compression is not implemented.
    fn compress(
        &mut self,
        data: &[u8],
    ) -> Result<Option<Vec<u8>>, RevlogError> {
        /// Inputs up to this size go through the reusable bulk compressor,
        /// larger ones through the streaming encoder.
        const BULK_MAX_LENGTH: usize = 1000000;

        if self.threads != 0 {
            // TODO use a zstd builder + zstd cargo feature to support this
            unimplemented!("zstd parallel compression is not implemented");
        }
        if data.len() < MINIMUM_LENGTH_ZSTD {
            return Ok(None);
        }
        let result = if data.len() <= BULK_MAX_LENGTH {
            self.compressor.compress(data)
        } else {
            zstd::stream::encode_all(data, i32::from(self.level))
        };
        let compressed = result.map_err(|e| {
            corrupted(format!("revlog compress error: {}", e))
        })?;
        // Whichever path produced it, compressed data is only worth keeping
        // if it is smaller than the input: incompressible data (of any
        // size) must be reported as `None`.
        Ok(if compressed.len() < data.len() {
            Some(compressed)
        } else {
            None
        })
    }

    /// Decompresses a full Zstandard stream into a new [`Vec`].
    ///
    /// # Errors
    ///
    /// Returns a [`RevlogError`] if `data` cannot be decoded.
    fn decompress(&self, data: &[u8]) -> Result<Vec<u8>, RevlogError> {
        zstd::stream::decode_all(data).map_err(|e| {
            corrupted(format!("revlog decompress error: {}", e)).into()
        })
    }
}
/// Zlib implementation of [`Compressor`]
pub struct ZlibCompressor {
    /// Compression level, in the form `flate2` expects
    level: flate2::Compression,
}

impl ZlibCompressor {
    /// Creates a compressor for the given `level`.
    pub fn new(level: u8) -> Self {
        let level = flate2::Compression::new(u32::from(level));
        Self { level }
    }
}
impl Compressor for ZlibCompressor {
    /// Compresses `data` with Zlib.
    ///
    /// Returns `Ok(None)` if `data` is shorter than
    /// [`MINIMUM_LENGTH_ZLIB`] (this includes empty input), or if its
    /// compressed form is not strictly smaller than `data` itself.
    ///
    /// # Errors
    ///
    /// Returns a [`RevlogError`] if the encoder fails.
    fn compress(
        &mut self,
        data: &[u8],
    ) -> Result<Option<Vec<u8>>, RevlogError> {
        // Empty input is just the smallest "too small" input: answer
        // `Ok(None)` like the other compressors do, instead of panicking.
        if data.len() < MINIMUM_LENGTH_ZLIB {
            return Ok(None);
        }
        // Output longer than the input gets discarded below, so the input
        // length is a sensible initial capacity.
        let mut buf = Vec::with_capacity(data.len());
        ZlibEncoder::new(data, self.level)
            .read_to_end(&mut buf)
            .map_err(|e| corrupted(format!("revlog compress error: {}", e)))?;
        Ok(if buf.len() < data.len() {
            buf.shrink_to_fit();
            Some(buf)
        } else {
            None
        })
    }

    /// Decompresses a full Zlib stream into a new [`Vec`].
    ///
    /// # Errors
    ///
    /// Returns a [`RevlogError`] if `data` cannot be decoded.
    fn decompress(&self, data: &[u8]) -> Result<Vec<u8>, RevlogError> {
        let mut decoder = ZlibDecoder::new(data);
        // TODO reuse the allocation somehow?
        let mut buf = vec![];
        decoder.read_to_end(&mut buf).map_err(|e| {
            corrupted(format!("revlog decompress error: {}", e))
        })?;
        Ok(buf)
    }
}
thread_local! {
// One zstd decompressor per thread, reused across calls by
// [zstd_decompress_to_buffer]. The [RefCell] is what allows borrowing it
// mutably from the shared thread-local.
//
// seems fine to [unwrap] here: this can only fail due to memory allocation
// failing, and it's normal for that to cause panic.
static ZSTD_DECODER : RefCell<zstd::bulk::Decompressor<'static>> =
RefCell::new(zstd::bulk::Decompressor::new().ok().unwrap());
}
/// Decompresses `bytes` into `buf` using this thread's shared zstd decoder,
/// leaving the sizing of `buf` to the caller.
///
/// Returns what the decoder returns: the number of bytes written, or the
/// I/O error it reported.
fn zstd_decompress_to_buffer(
    bytes: &[u8],
    buf: &mut Vec<u8>,
) -> Result<usize, std::io::Error> {
    ZSTD_DECODER.with(|shared| {
        let mut decoder = shared.borrow_mut();
        decoder.decompress_to_buffer(bytes, buf)
    })
}
/// Revlog-specific zstd decompression, meant to keep memory usage low for
/// deltas without giving up too much performance.
///
/// `uncompressed_len` sizes the output buffer (negative values count as 0).
/// For full texts (`is_delta == false`) it must also be the exact length of
/// the decompressed data, or the revlog is reported as corrupted.
pub(super) fn uncompressed_zstd_data(
    bytes: &[u8],
    is_delta: bool,
    uncompressed_len: i32,
) -> Result<Vec<u8>, HgError> {
    let mut output = Vec::with_capacity(uncompressed_len.max(0) as usize);

    if !is_delta {
        let written = zstd_decompress_to_buffer(bytes, &mut output)
            .map_err(|e| corrupted(e.to_string()))?;
        return if written == uncompressed_len as usize {
            Ok(output)
        } else {
            Err(corrupted("uncompressed length does not match"))
        };
    }

    // For a delta, the capacity is the length of the delta-decoded data,
    // which is usually more than the delta itself needs. So on success we
    // give the extra memory back with [shrink_to_fit].
    // The delta is sometimes longer than the original data though
    // (surprisingly, this does happen in practice), hence the fallback to
    // a decoder that grows the buffer as needed.
    if zstd_decompress_to_buffer(bytes, &mut output).is_ok() {
        output.shrink_to_fit();
    } else {
        output.clear();
        zstd::stream::copy_decode(bytes, &mut output)
            .map_err(|e| corrupted(e.to_string()))?;
    }
    Ok(output)
}
#[cfg(test)]
mod tests {
    use super::*;

    const LARGE_TEXT: &[u8] = b"
Patali Dirapata, Cromda Cromda Ripalo, Pata Pata, Ko Ko Ko
Bokoro Dipoulito, Rondi Rondi Pepino, Pata Pata, Ko Ko Ko
Emana Karassoli, Loucra Loucra Nonponto, Pata Pata, Ko Ko Ko.";

    /// Runs the scenario shared by all engines, `make` building a
    /// compressor for a given level.
    fn check_engine<C: Compressor>(
        make: impl Fn(u8) -> C,
        low_level: u8,
        high_level: u8,
    ) {
        let mut low = make(low_level);
        // Tiny inputs are not worth compressing
        assert_eq!(low.compress(b"too small").unwrap(), None);
        // Larger inputs do get smaller
        let low_output = low.compress(LARGE_TEXT).unwrap().unwrap();
        assert!(low_output.len() < LARGE_TEXT.len());
        // Decompressing gives the input back
        assert_eq!(low.decompress(&low_output).unwrap(), LARGE_TEXT);
        // A higher level compresses better
        let mut high = make(high_level);
        let high_output = high.compress(LARGE_TEXT).unwrap().unwrap();
        assert!(high_output.len() < low_output.len());
    }

    #[test]
    fn test_zlib_compressor() {
        check_engine(ZlibCompressor::new, 1, 9);
    }

    #[test]
    fn test_zstd_compressor() {
        check_engine(|level| ZstdCompressor::new(level, 0), 1, 22);
    }
}