##// END OF EJS Templates
rust-chg: collect server flags from command arguments...
Yuya Nishihara -
r45172:00ac6065 default
parent child Browse files
Show More
@@ -1,277 +1,386 b''
1 1 // Copyright 2011, 2018 Yuya Nishihara <yuya@tcha.org>
2 2 //
3 3 // This software may be used and distributed according to the terms of the
4 4 // GNU General Public License version 2 or any later version.
5 5
6 6 //! Utility for locating command-server process.
7 7
8 8 use futures::future::{self, Either, Loop};
9 9 use std::env;
10 10 use std::ffi::{OsStr, OsString};
11 11 use std::fs::{self, DirBuilder};
12 12 use std::io;
13 13 use std::os::unix::ffi::{OsStrExt, OsStringExt};
14 14 use std::os::unix::fs::{DirBuilderExt, MetadataExt};
15 15 use std::path::{Path, PathBuf};
16 16 use std::process::{self, Command};
17 17 use std::time::Duration;
18 18 use tokio::prelude::*;
19 19 use tokio_hglib::UnixClient;
20 20 use tokio_process::{Child, CommandExt};
21 21 use tokio_timer;
22 22
23 23 use super::clientext::ChgClientExt;
24 24 use super::message::ServerSpec;
25 25 use super::procutil;
26 26
27 27 const REQUIRED_SERVER_CAPABILITIES: &[&str] = &["attachio", "chdir", "runcommand", "setenv"];
28 28
29 29 /// Helper to connect to and spawn a server process.
30 30 #[derive(Clone, Debug)]
31 31 pub struct Locator {
32 32 hg_command: OsString,
33 hg_early_args: Vec<OsString>,
33 34 current_dir: PathBuf,
34 35 env_vars: Vec<(OsString, OsString)>,
35 36 process_id: u32,
36 37 base_sock_path: PathBuf,
37 38 timeout: Duration,
38 39 }
39 40
40 41 impl Locator {
41 42 /// Creates locator capturing the current process environment.
42 43 ///
43 44 /// If no `$CHGSOCKNAME` is specified, the socket directory will be
44 45 /// created as necessary.
45 46 pub fn prepare_from_env() -> io::Result<Locator> {
46 47 Ok(Locator {
47 48 hg_command: default_hg_command(),
49 hg_early_args: Vec::new(),
48 50 current_dir: env::current_dir()?,
49 51 env_vars: env::vars_os().collect(),
50 52 process_id: process::id(),
51 53 base_sock_path: prepare_server_socket_path()?,
52 54 timeout: default_timeout(),
53 55 })
54 56 }
55 57
56 58 /// Temporary socket path for this client process.
57 59 fn temp_sock_path(&self) -> PathBuf {
58 60 let src = self.base_sock_path.as_os_str().as_bytes();
59 61 let mut buf = Vec::with_capacity(src.len() + 6); // "{src}.{pid}".len()
60 62 buf.extend_from_slice(src);
61 63 buf.extend_from_slice(format!(".{}", self.process_id).as_bytes());
62 64 OsString::from_vec(buf).into()
63 65 }
64 66
67 /// Specifies the arguments to be passed to the server at start.
68 pub fn set_early_args<I, P>(&mut self, args: I)
69 where
70 I: IntoIterator<Item = P>,
71 P: AsRef<OsStr>,
72 {
73 self.hg_early_args = args.into_iter().map(|a| a.as_ref().to_owned()).collect();
74 }
75
65 76 /// Connects to the server.
66 77 ///
67 78 /// The server process will be spawned if not running.
68 79 pub fn connect(self) -> impl Future<Item = (Self, UnixClient), Error = io::Error> {
69 80 self.try_connect()
70 81 }
71 82
72 83 /// Tries to connect to the existing server, or spawns new if not running.
73 84 fn try_connect(self) -> impl Future<Item = (Self, UnixClient), Error = io::Error> {
74 85 debug!("try connect to {}", self.base_sock_path.display());
75 86 UnixClient::connect(self.base_sock_path.clone())
76 87 .then(|res| match res {
77 88 Ok(client) => Either::A(future::ok((self, client))),
78 89 Err(_) => Either::B(self.spawn_connect()),
79 90 })
80 91 .and_then(|(loc, client)| {
81 92 check_server_capabilities(client.server_spec())?;
82 93 Ok((loc, client))
83 94 })
84 95 .and_then(|(loc, client)| {
85 96 client
86 97 .set_current_dir(&loc.current_dir)
87 98 .map(|client| (loc, client))
88 99 })
89 100 .and_then(|(loc, client)| {
90 101 client
91 102 .set_env_vars_os(loc.env_vars.iter().cloned())
92 103 .map(|client| (loc, client))
93 104 })
94 105 }
95 106
96 107 /// Spawns new server process and connects to it.
97 108 ///
98 109 /// The server will be spawned at the current working directory, then
99 110 /// chdir to "/", so that the server will load configs from the target
100 111 /// repository.
101 112 fn spawn_connect(self) -> impl Future<Item = (Self, UnixClient), Error = io::Error> {
102 113 let sock_path = self.temp_sock_path();
103 114 debug!("start cmdserver at {}", sock_path.display());
104 115 Command::new(&self.hg_command)
105 116 .arg("serve")
106 117 .arg("--cmdserver")
107 118 .arg("chgunix")
108 119 .arg("--address")
109 120 .arg(&sock_path)
110 121 .arg("--daemon-postexec")
111 122 .arg("chdir:/")
123 .args(&self.hg_early_args)
112 124 .current_dir(&self.current_dir)
113 125 .env_clear()
114 126 .envs(self.env_vars.iter().cloned())
115 127 .env("CHGINTERNALMARK", "")
116 128 .spawn_async()
117 129 .into_future()
118 130 .and_then(|server| self.connect_spawned(server, sock_path))
119 131 .and_then(|(loc, client, sock_path)| {
120 132 debug!(
121 133 "rename {} to {}",
122 134 sock_path.display(),
123 135 loc.base_sock_path.display()
124 136 );
125 137 fs::rename(&sock_path, &loc.base_sock_path)?;
126 138 Ok((loc, client))
127 139 })
128 140 }
129 141
130 142 /// Tries to connect to the just spawned server repeatedly until timeout
131 143 /// exceeded.
132 144 fn connect_spawned(
133 145 self,
134 146 server: Child,
135 147 sock_path: PathBuf,
136 148 ) -> impl Future<Item = (Self, UnixClient, PathBuf), Error = io::Error> {
137 149 debug!("try connect to {} repeatedly", sock_path.display());
138 150 let connect = future::loop_fn(sock_path, |sock_path| {
139 151 UnixClient::connect(sock_path.clone()).then(|res| {
140 152 match res {
141 153 Ok(client) => Either::A(future::ok(Loop::Break((client, sock_path)))),
142 154 Err(_) => {
143 155 // try again with slight delay
144 156 let fut = tokio_timer::sleep(Duration::from_millis(10))
145 157 .map(|()| Loop::Continue(sock_path))
146 158 .map_err(|err| io::Error::new(io::ErrorKind::Other, err));
147 159 Either::B(fut)
148 160 }
149 161 }
150 162 })
151 163 });
152 164
153 165 // waits for either connection established or server failed to start
154 166 connect
155 167 .select2(server)
156 168 .map_err(|res| res.split().0)
157 169 .timeout(self.timeout)
158 170 .map_err(|err| {
159 171 err.into_inner().unwrap_or_else(|| {
160 172 io::Error::new(
161 173 io::ErrorKind::TimedOut,
162 174 "timed out while connecting to server",
163 175 )
164 176 })
165 177 })
166 178 .and_then(|res| {
167 179 match res {
168 180 Either::A(((client, sock_path), server)) => {
169 181 server.forget(); // continue to run in background
170 182 Ok((self, client, sock_path))
171 183 }
172 184 Either::B((st, _)) => Err(io::Error::new(
173 185 io::ErrorKind::Other,
174 186 format!("server exited too early: {}", st),
175 187 )),
176 188 }
177 189 })
178 190 }
179 191 }
180 192
181 193 /// Determines the server socket to connect to.
182 194 ///
183 195 /// If no `$CHGSOCKNAME` is specified, the socket directory will be created
184 196 /// as necessary.
185 197 fn prepare_server_socket_path() -> io::Result<PathBuf> {
186 198 if let Some(s) = env::var_os("CHGSOCKNAME") {
187 199 Ok(PathBuf::from(s))
188 200 } else {
189 201 let mut path = default_server_socket_dir();
190 202 create_secure_dir(&path)?;
191 203 path.push("server");
192 204 Ok(path)
193 205 }
194 206 }
195 207
196 208 /// Determines the default server socket path as follows.
197 209 ///
198 210 /// 1. `$XDG_RUNTIME_DIR/chg`
199 211 /// 2. `$TMPDIR/chg$UID`
200 212 /// 3. `/tmp/chg$UID`
201 213 pub fn default_server_socket_dir() -> PathBuf {
202 214 // XDG_RUNTIME_DIR should be ignored if it has an insufficient permission.
203 215 // https://standards.freedesktop.org/basedir-spec/basedir-spec-latest.html
204 216 if let Some(Ok(s)) = env::var_os("XDG_RUNTIME_DIR").map(check_secure_dir) {
205 217 let mut path = PathBuf::from(s);
206 218 path.push("chg");
207 219 path
208 220 } else {
209 221 let mut path = env::temp_dir();
210 222 path.push(format!("chg{}", procutil::get_effective_uid()));
211 223 path
212 224 }
213 225 }
214 226
215 227 /// Determines the default hg command.
216 228 pub fn default_hg_command() -> OsString {
217 229 // TODO: maybe allow embedding the path at compile time (or load from hgrc)
218 230 env::var_os("CHGHG")
219 231 .or(env::var_os("HG"))
220 232 .unwrap_or(OsStr::new("hg").to_owned())
221 233 }
222 234
223 235 fn default_timeout() -> Duration {
224 236 let secs = env::var("CHGTIMEOUT")
225 237 .ok()
226 238 .and_then(|s| s.parse().ok())
227 239 .unwrap_or(60);
228 240 Duration::from_secs(secs)
229 241 }
230 242
231 243 /// Creates a directory which the other users cannot access to.
232 244 ///
233 245 /// If the directory already exists, tests its permission.
234 246 fn create_secure_dir<P>(path: P) -> io::Result<()>
235 247 where
236 248 P: AsRef<Path>,
237 249 {
238 250 DirBuilder::new()
239 251 .mode(0o700)
240 252 .create(path.as_ref())
241 253 .or_else(|err| {
242 254 if err.kind() == io::ErrorKind::AlreadyExists {
243 255 check_secure_dir(path).map(|_| ())
244 256 } else {
245 257 Err(err)
246 258 }
247 259 })
248 260 }
249 261
250 262 fn check_secure_dir<P>(path: P) -> io::Result<P>
251 263 where
252 264 P: AsRef<Path>,
253 265 {
254 266 let a = fs::symlink_metadata(path.as_ref())?;
255 267 if a.is_dir() && a.uid() == procutil::get_effective_uid() && (a.mode() & 0o777) == 0o700 {
256 268 Ok(path)
257 269 } else {
258 270 Err(io::Error::new(io::ErrorKind::Other, "insecure directory"))
259 271 }
260 272 }
261 273
262 274 fn check_server_capabilities(spec: &ServerSpec) -> io::Result<()> {
263 275 let unsupported: Vec<_> = REQUIRED_SERVER_CAPABILITIES
264 276 .iter()
265 277 .cloned()
266 278 .filter(|&s| !spec.capabilities.contains(s))
267 279 .collect();
268 280 if unsupported.is_empty() {
269 281 Ok(())
270 282 } else {
271 283 let msg = format!(
272 284 "insufficient server capabilities: {}",
273 285 unsupported.join(", ")
274 286 );
275 287 Err(io::Error::new(io::ErrorKind::Other, msg))
276 288 }
277 289 }
290
291 /// Collects arguments which need to be passed to the server at start.
292 pub fn collect_early_args<I, P>(args: I) -> Vec<OsString>
293 where
294 I: IntoIterator<Item = P>,
295 P: AsRef<OsStr>,
296 {
297 let mut args_iter = args.into_iter();
298 let mut early_args = Vec::new();
299 while let Some(arg) = args_iter.next() {
300 let argb = arg.as_ref().as_bytes();
301 if argb == b"--" {
302 break;
303 } else if argb.starts_with(b"--") {
304 let mut split = argb[2..].splitn(2, |&c| c == b'=');
305 match split.next().unwrap() {
306 b"traceback" => {
307 if split.next().is_none() {
308 early_args.push(arg.as_ref().to_owned());
309 }
310 }
311 b"config" | b"cwd" | b"repo" | b"repository" => {
312 if split.next().is_some() {
313 // --<flag>=<val>
314 early_args.push(arg.as_ref().to_owned());
315 } else {
316 // --<flag> <val>
317 args_iter.next().map(|val| {
318 early_args.push(arg.as_ref().to_owned());
319 early_args.push(val.as_ref().to_owned());
320 });
321 }
322 }
323 _ => {}
324 }
325 } else if argb.starts_with(b"-R") {
326 if argb.len() > 2 {
327 // -R<val>
328 early_args.push(arg.as_ref().to_owned());
329 } else {
330 // -R <val>
331 args_iter.next().map(|val| {
332 early_args.push(arg.as_ref().to_owned());
333 early_args.push(val.as_ref().to_owned());
334 });
335 }
336 }
337 }
338
339 early_args
340 }
341
342 #[cfg(test)]
343 mod tests {
344 use super::*;
345
346 #[test]
347 fn collect_early_args_some() {
348 assert!(collect_early_args(&[] as &[&OsStr]).is_empty());
349 assert!(collect_early_args(&["log"]).is_empty());
350 assert_eq!(
351 collect_early_args(&["log", "-Ra", "foo"]),
352 os_string_vec_from(&[b"-Ra"])
353 );
354 assert_eq!(
355 collect_early_args(&["log", "-R", "repo", "", "--traceback", "a"]),
356 os_string_vec_from(&[b"-R", b"repo", b"--traceback"])
357 );
358 assert_eq!(
359 collect_early_args(&["log", "--config", "diff.git=1", "-q"]),
360 os_string_vec_from(&[b"--config", b"diff.git=1"])
361 );
362 assert_eq!(
363 collect_early_args(&["--cwd=..", "--repository", "r", "log"]),
364 os_string_vec_from(&[b"--cwd=..", b"--repository", b"r"])
365 );
366 assert_eq!(
367 collect_early_args(&["log", "--repo=r", "--repos", "a"]),
368 os_string_vec_from(&[b"--repo=r"])
369 );
370 }
371
372 #[test]
373 fn collect_early_args_orphaned() {
374 assert!(collect_early_args(&["log", "-R"]).is_empty());
375 assert!(collect_early_args(&["log", "--config"]).is_empty());
376 }
377
378 #[test]
379 fn collect_early_args_unwanted_value() {
380 assert!(collect_early_args(&["log", "--traceback="]).is_empty());
381 }
382
383 fn os_string_vec_from(v: &[&[u8]]) -> Vec<OsString> {
384 v.iter().map(|s| OsStr::from_bytes(s).to_owned()).collect()
385 }
386 }
@@ -1,100 +1,101 b''
1 1 // Copyright 2018 Yuya Nishihara <yuya@tcha.org>
2 2 //
3 3 // This software may be used and distributed according to the terms of the
4 4 // GNU General Public License version 2 or any later version.
5 5
6 6 extern crate chg;
7 7 extern crate futures;
8 8 extern crate log;
9 9 extern crate tokio;
10 10 extern crate tokio_hglib;
11 11
12 use chg::locator::Locator;
12 use chg::locator::{self, Locator};
13 13 use chg::procutil;
14 14 use chg::{ChgClientExt, ChgUiHandler};
15 15 use futures::sync::oneshot;
16 16 use std::env;
17 17 use std::io;
18 18 use std::process;
19 19 use std::time::Instant;
20 20 use tokio::prelude::*;
21 21
22 22 struct DebugLogger {
23 23 start: Instant,
24 24 }
25 25
26 26 impl DebugLogger {
27 27 pub fn new() -> DebugLogger {
28 28 DebugLogger {
29 29 start: Instant::now(),
30 30 }
31 31 }
32 32 }
33 33
34 34 impl log::Log for DebugLogger {
35 35 fn enabled(&self, metadata: &log::Metadata) -> bool {
36 36 metadata.target().starts_with("chg::")
37 37 }
38 38
39 39 fn log(&self, record: &log::Record) {
40 40 if self.enabled(record.metadata()) {
41 41 // just make the output looks similar to chg of C
42 42 let l = format!("{}", record.level()).to_lowercase();
43 43 let t = self.start.elapsed();
44 44 writeln!(
45 45 io::stderr(),
46 46 "chg: {}: {}.{:06} {}",
47 47 l,
48 48 t.as_secs(),
49 49 t.subsec_micros(),
50 50 record.args()
51 51 )
52 52 .unwrap_or(());
53 53 }
54 54 }
55 55
56 56 fn flush(&self) {}
57 57 }
58 58
59 59 fn main() {
60 60 if env::var_os("CHGDEBUG").is_some() {
61 61 log::set_boxed_logger(Box::new(DebugLogger::new()))
62 62 .expect("any logger should not be installed yet");
63 63 log::set_max_level(log::LevelFilter::Debug);
64 64 }
65 65
66 66 // TODO: add loop detection by $CHGINTERNALMARK
67 67
68 68 let code = run().unwrap_or_else(|err| {
69 69 writeln!(io::stderr(), "chg: abort: {}", err).unwrap_or(());
70 70 255
71 71 });
72 72 process::exit(code);
73 73 }
74 74
75 75 fn run() -> io::Result<i32> {
76 let loc = Locator::prepare_from_env()?;
76 let mut loc = Locator::prepare_from_env()?;
77 loc.set_early_args(locator::collect_early_args(env::args_os().skip(1)));
77 78 let handler = ChgUiHandler::new();
78 79 let (result_tx, result_rx) = oneshot::channel();
79 80 let fut = loc
80 81 .connect()
81 82 .and_then(|(_, client)| client.attach_io(io::stdin(), io::stdout(), io::stderr()))
82 83 .and_then(|client| {
83 84 let pid = client.server_spec().process_id.unwrap();
84 85 let pgid = client.server_spec().process_group_id;
85 86 procutil::setup_signal_handler_once(pid, pgid)?;
86 87 Ok(client)
87 88 })
88 89 .and_then(|client| client.run_command_chg(handler, env::args_os().skip(1)))
89 90 .map(|(_client, _handler, code)| {
90 91 procutil::restore_signal_handler_once()?;
91 92 Ok(code)
92 93 })
93 94 .or_else(|err| Ok(Err(err))) // pass back error to caller
94 95 .map(|res| result_tx.send(res).unwrap());
95 96 tokio::run(fut);
96 97 result_rx.wait().unwrap_or(Err(io::Error::new(
97 98 io::ErrorKind::Other,
98 99 "no exit code set",
99 100 )))
100 101 }
General Comments 0
You need to be logged in to leave comments. Login now