##// END OF EJS Templates
rust-revlog: add file IO helpers...
Raphaël Gomès -
r53052:426696af default
parent child Browse files
Show More
This diff has been collapsed as it changes many lines, (535 lines changed) Show them Hide them
@@ -0,0 +1,535
1 //! Helpers for revlog file reading and writing.
2
3 use std::{
4 cell::RefCell,
5 fs::File,
6 io::{Read, Seek, SeekFrom, Write},
7 path::{Path, PathBuf},
8 sync::{Arc, Mutex},
9 };
10
11 use crate::{
12 errors::{HgError, IoResultExt},
13 vfs::Vfs,
14 };
15
16 /// Wraps accessing arbitrary chunks of data within a file and reusing handles.
17 /// This is currently useful for accessing a revlog's data file, only reading
18 /// the ranges that are currently relevant, like a sort of basic and manual
19 /// file-based mmap.
20 ///
21 /// XXX should this just be replaced with `mmap` + `madvise` ranges?
22 /// The upcoming `UncompressedChunkCache` will make up for most of the slowness
23 /// of re-reading the same chunks, so this might not be as useful. Aside from
24 /// the major benefit of having less code to take care of, using `mmap` will
25 /// allow multiple processes to share the same pages, especially for the
26 /// changelog and manifest, which would make a difference in server contexts.
27 pub struct RandomAccessFile {
28 /// The current store VFS to pass it to [`FileHandle`]
29 vfs: Box<dyn Vfs>,
30 /// Filename of the open file, relative to the vfs root
31 pub filename: PathBuf,
32 /// The current read-only handle on the file, if any
33 pub reading_handle: RefCell<Option<FileHandle>>,
34 /// The current read-write handle on the file, if any
35 pub writing_handle: RefCell<Option<FileHandle>>,
36 }
37
38 impl RandomAccessFile {
39 /// Wrap a file for random access
40 pub fn new(vfs: Box<dyn Vfs>, filename: PathBuf) -> Self {
41 assert!(filename.is_relative());
42 Self {
43 vfs,
44 filename,
45 reading_handle: RefCell::new(None),
46 writing_handle: RefCell::new(None),
47 }
48 }
49
50 /// Read a chunk of bytes from the file.
51 pub fn read_chunk(
52 &self,
53 offset: usize,
54 length: usize,
55 ) -> Result<Vec<u8>, HgError> {
56 let mut handle = self.get_read_handle()?;
57 handle
58 .seek(SeekFrom::Start(offset as u64))
59 .when_reading_file(&self.filename)?;
60 handle.read_exact(length).when_reading_file(&self.filename)
61 }
62
63 /// `pub` only for hg-cpython
64 #[doc(hidden)]
65 pub fn get_read_handle(&self) -> Result<FileHandle, HgError> {
66 if let Some(handle) = &*self.writing_handle.borrow() {
67 // Use a file handle being actively used for writes, if available.
68 // There is some danger to doing this because reads will seek the
69 // file.
70 // However, [`Revlog::write_entry`] performs a `SeekFrom::End(0)`
71 // before all writes, so we should be safe.
72 return Ok(handle.clone());
73 }
74 if let Some(handle) = &*self.reading_handle.borrow() {
75 return Ok(handle.clone());
76 }
77 // early returns done to work around borrowck being overzealous
78 // See https://github.com/rust-lang/rust/issues/103108
79 let new_handle = FileHandle::new(
80 dyn_clone::clone_box(&*self.vfs),
81 &self.filename,
82 false,
83 false,
84 )?;
85 *self.reading_handle.borrow_mut() = Some(new_handle.clone());
86 Ok(new_handle)
87 }
88
89 /// `pub` only for hg-cpython
90 #[doc(hidden)]
91 pub fn exit_reading_context(&self) {
92 self.reading_handle.take();
93 }
94
95 // Returns whether this file currently open
96 pub fn is_open(&self) -> bool {
97 self.reading_handle.borrow().is_some()
98 || self.writing_handle.borrow().is_some()
99 }
100 }
101
102 /// A buffer that holds new changelog index data that needs to be written
103 /// after the manifest and filelogs so that the repo is updated atomically to
104 /// external processes.
105 #[derive(Clone, Debug, Default)]
106 pub struct DelayedBuffer {
107 // The actual in-memory bytes storing the delayed writes
108 pub(super) buffer: Vec<u8>,
109 /// The current offset into the virtual file composed of file + buffer
110 offset: u64,
111 /// The size of the file at the time of opening
112 file_size: u64,
113 }
114
115 impl DelayedBuffer {
116 /// Returns the length of the full data (on-disk + buffer length).
117 pub fn len(&self) -> u64 {
118 self.buffer.len() as u64 + self.file_size
119 }
120
121 pub fn is_empty(&self) -> bool {
122 self.len() == 0
123 }
124 }
125
126 /// Holds an open [`File`] and the related data. This can be used for reading
127 /// and writing. Writes can be delayed to a buffer before touching the disk,
128 /// if relevant (in the changelog case), but reads are transparent.
129 pub struct FileHandle {
130 /// The actual open file
131 pub file: File,
132 /// The VFS with which the file was opened
133 vfs: Box<dyn Vfs>,
134 /// Filename of the open file, relative to the repo root
135 filename: PathBuf,
136 /// Buffer of delayed entry writes to the changelog index. This points
137 /// back to the buffer inside the revlog this handle refers to.
138 delayed_buffer: Option<Arc<Mutex<DelayedBuffer>>>,
139 }
140
141 impl std::fmt::Debug for FileHandle {
142 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
143 f.debug_struct("FileHandle")
144 .field("filename", &self.filename)
145 .field("delayed_buffer", &self.delayed_buffer)
146 .field("file", &self.file)
147 .finish()
148 }
149 }
150
151 impl Clone for FileHandle {
152 fn clone(&self) -> Self {
153 Self {
154 vfs: dyn_clone::clone_box(&*self.vfs),
155 filename: self.filename.clone(),
156 delayed_buffer: self.delayed_buffer.clone(),
157 // This can only fail if the OS doesn't have the file handle
158 // anymore, so we're not going to do anything useful anyway.
159 file: self.file.try_clone().expect("couldn't clone file handle"),
160 }
161 }
162 }
163
164 impl FileHandle {
165 /// Get a (read or write) file handle to `filename`. Only creates the file
166 /// if `create` is `true`.
167 pub fn new(
168 vfs: Box<dyn Vfs>,
169 filename: impl AsRef<Path>,
170 create: bool,
171 write: bool,
172 ) -> Result<Self, HgError> {
173 let file = if create {
174 vfs.create(filename.as_ref())?
175 } else if write {
176 vfs.open(filename.as_ref())?
177 } else {
178 vfs.open_read(filename.as_ref())?
179 };
180 Ok(Self {
181 vfs,
182 filename: filename.as_ref().to_owned(),
183 delayed_buffer: None,
184 file,
185 })
186 }
187
188 /// Get a file handle to `filename`, but writes go to a [`DelayedBuffer`].
189 pub fn new_delayed(
190 vfs: Box<dyn Vfs>,
191 filename: impl AsRef<Path>,
192 create: bool,
193 delayed_buffer: Arc<Mutex<DelayedBuffer>>,
194 ) -> Result<Self, HgError> {
195 let mut file = if create {
196 vfs.create(filename.as_ref())?
197 } else {
198 vfs.open(filename.as_ref())?
199 };
200 let size = vfs.file_size(&file)?;
201 let offset = file
202 .stream_position()
203 .when_reading_file(filename.as_ref())?;
204
205 {
206 let mut buf = delayed_buffer.lock().unwrap();
207 buf.file_size = size;
208 buf.offset = offset;
209 }
210
211 Ok(Self {
212 vfs,
213 filename: filename.as_ref().to_owned(),
214 delayed_buffer: Some(delayed_buffer),
215 file,
216 })
217 }
218
219 /// Wrap an existing [`File`]
220 pub fn from_file(
221 file: File,
222 vfs: Box<dyn Vfs>,
223 filename: impl AsRef<Path>,
224 ) -> Self {
225 Self {
226 vfs,
227 filename: filename.as_ref().to_owned(),
228 delayed_buffer: None,
229 file,
230 }
231 }
232
233 /// Wrap an existing [`File`], but writes go to a [`DelayedBuffer`].
234 pub fn from_file_delayed(
235 mut file: File,
236 vfs: Box<dyn Vfs>,
237 filename: impl AsRef<Path>,
238 delayed_buffer: Arc<Mutex<DelayedBuffer>>,
239 ) -> Result<Self, HgError> {
240 let size = vfs.file_size(&file)?;
241 let offset = file
242 .stream_position()
243 .when_reading_file(filename.as_ref())?;
244
245 {
246 let mut buf = delayed_buffer.lock().unwrap();
247 buf.file_size = size;
248 buf.offset = offset;
249 }
250
251 Ok(Self {
252 vfs,
253 filename: filename.as_ref().to_owned(),
254 delayed_buffer: Some(delayed_buffer),
255 file,
256 })
257 }
258
259 /// Move the position of the handle to `pos`,
260 /// spanning the [`DelayedBuffer`] if defined. Will return an error if
261 /// an invalid seek position is asked, or for any standard io error.
262 pub fn seek(&mut self, pos: SeekFrom) -> Result<u64, std::io::Error> {
263 if let Some(delay_buf) = &self.delayed_buffer {
264 let mut delay_buf = delay_buf.lock().unwrap();
265 // Virtual file offset spans real file and data
266 match pos {
267 SeekFrom::Start(offset) => delay_buf.offset = offset,
268 SeekFrom::End(offset) => {
269 delay_buf.offset =
270 delay_buf.len().saturating_add_signed(offset)
271 }
272 SeekFrom::Current(offset) => {
273 delay_buf.offset =
274 delay_buf.offset.saturating_add_signed(offset);
275 }
276 }
277 if delay_buf.offset < delay_buf.file_size {
278 self.file.seek(pos)
279 } else {
280 Ok(delay_buf.offset)
281 }
282 } else {
283 self.file.seek(pos)
284 }
285 }
286
287 /// Read exactly `length` bytes from the current position.
288 /// Errors are the same as [`std::io::Read::read_exact`].
289 pub fn read_exact(
290 &mut self,
291 length: usize,
292 ) -> Result<Vec<u8>, std::io::Error> {
293 if let Some(delay_buf) = self.delayed_buffer.as_mut() {
294 let mut delay_buf = delay_buf.lock().unwrap();
295 let mut buf = vec![0; length];
296 let offset: isize =
297 delay_buf.offset.try_into().expect("buffer too large");
298 let file_size: isize =
299 delay_buf.file_size.try_into().expect("file too large");
300 let span: isize = offset - file_size;
301 let length = length.try_into().expect("too large of a length");
302 let absolute_span: u64 =
303 span.unsigned_abs().try_into().expect("length too large");
304 if span < 0 {
305 if length <= absolute_span {
306 // We're only in the file
307 self.file.read_exact(&mut buf)?;
308 } else {
309 // We're spanning file and buffer
310 self.file
311 .read_exact(&mut buf[..absolute_span as usize])?;
312 delay_buf
313 .buffer
314 .take(length - absolute_span)
315 .read_exact(&mut buf[absolute_span as usize..])?;
316 }
317 } else {
318 // We're only in the buffer
319 delay_buf.buffer[absolute_span as usize..]
320 .take(length)
321 .read_exact(&mut buf)?;
322 }
323 delay_buf.offset += length;
324 Ok(buf.to_owned())
325 } else {
326 let mut buf = vec![0; length];
327 self.file.read_exact(&mut buf)?;
328 Ok(buf)
329 }
330 }
331
332 /// Flush the in-memory changes to disk. This does *not* write the
333 /// delayed buffer, only the pending file changes.
334 pub fn flush(&mut self) -> Result<(), HgError> {
335 self.file.flush().when_writing_file(&self.filename)
336 }
337
338 /// Return the current position in the file
339 pub fn position(&mut self) -> Result<u64, HgError> {
340 self.file
341 .stream_position()
342 .when_reading_file(&self.filename)
343 }
344
345 /// Append `data` to the file, or to the [`DelayedBuffer`], if any.
346 pub fn write_all(&mut self, data: &[u8]) -> Result<(), HgError> {
347 if let Some(buf) = &mut self.delayed_buffer {
348 let mut delayed_buffer = buf.lock().expect("propagate the panic");
349 assert_eq!(delayed_buffer.offset, delayed_buffer.len());
350 delayed_buffer.buffer.extend_from_slice(data);
351 delayed_buffer.offset += data.len() as u64;
352 Ok(())
353 } else {
354 self.file
355 .write_all(data)
356 .when_writing_file(&self.filename)?;
357 Ok(())
358 }
359 }
360 }
361
362 /// Write handles to a given revlog (index + maybe data)
363 #[derive(Debug)]
364 pub struct WriteHandles {
365 /// Handle to the index file
366 pub index_handle: FileHandle,
367 /// Handle to the data file, if the revlog is non-inline
368 pub data_handle: Option<FileHandle>,
369 }
370
#[cfg(test)]
mod tests {
    use std::io::ErrorKind;

