##// END OF EJS Templates
rust: Move PyBytesWithData out of copy-tracing code...
Simon Sapin -
r48765:8f031a27 default
parent child Browse files
Show More
@@ -0,0 +1,53 b''
1 use cpython::{PyBytes, Python};
2
3 /// Safe abstraction over a `PyBytes` together with the `&[u8]` slice
4 /// that borrows it. Implements `Deref<Target = [u8]>`.
5 ///
6 /// Calling `PyBytes::data` requires a GIL marker but we want to access the
7 /// data in a thread that (ideally) does not need to acquire the GIL.
8 /// This type allows separating the call and the use.
9 ///
10 /// It also enables using a (wrapped) `PyBytes` in GIL-unaware generic code.
11 pub struct PyBytesDeref {
12 #[allow(unused)]
13 keep_alive: PyBytes,
14
15 /// Borrows the buffer inside `self.keep_alive`,
16 /// but the borrow-checker cannot express self-referential structs.
17 data: *const [u8],
18 }
19
20 impl PyBytesDeref {
21 pub fn new(py: Python, bytes: PyBytes) -> Self {
22 Self {
23 data: bytes.data(py),
24 keep_alive: bytes,
25 }
26 }
27
28 pub fn unwrap(self) -> PyBytes {
29 self.keep_alive
30 }
31 }
32
33 impl std::ops::Deref for PyBytesDeref {
34 type Target = [u8];
35
36 fn deref(&self) -> &[u8] {
37 // Safety: the raw pointer is valid as long as the PyBytes is still
38 // alive, and the returned slice borrows `self`.
39 unsafe { &*self.data }
40 }
41 }
42
43 fn require_send<T: Send>() {}
44
45 #[allow(unused)]
46 fn static_assert_pybytes_is_send() {
47 require_send::<PyBytes>;
48 }
49
50 // Safety: PyBytes is Send. Raw pointers are not by default,
51 // but here sending one to another thread is fine since we ensure it stays
52 // valid.
53 unsafe impl Send for PyBytesDeref {}
@@ -1,245 +1,194 b''
1 1 use cpython::ObjectProtocol;
2 2 use cpython::PyBytes;
3 3 use cpython::PyDict;
4 4 use cpython::PyDrop;
5 5 use cpython::PyList;
6 6 use cpython::PyModule;
7 7 use cpython::PyObject;
8 8 use cpython::PyResult;
9 9 use cpython::PyTuple;
10 10 use cpython::Python;
11 11
12 12 use hg::copy_tracing::ChangedFiles;
13 13 use hg::copy_tracing::CombineChangesetCopies;
14 14 use hg::Revision;
15 15
16 use self::pybytes_with_data::PyBytesWithData;
17
18 // Module to encapsulate private fields
19 mod pybytes_with_data {
20 use cpython::{PyBytes, Python};
21
22 /// Safe abstraction over a `PyBytes` together with the `&[u8]` slice
23 /// that borrows it.
24 ///
25 /// Calling `PyBytes::data` requires a GIL marker but we want to access the
26 /// data in a thread that (ideally) does not need to acquire the GIL.
27 /// This type allows separating the call and the use.
28 pub(super) struct PyBytesWithData {
29 #[allow(unused)]
30 keep_alive: PyBytes,
31
32 /// Borrows the buffer inside `self.keep_alive`,
33 /// but the borrow-checker cannot express self-referential structs.
34 data: *const [u8],
35 }
36
37 fn require_send<T: Send>() {}
38
39 #[allow(unused)]
40 fn static_assert_pybytes_is_send() {
41 require_send::<PyBytes>;
42 }
43
44 // Safety: PyBytes is Send. Raw pointers are not by default,
45 // but here sending one to another thread is fine since we ensure it stays
46 // valid.
47 unsafe impl Send for PyBytesWithData {}
48
49 impl PyBytesWithData {
50 pub fn new(py: Python, bytes: PyBytes) -> Self {
51 Self {
52 data: bytes.data(py),
53 keep_alive: bytes,
54 }
55 }
56
57 pub fn data(&self) -> &[u8] {
58 // Safety: the raw pointer is valid as long as the PyBytes is still
59 // alive, and the returned slice borrows `self`.
60 unsafe { &*self.data }
61 }
62
63 pub fn unwrap(self) -> PyBytes {
64 self.keep_alive
65 }
66 }
67 }
16 use crate::pybytes_deref::PyBytesDeref;
68 17
69 18 /// Combines the copy information contained in revisions `revs` to build a copy
70 19 /// map.
71 20 ///
72 21 /// See mercurial/copies.py for details
73 22 pub fn combine_changeset_copies_wrapper(
74 23 py: Python,
75 24 revs: PyList,
76 25 children_count: PyDict,
77 26 target_rev: Revision,
78 27 rev_info: PyObject,
79 28 multi_thread: bool,
80 29 ) -> PyResult<PyDict> {
81 30 let children_count = children_count
82 31 .items(py)
83 32 .iter()
84 33 .map(|(k, v)| Ok((k.extract(py)?, v.extract(py)?)))
85 34 .collect::<PyResult<_>>()?;
86 35
87 36 /// (Revision number, parent 1, parent 2, copy data for this revision)
88 37 type RevInfo<Bytes> = (Revision, Revision, Revision, Option<Bytes>);
89 38
90 39 let revs_info =
91 40 revs.iter(py).map(|rev_py| -> PyResult<RevInfo<PyBytes>> {
92 41 let rev = rev_py.extract(py)?;
93 42 let tuple: PyTuple =
94 43 rev_info.call(py, (rev_py,), None)?.cast_into(py)?;
95 44 let p1 = tuple.get_item(py, 0).extract(py)?;
96 45 let p2 = tuple.get_item(py, 1).extract(py)?;
97 46 let opt_bytes = tuple.get_item(py, 2).extract(py)?;
98 47 Ok((rev, p1, p2, opt_bytes))
99 48 });
100 49
101 50 let path_copies;
102 51 if !multi_thread {
103 52 let mut combine_changeset_copies =
104 53 CombineChangesetCopies::new(children_count);
105 54
106 55 for rev_info in revs_info {
107 56 let (rev, p1, p2, opt_bytes) = rev_info?;
108 57 let files = match &opt_bytes {
109 58 Some(bytes) => ChangedFiles::new(bytes.data(py)),
110 59 // Python None was extracted to Option::None,
111 60 // meaning there was no copy data.
112 61 None => ChangedFiles::new_empty(),
113 62 };
114 63
115 64 combine_changeset_copies.add_revision(rev, p1, p2, files)
116 65 }
117 66 path_copies = combine_changeset_copies.finish(target_rev)
118 67 } else {
119 68 // Use a bounded channel to provide back-pressure:
120 69 // if the child thread is slower to process revisions than this thread
121 70 // is to gather data for them, an unbounded channel would keep
122 71 // growing and eat memory.
123 72 //
124 73 // TODO: tweak the bound?
125 74 let (rev_info_sender, rev_info_receiver) =
126 crossbeam_channel::bounded::<RevInfo<PyBytesWithData>>(1000);
75 crossbeam_channel::bounded::<RevInfo<PyBytesDeref>>(1000);
127 76
128 77 // This channel (going the other way around) however is unbounded.
129 78 // If they were both bounded, there might potentially be deadlocks
130 79 // where both channels are full and both threads are waiting on each
131 80 // other.
132 81 let (pybytes_sender, pybytes_receiver) =
133 82 crossbeam_channel::unbounded();
134 83
135 84 // Start a thread that does CPU-heavy processing in parallel with the
136 85 // loop below.
137 86 //
138 87 // If the parent thread panics, `rev_info_sender` will be dropped and
139 88 // “disconnected”. `rev_info_receiver` will be notified of this and
140 89 // exit its own loop.
141 90 let thread = std::thread::spawn(move || {
142 91 let mut combine_changeset_copies =
143 92 CombineChangesetCopies::new(children_count);
144 93 for (rev, p1, p2, opt_bytes) in rev_info_receiver {
145 94 let files = match &opt_bytes {
146 Some(raw) => ChangedFiles::new(raw.data()),
95 Some(raw) => ChangedFiles::new(raw.as_ref()),
147 96 // Python None was extracted to Option::None,
148 97 // meaning there was no copy data.
149 98 None => ChangedFiles::new_empty(),
150 99 };
151 100 combine_changeset_copies.add_revision(rev, p1, p2, files);
152 101
153 102 // Send `PyBytes` back to the parent thread so the parent
154 103 // thread can drop it. Otherwise the GIL would be implicitly
155 104 // acquired here through `impl Drop for PyBytes`.
156 105 if let Some(bytes) = opt_bytes {
157 106 if let Err(_) = pybytes_sender.send(bytes.unwrap()) {
158 107 // The channel is disconnected, meaning the parent
159 108 // thread panicked or returned
160 109 // early through
161 110 // `?` to propagate a Python exception.
162 111 break;
163 112 }
164 113 }
165 114 }
166 115
167 116 combine_changeset_copies.finish(target_rev)
168 117 });
169 118
170 119 for rev_info in revs_info {
171 120 let (rev, p1, p2, opt_bytes) = rev_info?;
172 let opt_bytes = opt_bytes.map(|b| PyBytesWithData::new(py, b));
121 let opt_bytes = opt_bytes.map(|b| PyBytesDeref::new(py, b));
173 122
174 123 // We’d prefer to avoid the child thread calling into Python code,
175 124 // but this avoids a potential deadlock on the GIL if it does:
176 125 py.allow_threads(|| {
177 126 rev_info_sender.send((rev, p1, p2, opt_bytes)).expect(
178 127 "combine_changeset_copies: channel is disconnected",
179 128 );
180 129 });
181 130
182 131 // Drop anything in the channel, without blocking
183 132 for pybytes in pybytes_receiver.try_iter() {
184 133 pybytes.release_ref(py)
185 134 }
186 135 }
187 136 // We’d prefer to avoid the child thread calling into Python code,
188 137 // but this avoids a potential deadlock on the GIL if it does:
189 138 path_copies = py.allow_threads(|| {
190 139 // Disconnect the channel to signal the child thread to stop:
191 140 // the `for … in rev_info_receiver` loop will end.
192 141 drop(rev_info_sender);
193 142
194 143 // Wait for the child thread to stop, and propagate any panic.
195 144 thread.join().unwrap_or_else(|panic_payload| {
196 145 std::panic::resume_unwind(panic_payload)
197 146 })
198 147 });
199 148
200 149 // Drop anything left in the channel
201 150 for pybytes in pybytes_receiver.iter() {
202 151 pybytes.release_ref(py)
203 152 }
204 153 };
205 154
206 155 let out = PyDict::new(py);
207 156 for (dest, source) in path_copies.into_iter() {
208 157 out.set_item(
209 158 py,
210 159 PyBytes::new(py, &dest.into_vec()),
211 160 PyBytes::new(py, &source.into_vec()),
212 161 )?;
213 162 }
214 163 Ok(out)
215 164 }
216 165
217 166 /// Create the module, with `__package__` given from parent
218 167 pub fn init_module(py: Python, package: &str) -> PyResult<PyModule> {
219 168 let dotted_name = &format!("{}.copy_tracing", package);
220 169 let m = PyModule::new(py, dotted_name)?;
221 170
222 171 m.add(py, "__package__", package)?;
223 172 m.add(py, "__doc__", "Copy tracing - Rust implementation")?;
224 173
225 174 m.add(
226 175 py,
227 176 "combine_changeset_copies",
228 177 py_fn!(
229 178 py,
230 179 combine_changeset_copies_wrapper(
231 180 revs: PyList,
232 181 children: PyDict,
233 182 target_rev: Revision,
234 183 rev_info: PyObject,
235 184 multi_thread: bool
236 185 )
237 186 ),
238 187 )?;
239 188
240 189 let sys = PyModule::import(py, "sys")?;
241 190 let sys_modules: PyDict = sys.get(py, "modules")?.extract(py)?;
242 191 sys_modules.set_item(py, dotted_name, &m)?;
243 192
244 193 Ok(m)
245 194 }
@@ -1,75 +1,76 b''
1 1 // lib.rs
2 2 //
3 3 // Copyright 2018 Georges Racinet <gracinet@anybox.fr>
4 4 //
5 5 // This software may be used and distributed according to the terms of the
6 6 // GNU General Public License version 2 or any later version.
7 7
8 8 //! Python bindings of `hg-core` objects using the `cpython` crate.
9 9 //! Once compiled, the resulting single shared library object can be placed in
10 10 //! the `mercurial` package directly as `rustext.so` or `rustext.dll`.
11 11 //! It holds several modules, so that from the point of view of Python,
12 12 //! it behaves as the `cext` package.
13 13 //!
14 14 //! Example:
15 15 //!
16 16 //! ```text
17 17 //! >>> from mercurial.rustext import ancestor
18 18 //! >>> ancestor.__doc__
19 19 //! 'Generic DAG ancestor algorithms - Rust implementation'
20 20 //! ```
21 21
22 22 /// This crate uses nested private macros, `extern crate` is still needed in
23 23 /// 2018 edition.
24 24 #[macro_use]
25 25 extern crate cpython;
26 26
27 27 pub mod ancestors;
28 28 mod cindex;
29 29 mod conversion;
30 30 #[macro_use]
31 31 pub mod ref_sharing;
32 32 pub mod copy_tracing;
33 33 pub mod dagops;
34 34 pub mod debug;
35 35 pub mod dirstate;
36 36 pub mod discovery;
37 37 pub mod exceptions;
38 38 pub mod parsers;
39 mod pybytes_deref;
39 40 pub mod revlog;
40 41 pub mod utils;
41 42
42 43 py_module_initializer!(rustext, initrustext, PyInit_rustext, |py, m| {
43 44 m.add(
44 45 py,
45 46 "__doc__",
46 47 "Mercurial core concepts - Rust implementation",
47 48 )?;
48 49
49 50 let dotted_name: String = m.get(py, "__name__")?.extract(py)?;
50 51 m.add(py, "ancestor", ancestors::init_module(py, &dotted_name)?)?;
51 52 m.add(py, "dagop", dagops::init_module(py, &dotted_name)?)?;
52 53 m.add(py, "debug", debug::init_module(py, &dotted_name)?)?;
53 54 m.add(
54 55 py,
55 56 "copy_tracing",
56 57 copy_tracing::init_module(py, &dotted_name)?,
57 58 )?;
58 59 m.add(py, "discovery", discovery::init_module(py, &dotted_name)?)?;
59 60 m.add(py, "dirstate", dirstate::init_module(py, &dotted_name)?)?;
60 61 m.add(py, "revlog", revlog::init_module(py, &dotted_name)?)?;
61 62 m.add(
62 63 py,
63 64 "parsers",
64 65 parsers::init_parsers_module(py, &dotted_name)?,
65 66 )?;
66 67 m.add(py, "GraphError", py.get_type::<exceptions::GraphError>())?;
67 68 Ok(())
68 69 });
69 70
70 71 #[cfg(not(any(feature = "python27-bin", feature = "python3-bin")))]
71 72 #[test]
72 73 #[ignore]
73 74 fn libpython_must_be_linked_to_run_tests() {
74 75 // stub function to tell that some tests wouldn't run
75 76 }
General Comments 0
You need to be logged in to leave comments. Login now