##// END OF EJS Templates
rust-chg: add config validation and process returned instructions...
Yuya Nishihara -
r45173:9ce613d6 default
parent child Browse files
Show More
@@ -1,386 +1,483
1 1 // Copyright 2011, 2018 Yuya Nishihara <yuya@tcha.org>
2 2 //
3 3 // This software may be used and distributed according to the terms of the
4 4 // GNU General Public License version 2 or any later version.
5 5
6 6 //! Utility for locating command-server process.
7 7
8 8 use futures::future::{self, Either, Loop};
9 9 use std::env;
10 10 use std::ffi::{OsStr, OsString};
11 11 use std::fs::{self, DirBuilder};
12 12 use std::io;
13 13 use std::os::unix::ffi::{OsStrExt, OsStringExt};
14 14 use std::os::unix::fs::{DirBuilderExt, MetadataExt};
15 15 use std::path::{Path, PathBuf};
16 16 use std::process::{self, Command};
17 17 use std::time::Duration;
18 18 use tokio::prelude::*;
19 19 use tokio_hglib::UnixClient;
20 20 use tokio_process::{Child, CommandExt};
21 21 use tokio_timer;
22 22
23 23 use super::clientext::ChgClientExt;
24 use super::message::ServerSpec;
24 use super::message::{Instruction, ServerSpec};
25 25 use super::procutil;
26 26
27 const REQUIRED_SERVER_CAPABILITIES: &[&str] = &["attachio", "chdir", "runcommand", "setenv"];
27 const REQUIRED_SERVER_CAPABILITIES: &[&str] =
28 &["attachio", "chdir", "runcommand", "setenv", "validate"];
28 29
29 30 /// Helper to connect to and spawn a server process.
30 31 #[derive(Clone, Debug)]
31 32 pub struct Locator {
32 33 hg_command: OsString,
33 34 hg_early_args: Vec<OsString>,
34 35 current_dir: PathBuf,
35 36 env_vars: Vec<(OsString, OsString)>,
36 37 process_id: u32,
37 38 base_sock_path: PathBuf,
39 redirect_sock_path: Option<PathBuf>,
38 40 timeout: Duration,
39 41 }
40 42
41 43 impl Locator {
42 44 /// Creates locator capturing the current process environment.
43 45 ///
44 46 /// If no `$CHGSOCKNAME` is specified, the socket directory will be
45 47 /// created as necessary.
46 48 pub fn prepare_from_env() -> io::Result<Locator> {
47 49 Ok(Locator {
48 50 hg_command: default_hg_command(),
49 51 hg_early_args: Vec::new(),
50 52 current_dir: env::current_dir()?,
51 53 env_vars: env::vars_os().collect(),
52 54 process_id: process::id(),
53 55 base_sock_path: prepare_server_socket_path()?,
56 redirect_sock_path: None,
54 57 timeout: default_timeout(),
55 58 })
56 59 }
57 60
58 61 /// Temporary socket path for this client process.
59 62 fn temp_sock_path(&self) -> PathBuf {
60 63 let src = self.base_sock_path.as_os_str().as_bytes();
61 64 let mut buf = Vec::with_capacity(src.len() + 6); // "{src}.{pid}".len()
62 65 buf.extend_from_slice(src);
63 66 buf.extend_from_slice(format!(".{}", self.process_id).as_bytes());
64 67 OsString::from_vec(buf).into()
65 68 }
66 69
67 70 /// Specifies the arguments to be passed to the server at start.
68 71 pub fn set_early_args<I, P>(&mut self, args: I)
69 72 where
70 73 I: IntoIterator<Item = P>,
71 74 P: AsRef<OsStr>,
72 75 {
73 76 self.hg_early_args = args.into_iter().map(|a| a.as_ref().to_owned()).collect();
74 77 }
75 78
76 79 /// Connects to the server.
77 80 ///
78 81 /// The server process will be spawned if not running.
79 82 pub fn connect(self) -> impl Future<Item = (Self, UnixClient), Error = io::Error> {
80 self.try_connect()
83 future::loop_fn((self, 0), |(loc, cnt)| {
84 if cnt < 10 {
85 let fut = loc
86 .try_connect()
87 .and_then(|(loc, client)| {
88 client
89 .validate(&loc.hg_early_args)
90 .map(|(client, instructions)| (loc, client, instructions))
91 })
92 .and_then(move |(loc, client, instructions)| {
93 loc.run_instructions(client, instructions, cnt)
94 });
95 Either::A(fut)
96 } else {
97 let msg = format!(
98 concat!(
99 "too many redirections.\n",
100 "Please make sure {:?} is not a wrapper which ",
101 "changes sensitive environment variables ",
102 "before executing hg. If you have to use a ",
103 "wrapper, wrap chg instead of hg.",
104 ),
105 loc.hg_command
106 );
107 Either::B(future::err(io::Error::new(io::ErrorKind::Other, msg)))
108 }
109 })
110 }
111
112 /// Runs instructions received from the server.
113 fn run_instructions(
114 mut self,
115 client: UnixClient,
116 instructions: Vec<Instruction>,
117 cnt: usize,
118 ) -> io::Result<Loop<(Self, UnixClient), (Self, usize)>> {
119 let mut reconnect = false;
120 for inst in instructions {
121 debug!("instruction: {:?}", inst);
122 match inst {
123 Instruction::Exit(_) => {
124 // Just returns the current connection to run the
125 // unparsable command and report the error
126 return Ok(Loop::Break((self, client)));
127 }
128 Instruction::Reconnect => {
129 reconnect = true;
130 }
131 Instruction::Redirect(path) => {
132 if path.parent() != self.base_sock_path.parent() {
133 let msg = format!(
134 "insecure redirect instruction from server: {}",
135 path.display()
136 );
137 return Err(io::Error::new(io::ErrorKind::InvalidData, msg));
138 }
139 self.redirect_sock_path = Some(path);
140 reconnect = true;
141 }
142 Instruction::Unlink(path) => {
143 if path.parent() != self.base_sock_path.parent() {
144 let msg = format!(
145 "insecure unlink instruction from server: {}",
146 path.display()
147 );
148 return Err(io::Error::new(io::ErrorKind::InvalidData, msg));
149 }
150 fs::remove_file(path).unwrap_or(()); // may race
151 }
152 }
153 }
154
155 if reconnect {
156 Ok(Loop::Continue((self, cnt + 1)))
157 } else {
158 Ok(Loop::Break((self, client)))
159 }
81 160 }
82 161
83 162 /// Tries to connect to the existing server, or spawns new if not running.
84 163 fn try_connect(self) -> impl Future<Item = (Self, UnixClient), Error = io::Error> {
85 debug!("try connect to {}", self.base_sock_path.display());
86 UnixClient::connect(self.base_sock_path.clone())
87 .then(|res| match res {
164 let sock_path = self
165 .redirect_sock_path
166 .as_ref()
167 .unwrap_or(&self.base_sock_path)
168 .clone();
169 debug!("try connect to {}", sock_path.display());
170 UnixClient::connect(sock_path)
171 .then(|res| {
172 match res {
88 173 Ok(client) => Either::A(future::ok((self, client))),
89 Err(_) => Either::B(self.spawn_connect()),
174 Err(_) => {
175 // Prevent us from being re-connected to the outdated
176 // master server: We were told by the server to redirect
177 // to redirect_sock_path, which didn't work. We do not
178 // want to connect to the same master server again
179 // because it would probably tell us the same thing.
180 if self.redirect_sock_path.is_some() {
181 fs::remove_file(&self.base_sock_path).unwrap_or(());
182 // may race
183 }
184 Either::B(self.spawn_connect())
185 }
186 }
90 187 })
91 188 .and_then(|(loc, client)| {
92 189 check_server_capabilities(client.server_spec())?;
93 190 Ok((loc, client))
94 191 })
95 192 .and_then(|(loc, client)| {
96 193 client
97 194 .set_current_dir(&loc.current_dir)
98 195 .map(|client| (loc, client))
99 196 })
100 197 .and_then(|(loc, client)| {
101 198 client
102 199 .set_env_vars_os(loc.env_vars.iter().cloned())
103 200 .map(|client| (loc, client))
104 201 })
105 202 }
106 203
107 204 /// Spawns new server process and connects to it.
108 205 ///
109 206 /// The server will be spawned at the current working directory, then
110 207 /// chdir to "/", so that the server will load configs from the target
111 208 /// repository.
112 209 fn spawn_connect(self) -> impl Future<Item = (Self, UnixClient), Error = io::Error> {
113 210 let sock_path = self.temp_sock_path();
114 211 debug!("start cmdserver at {}", sock_path.display());
115 212 Command::new(&self.hg_command)
116 213 .arg("serve")
117 214 .arg("--cmdserver")
118 215 .arg("chgunix")
119 216 .arg("--address")
120 217 .arg(&sock_path)
121 218 .arg("--daemon-postexec")
122 219 .arg("chdir:/")
123 220 .args(&self.hg_early_args)
124 221 .current_dir(&self.current_dir)
125 222 .env_clear()
126 223 .envs(self.env_vars.iter().cloned())
127 224 .env("CHGINTERNALMARK", "")
128 225 .spawn_async()
129 226 .into_future()
130 227 .and_then(|server| self.connect_spawned(server, sock_path))
131 228 .and_then(|(loc, client, sock_path)| {
132 229 debug!(
133 230 "rename {} to {}",
134 231 sock_path.display(),
135 232 loc.base_sock_path.display()
136 233 );
137 234 fs::rename(&sock_path, &loc.base_sock_path)?;
138 235 Ok((loc, client))
139 236 })
140 237 }
141 238
142 239 /// Tries to connect to the just spawned server repeatedly until timeout
143 240 /// exceeded.
144 241 fn connect_spawned(
145 242 self,
146 243 server: Child,
147 244 sock_path: PathBuf,
148 245 ) -> impl Future<Item = (Self, UnixClient, PathBuf), Error = io::Error> {
149 246 debug!("try connect to {} repeatedly", sock_path.display());
150 247 let connect = future::loop_fn(sock_path, |sock_path| {
151 248 UnixClient::connect(sock_path.clone()).then(|res| {
152 249 match res {
153 250 Ok(client) => Either::A(future::ok(Loop::Break((client, sock_path)))),
154 251 Err(_) => {
155 252 // try again with slight delay
156 253 let fut = tokio_timer::sleep(Duration::from_millis(10))
157 254 .map(|()| Loop::Continue(sock_path))
158 255 .map_err(|err| io::Error::new(io::ErrorKind::Other, err));
159 256 Either::B(fut)
160 257 }
161 258 }
162 259 })
163 260 });
164 261
165 262 // waits for either connection established or server failed to start
166 263 connect
167 264 .select2(server)
168 265 .map_err(|res| res.split().0)
169 266 .timeout(self.timeout)
170 267 .map_err(|err| {
171 268 err.into_inner().unwrap_or_else(|| {
172 269 io::Error::new(
173 270 io::ErrorKind::TimedOut,
174 271 "timed out while connecting to server",
175 272 )
176 273 })
177 274 })
178 275 .and_then(|res| {
179 276 match res {
180 277 Either::A(((client, sock_path), server)) => {
181 278 server.forget(); // continue to run in background
182 279 Ok((self, client, sock_path))
183 280 }
184 281 Either::B((st, _)) => Err(io::Error::new(
185 282 io::ErrorKind::Other,
186 283 format!("server exited too early: {}", st),
187 284 )),
188 285 }
189 286 })
190 287 }
191 288 }
192 289
193 290 /// Determines the server socket to connect to.
194 291 ///
195 292 /// If no `$CHGSOCKNAME` is specified, the socket directory will be created
196 293 /// as necessary.
197 294 fn prepare_server_socket_path() -> io::Result<PathBuf> {
198 295 if let Some(s) = env::var_os("CHGSOCKNAME") {
199 296 Ok(PathBuf::from(s))
200 297 } else {
201 298 let mut path = default_server_socket_dir();
202 299 create_secure_dir(&path)?;
203 300 path.push("server");
204 301 Ok(path)
205 302 }
206 303 }
207 304
208 305 /// Determines the default server socket path as follows.
209 306 ///
210 307 /// 1. `$XDG_RUNTIME_DIR/chg`
211 308 /// 2. `$TMPDIR/chg$UID`
212 309 /// 3. `/tmp/chg$UID`
213 310 pub fn default_server_socket_dir() -> PathBuf {
214 311 // XDG_RUNTIME_DIR should be ignored if it has an insufficient permission.
215 312 // https://standards.freedesktop.org/basedir-spec/basedir-spec-latest.html
216 313 if let Some(Ok(s)) = env::var_os("XDG_RUNTIME_DIR").map(check_secure_dir) {
217 314 let mut path = PathBuf::from(s);
218 315 path.push("chg");
219 316 path
220 317 } else {
221 318 let mut path = env::temp_dir();
222 319 path.push(format!("chg{}", procutil::get_effective_uid()));
223 320 path
224 321 }
225 322 }
226 323
227 324 /// Determines the default hg command.
228 325 pub fn default_hg_command() -> OsString {
229 326 // TODO: maybe allow embedding the path at compile time (or load from hgrc)
230 327 env::var_os("CHGHG")
231 328 .or(env::var_os("HG"))
232 329 .unwrap_or(OsStr::new("hg").to_owned())
233 330 }
234 331
235 332 fn default_timeout() -> Duration {
236 333 let secs = env::var("CHGTIMEOUT")
237 334 .ok()
238 335 .and_then(|s| s.parse().ok())
239 336 .unwrap_or(60);
240 337 Duration::from_secs(secs)
241 338 }
242 339
243 340 /// Creates a directory which the other users cannot access to.
244 341 ///
245 342 /// If the directory already exists, tests its permission.
246 343 fn create_secure_dir<P>(path: P) -> io::Result<()>
247 344 where
248 345 P: AsRef<Path>,
249 346 {
250 347 DirBuilder::new()
251 348 .mode(0o700)
252 349 .create(path.as_ref())
253 350 .or_else(|err| {
254 351 if err.kind() == io::ErrorKind::AlreadyExists {
255 352 check_secure_dir(path).map(|_| ())
256 353 } else {
257 354 Err(err)
258 355 }
259 356 })
260 357 }
261 358
262 359 fn check_secure_dir<P>(path: P) -> io::Result<P>
263 360 where
264 361 P: AsRef<Path>,
265 362 {
266 363 let a = fs::symlink_metadata(path.as_ref())?;
267 364 if a.is_dir() && a.uid() == procutil::get_effective_uid() && (a.mode() & 0o777) == 0o700 {
268 365 Ok(path)
269 366 } else {
270 367 Err(io::Error::new(io::ErrorKind::Other, "insecure directory"))
271 368 }
272 369 }
273 370
274 371 fn check_server_capabilities(spec: &ServerSpec) -> io::Result<()> {
275 372 let unsupported: Vec<_> = REQUIRED_SERVER_CAPABILITIES
276 373 .iter()
277 374 .cloned()
278 375 .filter(|&s| !spec.capabilities.contains(s))
279 376 .collect();
280 377 if unsupported.is_empty() {
281 378 Ok(())
282 379 } else {
283 380 let msg = format!(
284 381 "insufficient server capabilities: {}",
285 382 unsupported.join(", ")
286 383 );
287 384 Err(io::Error::new(io::ErrorKind::Other, msg))
288 385 }
289 386 }
290 387
291 388 /// Collects arguments which need to be passed to the server at start.
292 389 pub fn collect_early_args<I, P>(args: I) -> Vec<OsString>
293 390 where
294 391 I: IntoIterator<Item = P>,
295 392 P: AsRef<OsStr>,
296 393 {
297 394 let mut args_iter = args.into_iter();
298 395 let mut early_args = Vec::new();
299 396 while let Some(arg) = args_iter.next() {
300 397 let argb = arg.as_ref().as_bytes();
301 398 if argb == b"--" {
302 399 break;
303 400 } else if argb.starts_with(b"--") {
304 401 let mut split = argb[2..].splitn(2, |&c| c == b'=');
305 402 match split.next().unwrap() {
306 403 b"traceback" => {
307 404 if split.next().is_none() {
308 405 early_args.push(arg.as_ref().to_owned());
309 406 }
310 407 }
311 408 b"config" | b"cwd" | b"repo" | b"repository" => {
312 409 if split.next().is_some() {
313 410 // --<flag>=<val>
314 411 early_args.push(arg.as_ref().to_owned());
315 412 } else {
316 413 // --<flag> <val>
317 414 args_iter.next().map(|val| {
318 415 early_args.push(arg.as_ref().to_owned());
319 416 early_args.push(val.as_ref().to_owned());
320 417 });
321 418 }
322 419 }
323 420 _ => {}
324 421 }
325 422 } else if argb.starts_with(b"-R") {
326 423 if argb.len() > 2 {
327 424 // -R<val>
328 425 early_args.push(arg.as_ref().to_owned());
329 426 } else {
330 427 // -R <val>
331 428 args_iter.next().map(|val| {
332 429 early_args.push(arg.as_ref().to_owned());
333 430 early_args.push(val.as_ref().to_owned());
334 431 });
335 432 }
336 433 }
337 434 }
338 435
339 436 early_args
340 437 }
341 438
342 439 #[cfg(test)]
343 440 mod tests {
344 441 use super::*;
345 442
346 443 #[test]
347 444 fn collect_early_args_some() {
348 445 assert!(collect_early_args(&[] as &[&OsStr]).is_empty());
349 446 assert!(collect_early_args(&["log"]).is_empty());
350 447 assert_eq!(
351 448 collect_early_args(&["log", "-Ra", "foo"]),
352 449 os_string_vec_from(&[b"-Ra"])
353 450 );
354 451 assert_eq!(
355 452 collect_early_args(&["log", "-R", "repo", "", "--traceback", "a"]),
356 453 os_string_vec_from(&[b"-R", b"repo", b"--traceback"])
357 454 );
358 455 assert_eq!(
359 456 collect_early_args(&["log", "--config", "diff.git=1", "-q"]),
360 457 os_string_vec_from(&[b"--config", b"diff.git=1"])
361 458 );
362 459 assert_eq!(
363 460 collect_early_args(&["--cwd=..", "--repository", "r", "log"]),
364 461 os_string_vec_from(&[b"--cwd=..", b"--repository", b"r"])
365 462 );
366 463 assert_eq!(
367 464 collect_early_args(&["log", "--repo=r", "--repos", "a"]),
368 465 os_string_vec_from(&[b"--repo=r"])
369 466 );
370 467 }
371 468
372 469 #[test]
373 470 fn collect_early_args_orphaned() {
374 471 assert!(collect_early_args(&["log", "-R"]).is_empty());
375 472 assert!(collect_early_args(&["log", "--config"]).is_empty());
376 473 }
377 474
378 475 #[test]
379 476 fn collect_early_args_unwanted_value() {
380 477 assert!(collect_early_args(&["log", "--traceback="]).is_empty());
381 478 }
382 479
383 480 fn os_string_vec_from(v: &[&[u8]]) -> Vec<OsString> {
384 481 v.iter().map(|s| OsStr::from_bytes(s).to_owned()).collect()
385 482 }
386 483 }
General Comments 0
You need to be logged in to leave comments. Login now