##// END OF EJS Templates
rust-chg: spawn server process if not running...
Yuya Nishihara -
r45161:bb936e25 default
parent child Browse files
Show More
@@ -1,23 +1,26 b''
1 // Copyright 2018 Yuya Nishihara <yuya@tcha.org>
1 // Copyright 2018 Yuya Nishihara <yuya@tcha.org>
2 //
2 //
3 // This software may be used and distributed according to the terms of the
3 // This software may be used and distributed according to the terms of the
4 // GNU General Public License version 2 or any later version.
4 // GNU General Public License version 2 or any later version.
5
5
6 extern crate bytes;
6 extern crate bytes;
7 #[macro_use]
7 #[macro_use]
8 extern crate futures;
8 extern crate futures;
9 extern crate libc;
9 extern crate libc;
10 #[macro_use]
11 extern crate log;
10 extern crate tokio;
12 extern crate tokio;
11 extern crate tokio_hglib;
13 extern crate tokio_hglib;
12 extern crate tokio_process;
14 extern crate tokio_process;
15 extern crate tokio_timer;
13
16
14 mod attachio;
17 mod attachio;
15 mod clientext;
18 mod clientext;
16 pub mod locator;
19 pub mod locator;
17 pub mod message;
20 pub mod message;
18 pub mod procutil;
21 pub mod procutil;
19 mod runcommand;
22 mod runcommand;
20 mod uihandler;
23 mod uihandler;
21
24
22 pub use clientext::ChgClientExt;
25 pub use clientext::ChgClientExt;
23 pub use uihandler::{ChgUiHandler, SystemHandler};
26 pub use uihandler::{ChgUiHandler, SystemHandler};
@@ -1,136 +1,241 b''
1 // Copyright 2011, 2018 Yuya Nishihara <yuya@tcha.org>
1 // Copyright 2011, 2018 Yuya Nishihara <yuya@tcha.org>
2 //
2 //
3 // This software may be used and distributed according to the terms of the
3 // This software may be used and distributed according to the terms of the
4 // GNU General Public License version 2 or any later version.
4 // GNU General Public License version 2 or any later version.
5
5
6 //! Utility for locating command-server process.
6 //! Utility for locating command-server process.
7
7
8 use futures::future::{self, Either, Loop};
8 use std::env;
9 use std::env;
9 use std::ffi::{OsStr, OsString};
10 use std::ffi::{OsStr, OsString};
10 use std::fs::{self, DirBuilder};
11 use std::fs::{self, DirBuilder};
11 use std::io;
12 use std::io;
12 use std::os::unix::ffi::{OsStrExt, OsStringExt};
13 use std::os::unix::ffi::{OsStrExt, OsStringExt};
13 use std::os::unix::fs::{DirBuilderExt, MetadataExt};
14 use std::os::unix::fs::{DirBuilderExt, MetadataExt};
14 use std::path::{Path, PathBuf};
15 use std::path::{Path, PathBuf};
15 use std::process;
16 use std::process::{self, Command};
16 use std::time::Duration;
17 use std::time::Duration;
18 use tokio::prelude::*;
19 use tokio_hglib::UnixClient;
20 use tokio_process::{Child, CommandExt};
21 use tokio_timer;
17
22
18 use super::procutil;
23 use super::procutil;
19
24
20 /// Helper to connect to and spawn a server process.
25 /// Helper to connect to and spawn a server process.
21 #[derive(Clone, Debug)]
26 #[derive(Clone, Debug)]
22 pub struct Locator {
27 pub struct Locator {
23 hg_command: OsString,
28 hg_command: OsString,
24 current_dir: PathBuf,
29 current_dir: PathBuf,
25 env_vars: Vec<(OsString, OsString)>,
30 env_vars: Vec<(OsString, OsString)>,
26 process_id: u32,
31 process_id: u32,
27 base_sock_path: PathBuf,
32 base_sock_path: PathBuf,
28 timeout: Duration,
33 timeout: Duration,
29 }
34 }
30
35
31 impl Locator {
36 impl Locator {
32 /// Creates locator capturing the current process environment.
37 /// Creates locator capturing the current process environment.
33 ///
38 ///
34 /// If no `$CHGSOCKNAME` is specified, the socket directory will be
39 /// If no `$CHGSOCKNAME` is specified, the socket directory will be
35 /// created as necessary.
40 /// created as necessary.
36 pub fn prepare_from_env() -> io::Result<Locator> {
41 pub fn prepare_from_env() -> io::Result<Locator> {
37 Ok(Locator {
42 Ok(Locator {
38 hg_command: default_hg_command(),
43 hg_command: default_hg_command(),
39 current_dir: env::current_dir()?,
44 current_dir: env::current_dir()?,
40 env_vars: env::vars_os().collect(),
45 env_vars: env::vars_os().collect(),
41 process_id: process::id(),
46 process_id: process::id(),
42 base_sock_path: prepare_server_socket_path()?,
47 base_sock_path: prepare_server_socket_path()?,
43 timeout: default_timeout(),
48 timeout: default_timeout(),
44 })
49 })
45 }
50 }
46
51
47 /// Temporary socket path for this client process.
52 /// Temporary socket path for this client process.
48 fn temp_sock_path(&self) -> PathBuf {
53 fn temp_sock_path(&self) -> PathBuf {
49 let src = self.base_sock_path.as_os_str().as_bytes();
54 let src = self.base_sock_path.as_os_str().as_bytes();
50 let mut buf = Vec::with_capacity(src.len() + 6); // "{src}.{pid}".len()
55 let mut buf = Vec::with_capacity(src.len() + 6); // "{src}.{pid}".len()
51 buf.extend_from_slice(src);
56 buf.extend_from_slice(src);
52 buf.extend_from_slice(format!(".{}", self.process_id).as_bytes());
57 buf.extend_from_slice(format!(".{}", self.process_id).as_bytes());
53 OsString::from_vec(buf).into()
58 OsString::from_vec(buf).into()
54 }
59 }
60
61 /// Connects to the server.
62 ///
63 /// The server process will be spawned if not running.
64 pub fn connect(self) -> impl Future<Item = (Self, UnixClient), Error = io::Error> {
65 self.try_connect()
66 }
67
68 /// Tries to connect to the existing server, or spawns new if not running.
69 fn try_connect(self) -> impl Future<Item = (Self, UnixClient), Error = io::Error> {
70 debug!("try connect to {}", self.base_sock_path.display());
71 UnixClient::connect(self.base_sock_path.clone()).then(|res| match res {
72 Ok(client) => Either::A(future::ok((self, client))),
73 Err(_) => Either::B(self.spawn_connect()),
74 })
75 }
76
77 /// Spawns new server process and connects to it.
78 ///
79 /// The server will be spawned at the current working directory, then
80 /// chdir to "/", so that the server will load configs from the target
81 /// repository.
82 fn spawn_connect(self) -> impl Future<Item = (Self, UnixClient), Error = io::Error> {
83 let sock_path = self.temp_sock_path();
84 debug!("start cmdserver at {}", sock_path.display());
85 Command::new(&self.hg_command)
86 .arg("serve")
87 .arg("--cmdserver")
88 .arg("chgunix")
89 .arg("--address")
90 .arg(&sock_path)
91 .arg("--daemon-postexec")
92 .arg("chdir:/")
93 .current_dir(&self.current_dir)
94 .env_clear()
95 .envs(self.env_vars.iter().cloned())
96 .env("CHGINTERNALMARK", "")
97 .spawn_async()
98 .into_future()
99 .and_then(|server| self.connect_spawned(server, sock_path))
100 .and_then(|(loc, client, sock_path)| {
101 debug!(
102 "rename {} to {}",
103 sock_path.display(),
104 loc.base_sock_path.display()
105 );
106 fs::rename(&sock_path, &loc.base_sock_path)?;
107 Ok((loc, client))
108 })
109 }
110
111 /// Tries to connect to the just spawned server repeatedly until timeout
112 /// exceeded.
113 fn connect_spawned(
114 self,
115 server: Child,
116 sock_path: PathBuf,
117 ) -> impl Future<Item = (Self, UnixClient, PathBuf), Error = io::Error> {
118 debug!("try connect to {} repeatedly", sock_path.display());
119 let connect = future::loop_fn(sock_path, |sock_path| {
120 UnixClient::connect(sock_path.clone()).then(|res| {
121 match res {
122 Ok(client) => Either::A(future::ok(Loop::Break((client, sock_path)))),
123 Err(_) => {
124 // try again with slight delay
125 let fut = tokio_timer::sleep(Duration::from_millis(10))
126 .map(|()| Loop::Continue(sock_path))
127 .map_err(|err| io::Error::new(io::ErrorKind::Other, err));
128 Either::B(fut)
129 }
130 }
131 })
132 });
133
134 // waits for either connection established or server failed to start
135 connect
136 .select2(server)
137 .map_err(|res| res.split().0)
138 .timeout(self.timeout)
139 .map_err(|err| {
140 err.into_inner().unwrap_or_else(|| {
141 io::Error::new(
142 io::ErrorKind::TimedOut,
143 "timed out while connecting to server",
144 )
145 })
146 })
147 .and_then(|res| {
148 match res {
149 Either::A(((client, sock_path), server)) => {
150 server.forget(); // continue to run in background
151 Ok((self, client, sock_path))
152 }
153 Either::B((st, _)) => Err(io::Error::new(
154 io::ErrorKind::Other,
155 format!("server exited too early: {}", st),
156 )),
157 }
158 })
159 }
55 }
160 }
56
161
57 /// Determines the server socket to connect to.
162 /// Determines the server socket to connect to.
58 ///
163 ///
59 /// If no `$CHGSOCKNAME` is specified, the socket directory will be created
164 /// If no `$CHGSOCKNAME` is specified, the socket directory will be created
60 /// as necessary.
165 /// as necessary.
61 pub fn prepare_server_socket_path() -> io::Result<PathBuf> {
166 fn prepare_server_socket_path() -> io::Result<PathBuf> {
62 if let Some(s) = env::var_os("CHGSOCKNAME") {
167 if let Some(s) = env::var_os("CHGSOCKNAME") {
63 Ok(PathBuf::from(s))
168 Ok(PathBuf::from(s))
64 } else {
169 } else {
65 let mut path = default_server_socket_dir();
170 let mut path = default_server_socket_dir();
66 create_secure_dir(&path)?;
171 create_secure_dir(&path)?;
67 path.push("server");
172 path.push("server");
68 Ok(path)
173 Ok(path)
69 }
174 }
70 }
175 }
71
176
72 /// Determines the default server socket path as follows.
177 /// Determines the default server socket path as follows.
73 ///
178 ///
74 /// 1. `$XDG_RUNTIME_DIR/chg`
179 /// 1. `$XDG_RUNTIME_DIR/chg`
75 /// 2. `$TMPDIR/chg$UID`
180 /// 2. `$TMPDIR/chg$UID`
76 /// 3. `/tmp/chg$UID`
181 /// 3. `/tmp/chg$UID`
77 pub fn default_server_socket_dir() -> PathBuf {
182 pub fn default_server_socket_dir() -> PathBuf {
78 // XDG_RUNTIME_DIR should be ignored if it has an insufficient permission.
183 // XDG_RUNTIME_DIR should be ignored if it has an insufficient permission.
79 // https://standards.freedesktop.org/basedir-spec/basedir-spec-latest.html
184 // https://standards.freedesktop.org/basedir-spec/basedir-spec-latest.html
80 if let Some(Ok(s)) = env::var_os("XDG_RUNTIME_DIR").map(check_secure_dir) {
185 if let Some(Ok(s)) = env::var_os("XDG_RUNTIME_DIR").map(check_secure_dir) {
81 let mut path = PathBuf::from(s);
186 let mut path = PathBuf::from(s);
82 path.push("chg");
187 path.push("chg");
83 path
188 path
84 } else {
189 } else {
85 let mut path = env::temp_dir();
190 let mut path = env::temp_dir();
86 path.push(format!("chg{}", procutil::get_effective_uid()));
191 path.push(format!("chg{}", procutil::get_effective_uid()));
87 path
192 path
88 }
193 }
89 }
194 }
90
195
91 /// Determines the default hg command.
196 /// Determines the default hg command.
92 pub fn default_hg_command() -> OsString {
197 pub fn default_hg_command() -> OsString {
93 // TODO: maybe allow embedding the path at compile time (or load from hgrc)
198 // TODO: maybe allow embedding the path at compile time (or load from hgrc)
94 env::var_os("CHGHG")
199 env::var_os("CHGHG")
95 .or(env::var_os("HG"))
200 .or(env::var_os("HG"))
96 .unwrap_or(OsStr::new("hg").to_owned())
201 .unwrap_or(OsStr::new("hg").to_owned())
97 }
202 }
98
203
99 fn default_timeout() -> Duration {
204 fn default_timeout() -> Duration {
100 let secs = env::var("CHGTIMEOUT")
205 let secs = env::var("CHGTIMEOUT")
101 .ok()
206 .ok()
102 .and_then(|s| s.parse().ok())
207 .and_then(|s| s.parse().ok())
103 .unwrap_or(60);
208 .unwrap_or(60);
104 Duration::from_secs(secs)
209 Duration::from_secs(secs)
105 }
210 }
106
211
107 /// Creates a directory which the other users cannot access to.
212 /// Creates a directory which the other users cannot access to.
108 ///
213 ///
109 /// If the directory already exists, tests its permission.
214 /// If the directory already exists, tests its permission.
110 fn create_secure_dir<P>(path: P) -> io::Result<()>
215 fn create_secure_dir<P>(path: P) -> io::Result<()>
111 where
216 where
112 P: AsRef<Path>,
217 P: AsRef<Path>,
113 {
218 {
114 DirBuilder::new()
219 DirBuilder::new()
115 .mode(0o700)
220 .mode(0o700)
116 .create(path.as_ref())
221 .create(path.as_ref())
117 .or_else(|err| {
222 .or_else(|err| {
118 if err.kind() == io::ErrorKind::AlreadyExists {
223 if err.kind() == io::ErrorKind::AlreadyExists {
119 check_secure_dir(path).map(|_| ())
224 check_secure_dir(path).map(|_| ())
120 } else {
225 } else {
121 Err(err)
226 Err(err)
122 }
227 }
123 })
228 })
124 }
229 }
125
230
126 fn check_secure_dir<P>(path: P) -> io::Result<P>
231 fn check_secure_dir<P>(path: P) -> io::Result<P>
127 where
232 where
128 P: AsRef<Path>,
233 P: AsRef<Path>,
129 {
234 {
130 let a = fs::symlink_metadata(path.as_ref())?;
235 let a = fs::symlink_metadata(path.as_ref())?;
131 if a.is_dir() && a.uid() == procutil::get_effective_uid() && (a.mode() & 0o777) == 0o700 {
236 if a.is_dir() && a.uid() == procutil::get_effective_uid() && (a.mode() & 0o777) == 0o700 {
132 Ok(path)
237 Ok(path)
133 } else {
238 } else {
134 Err(io::Error::new(io::ErrorKind::Other, "insecure directory"))
239 Err(io::Error::new(io::ErrorKind::Other, "insecure directory"))
135 }
240 }
136 }
241 }
@@ -1,100 +1,102 b''
1 // Copyright 2018 Yuya Nishihara <yuya@tcha.org>
1 // Copyright 2018 Yuya Nishihara <yuya@tcha.org>
2 //
2 //
3 // This software may be used and distributed according to the terms of the
3 // This software may be used and distributed according to the terms of the
4 // GNU General Public License version 2 or any later version.
4 // GNU General Public License version 2 or any later version.
5
5
6 extern crate chg;
6 extern crate chg;
7 extern crate futures;
7 extern crate futures;
8 extern crate log;
8 extern crate log;
9 extern crate tokio;
9 extern crate tokio;
10 extern crate tokio_hglib;
10 extern crate tokio_hglib;
11
11
12 use chg::locator;
12 use chg::locator::Locator;
13 use chg::procutil;
13 use chg::procutil;
14 use chg::{ChgClientExt, ChgUiHandler};
14 use chg::{ChgClientExt, ChgUiHandler};
15 use futures::sync::oneshot;
15 use futures::sync::oneshot;
16 use std::env;
16 use std::env;
17 use std::io;
17 use std::io;
18 use std::process;
18 use std::process;
19 use std::time::Instant;
19 use std::time::Instant;
20 use tokio::prelude::*;
20 use tokio::prelude::*;
21 use tokio_hglib::UnixClient;
22
21
23 struct DebugLogger {
22 struct DebugLogger {
24 start: Instant,
23 start: Instant,
25 }
24 }
26
25
27 impl DebugLogger {
26 impl DebugLogger {
28 pub fn new() -> DebugLogger {
27 pub fn new() -> DebugLogger {
29 DebugLogger {
28 DebugLogger {
30 start: Instant::now(),
29 start: Instant::now(),
31 }
30 }
32 }
31 }
33 }
32 }
34
33
35 impl log::Log for DebugLogger {
34 impl log::Log for DebugLogger {
36 fn enabled(&self, metadata: &log::Metadata) -> bool {
35 fn enabled(&self, metadata: &log::Metadata) -> bool {
37 metadata.target().starts_with("chg::")
36 metadata.target().starts_with("chg::")
38 }
37 }
39
38
40 fn log(&self, record: &log::Record) {
39 fn log(&self, record: &log::Record) {
41 if self.enabled(record.metadata()) {
40 if self.enabled(record.metadata()) {
42 // just make the output looks similar to chg of C
41 // just make the output looks similar to chg of C
43 let l = format!("{}", record.level()).to_lowercase();
42 let l = format!("{}", record.level()).to_lowercase();
44 let t = self.start.elapsed();
43 let t = self.start.elapsed();
45 writeln!(
44 writeln!(
46 io::stderr(),
45 io::stderr(),
47 "chg: {}: {}.{:06} {}",
46 "chg: {}: {}.{:06} {}",
48 l,
47 l,
49 t.as_secs(),
48 t.as_secs(),
50 t.subsec_micros(),
49 t.subsec_micros(),
51 record.args()
50 record.args()
52 )
51 )
53 .unwrap_or(());
52 .unwrap_or(());
54 }
53 }
55 }
54 }
56
55
57 fn flush(&self) {}
56 fn flush(&self) {}
58 }
57 }
59
58
60 fn main() {
59 fn main() {
61 if env::var_os("CHGDEBUG").is_some() {
60 if env::var_os("CHGDEBUG").is_some() {
62 log::set_boxed_logger(Box::new(DebugLogger::new()))
61 log::set_boxed_logger(Box::new(DebugLogger::new()))
63 .expect("any logger should not be installed yet");
62 .expect("any logger should not be installed yet");
64 log::set_max_level(log::LevelFilter::Debug);
63 log::set_max_level(log::LevelFilter::Debug);
65 }
64 }
66
65
66 // TODO: add loop detection by $CHGINTERNALMARK
67
67 let code = run().unwrap_or_else(|err| {
68 let code = run().unwrap_or_else(|err| {
68 writeln!(io::stderr(), "chg: abort: {}", err).unwrap_or(());
69 writeln!(io::stderr(), "chg: abort: {}", err).unwrap_or(());
69 255
70 255
70 });
71 });
71 process::exit(code);
72 process::exit(code);
72 }
73 }
73
74
74 fn run() -> io::Result<i32> {
75 fn run() -> io::Result<i32> {
75 let current_dir = env::current_dir()?;
76 let current_dir = env::current_dir()?;
76 let sock_path = locator::prepare_server_socket_path()?;
77 let loc = Locator::prepare_from_env()?;
77 let handler = ChgUiHandler::new();
78 let handler = ChgUiHandler::new();
78 let (result_tx, result_rx) = oneshot::channel();
79 let (result_tx, result_rx) = oneshot::channel();
79 let fut = UnixClient::connect(sock_path)
80 let fut = loc
80 .and_then(|client| client.set_current_dir(current_dir))
81 .connect()
82 .and_then(|(_, client)| client.set_current_dir(current_dir))
81 .and_then(|client| client.attach_io(io::stdin(), io::stdout(), io::stderr()))
83 .and_then(|client| client.attach_io(io::stdin(), io::stdout(), io::stderr()))
82 .and_then(|client| {
84 .and_then(|client| {
83 let pid = client.server_spec().process_id.unwrap();
85 let pid = client.server_spec().process_id.unwrap();
84 let pgid = client.server_spec().process_group_id;
86 let pgid = client.server_spec().process_group_id;
85 procutil::setup_signal_handler_once(pid, pgid)?;
87 procutil::setup_signal_handler_once(pid, pgid)?;
86 Ok(client)
88 Ok(client)
87 })
89 })
88 .and_then(|client| client.run_command_chg(handler, env::args_os().skip(1)))
90 .and_then(|client| client.run_command_chg(handler, env::args_os().skip(1)))
89 .map(|(_client, _handler, code)| {
91 .map(|(_client, _handler, code)| {
90 procutil::restore_signal_handler_once()?;
92 procutil::restore_signal_handler_once()?;
91 Ok(code)
93 Ok(code)
92 })
94 })
93 .or_else(|err| Ok(Err(err))) // pass back error to caller
95 .or_else(|err| Ok(Err(err))) // pass back error to caller
94 .map(|res| result_tx.send(res).unwrap());
96 .map(|res| result_tx.send(res).unwrap());
95 tokio::run(fut);
97 tokio::run(fut);
96 result_rx.wait().unwrap_or(Err(io::Error::new(
98 result_rx.wait().unwrap_or(Err(io::Error::new(
97 io::ErrorKind::Other,
99 io::ErrorKind::Other,
98 "no exit code set",
100 "no exit code set",
99 )))
101 )))
100 }
102 }
General Comments 0
You need to be logged in to leave comments. Login now