##// END OF EJS Templates
hgweb: skip logging ConnectionAbortedError...
hgweb: skip logging ConnectionAbortedError Not stacktracing on `ConnectionResetError` was added in 6bbb12cba5a8 (though it was spelled differently for py2 support), but for some reason Windows occasionally triggers a `ConnectionAbortedError` here across various *.t files (notably `test-archive.t` and `test-lfs-serve-access.t`, but there are others). The payload that fails to send seems to be the html that describes the error to the client, so I suspect some code is seeing the error status code and closing the connection before the server gets to write this html. So don't log it, for test stability- nothing we can do anyway. FWIW, the CPython implementation of wsgihander specifically ignores these two errors, plus `BrokenPipeError`, with a comment that "we expect the client to close the connection abruptly from time to time"[1]. The `BrokenPipeError` is swallowed a level up in `do_write()`, and avoids writing the response following this stacktrace. I'm puzzled why a response is being written after these connection errors are detected- the CPython code referenced doesn't, and the connection is now broken at this point. Perhaps these errors should both be handled with the `BrokenPipeError` after the freeze. (The refactoring away from py2 compat may not be desireable in the freeze, but this is much easier to read, and obviously correct given the referenced CPython code.) I suspect this is what 6bceecb28806 was attempting to fix, but it wasn't specific about the sporadic errors it was seeing. [1] https://github.com/python/cpython/blob/b2eaa75b176e07730215d76d8dce4d63fb493391/Lib/wsgiref/handlers.py#L139

File last commit:

