##// END OF EJS Templates
rust-revlog: add compression helpers...
Raphaël Gomès -
r53051:0744248c default
parent child Browse files
Show More
@@ -0,0 +1,383
1 //! Helpers around revlog compression
2
3 use std::cell::RefCell;
4 use std::collections::HashSet;
5 use std::io::Read;
6
7 use flate2::bufread::ZlibEncoder;
8 use flate2::read::ZlibDecoder;
9
10 use crate::config::Config;
11 use crate::errors::HgError;
12 use crate::exit_codes;
13
14 use super::corrupted;
15 use super::RevlogError;
16
17 /// Header byte used to identify ZSTD-compressed data
18 pub const ZSTD_BYTE: u8 = b'\x28';
19 /// Header byte used to identify Zlib-compressed data
20 pub const ZLIB_BYTE: u8 = b'x';
21
22 const ZSTD_DEFAULT_LEVEL: u8 = 3;
23 const ZLIB_DEFAULT_LEVEL: u8 = 6;
24 /// The length of data below which we don't even try to compress it when using
25 /// Zstandard.
26 const MINIMUM_LENGTH_ZSTD: usize = 50;
27 /// The length of data below which we don't even try to compress it when using
28 /// Zlib.
29 const MINIMUM_LENGTH_ZLIB: usize = 44;
30
31 /// Defines the available compression engines and their options.
32 #[derive(Debug, Copy, Clone, PartialEq, Eq)]
33 pub enum CompressionConfig {
34 Zlib {
35 /// Between 0 and 9 included
36 level: u8,
37 },
38 Zstd {
39 /// Between 0 and 22 included
40 level: u8,
41 /// Never used in practice for now
42 threads: u8,
43 },
44 /// No compression is performed
45 None,
46 }
47
48 impl CompressionConfig {
49 pub fn new(
50 config: &Config,
51 requirements: &HashSet<String>,
52 ) -> Result<Self, HgError> {
53 let mut new = Self::default();
54
55 let zlib_level = config.get_u32(b"storage", b"revlog.zlib.level")?;
56 let zstd_level = config.get_u32(b"storage", b"revlog.zstd.level")?;
57
58 for requirement in requirements {
59 if requirement.starts_with("revlog-compression-")
60 || requirement.starts_with("exp-compression-")
61 {
62 let split = &mut requirement.splitn(3, '-');
63 split.next();
64 split.next();
65 new = match split.next().unwrap() {
66 "zstd" => CompressionConfig::zstd(zstd_level)?,
67 e => {
68 return Err(HgError::UnsupportedFeature(format!(
69 "Unsupported compression engine '{e}'"
70 )))
71 }
72 };
73 }
74 }
75 if let Some(level) = zlib_level {
76 if matches!(new, CompressionConfig::Zlib { .. }) {
77 new.set_level(level as usize)?;
78 }
79 }
80 Ok(new)
81 }
82
83 /// Sets the level of the current compression engine
84 pub fn set_level(&mut self, new_level: usize) -> Result<(), HgError> {
85 match self {
86 CompressionConfig::Zlib { level } => {
87 if new_level > 9 {
88 return Err(HgError::abort(
89 format!(
90 "invalid compression zlib compression level {}, \
91 expected between 0 and 9 included",
92 new_level
93 ),
94 exit_codes::ABORT,
95 None,
96 ));
97 }
98 *level = new_level as u8;
99 }
100 CompressionConfig::Zstd { level, .. } => {
101 if new_level > 22 {
102 return Err(HgError::abort(
103 format!(
104 "invalid compression zstd compression level {}, \
105 expected between 0 and 22 included",
106 new_level
107 ),
108 exit_codes::ABORT,
109 None,
110 ));
111 }
112 *level = new_level as u8;
113 }
114 CompressionConfig::None => {}
115 }
116 Ok(())
117 }
118
119 /// Return a ZSTD compression config
120 pub fn zstd(
121 zstd_level: Option<u32>,
122 ) -> Result<CompressionConfig, HgError> {
123 let mut engine = CompressionConfig::Zstd {
124 level: ZSTD_DEFAULT_LEVEL,
125 threads: 0,
126 };
127 if let Some(level) = zstd_level {
128 engine.set_level(level as usize)?;
129 }
130 Ok(engine)
131 }
132 }
133
134 impl Default for CompressionConfig {
135 fn default() -> Self {
136 Self::Zlib {
137 level: ZLIB_DEFAULT_LEVEL,
138 }
139 }
140 }
141
142 /// A high-level trait to define compressors that should be able to compress
143 /// and decompress arbitrary bytes.
144 pub trait Compressor {
145 /// Returns a new [`Vec`] with the compressed data.
146 /// Should return `Ok(None)` if compression does not apply (e.g. too small)
147 fn compress(
148 &mut self,
149 data: &[u8],
150 ) -> Result<Option<Vec<u8>>, RevlogError>;
151 /// Returns a new [`Vec`] with the decompressed data.
152 fn decompress(&self, data: &[u8]) -> Result<Vec<u8>, RevlogError>;
153 }
154
155 /// A compressor that does nothing (useful in tests)
156 pub struct NoneCompressor;
157
158 impl Compressor for NoneCompressor {
159 fn compress(
160 &mut self,
161 _data: &[u8],
162 ) -> Result<Option<Vec<u8>>, RevlogError> {
163 Ok(None)
164 }
165
166 fn decompress(&self, data: &[u8]) -> Result<Vec<u8>, RevlogError> {
167 Ok(data.to_owned())
168 }
169 }
170
171 /// A compressor for Zstandard
172 pub struct ZstdCompressor {
173 /// Level of compression to use
174 level: u8,
175 /// How many threads are used (not implemented yet)
176 threads: u8,
177 /// The underlying zstd compressor
178 compressor: zstd::bulk::Compressor<'static>,
179 }
180
181 impl ZstdCompressor {
182 pub fn new(level: u8, threads: u8) -> Self {
183 Self {
184 level,
185 threads,
186 compressor: zstd::bulk::Compressor::new(level.into())
187 .expect("invalid zstd arguments"),
188 }
189 }
190 }
191
192 impl Compressor for ZstdCompressor {
193 fn compress(
194 &mut self,
195 data: &[u8],
196 ) -> Result<Option<Vec<u8>>, RevlogError> {
197 if self.threads != 0 {
198 // TODO use a zstd builder + zstd cargo feature to support this
199 unimplemented!("zstd parallel compression is not implemented");
200 }
201 if data.len() < MINIMUM_LENGTH_ZSTD {
202 return Ok(None);
203 }
204 let level = self.level as i32;
205 if data.len() <= 1000000 {
206 let compressed = self.compressor.compress(data).map_err(|e| {
207 corrupted(format!("revlog compress error: {}", e))
208 })?;
209 Ok(if compressed.len() < data.len() {
210 Some(compressed)
211 } else {
212 None
213 })
214 } else {
215 Ok(Some(zstd::stream::encode_all(data, level).map_err(
216 |e| corrupted(format!("revlog compress error: {}", e)),
217 )?))
218 }
219 }
220
221 fn decompress(&self, data: &[u8]) -> Result<Vec<u8>, RevlogError> {
222 zstd::stream::decode_all(data).map_err(|e| {
223 corrupted(format!("revlog decompress error: {}", e)).into()
224 })
225 }
226 }
227
228 /// A compressor for Zlib
229 pub struct ZlibCompressor {
230 /// Level of compression to use
231 level: flate2::Compression,
232 }
233
234 impl ZlibCompressor {
235 pub fn new(level: u8) -> Self {
236 Self {
237 level: flate2::Compression::new(level.into()),
238 }
239 }
240 }
241
242 impl Compressor for ZlibCompressor {
243 fn compress(
244 &mut self,
245 data: &[u8],
246 ) -> Result<Option<Vec<u8>>, RevlogError> {
247 assert!(!data.is_empty());
248 if data.len() < MINIMUM_LENGTH_ZLIB {
249 return Ok(None);
250 }
251 let mut buf = Vec::with_capacity(data.len());
252 ZlibEncoder::new(data, self.level)
253 .read_to_end(&mut buf)
254 .map_err(|e| corrupted(format!("revlog compress error: {}", e)))?;
255
256 Ok(if buf.len() < data.len() {
257 buf.shrink_to_fit();
258 Some(buf)
259 } else {
260 None
261 })
262 }
263
264 fn decompress(&self, data: &[u8]) -> Result<Vec<u8>, RevlogError> {
265 let mut decoder = ZlibDecoder::new(data);
266 // TODO reuse the allocation somehow?
267 let mut buf = vec![];
268 decoder.read_to_end(&mut buf).map_err(|e| {
269 corrupted(format!("revlog decompress error: {}", e))
270 })?;
271 Ok(buf)
272 }
273 }
274
275 thread_local! {
276 // seems fine to [unwrap] here: this can only fail due to memory allocation
277 // failing, and it's normal for that to cause panic.
278 static ZSTD_DECODER : RefCell<zstd::bulk::Decompressor<'static>> =
279 RefCell::new(zstd::bulk::Decompressor::new().ok().unwrap());
280 }
281
282 /// Util to wrap the reuse of a zstd decoder while controlling its buffer size.
283 fn zstd_decompress_to_buffer(
284 bytes: &[u8],
285 buf: &mut Vec<u8>,
286 ) -> Result<usize, std::io::Error> {
287 ZSTD_DECODER
288 .with(|decoder| decoder.borrow_mut().decompress_to_buffer(bytes, buf))
289 }
290
291 /// Specialized revlog decompression to use less memory for deltas while
292 /// keeping performance acceptable.
293 pub(super) fn uncompressed_zstd_data(
294 bytes: &[u8],
295 is_delta: bool,
296 uncompressed_len: i32,
297 ) -> Result<Vec<u8>, HgError> {
298 let cap = uncompressed_len.max(0) as usize;
299 if is_delta {
300 // [cap] is usually an over-estimate of the space needed because
301 // it's the length of delta-decoded data, but we're interested
302 // in the size of the delta.
303 // This means we have to [shrink_to_fit] to avoid holding on
304 // to a large chunk of memory, but it also means we must have a
305 // fallback branch, for the case when the delta is longer than
306 // the original data (surprisingly, this does happen in practice)
307 let mut buf = Vec::with_capacity(cap);
308 match zstd_decompress_to_buffer(bytes, &mut buf) {
309 Ok(_) => buf.shrink_to_fit(),
310 Err(_) => {
311 buf.clear();
312 zstd::stream::copy_decode(bytes, &mut buf)
313 .map_err(|e| corrupted(e.to_string()))?;
314 }
315 };
316 Ok(buf)
317 } else {
318 let mut buf = Vec::with_capacity(cap);
319 let len = zstd_decompress_to_buffer(bytes, &mut buf)
320 .map_err(|e| corrupted(e.to_string()))?;
321 if len != uncompressed_len as usize {
322 Err(corrupted("uncompressed length does not match"))
323 } else {
324 Ok(buf)
325 }
326 }
327 }
328
329 #[cfg(test)]
330 mod tests {
331 use super::*;
332
333 const LARGE_TEXT: &[u8] = b"
334 Patali Dirapata, Cromda Cromda Ripalo, Pata Pata, Ko Ko Ko
335 Bokoro Dipoulito, Rondi Rondi Pepino, Pata Pata, Ko Ko Ko
336 Emana Karassoli, Loucra Loucra Nonponto, Pata Pata, Ko Ko Ko.";
337
338 #[test]
339 fn test_zlib_compressor() {
340 // Can return `Ok(None)`
341 let mut compressor = ZlibCompressor::new(1);
342 assert_eq!(compressor.compress(b"too small").unwrap(), None);
343
344 // Compression returns bytes
345 let compressed_with_1 =
346 compressor.compress(LARGE_TEXT).unwrap().unwrap();
347 assert!(compressed_with_1.len() < LARGE_TEXT.len());
348 // Round trip works
349 assert_eq!(
350 compressor.decompress(&compressed_with_1).unwrap(),
351 LARGE_TEXT
352 );
353
354 // Compression levels mean something
355 let mut compressor = ZlibCompressor::new(9);
356 // Compression returns bytes
357 let compressed = compressor.compress(LARGE_TEXT).unwrap().unwrap();
358 assert!(compressed.len() < compressed_with_1.len());
359 }
360
361 #[test]
362 fn test_zstd_compressor() {
363 // Can return `Ok(None)`
364 let mut compressor = ZstdCompressor::new(1, 0);
365 assert_eq!(compressor.compress(b"too small").unwrap(), None);
366
367 // Compression returns bytes
368 let compressed_with_1 =
369 compressor.compress(LARGE_TEXT).unwrap().unwrap();
370 assert!(compressed_with_1.len() < LARGE_TEXT.len());
371 // Round trip works
372 assert_eq!(
373 compressor.decompress(&compressed_with_1).unwrap(),
374 LARGE_TEXT
375 );
376
377 // Compression levels mean something
378 let mut compressor = ZstdCompressor::new(22, 0);
379 // Compression returns bytes
380 let compressed = compressor.compress(LARGE_TEXT).unwrap().unwrap();
381 assert!(compressed.len() < compressed_with_1.len());
382 }
383 }
@@ -9,8 +9,10 pub mod node;
9 pub mod nodemap;
9 pub mod nodemap;
10 mod nodemap_docket;
10 mod nodemap_docket;
11 pub mod path_encode;
11 pub mod path_encode;
12 use compression::{uncompressed_zstd_data, CompressionConfig};
12 pub use node::{FromHexError, Node, NodePrefix};
13 pub use node::{FromHexError, Node, NodePrefix};
13 pub mod changelog;
14 pub mod changelog;
15 pub mod compression;
14 pub mod filelog;
16 pub mod filelog;
15 pub mod index;
17 pub mod index;
16 pub mod manifest;
18 pub mod manifest;
@@ -24,8 +26,6 use std::path::Path;
24
26
25 use flate2::read::ZlibDecoder;
27 use flate2::read::ZlibDecoder;
26 use sha1::{Digest, Sha1};
28 use sha1::{Digest, Sha1};
27 use std::cell::RefCell;
28 use zstd;
29
29
30 use self::node::{NODE_BYTES_LENGTH, NULL_NODE};
30 use self::node::{NODE_BYTES_LENGTH, NULL_NODE};
31 use self::nodemap_docket::NodeMapDocket;
31 use self::nodemap_docket::NodeMapDocket;
@@ -258,75 +258,6 impl TryFrom<usize> for RevlogType {
258 }
258 }
259 }
259 }
260
260
261 #[derive(Debug, Copy, Clone, PartialEq, Eq)]
262 pub enum CompressionEngine {
263 Zlib {
264 /// Between 0 and 9 included
265 level: u32,
266 },
267 Zstd {
268 /// Between 0 and 22 included
269 level: u32,
270 /// Never used in practice for now
271 threads: u32,
272 },
273 /// No compression is performed
274 None,
275 }
276 impl CompressionEngine {
277 pub fn set_level(&mut self, new_level: usize) -> Result<(), HgError> {
278 match self {
279 CompressionEngine::Zlib { level } => {
280 if new_level > 9 {
281 return Err(HgError::abort(
282 format!(
283 "invalid compression zlib compression level {}",
284 new_level
285 ),
286 exit_codes::ABORT,
287 None,
288 ));
289 }
290 *level = new_level as u32;
291 }
292 CompressionEngine::Zstd { level, .. } => {
293 if new_level > 22 {
294 return Err(HgError::abort(
295 format!(
296 "invalid compression zstd compression level {}",
297 new_level
298 ),
299 exit_codes::ABORT,
300 None,
301 ));
302 }
303 *level = new_level as u32;
304 }
305 CompressionEngine::None => {}
306 }
307 Ok(())
308 }
309
310 pub fn zstd(
311 zstd_level: Option<u32>,
312 ) -> Result<CompressionEngine, HgError> {
313 let mut engine = CompressionEngine::Zstd {
314 level: 3,
315 threads: 0,
316 };
317 if let Some(level) = zstd_level {
318 engine.set_level(level as usize)?;
319 }
320 Ok(engine)
321 }
322 }
323
324 impl Default for CompressionEngine {
325 fn default() -> Self {
326 Self::Zlib { level: 6 }
327 }
328 }
329
330 #[derive(Debug, Clone, Copy, PartialEq)]
261 #[derive(Debug, Clone, Copy, PartialEq)]
331 /// Holds configuration values about how the revlog data is read
262 /// Holds configuration values about how the revlog data is read
332 pub struct RevlogDataConfig {
263 pub struct RevlogDataConfig {
@@ -546,7 +477,7 impl Default for RevlogDeltaConfig {
546 /// Holds configuration values about the available revlog features
477 /// Holds configuration values about the available revlog features
547 pub struct RevlogFeatureConfig {
478 pub struct RevlogFeatureConfig {
548 /// The compression engine and its options
479 /// The compression engine and its options
549 pub compression_engine: CompressionEngine,
480 pub compression_engine: CompressionConfig,
550 /// Can we use censor on this revlog
481 /// Can we use censor on this revlog
551 pub censorable: bool,
482 pub censorable: bool,
552 /// Does this revlog use the "side data" feature
483 /// Does this revlog use the "side data" feature
@@ -568,46 +499,11 impl RevlogFeatureConfig {
568 config: &Config,
499 config: &Config,
569 requirements: &HashSet<String>,
500 requirements: &HashSet<String>,
570 ) -> Result<Self, HgError> {
501 ) -> Result<Self, HgError> {
571 let mut feature_config = Self::default();
502 Ok(Self {
572
503 compression_engine: CompressionConfig::new(config, requirements)?,
573 let zlib_level = config.get_u32(b"storage", b"revlog.zlib.level")?;
504 enable_ellipsis: requirements.contains(NARROW_REQUIREMENT),
574 let zstd_level = config.get_u32(b"storage", b"revlog.zstd.level")?;
505 ..Default::default()
575
506 })
576 feature_config.compression_engine = CompressionEngine::default();
577
578 for requirement in requirements {
579 if requirement.starts_with("revlog-compression-")
580 || requirement.starts_with("exp-compression-")
581 {
582 let split = &mut requirement.splitn(3, '-');
583 split.next();
584 split.next();
585 feature_config.compression_engine = match split.next().unwrap()
586 {
587 "zstd" => CompressionEngine::zstd(zstd_level)?,
588 e => {
589 return Err(HgError::UnsupportedFeature(format!(
590 "Unsupported compression engine '{e}'"
591 )))
592 }
593 };
594 }
595 }
596 if let Some(level) = zlib_level {
597 if matches!(
598 feature_config.compression_engine,
599 CompressionEngine::Zlib { .. }
600 ) {
601 feature_config
602 .compression_engine
603 .set_level(level as usize)?;
604 }
605 }
606
607 feature_config.enable_ellipsis =
608 requirements.contains(NARROW_REQUIREMENT);
609
610 Ok(feature_config)
611 }
507 }
612 }
508 }
613
509
@@ -1058,21 +954,6 pub struct RevlogEntry<'revlog> {
1058 hash: Node,
954 hash: Node,
1059 }
955 }
1060
956
1061 thread_local! {
1062 // seems fine to [unwrap] here: this can only fail due to memory allocation
1063 // failing, and it's normal for that to cause panic.
1064 static ZSTD_DECODER : RefCell<zstd::bulk::Decompressor<'static>> =
1065 RefCell::new(zstd::bulk::Decompressor::new().ok().unwrap());
1066 }
1067
1068 fn zstd_decompress_to_buffer(
1069 bytes: &[u8],
1070 buf: &mut Vec<u8>,
1071 ) -> Result<usize, std::io::Error> {
1072 ZSTD_DECODER
1073 .with(|decoder| decoder.borrow_mut().decompress_to_buffer(bytes, buf))
1074 }
1075
1076 impl<'revlog> RevlogEntry<'revlog> {
957 impl<'revlog> RevlogEntry<'revlog> {
1077 pub fn revision(&self) -> Revision {
958 pub fn revision(&self) -> Revision {
1078 self.rev
959 self.rev
@@ -1218,7 +1099,11 impl<'revlog> RevlogEntry<'revlog> {
1218 // zlib (RFC 1950) data.
1099 // zlib (RFC 1950) data.
1219 b'x' => Ok(Cow::Owned(self.uncompressed_zlib_data()?)),
1100 b'x' => Ok(Cow::Owned(self.uncompressed_zlib_data()?)),
1220 // zstd data.
1101 // zstd data.
1221 b'\x28' => Ok(Cow::Owned(self.uncompressed_zstd_data()?)),
1102 b'\x28' => Ok(Cow::Owned(uncompressed_zstd_data(
1103 self.bytes,
1104 self.is_delta(),
1105 self.uncompressed_len.max(0),
1106 )?)),
1222 // A proper new format should have had a repo/store requirement.
1107 // A proper new format should have had a repo/store requirement.
1223 format_type => Err(corrupted(format!(
1108 format_type => Err(corrupted(format!(
1224 "unknown compression header '{}'",
1109 "unknown compression header '{}'",
@@ -1245,38 +1130,6 impl<'revlog> RevlogEntry<'revlog> {
1245 }
1130 }
1246 }
1131 }
1247
1132
1248 fn uncompressed_zstd_data(&self) -> Result<Vec<u8>, HgError> {
1249 let cap = self.uncompressed_len.max(0) as usize;
1250 if self.is_delta() {
1251 // [cap] is usually an over-estimate of the space needed because
1252 // it's the length of delta-decoded data, but we're interested
1253 // in the size of the delta.
1254 // This means we have to [shrink_to_fit] to avoid holding on
1255 // to a large chunk of memory, but it also means we must have a
1256 // fallback branch, for the case when the delta is longer than
1257 // the original data (surprisingly, this does happen in practice)
1258 let mut buf = Vec::with_capacity(cap);
1259 match zstd_decompress_to_buffer(self.bytes, &mut buf) {
1260 Ok(_) => buf.shrink_to_fit(),
1261 Err(_) => {
1262 buf.clear();
1263 zstd::stream::copy_decode(self.bytes, &mut buf)
1264 .map_err(|e| corrupted(e.to_string()))?;
1265 }
1266 };
1267 Ok(buf)
1268 } else {
1269 let mut buf = Vec::with_capacity(cap);
1270 let len = zstd_decompress_to_buffer(self.bytes, &mut buf)
1271 .map_err(|e| corrupted(e.to_string()))?;
1272 if len != self.uncompressed_len as usize {
1273 Err(corrupted("uncompressed length does not match"))
1274 } else {
1275 Ok(buf)
1276 }
1277 }
1278 }
1279
1280 /// Tell if the entry is a snapshot or a delta
1133 /// Tell if the entry is a snapshot or a delta
1281 /// (influences on decompression).
1134 /// (influences on decompression).
1282 fn is_delta(&self) -> bool {
1135 fn is_delta(&self) -> bool {
General Comments 0
You need to be logged in to leave comments. Login now