##// END OF EJS Templates
revlog: add the glue to use the Rust `InnerRevlog` from Python...
revlog: add the glue to use the Rust `InnerRevlog` from Python The performance of this has been looked at for quite some time, and some workflows are actually quite a bit faster than with the Python + C code. However, we are still (up to 20%) slower in some crucial places like cloning certain repos, log, cat, which makes this an incomplete rewrite. This is mostly due to the high amount of overhead in Python <-> Rust FFI, especially around the VFS code. A future patch series will rewrite the VFS code in pure Rust, which should hopefully get us up to par with current perfomance, if not better in all important cases. This is a "save state" of sorts, as this is a ton of code, and I don't want to pile up even more things in a single review. Continuing to try to match the current performance will take an extremely long time, if it's not impossible, without the aforementioned VFS work.

File last commit:

r53052:426696af default
r53060:7346f93b default
Show More
file_io.rs
535 lines | 17.6 KiB | application/rls-services+xml | RustLexer
//! Helpers for revlog file reading and writing.
use std::{
cell::RefCell,
fs::File,
io::{Read, Seek, SeekFrom, Write},
path::{Path, PathBuf},
sync::{Arc, Mutex},
};
use crate::{
errors::{HgError, IoResultExt},
vfs::Vfs,
};
/// Wraps accessing arbitrary chunks of data within a file and reusing handles.
/// This is currently useful for accessing a revlog's data file, only reading
/// the ranges that are currently relevant, like a sort of basic and manual
/// file-based mmap.
///
/// XXX should this just be replaced with `mmap` + `madvise` ranges?
/// The upcoming `UncompressedChunkCache` will make up for most of the slowness
/// of re-reading the same chunks, so this might not be as useful. Aside from
/// the major benefit of having less code to take care of, using `mmap` will
/// allow multiple processes to share the same pages, especially for the
/// changelog and manifest, which would make a difference in server contexts.
pub struct RandomAccessFile {
/// The current store VFS to pass it to [`FileHandle`]
vfs: Box<dyn Vfs>,
/// Filename of the open file, relative to the vfs root
pub filename: PathBuf,
/// The current read-only handle on the file, if any
pub reading_handle: RefCell<Option<FileHandle>>,
/// The current read-write handle on the file, if any
pub writing_handle: RefCell<Option<FileHandle>>,
}
impl RandomAccessFile {
/// Wrap a file for random access
pub fn new(vfs: Box<dyn Vfs>, filename: PathBuf) -> Self {
assert!(filename.is_relative());
Self {
vfs,
filename,
reading_handle: RefCell::new(None),
writing_handle: RefCell::new(None),
}
}
/// Read a chunk of bytes from the file.
pub fn read_chunk(
&self,
offset: usize,
length: usize,
) -> Result<Vec<u8>, HgError> {
let mut handle = self.get_read_handle()?;
handle
.seek(SeekFrom::Start(offset as u64))
.when_reading_file(&self.filename)?;
handle.read_exact(length).when_reading_file(&self.filename)
}
/// `pub` only for hg-cpython
#[doc(hidden)]
pub fn get_read_handle(&self) -> Result<FileHandle, HgError> {
if let Some(handle) = &*self.writing_handle.borrow() {
// Use a file handle being actively used for writes, if available.
// There is some danger to doing this because reads will seek the
// file.
// However, [`Revlog::write_entry`] performs a `SeekFrom::End(0)`
// before all writes, so we should be safe.
return Ok(handle.clone());
}
if let Some(handle) = &*self.reading_handle.borrow() {
return Ok(handle.clone());
}
// early returns done to work around borrowck being overzealous
// See https://github.com/rust-lang/rust/issues/103108
let new_handle = FileHandle::new(
dyn_clone::clone_box(&*self.vfs),
&self.filename,
false,
false,
)?;
*self.reading_handle.borrow_mut() = Some(new_handle.clone());
Ok(new_handle)
}
/// `pub` only for hg-cpython
#[doc(hidden)]
pub fn exit_reading_context(&self) {
self.reading_handle.take();
}
// Returns whether this file currently open
pub fn is_open(&self) -> bool {
self.reading_handle.borrow().is_some()
|| self.writing_handle.borrow().is_some()
}
}
/// A buffer that holds new changelog index data that needs to be written
/// after the manifest and filelogs so that the repo is updated atomically to
/// external processes.
#[derive(Clone, Debug, Default)]
pub struct DelayedBuffer {
// The actual in-memory bytes storing the delayed writes
pub(super) buffer: Vec<u8>,
/// The current offset into the virtual file composed of file + buffer
offset: u64,
/// The size of the file at the time of opening
file_size: u64,
}
impl DelayedBuffer {
/// Returns the length of the full data (on-disk + buffer length).
pub fn len(&self) -> u64 {
self.buffer.len() as u64 + self.file_size
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
}
/// Holds an open [`File`] and the related data. This can be used for reading
/// and writing. Writes can be delayed to a buffer before touching the disk,
/// if relevant (in the changelog case), but reads are transparent.
pub struct FileHandle {
/// The actual open file
pub file: File,
/// The VFS with which the file was opened
vfs: Box<dyn Vfs>,
/// Filename of the open file, relative to the repo root
filename: PathBuf,
/// Buffer of delayed entry writes to the changelog index. This points
/// back to the buffer inside the revlog this handle refers to.
delayed_buffer: Option<Arc<Mutex<DelayedBuffer>>>,
}
impl std::fmt::Debug for FileHandle {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("FileHandle")
.field("filename", &self.filename)
.field("delayed_buffer", &self.delayed_buffer)
.field("file", &self.file)
.finish()
}
}
impl Clone for FileHandle {
fn clone(&self) -> Self {
Self {
vfs: dyn_clone::clone_box(&*self.vfs),
filename: self.filename.clone(),
delayed_buffer: self.delayed_buffer.clone(),
// This can only fail if the OS doesn't have the file handle
// anymore, so we're not going to do anything useful anyway.
file: self.file.try_clone().expect("couldn't clone file handle"),
}
}
}
impl FileHandle {
/// Get a (read or write) file handle to `filename`. Only creates the file
/// if `create` is `true`.
pub fn new(
vfs: Box<dyn Vfs>,
filename: impl AsRef<Path>,
create: bool,
write: bool,
) -> Result<Self, HgError> {
let file = if create {
vfs.create(filename.as_ref())?
} else if write {
vfs.open(filename.as_ref())?
} else {
vfs.open_read(filename.as_ref())?
};
Ok(Self {
vfs,
filename: filename.as_ref().to_owned(),
delayed_buffer: None,
file,
})
}
/// Get a file handle to `filename`, but writes go to a [`DelayedBuffer`].
pub fn new_delayed(
vfs: Box<dyn Vfs>,
filename: impl AsRef<Path>,
create: bool,
delayed_buffer: Arc<Mutex<DelayedBuffer>>,
) -> Result<Self, HgError> {
let mut file = if create {
vfs.create(filename.as_ref())?
} else {
vfs.open(filename.as_ref())?
};
let size = vfs.file_size(&file)?;
let offset = file
.stream_position()
.when_reading_file(filename.as_ref())?;
{
let mut buf = delayed_buffer.lock().unwrap();
buf.file_size = size;
buf.offset = offset;
}
Ok(Self {
vfs,
filename: filename.as_ref().to_owned(),
delayed_buffer: Some(delayed_buffer),
file,
})
}
/// Wrap an existing [`File`]
pub fn from_file(
file: File,
vfs: Box<dyn Vfs>,
filename: impl AsRef<Path>,
) -> Self {
Self {
vfs,
filename: filename.as_ref().to_owned(),
delayed_buffer: None,
file,
}
}
/// Wrap an existing [`File`], but writes go to a [`DelayedBuffer`].
pub fn from_file_delayed(
mut file: File,
vfs: Box<dyn Vfs>,
filename: impl AsRef<Path>,
delayed_buffer: Arc<Mutex<DelayedBuffer>>,
) -> Result<Self, HgError> {
let size = vfs.file_size(&file)?;
let offset = file
.stream_position()
.when_reading_file(filename.as_ref())?;
{
let mut buf = delayed_buffer.lock().unwrap();
buf.file_size = size;
buf.offset = offset;
}
Ok(Self {
vfs,
filename: filename.as_ref().to_owned(),
delayed_buffer: Some(delayed_buffer),
file,
})
}
/// Move the position of the handle to `pos`,
/// spanning the [`DelayedBuffer`] if defined. Will return an error if
/// an invalid seek position is asked, or for any standard io error.
pub fn seek(&mut self, pos: SeekFrom) -> Result<u64, std::io::Error> {
if let Some(delay_buf) = &self.delayed_buffer {
let mut delay_buf = delay_buf.lock().unwrap();
// Virtual file offset spans real file and data
match pos {
SeekFrom::Start(offset) => delay_buf.offset = offset,
SeekFrom::End(offset) => {
delay_buf.offset =
delay_buf.len().saturating_add_signed(offset)
}
SeekFrom::Current(offset) => {
delay_buf.offset =
delay_buf.offset.saturating_add_signed(offset);
}
}
if delay_buf.offset < delay_buf.file_size {
self.file.seek(pos)
} else {
Ok(delay_buf.offset)
}
} else {
self.file.seek(pos)
}
}
/// Read exactly `length` bytes from the current position.
/// Errors are the same as [`std::io::Read::read_exact`].
pub fn read_exact(
&mut self,
length: usize,
) -> Result<Vec<u8>, std::io::Error> {
if let Some(delay_buf) = self.delayed_buffer.as_mut() {
let mut delay_buf = delay_buf.lock().unwrap();
let mut buf = vec![0; length];
let offset: isize =
delay_buf.offset.try_into().expect("buffer too large");
let file_size: isize =
delay_buf.file_size.try_into().expect("file too large");
let span: isize = offset - file_size;
let length = length.try_into().expect("too large of a length");
let absolute_span: u64 =
span.unsigned_abs().try_into().expect("length too large");
if span < 0 {
if length <= absolute_span {
// We're only in the file
self.file.read_exact(&mut buf)?;
} else {
// We're spanning file and buffer
self.file
.read_exact(&mut buf[..absolute_span as usize])?;
delay_buf
.buffer
.take(length - absolute_span)
.read_exact(&mut buf[absolute_span as usize..])?;
}
} else {
// We're only in the buffer
delay_buf.buffer[absolute_span as usize..]
.take(length)
.read_exact(&mut buf)?;
}
delay_buf.offset += length;
Ok(buf.to_owned())
} else {
let mut buf = vec![0; length];
self.file.read_exact(&mut buf)?;
Ok(buf)
}
}
/// Flush the in-memory changes to disk. This does *not* write the
/// delayed buffer, only the pending file changes.
pub fn flush(&mut self) -> Result<(), HgError> {
self.file.flush().when_writing_file(&self.filename)
}
/// Return the current position in the file
pub fn position(&mut self) -> Result<u64, HgError> {
self.file
.stream_position()
.when_reading_file(&self.filename)
}
/// Append `data` to the file, or to the [`DelayedBuffer`], if any.
pub fn write_all(&mut self, data: &[u8]) -> Result<(), HgError> {
if let Some(buf) = &mut self.delayed_buffer {
let mut delayed_buffer = buf.lock().expect("propagate the panic");
assert_eq!(delayed_buffer.offset, delayed_buffer.len());
delayed_buffer.buffer.extend_from_slice(data);
delayed_buffer.offset += data.len() as u64;
Ok(())
} else {
self.file
.write_all(data)
.when_writing_file(&self.filename)?;
Ok(())
}
}
}
/// Write handles to a given revlog (index + maybe data)
#[derive(Debug)]
pub struct WriteHandles {
/// Handle to the index file
pub index_handle: FileHandle,
/// Handle to the data file, if the revlog is non-inline
pub data_handle: Option<FileHandle>,
}
#[cfg(test)]
mod tests {
use std::io::ErrorKind;
use crate::vfs::VfsImpl;
use super::*;
#[test]
fn test_random_access_file() {
let base = tempfile::tempdir().unwrap().into_path();
let filename = Path::new("a");
let file_path = base.join(filename);
let raf = RandomAccessFile::new(
Box::new(VfsImpl { base }),
filename.to_owned(),
);
assert!(!raf.is_open());
assert_eq!(&raf.filename, &filename);
// Should fail to read a non-existing file
match raf.get_read_handle().unwrap_err() {
HgError::IoError { error, .. } => match error.kind() {
std::io::ErrorKind::NotFound => {}
_ => panic!("should be not found"),
},
e => panic!("{}", e.to_string()),
}
std::fs::write(file_path, b"1234567890").unwrap();
// Should be able to open an existing file
let mut handle = raf.get_read_handle().unwrap();
assert!(raf.is_open());
assert_eq!(handle.read_exact(10).unwrap(), b"1234567890".to_vec());
}
#[test]
fn test_file_handle() {
let base = tempfile::tempdir().unwrap().into_path();
let filename = base.join("a");
// No `create` should fail
FileHandle::new(
Box::new(VfsImpl { base: base.clone() }),
&filename,
false,
false,
)
.unwrap_err();
std::fs::write(&filename, b"1234567890").unwrap();
let mut read_handle = FileHandle::new(
Box::new(VfsImpl { base: base.clone() }),
&filename,
false,
false,
)
.unwrap();
assert_eq!(&read_handle.filename, &filename);
assert_eq!(read_handle.position().unwrap(), 0);
// Writing to an explicit read handle should fail
read_handle.write_all(b"some data").unwrap_err();
// reading exactly n bytes should work
assert_eq!(read_handle.read_exact(3).unwrap(), b"123".to_vec());
// and the position should be remembered
assert_eq!(read_handle.read_exact(2).unwrap(), b"45".to_vec());
// Seeking should work
let position = read_handle.position().unwrap();
read_handle.seek(SeekFrom::Current(-2)).unwrap();
assert_eq!(position - 2, read_handle.position().unwrap());
// Seeking too much data should fail
read_handle.read_exact(1000).unwrap_err();
// Work around the yet unimplemented VFS for write
let mut options = std::fs::OpenOptions::new();
options.read(true);
options.write(true);
let file = options.open(&filename).unwrap();
// Open a write handle
let mut handle = FileHandle::from_file(
file,
Box::new(VfsImpl { base: base.clone() }),
&filename,
);
// Now writing should succeed
handle.write_all(b"new data").unwrap();
// Opening or writing does not seek, so we should be at the start
assert_eq!(handle.position().unwrap(), 8);
// We can still read
assert_eq!(handle.read_exact(2).unwrap(), b"90".to_vec());
// Flushing doesn't do anything unexpected
handle.flush().unwrap();
let delayed_buffer = Arc::new(Mutex::new(DelayedBuffer::default()));
let file = options.open(&filename).unwrap();
let mut handle = FileHandle::from_file_delayed(
file,
Box::new(VfsImpl { base: base.clone() }),
&filename,
delayed_buffer,
)
.unwrap();
assert_eq!(
handle
.delayed_buffer
.as_ref()
.unwrap()
.lock()
.unwrap()
.file_size,
10
);
handle.seek(SeekFrom::End(0)).unwrap();
handle.write_all(b"should go to buffer").unwrap();
assert_eq!(
handle
.delayed_buffer
.as_ref()
.unwrap()
.lock()
.unwrap()
.len(),
29
);
read_handle.seek(SeekFrom::Start(0)).unwrap();
// On-disk file contents should be unchanged
assert_eq!(
read_handle.read_exact(10).unwrap(),
b"new data90".to_vec(),
);
assert_eq!(
read_handle.read_exact(1).unwrap_err().kind(),
ErrorKind::UnexpectedEof
);
handle.flush().unwrap();
// On-disk file contents should still be unchanged after a flush
assert_eq!(
read_handle.read_exact(1).unwrap_err().kind(),
ErrorKind::UnexpectedEof
);
// Read from the buffer only
handle.seek(SeekFrom::End(-1)).unwrap();
assert_eq!(handle.read_exact(1).unwrap(), b"r".to_vec());
// Read from an overlapping section of file and buffer
handle.seek(SeekFrom::Start(6)).unwrap();
assert_eq!(
handle.read_exact(20).unwrap(),
b"ta90should go to buf".to_vec()
);
// Read from file only
handle.seek(SeekFrom::Start(0)).unwrap();
assert_eq!(handle.read_exact(8).unwrap(), b"new data".to_vec());
}
}