    use crate::vfs::VfsImpl;

    use super::*;

    /// A [`RandomAccessFile`] is closed until a read handle is obtained,
    /// fails on a missing file and reads the contents of an existing one.
    #[test]
    fn test_random_access_file() {
        // Keep the guard alive so the directory is removed when the test
        // ends instead of being left behind.
        let temp_dir = tempfile::tempdir().unwrap();
        let base = temp_dir.path().to_owned();
        let filename = Path::new("a");
        let file_path = base.join(filename);
        let raf = RandomAccessFile::new(
            Box::new(VfsImpl { base }),
            filename.to_owned(),
        );

        assert!(!raf.is_open());
        assert_eq!(&raf.filename, &filename);
        // Should fail to read a non-existing file
        match raf.get_read_handle().unwrap_err() {
            HgError::IoError { error, .. } => match error.kind() {
                ErrorKind::NotFound => {}
                _ => panic!("should be not found"),
            },
            e => panic!("{}", e),
        }

        std::fs::write(file_path, b"1234567890").unwrap();

        // Should be able to open an existing file
        let mut handle = raf.get_read_handle().unwrap();
        assert!(raf.is_open());
        assert_eq!(handle.read_exact(10).unwrap(), b"1234567890".to_vec());
    }

    /// Exercises [`FileHandle`] as a read-only handle, as a read-write
    /// handle, and as a handle whose writes go to a [`DelayedBuffer`].
    #[test]
    fn test_file_handle() {
        // Keep the guard alive so the directory is removed when the test
        // ends instead of being left behind.
        let temp_dir = tempfile::tempdir().unwrap();
        let base = temp_dir.path().to_owned();
        let filename = base.join("a");
        // Opening a missing file without `create` should fail
        FileHandle::new(
            Box::new(VfsImpl { base: base.clone() }),
            &filename,
            false,
            false,
        )
        .unwrap_err();
        std::fs::write(&filename, b"1234567890").unwrap();

        let mut read_handle = FileHandle::new(
            Box::new(VfsImpl { base: base.clone() }),
            &filename,
            false,
            false,
        )
        .unwrap();
        assert_eq!(&read_handle.filename, &filename);
        assert_eq!(read_handle.position().unwrap(), 0);

        // Writing to an explicit read handle should fail
        read_handle.write_all(b"some data").unwrap_err();

        // reading exactly n bytes should work
        assert_eq!(read_handle.read_exact(3).unwrap(), b"123".to_vec());
        // and the position should be remembered
        assert_eq!(read_handle.read_exact(2).unwrap(), b"45".to_vec());

        // Seeking should work
        let position = read_handle.position().unwrap();
        read_handle.seek(SeekFrom::Current(-2)).unwrap();
        assert_eq!(position - 2, read_handle.position().unwrap());

        // Reading past the end of the file should fail
        read_handle.read_exact(1000).unwrap_err();

        // Work around the yet unimplemented VFS for write
        let mut options = std::fs::OpenOptions::new();
        options.read(true);
        options.write(true);
        let file = options.open(&filename).unwrap();
        // Open a write handle
        let mut handle = FileHandle::from_file(
            file,
            Box::new(VfsImpl { base: base.clone() }),
            &filename,
        );

        // Now writing should succeed
        handle.write_all(b"new data").unwrap();
        // The handle started at the beginning of the file, so the 8 bytes
        // written leave the cursor at position 8
        assert_eq!(handle.position().unwrap(), 8);
        // We can still read
        assert_eq!(handle.read_exact(2).unwrap(), b"90".to_vec());
        // Flushing doesn't do anything unexpected
        handle.flush().unwrap();

        let delayed_buffer = Arc::new(Mutex::new(DelayedBuffer::default()));
        let file = options.open(&filename).unwrap();
        let mut handle = FileHandle::from_file_delayed(
            file,
            Box::new(VfsImpl { base: base.clone() }),
            &filename,
            delayed_buffer,
        )
        .unwrap();

        assert_eq!(
            handle
                .delayed_buffer
                .as_ref()
                .unwrap()
                .lock()
                .unwrap()
                .file_size,
            10
        );
        handle.seek(SeekFrom::End(0)).unwrap();
        handle.write_all(b"should go to buffer").unwrap();
        assert_eq!(
            handle
                .delayed_buffer
                .as_ref()
                .unwrap()
                .lock()
                .unwrap()
                .len(),
            29
        );
        read_handle.seek(SeekFrom::Start(0)).unwrap();
        // On-disk file contents should be unchanged
        assert_eq!(
            read_handle.read_exact(10).unwrap(),
            b"new data90".to_vec(),
        );

        assert_eq!(
            read_handle.read_exact(1).unwrap_err().kind(),
            ErrorKind::UnexpectedEof
        );

        handle.flush().unwrap();
        // On-disk file contents should still be unchanged after a flush
        assert_eq!(
            read_handle.read_exact(1).unwrap_err().kind(),
            ErrorKind::UnexpectedEof
        );

        // Read from the buffer only
        handle.seek(SeekFrom::End(-1)).unwrap();
        assert_eq!(handle.read_exact(1).unwrap(), b"r".to_vec());

        // Read from an overlapping section of file and buffer
        handle.seek(SeekFrom::Start(6)).unwrap();
        assert_eq!(
            handle.read_exact(20).unwrap(),
            b"ta90should go to buf".to_vec()
        );

        // Read from file only
        handle.seek(SeekFrom::Start(0)).unwrap();
        assert_eq!(handle.read_exact(8).unwrap(), b"new data".to_vec());
    }
}
@@ -13,6 +13,7 use compression::{uncompressed_zstd_data
13 pub use node::{FromHexError, Node, NodePrefix};
13 pub use node::{FromHexError, Node, NodePrefix};
14 pub mod changelog;
14 pub mod changelog;
15 pub mod compression;
15 pub mod compression;
16 pub mod file_io;
16 pub mod filelog;
17 pub mod filelog;
17 pub mod index;
18 pub mod index;
18 pub mod manifest;
19 pub mod manifest;
General Comments 0
You need to be logged in to leave comments. Login now