diff --git a/rust/hg-core/src/revlog/file_io.rs b/rust/hg-core/src/revlog/file_io.rs new file mode 100644 --- /dev/null +++ b/rust/hg-core/src/revlog/file_io.rs @@ -0,0 +1,535 @@ +//! Helpers for revlog file reading and writing. + +use std::{ + cell::RefCell, + fs::File, + io::{Read, Seek, SeekFrom, Write}, + path::{Path, PathBuf}, + sync::{Arc, Mutex}, +}; + +use crate::{ + errors::{HgError, IoResultExt}, + vfs::Vfs, +}; + +/// Wraps accessing arbitrary chunks of data within a file and reusing handles. +/// This is currently useful for accessing a revlog's data file, only reading +/// the ranges that are currently relevant, like a sort of basic and manual +/// file-based mmap. +/// +/// XXX should this just be replaced with `mmap` + `madvise` ranges? +/// The upcoming `UncompressedChunkCache` will make up for most of the slowness +/// of re-reading the same chunks, so this might not be as useful. Aside from +/// the major benefit of having less code to take care of, using `mmap` will +/// allow multiple processes to share the same pages, especially for the +/// changelog and manifest, which would make a difference in server contexts. +pub struct RandomAccessFile { + /// The current store VFS to pass it to [`FileHandle`] + vfs: Box, + /// Filename of the open file, relative to the vfs root + pub filename: PathBuf, + /// The current read-only handle on the file, if any + pub reading_handle: RefCell>, + /// The current read-write handle on the file, if any + pub writing_handle: RefCell>, +} + +impl RandomAccessFile { + /// Wrap a file for random access + pub fn new(vfs: Box, filename: PathBuf) -> Self { + assert!(filename.is_relative()); + Self { + vfs, + filename, + reading_handle: RefCell::new(None), + writing_handle: RefCell::new(None), + } + } + + /// Read a chunk of bytes from the file. 
+ pub fn read_chunk( + &self, + offset: usize, + length: usize, + ) -> Result, HgError> { + let mut handle = self.get_read_handle()?; + handle + .seek(SeekFrom::Start(offset as u64)) + .when_reading_file(&self.filename)?; + handle.read_exact(length).when_reading_file(&self.filename) + } + + /// `pub` only for hg-cpython + #[doc(hidden)] + pub fn get_read_handle(&self) -> Result { + if let Some(handle) = &*self.writing_handle.borrow() { + // Use a file handle being actively used for writes, if available. + // There is some danger to doing this because reads will seek the + // file. + // However, [`Revlog::write_entry`] performs a `SeekFrom::End(0)` + // before all writes, so we should be safe. + return Ok(handle.clone()); + } + if let Some(handle) = &*self.reading_handle.borrow() { + return Ok(handle.clone()); + } + // early returns done to work around borrowck being overzealous + // See https://github.com/rust-lang/rust/issues/103108 + let new_handle = FileHandle::new( + dyn_clone::clone_box(&*self.vfs), + &self.filename, + false, + false, + )?; + *self.reading_handle.borrow_mut() = Some(new_handle.clone()); + Ok(new_handle) + } + + /// `pub` only for hg-cpython + #[doc(hidden)] + pub fn exit_reading_context(&self) { + self.reading_handle.take(); + } + + // Returns whether this file currently open + pub fn is_open(&self) -> bool { + self.reading_handle.borrow().is_some() + || self.writing_handle.borrow().is_some() + } +} + +/// A buffer that holds new changelog index data that needs to be written +/// after the manifest and filelogs so that the repo is updated atomically to +/// external processes. 
+#[derive(Clone, Debug, Default)]
+pub struct DelayedBuffer {
+    /// The actual in-memory bytes storing the delayed writes
+    pub(super) buffer: Vec<u8>,
+    /// The current offset into the virtual file composed of file + buffer
+    offset: u64,
+    /// The size of the file at the time of opening
+    file_size: u64,
+}
+
+impl DelayedBuffer {
+    /// Returns the length of the full data (on-disk + buffer length).
+    pub fn len(&self) -> u64 {
+        self.buffer.len() as u64 + self.file_size
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.len() == 0
+    }
+}
+
+/// Holds an open [`File`] and the related data. This can be used for reading
+/// and writing. Writes can be delayed to a buffer before touching the disk,
+/// if relevant (in the changelog case), but reads are transparent.
+pub struct FileHandle {
+    /// The actual open file
+    pub file: File,
+    /// The VFS with which the file was opened
+    vfs: Box<dyn Vfs>,
+    /// Filename of the open file, relative to the vfs root
+    filename: PathBuf,
+    /// Buffer of delayed entry writes to the changelog index. This points
+    /// back to the buffer inside the revlog this handle refers to.
+    delayed_buffer: Option<Arc<Mutex<DelayedBuffer>>>,
+}
+
+impl std::fmt::Debug for FileHandle {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("FileHandle")
+            .field("filename", &self.filename)
+            .field("delayed_buffer", &self.delayed_buffer)
+            .field("file", &self.file)
+            .finish()
+    }
+}
+
+impl Clone for FileHandle {
+    fn clone(&self) -> Self {
+        Self {
+            vfs: dyn_clone::clone_box(&*self.vfs),
+            filename: self.filename.clone(),
+            delayed_buffer: self.delayed_buffer.clone(),
+            // This can only fail if the OS doesn't have the file handle
+            // anymore, so we're not going to do anything useful anyway.
+            file: self.file.try_clone().expect("couldn't clone file handle"),
+        }
+    }
+}
+
+impl FileHandle {
+    /// Get a (read or write) file handle to `filename`. Only creates the file
+    /// if `create` is `true`.
+    pub fn new(
+        vfs: Box<dyn Vfs>,
+        filename: impl AsRef<Path>,
+        create: bool,
+        write: bool,
+    ) -> Result<Self, HgError> {
+        // `create` takes precedence over `write`; with neither, the file
+        // is opened through `Vfs::open_read`.
+        let file = if create {
+            vfs.create(filename.as_ref())?
+        } else if write {
+            vfs.open(filename.as_ref())?
+        } else {
+            vfs.open_read(filename.as_ref())?
+        };
+        Ok(Self {
+            vfs,
+            filename: filename.as_ref().to_owned(),
+            delayed_buffer: None,
+            file,
+        })
+    }
+
+    /// Get a file handle to `filename`, but writes go to a [`DelayedBuffer`].
+    ///
+    /// The file is created if `create` is `true`, opened with `Vfs::open`
+    /// otherwise, and then handled like in [`Self::from_file_delayed`].
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the file cannot be opened, or if its size or
+    /// position cannot be read.
+    pub fn new_delayed(
+        vfs: Box<dyn Vfs>,
+        filename: impl AsRef<Path>,
+        create: bool,
+        delayed_buffer: Arc<Mutex<DelayedBuffer>>,
+    ) -> Result<Self, HgError> {
+        let file = if create {
+            vfs.create(filename.as_ref())?
+        } else {
+            vfs.open(filename.as_ref())?
+        };
+        Self::from_file_delayed(file, vfs, filename, delayed_buffer)
+    }
+
+    /// Wrap an existing [`File`]
+    pub fn from_file(
+        file: File,
+        vfs: Box<dyn Vfs>,
+        filename: impl AsRef<Path>,
+    ) -> Self {
+        Self {
+            vfs,
+            filename: filename.as_ref().to_owned(),
+            delayed_buffer: None,
+            file,
+        }
+    }
+
+    /// Wrap an existing [`File`], but writes go to a [`DelayedBuffer`].
+    ///
+    /// The size and position of `file` are stored in `delayed_buffer`.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if the size or position of `file` cannot be read.
+    pub fn from_file_delayed(
+        mut file: File,
+        vfs: Box<dyn Vfs>,
+        filename: impl AsRef<Path>,
+        delayed_buffer: Arc<Mutex<DelayedBuffer>>,
+    ) -> Result<Self, HgError> {
+        let size = vfs.file_size(&file)?;
+        let offset = file
+            .stream_position()
+            .when_reading_file(filename.as_ref())?;
+
+        {
+            let mut buf = delayed_buffer.lock().unwrap();
+            buf.file_size = size;
+            buf.offset = offset;
+        }
+
+        Ok(Self {
+            vfs,
+            filename: filename.as_ref().to_owned(),
+            delayed_buffer: Some(delayed_buffer),
+            file,
+        })
+    }
+
+    /// Move the position of the handle to `pos`,
+    /// spanning the [`DelayedBuffer`] if defined. Will return an error if
+    /// an invalid seek position is asked, or for any standard io error.
+ pub fn seek(&mut self, pos: SeekFrom) -> Result { + if let Some(delay_buf) = &self.delayed_buffer { + let mut delay_buf = delay_buf.lock().unwrap(); + // Virtual file offset spans real file and data + match pos { + SeekFrom::Start(offset) => delay_buf.offset = offset, + SeekFrom::End(offset) => { + delay_buf.offset = + delay_buf.len().saturating_add_signed(offset) + } + SeekFrom::Current(offset) => { + delay_buf.offset = + delay_buf.offset.saturating_add_signed(offset); + } + } + if delay_buf.offset < delay_buf.file_size { + self.file.seek(pos) + } else { + Ok(delay_buf.offset) + } + } else { + self.file.seek(pos) + } + } + + /// Read exactly `length` bytes from the current position. + /// Errors are the same as [`std::io::Read::read_exact`]. + pub fn read_exact( + &mut self, + length: usize, + ) -> Result, std::io::Error> { + if let Some(delay_buf) = self.delayed_buffer.as_mut() { + let mut delay_buf = delay_buf.lock().unwrap(); + let mut buf = vec![0; length]; + let offset: isize = + delay_buf.offset.try_into().expect("buffer too large"); + let file_size: isize = + delay_buf.file_size.try_into().expect("file too large"); + let span: isize = offset - file_size; + let length = length.try_into().expect("too large of a length"); + let absolute_span: u64 = + span.unsigned_abs().try_into().expect("length too large"); + if span < 0 { + if length <= absolute_span { + // We're only in the file + self.file.read_exact(&mut buf)?; + } else { + // We're spanning file and buffer + self.file + .read_exact(&mut buf[..absolute_span as usize])?; + delay_buf + .buffer + .take(length - absolute_span) + .read_exact(&mut buf[absolute_span as usize..])?; + } + } else { + // We're only in the buffer + delay_buf.buffer[absolute_span as usize..] + .take(length) + .read_exact(&mut buf)?; + } + delay_buf.offset += length; + Ok(buf.to_owned()) + } else { + let mut buf = vec![0; length]; + self.file.read_exact(&mut buf)?; + Ok(buf) + } + } + + /// Flush the in-memory changes to disk. 
This does *not* write the + /// delayed buffer, only the pending file changes. + pub fn flush(&mut self) -> Result<(), HgError> { + self.file.flush().when_writing_file(&self.filename) + } + + /// Return the current position in the file + pub fn position(&mut self) -> Result { + self.file + .stream_position() + .when_reading_file(&self.filename) + } + + /// Append `data` to the file, or to the [`DelayedBuffer`], if any. + pub fn write_all(&mut self, data: &[u8]) -> Result<(), HgError> { + if let Some(buf) = &mut self.delayed_buffer { + let mut delayed_buffer = buf.lock().expect("propagate the panic"); + assert_eq!(delayed_buffer.offset, delayed_buffer.len()); + delayed_buffer.buffer.extend_from_slice(data); + delayed_buffer.offset += data.len() as u64; + Ok(()) + } else { + self.file + .write_all(data) + .when_writing_file(&self.filename)?; + Ok(()) + } + } +} + +/// Write handles to a given revlog (index + maybe data) +#[derive(Debug)] +pub struct WriteHandles { + /// Handle to the index file + pub index_handle: FileHandle, + /// Handle to the data file, if the revlog is non-inline + pub data_handle: Option, +} + +#[cfg(test)] +mod tests { + use std::io::ErrorKind; + + use crate::vfs::VfsImpl; + + use super::*; + + #[test] + fn test_random_access_file() { + let base = tempfile::tempdir().unwrap().into_path(); + let filename = Path::new("a"); + let file_path = base.join(filename); + let raf = RandomAccessFile::new( + Box::new(VfsImpl { base }), + filename.to_owned(), + ); + + assert!(!raf.is_open()); + assert_eq!(&raf.filename, &filename); + // Should fail to read a non-existing file + match raf.get_read_handle().unwrap_err() { + HgError::IoError { error, .. 
} => match error.kind() { + std::io::ErrorKind::NotFound => {} + _ => panic!("should be not found"), + }, + e => panic!("{}", e.to_string()), + } + + std::fs::write(file_path, b"1234567890").unwrap(); + + // Should be able to open an existing file + let mut handle = raf.get_read_handle().unwrap(); + assert!(raf.is_open()); + assert_eq!(handle.read_exact(10).unwrap(), b"1234567890".to_vec()); + } + + #[test] + fn test_file_handle() { + let base = tempfile::tempdir().unwrap().into_path(); + let filename = base.join("a"); + // No `create` should fail + FileHandle::new( + Box::new(VfsImpl { base: base.clone() }), + &filename, + false, + false, + ) + .unwrap_err(); + std::fs::write(&filename, b"1234567890").unwrap(); + + let mut read_handle = FileHandle::new( + Box::new(VfsImpl { base: base.clone() }), + &filename, + false, + false, + ) + .unwrap(); + assert_eq!(&read_handle.filename, &filename); + assert_eq!(read_handle.position().unwrap(), 0); + + // Writing to an explicit read handle should fail + read_handle.write_all(b"some data").unwrap_err(); + + // reading exactly n bytes should work + assert_eq!(read_handle.read_exact(3).unwrap(), b"123".to_vec()); + // and the position should be remembered + assert_eq!(read_handle.read_exact(2).unwrap(), b"45".to_vec()); + + // Seeking should work + let position = read_handle.position().unwrap(); + read_handle.seek(SeekFrom::Current(-2)).unwrap(); + assert_eq!(position - 2, read_handle.position().unwrap()); + + // Seeking too much data should fail + read_handle.read_exact(1000).unwrap_err(); + + // Work around the yet unimplemented VFS for write + let mut options = std::fs::OpenOptions::new(); + options.read(true); + options.write(true); + let file = options.open(&filename).unwrap(); + // Open a write handle + let mut handle = FileHandle::from_file( + file, + Box::new(VfsImpl { base: base.clone() }), + &filename, + ); + + // Now writing should succeed + handle.write_all(b"new data").unwrap(); + // Opening or writing does not 
seek, so we should be at the start + assert_eq!(handle.position().unwrap(), 8); + // We can still read + assert_eq!(handle.read_exact(2).unwrap(), b"90".to_vec()); + // Flushing doesn't do anything unexpected + handle.flush().unwrap(); + + let delayed_buffer = Arc::new(Mutex::new(DelayedBuffer::default())); + let file = options.open(&filename).unwrap(); + let mut handle = FileHandle::from_file_delayed( + file, + Box::new(VfsImpl { base: base.clone() }), + &filename, + delayed_buffer, + ) + .unwrap(); + + assert_eq!( + handle + .delayed_buffer + .as_ref() + .unwrap() + .lock() + .unwrap() + .file_size, + 10 + ); + handle.seek(SeekFrom::End(0)).unwrap(); + handle.write_all(b"should go to buffer").unwrap(); + assert_eq!( + handle + .delayed_buffer + .as_ref() + .unwrap() + .lock() + .unwrap() + .len(), + 29 + ); + read_handle.seek(SeekFrom::Start(0)).unwrap(); + // On-disk file contents should be unchanged + assert_eq!( + read_handle.read_exact(10).unwrap(), + b"new data90".to_vec(), + ); + + assert_eq!( + read_handle.read_exact(1).unwrap_err().kind(), + ErrorKind::UnexpectedEof + ); + + handle.flush().unwrap(); + // On-disk file contents should still be unchanged after a flush + assert_eq!( + read_handle.read_exact(1).unwrap_err().kind(), + ErrorKind::UnexpectedEof + ); + + // Read from the buffer only + handle.seek(SeekFrom::End(-1)).unwrap(); + assert_eq!(handle.read_exact(1).unwrap(), b"r".to_vec()); + + // Read from an overlapping section of file and buffer + handle.seek(SeekFrom::Start(6)).unwrap(); + assert_eq!( + handle.read_exact(20).unwrap(), + b"ta90should go to buf".to_vec() + ); + + // Read from file only + handle.seek(SeekFrom::Start(0)).unwrap(); + assert_eq!(handle.read_exact(8).unwrap(), b"new data".to_vec()); + } +} diff --git a/rust/hg-core/src/revlog/mod.rs b/rust/hg-core/src/revlog/mod.rs --- a/rust/hg-core/src/revlog/mod.rs +++ b/rust/hg-core/src/revlog/mod.rs @@ -13,6 +13,7 @@ use compression::{uncompressed_zstd_data pub use node::{FromHexError, 
Node, NodePrefix}; pub mod changelog; pub mod compression; +pub mod file_io; pub mod filelog; pub mod index; pub mod manifest;