##// END OF EJS Templates
copies-rust: move CPU-heavy Rust processing into a child thread...
Simon Sapin -
r47330:47557ea7 default
parent child Browse files
Show More
@@ -700,6 +700,11 b' coreconfigitem('
700 )
700 )
701 coreconfigitem(
701 coreconfigitem(
702 b'devel',
702 b'devel',
703 b'copy-tracing.multi-thread',
704 default=True,
705 )
706 coreconfigitem(
707 b'devel',
703 b'debug.extensions',
708 b'debug.extensions',
704 default=False,
709 default=False,
705 )
710 )
@@ -274,6 +274,7 b' def _changesetforwardcopies(a, b, match)'
274 revs = cl.findmissingrevs(common=[a.rev()], heads=[b.rev()])
274 revs = cl.findmissingrevs(common=[a.rev()], heads=[b.rev()])
275 roots = set()
275 roots = set()
276 has_graph_roots = False
276 has_graph_roots = False
277 multi_thread = repo.ui.configbool(b'devel', b'copy-tracing.multi-thread')
277
278
278 # iterate over `only(B, A)`
279 # iterate over `only(B, A)`
279 for r in revs:
280 for r in revs:
@@ -321,7 +322,13 b' def _changesetforwardcopies(a, b, match)'
321 children_count[p] += 1
322 children_count[p] += 1
322 revinfo = _revinfo_getter(repo, match)
323 revinfo = _revinfo_getter(repo, match)
323 return _combine_changeset_copies(
324 return _combine_changeset_copies(
324 revs, children_count, b.rev(), revinfo, match, isancestor
325 revs,
326 children_count,
327 b.rev(),
328 revinfo,
329 match,
330 isancestor,
331 multi_thread,
325 )
332 )
326 else:
333 else:
327 # When not using side-data, we will process the edges "from" the parent.
334 # When not using side-data, we will process the edges "from" the parent.
@@ -346,7 +353,7 b' def _changesetforwardcopies(a, b, match)'
346
353
347
354
348 def _combine_changeset_copies(
355 def _combine_changeset_copies(
349 revs, children_count, targetrev, revinfo, match, isancestor
356 revs, children_count, targetrev, revinfo, match, isancestor, multi_thread
350 ):
357 ):
351 """combine the copies information for each item of iterrevs
358 """combine the copies information for each item of iterrevs
352
359
@@ -363,7 +370,7 b' def _combine_changeset_copies('
363
370
364 if rustmod is not None:
371 if rustmod is not None:
365 final_copies = rustmod.combine_changeset_copies(
372 final_copies = rustmod.combine_changeset_copies(
366 list(revs), children_count, targetrev, revinfo
373 list(revs), children_count, targetrev, revinfo, multi_thread
367 )
374 )
368 else:
375 else:
369 isancestor = cached_is_ancestor(isancestor)
376 isancestor = cached_is_ancestor(isancestor)
@@ -331,6 +331,7 b' name = "hg-cpython"'
331 version = "0.1.0"
331 version = "0.1.0"
332 dependencies = [
332 dependencies = [
333 "cpython 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
333 "cpython 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)",
334 "crossbeam-channel 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)",
334 "env_logger 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
335 "env_logger 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
335 "hg-core 0.1.0",
336 "hg-core 0.1.0",
336 "libc 0.2.81 (registry+https://github.com/rust-lang/crates.io-index)",
337 "libc 0.2.81 (registry+https://github.com/rust-lang/crates.io-index)",
@@ -22,6 +22,7 b' python27-bin = ["cpython/python27-sys"]'
22 python3-bin = ["cpython/python3-sys"]
22 python3-bin = ["cpython/python3-sys"]
23
23
24 [dependencies]
24 [dependencies]
25 crossbeam-channel = "0.4"
25 hg-core = { path = "../hg-core"}
26 hg-core = { path = "../hg-core"}
26 libc = '*'
27 libc = '*'
27 log = "0.4.8"
28 log = "0.4.8"
@@ -22,6 +22,7 b' pub fn combine_changeset_copies_wrapper('
22 children_count: PyDict,
22 children_count: PyDict,
23 target_rev: Revision,
23 target_rev: Revision,
24 rev_info: PyObject,
24 rev_info: PyObject,
25 multi_thread: bool,
25 ) -> PyResult<PyDict> {
26 ) -> PyResult<PyDict> {
26 let children_count = children_count
27 let children_count = children_count
27 .items(py)
28 .items(py)
@@ -42,20 +43,81 b' pub fn combine_changeset_copies_wrapper('
42 Ok((rev, p1, p2, opt_bytes))
43 Ok((rev, p1, p2, opt_bytes))
43 });
44 });
44
45
45 let mut combine_changeset_copies =
46 let path_copies = if !multi_thread {
46 CombineChangesetCopies::new(children_count);
47 let mut combine_changeset_copies =
48 CombineChangesetCopies::new(children_count);
49
50 for rev_info in revs_info {
51 let (rev, p1, p2, opt_bytes) = rev_info?;
52 let files = match &opt_bytes {
53 Some(bytes) => ChangedFiles::new(bytes.data(py)),
54 // Python None was extracted to Option::None,
55 // meaning there was no copy data.
56 None => ChangedFiles::new_empty(),
57 };
58
59 combine_changeset_copies.add_revision(rev, p1, p2, files)
60 }
61 combine_changeset_copies.finish(target_rev)
62 } else {
63 // Use a bounded channel to provide back-pressure:
64 // if the child thread is slower to process revisions than this thread
65 // is to gather data for them, an unbounded channel would keep
66 // growing and eat memory.
67 //
68 // TODO: tweak the bound?
69 let (rev_info_sender, rev_info_receiver) =
70 crossbeam_channel::bounded::<RevInfo>(1000);
47
71
48 for rev_info in revs_info {
72 // Start a thread that does CPU-heavy processing in parallel with the
49 let (rev, p1, p2, opt_bytes) = rev_info?;
73 // loop below.
50 let files = match &opt_bytes {
74 //
51 Some(bytes) => ChangedFiles::new(bytes.data(py)),
75 // If the parent thread panics, `rev_info_sender` will be dropped and
52 // value was presumably None, meaning they was no copy data.
76 // “disconnected”. `rev_info_receiver` will be notified of this and
53 None => ChangedFiles::new_empty(),
77 // exit its own loop.
54 };
78 let thread = std::thread::spawn(move || {
79 let mut combine_changeset_copies =
80 CombineChangesetCopies::new(children_count);
81 for (rev, p1, p2, opt_bytes) in rev_info_receiver {
82 let gil = Python::acquire_gil();
83 let py = gil.python();
84 let files = match &opt_bytes {
85 Some(raw) => ChangedFiles::new(raw.data(py)),
86 // Python None was extracted to Option::None,
87 // meaning there was no copy data.
88 None => ChangedFiles::new_empty(),
89 };
90 combine_changeset_copies.add_revision(rev, p1, p2, files)
91 }
92
93 combine_changeset_copies.finish(target_rev)
94 });
55
95
56 combine_changeset_copies.add_revision(rev, p1, p2, files)
96 for rev_info in revs_info {
57 }
97 let (rev, p1, p2, opt_bytes) = rev_info?;
58 let path_copies = combine_changeset_copies.finish(target_rev);
98
99 // We’d prefer to avoid the child thread calling into Python code,
100 // but this avoids a potential deadlock on the GIL if it does:
101 py.allow_threads(|| {
102 rev_info_sender.send((rev, p1, p2, opt_bytes)).expect(
103 "combine_changeset_copies: channel is disconnected",
104 );
105 });
106 }
107 // We’d prefer to avoid the child thread calling into Python code,
108 // but this avoids a potential deadlock on the GIL if it does:
109 py.allow_threads(|| {
110 // Disconnect the channel to signal the child thread to stop:
111 // the `for … in rev_info_receiver` loop will end.
112 drop(rev_info_sender);
113
114 // Wait for the child thread to stop, and propagate any panic.
115 thread.join().unwrap_or_else(|panic_payload| {
116 std::panic::resume_unwind(panic_payload)
117 })
118 })
119 };
120
59 let out = PyDict::new(py);
121 let out = PyDict::new(py);
60 for (dest, source) in path_copies.into_iter() {
122 for (dest, source) in path_copies.into_iter() {
61 out.set_item(
123 out.set_item(
@@ -84,7 +146,8 b' pub fn init_module(py: Python, package: '
84 revs: PyList,
146 revs: PyList,
85 children: PyDict,
147 children: PyDict,
86 target_rev: Revision,
148 target_rev: Revision,
87 rev_info: PyObject
149 rev_info: PyObject,
150 multi_thread: bool
88 )
151 )
89 ),
152 ),
90 )?;
153 )?;
General Comments 0
You need to be logged in to leave comments. Login now