update.rs
491 lines
| 18.1 KiB
| application/rls-services+xml
|
RustLexer
Raphaël Gomès
|
r52953 | //! Tools for moving the repository to a given revision | ||
use std::{ | ||||
fs::Permissions, | ||||
io::Write, | ||||
os::unix::fs::{MetadataExt, PermissionsExt}, | ||||
path::Path, | ||||
time::Duration, | ||||
}; | ||||
use crate::{ | ||||
dirstate::{ParentFileData, TruncatedTimestamp}, | ||||
dirstate_tree::{ | ||||
dirstate_map::DirstateEntryReset, on_disk::write_tracked_key, | ||||
}, | ||||
errors::{HgError, IoResultExt}, | ||||
exit_codes, | ||||
filelog::Filelog, | ||||
narrow, | ||||
node::NULL_NODE, | ||||
operations::{list_rev_tracked_files, ExpandedManifestEntry}, | ||||
progress::Progress, | ||||
repo::Repo, | ||||
sparse, | ||||
utils::{ | ||||
files::{filesystem_now, get_path_from_bytes}, | ||||
hg_path::{hg_path_to_path_buf, HgPath, HgPathError}, | ||||
path_auditor::PathAuditor, | ||||
}, | ||||
vfs::{is_on_nfs_mount, VfsImpl}, | ||||
DirstateParents, RevlogError, RevlogOpenOptions, UncheckedRevision, | ||||
}; | ||||
use crossbeam_channel::{Receiver, Sender}; | ||||
use rayon::prelude::*; | ||||
fn write_dirstate(repo: &Repo) -> Result<(), HgError> { | ||||
repo.write_dirstate() | ||||
.map_err(|e| HgError::abort(e.to_string(), exit_codes::ABORT, None))?; | ||||
write_tracked_key(repo) | ||||
} | ||||
/// Update the current working copy of `repo` to the given revision `to`, from | ||||
/// the null revision and set + write out the dirstate to reflect that. | ||||
/// | ||||
/// Do not call this outside of a Python context. This does *not* handle any | ||||
/// of the checks, hooks, lock taking needed to setup and get out of this | ||||
/// update from the null revision. | ||||
pub fn update_from_null( | ||||
repo: &Repo, | ||||
to: UncheckedRevision, | ||||
progress: &dyn Progress, | ||||
) -> Result<usize, HgError> { | ||||
// Ignore the warnings, they've been displayed by Python already | ||||
// TODO non-Python clients: display narrow warnings | ||||
let (narrow_matcher, _) = narrow::matcher(repo)?; | ||||
let files_for_rev = list_rev_tracked_files(repo, to, narrow_matcher) | ||||
.map_err(handle_revlog_error)?; | ||||
repo.manually_set_parents(DirstateParents { | ||||
p1: repo.node(to).expect("update target should exist"), | ||||
p2: NULL_NODE, | ||||
})?; | ||||
// Filter the working copy according to the sparse spec | ||||
let tracked_files: Result<Vec<_>, _> = if !repo.has_sparse() { | ||||
files_for_rev.iter().collect() | ||||
} else { | ||||
// Ignore the warnings, they've been displayed by Python already | ||||
// TODO non-Python clients: display sparse warnings | ||||
let (sparse_matcher, _) = sparse::matcher(repo)?; | ||||
files_for_rev | ||||
.iter() | ||||
.filter(|f| { | ||||
match f { | ||||
Ok(f) => sparse_matcher.matches(f.0), | ||||
Err(_) => true, // Errors stop the update, include them | ||||
} | ||||
}) | ||||
.collect() | ||||
}; | ||||
let tracked_files = tracked_files?; | ||||
if tracked_files.is_empty() { | ||||
// Still write the dirstate because we might not be in the null | ||||
// revision. | ||||
// This can happen in narrow repos where all paths are excluded in | ||||
// this revision. | ||||
write_dirstate(repo)?; | ||||
return Ok(0); | ||||
} | ||||
let store_vfs = &repo.store_vfs(); | ||||
let options = repo.default_revlog_options(crate::RevlogType::Filelog)?; | ||||
let (errors_sender, errors_receiver) = crossbeam_channel::unbounded(); | ||||
let (files_sender, files_receiver) = crossbeam_channel::unbounded(); | ||||
let working_directory_path = &repo.working_directory_path(); | ||||
let files_count = tracked_files.len(); | ||||
let chunks = chunk_tracked_files(tracked_files); | ||||
progress.update(0, Some(files_count as u64)); | ||||
create_working_copy( | ||||
chunks, | ||||
working_directory_path, | ||||
store_vfs, | ||||
options, | ||||
files_sender, | ||||
errors_sender, | ||||
progress, | ||||
); | ||||
let errors: Vec<HgError> = errors_receiver.iter().collect(); | ||||
if !errors.is_empty() { | ||||
log::debug!("{} errors during update (see trace logs)", errors.len()); | ||||
for error in errors.iter() { | ||||
log::trace!("{}", error); | ||||
} | ||||
// Best we can do is raise the first error (in order of the channel) | ||||
return Err(errors.into_iter().next().expect("can never be empty")); | ||||
} | ||||
// TODO try to run this concurrently to update the dirstate while we're | ||||
// still writing out the working copy to see if that improves performance. | ||||
let total = update_dirstate(repo, files_receiver)?; | ||||
write_dirstate(repo)?; | ||||
Ok(total) | ||||
} | ||||
fn handle_revlog_error(e: RevlogError) -> HgError { | ||||
match e { | ||||
crate::RevlogError::Other(hg_error) => hg_error, | ||||
e => HgError::abort( | ||||
format!("revlog error: {}", e), | ||||
exit_codes::ABORT, | ||||
None, | ||||
), | ||||
} | ||||
} | ||||
/// Preallocated size of Vec holding directory contents. This aims at | ||||
/// preventing the need for re-allocating the Vec in most cases. | ||||
/// | ||||
/// The value is arbitrarily picked as a little over an average number of files | ||||
/// per directory done by looking at a few larger open-source repos. | ||||
/// Most of the runtime is IO anyway, so this doesn't matter too much. | ||||
const FILES_PER_DIRECTORY: usize = 16; | ||||
/// Chunk files per directory prefix, so almost every directory is handled | ||||
/// in a separate thread, which works around the FS inode mutex. | ||||
/// Chunking less (and doing approximately `files_count`/`threads`) actually | ||||
/// ends up being less performant: my hypothesis is `rayon`'s work stealing | ||||
/// being more efficient with tasks of varying lengths. | ||||
#[logging_timer::time("trace")] | ||||
fn chunk_tracked_files( | ||||
tracked_files: Vec<ExpandedManifestEntry>, | ||||
) -> Vec<(&HgPath, Vec<ExpandedManifestEntry>)> { | ||||
let files_count = tracked_files.len(); | ||||
let mut chunks = Vec::with_capacity(files_count / FILES_PER_DIRECTORY); | ||||
let mut current_chunk = Vec::with_capacity(FILES_PER_DIRECTORY); | ||||
let mut last_directory = tracked_files[0].0.parent(); | ||||
for file_info in tracked_files { | ||||
let current_directory = file_info.0.parent(); | ||||
let different_directory = current_directory != last_directory; | ||||
if different_directory { | ||||
chunks.push((last_directory, current_chunk)); | ||||
current_chunk = Vec::with_capacity(FILES_PER_DIRECTORY); | ||||
} | ||||
current_chunk.push(file_info); | ||||
last_directory = current_directory; | ||||
} | ||||
chunks.push((last_directory, current_chunk)); | ||||
chunks | ||||
} | ||||
#[logging_timer::time("trace")] | ||||
fn create_working_copy<'a: 'b, 'b>( | ||||
chunks: Vec<(&HgPath, Vec<ExpandedManifestEntry<'a>>)>, | ||||
working_directory_path: &Path, | ||||
store_vfs: &VfsImpl, | ||||
options: RevlogOpenOptions, | ||||
files_sender: Sender<(&'b HgPath, u32, usize, TruncatedTimestamp)>, | ||||
error_sender: Sender<HgError>, | ||||
progress: &dyn Progress, | ||||
) { | ||||
let auditor = PathAuditor::new(working_directory_path); | ||||
chunks.into_par_iter().for_each(|(dir_path, chunk)| { | ||||
if let Err(e) = working_copy_worker( | ||||
dir_path, | ||||
chunk, | ||||
working_directory_path, | ||||
store_vfs, | ||||
options, | ||||
&files_sender, | ||||
progress, | ||||
&auditor, | ||||
) { | ||||
error_sender | ||||
.send(e) | ||||
.expect("channel should not be disconnected") | ||||
} | ||||
}); | ||||
} | ||||
/// Represents a work unit for a single thread, responsible for this set of | ||||
/// files and restoring them to the working copy. | ||||
#[allow(clippy::too_many_arguments)] | ||||
fn working_copy_worker<'a: 'b, 'b>( | ||||
dir_path: &HgPath, | ||||
chunk: Vec<ExpandedManifestEntry<'a>>, | ||||
working_directory_path: &Path, | ||||
store_vfs: &VfsImpl, | ||||
options: RevlogOpenOptions, | ||||
files_sender: &Sender<(&'b HgPath, u32, usize, TruncatedTimestamp)>, | ||||
progress: &dyn Progress, | ||||
auditor: &PathAuditor, | ||||
) -> Result<(), HgError> { | ||||
let dir_path = | ||||
hg_path_to_path_buf(dir_path).expect("invalid path in manifest"); | ||||
let dir_path = working_directory_path.join(dir_path); | ||||
std::fs::create_dir_all(&dir_path).when_writing_file(&dir_path)?; | ||||
for (file, file_node, flags) in chunk { | ||||
auditor.audit_path(file)?; | ||||
let flags = flags.map(|f| f.into()); | ||||
let path = | ||||
working_directory_path.join(get_path_from_bytes(file.as_bytes())); | ||||
// Treemanifest is not supported | ||||
assert!(flags != Some(b't')); | ||||
let filelog = Filelog::open_vfs(store_vfs, file, options)?; | ||||
let filelog_revision_data = &filelog | ||||
.data_for_node(file_node) | ||||
.map_err(handle_revlog_error)?; | ||||
let file_data = filelog_revision_data.file_data()?; | ||||
if flags == Some(b'l') { | ||||
let target = get_path_from_bytes(file_data); | ||||
if let Err(e) = std::os::unix::fs::symlink(target, &path) { | ||||
// If the path already exists either: | ||||
// - another process created this file while ignoring the | ||||
// lock => error | ||||
// - our check for the fast path is incorrect => error | ||||
// - this is a malicious repo/bundle and this is symlink that | ||||
// tries to write things where it shouldn't be able to. | ||||
match e.kind() { | ||||
std::io::ErrorKind::AlreadyExists => { | ||||
let metadata = std::fs::symlink_metadata(&path) | ||||
.when_reading_file(&path)?; | ||||
if metadata.is_dir() { | ||||
return Err(HgError::Path( | ||||
HgPathError::TraversesSymbolicLink { | ||||
// Technically it should be one of the | ||||
// children, but good enough | ||||
path: file | ||||
.join(HgPath::new(b"*")) | ||||
.to_owned(), | ||||
symlink: file.to_owned(), | ||||
}, | ||||
)); | ||||
} | ||||
return Err(e).when_writing_file(&path); | ||||
} | ||||
_ => return Err(e).when_writing_file(&path), | ||||
} | ||||
} | ||||
} else { | ||||
let mut f = | ||||
std::fs::File::create(&path).when_writing_file(&path)?; | ||||
f.write_all(file_data).when_writing_file(&path)?; | ||||
} | ||||
if flags == Some(b'x') { | ||||
std::fs::set_permissions(&path, Permissions::from_mode(0o755)) | ||||
.when_writing_file(&path)?; | ||||
} | ||||
let metadata = | ||||
std::fs::symlink_metadata(&path).when_reading_file(&path)?; | ||||
let mode = metadata.mode(); | ||||
files_sender | ||||
.send(( | ||||
file, | ||||
mode, | ||||
file_data.len(), | ||||
TruncatedTimestamp::for_mtime_of(&metadata) | ||||
.when_reading_file(&path)?, | ||||
)) | ||||
.expect("channel should not be closed"); | ||||
progress.increment(1, None); | ||||
} | ||||
Ok(()) | ||||
} | ||||
#[logging_timer::time("trace")] | ||||
fn update_dirstate( | ||||
repo: &Repo, | ||||
files_receiver: Receiver<(&HgPath, u32, usize, TruncatedTimestamp)>, | ||||
) -> Result<usize, HgError> { | ||||
let mut dirstate = repo | ||||
.dirstate_map_mut() | ||||
.map_err(|e| HgError::abort(e.to_string(), exit_codes::ABORT, None))?; | ||||
// (see the comments in `filter_ambiguous_files` in `merge.py` for more) | ||||
// It turns out that (on Linux at least) the filesystem resolution time | ||||
// for most filesystems is based on the HZ kernel config. Their internal | ||||
// clocks do return nanoseconds if the hardware clock is precise enough, | ||||
// which should be the case on most recent computers but are only updated | ||||
// every few milliseconds at best (every "jiffy"). | ||||
// | ||||
// We are still not concerned with fixing the race with other | ||||
// processes that might modify the working copy right after it was created | ||||
// within the same tick, because it is impossible to catch. | ||||
// However, we might as well not race with operations that could run right | ||||
// after this one, especially other Mercurial operations that could be | ||||
// waiting for the wlock to change file contents and the dirstate. | ||||
// | ||||
// Thus: wait until the filesystem clock has ticked to filter ambiguous | ||||
// entries and write the dirstate, but only for dirstate-v2, since v1 only | ||||
// has second-level granularity and waiting for a whole second is too much | ||||
// of a penalty in the general case. | ||||
// Although we're assuming that people running dirstate-v2 on Linux | ||||
// don't have a second-granularity FS (with the exclusion of NFS), users | ||||
// can be surprising, and at some point in the future dirstate-v2 will | ||||
// become the default. To that end, we limit the wait time to 100ms and | ||||
// fall back to the filter method in case of a timeout. | ||||
// | ||||
// +------------+------+--------------+ | ||||
// | version | wait | filter level | | ||||
// +------------+------+--------------+ | ||||
// | V1 | No | Second | | ||||
// | V2 | Yes | Nanosecond | | ||||
// | V2-slow-fs | No | Second | | ||||
// +------------+------+--------------+ | ||||
let dirstate_v2 = repo.use_dirstate_v2(); | ||||
// Let's ignore NFS right off the bat | ||||
let mut fast_enough_fs = !is_on_nfs_mount(repo.working_directory_path()); | ||||
let fs_time_now = if dirstate_v2 && fast_enough_fs { | ||||
match wait_until_fs_tick(repo.working_directory_path()) { | ||||
None => None, | ||||
Some(Ok(time)) => Some(time), | ||||
Some(Err(time)) => { | ||||
fast_enough_fs = false; | ||||
Some(time) | ||||
} | ||||
} | ||||
} else { | ||||
filesystem_now(repo.working_directory_path()) | ||||
.ok() | ||||
.map(TruncatedTimestamp::from) | ||||
}; | ||||
let mut total = 0; | ||||
for (filename, mode, size, mtime) in files_receiver.into_iter() { | ||||
total += 1; | ||||
// When using dirstate-v2 on a filesystem with reasonable performance | ||||
// this is basically always true unless you get a mtime from the | ||||
// far future. | ||||
let has_meaningful_mtime = if let Some(fs_time) = fs_time_now { | ||||
mtime.for_reliable_mtime_of_self(&fs_time).is_some_and(|t| { | ||||
// Dirstate-v1 only has second-level information | ||||
!t.second_ambiguous || dirstate_v2 && fast_enough_fs | ||||
}) | ||||
} else { | ||||
// We somehow failed to write to the filesystem, so don't store | ||||
// the cache information. | ||||
false | ||||
}; | ||||
let reset = DirstateEntryReset { | ||||
filename, | ||||
wc_tracked: true, | ||||
p1_tracked: true, | ||||
p2_info: false, | ||||
has_meaningful_mtime, | ||||
parent_file_data_opt: Some(ParentFileData { | ||||
mode_size: Some(( | ||||
mode, | ||||
size.try_into().expect("invalid file size in manifest"), | ||||
)), | ||||
mtime: Some(mtime), | ||||
}), | ||||
from_empty: true, | ||||
}; | ||||
dirstate.reset_state(reset).map_err(|e| { | ||||
HgError::abort(e.to_string(), exit_codes::ABORT, None) | ||||
})?; | ||||
} | ||||
Ok(total) | ||||
} | ||||
/// Wait until the next update from the filesystem time by writing in a loop | ||||
/// a new temporary file inside the working directory and checking if its time | ||||
/// differs from the first one observed. | ||||
/// | ||||
/// Returns `None` if we are unable to get the filesystem time, | ||||
/// `Some(Err(timestamp))` if we've timed out waiting for the filesystem clock | ||||
/// to tick, and `Some(Ok(timestamp))` if we've waited successfully. | ||||
/// | ||||
/// On Linux, your average tick is going to be a "jiffy", or 1/HZ. | ||||
/// HZ is your kernel's tick rate (if it has one configured) and the value | ||||
/// is the one returned by `grep 'CONFIG_HZ=' /boot/config-$(uname -r)`, | ||||
/// again assuming a normal setup. | ||||
/// | ||||
/// In my case (Alphare) at the time of writing, I get `CONFIG_HZ=250`, | ||||
/// which equates to 4ms. | ||||
/// | ||||
/// This might change with a series that could make it to Linux 6.12: | ||||
/// https://lore.kernel.org/all/20241002-mgtime-v10-8-d1c4717f5284@kernel.org | ||||
fn wait_until_fs_tick( | ||||
working_directory_path: &Path, | ||||
) -> Option<Result<TruncatedTimestamp, TruncatedTimestamp>> { | ||||
let start = std::time::Instant::now(); | ||||
let old_fs_time = filesystem_now(working_directory_path).ok()?; | ||||
let mut fs_time = filesystem_now(working_directory_path).ok()?; | ||||
const FS_TICK_WAIT_TIMEOUT: Duration = Duration::from_millis(100); | ||||
while fs_time == old_fs_time { | ||||
if std::time::Instant::now() - start > FS_TICK_WAIT_TIMEOUT { | ||||
log::trace!( | ||||
"timed out waiting for the fs clock to tick after {:?}", | ||||
FS_TICK_WAIT_TIMEOUT | ||||
); | ||||
return Some(Err(TruncatedTimestamp::from(old_fs_time))); | ||||
} | ||||
fs_time = filesystem_now(working_directory_path).ok()?; | ||||
} | ||||
log::trace!( | ||||
"waited for {:?} before writing the dirstate", | ||||
fs_time.duration_since(old_fs_time) | ||||
); | ||||
Some(Ok(TruncatedTimestamp::from(fs_time))) | ||||
} | ||||
#[cfg(test)] | ||||
mod test { | ||||
use super::*; | ||||
use pretty_assertions::assert_eq; | ||||
#[test] | ||||
fn test_chunk_tracked_files() { | ||||
fn chunk(v: Vec<&'static str>) -> Vec<ExpandedManifestEntry> { | ||||
v.into_iter() | ||||
.map(|f| (HgPath::new(f.as_bytes()), NULL_NODE, None)) | ||||
.collect() | ||||
} | ||||
let p = HgPath::new; | ||||
let files = chunk(vec!["a"]); | ||||
let expected = vec![(p(""), chunk(vec!["a"]))]; | ||||
assert_eq!(chunk_tracked_files(files), expected); | ||||
let files = chunk(vec!["a", "b", "c"]); | ||||
let expected = vec![(p(""), chunk(vec!["a", "b", "c"]))]; | ||||
assert_eq!(chunk_tracked_files(files), expected); | ||||
let files = chunk(vec![ | ||||
"dir/a-new", | ||||
"dir/a/mut", | ||||
"dir/a/mut-mut", | ||||
"dir/albert", | ||||
"dir/b", | ||||
"dir/subdir/c", | ||||
"dir/subdir/d", | ||||
"file", | ||||
]); | ||||
let expected = vec![ | ||||
(p("dir"), chunk(vec!["dir/a-new"])), | ||||
(p("dir/a"), chunk(vec!["dir/a/mut", "dir/a/mut-mut"])), | ||||
(p("dir"), chunk(vec!["dir/albert", "dir/b"])), | ||||
(p("dir/subdir"), chunk(vec!["dir/subdir/c", "dir/subdir/d"])), | ||||
(p(""), chunk(vec!["file"])), | ||||
]; | ||||
assert_eq!(chunk_tracked_files(files), expected); | ||||
// Doesn't get split | ||||
let large_dir = vec![ | ||||
"1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12", | ||||
"13", "14", "15", "16", "17", "18", "19", "20", "21", "22", "23", | ||||
]; | ||||
let files = chunk(large_dir.clone()); | ||||
let expected = vec![(p(""), chunk(large_dir))]; | ||||
assert_eq!(chunk_tracked_files(files), expected); | ||||
} | ||||
} | ||||