##// END OF EJS Templates
rust-clippy: fix remaining warnings in `hg-cpython`
Raphaël Gomès -
r50828:be3b545c default
parent child Browse files
Show More
@@ -1,194 +1,194
1 1 use cpython::ObjectProtocol;
2 2 use cpython::PyBytes;
3 3 use cpython::PyDict;
4 4 use cpython::PyDrop;
5 5 use cpython::PyList;
6 6 use cpython::PyModule;
7 7 use cpython::PyObject;
8 8 use cpython::PyResult;
9 9 use cpython::PyTuple;
10 10 use cpython::Python;
11 11
12 12 use hg::copy_tracing::ChangedFiles;
13 13 use hg::copy_tracing::CombineChangesetCopies;
14 14 use hg::Revision;
15 15
16 16 use crate::pybytes_deref::PyBytesDeref;
17 17
18 18 /// Combines copies information contained into revision `revs` to build a copy
19 19 /// map.
20 20 ///
21 21 /// See mercurial/copies.py for details
22 22 pub fn combine_changeset_copies_wrapper(
23 23 py: Python,
24 24 revs: PyList,
25 25 children_count: PyDict,
26 26 target_rev: Revision,
27 27 rev_info: PyObject,
28 28 multi_thread: bool,
29 29 ) -> PyResult<PyDict> {
30 30 let children_count = children_count
31 31 .items(py)
32 32 .iter()
33 33 .map(|(k, v)| Ok((k.extract(py)?, v.extract(py)?)))
34 34 .collect::<PyResult<_>>()?;
35 35
36 36 /// (Revision number, parent 1, parent 2, copy data for this revision)
37 37 type RevInfo<Bytes> = (Revision, Revision, Revision, Option<Bytes>);
38 38
39 39 let revs_info =
40 40 revs.iter(py).map(|rev_py| -> PyResult<RevInfo<PyBytes>> {
41 41 let rev = rev_py.extract(py)?;
42 42 let tuple: PyTuple =
43 43 rev_info.call(py, (rev_py,), None)?.cast_into(py)?;
44 44 let p1 = tuple.get_item(py, 0).extract(py)?;
45 45 let p2 = tuple.get_item(py, 1).extract(py)?;
46 46 let opt_bytes = tuple.get_item(py, 2).extract(py)?;
47 47 Ok((rev, p1, p2, opt_bytes))
48 48 });
49 49
50 50 let path_copies;
51 51 if !multi_thread {
52 52 let mut combine_changeset_copies =
53 53 CombineChangesetCopies::new(children_count);
54 54
55 55 for rev_info in revs_info {
56 56 let (rev, p1, p2, opt_bytes) = rev_info?;
57 57 let files = match &opt_bytes {
58 58 Some(bytes) => ChangedFiles::new(bytes.data(py)),
59 59 // Python None was extracted to Option::None,
60 60 // meaning there was no copy data.
61 61 None => ChangedFiles::new_empty(),
62 62 };
63 63
64 64 combine_changeset_copies.add_revision(rev, p1, p2, files)
65 65 }
66 66 path_copies = combine_changeset_copies.finish(target_rev)
67 67 } else {
68 68 // Use a bounded channel to provide back-pressure:
69 69 // if the child thread is slower to process revisions than this thread
70 70 // is to gather data for them, an unbounded channel would keep
71 71 // growing and eat memory.
72 72 //
73 73 // TODO: tweak the bound?
74 74 let (rev_info_sender, rev_info_receiver) =
75 75 crossbeam_channel::bounded::<RevInfo<PyBytesDeref>>(1000);
76 76
77 77 // This channel (going the other way around) however is unbounded.
78 78 // If they were both bounded, there might potentially be deadlocks
79 79 // where both channels are full and both threads are waiting on each
80 80 // other.
81 81 let (pybytes_sender, pybytes_receiver) =
82 82 crossbeam_channel::unbounded();
83 83
84 84 // Start a thread that does CPU-heavy processing in parallel with the
85 85 // loop below.
86 86 //
87 87 // If the parent thread panics, `rev_info_sender` will be dropped and
88 88 // “disconnected”. `rev_info_receiver` will be notified of this and
89 89 // exit its own loop.
90 90 let thread = std::thread::spawn(move || {
91 91 let mut combine_changeset_copies =
92 92 CombineChangesetCopies::new(children_count);
93 93 for (rev, p1, p2, opt_bytes) in rev_info_receiver {
94 94 let files = match &opt_bytes {
95 95 Some(raw) => ChangedFiles::new(raw.as_ref()),
96 96 // Python None was extracted to Option::None,
97 97 // meaning there was no copy data.
98 98 None => ChangedFiles::new_empty(),
99 99 };
100 100 combine_changeset_copies.add_revision(rev, p1, p2, files);
101 101
102 102 // Send `PyBytes` back to the parent thread so the parent
103 103 // thread can drop it. Otherwise the GIL would be implicitly
104 104 // acquired here through `impl Drop for PyBytes`.
105 105 if let Some(bytes) = opt_bytes {
106 if let Err(_) = pybytes_sender.send(bytes.unwrap()) {
106 if pybytes_sender.send(bytes.unwrap()).is_err() {
107 107 // The channel is disconnected, meaning the parent
108 108 // thread panicked or returned
109 109 // early through
110 110 // `?` to propagate a Python exception.
111 111 break;
112 112 }
113 113 }
114 114 }
115 115
116 116 combine_changeset_copies.finish(target_rev)
117 117 });
118 118
119 119 for rev_info in revs_info {
120 120 let (rev, p1, p2, opt_bytes) = rev_info?;
121 121 let opt_bytes = opt_bytes.map(|b| PyBytesDeref::new(py, b));
122 122
123 123 // We’d prefer to avoid the child thread calling into Python code,
124 124 // but this avoids a potential deadlock on the GIL if it does:
125 125 py.allow_threads(|| {
126 126 rev_info_sender.send((rev, p1, p2, opt_bytes)).expect(
127 127 "combine_changeset_copies: channel is disconnected",
128 128 );
129 129 });
130 130
131 131 // Drop anything in the channel, without blocking
132 132 for pybytes in pybytes_receiver.try_iter() {
133 133 pybytes.release_ref(py)
134 134 }
135 135 }
136 136 // We’d prefer to avoid the child thread calling into Python code,
137 137 // but this avoids a potential deadlock on the GIL if it does:
138 138 path_copies = py.allow_threads(|| {
139 139 // Disconnect the channel to signal the child thread to stop:
140 140 // the `for … in rev_info_receiver` loop will end.
141 141 drop(rev_info_sender);
142 142
143 143 // Wait for the child thread to stop, and propagate any panic.
144 144 thread.join().unwrap_or_else(|panic_payload| {
145 145 std::panic::resume_unwind(panic_payload)
146 146 })
147 147 });
148 148
149 149 // Drop anything left in the channel
150 150 for pybytes in pybytes_receiver.iter() {
151 151 pybytes.release_ref(py)
152 152 }
153 153 };
154 154
155 155 let out = PyDict::new(py);
156 156 for (dest, source) in path_copies.into_iter() {
157 157 out.set_item(
158 158 py,
159 159 PyBytes::new(py, &dest.into_vec()),
160 160 PyBytes::new(py, &source.into_vec()),
161 161 )?;
162 162 }
163 163 Ok(out)
164 164 }
165 165
166 166 /// Create the module, with `__package__` given from parent
167 167 pub fn init_module(py: Python, package: &str) -> PyResult<PyModule> {
168 168 let dotted_name = &format!("{}.copy_tracing", package);
169 169 let m = PyModule::new(py, dotted_name)?;
170 170
171 171 m.add(py, "__package__", package)?;
172 172 m.add(py, "__doc__", "Copy tracing - Rust implementation")?;
173 173
174 174 m.add(
175 175 py,
176 176 "combine_changeset_copies",
177 177 py_fn!(
178 178 py,
179 179 combine_changeset_copies_wrapper(
180 180 revs: PyList,
181 181 children: PyDict,
182 182 target_rev: Revision,
183 183 rev_info: PyObject,
184 184 multi_thread: bool
185 185 )
186 186 ),
187 187 )?;
188 188
189 189 let sys = PyModule::import(py, "sys")?;
190 190 let sys_modules: PyDict = sys.get(py, "modules")?.extract(py)?;
191 191 sys_modules.set_item(py, dotted_name, &m)?;
192 192
193 193 Ok(m)
194 194 }
@@ -1,56 +1,57
1 1 use cpython::{PyBytes, Python};
2 2 use stable_deref_trait::StableDeref;
3 3
4 4 /// Safe abstraction over a `PyBytes` together with the `&[u8]` slice
5 5 /// that borrows it. Implements `Deref<Target = [u8]>`.
6 6 ///
7 7 /// Calling `PyBytes::data` requires a GIL marker but we want to access the
8 8 /// data in a thread that (ideally) does not need to acquire the GIL.
9 9 /// This type allows separating the call an the use.
10 10 ///
11 11 /// It also enables using a (wrapped) `PyBytes` in GIL-unaware generic code.
12 12 pub struct PyBytesDeref {
13 13 #[allow(unused)]
14 14 keep_alive: PyBytes,
15 15
16 16 /// Borrows the buffer inside `self.keep_alive`,
17 17 /// but the borrow-checker cannot express self-referential structs.
18 18 data: *const [u8],
19 19 }
20 20
21 21 impl PyBytesDeref {
22 22 pub fn new(py: Python, bytes: PyBytes) -> Self {
23 23 Self {
24 24 data: bytes.data(py),
25 25 keep_alive: bytes,
26 26 }
27 27 }
28 28
29 29 pub fn unwrap(self) -> PyBytes {
30 30 self.keep_alive
31 31 }
32 32 }
33 33
34 34 impl std::ops::Deref for PyBytesDeref {
35 35 type Target = [u8];
36 36
37 37 fn deref(&self) -> &[u8] {
38 38 // Safety: the raw pointer is valid as long as the PyBytes is still
39 39 // alive, and the returned slice borrows `self`.
40 40 unsafe { &*self.data }
41 41 }
42 42 }
43 43
44 44 unsafe impl StableDeref for PyBytesDeref {}
45 45
46 46 fn require_send<T: Send>() {}
47 47
48 48 #[allow(unused)]
49 49 fn static_assert_pybytes_is_send() {
50 #[allow(clippy::no_effect)]
50 51 require_send::<PyBytes>;
51 52 }
52 53
53 54 // Safety: PyBytes is Send. Raw pointers are not by default,
54 55 // but here sending one to another thread is fine since we ensure it stays
55 56 // valid.
56 57 unsafe impl Send for PyBytesDeref {}
General Comments 0
You need to be logged in to leave comments. Login now