# HG changeset patch # User Simon Sapin # Date 2021-01-06 13:09:01 # Node ID 47557ea79fc77fa6ac7d4aa2888a0b84a601300c # Parent cb4b0b0c6de4b8d87f62e0a265628d003cd1c10c copies-rust: move CPU-heavy Rust processing into a child thread … that runs in parallel with the parent thread fetching data. This can be disabled through a new config. CLI example: hg --config=devel.copy-tracing.multi-thread=no For now both threads use the GIL, later commits will reduce this. Differential Revision: https://phab.mercurial-scm.org/D9684 diff --git a/mercurial/configitems.py b/mercurial/configitems.py --- a/mercurial/configitems.py +++ b/mercurial/configitems.py @@ -700,6 +700,11 @@ coreconfigitem( ) coreconfigitem( b'devel', + b'copy-tracing.multi-thread', + default=True, +) +coreconfigitem( + b'devel', b'debug.extensions', default=False, ) diff --git a/mercurial/copies.py b/mercurial/copies.py --- a/mercurial/copies.py +++ b/mercurial/copies.py @@ -274,6 +274,7 @@ def _changesetforwardcopies(a, b, match) revs = cl.findmissingrevs(common=[a.rev()], heads=[b.rev()]) roots = set() has_graph_roots = False + multi_thread = repo.ui.configbool(b'devel', b'copy-tracing.multi-thread') # iterate over `only(B, A)` for r in revs: @@ -321,7 +322,13 @@ def _changesetforwardcopies(a, b, match) children_count[p] += 1 revinfo = _revinfo_getter(repo, match) return _combine_changeset_copies( - revs, children_count, b.rev(), revinfo, match, isancestor + revs, + children_count, + b.rev(), + revinfo, + match, + isancestor, + multi_thread, ) else: # When not using side-data, we will process the edges "from" the parent. @@ -346,7 +353,7 @@ def _changesetforwardcopies(a, b, match) def _combine_changeset_copies( - revs, children_count, targetrev, revinfo, match, isancestor + revs, children_count, targetrev, revinfo, match, isancestor, multi_thread ): """combine the copies information for each item of iterrevs @@ -363,7 +370,7 @@ def _combine_changeset_copies( if rustmod is not None: final_copies = rustmod.combine_changeset_copies( - list(revs), children_count, targetrev, revinfo + list(revs), children_count, targetrev, revinfo, multi_thread ) else: isancestor = cached_is_ancestor(isancestor) diff --git a/rust/Cargo.lock b/rust/Cargo.lock --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -331,6 +331,7 @@ name = "hg-cpython" version = "0.1.0" dependencies = [ "cpython 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", + "crossbeam-channel 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)", "env_logger 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", "hg-core 0.1.0", "libc 0.2.81 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/rust/hg-cpython/Cargo.toml b/rust/hg-cpython/Cargo.toml --- a/rust/hg-cpython/Cargo.toml +++ b/rust/hg-cpython/Cargo.toml @@ -22,6 +22,7 @@ python27-bin = ["cpython/python27-sys"] python3-bin = ["cpython/python3-sys"] [dependencies] +crossbeam-channel = "0.4" hg-core = { path = "../hg-core"} libc = '*' log = "0.4.8" diff --git a/rust/hg-cpython/src/copy_tracing.rs b/rust/hg-cpython/src/copy_tracing.rs --- a/rust/hg-cpython/src/copy_tracing.rs +++ b/rust/hg-cpython/src/copy_tracing.rs @@ -22,6 +22,7 @@ pub fn combine_changeset_copies_wrapper( children_count: PyDict, target_rev: Revision, rev_info: PyObject, + multi_thread: bool, ) -> PyResult { let children_count = children_count .items(py) @@ -42,20 +43,81 @@ pub fn combine_changeset_copies_wrapper( Ok((rev, p1, p2, opt_bytes)) }); - let mut combine_changeset_copies = - CombineChangesetCopies::new(children_count); + let path_copies = if !multi_thread { + let mut combine_changeset_copies = + CombineChangesetCopies::new(children_count); + + for rev_info in revs_info { + let (rev, p1, p2, opt_bytes) = rev_info?; + let files = match &opt_bytes { + Some(bytes) => ChangedFiles::new(bytes.data(py)), + // Python None was extracted to Option::None, + // meaning there was no copy data. + None => ChangedFiles::new_empty(), + }; + + combine_changeset_copies.add_revision(rev, p1, p2, files) + } + combine_changeset_copies.finish(target_rev) + } else { + // Use a bounded channel to provide back-pressure: + // if the child thread is slower to process revisions than this thread + // is to gather data for them, an unbounded channel would keep + // growing and eat memory. + // + // TODO: tweak the bound? + let (rev_info_sender, rev_info_receiver) = + crossbeam_channel::bounded::(1000); - for rev_info in revs_info { - let (rev, p1, p2, opt_bytes) = rev_info?; - let files = match &opt_bytes { - Some(bytes) => ChangedFiles::new(bytes.data(py)), - // value was presumably None, meaning they was no copy data. - None => ChangedFiles::new_empty(), - }; + // Start a thread that does CPU-heavy processing in parallel with the + // loop below. + // + // If the parent thread panics, `rev_info_sender` will be dropped and + // “disconnected”. `rev_info_receiver` will be notified of this and + // exit its own loop. + let thread = std::thread::spawn(move || { + let mut combine_changeset_copies = + CombineChangesetCopies::new(children_count); + for (rev, p1, p2, opt_bytes) in rev_info_receiver { + let gil = Python::acquire_gil(); + let py = gil.python(); + let files = match &opt_bytes { + Some(raw) => ChangedFiles::new(raw.data(py)), + // Python None was extracted to Option::None, + // meaning there was no copy data. + None => ChangedFiles::new_empty(), + }; + combine_changeset_copies.add_revision(rev, p1, p2, files) + } + + combine_changeset_copies.finish(target_rev) + }); - combine_changeset_copies.add_revision(rev, p1, p2, files) - } - let path_copies = combine_changeset_copies.finish(target_rev); + for rev_info in revs_info { + let (rev, p1, p2, opt_bytes) = rev_info?; + + // We’d prefer to avoid the child thread calling into Python code, + // but this avoids a potential deadlock on the GIL if it does: + py.allow_threads(|| { + rev_info_sender.send((rev, p1, p2, opt_bytes)).expect( + "combine_changeset_copies: channel is disconnected", + ); + }); + } + // We’d prefer to avoid the child thread calling into Python code, + // but this avoids a potential deadlock on the GIL if it does: + py.allow_threads(|| { + // Disconnect the channel to signal the child thread to stop: + // the `for … in rev_info_receiver` loop will end. + drop(rev_info_sender); + + // Wait for the child thread to stop, and propagate any panic. + thread.join().unwrap_or_else(|panic_payload| { + std::panic::resume_unwind(panic_payload) + }) + }) + }; + let out = PyDict::new(py); for (dest, source) in path_copies.into_iter() { out.set_item( @@ -84,7 +146,8 @@ pub fn init_module(py: Python, package: revs: PyList, children: PyDict, target_rev: Revision, - rev_info: PyObject + rev_info: PyObject, + multi_thread: bool ) ), )?;