rust-compression: move the `Send` bound to the `Compressor` trait...
Raphaël Gomès
r53202:f69a3f55 default
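The change itself is small: the `Send` bound moves off the `Box<dyn Compressor + Send>` trait object in `InnerRevlog` and onto the `Compressor` trait as a supertrait. A minimal, self-contained sketch of the pattern (type names here are illustrative, not part of the patch):

use std::sync::Mutex;

// Before: the trait says nothing about threads, so every owner of a boxed
// compressor has to repeat the bound on the trait object.
trait CompressorBefore {}

struct HolderBefore {
    compressor: Mutex<Box<dyn CompressorBefore + Send>>,
}

// After: `Send` is a supertrait, so `Box<dyn CompressorAfter>` is `Send` on
// its own and the field type drops the repeated bound. In exchange, every
// implementor of the trait must be `Send`.
trait CompressorAfter: Send {}

struct HolderAfter {
    compressor: Mutex<Box<dyn CompressorAfter>>,
}

// Nothing to run: the point is that both layouts type-check and stay `Send`.
fn assert_send<T: Send>() {}

fn main() {
    assert_send::<HolderBefore>();
    assert_send::<HolderAfter>();
}

In the patch below, the concrete compressors (`ZlibCompressor`, `ZstdCompressor`, `NoneCompressor`) already satisfy `Send`, so only the trait declaration and the `InnerRevlog::compressor` field change.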
@@ -1,383 +1,383 @@
//! Helpers around revlog compression

use std::cell::RefCell;
use std::collections::HashSet;
use std::io::Read;

use flate2::bufread::ZlibEncoder;
use flate2::read::ZlibDecoder;

use crate::config::Config;
use crate::errors::HgError;
use crate::exit_codes;

use super::corrupted;
use super::RevlogError;

/// Header byte used to identify ZSTD-compressed data
pub const ZSTD_BYTE: u8 = b'\x28';
/// Header byte used to identify Zlib-compressed data
pub const ZLIB_BYTE: u8 = b'x';

const ZSTD_DEFAULT_LEVEL: u8 = 3;
const ZLIB_DEFAULT_LEVEL: u8 = 6;
/// The length of data below which we don't even try to compress it when using
/// Zstandard.
const MINIMUM_LENGTH_ZSTD: usize = 50;
/// The length of data below which we don't even try to compress it when using
/// Zlib.
const MINIMUM_LENGTH_ZLIB: usize = 44;

/// Defines the available compression engines and their options.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum CompressionConfig {
    Zlib {
        /// Between 0 and 9 included
        level: u8,
    },
    Zstd {
        /// Between 0 and 22 included
        level: u8,
        /// Never used in practice for now
        threads: u8,
    },
    /// No compression is performed
    None,
}

impl CompressionConfig {
    pub fn new(
        config: &Config,
        requirements: &HashSet<String>,
    ) -> Result<Self, HgError> {
        let mut new = Self::default();

        let zlib_level = config.get_u32(b"storage", b"revlog.zlib.level")?;
        let zstd_level = config.get_u32(b"storage", b"revlog.zstd.level")?;

        for requirement in requirements {
            if requirement.starts_with("revlog-compression-")
                || requirement.starts_with("exp-compression-")
            {
                let split = &mut requirement.splitn(3, '-');
                split.next();
                split.next();
                new = match split.next().unwrap() {
                    "zstd" => CompressionConfig::zstd(zstd_level)?,
                    e => {
                        return Err(HgError::UnsupportedFeature(format!(
                            "Unsupported compression engine '{e}'"
                        )))
                    }
                };
            }
        }
        if let Some(level) = zlib_level {
            if matches!(new, CompressionConfig::Zlib { .. }) {
                new.set_level(level as usize)?;
            }
        }
        Ok(new)
    }

    /// Sets the level of the current compression engine
    pub fn set_level(&mut self, new_level: usize) -> Result<(), HgError> {
        match self {
            CompressionConfig::Zlib { level } => {
                if new_level > 9 {
                    return Err(HgError::abort(
                        format!(
                            "invalid compression zlib compression level {}, \
                             expected between 0 and 9 included",
                            new_level
                        ),
                        exit_codes::ABORT,
                        None,
                    ));
                }
                *level = new_level as u8;
            }
            CompressionConfig::Zstd { level, .. } => {
                if new_level > 22 {
                    return Err(HgError::abort(
                        format!(
                            "invalid compression zstd compression level {}, \
                             expected between 0 and 22 included",
                            new_level
                        ),
                        exit_codes::ABORT,
                        None,
                    ));
                }
                *level = new_level as u8;
            }
            CompressionConfig::None => {}
        }
        Ok(())
    }

    /// Return a ZSTD compression config
    pub fn zstd(
        zstd_level: Option<u32>,
    ) -> Result<CompressionConfig, HgError> {
        let mut engine = CompressionConfig::Zstd {
            level: ZSTD_DEFAULT_LEVEL,
            threads: 0,
        };
        if let Some(level) = zstd_level {
            engine.set_level(level as usize)?;
        }
        Ok(engine)
    }
}

impl Default for CompressionConfig {
    fn default() -> Self {
        Self::Zlib {
            level: ZLIB_DEFAULT_LEVEL,
        }
    }
}

/// A high-level trait to define compressors that should be able to compress
/// and decompress arbitrary bytes.
-pub trait Compressor {
+pub trait Compressor: Send {
    /// Returns a new [`Vec`] with the compressed data.
    /// Should return `Ok(None)` if compression does not apply (e.g. too small)
    fn compress(
        &mut self,
        data: &[u8],
    ) -> Result<Option<Vec<u8>>, RevlogError>;
    /// Returns a new [`Vec`] with the decompressed data.
    fn decompress(&self, data: &[u8]) -> Result<Vec<u8>, RevlogError>;
}

/// A compressor that does nothing (useful in tests)
pub struct NoneCompressor;

impl Compressor for NoneCompressor {
    fn compress(
        &mut self,
        _data: &[u8],
    ) -> Result<Option<Vec<u8>>, RevlogError> {
        Ok(None)
    }

    fn decompress(&self, data: &[u8]) -> Result<Vec<u8>, RevlogError> {
        Ok(data.to_owned())
    }
}

/// A compressor for Zstandard
pub struct ZstdCompressor {
    /// Level of compression to use
    level: u8,
    /// How many threads are used (not implemented yet)
    threads: u8,
    /// The underlying zstd compressor
    compressor: zstd::bulk::Compressor<'static>,
}

impl ZstdCompressor {
    pub fn new(level: u8, threads: u8) -> Self {
        Self {
            level,
            threads,
            compressor: zstd::bulk::Compressor::new(level.into())
                .expect("invalid zstd arguments"),
        }
    }
}

impl Compressor for ZstdCompressor {
    fn compress(
        &mut self,
        data: &[u8],
    ) -> Result<Option<Vec<u8>>, RevlogError> {
        if self.threads != 0 {
            // TODO use a zstd builder + zstd cargo feature to support this
            unimplemented!("zstd parallel compression is not implemented");
        }
        if data.len() < MINIMUM_LENGTH_ZSTD {
            return Ok(None);
        }
        let level = self.level as i32;
        if data.len() <= 1000000 {
            let compressed = self.compressor.compress(data).map_err(|e| {
                corrupted(format!("revlog compress error: {}", e))
            })?;
            Ok(if compressed.len() < data.len() {
                Some(compressed)
            } else {
                None
            })
        } else {
            Ok(Some(zstd::stream::encode_all(data, level).map_err(
                |e| corrupted(format!("revlog compress error: {}", e)),
            )?))
        }
    }

    fn decompress(&self, data: &[u8]) -> Result<Vec<u8>, RevlogError> {
        zstd::stream::decode_all(data).map_err(|e| {
            corrupted(format!("revlog decompress error: {}", e)).into()
        })
    }
}

/// A compressor for Zlib
pub struct ZlibCompressor {
    /// Level of compression to use
    level: flate2::Compression,
}

impl ZlibCompressor {
    pub fn new(level: u8) -> Self {
        Self {
            level: flate2::Compression::new(level.into()),
        }
    }
}

impl Compressor for ZlibCompressor {
    fn compress(
        &mut self,
        data: &[u8],
    ) -> Result<Option<Vec<u8>>, RevlogError> {
        assert!(!data.is_empty());
        if data.len() < MINIMUM_LENGTH_ZLIB {
            return Ok(None);
        }
        let mut buf = Vec::with_capacity(data.len());
        ZlibEncoder::new(data, self.level)
            .read_to_end(&mut buf)
            .map_err(|e| corrupted(format!("revlog compress error: {}", e)))?;

        Ok(if buf.len() < data.len() {
            buf.shrink_to_fit();
            Some(buf)
        } else {
            None
        })
    }

    fn decompress(&self, data: &[u8]) -> Result<Vec<u8>, RevlogError> {
        let mut decoder = ZlibDecoder::new(data);
        // TODO reuse the allocation somehow?
        let mut buf = vec![];
        decoder.read_to_end(&mut buf).map_err(|e| {
            corrupted(format!("revlog decompress error: {}", e))
        })?;
        Ok(buf)
    }
}

thread_local! {
    // seems fine to [unwrap] here: this can only fail due to memory allocation
    // failing, and it's normal for that to cause panic.
    static ZSTD_DECODER : RefCell<zstd::bulk::Decompressor<'static>> =
        RefCell::new(zstd::bulk::Decompressor::new().ok().unwrap());
}

/// Util to wrap the reuse of a zstd decoder while controlling its buffer size.
fn zstd_decompress_to_buffer(
    bytes: &[u8],
    buf: &mut Vec<u8>,
) -> Result<usize, std::io::Error> {
    ZSTD_DECODER
        .with(|decoder| decoder.borrow_mut().decompress_to_buffer(bytes, buf))
}

/// Specialized revlog decompression to use less memory for deltas while
/// keeping performance acceptable.
pub(super) fn uncompressed_zstd_data(
    bytes: &[u8],
    is_delta: bool,
    uncompressed_len: i32,
) -> Result<Vec<u8>, HgError> {
    let cap = uncompressed_len.max(0) as usize;
    if is_delta {
        // [cap] is usually an over-estimate of the space needed because
        // it's the length of delta-decoded data, but we're interested
        // in the size of the delta.
        // This means we have to [shrink_to_fit] to avoid holding on
        // to a large chunk of memory, but it also means we must have a
        // fallback branch, for the case when the delta is longer than
        // the original data (surprisingly, this does happen in practice)
        let mut buf = Vec::with_capacity(cap);
        match zstd_decompress_to_buffer(bytes, &mut buf) {
            Ok(_) => buf.shrink_to_fit(),
            Err(_) => {
                buf.clear();
                zstd::stream::copy_decode(bytes, &mut buf)
                    .map_err(|e| corrupted(e.to_string()))?;
            }
        };
        Ok(buf)
    } else {
        let mut buf = Vec::with_capacity(cap);
        let len = zstd_decompress_to_buffer(bytes, &mut buf)
            .map_err(|e| corrupted(e.to_string()))?;
        if len != uncompressed_len as usize {
            Err(corrupted("uncompressed length does not match"))
        } else {
            Ok(buf)
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    const LARGE_TEXT: &[u8] = b"
    Patali Dirapata, Cromda Cromda Ripalo, Pata Pata, Ko Ko Ko
    Bokoro Dipoulito, Rondi Rondi Pepino, Pata Pata, Ko Ko Ko
    Emana Karassoli, Loucra Loucra Nonponto, Pata Pata, Ko Ko Ko.";

    #[test]
    fn test_zlib_compressor() {
        // Can return `Ok(None)`
        let mut compressor = ZlibCompressor::new(1);
        assert_eq!(compressor.compress(b"too small").unwrap(), None);

        // Compression returns bytes
        let compressed_with_1 =
            compressor.compress(LARGE_TEXT).unwrap().unwrap();
        assert!(compressed_with_1.len() < LARGE_TEXT.len());
        // Round trip works
        assert_eq!(
            compressor.decompress(&compressed_with_1).unwrap(),
            LARGE_TEXT
        );

        // Compression levels mean something
        let mut compressor = ZlibCompressor::new(9);
        // Compression returns bytes
        let compressed = compressor.compress(LARGE_TEXT).unwrap().unwrap();
        assert!(compressed.len() < compressed_with_1.len());
    }

    #[test]
    fn test_zstd_compressor() {
        // Can return `Ok(None)`
        let mut compressor = ZstdCompressor::new(1, 0);
        assert_eq!(compressor.compress(b"too small").unwrap(), None);

        // Compression returns bytes
        let compressed_with_1 =
            compressor.compress(LARGE_TEXT).unwrap().unwrap();
        assert!(compressed_with_1.len() < LARGE_TEXT.len());
        // Round trip works
        assert_eq!(
            compressor.decompress(&compressed_with_1).unwrap(),
            LARGE_TEXT
        );

        // Compression levels mean something
        let mut compressor = ZstdCompressor::new(22, 0);
        // Compression returns bytes
        let compressed = compressor.compress(LARGE_TEXT).unwrap().unwrap();
        assert!(compressed.len() < compressed_with_1.len());
    }
}
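The tests above already exercise the `Compressor` API end to end; the following condenses the same round trip into a standalone sketch. The `use` path is an assumption about the crate layout and may need adjusting:

use hg::revlog::compression::{Compressor, ZlibCompressor};

// Compress only when worthwhile, then decompress and compare. `Ok(None)`
// from `compress` means "store the chunk uncompressed" (the input is too
// small or did not shrink), so it also counts as a successful round trip.
fn roundtrip(data: &[u8]) -> bool {
    let mut compressor = ZlibCompressor::new(6);
    match compressor.compress(data) {
        Ok(None) => true,
        Ok(Some(compressed)) => compressor
            .decompress(&compressed)
            .map(|out| out == data)
            .unwrap_or(false),
        Err(_) => false,
    }
}

fn main() {
    assert!(roundtrip(
        b"some revlog chunk, long and repetitive enough to be worth compressing"
    ));
}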
@@ -1,1352 +1,1352
1 //! A layer of lower-level revlog functionality to encapsulate most of the
1 //! A layer of lower-level revlog functionality to encapsulate most of the
2 //! IO work and expensive operations.
2 //! IO work and expensive operations.
3 use std::{
3 use std::{
4 borrow::Cow,
4 borrow::Cow,
5 cell::RefCell,
5 cell::RefCell,
6 io::{ErrorKind, Seek, SeekFrom, Write},
6 io::{ErrorKind, Seek, SeekFrom, Write},
7 ops::Deref,
7 ops::Deref,
8 path::PathBuf,
8 path::PathBuf,
9 sync::{Arc, Mutex},
9 sync::{Arc, Mutex},
10 };
10 };
11
11
12 use schnellru::{ByMemoryUsage, LruMap};
12 use schnellru::{ByMemoryUsage, LruMap};
13 use sha1::{Digest, Sha1};
13 use sha1::{Digest, Sha1};
14
14
15 use crate::{
15 use crate::{
16 errors::{HgError, IoResultExt},
16 errors::{HgError, IoResultExt},
17 exit_codes,
17 exit_codes,
18 transaction::Transaction,
18 transaction::Transaction,
19 vfs::Vfs,
19 vfs::Vfs,
20 };
20 };
21
21
22 use super::{
22 use super::{
23 compression::{
23 compression::{
24 uncompressed_zstd_data, CompressionConfig, Compressor, NoneCompressor,
24 uncompressed_zstd_data, CompressionConfig, Compressor, NoneCompressor,
25 ZlibCompressor, ZstdCompressor, ZLIB_BYTE, ZSTD_BYTE,
25 ZlibCompressor, ZstdCompressor, ZLIB_BYTE, ZSTD_BYTE,
26 },
26 },
27 file_io::{DelayedBuffer, FileHandle, RandomAccessFile, WriteHandles},
27 file_io::{DelayedBuffer, FileHandle, RandomAccessFile, WriteHandles},
28 index::{Index, IndexHeader, INDEX_ENTRY_SIZE},
28 index::{Index, IndexHeader, INDEX_ENTRY_SIZE},
29 node::{NODE_BYTES_LENGTH, NULL_NODE},
29 node::{NODE_BYTES_LENGTH, NULL_NODE},
30 options::{RevlogDataConfig, RevlogDeltaConfig, RevlogFeatureConfig},
30 options::{RevlogDataConfig, RevlogDeltaConfig, RevlogFeatureConfig},
31 BaseRevision, Node, Revision, RevlogEntry, RevlogError, RevlogIndex,
31 BaseRevision, Node, Revision, RevlogEntry, RevlogError, RevlogIndex,
32 UncheckedRevision, NULL_REVISION, NULL_REVLOG_ENTRY_FLAGS,
32 UncheckedRevision, NULL_REVISION, NULL_REVLOG_ENTRY_FLAGS,
33 };
33 };
34
34
35 /// Matches the `_InnerRevlog` class in the Python code, as an arbitrary
35 /// Matches the `_InnerRevlog` class in the Python code, as an arbitrary
36 /// boundary to incrementally rewrite higher-level revlog functionality in
36 /// boundary to incrementally rewrite higher-level revlog functionality in
37 /// Rust.
37 /// Rust.
38 pub struct InnerRevlog {
38 pub struct InnerRevlog {
39 /// When index and data are not interleaved: bytes of the revlog index.
39 /// When index and data are not interleaved: bytes of the revlog index.
40 /// When index and data are interleaved (inline revlog): bytes of the
40 /// When index and data are interleaved (inline revlog): bytes of the
41 /// revlog index and data.
41 /// revlog index and data.
42 pub index: Index,
42 pub index: Index,
43 /// The store vfs that is used to interact with the filesystem
43 /// The store vfs that is used to interact with the filesystem
44 vfs: Box<dyn Vfs>,
44 vfs: Box<dyn Vfs>,
45 /// The index file path, relative to the vfs root
45 /// The index file path, relative to the vfs root
46 pub index_file: PathBuf,
46 pub index_file: PathBuf,
47 /// The data file path, relative to the vfs root (same as `index_file`
47 /// The data file path, relative to the vfs root (same as `index_file`
48 /// if inline)
48 /// if inline)
49 data_file: PathBuf,
49 data_file: PathBuf,
50 /// Data config that applies to this revlog
50 /// Data config that applies to this revlog
51 data_config: RevlogDataConfig,
51 data_config: RevlogDataConfig,
52 /// Delta config that applies to this revlog
52 /// Delta config that applies to this revlog
53 delta_config: RevlogDeltaConfig,
53 delta_config: RevlogDeltaConfig,
54 /// Feature config that applies to this revlog
54 /// Feature config that applies to this revlog
55 #[allow(unused)]
55 #[allow(unused)]
56 feature_config: RevlogFeatureConfig,
56 feature_config: RevlogFeatureConfig,
57 /// A view into this revlog's data file
57 /// A view into this revlog's data file
58 segment_file: RandomAccessFile,
58 segment_file: RandomAccessFile,
59 /// A cache of uncompressed chunks that have previously been restored.
59 /// A cache of uncompressed chunks that have previously been restored.
60 /// Its eviction policy is defined in [`Self::new`].
60 /// Its eviction policy is defined in [`Self::new`].
61 uncompressed_chunk_cache: Option<UncompressedChunkCache>,
61 uncompressed_chunk_cache: Option<UncompressedChunkCache>,
62 /// Used to keep track of the actual target during diverted writes
62 /// Used to keep track of the actual target during diverted writes
63 /// for the changelog
63 /// for the changelog
64 original_index_file: Option<PathBuf>,
64 original_index_file: Option<PathBuf>,
65 /// Write handles to the index and data files
65 /// Write handles to the index and data files
66 /// XXX why duplicate from `index` and `segment_file`?
66 /// XXX why duplicate from `index` and `segment_file`?
67 writing_handles: Option<WriteHandles>,
67 writing_handles: Option<WriteHandles>,
68 /// See [`DelayedBuffer`].
68 /// See [`DelayedBuffer`].
69 delayed_buffer: Option<Arc<Mutex<DelayedBuffer>>>,
69 delayed_buffer: Option<Arc<Mutex<DelayedBuffer>>>,
70 /// Whether this revlog is inline. XXX why duplicate from `index`?
70 /// Whether this revlog is inline. XXX why duplicate from `index`?
71 pub inline: bool,
71 pub inline: bool,
72 /// A cache of the last revision, which is usually accessed multiple
72 /// A cache of the last revision, which is usually accessed multiple
73 /// times.
73 /// times.
74 pub last_revision_cache: Mutex<Option<SingleRevisionCache>>,
74 pub last_revision_cache: Mutex<Option<SingleRevisionCache>>,
75 /// The [`Compressor`] that this revlog uses by default to compress data.
75 /// The [`Compressor`] that this revlog uses by default to compress data.
76 /// This does not mean that this revlog uses this compressor for reading
76 /// This does not mean that this revlog uses this compressor for reading
77 /// data, as different revisions may have different compression modes.
77 /// data, as different revisions may have different compression modes.
78 compressor: Mutex<Box<dyn Compressor + Send>>,
78 compressor: Mutex<Box<dyn Compressor>>,
79 }
79 }
80
80
81 impl InnerRevlog {
81 impl InnerRevlog {
82 pub fn new(
82 pub fn new(
83 vfs: Box<dyn Vfs>,
83 vfs: Box<dyn Vfs>,
84 index: Index,
84 index: Index,
85 index_file: PathBuf,
85 index_file: PathBuf,
86 data_file: PathBuf,
86 data_file: PathBuf,
87 data_config: RevlogDataConfig,
87 data_config: RevlogDataConfig,
88 delta_config: RevlogDeltaConfig,
88 delta_config: RevlogDeltaConfig,
89 feature_config: RevlogFeatureConfig,
89 feature_config: RevlogFeatureConfig,
90 ) -> Self {
90 ) -> Self {
91 assert!(index_file.is_relative());
91 assert!(index_file.is_relative());
92 assert!(data_file.is_relative());
92 assert!(data_file.is_relative());
93 let segment_file = RandomAccessFile::new(
93 let segment_file = RandomAccessFile::new(
94 dyn_clone::clone_box(&*vfs),
94 dyn_clone::clone_box(&*vfs),
95 if index.is_inline() {
95 if index.is_inline() {
96 index_file.to_owned()
96 index_file.to_owned()
97 } else {
97 } else {
98 data_file.to_owned()
98 data_file.to_owned()
99 },
99 },
100 );
100 );
101
101
102 let uncompressed_chunk_cache =
102 let uncompressed_chunk_cache =
103 data_config.uncompressed_cache_factor.map(
103 data_config.uncompressed_cache_factor.map(
104 // Arbitrary initial value
104 // Arbitrary initial value
105 // TODO check if using a hasher specific to integers is useful
105 // TODO check if using a hasher specific to integers is useful
106 |_factor| RefCell::new(LruMap::with_memory_budget(65536)),
106 |_factor| RefCell::new(LruMap::with_memory_budget(65536)),
107 );
107 );
108
108
109 let inline = index.is_inline();
109 let inline = index.is_inline();
110 Self {
110 Self {
111 index,
111 index,
112 vfs,
112 vfs,
113 index_file,
113 index_file,
114 data_file,
114 data_file,
115 data_config,
115 data_config,
116 delta_config,
116 delta_config,
117 feature_config,
117 feature_config,
118 segment_file,
118 segment_file,
119 uncompressed_chunk_cache,
119 uncompressed_chunk_cache,
120 original_index_file: None,
120 original_index_file: None,
121 writing_handles: None,
121 writing_handles: None,
122 delayed_buffer: None,
122 delayed_buffer: None,
123 inline,
123 inline,
124 last_revision_cache: Mutex::new(None),
124 last_revision_cache: Mutex::new(None),
125 compressor: Mutex::new(match feature_config.compression_engine {
125 compressor: Mutex::new(match feature_config.compression_engine {
126 CompressionConfig::Zlib { level } => {
126 CompressionConfig::Zlib { level } => {
127 Box::new(ZlibCompressor::new(level))
127 Box::new(ZlibCompressor::new(level))
128 }
128 }
129 CompressionConfig::Zstd { level, threads } => {
129 CompressionConfig::Zstd { level, threads } => {
130 Box::new(ZstdCompressor::new(level, threads))
130 Box::new(ZstdCompressor::new(level, threads))
131 }
131 }
132 CompressionConfig::None => Box::new(NoneCompressor),
132 CompressionConfig::None => Box::new(NoneCompressor),
133 }),
133 }),
134 }
134 }
135 }
135 }
136
136
137 /// Return number of entries of the revlog index
137 /// Return number of entries of the revlog index
138 pub fn len(&self) -> usize {
138 pub fn len(&self) -> usize {
139 self.index.len()
139 self.index.len()
140 }
140 }
141
141
142 /// Return `true` if this revlog has no entries
142 /// Return `true` if this revlog has no entries
143 pub fn is_empty(&self) -> bool {
143 pub fn is_empty(&self) -> bool {
144 self.len() == 0
144 self.len() == 0
145 }
145 }
146
146
147 /// Return whether this revlog is inline (mixed index and data)
147 /// Return whether this revlog is inline (mixed index and data)
148 pub fn is_inline(&self) -> bool {
148 pub fn is_inline(&self) -> bool {
149 self.inline
149 self.inline
150 }
150 }
151
151
152 /// Clear all caches from this revlog
152 /// Clear all caches from this revlog
153 pub fn clear_cache(&mut self) {
153 pub fn clear_cache(&mut self) {
154 assert!(!self.is_delaying());
154 assert!(!self.is_delaying());
155 if let Some(cache) = self.uncompressed_chunk_cache.as_ref() {
155 if let Some(cache) = self.uncompressed_chunk_cache.as_ref() {
156 // We don't clear the allocation here because it's probably faster.
156 // We don't clear the allocation here because it's probably faster.
157 // We could change our minds later if this ends up being a problem
157 // We could change our minds later if this ends up being a problem
158 // with regards to memory consumption.
158 // with regards to memory consumption.
159 cache.borrow_mut().clear();
159 cache.borrow_mut().clear();
160 }
160 }
161 }
161 }
162
162
163 /// Return an entry for the null revision
163 /// Return an entry for the null revision
164 pub fn make_null_entry(&self) -> RevlogEntry {
164 pub fn make_null_entry(&self) -> RevlogEntry {
165 RevlogEntry {
165 RevlogEntry {
166 revlog: self,
166 revlog: self,
167 rev: NULL_REVISION,
167 rev: NULL_REVISION,
168 uncompressed_len: 0,
168 uncompressed_len: 0,
169 p1: NULL_REVISION,
169 p1: NULL_REVISION,
170 p2: NULL_REVISION,
170 p2: NULL_REVISION,
171 flags: NULL_REVLOG_ENTRY_FLAGS,
171 flags: NULL_REVLOG_ENTRY_FLAGS,
172 hash: NULL_NODE,
172 hash: NULL_NODE,
173 }
173 }
174 }
174 }
175
175
176 /// Return the [`RevlogEntry`] for a [`Revision`] that is known to exist
176 /// Return the [`RevlogEntry`] for a [`Revision`] that is known to exist
177 pub fn get_entry(
177 pub fn get_entry(
178 &self,
178 &self,
179 rev: Revision,
179 rev: Revision,
180 ) -> Result<RevlogEntry, RevlogError> {
180 ) -> Result<RevlogEntry, RevlogError> {
181 if rev == NULL_REVISION {
181 if rev == NULL_REVISION {
182 return Ok(self.make_null_entry());
182 return Ok(self.make_null_entry());
183 }
183 }
184 let index_entry = self
184 let index_entry = self
185 .index
185 .index
186 .get_entry(rev)
186 .get_entry(rev)
187 .ok_or_else(|| RevlogError::InvalidRevision(rev.to_string()))?;
187 .ok_or_else(|| RevlogError::InvalidRevision(rev.to_string()))?;
188 let p1 =
188 let p1 =
189 self.index.check_revision(index_entry.p1()).ok_or_else(|| {
189 self.index.check_revision(index_entry.p1()).ok_or_else(|| {
190 RevlogError::corrupted(format!(
190 RevlogError::corrupted(format!(
191 "p1 for rev {} is invalid",
191 "p1 for rev {} is invalid",
192 rev
192 rev
193 ))
193 ))
194 })?;
194 })?;
195 let p2 =
195 let p2 =
196 self.index.check_revision(index_entry.p2()).ok_or_else(|| {
196 self.index.check_revision(index_entry.p2()).ok_or_else(|| {
197 RevlogError::corrupted(format!(
197 RevlogError::corrupted(format!(
198 "p2 for rev {} is invalid",
198 "p2 for rev {} is invalid",
199 rev
199 rev
200 ))
200 ))
201 })?;
201 })?;
202 let entry = RevlogEntry {
202 let entry = RevlogEntry {
203 revlog: self,
203 revlog: self,
204 rev,
204 rev,
205 uncompressed_len: index_entry.uncompressed_len(),
205 uncompressed_len: index_entry.uncompressed_len(),
206 p1,
206 p1,
207 p2,
207 p2,
208 flags: index_entry.flags(),
208 flags: index_entry.flags(),
209 hash: *index_entry.hash(),
209 hash: *index_entry.hash(),
210 };
210 };
211 Ok(entry)
211 Ok(entry)
212 }
212 }
213
213
214 /// Return the [`RevlogEntry`] for `rev`. If `rev` fails to check, this
214 /// Return the [`RevlogEntry`] for `rev`. If `rev` fails to check, this
215 /// returns a [`RevlogError`].
215 /// returns a [`RevlogError`].
216 pub fn get_entry_for_unchecked_rev(
216 pub fn get_entry_for_unchecked_rev(
217 &self,
217 &self,
218 rev: UncheckedRevision,
218 rev: UncheckedRevision,
219 ) -> Result<RevlogEntry, RevlogError> {
219 ) -> Result<RevlogEntry, RevlogError> {
220 if rev == NULL_REVISION.into() {
220 if rev == NULL_REVISION.into() {
221 return Ok(self.make_null_entry());
221 return Ok(self.make_null_entry());
222 }
222 }
223 let rev = self.index.check_revision(rev).ok_or_else(|| {
223 let rev = self.index.check_revision(rev).ok_or_else(|| {
224 RevlogError::corrupted(format!("rev {} is invalid", rev))
224 RevlogError::corrupted(format!("rev {} is invalid", rev))
225 })?;
225 })?;
226 self.get_entry(rev)
226 self.get_entry(rev)
227 }
227 }
228
228
229 /// Is the revlog currently delaying the visibility of written data?
229 /// Is the revlog currently delaying the visibility of written data?
230 ///
230 ///
231 /// The delaying mechanism can be either in-memory or written on disk in a
231 /// The delaying mechanism can be either in-memory or written on disk in a
232 /// side-file.
232 /// side-file.
233 pub fn is_delaying(&self) -> bool {
233 pub fn is_delaying(&self) -> bool {
234 self.delayed_buffer.is_some() || self.original_index_file.is_some()
234 self.delayed_buffer.is_some() || self.original_index_file.is_some()
235 }
235 }
236
236
237 /// The offset of the data chunk for this revision
237 /// The offset of the data chunk for this revision
238 #[inline(always)]
238 #[inline(always)]
239 pub fn data_start(&self, rev: Revision) -> usize {
239 pub fn data_start(&self, rev: Revision) -> usize {
240 self.index.start(
240 self.index.start(
241 rev,
241 rev,
242 &self
242 &self
243 .index
243 .index
244 .get_entry(rev)
244 .get_entry(rev)
245 .unwrap_or_else(|| self.index.make_null_entry()),
245 .unwrap_or_else(|| self.index.make_null_entry()),
246 )
246 )
247 }
247 }
248
248
249 /// The length of the data chunk for this revision
249 /// The length of the data chunk for this revision
250 #[inline(always)]
250 #[inline(always)]
251 pub fn data_compressed_length(&self, rev: Revision) -> usize {
251 pub fn data_compressed_length(&self, rev: Revision) -> usize {
252 self.index
252 self.index
253 .get_entry(rev)
253 .get_entry(rev)
254 .unwrap_or_else(|| self.index.make_null_entry())
254 .unwrap_or_else(|| self.index.make_null_entry())
255 .compressed_len() as usize
255 .compressed_len() as usize
256 }
256 }
257
257
258 /// The end of the data chunk for this revision
258 /// The end of the data chunk for this revision
259 #[inline(always)]
259 #[inline(always)]
260 pub fn data_end(&self, rev: Revision) -> usize {
260 pub fn data_end(&self, rev: Revision) -> usize {
261 self.data_start(rev) + self.data_compressed_length(rev)
261 self.data_start(rev) + self.data_compressed_length(rev)
262 }
262 }
263
263
264 /// Return the delta parent of the given revision
264 /// Return the delta parent of the given revision
265 pub fn delta_parent(&self, rev: Revision) -> Revision {
265 pub fn delta_parent(&self, rev: Revision) -> Revision {
266 let base = self
266 let base = self
267 .index
267 .index
268 .get_entry(rev)
268 .get_entry(rev)
269 .unwrap()
269 .unwrap()
270 .base_revision_or_base_of_delta_chain();
270 .base_revision_or_base_of_delta_chain();
271 if base.0 == rev.0 {
271 if base.0 == rev.0 {
272 NULL_REVISION
272 NULL_REVISION
273 } else if self.delta_config.general_delta {
273 } else if self.delta_config.general_delta {
274 Revision(base.0)
274 Revision(base.0)
275 } else {
275 } else {
276 Revision(rev.0 - 1)
276 Revision(rev.0 - 1)
277 }
277 }
278 }
278 }
279
279
280 /// Return whether `rev` points to a snapshot revision (i.e. does not have
280 /// Return whether `rev` points to a snapshot revision (i.e. does not have
281 /// a delta base).
281 /// a delta base).
282 pub fn is_snapshot(&self, rev: Revision) -> Result<bool, RevlogError> {
282 pub fn is_snapshot(&self, rev: Revision) -> Result<bool, RevlogError> {
283 if !self.delta_config.sparse_revlog {
283 if !self.delta_config.sparse_revlog {
284 return Ok(self.delta_parent(rev) == NULL_REVISION);
284 return Ok(self.delta_parent(rev) == NULL_REVISION);
285 }
285 }
286 self.index.is_snapshot_unchecked(rev)
286 self.index.is_snapshot_unchecked(rev)
287 }
287 }
288
288
289 /// Return the delta chain for `rev` according to this revlog's config.
289 /// Return the delta chain for `rev` according to this revlog's config.
290 /// See [`Index::delta_chain`] for more information.
290 /// See [`Index::delta_chain`] for more information.
291 pub fn delta_chain(
291 pub fn delta_chain(
292 &self,
292 &self,
293 rev: Revision,
293 rev: Revision,
294 stop_rev: Option<Revision>,
294 stop_rev: Option<Revision>,
295 ) -> Result<(Vec<Revision>, bool), HgError> {
295 ) -> Result<(Vec<Revision>, bool), HgError> {
296 self.index.delta_chain(
296 self.index.delta_chain(
297 rev,
297 rev,
298 stop_rev,
298 stop_rev,
299 self.delta_config.general_delta.into(),
299 self.delta_config.general_delta.into(),
300 )
300 )
301 }
301 }
302
302
303 /// Generate a possibly-compressed representation of data.
303 /// Generate a possibly-compressed representation of data.
304 /// Returns `None` if the data was not compressed.
304 /// Returns `None` if the data was not compressed.
305 pub fn compress<'data>(
305 pub fn compress<'data>(
306 &self,
306 &self,
307 data: &'data [u8],
307 data: &'data [u8],
308 ) -> Result<Option<Cow<'data, [u8]>>, RevlogError> {
308 ) -> Result<Option<Cow<'data, [u8]>>, RevlogError> {
309 if data.is_empty() {
309 if data.is_empty() {
310 return Ok(Some(data.into()));
310 return Ok(Some(data.into()));
311 }
311 }
312 let res = self.compressor.lock().unwrap().compress(data)?;
312 let res = self.compressor.lock().unwrap().compress(data)?;
313 if let Some(compressed) = res {
313 if let Some(compressed) = res {
314 // The revlog compressor added the header in the returned data.
314 // The revlog compressor added the header in the returned data.
315 return Ok(Some(compressed.into()));
315 return Ok(Some(compressed.into()));
316 }
316 }
317
317
318 if data[0] == b'\0' {
318 if data[0] == b'\0' {
319 return Ok(Some(data.into()));
319 return Ok(Some(data.into()));
320 }
320 }
321 Ok(None)
321 Ok(None)
322 }
322 }
323
323
324 /// Decompress a revlog chunk.
324 /// Decompress a revlog chunk.
325 ///
325 ///
326 /// The chunk is expected to begin with a header identifying the
326 /// The chunk is expected to begin with a header identifying the
327 /// format type so it can be routed to an appropriate decompressor.
327 /// format type so it can be routed to an appropriate decompressor.
328 pub fn decompress<'a>(
328 pub fn decompress<'a>(
329 &'a self,
329 &'a self,
330 data: &'a [u8],
330 data: &'a [u8],
331 ) -> Result<Cow<[u8]>, RevlogError> {
331 ) -> Result<Cow<[u8]>, RevlogError> {
332 if data.is_empty() {
332 if data.is_empty() {
333 return Ok(data.into());
333 return Ok(data.into());
334 }
334 }
335
335
336 // Revlogs are read much more frequently than they are written and many
336 // Revlogs are read much more frequently than they are written and many
337 // chunks only take microseconds to decompress, so performance is
337 // chunks only take microseconds to decompress, so performance is
338 // important here.
338 // important here.
339
339
340 let header = data[0];
340 let header = data[0];
341 match header {
341 match header {
342 // Settings don't matter as they only affect compression
342 // Settings don't matter as they only affect compression
343 ZLIB_BYTE => Ok(ZlibCompressor::new(0).decompress(data)?.into()),
343 ZLIB_BYTE => Ok(ZlibCompressor::new(0).decompress(data)?.into()),
344 // Settings don't matter as they only affect compression
344 // Settings don't matter as they only affect compression
345 ZSTD_BYTE => {
345 ZSTD_BYTE => {
346 Ok(ZstdCompressor::new(0, 0).decompress(data)?.into())
346 Ok(ZstdCompressor::new(0, 0).decompress(data)?.into())
347 }
347 }
348 b'\0' => Ok(data.into()),
348 b'\0' => Ok(data.into()),
349 b'u' => Ok((&data[1..]).into()),
349 b'u' => Ok((&data[1..]).into()),
350 other => Err(HgError::UnsupportedFeature(format!(
350 other => Err(HgError::UnsupportedFeature(format!(
351 "unknown compression header '{}'",
351 "unknown compression header '{}'",
352 other
352 other
353 ))
353 ))
354 .into()),
354 .into()),
355 }
355 }
356 }
356 }
357
357
358 /// Obtain a segment of raw data corresponding to a range of revisions.
358 /// Obtain a segment of raw data corresponding to a range of revisions.
359 ///
359 ///
360 /// Requests for data may be satisfied by a cache.
360 /// Requests for data may be satisfied by a cache.
361 ///
361 ///
362 /// Returns a 2-tuple of (offset, data) for the requested range of
362 /// Returns a 2-tuple of (offset, data) for the requested range of
363 /// revisions. Offset is the integer offset from the beginning of the
363 /// revisions. Offset is the integer offset from the beginning of the
364 /// revlog and data is a slice of the raw byte data.
364 /// revlog and data is a slice of the raw byte data.
365 ///
365 ///
366 /// Callers will need to call `self.start(rev)` and `self.length(rev)`
366 /// Callers will need to call `self.start(rev)` and `self.length(rev)`
367 /// to determine where each revision's data begins and ends.
367 /// to determine where each revision's data begins and ends.
368 pub fn get_segment_for_revs(
368 pub fn get_segment_for_revs(
369 &self,
369 &self,
370 start_rev: Revision,
370 start_rev: Revision,
371 end_rev: Revision,
371 end_rev: Revision,
372 ) -> Result<(usize, Vec<u8>), HgError> {
372 ) -> Result<(usize, Vec<u8>), HgError> {
373 let start = if start_rev == NULL_REVISION {
373 let start = if start_rev == NULL_REVISION {
374 0
374 0
375 } else {
375 } else {
376 let start_entry = self
376 let start_entry = self
377 .index
377 .index
378 .get_entry(start_rev)
378 .get_entry(start_rev)
379 .expect("null revision segment");
379 .expect("null revision segment");
380 self.index.start(start_rev, &start_entry)
380 self.index.start(start_rev, &start_entry)
381 };
381 };
382 let end_entry = self
382 let end_entry = self
383 .index
383 .index
384 .get_entry(end_rev)
384 .get_entry(end_rev)
385 .expect("null revision segment");
385 .expect("null revision segment");
386 let end = self.index.start(end_rev, &end_entry)
386 let end = self.index.start(end_rev, &end_entry)
387 + self.data_compressed_length(end_rev);
387 + self.data_compressed_length(end_rev);
388
388
389 let length = end - start;
389 let length = end - start;
390
390
391 // XXX should we use mmap instead of doing this for platforms that
391 // XXX should we use mmap instead of doing this for platforms that
392 // support madvise/populate?
392 // support madvise/populate?
393 Ok((start, self.segment_file.read_chunk(start, length)?))
393 Ok((start, self.segment_file.read_chunk(start, length)?))
394 }
394 }
395
395
396 /// Return the uncompressed raw data for `rev`
396 /// Return the uncompressed raw data for `rev`
397 pub fn chunk_for_rev(&self, rev: Revision) -> Result<Arc<[u8]>, HgError> {
397 pub fn chunk_for_rev(&self, rev: Revision) -> Result<Arc<[u8]>, HgError> {
398 if let Some(cache) = self.uncompressed_chunk_cache.as_ref() {
398 if let Some(cache) = self.uncompressed_chunk_cache.as_ref() {
399 if let Some(chunk) = cache.borrow_mut().get(&rev) {
399 if let Some(chunk) = cache.borrow_mut().get(&rev) {
400 return Ok(chunk.clone());
400 return Ok(chunk.clone());
401 }
401 }
402 }
402 }
403 // TODO revlogv2 should check the compression mode
403 // TODO revlogv2 should check the compression mode
404 let data = self.get_segment_for_revs(rev, rev)?.1;
404 let data = self.get_segment_for_revs(rev, rev)?.1;
405 let uncompressed = self.decompress(&data).map_err(|e| {
405 let uncompressed = self.decompress(&data).map_err(|e| {
406 HgError::abort(
406 HgError::abort(
407 format!("revlog decompression error: {}", e),
407 format!("revlog decompression error: {}", e),
408 exit_codes::ABORT,
408 exit_codes::ABORT,
409 None,
409 None,
410 )
410 )
411 })?;
411 })?;
412 let uncompressed: Arc<[u8]> = Arc::from(uncompressed.into_owned());
412 let uncompressed: Arc<[u8]> = Arc::from(uncompressed.into_owned());
413 if let Some(cache) = self.uncompressed_chunk_cache.as_ref() {
413 if let Some(cache) = self.uncompressed_chunk_cache.as_ref() {
414 cache.borrow_mut().insert(rev, uncompressed.clone());
414 cache.borrow_mut().insert(rev, uncompressed.clone());
415 }
415 }
416 Ok(uncompressed)
416 Ok(uncompressed)
417 }
417 }
418
418
419 /// Execute `func` within a read context for the data file, meaning that
419 /// Execute `func` within a read context for the data file, meaning that
420 /// the read handle will be taken and discarded after the operation.
420 /// the read handle will be taken and discarded after the operation.
421 pub fn with_read<R>(
421 pub fn with_read<R>(
422 &self,
422 &self,
423 func: impl FnOnce() -> Result<R, RevlogError>,
423 func: impl FnOnce() -> Result<R, RevlogError>,
424 ) -> Result<R, RevlogError> {
424 ) -> Result<R, RevlogError> {
425 self.enter_reading_context()?;
425 self.enter_reading_context()?;
426 let res = func();
426 let res = func();
427 self.exit_reading_context();
427 self.exit_reading_context();
428 res.map_err(Into::into)
428 res.map_err(Into::into)
429 }
429 }
430
430
431 /// `pub` only for use in hg-cpython
431 /// `pub` only for use in hg-cpython
432 #[doc(hidden)]
432 #[doc(hidden)]
433 pub fn enter_reading_context(&self) -> Result<(), HgError> {
433 pub fn enter_reading_context(&self) -> Result<(), HgError> {
434 if self.is_empty() {
434 if self.is_empty() {
435 // Nothing to be read
435 // Nothing to be read
436 return Ok(());
436 return Ok(());
437 }
437 }
438 if self.delayed_buffer.is_some() && self.is_inline() {
438 if self.delayed_buffer.is_some() && self.is_inline() {
439 return Err(HgError::abort(
439 return Err(HgError::abort(
440 "revlog with delayed write should not be inline",
440 "revlog with delayed write should not be inline",
441 exit_codes::ABORT,
441 exit_codes::ABORT,
442 None,
442 None,
443 ));
443 ));
444 }
444 }
445 self.segment_file.get_read_handle()?;
445 self.segment_file.get_read_handle()?;
446 Ok(())
446 Ok(())
447 }
447 }
448
448
449 /// `pub` only for use in hg-cpython
449 /// `pub` only for use in hg-cpython
450 #[doc(hidden)]
450 #[doc(hidden)]
451 pub fn exit_reading_context(&self) {
451 pub fn exit_reading_context(&self) {
452 self.segment_file.exit_reading_context()
452 self.segment_file.exit_reading_context()
453 }
453 }
454
454
455 /// Fill the buffer returned by `get_buffer` with the possibly un-validated
455 /// Fill the buffer returned by `get_buffer` with the possibly un-validated
456 /// raw text for a revision. It can be already validated if it comes
456 /// raw text for a revision. It can be already validated if it comes
457 /// from the cache.
457 /// from the cache.
458 pub fn raw_text<G, T>(
458 pub fn raw_text<G, T>(
459 &self,
459 &self,
460 rev: Revision,
460 rev: Revision,
461 get_buffer: G,
461 get_buffer: G,
462 ) -> Result<(), RevlogError>
462 ) -> Result<(), RevlogError>
463 where
463 where
464 G: FnOnce(
464 G: FnOnce(
465 usize,
465 usize,
466 &mut dyn FnMut(
466 &mut dyn FnMut(
467 &mut dyn RevisionBuffer<Target = T>,
467 &mut dyn RevisionBuffer<Target = T>,
468 ) -> Result<(), RevlogError>,
468 ) -> Result<(), RevlogError>,
469 ) -> Result<(), RevlogError>,
469 ) -> Result<(), RevlogError>,
470 {
470 {
471 let entry = &self.get_entry(rev)?;
471 let entry = &self.get_entry(rev)?;
472 let raw_size = entry.uncompressed_len();
472 let raw_size = entry.uncompressed_len();
473 let mut mutex_guard = self
473 let mut mutex_guard = self
474 .last_revision_cache
474 .last_revision_cache
475 .lock()
475 .lock()
476 .expect("lock should not be held");
476 .expect("lock should not be held");
477 let cached_rev = if let Some((_node, rev, data)) = &*mutex_guard {
477 let cached_rev = if let Some((_node, rev, data)) = &*mutex_guard {
478 Some((*rev, data.deref().as_ref()))
478 Some((*rev, data.deref().as_ref()))
479 } else {
479 } else {
480 None
480 None
481 };
481 };
482 if let Some(cache) = &self.uncompressed_chunk_cache {
482 if let Some(cache) = &self.uncompressed_chunk_cache {
483 let cache = &mut cache.borrow_mut();
483 let cache = &mut cache.borrow_mut();
484 if let Some(size) = raw_size {
484 if let Some(size) = raw_size {
485 // Dynamically update the uncompressed_chunk_cache size to the
485 // Dynamically update the uncompressed_chunk_cache size to the
486 // largest revision we've seen in this revlog.
486 // largest revision we've seen in this revlog.
487 // Do it *before* restoration in case the current revision
487 // Do it *before* restoration in case the current revision
488 // is the largest.
488 // is the largest.
489 let factor = self
489 let factor = self
490 .data_config
490 .data_config
491 .uncompressed_cache_factor
491 .uncompressed_cache_factor
492 .expect("cache should not exist without factor");
492 .expect("cache should not exist without factor");
493 let candidate_size = (size as f64 * factor) as usize;
493 let candidate_size = (size as f64 * factor) as usize;
494 let limiter_mut = cache.limiter_mut();
494 let limiter_mut = cache.limiter_mut();
495 if candidate_size > limiter_mut.max_memory_usage() {
495 if candidate_size > limiter_mut.max_memory_usage() {
496 std::mem::swap(
496 std::mem::swap(
497 limiter_mut,
497 limiter_mut,
498 &mut ByMemoryUsage::new(candidate_size),
498 &mut ByMemoryUsage::new(candidate_size),
499 );
499 );
500 }
500 }
501 }
501 }
502 }
502 }
503 entry.rawdata(cached_rev, get_buffer)?;
503 entry.rawdata(cached_rev, get_buffer)?;
504 // drop cache to save memory, the caller is expected to update
504 // drop cache to save memory, the caller is expected to update
505 // the revision cache after validating the text
505 // the revision cache after validating the text
506 mutex_guard.take();
506 mutex_guard.take();
507 Ok(())
507 Ok(())
508 }
508 }
509
509
510 /// Only `pub` for `hg-cpython`.
510 /// Only `pub` for `hg-cpython`.
511 /// Obtain decompressed raw data for the specified revisions that are
511 /// Obtain decompressed raw data for the specified revisions that are
512 /// assumed to be in ascending order.
512 /// assumed to be in ascending order.
513 ///
513 ///
514 /// Returns a list with decompressed data for each requested revision.
514 /// Returns a list with decompressed data for each requested revision.
515 #[doc(hidden)]
515 #[doc(hidden)]
516 pub fn chunks(
516 pub fn chunks(
517 &self,
517 &self,
518 revs: Vec<Revision>,
518 revs: Vec<Revision>,
519 target_size: Option<u64>,
519 target_size: Option<u64>,
520 ) -> Result<Vec<Arc<[u8]>>, RevlogError> {
520 ) -> Result<Vec<Arc<[u8]>>, RevlogError> {
521 if revs.is_empty() {
521 if revs.is_empty() {
522 return Ok(vec![]);
522 return Ok(vec![]);
523 }
523 }
524 let mut fetched_revs = vec![];
524 let mut fetched_revs = vec![];
525 let mut chunks = Vec::with_capacity(revs.len());
525 let mut chunks = Vec::with_capacity(revs.len());
526
526
527 match self.uncompressed_chunk_cache.as_ref() {
527 match self.uncompressed_chunk_cache.as_ref() {
528 Some(cache) => {
528 Some(cache) => {
529 let mut cache = cache.borrow_mut();
529 let mut cache = cache.borrow_mut();
530 for rev in revs.iter() {
530 for rev in revs.iter() {
531 match cache.get(rev) {
531 match cache.get(rev) {
532 Some(hit) => chunks.push((*rev, hit.to_owned())),
532 Some(hit) => chunks.push((*rev, hit.to_owned())),
533 None => fetched_revs.push(*rev),
533 None => fetched_revs.push(*rev),
534 }
534 }
535 }
535 }
536 }
536 }
537 None => fetched_revs = revs,
537 None => fetched_revs = revs,
538 }
538 }
539
539
540 let already_cached = chunks.len();
540 let already_cached = chunks.len();
541
541
542 let sliced_chunks = if fetched_revs.is_empty() {
542 let sliced_chunks = if fetched_revs.is_empty() {
543 vec![]
543 vec![]
544 } else if !self.data_config.with_sparse_read || self.is_inline() {
544 } else if !self.data_config.with_sparse_read || self.is_inline() {
545 vec![fetched_revs]
545 vec![fetched_revs]
546 } else {
546 } else {
547 self.slice_chunk(&fetched_revs, target_size)?
547 self.slice_chunk(&fetched_revs, target_size)?
548 };
548 };
549
549
550 self.with_read(|| {
550 self.with_read(|| {
551 for revs_chunk in sliced_chunks {
551 for revs_chunk in sliced_chunks {
552 let first_rev = revs_chunk[0];
552 let first_rev = revs_chunk[0];
553 // Skip trailing revisions with empty diff
553 // Skip trailing revisions with empty diff
554 let last_rev_idx = revs_chunk
554 let last_rev_idx = revs_chunk
555 .iter()
555 .iter()
556 .rposition(|r| self.data_compressed_length(*r) != 0)
556 .rposition(|r| self.data_compressed_length(*r) != 0)
557 .unwrap_or(revs_chunk.len() - 1);
557 .unwrap_or(revs_chunk.len() - 1);
558
558
559 let last_rev = revs_chunk[last_rev_idx];
559 let last_rev = revs_chunk[last_rev_idx];
560
560
561 let (offset, data) =
561 let (offset, data) =
562 self.get_segment_for_revs(first_rev, last_rev)?;
562 self.get_segment_for_revs(first_rev, last_rev)?;
563
563
564 let revs_chunk = &revs_chunk[..=last_rev_idx];
564 let revs_chunk = &revs_chunk[..=last_rev_idx];
565
565
566 for rev in revs_chunk {
566 for rev in revs_chunk {
567 let chunk_start = self.data_start(*rev);
567 let chunk_start = self.data_start(*rev);
568 let chunk_length = self.data_compressed_length(*rev);
568 let chunk_length = self.data_compressed_length(*rev);
569 // TODO revlogv2 should check the compression mode
569 // TODO revlogv2 should check the compression mode
570 let bytes = &data[chunk_start - offset..][..chunk_length];
570 let bytes = &data[chunk_start - offset..][..chunk_length];
571 let chunk = if !bytes.is_empty() && bytes[0] == ZSTD_BYTE {
571 let chunk = if !bytes.is_empty() && bytes[0] == ZSTD_BYTE {
572 // If we're using `zstd`, we want to try a more
572 // If we're using `zstd`, we want to try a more
573 // specialized decompression
573 // specialized decompression
574 let entry = self.index.get_entry(*rev).unwrap();
574 let entry = self.index.get_entry(*rev).unwrap();
575 let is_delta = entry
575 let is_delta = entry
576 .base_revision_or_base_of_delta_chain()
576 .base_revision_or_base_of_delta_chain()
577 != (*rev).into();
577 != (*rev).into();
578 let uncompressed = uncompressed_zstd_data(
578 let uncompressed = uncompressed_zstd_data(
579 bytes,
579 bytes,
580 is_delta,
580 is_delta,
581 entry.uncompressed_len(),
581 entry.uncompressed_len(),
582 )?;
582 )?;
583 Cow::Owned(uncompressed)
583 Cow::Owned(uncompressed)
584 } else {
584 } else {
585 // Otherwise just fall back to generic decompression.
585 // Otherwise just fall back to generic decompression.
586 self.decompress(bytes)?
586 self.decompress(bytes)?
587 };
587 };
588
588
589 chunks.push((*rev, chunk.into()));
589 chunks.push((*rev, chunk.into()));
590 }
590 }
591 }
591 }
592 Ok(())
592 Ok(())
593 })?;
593 })?;
594
594
595 if let Some(cache) = self.uncompressed_chunk_cache.as_ref() {
595 if let Some(cache) = self.uncompressed_chunk_cache.as_ref() {
596 let mut cache = cache.borrow_mut();
596 let mut cache = cache.borrow_mut();
597 for (rev, chunk) in chunks.iter().skip(already_cached) {
597 for (rev, chunk) in chunks.iter().skip(already_cached) {
598 cache.insert(*rev, chunk.clone());
598 cache.insert(*rev, chunk.clone());
599 }
599 }
600 }
600 }
601 // Use stable sort here since it's *mostly* sorted
601 // Use stable sort here since it's *mostly* sorted
602 chunks.sort_by(|a, b| a.0.cmp(&b.0));
602 chunks.sort_by(|a, b| a.0.cmp(&b.0));
603 Ok(chunks.into_iter().map(|(_r, chunk)| chunk).collect())
603 Ok(chunks.into_iter().map(|(_r, chunk)| chunk).collect())
604 }
604 }
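// [Editor's example] The caching pattern above (serve hits from the LRU cache,
// fetch only the misses, then stable-sort because the combined list is mostly
// ordered already) can be sketched independently of the revlog types. This is
// a minimal, hypothetical sketch using a plain HashMap and u32 revision
// numbers; it is not the actual hg-core API:
//
//     use std::collections::HashMap;
//
//     fn chunks_with_cache(
//         revs: &[u32],
//         cache: &mut HashMap<u32, Vec<u8>>,
//         fetch: impl Fn(u32) -> Vec<u8>,
//     ) -> Vec<Vec<u8>> {
//         let mut out: Vec<(u32, Vec<u8>)> = Vec::with_capacity(revs.len());
//         let mut missing = Vec::new();
//         for &rev in revs {
//             match cache.get(&rev) {
//                 Some(hit) => out.push((rev, hit.clone())),
//                 None => missing.push(rev),
//             }
//         }
//         for rev in missing {
//             let data = fetch(rev);
//             cache.insert(rev, data.clone());
//             out.push((rev, data));
//         }
//         // Stable sort: the cache hits are already in ascending order.
//         out.sort_by_key(|(rev, _)| *rev);
//         out.into_iter().map(|(_, data)| data).collect()
//     }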
605
605
606 /// Slice revs to reduce the amount of unrelated data to be read from disk.
606 /// Slice revs to reduce the amount of unrelated data to be read from disk.
607 ///
607 ///
608 /// ``revs`` is sliced into groups that should be read in one go.
608 /// ``revs`` is sliced into groups that should be read in one go.
609 /// Assumes that ``revs`` is sorted.
609 /// Assumes that ``revs`` is sorted.
610 ///
610 ///
611 /// The initial chunk is sliced until the overall density
611 /// The initial chunk is sliced until the overall density
612 /// (payload/chunks-span ratio) is above
612 /// (payload/chunks-span ratio) is above
613 /// `revlog.data_config.sr_density_threshold`.
613 /// `revlog.data_config.sr_density_threshold`.
614 /// No gap smaller than `revlog.data_config.sr_min_gap_size` is skipped.
614 /// No gap smaller than `revlog.data_config.sr_min_gap_size` is skipped.
615 ///
615 ///
616 /// If `target_size` is set, no chunk larger than `target_size`
616 /// If `target_size` is set, no chunk larger than `target_size`
617 /// will be returned.
617 /// will be returned.
618 /// For consistency with other slicing choices, this limit won't go lower
618 /// For consistency with other slicing choices, this limit won't go lower
619 /// than `revlog.data_config.sr_min_gap_size`.
619 /// than `revlog.data_config.sr_min_gap_size`.
620 ///
620 ///
621 /// If individual revision chunks are larger than this limit, they will
621 /// If individual revision chunks are larger than this limit, they will
622 /// still be returned individually.
622 /// still be returned individually.
623 pub fn slice_chunk(
623 pub fn slice_chunk(
624 &self,
624 &self,
625 revs: &[Revision],
625 revs: &[Revision],
626 target_size: Option<u64>,
626 target_size: Option<u64>,
627 ) -> Result<Vec<Vec<Revision>>, RevlogError> {
627 ) -> Result<Vec<Vec<Revision>>, RevlogError> {
628 let target_size =
628 let target_size =
629 target_size.map(|size| size.max(self.data_config.sr_min_gap_size));
629 target_size.map(|size| size.max(self.data_config.sr_min_gap_size));
630
630
631 let target_density = self.data_config.sr_density_threshold;
631 let target_density = self.data_config.sr_density_threshold;
632 let min_gap_size = self.data_config.sr_min_gap_size as usize;
632 let min_gap_size = self.data_config.sr_min_gap_size as usize;
633 let to_density = self.index.slice_chunk_to_density(
633 let to_density = self.index.slice_chunk_to_density(
634 revs,
634 revs,
635 target_density,
635 target_density,
636 min_gap_size,
636 min_gap_size,
637 );
637 );
638
638
639 let mut sliced = vec![];
639 let mut sliced = vec![];
640
640
641 for chunk in to_density {
641 for chunk in to_density {
642 sliced.extend(
642 sliced.extend(
643 self.slice_chunk_to_size(&chunk, target_size)?
643 self.slice_chunk_to_size(&chunk, target_size)?
644 .into_iter()
644 .into_iter()
645 .map(ToOwned::to_owned),
645 .map(ToOwned::to_owned),
646 );
646 );
647 }
647 }
648
648
649 Ok(sliced)
649 Ok(sliced)
650 }
650 }
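// [Editor's example] "Density" above is the ratio of payload bytes to the
// byte span covered on disk. A simplified, hypothetical version of gap-based
// slicing (not the real `Index::slice_chunk_to_density`) could look like
// this, cutting whenever the on-disk gap between consecutive revisions is at
// least `min_gap_size`:
//
//     /// `extents` holds each revision's (start, end) byte extent, sorted
//     /// by start offset.
//     fn split_on_gaps(
//         extents: &[(usize, usize)],
//         min_gap_size: usize,
//     ) -> Vec<Vec<(usize, usize)>> {
//         let mut groups = vec![];
//         let mut current: Vec<(usize, usize)> = vec![];
//         for &(start, end) in extents {
//             if let Some(&(_, prev_end)) = current.last() {
//                 if start.saturating_sub(prev_end) >= min_gap_size {
//                     groups.push(std::mem::take(&mut current));
//                 }
//             }
//             current.push((start, end));
//         }
//         if !current.is_empty() {
//             groups.push(current);
//         }
//         groups
//     }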
651
651
652 /// Slice revs to match the target size
652 /// Slice revs to match the target size
653 ///
653 ///
654 /// This is intended to be used on chunks that density slicing selected,
654 /// This is intended to be used on chunks that density slicing selected,
655 /// but that are still too large compared to the read guarantee of revlogs.
655 /// but that are still too large compared to the read guarantee of revlogs.
656 /// This might happen when the "minimal gap size" interrupted the slicing
656 /// This might happen when the "minimal gap size" interrupted the slicing
657 /// or when chains are built in a way that creates large blocks next to
657 /// or when chains are built in a way that creates large blocks next to
658 /// each other.
658 /// each other.
659 fn slice_chunk_to_size<'a>(
659 fn slice_chunk_to_size<'a>(
660 &self,
660 &self,
661 revs: &'a [Revision],
661 revs: &'a [Revision],
662 target_size: Option<u64>,
662 target_size: Option<u64>,
663 ) -> Result<Vec<&'a [Revision]>, RevlogError> {
663 ) -> Result<Vec<&'a [Revision]>, RevlogError> {
664 let mut start_data = self.data_start(revs[0]);
664 let mut start_data = self.data_start(revs[0]);
665 let end_data = self.data_end(revs[revs.len() - 1]);
665 let end_data = self.data_end(revs[revs.len() - 1]);
666 let full_span = end_data - start_data;
666 let full_span = end_data - start_data;
667
667
668 let nothing_to_do = target_size
668 let nothing_to_do = target_size
669 .map(|size| full_span <= size as usize)
669 .map(|size| full_span <= size as usize)
670 .unwrap_or(true);
670 .unwrap_or(true);
671
671
672 if nothing_to_do {
672 if nothing_to_do {
673 return Ok(vec![revs]);
673 return Ok(vec![revs]);
674 }
674 }
675 let target_size = target_size.expect("target_size is set") as usize;
675 let target_size = target_size.expect("target_size is set") as usize;
676
676
677 let mut start_rev_idx = 0;
677 let mut start_rev_idx = 0;
678 let mut end_rev_idx = 1;
678 let mut end_rev_idx = 1;
679 let mut chunks = vec![];
679 let mut chunks = vec![];
680
680
681 for (idx, rev) in revs.iter().enumerate().skip(1) {
681 for (idx, rev) in revs.iter().enumerate().skip(1) {
682 let span = self.data_end(*rev) - start_data;
682 let span = self.data_end(*rev) - start_data;
683 let is_snapshot = self.is_snapshot(*rev)?;
683 let is_snapshot = self.is_snapshot(*rev)?;
684 if span <= target_size && is_snapshot {
684 if span <= target_size && is_snapshot {
685 end_rev_idx = idx + 1;
685 end_rev_idx = idx + 1;
686 } else {
686 } else {
687 let chunk =
687 let chunk =
688 self.trim_chunk(revs, start_rev_idx, Some(end_rev_idx));
688 self.trim_chunk(revs, start_rev_idx, Some(end_rev_idx));
689 if !chunk.is_empty() {
689 if !chunk.is_empty() {
690 chunks.push(chunk);
690 chunks.push(chunk);
691 }
691 }
692 start_rev_idx = idx;
692 start_rev_idx = idx;
693 start_data = self.data_start(*rev);
693 start_data = self.data_start(*rev);
694 end_rev_idx = idx + 1;
694 end_rev_idx = idx + 1;
695 }
695 }
696 if !is_snapshot {
696 if !is_snapshot {
697 break;
697 break;
698 }
698 }
699 }
699 }
700
700
701 // For the others, we use binary slicing to quickly converge towards
701 // For the others, we use binary slicing to quickly converge towards
702 // valid chunks (otherwise, we might end up looking for the start/end
702 // valid chunks (otherwise, we might end up looking for the start/end
703 // of many revisions). This logic is not looking for the perfect
703 // of many revisions). This logic is not looking for the perfect
704 // slicing point; it only needs to converge quickly on valid chunks.
704 // slicing point; it only needs to converge quickly on valid chunks.
705 let number_of_items = revs.len();
705 let number_of_items = revs.len();
706
706
707 while (end_data - start_data) > target_size {
707 while (end_data - start_data) > target_size {
708 end_rev_idx = number_of_items;
708 end_rev_idx = number_of_items;
709 if number_of_items - start_rev_idx <= 1 {
709 if number_of_items - start_rev_idx <= 1 {
710 // Protect against individual chunks larger than the limit
710 // Protect against individual chunks larger than the limit
711 break;
711 break;
712 }
712 }
713 let mut local_end_data = self.data_end(revs[end_rev_idx - 1]);
713 let mut local_end_data = self.data_end(revs[end_rev_idx - 1]);
714 let mut span = local_end_data - start_data;
714 let mut span = local_end_data - start_data;
715 while span > target_size {
715 while span > target_size {
716 if end_rev_idx - start_rev_idx <= 1 {
716 if end_rev_idx - start_rev_idx <= 1 {
717 // Protect against individual chunks larger than the limit
717 // Protect against individual chunks larger than the limit
718 break;
718 break;
719 }
719 }
720 end_rev_idx -= (end_rev_idx - start_rev_idx) / 2;
720 end_rev_idx -= (end_rev_idx - start_rev_idx) / 2;
721 local_end_data = self.data_end(revs[end_rev_idx - 1]);
721 local_end_data = self.data_end(revs[end_rev_idx - 1]);
722 span = local_end_data - start_data;
722 span = local_end_data - start_data;
723 }
723 }
724 let chunk =
724 let chunk =
725 self.trim_chunk(revs, start_rev_idx, Some(end_rev_idx));
725 self.trim_chunk(revs, start_rev_idx, Some(end_rev_idx));
726 if !chunk.is_empty() {
726 if !chunk.is_empty() {
727 chunks.push(chunk);
727 chunks.push(chunk);
728 }
728 }
729 start_rev_idx = end_rev_idx;
729 start_rev_idx = end_rev_idx;
730 start_data = self.data_start(revs[start_rev_idx]);
730 start_data = self.data_start(revs[start_rev_idx]);
731 }
731 }
732
732
733 let chunk = self.trim_chunk(revs, start_rev_idx, None);
733 let chunk = self.trim_chunk(revs, start_rev_idx, None);
734 if !chunk.is_empty() {
734 if !chunk.is_empty() {
735 chunks.push(chunk);
735 chunks.push(chunk);
736 }
736 }
737
737
738 Ok(chunks)
738 Ok(chunks)
739 }
739 }
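// [Editor's example] The loop above halves the candidate end index until the
// byte span fits within `target_size`, accepting a single oversized chunk
// rather than splitting a revision. A standalone sketch of that convergence,
// assuming `ends` holds sorted end offsets that are all >= `start_data`:
//
//     fn shrink_to_target(ends: &[usize], start_data: usize, target_size: usize) -> usize {
//         let start_idx = 0;
//         let mut end_idx = ends.len();
//         while ends[end_idx - 1] - start_data > target_size {
//             if end_idx - start_idx <= 1 {
//                 break; // a single chunk larger than the limit is kept whole
//             }
//             end_idx -= (end_idx - start_idx) / 2;
//         }
//         end_idx // number of leading revisions that fit
//     }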
740
740
741 /// Returns `revs[startidx..endidx]` without empty trailing revs
741 /// Returns `revs[startidx..endidx]` without empty trailing revs
742 fn trim_chunk<'a>(
742 fn trim_chunk<'a>(
743 &self,
743 &self,
744 revs: &'a [Revision],
744 revs: &'a [Revision],
745 start_rev_idx: usize,
745 start_rev_idx: usize,
746 end_rev_idx: Option<usize>,
746 end_rev_idx: Option<usize>,
747 ) -> &'a [Revision] {
747 ) -> &'a [Revision] {
748 let mut end_rev_idx = end_rev_idx.unwrap_or(revs.len());
748 let mut end_rev_idx = end_rev_idx.unwrap_or(revs.len());
749
749
750 // If we have a non-empty delta candidate, there is nothing to trim
750 // If we have a non-empty delta candidate, there is nothing to trim
751 if revs[end_rev_idx - 1].0 < self.len() as BaseRevision {
751 if revs[end_rev_idx - 1].0 < self.len() as BaseRevision {
752 // Trim empty revs at the end, except the very first rev of a chain
752 // Trim empty revs at the end, except the very first rev of a chain
753 while end_rev_idx > 1
753 while end_rev_idx > 1
754 && end_rev_idx > start_rev_idx
754 && end_rev_idx > start_rev_idx
755 && self.data_compressed_length(revs[end_rev_idx - 1]) == 0
755 && self.data_compressed_length(revs[end_rev_idx - 1]) == 0
756 {
756 {
757 end_rev_idx -= 1
757 end_rev_idx -= 1
758 }
758 }
759 }
759 }
760
760
761 &revs[start_rev_idx..end_rev_idx]
761 &revs[start_rev_idx..end_rev_idx]
762 }
762 }
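// [Editor's example] The trimming above, reduced to its core: drop trailing
// zero-length entries while always keeping at least the first one. A
// hypothetical sketch over plain compressed lengths:
//
//     fn trim_trailing_empty(lengths: &[usize]) -> &[usize] {
//         let mut end = lengths.len();
//         while end > 1 && lengths[end - 1] == 0 {
//             end -= 1;
//         }
//         &lengths[..end]
//     }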
763
763
764 /// Check the hash of some given data against the recorded hash.
764 /// Check the hash of some given data against the recorded hash.
765 pub fn check_hash(
765 pub fn check_hash(
766 &self,
766 &self,
767 p1: Revision,
767 p1: Revision,
768 p2: Revision,
768 p2: Revision,
769 expected: &[u8],
769 expected: &[u8],
770 data: &[u8],
770 data: &[u8],
771 ) -> bool {
771 ) -> bool {
772 let e1 = self.index.get_entry(p1);
772 let e1 = self.index.get_entry(p1);
773 let h1 = match e1 {
773 let h1 = match e1 {
774 Some(ref entry) => entry.hash(),
774 Some(ref entry) => entry.hash(),
775 None => &NULL_NODE,
775 None => &NULL_NODE,
776 };
776 };
777 let e2 = self.index.get_entry(p2);
777 let e2 = self.index.get_entry(p2);
778 let h2 = match e2 {
778 let h2 = match e2 {
779 Some(ref entry) => entry.hash(),
779 Some(ref entry) => entry.hash(),
780 None => &NULL_NODE,
780 None => &NULL_NODE,
781 };
781 };
782
782
783 hash(data, h1.as_bytes(), h2.as_bytes()) == expected
783 hash(data, h1.as_bytes(), h2.as_bytes()) == expected
784 }
784 }
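// [Editor's note] A hypothetical call site, with illustrative names only:
// after rebuilding a revision's full text, its node is recomputed from the
// text and the two parents, then compared to the node recorded in the index:
//
//     // let ok = revlog.check_hash(p1, p2, expected_node.as_bytes(), &full_text);
//     // if !ok { /* report an integrity error for this revision */ }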
785
785
786 /// Returns whether we are currently in a [`Self::with_write`] context
786 /// Returns whether we are currently in a [`Self::with_write`] context
787 pub fn is_writing(&self) -> bool {
787 pub fn is_writing(&self) -> bool {
788 self.writing_handles.is_some()
788 self.writing_handles.is_some()
789 }
789 }
790
790
791 /// Open the revlog files for writing
791 /// Open the revlog files for writing
792 ///
792 ///
793 /// Adding content to a revlog should be done within this context.
793 /// Adding content to a revlog should be done within this context.
794 /// TODO try using `BufRead` and `BufWrite` and see if performance improves
794 /// TODO try using `BufRead` and `BufWrite` and see if performance improves
795 pub fn with_write<R>(
795 pub fn with_write<R>(
796 &mut self,
796 &mut self,
797 transaction: &mut impl Transaction,
797 transaction: &mut impl Transaction,
798 data_end: Option<usize>,
798 data_end: Option<usize>,
799 func: impl FnOnce() -> R,
799 func: impl FnOnce() -> R,
800 ) -> Result<R, HgError> {
800 ) -> Result<R, HgError> {
801 if self.is_writing() {
801 if self.is_writing() {
802 return Ok(func());
802 return Ok(func());
803 }
803 }
804 self.enter_writing_context(data_end, transaction)
804 self.enter_writing_context(data_end, transaction)
805 .inspect_err(|_| {
805 .inspect_err(|_| {
806 self.exit_writing_context();
806 self.exit_writing_context();
807 })?;
807 })?;
808 let res = func();
808 let res = func();
809 self.exit_writing_context();
809 self.exit_writing_context();
810 Ok(res)
810 Ok(res)
811 }
811 }
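// [Editor's note] `Result::inspect_err` (available on recent stable Rust) is
// what lets `with_write` run its cleanup on the error path without consuming
// the error. A minimal standalone sketch of that idiom:
//
//     fn demo() -> Result<(), std::io::Error> {
//         std::fs::File::open("some-missing-file")
//             .inspect_err(|_| eprintln!("open failed, cleaning up"))?;
//         Ok(())
//     }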
812
812
813 /// `pub` only for use in hg-cpython
813 /// `pub` only for use in hg-cpython
814 #[doc(hidden)]
814 #[doc(hidden)]
815 pub fn exit_writing_context(&mut self) {
815 pub fn exit_writing_context(&mut self) {
816 self.writing_handles.take();
816 self.writing_handles.take();
817 self.segment_file.writing_handle.take();
817 self.segment_file.writing_handle.take();
818 self.segment_file.reading_handle.take();
818 self.segment_file.reading_handle.take();
819 }
819 }
820
820
821 /// `pub` only for use in hg-cpython
821 /// `pub` only for use in hg-cpython
822 #[doc(hidden)]
822 #[doc(hidden)]
823 pub fn python_writing_handles(&self) -> Option<&WriteHandles> {
823 pub fn python_writing_handles(&self) -> Option<&WriteHandles> {
824 self.writing_handles.as_ref()
824 self.writing_handles.as_ref()
825 }
825 }
826
826
827 /// `pub` only for use in hg-cpython
827 /// `pub` only for use in hg-cpython
828 #[doc(hidden)]
828 #[doc(hidden)]
829 pub fn enter_writing_context(
829 pub fn enter_writing_context(
830 &mut self,
830 &mut self,
831 data_end: Option<usize>,
831 data_end: Option<usize>,
832 transaction: &mut impl Transaction,
832 transaction: &mut impl Transaction,
833 ) -> Result<(), HgError> {
833 ) -> Result<(), HgError> {
834 let data_size = if self.is_empty() {
834 let data_size = if self.is_empty() {
835 0
835 0
836 } else {
836 } else {
837 self.data_end(Revision((self.len() - 1) as BaseRevision))
837 self.data_end(Revision((self.len() - 1) as BaseRevision))
838 };
838 };
839 let data_handle = if !self.is_inline() {
839 let data_handle = if !self.is_inline() {
840 let data_handle = match self.vfs.open_write(&self.data_file) {
840 let data_handle = match self.vfs.open_write(&self.data_file) {
841 Ok(mut f) => {
841 Ok(mut f) => {
842 if let Some(end) = data_end {
842 if let Some(end) = data_end {
843 f.seek(SeekFrom::Start(end as u64))
843 f.seek(SeekFrom::Start(end as u64))
844 .when_reading_file(&self.data_file)?;
844 .when_reading_file(&self.data_file)?;
845 } else {
845 } else {
846 f.seek(SeekFrom::End(0))
846 f.seek(SeekFrom::End(0))
847 .when_reading_file(&self.data_file)?;
847 .when_reading_file(&self.data_file)?;
848 }
848 }
849 f
849 f
850 }
850 }
851 Err(e) => match e {
851 Err(e) => match e {
852 HgError::IoError { error, context } => {
852 HgError::IoError { error, context } => {
853 if error.kind() != ErrorKind::NotFound {
853 if error.kind() != ErrorKind::NotFound {
854 return Err(HgError::IoError { error, context });
854 return Err(HgError::IoError { error, context });
855 }
855 }
856 self.vfs.create(&self.data_file, true)?
856 self.vfs.create(&self.data_file, true)?
857 }
857 }
858 e => return Err(e),
858 e => return Err(e),
859 },
859 },
860 };
860 };
861 transaction.add(&self.data_file, data_size);
861 transaction.add(&self.data_file, data_size);
862 Some(FileHandle::from_file(
862 Some(FileHandle::from_file(
863 data_handle,
863 data_handle,
864 dyn_clone::clone_box(&*self.vfs),
864 dyn_clone::clone_box(&*self.vfs),
865 &self.data_file,
865 &self.data_file,
866 ))
866 ))
867 } else {
867 } else {
868 None
868 None
869 };
869 };
870 let index_size = self.len() * INDEX_ENTRY_SIZE;
870 let index_size = self.len() * INDEX_ENTRY_SIZE;
871 let index_handle = self.index_write_handle()?;
871 let index_handle = self.index_write_handle()?;
872 if self.is_inline() {
872 if self.is_inline() {
873 transaction.add(&self.index_file, data_size);
873 transaction.add(&self.index_file, data_size);
874 } else {
874 } else {
875 transaction.add(&self.index_file, index_size);
875 transaction.add(&self.index_file, index_size);
876 }
876 }
877 self.writing_handles = Some(WriteHandles {
877 self.writing_handles = Some(WriteHandles {
878 index_handle: index_handle.clone(),
878 index_handle: index_handle.clone(),
879 data_handle: data_handle.clone(),
879 data_handle: data_handle.clone(),
880 });
880 });
881 *self.segment_file.reading_handle.borrow_mut() = if self.is_inline() {
881 *self.segment_file.reading_handle.borrow_mut() = if self.is_inline() {
882 Some(index_handle)
882 Some(index_handle)
883 } else {
883 } else {
884 data_handle
884 data_handle
885 };
885 };
886 Ok(())
886 Ok(())
887 }
887 }
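// [Editor's example] The data-file handling above follows an
// "open for append, create only on NotFound" pattern. A self-contained
// sketch of the same idea with std types (a hypothetical helper, not part of
// the VFS API):
//
//     use std::fs::{File, OpenOptions};
//     use std::io::{ErrorKind, Seek, SeekFrom};
//     use std::path::Path;
//
//     fn open_for_append(path: &Path) -> std::io::Result<File> {
//         match OpenOptions::new().write(true).open(path) {
//             Ok(mut f) => {
//                 f.seek(SeekFrom::End(0))?;
//                 Ok(f)
//             }
//             Err(e) if e.kind() == ErrorKind::NotFound => File::create(path),
//             Err(e) => Err(e),
//         }
//     }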
888
888
889 /// Get a write handle to the index, positioned at the end of its data.
889 /// Get a write handle to the index, positioned at the end of its data.
890 fn index_write_handle(&self) -> Result<FileHandle, HgError> {
890 fn index_write_handle(&self) -> Result<FileHandle, HgError> {
891 let res = if self.delayed_buffer.is_none() {
891 let res = if self.delayed_buffer.is_none() {
892 if self.data_config.check_ambig {
892 if self.data_config.check_ambig {
893 self.vfs.open_check_ambig(&self.index_file)
893 self.vfs.open_check_ambig(&self.index_file)
894 } else {
894 } else {
895 self.vfs.open_write(&self.index_file)
895 self.vfs.open_write(&self.index_file)
896 }
896 }
897 } else {
897 } else {
898 self.vfs.open_write(&self.index_file)
898 self.vfs.open_write(&self.index_file)
899 };
899 };
900 match res {
900 match res {
901 Ok(mut handle) => {
901 Ok(mut handle) => {
902 handle
902 handle
903 .seek(SeekFrom::End(0))
903 .seek(SeekFrom::End(0))
904 .when_reading_file(&self.index_file)?;
904 .when_reading_file(&self.index_file)?;
905 Ok(
905 Ok(
906 if let Some(delayed_buffer) = self.delayed_buffer.as_ref()
906 if let Some(delayed_buffer) = self.delayed_buffer.as_ref()
907 {
907 {
908 FileHandle::from_file_delayed(
908 FileHandle::from_file_delayed(
909 handle,
909 handle,
910 dyn_clone::clone_box(&*self.vfs),
910 dyn_clone::clone_box(&*self.vfs),
911 &self.index_file,
911 &self.index_file,
912 delayed_buffer.clone(),
912 delayed_buffer.clone(),
913 )?
913 )?
914 } else {
914 } else {
915 FileHandle::from_file(
915 FileHandle::from_file(
916 handle,
916 handle,
917 dyn_clone::clone_box(&*self.vfs),
917 dyn_clone::clone_box(&*self.vfs),
918 &self.index_file,
918 &self.index_file,
919 )
919 )
920 },
920 },
921 )
921 )
922 }
922 }
923 Err(e) => match e {
923 Err(e) => match e {
924 HgError::IoError { error, context } => {
924 HgError::IoError { error, context } => {
925 if error.kind() != ErrorKind::NotFound {
925 if error.kind() != ErrorKind::NotFound {
926 return Err(HgError::IoError { error, context });
926 return Err(HgError::IoError { error, context });
927 };
927 };
928 if let Some(delayed_buffer) = self.delayed_buffer.as_ref()
928 if let Some(delayed_buffer) = self.delayed_buffer.as_ref()
929 {
929 {
930 FileHandle::new_delayed(
930 FileHandle::new_delayed(
931 dyn_clone::clone_box(&*self.vfs),
931 dyn_clone::clone_box(&*self.vfs),
932 &self.index_file,
932 &self.index_file,
933 true,
933 true,
934 delayed_buffer.clone(),
934 delayed_buffer.clone(),
935 )
935 )
936 } else {
936 } else {
937 FileHandle::new(
937 FileHandle::new(
938 dyn_clone::clone_box(&*self.vfs),
938 dyn_clone::clone_box(&*self.vfs),
939 &self.index_file,
939 &self.index_file,
940 true,
940 true,
941 true,
941 true,
942 )
942 )
943 }
943 }
944 }
944 }
945 e => Err(e),
945 e => Err(e),
946 },
946 },
947 }
947 }
948 }
948 }
949
949
950 /// Split the data of an inline revlog into an index and a data file
950 /// Split the data of an inline revlog into an index and a data file
951 pub fn split_inline(
951 pub fn split_inline(
952 &mut self,
952 &mut self,
953 header: IndexHeader,
953 header: IndexHeader,
954 new_index_file_path: Option<PathBuf>,
954 new_index_file_path: Option<PathBuf>,
955 ) -> Result<PathBuf, RevlogError> {
955 ) -> Result<PathBuf, RevlogError> {
956 assert!(self.delayed_buffer.is_none());
956 assert!(self.delayed_buffer.is_none());
957 let existing_handles = self.writing_handles.is_some();
957 let existing_handles = self.writing_handles.is_some();
958 if let Some(handles) = &mut self.writing_handles {
958 if let Some(handles) = &mut self.writing_handles {
959 handles.index_handle.flush()?;
959 handles.index_handle.flush()?;
960 self.writing_handles.take();
960 self.writing_handles.take();
961 self.segment_file.writing_handle.take();
961 self.segment_file.writing_handle.take();
962 }
962 }
963 let mut new_data_file_handle =
963 let mut new_data_file_handle =
964 self.vfs.create(&self.data_file, true)?;
964 self.vfs.create(&self.data_file, true)?;
965 // Drop any potential data, possibly redundant with the VFS impl.
965 // Drop any potential data, possibly redundant with the VFS impl.
966 new_data_file_handle
966 new_data_file_handle
967 .set_len(0)
967 .set_len(0)
968 .when_writing_file(&self.data_file)?;
968 .when_writing_file(&self.data_file)?;
969
969
970 self.with_read(|| -> Result<(), RevlogError> {
970 self.with_read(|| -> Result<(), RevlogError> {
971 for r in 0..self.index.len() {
971 for r in 0..self.index.len() {
972 let rev = Revision(r as BaseRevision);
972 let rev = Revision(r as BaseRevision);
973 let rev_segment = self.get_segment_for_revs(rev, rev)?.1;
973 let rev_segment = self.get_segment_for_revs(rev, rev)?.1;
974 new_data_file_handle
974 new_data_file_handle
975 .write_all(&rev_segment)
975 .write_all(&rev_segment)
976 .when_writing_file(&self.data_file)?;
976 .when_writing_file(&self.data_file)?;
977 }
977 }
978 new_data_file_handle
978 new_data_file_handle
979 .flush()
979 .flush()
980 .when_writing_file(&self.data_file)?;
980 .when_writing_file(&self.data_file)?;
981 Ok(())
981 Ok(())
982 })?;
982 })?;
983
983
984 if let Some(index_path) = new_index_file_path {
984 if let Some(index_path) = new_index_file_path {
985 self.index_file = index_path
985 self.index_file = index_path
986 }
986 }
987
987
988 let mut new_index_handle = self.vfs.create(&self.index_file, true)?;
988 let mut new_index_handle = self.vfs.create(&self.index_file, true)?;
989 let mut new_data = Vec::with_capacity(self.len() * INDEX_ENTRY_SIZE);
989 let mut new_data = Vec::with_capacity(self.len() * INDEX_ENTRY_SIZE);
990 for r in 0..self.len() {
990 for r in 0..self.len() {
991 let rev = Revision(r as BaseRevision);
991 let rev = Revision(r as BaseRevision);
992 let entry = self.index.entry_binary(rev).unwrap_or_else(|| {
992 let entry = self.index.entry_binary(rev).unwrap_or_else(|| {
993 panic!(
993 panic!(
994 "entry {} should exist in {}",
994 "entry {} should exist in {}",
995 r,
995 r,
996 self.index_file.display()
996 self.index_file.display()
997 )
997 )
998 });
998 });
999 if r == 0 {
999 if r == 0 {
1000 new_data.extend(header.header_bytes);
1000 new_data.extend(header.header_bytes);
1001 }
1001 }
1002 new_data.extend(entry);
1002 new_data.extend(entry);
1003 }
1003 }
1004 new_index_handle
1004 new_index_handle
1005 .write_all(&new_data)
1005 .write_all(&new_data)
1006 .when_writing_file(&self.index_file)?;
1006 .when_writing_file(&self.index_file)?;
1007 // Replace the index with a new one because the buffer contains inline
1007 // Replace the index with a new one because the buffer contains inline
1008 // data
1008 // data
1009 self.index = Index::new(Box::new(new_data), header)?;
1009 self.index = Index::new(Box::new(new_data), header)?;
1010 self.inline = false;
1010 self.inline = false;
1011
1011
1012 self.segment_file = RandomAccessFile::new(
1012 self.segment_file = RandomAccessFile::new(
1013 dyn_clone::clone_box(&*self.vfs),
1013 dyn_clone::clone_box(&*self.vfs),
1014 self.data_file.to_owned(),
1014 self.data_file.to_owned(),
1015 );
1015 );
1016 if existing_handles {
1016 if existing_handles {
1017 // Switched from inline to conventional, reopen the index
1017 // Switched from inline to conventional, reopen the index
1018 let new_data_handle = Some(FileHandle::from_file(
1018 let new_data_handle = Some(FileHandle::from_file(
1019 new_data_file_handle,
1019 new_data_file_handle,
1020 dyn_clone::clone_box(&*self.vfs),
1020 dyn_clone::clone_box(&*self.vfs),
1021 &self.data_file,
1021 &self.data_file,
1022 ));
1022 ));
1023 self.writing_handles = Some(WriteHandles {
1023 self.writing_handles = Some(WriteHandles {
1024 index_handle: self.index_write_handle()?,
1024 index_handle: self.index_write_handle()?,
1025 data_handle: new_data_handle.clone(),
1025 data_handle: new_data_handle.clone(),
1026 });
1026 });
1027 *self.segment_file.writing_handle.borrow_mut() = new_data_handle;
1027 *self.segment_file.writing_handle.borrow_mut() = new_data_handle;
1028 }
1028 }
1029
1029
1030 Ok(self.index_file.to_owned())
1030 Ok(self.index_file.to_owned())
1031 }
1031 }
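// [Editor's example] Conceptually, splitting an inline revlog separates the
// interleaved (index entry, data segment) records into two independent byte
// streams; the real code above streams through the VFS and also rewrites the
// index header. A minimal in-memory sketch of the separation itself:
//
//     fn split_records(records: &[(Vec<u8>, Vec<u8>)]) -> (Vec<u8>, Vec<u8>) {
//         let mut index = Vec::new();
//         let mut data = Vec::new();
//         for (entry, segment) in records {
//             index.extend_from_slice(entry);
//             data.extend_from_slice(segment);
//         }
//         (index, data)
//     }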
1032
1032
1033 /// Write a new entry to this revlog.
1033 /// Write a new entry to this revlog.
1034 /// - `entry` is the index bytes
1034 /// - `entry` is the index bytes
1035 /// - `header_and_data` is the compression header and the revision data
1035 /// - `header_and_data` is the compression header and the revision data
1036 /// - `offset` is the position in the data file to write to
1036 /// - `offset` is the position in the data file to write to
1037 /// - `index_end` is the overwritten position in the index in revlog-v2,
1037 /// - `index_end` is the overwritten position in the index in revlog-v2,
1038 /// since the format may allow a rewrite of garbage data at the end.
1038 /// since the format may allow a rewrite of garbage data at the end.
1039 /// - `data_end` is the overwritten position in the data-file in revlog-v2,
1039 /// - `data_end` is the overwritten position in the data-file in revlog-v2,
1040 /// since the format may allow a rewrite of garbage data at the end.
1040 /// since the format may allow a rewrite of garbage data at the end.
1041 ///
1041 ///
1042 /// XXX Why do we have `data_end` *and* `offset`? Same question in Python
1042 /// XXX Why do we have `data_end` *and* `offset`? Same question in Python
1043 pub fn write_entry(
1043 pub fn write_entry(
1044 &mut self,
1044 &mut self,
1045 mut transaction: impl Transaction,
1045 mut transaction: impl Transaction,
1046 entry: &[u8],
1046 entry: &[u8],
1047 header_and_data: (&[u8], &[u8]),
1047 header_and_data: (&[u8], &[u8]),
1048 mut offset: usize,
1048 mut offset: usize,
1049 index_end: Option<u64>,
1049 index_end: Option<u64>,
1050 data_end: Option<u64>,
1050 data_end: Option<u64>,
1051 ) -> Result<(u64, Option<u64>), HgError> {
1051 ) -> Result<(u64, Option<u64>), HgError> {
1052 let current_revision = self.len() - 1;
1052 let current_revision = self.len() - 1;
1053 let canonical_index_file = self.canonical_index_file();
1053 let canonical_index_file = self.canonical_index_file();
1054
1054
1055 let is_inline = self.is_inline();
1055 let is_inline = self.is_inline();
1056 let handles = match &mut self.writing_handles {
1056 let handles = match &mut self.writing_handles {
1057 None => {
1057 None => {
1058 return Err(HgError::abort(
1058 return Err(HgError::abort(
1059 "adding revision outside of the `with_write` context",
1059 "adding revision outside of the `with_write` context",
1060 exit_codes::ABORT,
1060 exit_codes::ABORT,
1061 None,
1061 None,
1062 ));
1062 ));
1063 }
1063 }
1064 Some(handles) => handles,
1064 Some(handles) => handles,
1065 };
1065 };
1066 let index_handle = &mut handles.index_handle;
1066 let index_handle = &mut handles.index_handle;
1067 let data_handle = &mut handles.data_handle;
1067 let data_handle = &mut handles.data_handle;
1068 if let Some(end) = index_end {
1068 if let Some(end) = index_end {
1069 index_handle
1069 index_handle
1070 .seek(SeekFrom::Start(end))
1070 .seek(SeekFrom::Start(end))
1071 .when_reading_file(&self.index_file)?;
1071 .when_reading_file(&self.index_file)?;
1072 } else {
1072 } else {
1073 index_handle
1073 index_handle
1074 .seek(SeekFrom::End(0))
1074 .seek(SeekFrom::End(0))
1075 .when_reading_file(&self.index_file)?;
1075 .when_reading_file(&self.index_file)?;
1076 }
1076 }
1077 if let Some(data_handle) = data_handle {
1077 if let Some(data_handle) = data_handle {
1078 if let Some(end) = data_end {
1078 if let Some(end) = data_end {
1079 data_handle
1079 data_handle
1080 .seek(SeekFrom::Start(end))
1080 .seek(SeekFrom::Start(end))
1081 .when_reading_file(&self.data_file)?;
1081 .when_reading_file(&self.data_file)?;
1082 } else {
1082 } else {
1083 data_handle
1083 data_handle
1084 .seek(SeekFrom::End(0))
1084 .seek(SeekFrom::End(0))
1085 .when_reading_file(&self.data_file)?;
1085 .when_reading_file(&self.data_file)?;
1086 }
1086 }
1087 }
1087 }
1088 let (header, data) = header_and_data;
1088 let (header, data) = header_and_data;
1089
1089
1090 if !is_inline {
1090 if !is_inline {
1091 transaction.add(&self.data_file, offset);
1091 transaction.add(&self.data_file, offset);
1092 transaction
1092 transaction
1093 .add(&canonical_index_file, current_revision * entry.len());
1093 .add(&canonical_index_file, current_revision * entry.len());
1094 let data_handle = data_handle
1094 let data_handle = data_handle
1095 .as_mut()
1095 .as_mut()
1096 .expect("data handle should exist when not inline");
1096 .expect("data handle should exist when not inline");
1097 if !header.is_empty() {
1097 if !header.is_empty() {
1098 data_handle.write_all(header)?;
1098 data_handle.write_all(header)?;
1099 }
1099 }
1100 data_handle.write_all(data)?;
1100 data_handle.write_all(data)?;
1101 match &mut self.delayed_buffer {
1101 match &mut self.delayed_buffer {
1102 Some(buf) => {
1102 Some(buf) => {
1103 buf.lock()
1103 buf.lock()
1104 .expect("propagate the panic")
1104 .expect("propagate the panic")
1105 .buffer
1105 .buffer
1106 .write_all(entry)
1106 .write_all(entry)
1107 .expect("write to delay buffer should succeed");
1107 .expect("write to delay buffer should succeed");
1108 }
1108 }
1109 None => index_handle.write_all(entry)?,
1109 None => index_handle.write_all(entry)?,
1110 }
1110 }
1111 } else if self.delayed_buffer.is_some() {
1111 } else if self.delayed_buffer.is_some() {
1112 return Err(HgError::abort(
1112 return Err(HgError::abort(
1113 "invalid delayed write on inline revlog",
1113 "invalid delayed write on inline revlog",
1114 exit_codes::ABORT,
1114 exit_codes::ABORT,
1115 None,
1115 None,
1116 ));
1116 ));
1117 } else {
1117 } else {
1118 offset += current_revision * entry.len();
1118 offset += current_revision * entry.len();
1119 transaction.add(&canonical_index_file, offset);
1119 transaction.add(&canonical_index_file, offset);
1120 index_handle.write_all(entry)?;
1120 index_handle.write_all(entry)?;
1121 index_handle.write_all(header)?;
1121 index_handle.write_all(header)?;
1122 index_handle.write_all(data)?;
1122 index_handle.write_all(data)?;
1123 }
1123 }
1124 let data_position = match data_handle {
1124 let data_position = match data_handle {
1125 Some(h) => Some(h.position()?),
1125 Some(h) => Some(h.position()?),
1126 None => None,
1126 None => None,
1127 };
1127 };
1128 Ok((index_handle.position()?, data_position))
1128 Ok((index_handle.position()?, data_position))
1129 }
1129 }
1130
1130
1131 /// Return the real target index file and not the temporary when diverting
1131 /// Return the real target index file and not the temporary when diverting
1132 pub fn canonical_index_file(&self) -> PathBuf {
1132 pub fn canonical_index_file(&self) -> PathBuf {
1133 self.original_index_file
1133 self.original_index_file
1134 .as_ref()
1134 .as_ref()
1135 .map(ToOwned::to_owned)
1135 .map(ToOwned::to_owned)
1136 .unwrap_or_else(|| self.index_file.to_owned())
1136 .unwrap_or_else(|| self.index_file.to_owned())
1137 }
1137 }
1138
1138
1139 /// Return the path to the diverted index
1139 /// Return the path to the diverted index
1140 fn diverted_index(&self) -> PathBuf {
1140 fn diverted_index(&self) -> PathBuf {
1141 self.index_file.with_extension("i.a")
1141 self.index_file.with_extension("i.a")
1142 }
1142 }
1143
1143
1144 /// True if we're in a [`Self::with_write`] or [`Self::with_read`] context
1144 /// True if we're in a [`Self::with_write`] or [`Self::with_read`] context
1145 pub fn is_open(&self) -> bool {
1145 pub fn is_open(&self) -> bool {
1146 self.segment_file.is_open()
1146 self.segment_file.is_open()
1147 }
1147 }
1148
1148
1149 /// Set this revlog to delay its writes to a buffer
1149 /// Set this revlog to delay its writes to a buffer
1150 pub fn delay(&mut self) -> Result<Option<PathBuf>, HgError> {
1150 pub fn delay(&mut self) -> Result<Option<PathBuf>, HgError> {
1151 assert!(!self.is_open());
1151 assert!(!self.is_open());
1152 if self.is_inline() {
1152 if self.is_inline() {
1153 return Err(HgError::abort(
1153 return Err(HgError::abort(
1154 "revlog with delayed write should not be inline",
1154 "revlog with delayed write should not be inline",
1155 exit_codes::ABORT,
1155 exit_codes::ABORT,
1156 None,
1156 None,
1157 ));
1157 ));
1158 }
1158 }
1159 if self.delayed_buffer.is_some() || self.original_index_file.is_some()
1159 if self.delayed_buffer.is_some() || self.original_index_file.is_some()
1160 {
1160 {
1161 // Delay or divert already happening
1161 // Delay or divert already happening
1162 return Ok(None);
1162 return Ok(None);
1163 }
1163 }
1164 if self.is_empty() {
1164 if self.is_empty() {
1165 self.original_index_file = Some(self.index_file.to_owned());
1165 self.original_index_file = Some(self.index_file.to_owned());
1166 self.index_file = self.diverted_index();
1166 self.index_file = self.diverted_index();
1167 if self.vfs.exists(&self.index_file) {
1167 if self.vfs.exists(&self.index_file) {
1168 self.vfs.unlink(&self.index_file)?;
1168 self.vfs.unlink(&self.index_file)?;
1169 }
1169 }
1170 Ok(Some(self.index_file.to_owned()))
1170 Ok(Some(self.index_file.to_owned()))
1171 } else {
1171 } else {
1172 self.delayed_buffer =
1172 self.delayed_buffer =
1173 Some(Arc::new(Mutex::new(DelayedBuffer::default())));
1173 Some(Arc::new(Mutex::new(DelayedBuffer::default())));
1174 Ok(None)
1174 Ok(None)
1175 }
1175 }
1176 }
1176 }
1177
1177
1178 /// Write the pending (in-memory) data, if any, to the diverted index
1178 /// Write the pending (in-memory) data, if any, to the diverted index
1179 /// file (an on-disk temporary file)
1179 /// file (an on-disk temporary file)
1180 pub fn write_pending(
1180 pub fn write_pending(
1181 &mut self,
1181 &mut self,
1182 ) -> Result<(Option<PathBuf>, bool), HgError> {
1182 ) -> Result<(Option<PathBuf>, bool), HgError> {
1183 assert!(!self.is_open());
1183 assert!(!self.is_open());
1184 if self.is_inline() {
1184 if self.is_inline() {
1185 return Err(HgError::abort(
1185 return Err(HgError::abort(
1186 "revlog with delayed write should not be inline",
1186 "revlog with delayed write should not be inline",
1187 exit_codes::ABORT,
1187 exit_codes::ABORT,
1188 None,
1188 None,
1189 ));
1189 ));
1190 }
1190 }
1191 if self.original_index_file.is_some() {
1191 if self.original_index_file.is_some() {
1192 return Ok((None, true));
1192 return Ok((None, true));
1193 }
1193 }
1194 let mut any_pending = false;
1194 let mut any_pending = false;
1195 let pending_index_file = self.diverted_index();
1195 let pending_index_file = self.diverted_index();
1196 if self.vfs.exists(&pending_index_file) {
1196 if self.vfs.exists(&pending_index_file) {
1197 self.vfs.unlink(&pending_index_file)?;
1197 self.vfs.unlink(&pending_index_file)?;
1198 }
1198 }
1199 self.vfs.copy(&self.index_file, &pending_index_file)?;
1199 self.vfs.copy(&self.index_file, &pending_index_file)?;
1200 if let Some(delayed_buffer) = self.delayed_buffer.take() {
1200 if let Some(delayed_buffer) = self.delayed_buffer.take() {
1201 let mut index_file_handle =
1201 let mut index_file_handle =
1202 self.vfs.open_write(&pending_index_file)?;
1202 self.vfs.open_write(&pending_index_file)?;
1203 index_file_handle
1203 index_file_handle
1204 .seek(SeekFrom::End(0))
1204 .seek(SeekFrom::End(0))
1205 .when_writing_file(&pending_index_file)?;
1205 .when_writing_file(&pending_index_file)?;
1206 let delayed_data =
1206 let delayed_data =
1207 &delayed_buffer.lock().expect("propagate the panic").buffer;
1207 &delayed_buffer.lock().expect("propagate the panic").buffer;
1208 index_file_handle
1208 index_file_handle
1209 .write_all(delayed_data)
1209 .write_all(delayed_data)
1210 .when_writing_file(&pending_index_file)?;
1210 .when_writing_file(&pending_index_file)?;
1211 any_pending = true;
1211 any_pending = true;
1212 }
1212 }
1213 self.original_index_file = Some(self.index_file.to_owned());
1213 self.original_index_file = Some(self.index_file.to_owned());
1214 self.index_file = pending_index_file;
1214 self.index_file = pending_index_file;
1215 Ok((Some(self.index_file.to_owned()), any_pending))
1215 Ok((Some(self.index_file.to_owned()), any_pending))
1216 }
1216 }
1217
1217
1218 /// Overwrite the canonical file with the diverted file, or write out the
1218 /// Overwrite the canonical file with the diverted file, or write out the
1219 /// delayed buffer.
1219 /// delayed buffer.
1220 /// Returns an error if the revlog is neither diverted nor delayed.
1220 /// Returns an error if the revlog is neither diverted nor delayed.
1221 pub fn finalize_pending(&mut self) -> Result<PathBuf, HgError> {
1221 pub fn finalize_pending(&mut self) -> Result<PathBuf, HgError> {
1222 assert!(!self.is_open());
1222 assert!(!self.is_open());
1223 if self.is_inline() {
1223 if self.is_inline() {
1224 return Err(HgError::abort(
1224 return Err(HgError::abort(
1225 "revlog with delayed write should not be inline",
1225 "revlog with delayed write should not be inline",
1226 exit_codes::ABORT,
1226 exit_codes::ABORT,
1227 None,
1227 None,
1228 ));
1228 ));
1229 }
1229 }
1230 match (
1230 match (
1231 self.delayed_buffer.as_ref(),
1231 self.delayed_buffer.as_ref(),
1232 self.original_index_file.as_ref(),
1232 self.original_index_file.as_ref(),
1233 ) {
1233 ) {
1234 (None, None) => {
1234 (None, None) => {
1235 return Err(HgError::abort(
1235 return Err(HgError::abort(
1236 "neither delay nor divert found on this revlog",
1236 "neither delay nor divert found on this revlog",
1237 exit_codes::ABORT,
1237 exit_codes::ABORT,
1238 None,
1238 None,
1239 ));
1239 ));
1240 }
1240 }
1241 (Some(delay), None) => {
1241 (Some(delay), None) => {
1242 let mut index_file_handle =
1242 let mut index_file_handle =
1243 self.vfs.open_write(&self.index_file)?;
1243 self.vfs.open_write(&self.index_file)?;
1244 index_file_handle
1244 index_file_handle
1245 .seek(SeekFrom::End(0))
1245 .seek(SeekFrom::End(0))
1246 .when_writing_file(&self.index_file)?;
1246 .when_writing_file(&self.index_file)?;
1247 index_file_handle
1247 index_file_handle
1248 .write_all(
1248 .write_all(
1249 &delay.lock().expect("propagate the panic").buffer,
1249 &delay.lock().expect("propagate the panic").buffer,
1250 )
1250 )
1251 .when_writing_file(&self.index_file)?;
1251 .when_writing_file(&self.index_file)?;
1252 self.delayed_buffer = None;
1252 self.delayed_buffer = None;
1253 }
1253 }
1254 (None, Some(divert)) => {
1254 (None, Some(divert)) => {
1255 if self.vfs.exists(&self.index_file) {
1255 if self.vfs.exists(&self.index_file) {
1256 self.vfs.rename(&self.index_file, divert, true)?;
1256 self.vfs.rename(&self.index_file, divert, true)?;
1257 }
1257 }
1258 divert.clone_into(&mut self.index_file);
1258 divert.clone_into(&mut self.index_file);
1259 self.original_index_file = None;
1259 self.original_index_file = None;
1260 }
1260 }
1261 (Some(_), Some(_)) => unreachable!(
1261 (Some(_), Some(_)) => unreachable!(
1262 "{} is in an inconsistent state of both delay and divert",
1262 "{} is in an inconsistent state of both delay and divert",
1263 self.canonical_index_file().display(),
1263 self.canonical_index_file().display(),
1264 ),
1264 ),
1265 }
1265 }
1266 Ok(self.canonical_index_file())
1266 Ok(self.canonical_index_file())
1267 }
1267 }
1268
1268
1269 /// `pub` only for `hg-cpython`. This is made a different method than
1269 /// `pub` only for `hg-cpython`. This is made a different method than
1270 /// [`Revlog::index`] in case there is a different invariant that pops up
1270 /// [`Revlog::index`] in case there is a different invariant that pops up
1271 /// later.
1271 /// later.
1272 #[doc(hidden)]
1272 #[doc(hidden)]
1273 pub fn shared_index(&self) -> &Index {
1273 pub fn shared_index(&self) -> &Index {
1274 &self.index
1274 &self.index
1275 }
1275 }
1276 }
1276 }
1277
1277
1278 /// The use of a [`RefCell`] assumes that a given revlog will only
1278 /// The use of a [`RefCell`] assumes that a given revlog will only
1279 /// be accessed (read or write) by a single thread.
1279 /// be accessed (read or write) by a single thread.
1280 type UncompressedChunkCache =
1280 type UncompressedChunkCache =
1281 RefCell<LruMap<Revision, Arc<[u8]>, ByMemoryUsage>>;
1281 RefCell<LruMap<Revision, Arc<[u8]>, ByMemoryUsage>>;
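// [Editor's example] The `RefCell` provides interior mutability so `&self`
// methods can update the cache, at the cost of being single-threaded
// (`RefCell` is not `Sync`). A hypothetical stand-in using a plain HashMap
// instead of the memory-bounded `LruMap`:
//
//     use std::cell::RefCell;
//     use std::collections::HashMap;
//     use std::sync::Arc;
//
//     struct ChunkCache {
//         inner: RefCell<HashMap<u32, Arc<[u8]>>>,
//     }
//
//     impl ChunkCache {
//         fn get_or_insert_with(
//             &self,
//             rev: u32,
//             fetch: impl FnOnce() -> Arc<[u8]>,
//         ) -> Arc<[u8]> {
//             self.inner.borrow_mut().entry(rev).or_insert_with(fetch).clone()
//         }
//     }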
1282
1282
1283 /// The node, revision and data for the last revision we've seen. Speeds up
1283 /// The node, revision and data for the last revision we've seen. Speeds up
1284 /// a lot of sequential operations of the revlog.
1284 /// a lot of sequential operations of the revlog.
1285 ///
1285 ///
1286 /// The data is not just bytes since it can come from Python and we want to
1286 /// The data is not just bytes since it can come from Python and we want to
1287 /// avoid copies if possible.
1287 /// avoid copies if possible.
1288 type SingleRevisionCache =
1288 type SingleRevisionCache =
1289 (Node, Revision, Box<dyn Deref<Target = [u8]> + Send>);
1289 (Node, Revision, Box<dyn Deref<Target = [u8]> + Send>);
1290
1290
1291 /// A way of progressively filling a buffer with revision data, then return
1291 /// A way of progressively filling a buffer with revision data, then return
1292 /// that buffer. Used to abstract away Python-allocated code to reduce copying
1292 /// that buffer. Used to abstract away Python-allocated code to reduce copying
1293 /// for performance reasons.
1293 /// for performance reasons.
1294 pub trait RevisionBuffer {
1294 pub trait RevisionBuffer {
1295 /// The owned buffer type to return
1295 /// The owned buffer type to return
1296 type Target;
1296 type Target;
1297 /// Copies the slice into the buffer
1297 /// Copies the slice into the buffer
1298 fn extend_from_slice(&mut self, slice: &[u8]);
1298 fn extend_from_slice(&mut self, slice: &[u8]);
1299 /// Returns the now finished owned buffer
1299 /// Returns the now finished owned buffer
1300 fn finish(self) -> Self::Target;
1300 fn finish(self) -> Self::Target;
1301 }
1301 }
1302
1302
1303 /// A simple vec-based buffer. This is uselessly complicated for the pure Rust
1303 /// A simple vec-based buffer. This is uselessly complicated for the pure Rust
1304 /// case, but it's the price to pay for Python compatibility.
1304 /// case, but it's the price to pay for Python compatibility.
1305 #[derive(Debug)]
1305 #[derive(Debug)]
1306 pub(super) struct CoreRevisionBuffer {
1306 pub(super) struct CoreRevisionBuffer {
1307 buf: Vec<u8>,
1307 buf: Vec<u8>,
1308 }
1308 }
1309
1309
1310 impl CoreRevisionBuffer {
1310 impl CoreRevisionBuffer {
1311 pub fn new() -> Self {
1311 pub fn new() -> Self {
1312 Self { buf: vec![] }
1312 Self { buf: vec![] }
1313 }
1313 }
1314
1314
1315 #[inline]
1315 #[inline]
1316 pub fn resize(&mut self, size: usize) {
1316 pub fn resize(&mut self, size: usize) {
1317 self.buf.reserve_exact(size - self.buf.capacity());
1317 self.buf.reserve_exact(size - self.buf.capacity());
1318 }
1318 }
1319 }
1319 }
1320
1320
1321 impl RevisionBuffer for CoreRevisionBuffer {
1321 impl RevisionBuffer for CoreRevisionBuffer {
1322 type Target = Vec<u8>;
1322 type Target = Vec<u8>;
1323
1323
1324 #[inline]
1324 #[inline]
1325 fn extend_from_slice(&mut self, slice: &[u8]) {
1325 fn extend_from_slice(&mut self, slice: &[u8]) {
1326 self.buf.extend_from_slice(slice);
1326 self.buf.extend_from_slice(slice);
1327 }
1327 }
1328
1328
1329 #[inline]
1329 #[inline]
1330 fn finish(self) -> Self::Target {
1330 fn finish(self) -> Self::Target {
1331 self.buf
1331 self.buf
1332 }
1332 }
1333 }
1333 }
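// [Editor's example] Any `RevisionBuffer` can be filled piece by piece and
// then finished; with `CoreRevisionBuffer` the finished target is a plain
// `Vec<u8>`. A small usage sketch (assuming module-local access, since the
// type is `pub(super)`):
//
//     fn assemble(pieces: &[&[u8]]) -> Vec<u8> {
//         let mut buf = CoreRevisionBuffer::new();
//         buf.resize(pieces.iter().map(|p| p.len()).sum());
//         for piece in pieces {
//             buf.extend_from_slice(piece);
//         }
//         buf.finish()
//     }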
1334
1334
1335 /// Calculate the hash of a revision given its data and its parents.
1335 /// Calculate the hash of a revision given its data and its parents.
1336 pub fn hash(
1336 pub fn hash(
1337 data: &[u8],
1337 data: &[u8],
1338 p1_hash: &[u8],
1338 p1_hash: &[u8],
1339 p2_hash: &[u8],
1339 p2_hash: &[u8],
1340 ) -> [u8; NODE_BYTES_LENGTH] {
1340 ) -> [u8; NODE_BYTES_LENGTH] {
1341 let mut hasher = Sha1::new();
1341 let mut hasher = Sha1::new();
1342 let (a, b) = (p1_hash, p2_hash);
1342 let (a, b) = (p1_hash, p2_hash);
1343 if a > b {
1343 if a > b {
1344 hasher.update(b);
1344 hasher.update(b);
1345 hasher.update(a);
1345 hasher.update(a);
1346 } else {
1346 } else {
1347 hasher.update(a);
1347 hasher.update(a);
1348 hasher.update(b);
1348 hasher.update(b);
1349 }
1349 }
1350 hasher.update(data);
1350 hasher.update(data);
1351 *hasher.finalize().as_ref()
1351 *hasher.finalize().as_ref()
1352 }
1352 }
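// [Editor's note] Because the two parent hashes are fed to SHA-1 in sorted
// order, the resulting node id does not depend on which parent is p1 and
// which is p2. A hypothetical check expressing that property:
//
//     fn node_is_parent_order_independent(
//         data: &[u8],
//         a: &[u8; 20],
//         b: &[u8; 20],
//     ) -> bool {
//         hash(data, a, b) == hash(data, b, a)
//     }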