##// END OF EJS Templates
rust-inner-revlog: cache the compressor...
rust-inner-revlog: cache the compressor The `compress` function is unlikely to be used in highly contended situations, and creating a compressor has some overhead, on top of losing out on some potential advantages of longer-running optimizations from the compressor.

File last commit:

r53051:0744248c default
r53201:7756494c default
Show More
compression.rs
383 lines | 11.8 KiB | application/rls-services+xml | RustLexer
//! Helpers around revlog compression
use std::cell::RefCell;
use std::collections::HashSet;
use std::io::Read;
use flate2::bufread::ZlibEncoder;
use flate2::read::ZlibDecoder;
use crate::config::Config;
use crate::errors::HgError;
use crate::exit_codes;
use super::corrupted;
use super::RevlogError;
/// Header byte used to identify ZSTD-compressed data
pub const ZSTD_BYTE: u8 = b'\x28';
/// Header byte used to identify Zlib-compressed data
pub const ZLIB_BYTE: u8 = b'x';
const ZSTD_DEFAULT_LEVEL: u8 = 3;
const ZLIB_DEFAULT_LEVEL: u8 = 6;
/// The length of data below which we don't even try to compress it when using
/// Zstandard.
const MINIMUM_LENGTH_ZSTD: usize = 50;
/// The length of data below which we don't even try to compress it when using
/// Zlib.
const MINIMUM_LENGTH_ZLIB: usize = 44;
/// Defines the available compression engines and their options.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum CompressionConfig {
Zlib {
/// Between 0 and 9 included
level: u8,
},
Zstd {
/// Between 0 and 22 included
level: u8,
/// Never used in practice for now
threads: u8,
},
/// No compression is performed
None,
}
impl CompressionConfig {
pub fn new(
config: &Config,
requirements: &HashSet<String>,
) -> Result<Self, HgError> {
let mut new = Self::default();
let zlib_level = config.get_u32(b"storage", b"revlog.zlib.level")?;
let zstd_level = config.get_u32(b"storage", b"revlog.zstd.level")?;
for requirement in requirements {
if requirement.starts_with("revlog-compression-")
|| requirement.starts_with("exp-compression-")
{
let split = &mut requirement.splitn(3, '-');
split.next();
split.next();
new = match split.next().unwrap() {
"zstd" => CompressionConfig::zstd(zstd_level)?,
e => {
return Err(HgError::UnsupportedFeature(format!(
"Unsupported compression engine '{e}'"
)))
}
};
}
}
if let Some(level) = zlib_level {
if matches!(new, CompressionConfig::Zlib { .. }) {
new.set_level(level as usize)?;
}
}
Ok(new)
}
/// Sets the level of the current compression engine
pub fn set_level(&mut self, new_level: usize) -> Result<(), HgError> {
match self {
CompressionConfig::Zlib { level } => {
if new_level > 9 {
return Err(HgError::abort(
format!(
"invalid compression zlib compression level {}, \
expected between 0 and 9 included",
new_level
),
exit_codes::ABORT,
None,
));
}
*level = new_level as u8;
}
CompressionConfig::Zstd { level, .. } => {
if new_level > 22 {
return Err(HgError::abort(
format!(
"invalid compression zstd compression level {}, \
expected between 0 and 22 included",
new_level
),
exit_codes::ABORT,
None,
));
}
*level = new_level as u8;
}
CompressionConfig::None => {}
}
Ok(())
}
/// Return a ZSTD compression config
pub fn zstd(
zstd_level: Option<u32>,
) -> Result<CompressionConfig, HgError> {
let mut engine = CompressionConfig::Zstd {
level: ZSTD_DEFAULT_LEVEL,
threads: 0,
};
if let Some(level) = zstd_level {
engine.set_level(level as usize)?;
}
Ok(engine)
}
}
impl Default for CompressionConfig {
fn default() -> Self {
Self::Zlib {
level: ZLIB_DEFAULT_LEVEL,
}
}
}
/// A high-level trait to define compressors that should be able to compress
/// and decompress arbitrary bytes.
pub trait Compressor {
/// Returns a new [`Vec`] with the compressed data.
/// Should return `Ok(None)` if compression does not apply (e.g. too small)
fn compress(
&mut self,
data: &[u8],
) -> Result<Option<Vec<u8>>, RevlogError>;
/// Returns a new [`Vec`] with the decompressed data.
fn decompress(&self, data: &[u8]) -> Result<Vec<u8>, RevlogError>;
}
/// A compressor that does nothing (useful in tests)
pub struct NoneCompressor;
impl Compressor for NoneCompressor {
fn compress(
&mut self,
_data: &[u8],
) -> Result<Option<Vec<u8>>, RevlogError> {
Ok(None)
}
fn decompress(&self, data: &[u8]) -> Result<Vec<u8>, RevlogError> {
Ok(data.to_owned())
}
}
/// A compressor for Zstandard
pub struct ZstdCompressor {
/// Level of compression to use
level: u8,
/// How many threads are used (not implemented yet)
threads: u8,
/// The underlying zstd compressor
compressor: zstd::bulk::Compressor<'static>,
}
impl ZstdCompressor {
pub fn new(level: u8, threads: u8) -> Self {
Self {
level,
threads,
compressor: zstd::bulk::Compressor::new(level.into())
.expect("invalid zstd arguments"),
}
}
}
impl Compressor for ZstdCompressor {
fn compress(
&mut self,
data: &[u8],
) -> Result<Option<Vec<u8>>, RevlogError> {
if self.threads != 0 {
// TODO use a zstd builder + zstd cargo feature to support this
unimplemented!("zstd parallel compression is not implemented");
}
if data.len() < MINIMUM_LENGTH_ZSTD {
return Ok(None);
}
let level = self.level as i32;
if data.len() <= 1000000 {
let compressed = self.compressor.compress(data).map_err(|e| {
corrupted(format!("revlog compress error: {}", e))
})?;
Ok(if compressed.len() < data.len() {
Some(compressed)
} else {
None
})
} else {
Ok(Some(zstd::stream::encode_all(data, level).map_err(
|e| corrupted(format!("revlog compress error: {}", e)),
)?))
}
}
fn decompress(&self, data: &[u8]) -> Result<Vec<u8>, RevlogError> {
zstd::stream::decode_all(data).map_err(|e| {
corrupted(format!("revlog decompress error: {}", e)).into()
})
}
}
/// A compressor for Zlib
pub struct ZlibCompressor {
/// Level of compression to use
level: flate2::Compression,
}
impl ZlibCompressor {
pub fn new(level: u8) -> Self {
Self {
level: flate2::Compression::new(level.into()),
}
}
}
impl Compressor for ZlibCompressor {
fn compress(
&mut self,
data: &[u8],
) -> Result<Option<Vec<u8>>, RevlogError> {
assert!(!data.is_empty());
if data.len() < MINIMUM_LENGTH_ZLIB {
return Ok(None);
}
let mut buf = Vec::with_capacity(data.len());
ZlibEncoder::new(data, self.level)
.read_to_end(&mut buf)
.map_err(|e| corrupted(format!("revlog compress error: {}", e)))?;
Ok(if buf.len() < data.len() {
buf.shrink_to_fit();
Some(buf)
} else {
None
})
}
fn decompress(&self, data: &[u8]) -> Result<Vec<u8>, RevlogError> {
let mut decoder = ZlibDecoder::new(data);
// TODO reuse the allocation somehow?
let mut buf = vec![];
decoder.read_to_end(&mut buf).map_err(|e| {
corrupted(format!("revlog decompress error: {}", e))
})?;
Ok(buf)
}
}
thread_local! {
// seems fine to [unwrap] here: this can only fail due to memory allocation
// failing, and it's normal for that to cause panic.
static ZSTD_DECODER : RefCell<zstd::bulk::Decompressor<'static>> =
RefCell::new(zstd::bulk::Decompressor::new().ok().unwrap());
}
/// Util to wrap the reuse of a zstd decoder while controlling its buffer size.
fn zstd_decompress_to_buffer(
bytes: &[u8],
buf: &mut Vec<u8>,
) -> Result<usize, std::io::Error> {
ZSTD_DECODER
.with(|decoder| decoder.borrow_mut().decompress_to_buffer(bytes, buf))
}
/// Specialized revlog decompression to use less memory for deltas while
/// keeping performance acceptable.
pub(super) fn uncompressed_zstd_data(
bytes: &[u8],
is_delta: bool,
uncompressed_len: i32,
) -> Result<Vec<u8>, HgError> {
let cap = uncompressed_len.max(0) as usize;
if is_delta {
// [cap] is usually an over-estimate of the space needed because
// it's the length of delta-decoded data, but we're interested
// in the size of the delta.
// This means we have to [shrink_to_fit] to avoid holding on
// to a large chunk of memory, but it also means we must have a
// fallback branch, for the case when the delta is longer than
// the original data (surprisingly, this does happen in practice)
let mut buf = Vec::with_capacity(cap);
match zstd_decompress_to_buffer(bytes, &mut buf) {
Ok(_) => buf.shrink_to_fit(),
Err(_) => {
buf.clear();
zstd::stream::copy_decode(bytes, &mut buf)
.map_err(|e| corrupted(e.to_string()))?;
}
};
Ok(buf)
} else {
let mut buf = Vec::with_capacity(cap);
let len = zstd_decompress_to_buffer(bytes, &mut buf)
.map_err(|e| corrupted(e.to_string()))?;
if len != uncompressed_len as usize {
Err(corrupted("uncompressed length does not match"))
} else {
Ok(buf)
}
}
}
#[cfg(test)]
mod tests {
use super::*;
const LARGE_TEXT: &[u8] = b"
Patali Dirapata, Cromda Cromda Ripalo, Pata Pata, Ko Ko Ko
Bokoro Dipoulito, Rondi Rondi Pepino, Pata Pata, Ko Ko Ko
Emana Karassoli, Loucra Loucra Nonponto, Pata Pata, Ko Ko Ko.";
#[test]
fn test_zlib_compressor() {
// Can return `Ok(None)`
let mut compressor = ZlibCompressor::new(1);
assert_eq!(compressor.compress(b"too small").unwrap(), None);
// Compression returns bytes
let compressed_with_1 =
compressor.compress(LARGE_TEXT).unwrap().unwrap();
assert!(compressed_with_1.len() < LARGE_TEXT.len());
// Round trip works
assert_eq!(
compressor.decompress(&compressed_with_1).unwrap(),
LARGE_TEXT
);
// Compression levels mean something
let mut compressor = ZlibCompressor::new(9);
// Compression returns bytes
let compressed = compressor.compress(LARGE_TEXT).unwrap().unwrap();
assert!(compressed.len() < compressed_with_1.len());
}
#[test]
fn test_zstd_compressor() {
// Can return `Ok(None)`
let mut compressor = ZstdCompressor::new(1, 0);
assert_eq!(compressor.compress(b"too small").unwrap(), None);
// Compression returns bytes
let compressed_with_1 =
compressor.compress(LARGE_TEXT).unwrap().unwrap();
assert!(compressed_with_1.len() < LARGE_TEXT.len());
// Round trip works
assert_eq!(
compressor.decompress(&compressed_with_1).unwrap(),
LARGE_TEXT
);
// Compression levels mean something
let mut compressor = ZstdCompressor::new(22, 0);
// Compression returns bytes
let compressed = compressor.compress(LARGE_TEXT).unwrap().unwrap();
assert!(compressed.len() < compressed_with_1.len());
}
}