Show More
@@ -700,6 +700,11 b' coreconfigitem(' | |||||
700 | ) |
|
700 | ) | |
701 | coreconfigitem( |
|
701 | coreconfigitem( | |
702 | b'devel', |
|
702 | b'devel', | |
|
703 | b'copy-tracing.multi-thread', | |||
|
704 | default=True, | |||
|
705 | ) | |||
|
706 | coreconfigitem( | |||
|
707 | b'devel', | |||
703 | b'debug.extensions', |
|
708 | b'debug.extensions', | |
704 | default=False, |
|
709 | default=False, | |
705 | ) |
|
710 | ) |
@@ -274,6 +274,7 b' def _changesetforwardcopies(a, b, match)' | |||||
274 | revs = cl.findmissingrevs(common=[a.rev()], heads=[b.rev()]) |
|
274 | revs = cl.findmissingrevs(common=[a.rev()], heads=[b.rev()]) | |
275 | roots = set() |
|
275 | roots = set() | |
276 | has_graph_roots = False |
|
276 | has_graph_roots = False | |
|
277 | multi_thread = repo.ui.configbool(b'devel', b'copy-tracing.multi-thread') | |||
277 |
|
278 | |||
278 | # iterate over `only(B, A)` |
|
279 | # iterate over `only(B, A)` | |
279 | for r in revs: |
|
280 | for r in revs: | |
@@ -321,7 +322,13 b' def _changesetforwardcopies(a, b, match)' | |||||
321 | children_count[p] += 1 |
|
322 | children_count[p] += 1 | |
322 | revinfo = _revinfo_getter(repo, match) |
|
323 | revinfo = _revinfo_getter(repo, match) | |
323 | return _combine_changeset_copies( |
|
324 | return _combine_changeset_copies( | |
324 | revs, children_count, b.rev(), revinfo, match, isancestor |
|
325 | revs, | |
|
326 | children_count, | |||
|
327 | b.rev(), | |||
|
328 | revinfo, | |||
|
329 | match, | |||
|
330 | isancestor, | |||
|
331 | multi_thread, | |||
325 | ) |
|
332 | ) | |
326 | else: |
|
333 | else: | |
327 | # When not using side-data, we will process the edges "from" the parent. |
|
334 | # When not using side-data, we will process the edges "from" the parent. | |
@@ -346,7 +353,7 b' def _changesetforwardcopies(a, b, match)' | |||||
346 |
|
353 | |||
347 |
|
354 | |||
348 | def _combine_changeset_copies( |
|
355 | def _combine_changeset_copies( | |
349 | revs, children_count, targetrev, revinfo, match, isancestor |
|
356 | revs, children_count, targetrev, revinfo, match, isancestor, multi_thread | |
350 | ): |
|
357 | ): | |
351 | """combine the copies information for each item of iterrevs |
|
358 | """combine the copies information for each item of iterrevs | |
352 |
|
359 | |||
@@ -363,7 +370,7 b' def _combine_changeset_copies(' | |||||
363 |
|
370 | |||
364 | if rustmod is not None: |
|
371 | if rustmod is not None: | |
365 | final_copies = rustmod.combine_changeset_copies( |
|
372 | final_copies = rustmod.combine_changeset_copies( | |
366 | list(revs), children_count, targetrev, revinfo |
|
373 | list(revs), children_count, targetrev, revinfo, multi_thread | |
367 | ) |
|
374 | ) | |
368 | else: |
|
375 | else: | |
369 | isancestor = cached_is_ancestor(isancestor) |
|
376 | isancestor = cached_is_ancestor(isancestor) |
@@ -331,6 +331,7 b' name = "hg-cpython"' | |||||
331 | version = "0.1.0" |
|
331 | version = "0.1.0" | |
332 | dependencies = [ |
|
332 | dependencies = [ | |
333 | "cpython 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", |
|
333 | "cpython 0.4.1 (registry+https://github.com/rust-lang/crates.io-index)", | |
|
334 | "crossbeam-channel 0.4.4 (registry+https://github.com/rust-lang/crates.io-index)", | |||
334 | "env_logger 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", |
|
335 | "env_logger 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", | |
335 | "hg-core 0.1.0", |
|
336 | "hg-core 0.1.0", | |
336 | "libc 0.2.81 (registry+https://github.com/rust-lang/crates.io-index)", |
|
337 | "libc 0.2.81 (registry+https://github.com/rust-lang/crates.io-index)", |
@@ -22,6 +22,7 b' python27-bin = ["cpython/python27-sys"]' | |||||
22 | python3-bin = ["cpython/python3-sys"] |
|
22 | python3-bin = ["cpython/python3-sys"] | |
23 |
|
23 | |||
24 | [dependencies] |
|
24 | [dependencies] | |
|
25 | crossbeam-channel = "0.4" | |||
25 | hg-core = { path = "../hg-core"} |
|
26 | hg-core = { path = "../hg-core"} | |
26 | libc = '*' |
|
27 | libc = '*' | |
27 | log = "0.4.8" |
|
28 | log = "0.4.8" |
@@ -22,6 +22,7 b' pub fn combine_changeset_copies_wrapper(' | |||||
22 | children_count: PyDict, |
|
22 | children_count: PyDict, | |
23 | target_rev: Revision, |
|
23 | target_rev: Revision, | |
24 | rev_info: PyObject, |
|
24 | rev_info: PyObject, | |
|
25 | multi_thread: bool, | |||
25 | ) -> PyResult<PyDict> { |
|
26 | ) -> PyResult<PyDict> { | |
26 | let children_count = children_count |
|
27 | let children_count = children_count | |
27 | .items(py) |
|
28 | .items(py) | |
@@ -42,20 +43,81 b' pub fn combine_changeset_copies_wrapper(' | |||||
42 | Ok((rev, p1, p2, opt_bytes)) |
|
43 | Ok((rev, p1, p2, opt_bytes)) | |
43 | }); |
|
44 | }); | |
44 |
|
45 | |||
45 | let mut combine_changeset_copies = |
|
46 | let path_copies = if !multi_thread { | |
46 | CombineChangesetCopies::new(children_count); |
|
47 | let mut combine_changeset_copies = | |
|
48 | CombineChangesetCopies::new(children_count); | |||
|
49 | ||||
|
50 | for rev_info in revs_info { | |||
|
51 | let (rev, p1, p2, opt_bytes) = rev_info?; | |||
|
52 | let files = match &opt_bytes { | |||
|
53 | Some(bytes) => ChangedFiles::new(bytes.data(py)), | |||
|
54 | // Python None was extracted to Option::None, | |||
|
55 | // meaning there was no copy data. | |||
|
56 | None => ChangedFiles::new_empty(), | |||
|
57 | }; | |||
|
58 | ||||
|
59 | combine_changeset_copies.add_revision(rev, p1, p2, files) | |||
|
60 | } | |||
|
61 | combine_changeset_copies.finish(target_rev) | |||
|
62 | } else { | |||
|
63 | // Use a bounded channel to provide back-pressure: | |||
|
64 | // if the child thread is slower to process revisions than this thread | |||
|
65 | // is to gather data for them, an unbounded channel would keep | |||
|
66 | // growing and eat memory. | |||
|
67 | // | |||
|
68 | // TODO: tweak the bound? | |||
|
69 | let (rev_info_sender, rev_info_receiver) = | |||
|
70 | crossbeam_channel::bounded::<RevInfo>(1000); | |||
47 |
|
71 | |||
48 | for rev_info in revs_info { |
|
72 | // Start a thread that does CPU-heavy processing in parallel with the | |
49 | let (rev, p1, p2, opt_bytes) = rev_info?; |
|
73 | // loop below. | |
50 | let files = match &opt_bytes { |
|
74 | // | |
51 | Some(bytes) => ChangedFiles::new(bytes.data(py)), |
|
75 | // If the parent thread panics, `rev_info_sender` will be dropped and | |
52 | // value was presumably None, meaning they was no copy data. |
|
76 | // “disconnected”. `rev_info_receiver` will be notified of this and | |
53 | None => ChangedFiles::new_empty(), |
|
77 | // exit its own loop. | |
54 | }; |
|
78 | let thread = std::thread::spawn(move || { | |
|
79 | let mut combine_changeset_copies = | |||
|
80 | CombineChangesetCopies::new(children_count); | |||
|
81 | for (rev, p1, p2, opt_bytes) in rev_info_receiver { | |||
|
82 | let gil = Python::acquire_gil(); | |||
|
83 | let py = gil.python(); | |||
|
84 | let files = match &opt_bytes { | |||
|
85 | Some(raw) => ChangedFiles::new(raw.data(py)), | |||
|
86 | // Python None was extracted to Option::None, | |||
|
87 | // meaning there was no copy data. | |||
|
88 | None => ChangedFiles::new_empty(), | |||
|
89 | }; | |||
|
90 | combine_changeset_copies.add_revision(rev, p1, p2, files) | |||
|
91 | } | |||
|
92 | ||||
|
93 | combine_changeset_copies.finish(target_rev) | |||
|
94 | }); | |||
55 |
|
95 | |||
56 | combine_changeset_copies.add_revision(rev, p1, p2, files) |
|
96 | for rev_info in revs_info { | |
57 | } |
|
97 | let (rev, p1, p2, opt_bytes) = rev_info?; | |
58 | let path_copies = combine_changeset_copies.finish(target_rev); |
|
98 | ||
|
99 | // We’d prefer to avoid the child thread calling into Python code, | |||
|
100 | // but this avoids a potential deadlock on the GIL if it does: | |||
|
101 | py.allow_threads(|| { | |||
|
102 | rev_info_sender.send((rev, p1, p2, opt_bytes)).expect( | |||
|
103 | "combine_changeset_copies: channel is disconnected", | |||
|
104 | ); | |||
|
105 | }); | |||
|
106 | } | |||
|
107 | // We’d prefer to avoid the child thread calling into Python code, | |||
|
108 | // but this avoids a potential deadlock on the GIL if it does: | |||
|
109 | py.allow_threads(|| { | |||
|
110 | // Disconnect the channel to signal the child thread to stop: | |||
|
111 | // the `for … in rev_info_receiver` loop will end. | |||
|
112 | drop(rev_info_sender); | |||
|
113 | ||||
|
114 | // Wait for the child thread to stop, and propagate any panic. | |||
|
115 | thread.join().unwrap_or_else(|panic_payload| { | |||
|
116 | std::panic::resume_unwind(panic_payload) | |||
|
117 | }) | |||
|
118 | }) | |||
|
119 | }; | |||
|
120 | ||||
59 | let out = PyDict::new(py); |
|
121 | let out = PyDict::new(py); | |
60 | for (dest, source) in path_copies.into_iter() { |
|
122 | for (dest, source) in path_copies.into_iter() { | |
61 | out.set_item( |
|
123 | out.set_item( | |
@@ -84,7 +146,8 b' pub fn init_module(py: Python, package: ' | |||||
84 | revs: PyList, |
|
146 | revs: PyList, | |
85 | children: PyDict, |
|
147 | children: PyDict, | |
86 | target_rev: Revision, |
|
148 | target_rev: Revision, | |
87 | rev_info: PyObject |
|
149 | rev_info: PyObject, | |
|
150 | multi_thread: bool | |||
88 |
|
|
151 | ) | |
89 | ), |
|
152 | ), | |
90 | )?; |
|
153 | )?; |
General Comments 0
You need to be logged in to leave comments.
Login now