##// END OF EJS Templates
rust-chg: use "crate::" to import local modules...
Yuya Nishihara -
r45180:6bef9d43 default
parent child Browse files
Show More
@@ -1,114 +1,114 b''
1 1 // Copyright 2018 Yuya Nishihara <yuya@tcha.org>
2 2 //
3 3 // This software may be used and distributed according to the terms of the
4 4 // GNU General Public License version 2 or any later version.
5 5
6 6 //! Functions to send client-side fds over the command server channel.
7 7
8 8 use futures::{try_ready, Async, Future, Poll};
9 9 use std::io;
10 10 use std::os::unix::io::AsRawFd;
11 11 use tokio_hglib::codec::ChannelMessage;
12 12 use tokio_hglib::protocol::MessageLoop;
13 13 use tokio_hglib::{Client, Connection};
14 14
15 use super::message;
16 use super::procutil;
15 use crate::message;
16 use crate::procutil;
17 17
18 18 /// Future to send client-side fds over the command server channel.
19 19 ///
20 20 /// This works as follows:
21 21 /// 1. Client sends "attachio" request.
22 22 /// 2. Server sends back 1-byte input request.
23 23 /// 3. Client sends fds with 1-byte dummy payload in response.
24 24 /// 4. Server returns the number of the fds received.
25 25 ///
26 26 /// If the stderr is omitted, it will be redirected to the stdout. This
27 27 /// allows us to attach the pager stdin to both stdout and stderr, and
28 28 /// dispose of the client-side handle once attached.
29 29 #[must_use = "futures do nothing unless polled"]
30 30 pub struct AttachIo<C, I, O, E>
31 31 where
32 32 C: Connection,
33 33 {
34 34 msg_loop: MessageLoop<C>,
35 35 stdin: I,
36 36 stdout: O,
37 37 stderr: Option<E>,
38 38 }
39 39
40 40 impl<C, I, O, E> AttachIo<C, I, O, E>
41 41 where
42 42 C: Connection + AsRawFd,
43 43 I: AsRawFd,
44 44 O: AsRawFd,
45 45 E: AsRawFd,
46 46 {
47 47 pub fn with_client(
48 48 client: Client<C>,
49 49 stdin: I,
50 50 stdout: O,
51 51 stderr: Option<E>,
52 52 ) -> AttachIo<C, I, O, E> {
53 53 let msg_loop = MessageLoop::start(client, b"attachio");
54 54 AttachIo {
55 55 msg_loop,
56 56 stdin,
57 57 stdout,
58 58 stderr,
59 59 }
60 60 }
61 61 }
62 62
63 63 impl<C, I, O, E> Future for AttachIo<C, I, O, E>
64 64 where
65 65 C: Connection + AsRawFd,
66 66 I: AsRawFd,
67 67 O: AsRawFd,
68 68 E: AsRawFd,
69 69 {
70 70 type Item = Client<C>;
71 71 type Error = io::Error;
72 72
73 73 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
74 74 loop {
75 75 let (client, msg) = try_ready!(self.msg_loop.poll());
76 76 match msg {
77 77 ChannelMessage::Data(b'r', data) => {
78 78 let fd_cnt = message::parse_result_code(data)?;
79 79 if fd_cnt == 3 {
80 80 return Ok(Async::Ready(client));
81 81 } else {
82 82 return Err(io::Error::new(
83 83 io::ErrorKind::InvalidData,
84 84 "unexpected attachio result",
85 85 ));
86 86 }
87 87 }
88 88 ChannelMessage::Data(..) => {
89 89 // just ignore data sent to uninteresting (optional) channel
90 90 self.msg_loop = MessageLoop::resume(client);
91 91 }
92 92 ChannelMessage::InputRequest(1) => {
93 93 // this may fail with EWOULDBLOCK in theory, but the
94 94 // payload is quite small, and the send buffer should
95 95 // be empty so the operation will complete immediately
96 96 let sock_fd = client.as_raw_fd();
97 97 let ifd = self.stdin.as_raw_fd();
98 98 let ofd = self.stdout.as_raw_fd();
99 99 let efd = self.stderr.as_ref().map_or(ofd, |f| f.as_raw_fd());
100 100 procutil::send_raw_fds(sock_fd, &[ifd, ofd, efd])?;
101 101 self.msg_loop = MessageLoop::resume(client);
102 102 }
103 103 ChannelMessage::InputRequest(..)
104 104 | ChannelMessage::LineRequest(..)
105 105 | ChannelMessage::SystemRequest(..) => {
106 106 return Err(io::Error::new(
107 107 io::ErrorKind::InvalidData,
108 108 "unsupported request while attaching io",
109 109 ));
110 110 }
111 111 }
112 112 }
113 113 }
114 114 }
@@ -1,136 +1,136 b''
1 1 // Copyright 2018 Yuya Nishihara <yuya@tcha.org>
2 2 //
3 3 // This software may be used and distributed according to the terms of the
4 4 // GNU General Public License version 2 or any later version.
5 5
6 6 //! cHg extensions to command server client.
7 7
8 8 use bytes::{BufMut, Bytes, BytesMut};
9 9 use std::ffi::OsStr;
10 10 use std::io;
11 11 use std::mem;
12 12 use std::os::unix::ffi::OsStrExt;
13 13 use std::os::unix::io::AsRawFd;
14 14 use std::path::Path;
15 15 use tokio_hglib::protocol::{OneShotQuery, OneShotRequest};
16 16 use tokio_hglib::{Client, Connection};
17 17
18 use super::attachio::AttachIo;
19 use super::message::{self, Instruction};
20 use super::runcommand::ChgRunCommand;
21 use super::uihandler::SystemHandler;
18 use crate::attachio::AttachIo;
19 use crate::message::{self, Instruction};
20 use crate::runcommand::ChgRunCommand;
21 use crate::uihandler::SystemHandler;
22 22
23 23 pub trait ChgClientExt<C>
24 24 where
25 25 C: Connection + AsRawFd,
26 26 {
27 27 /// Attaches the client file descriptors to the server.
28 28 fn attach_io<I, O, E>(self, stdin: I, stdout: O, stderr: E) -> AttachIo<C, I, O, E>
29 29 where
30 30 I: AsRawFd,
31 31 O: AsRawFd,
32 32 E: AsRawFd;
33 33
34 34 /// Changes the working directory of the server.
35 35 fn set_current_dir<P>(self, dir: P) -> OneShotRequest<C>
36 36 where
37 37 P: AsRef<Path>;
38 38
39 39 /// Updates the environment variables of the server.
40 40 fn set_env_vars_os<I, P>(self, vars: I) -> OneShotRequest<C>
41 41 where
42 42 I: IntoIterator<Item = (P, P)>,
43 43 P: AsRef<OsStr>;
44 44
45 45 /// Changes the process title of the server.
46 46 fn set_process_name<P>(self, name: P) -> OneShotRequest<C>
47 47 where
48 48 P: AsRef<OsStr>;
49 49
50 50 /// Changes the umask of the server process.
51 51 fn set_umask(self, mask: u32) -> OneShotRequest<C>;
52 52
53 53 /// Runs the specified Mercurial command with cHg extension.
54 54 fn run_command_chg<I, P, H>(self, handler: H, args: I) -> ChgRunCommand<C, H>
55 55 where
56 56 I: IntoIterator<Item = P>,
57 57 P: AsRef<OsStr>,
58 58 H: SystemHandler;
59 59
60 60 /// Validates if the server can run Mercurial commands with the expected
61 61 /// configuration.
62 62 ///
63 63 /// The `args` should contain early command arguments such as `--config`
64 64 /// and `-R`.
65 65 ///
66 66 /// Client-side environment must be sent prior to this request, by
67 67 /// `set_current_dir()` and `set_env_vars_os()`.
68 68 fn validate<I, P>(self, args: I) -> OneShotQuery<C, fn(Bytes) -> io::Result<Vec<Instruction>>>
69 69 where
70 70 I: IntoIterator<Item = P>,
71 71 P: AsRef<OsStr>;
72 72 }
73 73
74 74 impl<C> ChgClientExt<C> for Client<C>
75 75 where
76 76 C: Connection + AsRawFd,
77 77 {
78 78 fn attach_io<I, O, E>(self, stdin: I, stdout: O, stderr: E) -> AttachIo<C, I, O, E>
79 79 where
80 80 I: AsRawFd,
81 81 O: AsRawFd,
82 82 E: AsRawFd,
83 83 {
84 84 AttachIo::with_client(self, stdin, stdout, Some(stderr))
85 85 }
86 86
87 87 fn set_current_dir<P>(self, dir: P) -> OneShotRequest<C>
88 88 where
89 89 P: AsRef<Path>,
90 90 {
91 91 OneShotRequest::start_with_args(self, b"chdir", dir.as_ref().as_os_str().as_bytes())
92 92 }
93 93
94 94 fn set_env_vars_os<I, P>(self, vars: I) -> OneShotRequest<C>
95 95 where
96 96 I: IntoIterator<Item = (P, P)>,
97 97 P: AsRef<OsStr>,
98 98 {
99 99 OneShotRequest::start_with_args(self, b"setenv", message::pack_env_vars_os(vars))
100 100 }
101 101
102 102 fn set_process_name<P>(self, name: P) -> OneShotRequest<C>
103 103 where
104 104 P: AsRef<OsStr>,
105 105 {
106 106 OneShotRequest::start_with_args(self, b"setprocname", name.as_ref().as_bytes())
107 107 }
108 108
109 109 fn set_umask(self, mask: u32) -> OneShotRequest<C> {
110 110 let mut args = BytesMut::with_capacity(mem::size_of_val(&mask));
111 111 args.put_u32_be(mask);
112 112 OneShotRequest::start_with_args(self, b"setumask2", args)
113 113 }
114 114
115 115 fn run_command_chg<I, P, H>(self, handler: H, args: I) -> ChgRunCommand<C, H>
116 116 where
117 117 I: IntoIterator<Item = P>,
118 118 P: AsRef<OsStr>,
119 119 H: SystemHandler,
120 120 {
121 121 ChgRunCommand::with_client(self, handler, message::pack_args_os(args))
122 122 }
123 123
124 124 fn validate<I, P>(self, args: I) -> OneShotQuery<C, fn(Bytes) -> io::Result<Vec<Instruction>>>
125 125 where
126 126 I: IntoIterator<Item = P>,
127 127 P: AsRef<OsStr>,
128 128 {
129 129 OneShotQuery::start_with_args(
130 130 self,
131 131 b"validate",
132 132 message::pack_args_os(args),
133 133 message::parse_instructions,
134 134 )
135 135 }
136 136 }
@@ -1,501 +1,501 b''
1 1 // Copyright 2011, 2018 Yuya Nishihara <yuya@tcha.org>
2 2 //
3 3 // This software may be used and distributed according to the terms of the
4 4 // GNU General Public License version 2 or any later version.
5 5
6 6 //! Utility for locating command-server process.
7 7
8 8 use futures::future::{self, Either, Loop};
9 9 use log::debug;
10 10 use std::env;
11 11 use std::ffi::{OsStr, OsString};
12 12 use std::fs::{self, DirBuilder};
13 13 use std::io;
14 14 use std::os::unix::ffi::{OsStrExt, OsStringExt};
15 15 use std::os::unix::fs::{DirBuilderExt, MetadataExt};
16 16 use std::path::{Path, PathBuf};
17 17 use std::process::{self, Command};
18 18 use std::time::Duration;
19 19 use tokio::prelude::*;
20 20 use tokio_hglib::UnixClient;
21 21 use tokio_process::{Child, CommandExt};
22 22 use tokio_timer;
23 23
24 use super::clientext::ChgClientExt;
25 use super::message::{Instruction, ServerSpec};
26 use super::procutil;
24 use crate::clientext::ChgClientExt;
25 use crate::message::{Instruction, ServerSpec};
26 use crate::procutil;
27 27
28 28 const REQUIRED_SERVER_CAPABILITIES: &[&str] = &[
29 29 "attachio",
30 30 "chdir",
31 31 "runcommand",
32 32 "setenv",
33 33 "setumask2",
34 34 "validate",
35 35 ];
36 36
37 37 /// Helper to connect to and spawn a server process.
38 38 #[derive(Clone, Debug)]
39 39 pub struct Locator {
40 40 hg_command: OsString,
41 41 hg_early_args: Vec<OsString>,
42 42 current_dir: PathBuf,
43 43 env_vars: Vec<(OsString, OsString)>,
44 44 process_id: u32,
45 45 base_sock_path: PathBuf,
46 46 redirect_sock_path: Option<PathBuf>,
47 47 timeout: Duration,
48 48 }
49 49
50 50 impl Locator {
51 51 /// Creates locator capturing the current process environment.
52 52 ///
53 53 /// If no `$CHGSOCKNAME` is specified, the socket directory will be
54 54 /// created as necessary.
55 55 pub fn prepare_from_env() -> io::Result<Locator> {
56 56 Ok(Locator {
57 57 hg_command: default_hg_command(),
58 58 hg_early_args: Vec::new(),
59 59 current_dir: env::current_dir()?,
60 60 env_vars: env::vars_os().collect(),
61 61 process_id: process::id(),
62 62 base_sock_path: prepare_server_socket_path()?,
63 63 redirect_sock_path: None,
64 64 timeout: default_timeout(),
65 65 })
66 66 }
67 67
68 68 /// Temporary socket path for this client process.
69 69 fn temp_sock_path(&self) -> PathBuf {
70 70 let src = self.base_sock_path.as_os_str().as_bytes();
71 71 let mut buf = Vec::with_capacity(src.len() + 6); // "{src}.{pid}".len()
72 72 buf.extend_from_slice(src);
73 73 buf.extend_from_slice(format!(".{}", self.process_id).as_bytes());
74 74 OsString::from_vec(buf).into()
75 75 }
76 76
77 77 /// Specifies the arguments to be passed to the server at start.
78 78 pub fn set_early_args<I, P>(&mut self, args: I)
79 79 where
80 80 I: IntoIterator<Item = P>,
81 81 P: AsRef<OsStr>,
82 82 {
83 83 self.hg_early_args = args.into_iter().map(|a| a.as_ref().to_owned()).collect();
84 84 }
85 85
86 86 /// Connects to the server.
87 87 ///
88 88 /// The server process will be spawned if not running.
89 89 pub fn connect(self) -> impl Future<Item = (Self, UnixClient), Error = io::Error> {
90 90 future::loop_fn((self, 0), |(loc, cnt)| {
91 91 if cnt < 10 {
92 92 let fut = loc
93 93 .try_connect()
94 94 .and_then(|(loc, client)| {
95 95 client
96 96 .validate(&loc.hg_early_args)
97 97 .map(|(client, instructions)| (loc, client, instructions))
98 98 })
99 99 .and_then(move |(loc, client, instructions)| {
100 100 loc.run_instructions(client, instructions, cnt)
101 101 });
102 102 Either::A(fut)
103 103 } else {
104 104 let msg = format!(
105 105 concat!(
106 106 "too many redirections.\n",
107 107 "Please make sure {:?} is not a wrapper which ",
108 108 "changes sensitive environment variables ",
109 109 "before executing hg. If you have to use a ",
110 110 "wrapper, wrap chg instead of hg.",
111 111 ),
112 112 loc.hg_command
113 113 );
114 114 Either::B(future::err(io::Error::new(io::ErrorKind::Other, msg)))
115 115 }
116 116 })
117 117 }
118 118
119 119 /// Runs instructions received from the server.
120 120 fn run_instructions(
121 121 mut self,
122 122 client: UnixClient,
123 123 instructions: Vec<Instruction>,
124 124 cnt: usize,
125 125 ) -> io::Result<Loop<(Self, UnixClient), (Self, usize)>> {
126 126 let mut reconnect = false;
127 127 for inst in instructions {
128 128 debug!("instruction: {:?}", inst);
129 129 match inst {
130 130 Instruction::Exit(_) => {
131 131 // Just returns the current connection to run the
132 132 // unparsable command and report the error
133 133 return Ok(Loop::Break((self, client)));
134 134 }
135 135 Instruction::Reconnect => {
136 136 reconnect = true;
137 137 }
138 138 Instruction::Redirect(path) => {
139 139 if path.parent() != self.base_sock_path.parent() {
140 140 let msg = format!(
141 141 "insecure redirect instruction from server: {}",
142 142 path.display()
143 143 );
144 144 return Err(io::Error::new(io::ErrorKind::InvalidData, msg));
145 145 }
146 146 self.redirect_sock_path = Some(path);
147 147 reconnect = true;
148 148 }
149 149 Instruction::Unlink(path) => {
150 150 if path.parent() != self.base_sock_path.parent() {
151 151 let msg = format!(
152 152 "insecure unlink instruction from server: {}",
153 153 path.display()
154 154 );
155 155 return Err(io::Error::new(io::ErrorKind::InvalidData, msg));
156 156 }
157 157 fs::remove_file(path).unwrap_or(()); // may race
158 158 }
159 159 }
160 160 }
161 161
162 162 if reconnect {
163 163 Ok(Loop::Continue((self, cnt + 1)))
164 164 } else {
165 165 Ok(Loop::Break((self, client)))
166 166 }
167 167 }
168 168
169 169 /// Tries to connect to the existing server, or spawns new if not running.
170 170 fn try_connect(self) -> impl Future<Item = (Self, UnixClient), Error = io::Error> {
171 171 let sock_path = self
172 172 .redirect_sock_path
173 173 .as_ref()
174 174 .unwrap_or(&self.base_sock_path)
175 175 .clone();
176 176 debug!("try connect to {}", sock_path.display());
177 177 UnixClient::connect(sock_path)
178 178 .then(|res| {
179 179 match res {
180 180 Ok(client) => Either::A(future::ok((self, client))),
181 181 Err(_) => {
182 182 // Prevent us from being re-connected to the outdated
183 183 // master server: We were told by the server to redirect
184 184 // to redirect_sock_path, which didn't work. We do not
185 185 // want to connect to the same master server again
186 186 // because it would probably tell us the same thing.
187 187 if self.redirect_sock_path.is_some() {
188 188 fs::remove_file(&self.base_sock_path).unwrap_or(());
189 189 // may race
190 190 }
191 191 Either::B(self.spawn_connect())
192 192 }
193 193 }
194 194 })
195 195 .and_then(|(loc, client)| {
196 196 check_server_capabilities(client.server_spec())?;
197 197 Ok((loc, client))
198 198 })
199 199 .and_then(|(loc, client)| {
200 200 // It's purely optional, and the server might not support this command.
201 201 if client.server_spec().capabilities.contains("setprocname") {
202 202 let fut = client
203 203 .set_process_name(format!("chg[worker/{}]", loc.process_id))
204 204 .map(|client| (loc, client));
205 205 Either::A(fut)
206 206 } else {
207 207 Either::B(future::ok((loc, client)))
208 208 }
209 209 })
210 210 .and_then(|(loc, client)| {
211 211 client
212 212 .set_current_dir(&loc.current_dir)
213 213 .map(|client| (loc, client))
214 214 })
215 215 .and_then(|(loc, client)| {
216 216 client
217 217 .set_env_vars_os(loc.env_vars.iter().cloned())
218 218 .map(|client| (loc, client))
219 219 })
220 220 }
221 221
222 222 /// Spawns new server process and connects to it.
223 223 ///
224 224 /// The server will be spawned at the current working directory, then
225 225 /// chdir to "/", so that the server will load configs from the target
226 226 /// repository.
227 227 fn spawn_connect(self) -> impl Future<Item = (Self, UnixClient), Error = io::Error> {
228 228 let sock_path = self.temp_sock_path();
229 229 debug!("start cmdserver at {}", sock_path.display());
230 230 Command::new(&self.hg_command)
231 231 .arg("serve")
232 232 .arg("--cmdserver")
233 233 .arg("chgunix")
234 234 .arg("--address")
235 235 .arg(&sock_path)
236 236 .arg("--daemon-postexec")
237 237 .arg("chdir:/")
238 238 .args(&self.hg_early_args)
239 239 .current_dir(&self.current_dir)
240 240 .env_clear()
241 241 .envs(self.env_vars.iter().cloned())
242 242 .env("CHGINTERNALMARK", "")
243 243 .spawn_async()
244 244 .into_future()
245 245 .and_then(|server| self.connect_spawned(server, sock_path))
246 246 .and_then(|(loc, client, sock_path)| {
247 247 debug!(
248 248 "rename {} to {}",
249 249 sock_path.display(),
250 250 loc.base_sock_path.display()
251 251 );
252 252 fs::rename(&sock_path, &loc.base_sock_path)?;
253 253 Ok((loc, client))
254 254 })
255 255 }
256 256
257 257 /// Tries to connect to the just spawned server repeatedly until timeout
258 258 /// exceeded.
259 259 fn connect_spawned(
260 260 self,
261 261 server: Child,
262 262 sock_path: PathBuf,
263 263 ) -> impl Future<Item = (Self, UnixClient, PathBuf), Error = io::Error> {
264 264 debug!("try connect to {} repeatedly", sock_path.display());
265 265 let connect = future::loop_fn(sock_path, |sock_path| {
266 266 UnixClient::connect(sock_path.clone()).then(|res| {
267 267 match res {
268 268 Ok(client) => Either::A(future::ok(Loop::Break((client, sock_path)))),
269 269 Err(_) => {
270 270 // try again with slight delay
271 271 let fut = tokio_timer::sleep(Duration::from_millis(10))
272 272 .map(|()| Loop::Continue(sock_path))
273 273 .map_err(|err| io::Error::new(io::ErrorKind::Other, err));
274 274 Either::B(fut)
275 275 }
276 276 }
277 277 })
278 278 });
279 279
280 280 // waits for either connection established or server failed to start
281 281 connect
282 282 .select2(server)
283 283 .map_err(|res| res.split().0)
284 284 .timeout(self.timeout)
285 285 .map_err(|err| {
286 286 err.into_inner().unwrap_or_else(|| {
287 287 io::Error::new(
288 288 io::ErrorKind::TimedOut,
289 289 "timed out while connecting to server",
290 290 )
291 291 })
292 292 })
293 293 .and_then(|res| {
294 294 match res {
295 295 Either::A(((client, sock_path), server)) => {
296 296 server.forget(); // continue to run in background
297 297 Ok((self, client, sock_path))
298 298 }
299 299 Either::B((st, _)) => Err(io::Error::new(
300 300 io::ErrorKind::Other,
301 301 format!("server exited too early: {}", st),
302 302 )),
303 303 }
304 304 })
305 305 }
306 306 }
307 307
308 308 /// Determines the server socket to connect to.
309 309 ///
310 310 /// If no `$CHGSOCKNAME` is specified, the socket directory will be created
311 311 /// as necessary.
312 312 fn prepare_server_socket_path() -> io::Result<PathBuf> {
313 313 if let Some(s) = env::var_os("CHGSOCKNAME") {
314 314 Ok(PathBuf::from(s))
315 315 } else {
316 316 let mut path = default_server_socket_dir();
317 317 create_secure_dir(&path)?;
318 318 path.push("server");
319 319 Ok(path)
320 320 }
321 321 }
322 322
323 323 /// Determines the default server socket path as follows.
324 324 ///
325 325 /// 1. `$XDG_RUNTIME_DIR/chg`
326 326 /// 2. `$TMPDIR/chg$UID`
327 327 /// 3. `/tmp/chg$UID`
328 328 pub fn default_server_socket_dir() -> PathBuf {
329 329 // XDG_RUNTIME_DIR should be ignored if it has an insufficient permission.
330 330 // https://standards.freedesktop.org/basedir-spec/basedir-spec-latest.html
331 331 if let Some(Ok(s)) = env::var_os("XDG_RUNTIME_DIR").map(check_secure_dir) {
332 332 let mut path = PathBuf::from(s);
333 333 path.push("chg");
334 334 path
335 335 } else {
336 336 let mut path = env::temp_dir();
337 337 path.push(format!("chg{}", procutil::get_effective_uid()));
338 338 path
339 339 }
340 340 }
341 341
342 342 /// Determines the default hg command.
343 343 pub fn default_hg_command() -> OsString {
344 344 // TODO: maybe allow embedding the path at compile time (or load from hgrc)
345 345 env::var_os("CHGHG")
346 346 .or(env::var_os("HG"))
347 347 .unwrap_or(OsStr::new("hg").to_owned())
348 348 }
349 349
350 350 fn default_timeout() -> Duration {
351 351 let secs = env::var("CHGTIMEOUT")
352 352 .ok()
353 353 .and_then(|s| s.parse().ok())
354 354 .unwrap_or(60);
355 355 Duration::from_secs(secs)
356 356 }
357 357
358 358 /// Creates a directory which the other users cannot access to.
359 359 ///
360 360 /// If the directory already exists, tests its permission.
361 361 fn create_secure_dir<P>(path: P) -> io::Result<()>
362 362 where
363 363 P: AsRef<Path>,
364 364 {
365 365 DirBuilder::new()
366 366 .mode(0o700)
367 367 .create(path.as_ref())
368 368 .or_else(|err| {
369 369 if err.kind() == io::ErrorKind::AlreadyExists {
370 370 check_secure_dir(path).map(|_| ())
371 371 } else {
372 372 Err(err)
373 373 }
374 374 })
375 375 }
376 376
377 377 fn check_secure_dir<P>(path: P) -> io::Result<P>
378 378 where
379 379 P: AsRef<Path>,
380 380 {
381 381 let a = fs::symlink_metadata(path.as_ref())?;
382 382 if a.is_dir() && a.uid() == procutil::get_effective_uid() && (a.mode() & 0o777) == 0o700 {
383 383 Ok(path)
384 384 } else {
385 385 Err(io::Error::new(io::ErrorKind::Other, "insecure directory"))
386 386 }
387 387 }
388 388
389 389 fn check_server_capabilities(spec: &ServerSpec) -> io::Result<()> {
390 390 let unsupported: Vec<_> = REQUIRED_SERVER_CAPABILITIES
391 391 .iter()
392 392 .cloned()
393 393 .filter(|&s| !spec.capabilities.contains(s))
394 394 .collect();
395 395 if unsupported.is_empty() {
396 396 Ok(())
397 397 } else {
398 398 let msg = format!(
399 399 "insufficient server capabilities: {}",
400 400 unsupported.join(", ")
401 401 );
402 402 Err(io::Error::new(io::ErrorKind::Other, msg))
403 403 }
404 404 }
405 405
406 406 /// Collects arguments which need to be passed to the server at start.
407 407 pub fn collect_early_args<I, P>(args: I) -> Vec<OsString>
408 408 where
409 409 I: IntoIterator<Item = P>,
410 410 P: AsRef<OsStr>,
411 411 {
412 412 let mut args_iter = args.into_iter();
413 413 let mut early_args = Vec::new();
414 414 while let Some(arg) = args_iter.next() {
415 415 let argb = arg.as_ref().as_bytes();
416 416 if argb == b"--" {
417 417 break;
418 418 } else if argb.starts_with(b"--") {
419 419 let mut split = argb[2..].splitn(2, |&c| c == b'=');
420 420 match split.next().unwrap() {
421 421 b"traceback" => {
422 422 if split.next().is_none() {
423 423 early_args.push(arg.as_ref().to_owned());
424 424 }
425 425 }
426 426 b"config" | b"cwd" | b"repo" | b"repository" => {
427 427 if split.next().is_some() {
428 428 // --<flag>=<val>
429 429 early_args.push(arg.as_ref().to_owned());
430 430 } else {
431 431 // --<flag> <val>
432 432 args_iter.next().map(|val| {
433 433 early_args.push(arg.as_ref().to_owned());
434 434 early_args.push(val.as_ref().to_owned());
435 435 });
436 436 }
437 437 }
438 438 _ => {}
439 439 }
440 440 } else if argb.starts_with(b"-R") {
441 441 if argb.len() > 2 {
442 442 // -R<val>
443 443 early_args.push(arg.as_ref().to_owned());
444 444 } else {
445 445 // -R <val>
446 446 args_iter.next().map(|val| {
447 447 early_args.push(arg.as_ref().to_owned());
448 448 early_args.push(val.as_ref().to_owned());
449 449 });
450 450 }
451 451 }
452 452 }
453 453
454 454 early_args
455 455 }
456 456
457 457 #[cfg(test)]
458 458 mod tests {
459 459 use super::*;
460 460
461 461 #[test]
462 462 fn collect_early_args_some() {
463 463 assert!(collect_early_args(&[] as &[&OsStr]).is_empty());
464 464 assert!(collect_early_args(&["log"]).is_empty());
465 465 assert_eq!(
466 466 collect_early_args(&["log", "-Ra", "foo"]),
467 467 os_string_vec_from(&[b"-Ra"])
468 468 );
469 469 assert_eq!(
470 470 collect_early_args(&["log", "-R", "repo", "", "--traceback", "a"]),
471 471 os_string_vec_from(&[b"-R", b"repo", b"--traceback"])
472 472 );
473 473 assert_eq!(
474 474 collect_early_args(&["log", "--config", "diff.git=1", "-q"]),
475 475 os_string_vec_from(&[b"--config", b"diff.git=1"])
476 476 );
477 477 assert_eq!(
478 478 collect_early_args(&["--cwd=..", "--repository", "r", "log"]),
479 479 os_string_vec_from(&[b"--cwd=..", b"--repository", b"r"])
480 480 );
481 481 assert_eq!(
482 482 collect_early_args(&["log", "--repo=r", "--repos", "a"]),
483 483 os_string_vec_from(&[b"--repo=r"])
484 484 );
485 485 }
486 486
487 487 #[test]
488 488 fn collect_early_args_orphaned() {
489 489 assert!(collect_early_args(&["log", "-R"]).is_empty());
490 490 assert!(collect_early_args(&["log", "--config"]).is_empty());
491 491 }
492 492
493 493 #[test]
494 494 fn collect_early_args_unwanted_value() {
495 495 assert!(collect_early_args(&["log", "--traceback="]).is_empty());
496 496 }
497 497
498 498 fn os_string_vec_from(v: &[&[u8]]) -> Vec<OsString> {
499 499 v.iter().map(|s| OsStr::from_bytes(s).to_owned()).collect()
500 500 }
501 501 }
@@ -1,171 +1,171 b''
1 1 // Copyright 2018 Yuya Nishihara <yuya@tcha.org>
2 2 //
3 3 // This software may be used and distributed according to the terms of the
4 4 // GNU General Public License version 2 or any later version.
5 5
6 6 //! Functions to run Mercurial command in cHg-aware command server.
7 7
8 8 use bytes::Bytes;
9 9 use futures::future::IntoFuture;
10 10 use futures::{Async, Future, Poll};
11 11 use std::io;
12 12 use std::mem;
13 13 use std::os::unix::io::AsRawFd;
14 14 use tokio_hglib::codec::ChannelMessage;
15 15 use tokio_hglib::protocol::MessageLoop;
16 16 use tokio_hglib::{Client, Connection};
17 17
18 use super::attachio::AttachIo;
19 use super::message::{self, CommandType};
20 use super::uihandler::SystemHandler;
18 use crate::attachio::AttachIo;
19 use crate::message::{self, CommandType};
20 use crate::uihandler::SystemHandler;
21 21
22 22 enum AsyncS<R, S> {
23 23 Ready(R),
24 24 NotReady(S),
25 25 PollAgain(S),
26 26 }
27 27
28 28 enum CommandState<C, H>
29 29 where
30 30 C: Connection,
31 31 H: SystemHandler,
32 32 {
33 33 Running(MessageLoop<C>, H),
34 34 SpawningPager(Client<C>, <H::SpawnPagerResult as IntoFuture>::Future),
35 35 AttachingPager(AttachIo<C, io::Stdin, H::PagerStdin, H::PagerStdin>, H),
36 36 WaitingSystem(Client<C>, <H::RunSystemResult as IntoFuture>::Future),
37 37 Finished,
38 38 }
39 39
40 40 type CommandPoll<C, H> = io::Result<AsyncS<(Client<C>, H, i32), CommandState<C, H>>>;
41 41
42 42 /// Future resolves to `(exit_code, client)`.
43 43 #[must_use = "futures do nothing unless polled"]
44 44 pub struct ChgRunCommand<C, H>
45 45 where
46 46 C: Connection,
47 47 H: SystemHandler,
48 48 {
49 49 state: CommandState<C, H>,
50 50 }
51 51
52 52 impl<C, H> ChgRunCommand<C, H>
53 53 where
54 54 C: Connection + AsRawFd,
55 55 H: SystemHandler,
56 56 {
57 57 pub fn with_client(client: Client<C>, handler: H, packed_args: Bytes) -> ChgRunCommand<C, H> {
58 58 let msg_loop = MessageLoop::start_with_args(client, b"runcommand", packed_args);
59 59 ChgRunCommand {
60 60 state: CommandState::Running(msg_loop, handler),
61 61 }
62 62 }
63 63 }
64 64
65 65 impl<C, H> Future for ChgRunCommand<C, H>
66 66 where
67 67 C: Connection + AsRawFd,
68 68 H: SystemHandler,
69 69 {
70 70 type Item = (Client<C>, H, i32);
71 71 type Error = io::Error;
72 72
73 73 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
74 74 loop {
75 75 let state = mem::replace(&mut self.state, CommandState::Finished);
76 76 match state.poll()? {
77 77 AsyncS::Ready((client, handler, code)) => {
78 78 return Ok(Async::Ready((client, handler, code)));
79 79 }
80 80 AsyncS::NotReady(newstate) => {
81 81 self.state = newstate;
82 82 return Ok(Async::NotReady);
83 83 }
84 84 AsyncS::PollAgain(newstate) => {
85 85 self.state = newstate;
86 86 }
87 87 }
88 88 }
89 89 }
90 90 }
91 91
92 92 impl<C, H> CommandState<C, H>
93 93 where
94 94 C: Connection + AsRawFd,
95 95 H: SystemHandler,
96 96 {
97 97 fn poll(self) -> CommandPoll<C, H> {
98 98 match self {
99 99 CommandState::Running(mut msg_loop, handler) => {
100 100 if let Async::Ready((client, msg)) = msg_loop.poll()? {
101 101 process_message(client, handler, msg)
102 102 } else {
103 103 Ok(AsyncS::NotReady(CommandState::Running(msg_loop, handler)))
104 104 }
105 105 }
106 106 CommandState::SpawningPager(client, mut fut) => {
107 107 if let Async::Ready((handler, pin)) = fut.poll()? {
108 108 let fut = AttachIo::with_client(client, io::stdin(), pin, None);
109 109 Ok(AsyncS::PollAgain(CommandState::AttachingPager(
110 110 fut, handler,
111 111 )))
112 112 } else {
113 113 Ok(AsyncS::NotReady(CommandState::SpawningPager(client, fut)))
114 114 }
115 115 }
116 116 CommandState::AttachingPager(mut fut, handler) => {
117 117 if let Async::Ready(client) = fut.poll()? {
118 118 let msg_loop = MessageLoop::start(client, b""); // terminator
119 119 Ok(AsyncS::PollAgain(CommandState::Running(msg_loop, handler)))
120 120 } else {
121 121 Ok(AsyncS::NotReady(CommandState::AttachingPager(fut, handler)))
122 122 }
123 123 }
124 124 CommandState::WaitingSystem(client, mut fut) => {
125 125 if let Async::Ready((handler, code)) = fut.poll()? {
126 126 let data = message::pack_result_code(code);
127 127 let msg_loop = MessageLoop::resume_with_data(client, data);
128 128 Ok(AsyncS::PollAgain(CommandState::Running(msg_loop, handler)))
129 129 } else {
130 130 Ok(AsyncS::NotReady(CommandState::WaitingSystem(client, fut)))
131 131 }
132 132 }
133 133 CommandState::Finished => panic!("poll ChgRunCommand after it's done"),
134 134 }
135 135 }
136 136 }
137 137
138 138 fn process_message<C, H>(client: Client<C>, handler: H, msg: ChannelMessage) -> CommandPoll<C, H>
139 139 where
140 140 C: Connection,
141 141 H: SystemHandler,
142 142 {
143 143 match msg {
144 144 ChannelMessage::Data(b'r', data) => {
145 145 let code = message::parse_result_code(data)?;
146 146 Ok(AsyncS::Ready((client, handler, code)))
147 147 }
148 148 ChannelMessage::Data(..) => {
149 149 // just ignores data sent to optional channel
150 150 let msg_loop = MessageLoop::resume(client);
151 151 Ok(AsyncS::PollAgain(CommandState::Running(msg_loop, handler)))
152 152 }
153 153 ChannelMessage::InputRequest(..) | ChannelMessage::LineRequest(..) => Err(io::Error::new(
154 154 io::ErrorKind::InvalidData,
155 155 "unsupported request",
156 156 )),
157 157 ChannelMessage::SystemRequest(data) => {
158 158 let (cmd_type, cmd_spec) = message::parse_command_spec(data)?;
159 159 match cmd_type {
160 160 CommandType::Pager => {
161 161 let fut = handler.spawn_pager(cmd_spec).into_future();
162 162 Ok(AsyncS::PollAgain(CommandState::SpawningPager(client, fut)))
163 163 }
164 164 CommandType::System => {
165 165 let fut = handler.run_system(cmd_spec).into_future();
166 166 Ok(AsyncS::PollAgain(CommandState::WaitingSystem(client, fut)))
167 167 }
168 168 }
169 169 }
170 170 }
171 171 }
@@ -1,88 +1,88 b''
1 1 // Copyright 2018 Yuya Nishihara <yuya@tcha.org>
2 2 //
3 3 // This software may be used and distributed according to the terms of the
4 4 // GNU General Public License version 2 or any later version.
5 5
6 6 use futures::future::IntoFuture;
7 7 use futures::Future;
8 8 use std::io;
9 9 use std::os::unix::io::AsRawFd;
10 10 use std::os::unix::process::ExitStatusExt;
11 11 use std::process::{Command, Stdio};
12 12 use tokio;
13 13 use tokio_process::{ChildStdin, CommandExt};
14 14
15 use super::message::CommandSpec;
16 use super::procutil;
15 use crate::message::CommandSpec;
16 use crate::procutil;
17 17
18 18 /// Callback to process shell command requests received from server.
19 19 pub trait SystemHandler: Sized {
20 20 type PagerStdin: AsRawFd;
21 21 type SpawnPagerResult: IntoFuture<Item = (Self, Self::PagerStdin), Error = io::Error>;
22 22 type RunSystemResult: IntoFuture<Item = (Self, i32), Error = io::Error>;
23 23
24 24 /// Handles pager command request.
25 25 ///
26 26 /// Returns the pipe to be attached to the server if the pager is spawned.
27 27 fn spawn_pager(self, spec: CommandSpec) -> Self::SpawnPagerResult;
28 28
29 29 /// Handles system command request.
30 30 ///
31 31 /// Returns command exit code (positive) or signal number (negative).
32 32 fn run_system(self, spec: CommandSpec) -> Self::RunSystemResult;
33 33 }
34 34
35 35 /// Default cHg implementation to process requests received from server.
36 36 pub struct ChgUiHandler {}
37 37
38 38 impl ChgUiHandler {
39 39 pub fn new() -> ChgUiHandler {
40 40 ChgUiHandler {}
41 41 }
42 42 }
43 43
44 44 impl SystemHandler for ChgUiHandler {
45 45 type PagerStdin = ChildStdin;
46 46 type SpawnPagerResult = io::Result<(Self, Self::PagerStdin)>;
47 47 type RunSystemResult = Box<dyn Future<Item = (Self, i32), Error = io::Error> + Send>;
48 48
49 49 fn spawn_pager(self, spec: CommandSpec) -> Self::SpawnPagerResult {
50 50 let mut pager = new_shell_command(&spec)
51 51 .stdin(Stdio::piped())
52 52 .spawn_async()?;
53 53 let pin = pager.stdin().take().unwrap();
54 54 procutil::set_blocking_fd(pin.as_raw_fd())?;
55 55 // TODO: if pager exits, notify the server with SIGPIPE immediately.
56 56 // otherwise the server won't get SIGPIPE if it does not write
57 57 // anything. (issue5278)
58 58 // kill(peerpid, SIGPIPE);
59 59 tokio::spawn(pager.map(|_| ()).map_err(|_| ())); // just ignore errors
60 60 Ok((self, pin))
61 61 }
62 62
63 63 fn run_system(self, spec: CommandSpec) -> Self::RunSystemResult {
64 64 let fut = new_shell_command(&spec)
65 65 .spawn_async()
66 66 .into_future()
67 67 .flatten()
68 68 .map(|status| {
69 69 let code = status
70 70 .code()
71 71 .or_else(|| status.signal().map(|n| -n))
72 72 .expect("either exit code or signal should be set");
73 73 (self, code)
74 74 });
75 75 Box::new(fut)
76 76 }
77 77 }
78 78
79 79 fn new_shell_command(spec: &CommandSpec) -> Command {
80 80 let mut builder = Command::new("/bin/sh");
81 81 builder
82 82 .arg("-c")
83 83 .arg(&spec.command)
84 84 .current_dir(&spec.current_dir)
85 85 .env_clear()
86 86 .envs(spec.envs.iter().cloned());
87 87 builder
88 88 }
General Comments 0
You need to be logged in to leave comments. Login now