Show More
@@ -700,6 +700,11 b' coreconfigitem(' | |||
|
700 | 700 | ) |
|
701 | 701 | coreconfigitem( |
|
702 | 702 | b'devel', |
|
703 | b'copy-tracing.multi-thread', | |
|
704 | default=True, | |
|
705 | ) | |
|
706 | coreconfigitem( | |
|
707 | b'devel', | |
|
703 | 708 | b'debug.extensions', |
|
704 | 709 | default=False, |
|
705 | 710 | ) |
@@ -274,6 +274,7 b' def _changesetforwardcopies(a, b, match)' | |||
|
274 | 274 | revs = cl.findmissingrevs(common=[a.rev()], heads=[b.rev()]) |
|
275 | 275 | roots = set() |
|
276 | 276 | has_graph_roots = False |
|
277 | multi_thread = repo.ui.configbool(b'devel', b'copy-tracing.multi-thread') | |
|
277 | 278 | |
|
278 | 279 | # iterate over `only(B, A)` |
|
279 | 280 | for r in revs: |
@@ -321,7 +322,13 b' def _changesetforwardcopies(a, b, match)' | |||
|
321 | 322 | children_count[p] += 1 |
|
322 | 323 | revinfo = _revinfo_getter(repo, match) |
|
323 | 324 | return _combine_changeset_copies( |
|
324 | revs, children_count, b.rev(), revinfo, match, isancestor | |
|
325 | revs, | |
|
326 | children_count, | |
|
327 | b.rev(), | |
|
328 | revinfo, | |
|
329 | match, | |
|
330 | isancestor, | |
|
331 | multi_thread, | |
|
325 | 332 | ) |
|
326 | 333 | else: |
|
327 | 334 | # When not using side-data, we will process the edges "from" the parent. |
@@ -346,7 +353,7 b' def _changesetforwardcopies(a, b, match)' | |||
|
346 | 353 | |
|
347 | 354 | |
|
348 | 355 | def _combine_changeset_copies( |
|
349 | revs, children_count, targetrev, revinfo, match, isancestor | |
|
356 | revs, children_count, targetrev, revinfo, match, isancestor, multi_thread | |
|
350 | 357 | ): |
|
351 | 358 | """combine the copies information for each item of iterrevs |
|
352 | 359 | |
@@ -363,7 +370,7 b' def _combine_changeset_copies(' | |||
|
363 | 370 | |
|
364 | 371 | if rustmod is not None: |
|
365 | 372 | final_copies = rustmod.combine_changeset_copies( |
|
366 | list(revs), children_count, targetrev, revinfo | |
|
373 | list(revs), children_count, targetrev, revinfo, multi_thread | |
|
367 | 374 | ) |
|
368 | 375 | else: |
|
369 | 376 | isancestor = cached_is_ancestor(isancestor) |
@@ -331,6 +331,7 b' name = "hg-cpython"' | |||
|
331 | 331 | version = "0.1.0" |
|
332 | 332 | dependencies = [ |
|
333 | 333 | "cpython 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", |
|
334 | "crossbeam-channel 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)", | |
|
334 | 335 | "env_logger 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", |
|
335 | 336 | "hg-core 0.1.0", |
|
336 | 337 | "libc 0.2.81 (registry+https://github.com/rust-lang/crates.io-index)", |
@@ -22,6 +22,7 b' python27-bin = ["cpython/python27-sys"]' | |||
|
22 | 22 | python3-bin = ["cpython/python3-sys"] |
|
23 | 23 | |
|
24 | 24 | [dependencies] |
|
25 | crossbeam-channel = "0.4" | |
|
25 | 26 | hg-core = { path = "../hg-core"} |
|
26 | 27 | libc = '*' |
|
27 | 28 | log = "0.4.8" |
@@ -22,6 +22,7 b' pub fn combine_changeset_copies_wrapper(' | |||
|
22 | 22 | children_count: PyDict, |
|
23 | 23 | target_rev: Revision, |
|
24 | 24 | rev_info: PyObject, |
|
25 | multi_thread: bool, | |
|
25 | 26 | ) -> PyResult<PyDict> { |
|
26 | 27 | let children_count = children_count |
|
27 | 28 | .items(py) |
@@ -42,20 +43,81 b' pub fn combine_changeset_copies_wrapper(' | |||
|
42 | 43 | Ok((rev, p1, p2, opt_bytes)) |
|
43 | 44 | }); |
|
44 | 45 | |
|
45 | let mut combine_changeset_copies = | |
|
46 | CombineChangesetCopies::new(children_count); | |
|
46 | let path_copies = if !multi_thread { | |
|
47 | let mut combine_changeset_copies = | |
|
48 | CombineChangesetCopies::new(children_count); | |
|
49 | ||
|
50 | for rev_info in revs_info { | |
|
51 | let (rev, p1, p2, opt_bytes) = rev_info?; | |
|
52 | let files = match &opt_bytes { | |
|
53 | Some(bytes) => ChangedFiles::new(bytes.data(py)), | |
|
54 | // Python None was extracted to Option::None, | |
|
55 | // meaning there was no copy data. | |
|
56 | None => ChangedFiles::new_empty(), | |
|
57 | }; | |
|
58 | ||
|
59 | combine_changeset_copies.add_revision(rev, p1, p2, files) | |
|
60 | } | |
|
61 | combine_changeset_copies.finish(target_rev) | |
|
62 | } else { | |
|
63 | // Use a bounded channel to provide back-pressure: | |
|
64 | // if the child thread is slower to process revisions than this thread | |
|
65 | // is to gather data for them, an unbounded channel would keep | |
|
66 | // growing and eat memory. | |
|
67 | // | |
|
68 | // TODO: tweak the bound? | |
|
69 | let (rev_info_sender, rev_info_receiver) = | |
|
70 | crossbeam_channel::bounded::<RevInfo>(1000); | |
|
47 | 71 | |
|
48 | for rev_info in revs_info { | |
|
49 | let (rev, p1, p2, opt_bytes) = rev_info?; | |
|
50 | let files = match &opt_bytes { | |
|
51 | Some(bytes) => ChangedFiles::new(bytes.data(py)), | |
|
52 | // value was presumably None, meaning they was no copy data. | |
|
53 | None => ChangedFiles::new_empty(), | |
|
54 | }; | |
|
72 | // Start a thread that does CPU-heavy processing in parallel with the | |
|
73 | // loop below. | |
|
74 | // | |
|
75 | // If the parent thread panics, `rev_info_sender` will be dropped and | |
|
76 | // “disconnected”. `rev_info_receiver` will be notified of this and | |
|
77 | // exit its own loop. | |
|
78 | let thread = std::thread::spawn(move || { | |
|
79 | let mut combine_changeset_copies = | |
|
80 | CombineChangesetCopies::new(children_count); | |
|
81 | for (rev, p1, p2, opt_bytes) in rev_info_receiver { | |
|
82 | let gil = Python::acquire_gil(); | |
|
83 | let py = gil.python(); | |
|
84 | let files = match &opt_bytes { | |
|
85 | Some(raw) => ChangedFiles::new(raw.data(py)), | |
|
86 | // Python None was extracted to Option::None, | |
|
87 | // meaning there was no copy data. | |
|
88 | None => ChangedFiles::new_empty(), | |
|
89 | }; | |
|
90 | combine_changeset_copies.add_revision(rev, p1, p2, files) | |
|
91 | } | |
|
92 | ||
|
93 | combine_changeset_copies.finish(target_rev) | |
|
94 | }); | |
|
55 | 95 | |
|
56 | combine_changeset_copies.add_revision(rev, p1, p2, files) | |
|
57 | } | |
|
58 | let path_copies = combine_changeset_copies.finish(target_rev); | |
|
96 | for rev_info in revs_info { | |
|
97 | let (rev, p1, p2, opt_bytes) = rev_info?; | |
|
98 | ||
|
99 | // We’d prefer to avoid the child thread calling into Python code, | |
|
100 | // but this avoids a potential deadlock on the GIL if it does: | |
|
101 | py.allow_threads(|| { | |
|
102 | rev_info_sender.send((rev, p1, p2, opt_bytes)).expect( | |
|
103 | "combine_changeset_copies: channel is disconnected", | |
|
104 | ); | |
|
105 | }); | |
|
106 | } | |
|
107 | // We’d prefer to avoid the child thread calling into Python code, | |
|
108 | // but this avoids a potential deadlock on the GIL if it does: | |
|
109 | py.allow_threads(|| { | |
|
110 | // Disconnect the channel to signal the child thread to stop: | |
|
111 | // the `for … in rev_info_receiver` loop will end. | |
|
112 | drop(rev_info_sender); | |
|
113 | ||
|
114 | // Wait for the child thread to stop, and propagate any panic. | |
|
115 | thread.join().unwrap_or_else(|panic_payload| { | |
|
116 | std::panic::resume_unwind(panic_payload) | |
|
117 | }) | |
|
118 | }) | |
|
119 | }; | |
|
120 | ||
|
59 | 121 | let out = PyDict::new(py); |
|
60 | 122 | for (dest, source) in path_copies.into_iter() { |
|
61 | 123 | out.set_item( |
@@ -84,7 +146,8 b' pub fn init_module(py: Python, package: ' | |||
|
84 | 146 | revs: PyList, |
|
85 | 147 | children: PyDict, |
|
86 | 148 | target_rev: Revision, |
|
87 | rev_info: PyObject | |
|
149 | rev_info: PyObject, | |
|
150 | multi_thread: bool | |
|
88 | 151 |
|
|
89 | 152 | ), |
|
90 | 153 | )?; |
General Comments 0
You need to be logged in to leave comments.
Login now