r52953:8b7123c8 default
r53050:891f6d56 stable
Show More
update.rs
491 lines | 18.1 KiB | application/rls-services+xml | RustLexer
//! Tools for moving the repository to a given revision
use std::{
fs::Permissions,
io::Write,
os::unix::fs::{MetadataExt, PermissionsExt},
path::Path,
time::Duration,
};
use crate::{
dirstate::{ParentFileData, TruncatedTimestamp},
dirstate_tree::{
dirstate_map::DirstateEntryReset, on_disk::write_tracked_key,
},
errors::{HgError, IoResultExt},
exit_codes,
filelog::Filelog,
narrow,
node::NULL_NODE,
operations::{list_rev_tracked_files, ExpandedManifestEntry},
progress::Progress,
repo::Repo,
sparse,
utils::{
files::{filesystem_now, get_path_from_bytes},
hg_path::{hg_path_to_path_buf, HgPath, HgPathError},
path_auditor::PathAuditor,
},
vfs::{is_on_nfs_mount, VfsImpl},
DirstateParents, RevlogError, RevlogOpenOptions, UncheckedRevision,
};
use crossbeam_channel::{Receiver, Sender};
use rayon::prelude::*;
fn write_dirstate(repo: &Repo) -> Result<(), HgError> {
repo.write_dirstate()
.map_err(|e| HgError::abort(e.to_string(), exit_codes::ABORT, None))?;
write_tracked_key(repo)
}
/// Update the current working copy of `repo` to the given revision `to`, from
/// the null revision and set + write out the dirstate to reflect that.
///
/// Do not call this outside of a Python context. This does *not* handle any
/// of the checks, hooks, lock taking needed to setup and get out of this
/// update from the null revision.
pub fn update_from_null(
repo: &Repo,
to: UncheckedRevision,
progress: &dyn Progress,
) -> Result<usize, HgError> {
// Ignore the warnings, they've been displayed by Python already
// TODO non-Python clients: display narrow warnings
let (narrow_matcher, _) = narrow::matcher(repo)?;
let files_for_rev = list_rev_tracked_files(repo, to, narrow_matcher)
.map_err(handle_revlog_error)?;
repo.manually_set_parents(DirstateParents {
p1: repo.node(to).expect("update target should exist"),
p2: NULL_NODE,
})?;
// Filter the working copy according to the sparse spec
let tracked_files: Result<Vec<_>, _> = if !repo.has_sparse() {
files_for_rev.iter().collect()
} else {
// Ignore the warnings, they've been displayed by Python already
// TODO non-Python clients: display sparse warnings
let (sparse_matcher, _) = sparse::matcher(repo)?;
files_for_rev
.iter()
.filter(|f| {
match f {
Ok(f) => sparse_matcher.matches(f.0),
Err(_) => true, // Errors stop the update, include them
}
})
.collect()
};
let tracked_files = tracked_files?;
if tracked_files.is_empty() {
// Still write the dirstate because we might not be in the null
// revision.
// This can happen in narrow repos where all paths are excluded in
// this revision.
write_dirstate(repo)?;
return Ok(0);
}
let store_vfs = &repo.store_vfs();
let options = repo.default_revlog_options(crate::RevlogType::Filelog)?;
let (errors_sender, errors_receiver) = crossbeam_channel::unbounded();
let (files_sender, files_receiver) = crossbeam_channel::unbounded();
let working_directory_path = &repo.working_directory_path();
let files_count = tracked_files.len();
let chunks = chunk_tracked_files(tracked_files);
progress.update(0, Some(files_count as u64));
create_working_copy(
chunks,
working_directory_path,
store_vfs,
options,
files_sender,
errors_sender,
progress,
);
let errors: Vec<HgError> = errors_receiver.iter().collect();
if !errors.is_empty() {
log::debug!("{} errors during update (see trace logs)", errors.len());
for error in errors.iter() {
log::trace!("{}", error);
}
// Best we can do is raise the first error (in order of the channel)
return Err(errors.into_iter().next().expect("can never be empty"));
}
// TODO try to run this concurrently to update the dirstate while we're
// still writing out the working copy to see if that improves performance.
let total = update_dirstate(repo, files_receiver)?;
write_dirstate(repo)?;
Ok(total)
}
fn handle_revlog_error(e: RevlogError) -> HgError {
match e {
crate::RevlogError::Other(hg_error) => hg_error,
e => HgError::abort(
format!("revlog error: {}", e),
exit_codes::ABORT,
None,
),
}
}
/// Preallocated size of Vec holding directory contents. This aims at
/// preventing the need for re-allocating the Vec in most cases.
///
/// The value is arbitrarily picked as a little over an average number of files
/// per directory done by looking at a few larger open-source repos.
/// Most of the runtime is IO anyway, so this doesn't matter too much.
const FILES_PER_DIRECTORY: usize = 16;
/// Chunk files per directory prefix, so almost every directory is handled
/// in a separate thread, which works around the FS inode mutex.
/// Chunking less (and doing approximately `files_count`/`threads`) actually
/// ends up being less performant: my hypothesis is `rayon`'s work stealing
/// being more efficient with tasks of varying lengths.
#[logging_timer::time("trace")]
fn chunk_tracked_files(
tracked_files: Vec<ExpandedManifestEntry>,
) -> Vec<(&HgPath, Vec<ExpandedManifestEntry>)> {
let files_count = tracked_files.len();
let mut chunks = Vec::with_capacity(files_count / FILES_PER_DIRECTORY);
let mut current_chunk = Vec::with_capacity(FILES_PER_DIRECTORY);
let mut last_directory = tracked_files[0].0.parent();
for file_info in tracked_files {
let current_directory = file_info.0.parent();
let different_directory = current_directory != last_directory;
if different_directory {
chunks.push((last_directory, current_chunk));
current_chunk = Vec::with_capacity(FILES_PER_DIRECTORY);
}
current_chunk.push(file_info);
last_directory = current_directory;
}
chunks.push((last_directory, current_chunk));
chunks
}
#[logging_timer::time("trace")]
fn create_working_copy<'a: 'b, 'b>(
chunks: Vec<(&HgPath, Vec<ExpandedManifestEntry<'a>>)>,
working_directory_path: &Path,
store_vfs: &VfsImpl,
options: RevlogOpenOptions,
files_sender: Sender<(&'b HgPath, u32, usize, TruncatedTimestamp)>,
error_sender: Sender<HgError>,
progress: &dyn Progress,
) {
let auditor = PathAuditor::new(working_directory_path);
chunks.into_par_iter().for_each(|(dir_path, chunk)| {
if let Err(e) = working_copy_worker(
dir_path,
chunk,
working_directory_path,
store_vfs,
options,
&files_sender,
progress,
&auditor,
) {
error_sender
.send(e)
.expect("channel should not be disconnected")
}
});
}
/// Represents a work unit for a single thread, responsible for this set of
/// files and restoring them to the working copy.
#[allow(clippy::too_many_arguments)]
fn working_copy_worker<'a: 'b, 'b>(
dir_path: &HgPath,
chunk: Vec<ExpandedManifestEntry<'a>>,
working_directory_path: &Path,
store_vfs: &VfsImpl,
options: RevlogOpenOptions,
files_sender: &Sender<(&'b HgPath, u32, usize, TruncatedTimestamp)>,
progress: &dyn Progress,
auditor: &PathAuditor,
) -> Result<(), HgError> {
let dir_path =
hg_path_to_path_buf(dir_path).expect("invalid path in manifest");
let dir_path = working_directory_path.join(dir_path);
std::fs::create_dir_all(&dir_path).when_writing_file(&dir_path)?;
for (file, file_node, flags) in chunk {
auditor.audit_path(file)?;
let flags = flags.map(|f| f.into());
let path =
working_directory_path.join(get_path_from_bytes(file.as_bytes()));
// Treemanifest is not supported
assert!(flags != Some(b't'));
let filelog = Filelog::open_vfs(store_vfs, file, options)?;
let filelog_revision_data = &filelog
.data_for_node(file_node)
.map_err(handle_revlog_error)?;
let file_data = filelog_revision_data.file_data()?;
if flags == Some(b'l') {
let target = get_path_from_bytes(file_data);
if let Err(e) = std::os::unix::fs::symlink(target, &path) {
// If the path already exists either:
// - another process created this file while ignoring the
// lock => error
// - our check for the fast path is incorrect => error
// - this is a malicious repo/bundle and this is symlink that
// tries to write things where it shouldn't be able to.
match e.kind() {
std::io::ErrorKind::AlreadyExists => {
let metadata = std::fs::symlink_metadata(&path)
.when_reading_file(&path)?;
if metadata.is_dir() {
return Err(HgError::Path(
HgPathError::TraversesSymbolicLink {
// Technically it should be one of the
// children, but good enough
path: file
.join(HgPath::new(b"*"))
.to_owned(),
symlink: file.to_owned(),
},
));
}
return Err(e).when_writing_file(&path);
}
_ => return Err(e).when_writing_file(&path),
}
}
} else {
let mut f =
std::fs::File::create(&path).when_writing_file(&path)?;
f.write_all(file_data).when_writing_file(&path)?;
}
if flags == Some(b'x') {
std::fs::set_permissions(&path, Permissions::from_mode(0o755))
.when_writing_file(&path)?;
}
let metadata =
std::fs::symlink_metadata(&path).when_reading_file(&path)?;
let mode = metadata.mode();
files_sender
.send((
file,
mode,
file_data.len(),
TruncatedTimestamp::for_mtime_of(&metadata)
.when_reading_file(&path)?,
))
.expect("channel should not be closed");
progress.increment(1, None);
}
Ok(())
}
#[logging_timer::time("trace")]
fn update_dirstate(
repo: &Repo,
files_receiver: Receiver<(&HgPath, u32, usize, TruncatedTimestamp)>,
) -> Result<usize, HgError> {
let mut dirstate = repo
.dirstate_map_mut()
.map_err(|e| HgError::abort(e.to_string(), exit_codes::ABORT, None))?;
// (see the comments in `filter_ambiguous_files` in `merge.py` for more)
// It turns out that (on Linux at least) the filesystem resolution time
// for most filesystems is based on the HZ kernel config. Their internal
// clocks do return nanoseconds if the hardware clock is precise enough,
// which should be the case on most recent computers but are only updated
// every few milliseconds at best (every "jiffy").
//
// We are still not concerned with fixing the race with other
// processes that might modify the working copy right after it was created
// within the same tick, because it is impossible to catch.
// However, we might as well not race with operations that could run right
// after this one, especially other Mercurial operations that could be
// waiting for the wlock to change file contents and the dirstate.
//
// Thus: wait until the filesystem clock has ticked to filter ambiguous
// entries and write the dirstate, but only for dirstate-v2, since v1 only
// has second-level granularity and waiting for a whole second is too much
// of a penalty in the general case.
// Although we're assuming that people running dirstate-v2 on Linux
// don't have a second-granularity FS (with the exclusion of NFS), users
// can be surprising, and at some point in the future dirstate-v2 will
// become the default. To that end, we limit the wait time to 100ms and
// fall back to the filter method in case of a timeout.
//
// +------------+------+--------------+
// | version | wait | filter level |
// +------------+------+--------------+
// | V1 | No | Second |
// | V2 | Yes | Nanosecond |
// | V2-slow-fs | No | Second |
// +------------+------+--------------+
let dirstate_v2 = repo.use_dirstate_v2();
// Let's ignore NFS right off the bat
let mut fast_enough_fs = !is_on_nfs_mount(repo.working_directory_path());
let fs_time_now = if dirstate_v2 && fast_enough_fs {
match wait_until_fs_tick(repo.working_directory_path()) {
None => None,
Some(Ok(time)) => Some(time),
Some(Err(time)) => {
fast_enough_fs = false;
Some(time)
}
}
} else {
filesystem_now(repo.working_directory_path())
.ok()
.map(TruncatedTimestamp::from)
};
let mut total = 0;
for (filename, mode, size, mtime) in files_receiver.into_iter() {
total += 1;
// When using dirstate-v2 on a filesystem with reasonable performance
// this is basically always true unless you get a mtime from the
// far future.
let has_meaningful_mtime = if let Some(fs_time) = fs_time_now {
mtime.for_reliable_mtime_of_self(&fs_time).is_some_and(|t| {
// Dirstate-v1 only has second-level information
!t.second_ambiguous || dirstate_v2 && fast_enough_fs
})
} else {
// We somehow failed to write to the filesystem, so don't store
// the cache information.
false
};
let reset = DirstateEntryReset {
filename,
wc_tracked: true,
p1_tracked: true,
p2_info: false,
has_meaningful_mtime,
parent_file_data_opt: Some(ParentFileData {
mode_size: Some((
mode,
size.try_into().expect("invalid file size in manifest"),
)),
mtime: Some(mtime),
}),
from_empty: true,
};
dirstate.reset_state(reset).map_err(|e| {
HgError::abort(e.to_string(), exit_codes::ABORT, None)
})?;
}
Ok(total)
}
/// Wait until the next update from the filesystem time by writing in a loop
/// a new temporary file inside the working directory and checking if its time
/// differs from the first one observed.
///
/// Returns `None` if we are unable to get the filesystem time,
/// `Some(Err(timestamp))` if we've timed out waiting for the filesystem clock
/// to tick, and `Some(Ok(timestamp))` if we've waited successfully.
///
/// On Linux, your average tick is going to be a "jiffy", or 1/HZ.
/// HZ is your kernel's tick rate (if it has one configured) and the value
/// is the one returned by `grep 'CONFIG_HZ=' /boot/config-$(uname -r)`,
/// again assuming a normal setup.
///
/// In my case (Alphare) at the time of writing, I get `CONFIG_HZ=250`,
/// which equates to 4ms.
///
/// This might change with a series that could make it to Linux 6.12:
/// https://lore.kernel.org/all/20241002-mgtime-v10-8-d1c4717f5284@kernel.org
fn wait_until_fs_tick(
working_directory_path: &Path,
) -> Option<Result<TruncatedTimestamp, TruncatedTimestamp>> {
let start = std::time::Instant::now();
let old_fs_time = filesystem_now(working_directory_path).ok()?;
let mut fs_time = filesystem_now(working_directory_path).ok()?;
const FS_TICK_WAIT_TIMEOUT: Duration = Duration::from_millis(100);
while fs_time == old_fs_time {
if std::time::Instant::now() - start > FS_TICK_WAIT_TIMEOUT {
log::trace!(
"timed out waiting for the fs clock to tick after {:?}",
FS_TICK_WAIT_TIMEOUT
);
return Some(Err(TruncatedTimestamp::from(old_fs_time)));
}
fs_time = filesystem_now(working_directory_path).ok()?;
}
log::trace!(
"waited for {:?} before writing the dirstate",
fs_time.duration_since(old_fs_time)
);
Some(Ok(TruncatedTimestamp::from(fs_time)))
}
#[cfg(test)]
mod test {
use super::*;
use pretty_assertions::assert_eq;
#[test]
fn test_chunk_tracked_files() {
fn chunk(v: Vec<&'static str>) -> Vec<ExpandedManifestEntry> {
v.into_iter()
.map(|f| (HgPath::new(f.as_bytes()), NULL_NODE, None))
.collect()
}
let p = HgPath::new;
let files = chunk(vec!["a"]);
let expected = vec![(p(""), chunk(vec!["a"]))];
assert_eq!(chunk_tracked_files(files), expected);
let files = chunk(vec!["a", "b", "c"]);
let expected = vec![(p(""), chunk(vec!["a", "b", "c"]))];
assert_eq!(chunk_tracked_files(files), expected);
let files = chunk(vec![
"dir/a-new",
"dir/a/mut",
"dir/a/mut-mut",
"dir/albert",
"dir/b",
"dir/subdir/c",
"dir/subdir/d",
"file",
]);
let expected = vec![
(p("dir"), chunk(vec!["dir/a-new"])),
(p("dir/a"), chunk(vec!["dir/a/mut", "dir/a/mut-mut"])),
(p("dir"), chunk(vec!["dir/albert", "dir/b"])),
(p("dir/subdir"), chunk(vec!["dir/subdir/c", "dir/subdir/d"])),
(p(""), chunk(vec!["file"])),
];
assert_eq!(chunk_tracked_files(files), expected);
// Doesn't get split
let large_dir = vec![
"1", "2", "3", "4", "5", "6", "7", "8", "9", "10", "11", "12",
"13", "14", "15", "16", "17", "18", "19", "20", "21", "22", "23",
];
let files = chunk(large_dir.clone());
let expected = vec![(p(""), chunk(large_dir))];
assert_eq!(chunk_tracked_files(files), expected);
}
}