rust-compression: move the `Send` bound to the `Compressor` trait...
Raphaël Gomès
r53202:f69a3f55 default
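The change itself is small: `Send` becomes a supertrait of `Compressor`, so every `Box<dyn Compressor>` is automatically `Send` and the explicit `+ Send` bound on the `InnerRevlog::compressor` field (second hunk below) can be dropped. A minimal sketch with toy types, not the actual Mercurial code, showing why the supertrait is enough:

// Sketch only: hypothetical toy types illustrating the supertrait bound.
trait Compressor: Send {
    fn compress(&mut self, data: &[u8]) -> Vec<u8>;
}

struct NoneCompressor;

impl Compressor for NoneCompressor {
    fn compress(&mut self, data: &[u8]) -> Vec<u8> {
        data.to_vec()
    }
}

fn assert_send<T: Send>(_: &T) {}

fn main() {
    // Before the change this would have needed `Box<dyn Compressor + Send>`;
    // with `Send` as a supertrait the plain trait object already satisfies it.
    let compressor: Box<dyn Compressor> = Box::new(NoneCompressor);
    assert_send(&compressor);
}

The trade-off is that every implementation of `Compressor` must now be `Send`, which the three implementors in this file already satisfy, since the old field type required it of them anyway.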
@@ -1,383 +1,383 @@
1 1 //! Helpers around revlog compression
2 2
3 3 use std::cell::RefCell;
4 4 use std::collections::HashSet;
5 5 use std::io::Read;
6 6
7 7 use flate2::bufread::ZlibEncoder;
8 8 use flate2::read::ZlibDecoder;
9 9
10 10 use crate::config::Config;
11 11 use crate::errors::HgError;
12 12 use crate::exit_codes;
13 13
14 14 use super::corrupted;
15 15 use super::RevlogError;
16 16
17 17 /// Header byte used to identify ZSTD-compressed data
18 18 pub const ZSTD_BYTE: u8 = b'\x28';
19 19 /// Header byte used to identify Zlib-compressed data
20 20 pub const ZLIB_BYTE: u8 = b'x';
21 21
22 22 const ZSTD_DEFAULT_LEVEL: u8 = 3;
23 23 const ZLIB_DEFAULT_LEVEL: u8 = 6;
24 24 /// The length of data below which we don't even try to compress it when using
25 25 /// Zstandard.
26 26 const MINIMUM_LENGTH_ZSTD: usize = 50;
27 27 /// The length of data below which we don't even try to compress it when using
28 28 /// Zlib.
29 29 const MINIMUM_LENGTH_ZLIB: usize = 44;
30 30
31 31 /// Defines the available compression engines and their options.
32 32 #[derive(Debug, Copy, Clone, PartialEq, Eq)]
33 33 pub enum CompressionConfig {
34 34 Zlib {
35 35 /// Between 0 and 9 included
36 36 level: u8,
37 37 },
38 38 Zstd {
39 39 /// Between 0 and 22 included
40 40 level: u8,
41 41 /// Never used in practice for now
42 42 threads: u8,
43 43 },
44 44 /// No compression is performed
45 45 None,
46 46 }
47 47
48 48 impl CompressionConfig {
49 49 pub fn new(
50 50 config: &Config,
51 51 requirements: &HashSet<String>,
52 52 ) -> Result<Self, HgError> {
53 53 let mut new = Self::default();
54 54
55 55 let zlib_level = config.get_u32(b"storage", b"revlog.zlib.level")?;
56 56 let zstd_level = config.get_u32(b"storage", b"revlog.zstd.level")?;
57 57
58 58 for requirement in requirements {
59 59 if requirement.starts_with("revlog-compression-")
60 60 || requirement.starts_with("exp-compression-")
61 61 {
62 62 let split = &mut requirement.splitn(3, '-');
63 63 split.next();
64 64 split.next();
65 65 new = match split.next().unwrap() {
66 66 "zstd" => CompressionConfig::zstd(zstd_level)?,
67 67 e => {
68 68 return Err(HgError::UnsupportedFeature(format!(
69 69 "Unsupported compression engine '{e}'"
70 70 )))
71 71 }
72 72 };
73 73 }
74 74 }
75 75 if let Some(level) = zlib_level {
76 76 if matches!(new, CompressionConfig::Zlib { .. }) {
77 77 new.set_level(level as usize)?;
78 78 }
79 79 }
80 80 Ok(new)
81 81 }
82 82
83 83 /// Sets the level of the current compression engine
84 84 pub fn set_level(&mut self, new_level: usize) -> Result<(), HgError> {
85 85 match self {
86 86 CompressionConfig::Zlib { level } => {
87 87 if new_level > 9 {
88 88 return Err(HgError::abort(
89 89 format!(
90 90                         "invalid zlib compression level {}, \
91 91 expected between 0 and 9 included",
92 92 new_level
93 93 ),
94 94 exit_codes::ABORT,
95 95 None,
96 96 ));
97 97 }
98 98 *level = new_level as u8;
99 99 }
100 100 CompressionConfig::Zstd { level, .. } => {
101 101 if new_level > 22 {
102 102 return Err(HgError::abort(
103 103 format!(
104 104                         "invalid zstd compression level {}, \
105 105 expected between 0 and 22 included",
106 106 new_level
107 107 ),
108 108 exit_codes::ABORT,
109 109 None,
110 110 ));
111 111 }
112 112 *level = new_level as u8;
113 113 }
114 114 CompressionConfig::None => {}
115 115 }
116 116 Ok(())
117 117 }
118 118
119 119 /// Return a ZSTD compression config
120 120 pub fn zstd(
121 121 zstd_level: Option<u32>,
122 122 ) -> Result<CompressionConfig, HgError> {
123 123 let mut engine = CompressionConfig::Zstd {
124 124 level: ZSTD_DEFAULT_LEVEL,
125 125 threads: 0,
126 126 };
127 127 if let Some(level) = zstd_level {
128 128 engine.set_level(level as usize)?;
129 129 }
130 130 Ok(engine)
131 131 }
132 132 }
133 133
134 134 impl Default for CompressionConfig {
135 135 fn default() -> Self {
136 136 Self::Zlib {
137 137 level: ZLIB_DEFAULT_LEVEL,
138 138 }
139 139 }
140 140 }
141 141
142 142 /// A high-level trait to define compressors that should be able to compress
143 143 /// and decompress arbitrary bytes.
144 pub trait Compressor {
144 pub trait Compressor: Send {
145 145 /// Returns a new [`Vec`] with the compressed data.
146 146 /// Should return `Ok(None)` if compression does not apply (e.g. too small)
147 147 fn compress(
148 148 &mut self,
149 149 data: &[u8],
150 150 ) -> Result<Option<Vec<u8>>, RevlogError>;
151 151 /// Returns a new [`Vec`] with the decompressed data.
152 152 fn decompress(&self, data: &[u8]) -> Result<Vec<u8>, RevlogError>;
153 153 }
154 154
155 155 /// A compressor that does nothing (useful in tests)
156 156 pub struct NoneCompressor;
157 157
158 158 impl Compressor for NoneCompressor {
159 159 fn compress(
160 160 &mut self,
161 161 _data: &[u8],
162 162 ) -> Result<Option<Vec<u8>>, RevlogError> {
163 163 Ok(None)
164 164 }
165 165
166 166 fn decompress(&self, data: &[u8]) -> Result<Vec<u8>, RevlogError> {
167 167 Ok(data.to_owned())
168 168 }
169 169 }
170 170
171 171 /// A compressor for Zstandard
172 172 pub struct ZstdCompressor {
173 173 /// Level of compression to use
174 174 level: u8,
175 175 /// How many threads are used (not implemented yet)
176 176 threads: u8,
177 177 /// The underlying zstd compressor
178 178 compressor: zstd::bulk::Compressor<'static>,
179 179 }
180 180
181 181 impl ZstdCompressor {
182 182 pub fn new(level: u8, threads: u8) -> Self {
183 183 Self {
184 184 level,
185 185 threads,
186 186 compressor: zstd::bulk::Compressor::new(level.into())
187 187 .expect("invalid zstd arguments"),
188 188 }
189 189 }
190 190 }
191 191
192 192 impl Compressor for ZstdCompressor {
193 193 fn compress(
194 194 &mut self,
195 195 data: &[u8],
196 196 ) -> Result<Option<Vec<u8>>, RevlogError> {
197 197 if self.threads != 0 {
198 198 // TODO use a zstd builder + zstd cargo feature to support this
199 199 unimplemented!("zstd parallel compression is not implemented");
200 200 }
201 201 if data.len() < MINIMUM_LENGTH_ZSTD {
202 202 return Ok(None);
203 203 }
204 204 let level = self.level as i32;
205 205 if data.len() <= 1000000 {
206 206 let compressed = self.compressor.compress(data).map_err(|e| {
207 207 corrupted(format!("revlog compress error: {}", e))
208 208 })?;
209 209 Ok(if compressed.len() < data.len() {
210 210 Some(compressed)
211 211 } else {
212 212 None
213 213 })
214 214 } else {
215 215 Ok(Some(zstd::stream::encode_all(data, level).map_err(
216 216 |e| corrupted(format!("revlog compress error: {}", e)),
217 217 )?))
218 218 }
219 219 }
220 220
221 221 fn decompress(&self, data: &[u8]) -> Result<Vec<u8>, RevlogError> {
222 222 zstd::stream::decode_all(data).map_err(|e| {
223 223 corrupted(format!("revlog decompress error: {}", e)).into()
224 224 })
225 225 }
226 226 }
227 227
228 228 /// A compressor for Zlib
229 229 pub struct ZlibCompressor {
230 230 /// Level of compression to use
231 231 level: flate2::Compression,
232 232 }
233 233
234 234 impl ZlibCompressor {
235 235 pub fn new(level: u8) -> Self {
236 236 Self {
237 237 level: flate2::Compression::new(level.into()),
238 238 }
239 239 }
240 240 }
241 241
242 242 impl Compressor for ZlibCompressor {
243 243 fn compress(
244 244 &mut self,
245 245 data: &[u8],
246 246 ) -> Result<Option<Vec<u8>>, RevlogError> {
247 247 assert!(!data.is_empty());
248 248 if data.len() < MINIMUM_LENGTH_ZLIB {
249 249 return Ok(None);
250 250 }
251 251 let mut buf = Vec::with_capacity(data.len());
252 252 ZlibEncoder::new(data, self.level)
253 253 .read_to_end(&mut buf)
254 254 .map_err(|e| corrupted(format!("revlog compress error: {}", e)))?;
255 255
256 256 Ok(if buf.len() < data.len() {
257 257 buf.shrink_to_fit();
258 258 Some(buf)
259 259 } else {
260 260 None
261 261 })
262 262 }
263 263
264 264 fn decompress(&self, data: &[u8]) -> Result<Vec<u8>, RevlogError> {
265 265 let mut decoder = ZlibDecoder::new(data);
266 266 // TODO reuse the allocation somehow?
267 267 let mut buf = vec![];
268 268 decoder.read_to_end(&mut buf).map_err(|e| {
269 269 corrupted(format!("revlog decompress error: {}", e))
270 270 })?;
271 271 Ok(buf)
272 272 }
273 273 }
274 274
275 275 thread_local! {
276 276 // seems fine to [unwrap] here: this can only fail due to memory allocation
277 277     // failing, and it's normal for that to cause a panic.
278 278 static ZSTD_DECODER : RefCell<zstd::bulk::Decompressor<'static>> =
279 279 RefCell::new(zstd::bulk::Decompressor::new().ok().unwrap());
280 280 }
281 281
282 282 /// Util to wrap the reuse of a zstd decoder while controlling its buffer size.
283 283 fn zstd_decompress_to_buffer(
284 284 bytes: &[u8],
285 285 buf: &mut Vec<u8>,
286 286 ) -> Result<usize, std::io::Error> {
287 287 ZSTD_DECODER
288 288 .with(|decoder| decoder.borrow_mut().decompress_to_buffer(bytes, buf))
289 289 }
290 290
291 291 /// Specialized revlog decompression to use less memory for deltas while
292 292 /// keeping performance acceptable.
293 293 pub(super) fn uncompressed_zstd_data(
294 294 bytes: &[u8],
295 295 is_delta: bool,
296 296 uncompressed_len: i32,
297 297 ) -> Result<Vec<u8>, HgError> {
298 298 let cap = uncompressed_len.max(0) as usize;
299 299 if is_delta {
300 300 // [cap] is usually an over-estimate of the space needed because
301 301 // it's the length of delta-decoded data, but we're interested
302 302 // in the size of the delta.
303 303 // This means we have to [shrink_to_fit] to avoid holding on
304 304 // to a large chunk of memory, but it also means we must have a
305 305 // fallback branch, for the case when the delta is longer than
306 306 // the original data (surprisingly, this does happen in practice)
307 307 let mut buf = Vec::with_capacity(cap);
308 308 match zstd_decompress_to_buffer(bytes, &mut buf) {
309 309 Ok(_) => buf.shrink_to_fit(),
310 310 Err(_) => {
311 311 buf.clear();
312 312 zstd::stream::copy_decode(bytes, &mut buf)
313 313 .map_err(|e| corrupted(e.to_string()))?;
314 314 }
315 315 };
316 316 Ok(buf)
317 317 } else {
318 318 let mut buf = Vec::with_capacity(cap);
319 319 let len = zstd_decompress_to_buffer(bytes, &mut buf)
320 320 .map_err(|e| corrupted(e.to_string()))?;
321 321 if len != uncompressed_len as usize {
322 322 Err(corrupted("uncompressed length does not match"))
323 323 } else {
324 324 Ok(buf)
325 325 }
326 326 }
327 327 }
328 328
329 329 #[cfg(test)]
330 330 mod tests {
331 331 use super::*;
332 332
333 333 const LARGE_TEXT: &[u8] = b"
334 334 Patali Dirapata, Cromda Cromda Ripalo, Pata Pata, Ko Ko Ko
335 335 Bokoro Dipoulito, Rondi Rondi Pepino, Pata Pata, Ko Ko Ko
336 336 Emana Karassoli, Loucra Loucra Nonponto, Pata Pata, Ko Ko Ko.";
337 337
338 338 #[test]
339 339 fn test_zlib_compressor() {
340 340 // Can return `Ok(None)`
341 341 let mut compressor = ZlibCompressor::new(1);
342 342 assert_eq!(compressor.compress(b"too small").unwrap(), None);
343 343
344 344 // Compression returns bytes
345 345 let compressed_with_1 =
346 346 compressor.compress(LARGE_TEXT).unwrap().unwrap();
347 347 assert!(compressed_with_1.len() < LARGE_TEXT.len());
348 348 // Round trip works
349 349 assert_eq!(
350 350 compressor.decompress(&compressed_with_1).unwrap(),
351 351 LARGE_TEXT
352 352 );
353 353
354 354 // Compression levels mean something
355 355 let mut compressor = ZlibCompressor::new(9);
356 356 // Compression returns bytes
357 357 let compressed = compressor.compress(LARGE_TEXT).unwrap().unwrap();
358 358 assert!(compressed.len() < compressed_with_1.len());
359 359 }
360 360
361 361 #[test]
362 362 fn test_zstd_compressor() {
363 363 // Can return `Ok(None)`
364 364 let mut compressor = ZstdCompressor::new(1, 0);
365 365 assert_eq!(compressor.compress(b"too small").unwrap(), None);
366 366
367 367 // Compression returns bytes
368 368 let compressed_with_1 =
369 369 compressor.compress(LARGE_TEXT).unwrap().unwrap();
370 370 assert!(compressed_with_1.len() < LARGE_TEXT.len());
371 371 // Round trip works
372 372 assert_eq!(
373 373 compressor.decompress(&compressed_with_1).unwrap(),
374 374 LARGE_TEXT
375 375 );
376 376
377 377 // Compression levels mean something
378 378 let mut compressor = ZstdCompressor::new(22, 0);
379 379 // Compression returns bytes
380 380 let compressed = compressor.compress(LARGE_TEXT).unwrap().unwrap();
381 381 assert!(compressed.len() < compressed_with_1.len());
382 382 }
383 383 }
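For orientation when reading the second file: the `ZLIB_BYTE` and `ZSTD_BYTE` constants above are what `InnerRevlog::decompress` (further down) matches on to route a stored chunk to the right decoder, with `b'\0'` meaning stored as-is and `b'u'` meaning stored as-is behind a one-byte marker. Below is a hedged, self-contained sketch of that dispatch; `ChunkKind` and `classify` are illustrative names only, not part of the diff:

// Sketch only: simplified routing on the revlog chunk header byte.
const ZSTD_BYTE: u8 = b'\x28';
const ZLIB_BYTE: u8 = b'x';

#[derive(Debug, PartialEq)]
enum ChunkKind<'a> {
    Zstd(&'a [u8]),
    Zlib(&'a [u8]),
    Uncompressed(&'a [u8]),
    Unknown(u8),
}

fn classify(data: &[u8]) -> ChunkKind<'_> {
    match data.first() {
        // Empty chunks and a leading NUL byte are returned unchanged.
        None | Some(&b'\0') => ChunkKind::Uncompressed(data),
        // 'u' marks literal data; the marker byte itself is stripped.
        Some(&b'u') => ChunkKind::Uncompressed(&data[1..]),
        Some(&ZLIB_BYTE) => ChunkKind::Zlib(data),
        Some(&ZSTD_BYTE) => ChunkKind::Zstd(data),
        Some(&other) => ChunkKind::Unknown(other),
    }
}

fn main() {
    assert_eq!(classify(b"uplain text"), ChunkKind::Uncompressed(b"plain text"));
    assert!(matches!(classify(b"x\x9c..."), ChunkKind::Zlib(_)));
    assert!(matches!(classify(b"\x28\xb5/\xfd..."), ChunkKind::Zstd(_)));
}

In the real method the zlib and zstd arms call `ZlibCompressor::new(0).decompress(...)` and `ZstdCompressor::new(0, 0).decompress(...)`; the level passed there does not matter because it only affects compression.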
@@ -1,1352 +1,1352 @@
1 1 //! A layer of lower-level revlog functionality to encapsulate most of the
2 2 //! IO work and expensive operations.
3 3 use std::{
4 4 borrow::Cow,
5 5 cell::RefCell,
6 6 io::{ErrorKind, Seek, SeekFrom, Write},
7 7 ops::Deref,
8 8 path::PathBuf,
9 9 sync::{Arc, Mutex},
10 10 };
11 11
12 12 use schnellru::{ByMemoryUsage, LruMap};
13 13 use sha1::{Digest, Sha1};
14 14
15 15 use crate::{
16 16 errors::{HgError, IoResultExt},
17 17 exit_codes,
18 18 transaction::Transaction,
19 19 vfs::Vfs,
20 20 };
21 21
22 22 use super::{
23 23 compression::{
24 24 uncompressed_zstd_data, CompressionConfig, Compressor, NoneCompressor,
25 25 ZlibCompressor, ZstdCompressor, ZLIB_BYTE, ZSTD_BYTE,
26 26 },
27 27 file_io::{DelayedBuffer, FileHandle, RandomAccessFile, WriteHandles},
28 28 index::{Index, IndexHeader, INDEX_ENTRY_SIZE},
29 29 node::{NODE_BYTES_LENGTH, NULL_NODE},
30 30 options::{RevlogDataConfig, RevlogDeltaConfig, RevlogFeatureConfig},
31 31 BaseRevision, Node, Revision, RevlogEntry, RevlogError, RevlogIndex,
32 32 UncheckedRevision, NULL_REVISION, NULL_REVLOG_ENTRY_FLAGS,
33 33 };
34 34
35 35 /// Matches the `_InnerRevlog` class in the Python code, as an arbitrary
36 36 /// boundary to incrementally rewrite higher-level revlog functionality in
37 37 /// Rust.
38 38 pub struct InnerRevlog {
39 39 /// When index and data are not interleaved: bytes of the revlog index.
40 40 /// When index and data are interleaved (inline revlog): bytes of the
41 41 /// revlog index and data.
42 42 pub index: Index,
43 43 /// The store vfs that is used to interact with the filesystem
44 44 vfs: Box<dyn Vfs>,
45 45 /// The index file path, relative to the vfs root
46 46 pub index_file: PathBuf,
47 47 /// The data file path, relative to the vfs root (same as `index_file`
48 48 /// if inline)
49 49 data_file: PathBuf,
50 50 /// Data config that applies to this revlog
51 51 data_config: RevlogDataConfig,
52 52 /// Delta config that applies to this revlog
53 53 delta_config: RevlogDeltaConfig,
54 54 /// Feature config that applies to this revlog
55 55 #[allow(unused)]
56 56 feature_config: RevlogFeatureConfig,
57 57 /// A view into this revlog's data file
58 58 segment_file: RandomAccessFile,
59 59 /// A cache of uncompressed chunks that have previously been restored.
60 60 /// Its eviction policy is defined in [`Self::new`].
61 61 uncompressed_chunk_cache: Option<UncompressedChunkCache>,
62 62 /// Used to keep track of the actual target during diverted writes
63 63 /// for the changelog
64 64 original_index_file: Option<PathBuf>,
65 65 /// Write handles to the index and data files
66 66 /// XXX why duplicate from `index` and `segment_file`?
67 67 writing_handles: Option<WriteHandles>,
68 68 /// See [`DelayedBuffer`].
69 69 delayed_buffer: Option<Arc<Mutex<DelayedBuffer>>>,
70 70 /// Whether this revlog is inline. XXX why duplicate from `index`?
71 71 pub inline: bool,
72 72 /// A cache of the last revision, which is usually accessed multiple
73 73 /// times.
74 74 pub last_revision_cache: Mutex<Option<SingleRevisionCache>>,
75 75 /// The [`Compressor`] that this revlog uses by default to compress data.
76 76 /// This does not mean that this revlog uses this compressor for reading
77 77 /// data, as different revisions may have different compression modes.
78 compressor: Mutex<Box<dyn Compressor + Send>>,
78 compressor: Mutex<Box<dyn Compressor>>,
79 79 }
80 80
81 81 impl InnerRevlog {
82 82 pub fn new(
83 83 vfs: Box<dyn Vfs>,
84 84 index: Index,
85 85 index_file: PathBuf,
86 86 data_file: PathBuf,
87 87 data_config: RevlogDataConfig,
88 88 delta_config: RevlogDeltaConfig,
89 89 feature_config: RevlogFeatureConfig,
90 90 ) -> Self {
91 91 assert!(index_file.is_relative());
92 92 assert!(data_file.is_relative());
93 93 let segment_file = RandomAccessFile::new(
94 94 dyn_clone::clone_box(&*vfs),
95 95 if index.is_inline() {
96 96 index_file.to_owned()
97 97 } else {
98 98 data_file.to_owned()
99 99 },
100 100 );
101 101
102 102 let uncompressed_chunk_cache =
103 103 data_config.uncompressed_cache_factor.map(
104 104 // Arbitrary initial value
105 105 // TODO check if using a hasher specific to integers is useful
106 106 |_factor| RefCell::new(LruMap::with_memory_budget(65536)),
107 107 );
108 108
109 109 let inline = index.is_inline();
110 110 Self {
111 111 index,
112 112 vfs,
113 113 index_file,
114 114 data_file,
115 115 data_config,
116 116 delta_config,
117 117 feature_config,
118 118 segment_file,
119 119 uncompressed_chunk_cache,
120 120 original_index_file: None,
121 121 writing_handles: None,
122 122 delayed_buffer: None,
123 123 inline,
124 124 last_revision_cache: Mutex::new(None),
125 125 compressor: Mutex::new(match feature_config.compression_engine {
126 126 CompressionConfig::Zlib { level } => {
127 127 Box::new(ZlibCompressor::new(level))
128 128 }
129 129 CompressionConfig::Zstd { level, threads } => {
130 130 Box::new(ZstdCompressor::new(level, threads))
131 131 }
132 132 CompressionConfig::None => Box::new(NoneCompressor),
133 133 }),
134 134 }
135 135 }
136 136
137 137 /// Return number of entries of the revlog index
138 138 pub fn len(&self) -> usize {
139 139 self.index.len()
140 140 }
141 141
142 142 /// Return `true` if this revlog has no entries
143 143 pub fn is_empty(&self) -> bool {
144 144 self.len() == 0
145 145 }
146 146
147 147 /// Return whether this revlog is inline (mixed index and data)
148 148 pub fn is_inline(&self) -> bool {
149 149 self.inline
150 150 }
151 151
152 152 /// Clear all caches from this revlog
153 153 pub fn clear_cache(&mut self) {
154 154 assert!(!self.is_delaying());
155 155 if let Some(cache) = self.uncompressed_chunk_cache.as_ref() {
156 156 // We don't clear the allocation here because it's probably faster.
157 157 // We could change our minds later if this ends up being a problem
158 158 // with regards to memory consumption.
159 159 cache.borrow_mut().clear();
160 160 }
161 161 }
162 162
163 163 /// Return an entry for the null revision
164 164 pub fn make_null_entry(&self) -> RevlogEntry {
165 165 RevlogEntry {
166 166 revlog: self,
167 167 rev: NULL_REVISION,
168 168 uncompressed_len: 0,
169 169 p1: NULL_REVISION,
170 170 p2: NULL_REVISION,
171 171 flags: NULL_REVLOG_ENTRY_FLAGS,
172 172 hash: NULL_NODE,
173 173 }
174 174 }
175 175
176 176 /// Return the [`RevlogEntry`] for a [`Revision`] that is known to exist
177 177 pub fn get_entry(
178 178 &self,
179 179 rev: Revision,
180 180 ) -> Result<RevlogEntry, RevlogError> {
181 181 if rev == NULL_REVISION {
182 182 return Ok(self.make_null_entry());
183 183 }
184 184 let index_entry = self
185 185 .index
186 186 .get_entry(rev)
187 187 .ok_or_else(|| RevlogError::InvalidRevision(rev.to_string()))?;
188 188 let p1 =
189 189 self.index.check_revision(index_entry.p1()).ok_or_else(|| {
190 190 RevlogError::corrupted(format!(
191 191 "p1 for rev {} is invalid",
192 192 rev
193 193 ))
194 194 })?;
195 195 let p2 =
196 196 self.index.check_revision(index_entry.p2()).ok_or_else(|| {
197 197 RevlogError::corrupted(format!(
198 198 "p2 for rev {} is invalid",
199 199 rev
200 200 ))
201 201 })?;
202 202 let entry = RevlogEntry {
203 203 revlog: self,
204 204 rev,
205 205 uncompressed_len: index_entry.uncompressed_len(),
206 206 p1,
207 207 p2,
208 208 flags: index_entry.flags(),
209 209 hash: *index_entry.hash(),
210 210 };
211 211 Ok(entry)
212 212 }
213 213
214 214 /// Return the [`RevlogEntry`] for `rev`. If `rev` fails to check, this
215 215 /// returns a [`RevlogError`].
216 216 pub fn get_entry_for_unchecked_rev(
217 217 &self,
218 218 rev: UncheckedRevision,
219 219 ) -> Result<RevlogEntry, RevlogError> {
220 220 if rev == NULL_REVISION.into() {
221 221 return Ok(self.make_null_entry());
222 222 }
223 223 let rev = self.index.check_revision(rev).ok_or_else(|| {
224 224 RevlogError::corrupted(format!("rev {} is invalid", rev))
225 225 })?;
226 226 self.get_entry(rev)
227 227 }
228 228
229 229 /// Is the revlog currently delaying the visibility of written data?
230 230 ///
231 231 /// The delaying mechanism can be either in-memory or written on disk in a
232 232 /// side-file.
233 233 pub fn is_delaying(&self) -> bool {
234 234 self.delayed_buffer.is_some() || self.original_index_file.is_some()
235 235 }
236 236
237 237 /// The offset of the data chunk for this revision
238 238 #[inline(always)]
239 239 pub fn data_start(&self, rev: Revision) -> usize {
240 240 self.index.start(
241 241 rev,
242 242 &self
243 243 .index
244 244 .get_entry(rev)
245 245 .unwrap_or_else(|| self.index.make_null_entry()),
246 246 )
247 247 }
248 248
249 249 /// The length of the data chunk for this revision
250 250 #[inline(always)]
251 251 pub fn data_compressed_length(&self, rev: Revision) -> usize {
252 252 self.index
253 253 .get_entry(rev)
254 254 .unwrap_or_else(|| self.index.make_null_entry())
255 255 .compressed_len() as usize
256 256 }
257 257
258 258 /// The end of the data chunk for this revision
259 259 #[inline(always)]
260 260 pub fn data_end(&self, rev: Revision) -> usize {
261 261 self.data_start(rev) + self.data_compressed_length(rev)
262 262 }
263 263
264 264 /// Return the delta parent of the given revision
265 265 pub fn delta_parent(&self, rev: Revision) -> Revision {
266 266 let base = self
267 267 .index
268 268 .get_entry(rev)
269 269 .unwrap()
270 270 .base_revision_or_base_of_delta_chain();
271 271 if base.0 == rev.0 {
272 272 NULL_REVISION
273 273 } else if self.delta_config.general_delta {
274 274 Revision(base.0)
275 275 } else {
276 276 Revision(rev.0 - 1)
277 277 }
278 278 }
279 279
280 280 /// Return whether `rev` points to a snapshot revision (i.e. does not have
281 281 /// a delta base).
282 282 pub fn is_snapshot(&self, rev: Revision) -> Result<bool, RevlogError> {
283 283 if !self.delta_config.sparse_revlog {
284 284 return Ok(self.delta_parent(rev) == NULL_REVISION);
285 285 }
286 286 self.index.is_snapshot_unchecked(rev)
287 287 }
288 288
289 289 /// Return the delta chain for `rev` according to this revlog's config.
290 290 /// See [`Index::delta_chain`] for more information.
291 291 pub fn delta_chain(
292 292 &self,
293 293 rev: Revision,
294 294 stop_rev: Option<Revision>,
295 295 ) -> Result<(Vec<Revision>, bool), HgError> {
296 296 self.index.delta_chain(
297 297 rev,
298 298 stop_rev,
299 299 self.delta_config.general_delta.into(),
300 300 )
301 301 }
302 302
303 303 /// Generate a possibly-compressed representation of data.
304 304 /// Returns `None` if the data was not compressed.
305 305 pub fn compress<'data>(
306 306 &self,
307 307 data: &'data [u8],
308 308 ) -> Result<Option<Cow<'data, [u8]>>, RevlogError> {
309 309 if data.is_empty() {
310 310 return Ok(Some(data.into()));
311 311 }
312 312 let res = self.compressor.lock().unwrap().compress(data)?;
313 313 if let Some(compressed) = res {
314 314 // The revlog compressor added the header in the returned data.
315 315 return Ok(Some(compressed.into()));
316 316 }
317 317
318 318 if data[0] == b'\0' {
319 319 return Ok(Some(data.into()));
320 320 }
321 321 Ok(None)
322 322 }
323 323
324 324 /// Decompress a revlog chunk.
325 325 ///
326 326 /// The chunk is expected to begin with a header identifying the
327 327 /// format type so it can be routed to an appropriate decompressor.
328 328 pub fn decompress<'a>(
329 329 &'a self,
330 330 data: &'a [u8],
331 331 ) -> Result<Cow<[u8]>, RevlogError> {
332 332 if data.is_empty() {
333 333 return Ok(data.into());
334 334 }
335 335
336 336 // Revlogs are read much more frequently than they are written and many
337 337 // chunks only take microseconds to decompress, so performance is
338 338 // important here.
339 339
340 340 let header = data[0];
341 341 match header {
342 342 // Settings don't matter as they only affect compression
343 343 ZLIB_BYTE => Ok(ZlibCompressor::new(0).decompress(data)?.into()),
344 344 // Settings don't matter as they only affect compression
345 345 ZSTD_BYTE => {
346 346 Ok(ZstdCompressor::new(0, 0).decompress(data)?.into())
347 347 }
348 348 b'\0' => Ok(data.into()),
349 349 b'u' => Ok((&data[1..]).into()),
350 350 other => Err(HgError::UnsupportedFeature(format!(
351 351 "unknown compression header '{}'",
352 352 other
353 353 ))
354 354 .into()),
355 355 }
356 356 }
357 357
358 358 /// Obtain a segment of raw data corresponding to a range of revisions.
359 359 ///
360 360 /// Requests for data may be satisfied by a cache.
361 361 ///
362 362 /// Returns a 2-tuple of (offset, data) for the requested range of
363 363 /// revisions. Offset is the integer offset from the beginning of the
364 364 /// revlog and data is a slice of the raw byte data.
365 365 ///
366 366 /// Callers will need to call `self.start(rev)` and `self.length(rev)`
367 367 /// to determine where each revision's data begins and ends.
368 368 pub fn get_segment_for_revs(
369 369 &self,
370 370 start_rev: Revision,
371 371 end_rev: Revision,
372 372 ) -> Result<(usize, Vec<u8>), HgError> {
373 373 let start = if start_rev == NULL_REVISION {
374 374 0
375 375 } else {
376 376 let start_entry = self
377 377 .index
378 378 .get_entry(start_rev)
379 379 .expect("null revision segment");
380 380 self.index.start(start_rev, &start_entry)
381 381 };
382 382 let end_entry = self
383 383 .index
384 384 .get_entry(end_rev)
385 385 .expect("null revision segment");
386 386 let end = self.index.start(end_rev, &end_entry)
387 387 + self.data_compressed_length(end_rev);
388 388
389 389 let length = end - start;
390 390
391 391 // XXX should we use mmap instead of doing this for platforms that
392 392 // support madvise/populate?
393 393 Ok((start, self.segment_file.read_chunk(start, length)?))
394 394 }
395 395
396 396 /// Return the uncompressed raw data for `rev`
397 397 pub fn chunk_for_rev(&self, rev: Revision) -> Result<Arc<[u8]>, HgError> {
398 398 if let Some(cache) = self.uncompressed_chunk_cache.as_ref() {
399 399 if let Some(chunk) = cache.borrow_mut().get(&rev) {
400 400 return Ok(chunk.clone());
401 401 }
402 402 }
403 403 // TODO revlogv2 should check the compression mode
404 404 let data = self.get_segment_for_revs(rev, rev)?.1;
405 405 let uncompressed = self.decompress(&data).map_err(|e| {
406 406 HgError::abort(
407 407 format!("revlog decompression error: {}", e),
408 408 exit_codes::ABORT,
409 409 None,
410 410 )
411 411 })?;
412 412 let uncompressed: Arc<[u8]> = Arc::from(uncompressed.into_owned());
413 413 if let Some(cache) = self.uncompressed_chunk_cache.as_ref() {
414 414 cache.borrow_mut().insert(rev, uncompressed.clone());
415 415 }
416 416 Ok(uncompressed)
417 417 }
418 418
419 419 /// Execute `func` within a read context for the data file, meaning that
420 420 /// the read handle will be taken and discarded after the operation.
421 421 pub fn with_read<R>(
422 422 &self,
423 423 func: impl FnOnce() -> Result<R, RevlogError>,
424 424 ) -> Result<R, RevlogError> {
425 425 self.enter_reading_context()?;
426 426 let res = func();
427 427 self.exit_reading_context();
428 428 res.map_err(Into::into)
429 429 }
430 430
431 431 /// `pub` only for use in hg-cpython
432 432 #[doc(hidden)]
433 433 pub fn enter_reading_context(&self) -> Result<(), HgError> {
434 434 if self.is_empty() {
435 435 // Nothing to be read
436 436 return Ok(());
437 437 }
438 438 if self.delayed_buffer.is_some() && self.is_inline() {
439 439 return Err(HgError::abort(
440 440 "revlog with delayed write should not be inline",
441 441 exit_codes::ABORT,
442 442 None,
443 443 ));
444 444 }
445 445 self.segment_file.get_read_handle()?;
446 446 Ok(())
447 447 }
448 448
449 449 /// `pub` only for use in hg-cpython
450 450 #[doc(hidden)]
451 451 pub fn exit_reading_context(&self) {
452 452 self.segment_file.exit_reading_context()
453 453 }
454 454
455 455 /// Fill the buffer returned by `get_buffer` with the possibly un-validated
456 456 /// raw text for a revision. It can be already validated if it comes
457 457 /// from the cache.
458 458 pub fn raw_text<G, T>(
459 459 &self,
460 460 rev: Revision,
461 461 get_buffer: G,
462 462 ) -> Result<(), RevlogError>
463 463 where
464 464 G: FnOnce(
465 465 usize,
466 466 &mut dyn FnMut(
467 467 &mut dyn RevisionBuffer<Target = T>,
468 468 ) -> Result<(), RevlogError>,
469 469 ) -> Result<(), RevlogError>,
470 470 {
471 471 let entry = &self.get_entry(rev)?;
472 472 let raw_size = entry.uncompressed_len();
473 473 let mut mutex_guard = self
474 474 .last_revision_cache
475 475 .lock()
476 476 .expect("lock should not be held");
477 477 let cached_rev = if let Some((_node, rev, data)) = &*mutex_guard {
478 478 Some((*rev, data.deref().as_ref()))
479 479 } else {
480 480 None
481 481 };
482 482 if let Some(cache) = &self.uncompressed_chunk_cache {
483 483 let cache = &mut cache.borrow_mut();
484 484 if let Some(size) = raw_size {
485 485 // Dynamically update the uncompressed_chunk_cache size to the
486 486 // largest revision we've seen in this revlog.
487 487 // Do it *before* restoration in case the current revision
488 488 // is the largest.
489 489 let factor = self
490 490 .data_config
491 491 .uncompressed_cache_factor
492 492 .expect("cache should not exist without factor");
493 493 let candidate_size = (size as f64 * factor) as usize;
494 494 let limiter_mut = cache.limiter_mut();
495 495 if candidate_size > limiter_mut.max_memory_usage() {
496 496 std::mem::swap(
497 497 limiter_mut,
498 498 &mut ByMemoryUsage::new(candidate_size),
499 499 );
500 500 }
501 501 }
502 502 }
503 503 entry.rawdata(cached_rev, get_buffer)?;
504 504 // drop cache to save memory, the caller is expected to update
505 505 // the revision cache after validating the text
506 506 mutex_guard.take();
507 507 Ok(())
508 508 }
509 509
510 510 /// Only `pub` for `hg-cpython`.
511 511 /// Obtain decompressed raw data for the specified revisions that are
512 512 /// assumed to be in ascending order.
513 513 ///
514 514 /// Returns a list with decompressed data for each requested revision.
515 515 #[doc(hidden)]
516 516 pub fn chunks(
517 517 &self,
518 518 revs: Vec<Revision>,
519 519 target_size: Option<u64>,
520 520 ) -> Result<Vec<Arc<[u8]>>, RevlogError> {
521 521 if revs.is_empty() {
522 522 return Ok(vec![]);
523 523 }
524 524 let mut fetched_revs = vec![];
525 525 let mut chunks = Vec::with_capacity(revs.len());
526 526
527 527 match self.uncompressed_chunk_cache.as_ref() {
528 528 Some(cache) => {
529 529 let mut cache = cache.borrow_mut();
530 530 for rev in revs.iter() {
531 531 match cache.get(rev) {
532 532 Some(hit) => chunks.push((*rev, hit.to_owned())),
533 533 None => fetched_revs.push(*rev),
534 534 }
535 535 }
536 536 }
537 537 None => fetched_revs = revs,
538 538 }
539 539
540 540 let already_cached = chunks.len();
541 541
542 542 let sliced_chunks = if fetched_revs.is_empty() {
543 543 vec![]
544 544 } else if !self.data_config.with_sparse_read || self.is_inline() {
545 545 vec![fetched_revs]
546 546 } else {
547 547 self.slice_chunk(&fetched_revs, target_size)?
548 548 };
549 549
550 550 self.with_read(|| {
551 551 for revs_chunk in sliced_chunks {
552 552 let first_rev = revs_chunk[0];
553 553 // Skip trailing revisions with empty diff
554 554 let last_rev_idx = revs_chunk
555 555 .iter()
556 556 .rposition(|r| self.data_compressed_length(*r) != 0)
557 557 .unwrap_or(revs_chunk.len() - 1);
558 558
559 559 let last_rev = revs_chunk[last_rev_idx];
560 560
561 561 let (offset, data) =
562 562 self.get_segment_for_revs(first_rev, last_rev)?;
563 563
564 564 let revs_chunk = &revs_chunk[..=last_rev_idx];
565 565
566 566 for rev in revs_chunk {
567 567 let chunk_start = self.data_start(*rev);
568 568 let chunk_length = self.data_compressed_length(*rev);
569 569 // TODO revlogv2 should check the compression mode
570 570 let bytes = &data[chunk_start - offset..][..chunk_length];
571 571 let chunk = if !bytes.is_empty() && bytes[0] == ZSTD_BYTE {
572 572 // If we're using `zstd`, we want to try a more
573 573 // specialized decompression
574 574 let entry = self.index.get_entry(*rev).unwrap();
575 575 let is_delta = entry
576 576 .base_revision_or_base_of_delta_chain()
577 577 != (*rev).into();
578 578 let uncompressed = uncompressed_zstd_data(
579 579 bytes,
580 580 is_delta,
581 581 entry.uncompressed_len(),
582 582 )?;
583 583 Cow::Owned(uncompressed)
584 584 } else {
585 585 // Otherwise just fallback to generic decompression.
586 586 self.decompress(bytes)?
587 587 };
588 588
589 589 chunks.push((*rev, chunk.into()));
590 590 }
591 591 }
592 592 Ok(())
593 593 })?;
594 594
595 595 if let Some(cache) = self.uncompressed_chunk_cache.as_ref() {
596 596 let mut cache = cache.borrow_mut();
597 597 for (rev, chunk) in chunks.iter().skip(already_cached) {
598 598 cache.insert(*rev, chunk.clone());
599 599 }
600 600 }
601 601 // Use stable sort here since it's *mostly* sorted
602 602 chunks.sort_by(|a, b| a.0.cmp(&b.0));
603 603 Ok(chunks.into_iter().map(|(_r, chunk)| chunk).collect())
604 604 }
605 605
606 606 /// Slice revs to reduce the amount of unrelated data to be read from disk.
607 607 ///
608 608 /// ``revs`` is sliced into groups that should be read in one time.
609 609 /// Assume that revs are sorted.
610 610 ///
611 611 /// The initial chunk is sliced until the overall density
612 612 /// (payload/chunks-span ratio) is above
613 613 /// `revlog.data_config.sr_density_threshold`.
614 614 /// No gap smaller than `revlog.data_config.sr_min_gap_size` is skipped.
615 615 ///
616 616 /// If `target_size` is set, no chunk larger than `target_size`
617 617 /// will be returned.
618 618 /// For consistency with other slicing choices, this limit won't go lower
619 619 /// than `revlog.data_config.sr_min_gap_size`.
620 620 ///
621 621 /// If individual revision chunks are larger than this limit, they will
622 622     /// still be returned individually.
623 623 pub fn slice_chunk(
624 624 &self,
625 625 revs: &[Revision],
626 626 target_size: Option<u64>,
627 627 ) -> Result<Vec<Vec<Revision>>, RevlogError> {
628 628 let target_size =
629 629 target_size.map(|size| size.max(self.data_config.sr_min_gap_size));
630 630
631 631 let target_density = self.data_config.sr_density_threshold;
632 632 let min_gap_size = self.data_config.sr_min_gap_size as usize;
633 633 let to_density = self.index.slice_chunk_to_density(
634 634 revs,
635 635 target_density,
636 636 min_gap_size,
637 637 );
638 638
639 639 let mut sliced = vec![];
640 640
641 641 for chunk in to_density {
642 642 sliced.extend(
643 643 self.slice_chunk_to_size(&chunk, target_size)?
644 644 .into_iter()
645 645 .map(ToOwned::to_owned),
646 646 );
647 647 }
648 648
649 649 Ok(sliced)
650 650 }
651 651
652 652 /// Slice revs to match the target size
653 653 ///
654 654 /// This is intended to be used on chunks that density slicing selected,
655 655 /// but that are still too large compared to the read guarantee of revlogs.
656 656 /// This might happen when the "minimal gap size" interrupted the slicing
657 657     /// or when chains are built in a way that creates large blocks next to
658 658 /// each other.
659 659 fn slice_chunk_to_size<'a>(
660 660 &self,
661 661 revs: &'a [Revision],
662 662 target_size: Option<u64>,
663 663 ) -> Result<Vec<&'a [Revision]>, RevlogError> {
664 664 let mut start_data = self.data_start(revs[0]);
665 665 let end_data = self.data_end(revs[revs.len() - 1]);
666 666 let full_span = end_data - start_data;
667 667
668 668 let nothing_to_do = target_size
669 669 .map(|size| full_span <= size as usize)
670 670 .unwrap_or(true);
671 671
672 672 if nothing_to_do {
673 673 return Ok(vec![revs]);
674 674 }
675 675 let target_size = target_size.expect("target_size is set") as usize;
676 676
677 677 let mut start_rev_idx = 0;
678 678 let mut end_rev_idx = 1;
679 679 let mut chunks = vec![];
680 680
681 681 for (idx, rev) in revs.iter().enumerate().skip(1) {
682 682 let span = self.data_end(*rev) - start_data;
683 683 let is_snapshot = self.is_snapshot(*rev)?;
684 684 if span <= target_size && is_snapshot {
685 685 end_rev_idx = idx + 1;
686 686 } else {
687 687 let chunk =
688 688 self.trim_chunk(revs, start_rev_idx, Some(end_rev_idx));
689 689 if !chunk.is_empty() {
690 690 chunks.push(chunk);
691 691 }
692 692 start_rev_idx = idx;
693 693 start_data = self.data_start(*rev);
694 694 end_rev_idx = idx + 1;
695 695 }
696 696 if !is_snapshot {
697 697 break;
698 698 }
699 699 }
700 700
701 701 // For the others, we use binary slicing to quickly converge towards
702 702 // valid chunks (otherwise, we might end up looking for the start/end
703 703 // of many revisions). This logic is not looking for the perfect
704 704         // slicing point; it quickly converges towards valid chunks.
705 705 let number_of_items = revs.len();
706 706
707 707 while (end_data - start_data) > target_size {
708 708 end_rev_idx = number_of_items;
709 709 if number_of_items - start_rev_idx <= 1 {
710 710 // Protect against individual chunks larger than the limit
711 711 break;
712 712 }
713 713 let mut local_end_data = self.data_end(revs[end_rev_idx - 1]);
714 714 let mut span = local_end_data - start_data;
715 715 while span > target_size {
716 716 if end_rev_idx - start_rev_idx <= 1 {
717 717 // Protect against individual chunks larger than the limit
718 718 break;
719 719 }
720 720 end_rev_idx -= (end_rev_idx - start_rev_idx) / 2;
721 721 local_end_data = self.data_end(revs[end_rev_idx - 1]);
722 722 span = local_end_data - start_data;
723 723 }
724 724 let chunk =
725 725 self.trim_chunk(revs, start_rev_idx, Some(end_rev_idx));
726 726 if !chunk.is_empty() {
727 727 chunks.push(chunk);
728 728 }
729 729 start_rev_idx = end_rev_idx;
730 730 start_data = self.data_start(revs[start_rev_idx]);
731 731 }
732 732
733 733 let chunk = self.trim_chunk(revs, start_rev_idx, None);
734 734 if !chunk.is_empty() {
735 735 chunks.push(chunk);
736 736 }
737 737
738 738 Ok(chunks)
739 739 }
740 740
741 741 /// Returns `revs[startidx..endidx]` without empty trailing revs
742 742 fn trim_chunk<'a>(
743 743 &self,
744 744 revs: &'a [Revision],
745 745 start_rev_idx: usize,
746 746 end_rev_idx: Option<usize>,
747 747 ) -> &'a [Revision] {
748 748 let mut end_rev_idx = end_rev_idx.unwrap_or(revs.len());
749 749
750 750 // If we have a non-empty delta candidate, there is nothing to trim
751 751 if revs[end_rev_idx - 1].0 < self.len() as BaseRevision {
752 752 // Trim empty revs at the end, except the very first rev of a chain
753 753 while end_rev_idx > 1
754 754 && end_rev_idx > start_rev_idx
755 755 && self.data_compressed_length(revs[end_rev_idx - 1]) == 0
756 756 {
757 757 end_rev_idx -= 1
758 758 }
759 759 }
760 760
761 761 &revs[start_rev_idx..end_rev_idx]
762 762 }
763 763
764 764 /// Check the hash of some given data against the recorded hash.
765 765 pub fn check_hash(
766 766 &self,
767 767 p1: Revision,
768 768 p2: Revision,
769 769 expected: &[u8],
770 770 data: &[u8],
771 771 ) -> bool {
772 772 let e1 = self.index.get_entry(p1);
773 773 let h1 = match e1 {
774 774 Some(ref entry) => entry.hash(),
775 775 None => &NULL_NODE,
776 776 };
777 777 let e2 = self.index.get_entry(p2);
778 778 let h2 = match e2 {
779 779 Some(ref entry) => entry.hash(),
780 780 None => &NULL_NODE,
781 781 };
782 782
783 783 hash(data, h1.as_bytes(), h2.as_bytes()) == expected
784 784 }
785 785
786 786 /// Returns whether we are currently in a [`Self::with_write`] context
787 787 pub fn is_writing(&self) -> bool {
788 788 self.writing_handles.is_some()
789 789 }
790 790
791 791 /// Open the revlog files for writing
792 792 ///
793 793 /// Adding content to a revlog should be done within this context.
794 794 /// TODO try using `BufRead` and `BufWrite` and see if performance improves
795 795 pub fn with_write<R>(
796 796 &mut self,
797 797 transaction: &mut impl Transaction,
798 798 data_end: Option<usize>,
799 799 func: impl FnOnce() -> R,
800 800 ) -> Result<R, HgError> {
801 801 if self.is_writing() {
802 802 return Ok(func());
803 803 }
804 804 self.enter_writing_context(data_end, transaction)
805 805 .inspect_err(|_| {
806 806 self.exit_writing_context();
807 807 })?;
808 808 let res = func();
809 809 self.exit_writing_context();
810 810 Ok(res)
811 811 }
812 812
813 813 /// `pub` only for use in hg-cpython
814 814 #[doc(hidden)]
815 815 pub fn exit_writing_context(&mut self) {
816 816 self.writing_handles.take();
817 817 self.segment_file.writing_handle.take();
818 818 self.segment_file.reading_handle.take();
819 819 }
820 820
821 821 /// `pub` only for use in hg-cpython
822 822 #[doc(hidden)]
823 823 pub fn python_writing_handles(&self) -> Option<&WriteHandles> {
824 824 self.writing_handles.as_ref()
825 825 }
826 826
827 827 /// `pub` only for use in hg-cpython
828 828 #[doc(hidden)]
829 829 pub fn enter_writing_context(
830 830 &mut self,
831 831 data_end: Option<usize>,
832 832 transaction: &mut impl Transaction,
833 833 ) -> Result<(), HgError> {
834 834 let data_size = if self.is_empty() {
835 835 0
836 836 } else {
837 837 self.data_end(Revision((self.len() - 1) as BaseRevision))
838 838 };
839 839 let data_handle = if !self.is_inline() {
840 840 let data_handle = match self.vfs.open_write(&self.data_file) {
841 841 Ok(mut f) => {
842 842 if let Some(end) = data_end {
843 843 f.seek(SeekFrom::Start(end as u64))
844 844 .when_reading_file(&self.data_file)?;
845 845 } else {
846 846 f.seek(SeekFrom::End(0))
847 847 .when_reading_file(&self.data_file)?;
848 848 }
849 849 f
850 850 }
851 851 Err(e) => match e {
852 852 HgError::IoError { error, context } => {
853 853 if error.kind() != ErrorKind::NotFound {
854 854 return Err(HgError::IoError { error, context });
855 855 }
856 856 self.vfs.create(&self.data_file, true)?
857 857 }
858 858 e => return Err(e),
859 859 },
860 860 };
861 861 transaction.add(&self.data_file, data_size);
862 862 Some(FileHandle::from_file(
863 863 data_handle,
864 864 dyn_clone::clone_box(&*self.vfs),
865 865 &self.data_file,
866 866 ))
867 867 } else {
868 868 None
869 869 };
870 870 let index_size = self.len() * INDEX_ENTRY_SIZE;
871 871 let index_handle = self.index_write_handle()?;
872 872 if self.is_inline() {
873 873 transaction.add(&self.index_file, data_size);
874 874 } else {
875 875 transaction.add(&self.index_file, index_size);
876 876 }
877 877 self.writing_handles = Some(WriteHandles {
878 878 index_handle: index_handle.clone(),
879 879 data_handle: data_handle.clone(),
880 880 });
881 881 *self.segment_file.reading_handle.borrow_mut() = if self.is_inline() {
882 882 Some(index_handle)
883 883 } else {
884 884 data_handle
885 885 };
886 886 Ok(())
887 887 }
888 888
889 889 /// Get a write handle to the index, sought to the end of its data.
890 890 fn index_write_handle(&self) -> Result<FileHandle, HgError> {
891 891 let res = if self.delayed_buffer.is_none() {
892 892 if self.data_config.check_ambig {
893 893 self.vfs.open_check_ambig(&self.index_file)
894 894 } else {
895 895 self.vfs.open_write(&self.index_file)
896 896 }
897 897 } else {
898 898 self.vfs.open_write(&self.index_file)
899 899 };
900 900 match res {
901 901 Ok(mut handle) => {
902 902 handle
903 903 .seek(SeekFrom::End(0))
904 904 .when_reading_file(&self.index_file)?;
905 905 Ok(
906 906 if let Some(delayed_buffer) = self.delayed_buffer.as_ref()
907 907 {
908 908 FileHandle::from_file_delayed(
909 909 handle,
910 910 dyn_clone::clone_box(&*self.vfs),
911 911 &self.index_file,
912 912 delayed_buffer.clone(),
913 913 )?
914 914 } else {
915 915 FileHandle::from_file(
916 916 handle,
917 917 dyn_clone::clone_box(&*self.vfs),
918 918 &self.index_file,
919 919 )
920 920 },
921 921 )
922 922 }
923 923 Err(e) => match e {
924 924 HgError::IoError { error, context } => {
925 925 if error.kind() != ErrorKind::NotFound {
926 926 return Err(HgError::IoError { error, context });
927 927 };
928 928 if let Some(delayed_buffer) = self.delayed_buffer.as_ref()
929 929 {
930 930 FileHandle::new_delayed(
931 931 dyn_clone::clone_box(&*self.vfs),
932 932 &self.index_file,
933 933 true,
934 934 delayed_buffer.clone(),
935 935 )
936 936 } else {
937 937 FileHandle::new(
938 938 dyn_clone::clone_box(&*self.vfs),
939 939 &self.index_file,
940 940 true,
941 941 true,
942 942 )
943 943 }
944 944 }
945 945 e => Err(e),
946 946 },
947 947 }
948 948 }
949 949
950 950 /// Split the data of an inline revlog into an index and a data file
951 951 pub fn split_inline(
952 952 &mut self,
953 953 header: IndexHeader,
954 954 new_index_file_path: Option<PathBuf>,
955 955 ) -> Result<PathBuf, RevlogError> {
956 956 assert!(self.delayed_buffer.is_none());
957 957 let existing_handles = self.writing_handles.is_some();
958 958 if let Some(handles) = &mut self.writing_handles {
959 959 handles.index_handle.flush()?;
960 960 self.writing_handles.take();
961 961 self.segment_file.writing_handle.take();
962 962 }
963 963 let mut new_data_file_handle =
964 964 self.vfs.create(&self.data_file, true)?;
965 965 // Drop any potential data, possibly redundant with the VFS impl.
966 966 new_data_file_handle
967 967 .set_len(0)
968 968 .when_writing_file(&self.data_file)?;
969 969
970 970 self.with_read(|| -> Result<(), RevlogError> {
971 971 for r in 0..self.index.len() {
972 972 let rev = Revision(r as BaseRevision);
973 973 let rev_segment = self.get_segment_for_revs(rev, rev)?.1;
974 974 new_data_file_handle
975 975 .write_all(&rev_segment)
976 976 .when_writing_file(&self.data_file)?;
977 977 }
978 978 new_data_file_handle
979 979 .flush()
980 980 .when_writing_file(&self.data_file)?;
981 981 Ok(())
982 982 })?;
983 983
984 984 if let Some(index_path) = new_index_file_path {
985 985 self.index_file = index_path
986 986 }
987 987
988 988 let mut new_index_handle = self.vfs.create(&self.index_file, true)?;
989 989 let mut new_data = Vec::with_capacity(self.len() * INDEX_ENTRY_SIZE);
990 990 for r in 0..self.len() {
991 991 let rev = Revision(r as BaseRevision);
992 992 let entry = self.index.entry_binary(rev).unwrap_or_else(|| {
993 993 panic!(
994 994 "entry {} should exist in {}",
995 995 r,
996 996 self.index_file.display()
997 997 )
998 998 });
999 999 if r == 0 {
1000 1000 new_data.extend(header.header_bytes);
1001 1001 }
1002 1002 new_data.extend(entry);
1003 1003 }
1004 1004 new_index_handle
1005 1005 .write_all(&new_data)
1006 1006 .when_writing_file(&self.index_file)?;
1007 1007 // Replace the index with a new one because the buffer contains inline
1008 1008 // data
1009 1009 self.index = Index::new(Box::new(new_data), header)?;
1010 1010 self.inline = false;
1011 1011
1012 1012 self.segment_file = RandomAccessFile::new(
1013 1013 dyn_clone::clone_box(&*self.vfs),
1014 1014 self.data_file.to_owned(),
1015 1015 );
1016 1016 if existing_handles {
1017 1017 // Switched from inline to conventional, reopen the index
1018 1018 let new_data_handle = Some(FileHandle::from_file(
1019 1019 new_data_file_handle,
1020 1020 dyn_clone::clone_box(&*self.vfs),
1021 1021 &self.data_file,
1022 1022 ));
1023 1023 self.writing_handles = Some(WriteHandles {
1024 1024 index_handle: self.index_write_handle()?,
1025 1025 data_handle: new_data_handle.clone(),
1026 1026 });
1027 1027 *self.segment_file.writing_handle.borrow_mut() = new_data_handle;
1028 1028 }
1029 1029
1030 1030 Ok(self.index_file.to_owned())
1031 1031 }
1032 1032
1033 1033 /// Write a new entry to this revlog.
1034 1034 /// - `entry` is the index bytes
1035 1035 /// - `header_and_data` is the compression header and the revision data
1036 1036 /// - `offset` is the position in the data file to write to
1037 1037 /// - `index_end` is the overwritten position in the index in revlog-v2,
1038 1038 /// since the format may allow a rewrite of garbage data at the end.
1039 1039 /// - `data_end` is the overwritten position in the data-file in revlog-v2,
1040 1040 /// since the format may allow a rewrite of garbage data at the end.
1041 1041 ///
1042 1042 /// XXX Why do we have `data_end` *and* `offset`? Same question in Python
1043 1043 pub fn write_entry(
1044 1044 &mut self,
1045 1045 mut transaction: impl Transaction,
1046 1046 entry: &[u8],
1047 1047 header_and_data: (&[u8], &[u8]),
1048 1048 mut offset: usize,
1049 1049 index_end: Option<u64>,
1050 1050 data_end: Option<u64>,
1051 1051 ) -> Result<(u64, Option<u64>), HgError> {
1052 1052 let current_revision = self.len() - 1;
1053 1053 let canonical_index_file = self.canonical_index_file();
1054 1054
1055 1055 let is_inline = self.is_inline();
1056 1056 let handles = match &mut self.writing_handles {
1057 1057 None => {
1058 1058 return Err(HgError::abort(
1059 1059 "adding revision outside of the `with_write` context",
1060 1060 exit_codes::ABORT,
1061 1061 None,
1062 1062 ));
1063 1063 }
1064 1064 Some(handles) => handles,
1065 1065 };
1066 1066 let index_handle = &mut handles.index_handle;
1067 1067 let data_handle = &mut handles.data_handle;
1068 1068 if let Some(end) = index_end {
1069 1069 index_handle
1070 1070 .seek(SeekFrom::Start(end))
1071 1071 .when_reading_file(&self.index_file)?;
1072 1072 } else {
1073 1073 index_handle
1074 1074 .seek(SeekFrom::End(0))
1075 1075 .when_reading_file(&self.index_file)?;
1076 1076 }
1077 1077 if let Some(data_handle) = data_handle {
1078 1078 if let Some(end) = data_end {
1079 1079 data_handle
1080 1080 .seek(SeekFrom::Start(end))
1081 1081 .when_reading_file(&self.data_file)?;
1082 1082 } else {
1083 1083 data_handle
1084 1084 .seek(SeekFrom::End(0))
1085 1085 .when_reading_file(&self.data_file)?;
1086 1086 }
1087 1087 }
1088 1088 let (header, data) = header_and_data;
1089 1089
1090 1090 if !is_inline {
1091 1091 transaction.add(&self.data_file, offset);
1092 1092 transaction
1093 1093 .add(&canonical_index_file, current_revision * entry.len());
1094 1094 let data_handle = data_handle
1095 1095 .as_mut()
1096 1096 .expect("data handle should exist when not inline");
1097 1097 if !header.is_empty() {
1098 1098 data_handle.write_all(header)?;
1099 1099 }
1100 1100 data_handle.write_all(data)?;
1101 1101 match &mut self.delayed_buffer {
1102 1102 Some(buf) => {
1103 1103 buf.lock()
1104 1104 .expect("propagate the panic")
1105 1105 .buffer
1106 1106 .write_all(entry)
1107 1107 .expect("write to delay buffer should succeed");
1108 1108 }
1109 1109 None => index_handle.write_all(entry)?,
1110 1110 }
1111 1111 } else if self.delayed_buffer.is_some() {
1112 1112 return Err(HgError::abort(
1113 1113 "invalid delayed write on inline revlog",
1114 1114 exit_codes::ABORT,
1115 1115 None,
1116 1116 ));
1117 1117 } else {
1118 1118 offset += current_revision * entry.len();
1119 1119 transaction.add(&canonical_index_file, offset);
1120 1120 index_handle.write_all(entry)?;
1121 1121 index_handle.write_all(header)?;
1122 1122 index_handle.write_all(data)?;
1123 1123 }
1124 1124 let data_position = match data_handle {
1125 1125 Some(h) => Some(h.position()?),
1126 1126 None => None,
1127 1127 };
1128 1128 Ok((index_handle.position()?, data_position))
1129 1129 }
1130 1130
1131 1131 /// Return the real target index file and not the temporary when diverting
1132 1132 pub fn canonical_index_file(&self) -> PathBuf {
1133 1133 self.original_index_file
1134 1134 .as_ref()
1135 1135 .map(ToOwned::to_owned)
1136 1136 .unwrap_or_else(|| self.index_file.to_owned())
1137 1137 }
1138 1138
1139 1139 /// Return the path to the diverted index
1140 1140 fn diverted_index(&self) -> PathBuf {
1141 1141 self.index_file.with_extension("i.a")
1142 1142 }
1143 1143
1144 1144 /// True if we're in a [`Self::with_write`] or [`Self::with_read`] context
1145 1145 pub fn is_open(&self) -> bool {
1146 1146 self.segment_file.is_open()
1147 1147 }
1148 1148
1149 1149 /// Set this revlog to delay its writes to a buffer
1150 1150 pub fn delay(&mut self) -> Result<Option<PathBuf>, HgError> {
1151 1151 assert!(!self.is_open());
1152 1152 if self.is_inline() {
1153 1153 return Err(HgError::abort(
1154 1154 "revlog with delayed write should not be inline",
1155 1155 exit_codes::ABORT,
1156 1156 None,
1157 1157 ));
1158 1158 }
1159 1159 if self.delayed_buffer.is_some() || self.original_index_file.is_some()
1160 1160 {
1161 1161 // Delay or divert already happening
1162 1162 return Ok(None);
1163 1163 }
1164 1164 if self.is_empty() {
1165 1165 self.original_index_file = Some(self.index_file.to_owned());
1166 1166 self.index_file = self.diverted_index();
1167 1167 if self.vfs.exists(&self.index_file) {
1168 1168 self.vfs.unlink(&self.index_file)?;
1169 1169 }
1170 1170 Ok(Some(self.index_file.to_owned()))
1171 1171 } else {
1172 1172 self.delayed_buffer =
1173 1173 Some(Arc::new(Mutex::new(DelayedBuffer::default())));
1174 1174 Ok(None)
1175 1175 }
1176 1176 }
1177 1177
1178 1178 /// Write the pending data (in memory) if any to the diverted index file
1179 1179 /// (on disk temporary file)
1180 1180 pub fn write_pending(
1181 1181 &mut self,
1182 1182 ) -> Result<(Option<PathBuf>, bool), HgError> {
1183 1183 assert!(!self.is_open());
1184 1184 if self.is_inline() {
1185 1185 return Err(HgError::abort(
1186 1186 "revlog with delayed write should not be inline",
1187 1187 exit_codes::ABORT,
1188 1188 None,
1189 1189 ));
1190 1190 }
1191 1191 if self.original_index_file.is_some() {
1192 1192 return Ok((None, true));
1193 1193 }
1194 1194 let mut any_pending = false;
1195 1195 let pending_index_file = self.diverted_index();
1196 1196 if self.vfs.exists(&pending_index_file) {
1197 1197 self.vfs.unlink(&pending_index_file)?;
1198 1198 }
1199 1199 self.vfs.copy(&self.index_file, &pending_index_file)?;
1200 1200 if let Some(delayed_buffer) = self.delayed_buffer.take() {
1201 1201 let mut index_file_handle =
1202 1202 self.vfs.open_write(&pending_index_file)?;
1203 1203 index_file_handle
1204 1204 .seek(SeekFrom::End(0))
1205 1205 .when_writing_file(&pending_index_file)?;
1206 1206 let delayed_data =
1207 1207 &delayed_buffer.lock().expect("propagate the panic").buffer;
1208 1208 index_file_handle
1209 1209 .write_all(delayed_data)
1210 1210 .when_writing_file(&pending_index_file)?;
1211 1211 any_pending = true;
1212 1212 }
1213 1213 self.original_index_file = Some(self.index_file.to_owned());
1214 1214 self.index_file = pending_index_file;
1215 1215 Ok((Some(self.index_file.to_owned()), any_pending))
1216 1216 }
1217 1217
1218 1218 /// Overwrite the canonical file with the diverted file, or write out the
1219 1219 /// delayed buffer.
1220 1220 /// Returns an error if the revlog is neither diverted nor delayed.
1221 1221 pub fn finalize_pending(&mut self) -> Result<PathBuf, HgError> {
1222 1222 assert!(!self.is_open());
1223 1223 if self.is_inline() {
1224 1224 return Err(HgError::abort(
1225 1225 "revlog with delayed write should not be inline",
1226 1226 exit_codes::ABORT,
1227 1227 None,
1228 1228 ));
1229 1229 }
1230 1230 match (
1231 1231 self.delayed_buffer.as_ref(),
1232 1232 self.original_index_file.as_ref(),
1233 1233 ) {
1234 1234 (None, None) => {
1235 1235 return Err(HgError::abort(
1236 1236 "neither delay nor divert found on this revlog",
1237 1237 exit_codes::ABORT,
1238 1238 None,
1239 1239 ));
1240 1240 }
1241 1241 (Some(delay), None) => {
1242 1242 let mut index_file_handle =
1243 1243 self.vfs.open_write(&self.index_file)?;
1244 1244 index_file_handle
1245 1245 .seek(SeekFrom::End(0))
1246 1246 .when_writing_file(&self.index_file)?;
1247 1247 index_file_handle
1248 1248 .write_all(
1249 1249 &delay.lock().expect("propagate the panic").buffer,
1250 1250 )
1251 1251 .when_writing_file(&self.index_file)?;
1252 1252 self.delayed_buffer = None;
1253 1253 }
1254 1254 (None, Some(divert)) => {
1255 1255 if self.vfs.exists(&self.index_file) {
1256 1256 self.vfs.rename(&self.index_file, divert, true)?;
1257 1257 }
1258 1258 divert.clone_into(&mut self.index_file);
1259 1259 self.original_index_file = None;
1260 1260 }
1261 1261 (Some(_), Some(_)) => unreachable!(
1262 1262 "{} is in an inconsistent state of both delay and divert",
1263 1263 self.canonical_index_file().display(),
1264 1264 ),
1265 1265 }
1266 1266 Ok(self.canonical_index_file())
1267 1267 }
1268 1268
1269 1269 /// `pub` only for `hg-cpython`. This is made a different method than
1270 1270 /// [`Revlog::index`] in case there is a different invariant that pops up
1271 1271 /// later.
1272 1272 #[doc(hidden)]
1273 1273 pub fn shared_index(&self) -> &Index {
1274 1274 &self.index
1275 1275 }
1276 1276 }
1277 1277
1278 1278 /// The use of a [`RefCell`] assumes that a given revlog will only
1279 1279 /// be accessed (read or write) by a single thread.
1280 1280 type UncompressedChunkCache =
1281 1281 RefCell<LruMap<Revision, Arc<[u8]>, ByMemoryUsage>>;
1282 1282
1283 1283 /// The node, revision and data for the last revision we've seen. Speeds up
1284 1284 /// a lot of sequential operations of the revlog.
1285 1285 ///
1286 1286 /// The data is not just bytes since it can come from Python and we want to
1287 1287 /// avoid copies if possible.
1288 1288 type SingleRevisionCache =
1289 1289 (Node, Revision, Box<dyn Deref<Target = [u8]> + Send>);
1290 1290
1291 1291 /// A way of progressively filling a buffer with revision data, then return
1292 1292 /// that buffer. Used to abstract away Python-allocated code to reduce copying
1293 1293 /// for performance reasons.
1294 1294 pub trait RevisionBuffer {
1295 1295 /// The owned buffer type to return
1296 1296 type Target;
1297 1297 /// Copies the slice into the buffer
1298 1298 fn extend_from_slice(&mut self, slice: &[u8]);
1299 1299 /// Returns the now finished owned buffer
1300 1300 fn finish(self) -> Self::Target;
1301 1301 }
1302 1302
1303 1303 /// A simple vec-based buffer. This is uselessly complicated for the pure Rust
1304 1304 /// case, but it's the price to pay for Python compatibility.
1305 1305 #[derive(Debug)]
1306 1306 pub(super) struct CoreRevisionBuffer {
1307 1307 buf: Vec<u8>,
1308 1308 }
1309 1309
1310 1310 impl CoreRevisionBuffer {
1311 1311 pub fn new() -> Self {
1312 1312 Self { buf: vec![] }
1313 1313 }
1314 1314
1315 1315 #[inline]
1316 1316 pub fn resize(&mut self, size: usize) {
1317 1317 self.buf.reserve_exact(size - self.buf.capacity());
1318 1318 }
1319 1319 }
1320 1320
1321 1321 impl RevisionBuffer for CoreRevisionBuffer {
1322 1322 type Target = Vec<u8>;
1323 1323
1324 1324 #[inline]
1325 1325 fn extend_from_slice(&mut self, slice: &[u8]) {
1326 1326 self.buf.extend_from_slice(slice);
1327 1327 }
1328 1328
1329 1329 #[inline]
1330 1330 fn finish(self) -> Self::Target {
1331 1331 self.buf
1332 1332 }
1333 1333 }
1334 1334
1335 1335 /// Calculate the hash of a revision given its data and its parents.
1336 1336 pub fn hash(
1337 1337 data: &[u8],
1338 1338 p1_hash: &[u8],
1339 1339 p2_hash: &[u8],
1340 1340 ) -> [u8; NODE_BYTES_LENGTH] {
1341 1341 let mut hasher = Sha1::new();
1342 1342 let (a, b) = (p1_hash, p2_hash);
1343 1343 if a > b {
1344 1344 hasher.update(b);
1345 1345 hasher.update(a);
1346 1346 } else {
1347 1347 hasher.update(a);
1348 1348 hasher.update(b);
1349 1349 }
1350 1350 hasher.update(data);
1351 1351 *hasher.finalize().as_ref()
1352 1352 }
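A short usage note on the `hash` function that closes this file: the parent hashes are fed to SHA-1 smaller-first, which makes the resulting node independent of parent order. The following self-contained sketch assumes the same `sha1` crate used above and a 20-byte `NODE_BYTES_LENGTH`; the body is copied from the function above so the example runs on its own:

// Sketch for illustration; mirrors `hash` from this file.
use sha1::{Digest, Sha1};

const NODE_BYTES_LENGTH: usize = 20;

fn hash(data: &[u8], p1_hash: &[u8], p2_hash: &[u8]) -> [u8; NODE_BYTES_LENGTH] {
    let mut hasher = Sha1::new();
    let (a, b) = (p1_hash, p2_hash);
    if a > b {
        // Always feed the lexicographically smaller parent first.
        hasher.update(b);
        hasher.update(a);
    } else {
        hasher.update(a);
        hasher.update(b);
    }
    hasher.update(data);
    *hasher.finalize().as_ref()
}

fn main() {
    let p1 = [1u8; NODE_BYTES_LENGTH];
    let p2 = [2u8; NODE_BYTES_LENGTH];
    // Swapping the parents yields the same 20-byte node.
    assert_eq!(hash(b"revision data", &p1, &p2), hash(b"revision data", &p2, &p1));
}

This is the same function `check_hash` above calls when validating a revision's data against its recorded node.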