##// END OF EJS Templates
rust-chg: update name of the server process...
Yuya Nishihara -
r45175:80d6e341 default
parent child Browse files
Show More
@@ -1,124 +1,136 b''
1 // Copyright 2018 Yuya Nishihara <yuya@tcha.org>
1 // Copyright 2018 Yuya Nishihara <yuya@tcha.org>
2 //
2 //
3 // This software may be used and distributed according to the terms of the
3 // This software may be used and distributed according to the terms of the
4 // GNU General Public License version 2 or any later version.
4 // GNU General Public License version 2 or any later version.
5
5
6 //! cHg extensions to command server client.
6 //! cHg extensions to command server client.
7
7
8 use bytes::{BufMut, Bytes, BytesMut};
8 use bytes::{BufMut, Bytes, BytesMut};
9 use std::ffi::OsStr;
9 use std::ffi::OsStr;
10 use std::io;
10 use std::io;
11 use std::mem;
11 use std::mem;
12 use std::os::unix::ffi::OsStrExt;
12 use std::os::unix::ffi::OsStrExt;
13 use std::os::unix::io::AsRawFd;
13 use std::os::unix::io::AsRawFd;
14 use std::path::Path;
14 use std::path::Path;
15 use tokio_hglib::protocol::{OneShotQuery, OneShotRequest};
15 use tokio_hglib::protocol::{OneShotQuery, OneShotRequest};
16 use tokio_hglib::{Client, Connection};
16 use tokio_hglib::{Client, Connection};
17
17
18 use super::attachio::AttachIo;
18 use super::attachio::AttachIo;
19 use super::message::{self, Instruction};
19 use super::message::{self, Instruction};
20 use super::runcommand::ChgRunCommand;
20 use super::runcommand::ChgRunCommand;
21 use super::uihandler::SystemHandler;
21 use super::uihandler::SystemHandler;
22
22
23 pub trait ChgClientExt<C>
23 pub trait ChgClientExt<C>
24 where
24 where
25 C: Connection + AsRawFd,
25 C: Connection + AsRawFd,
26 {
26 {
27 /// Attaches the client file descriptors to the server.
27 /// Attaches the client file descriptors to the server.
28 fn attach_io<I, O, E>(self, stdin: I, stdout: O, stderr: E) -> AttachIo<C, I, O, E>
28 fn attach_io<I, O, E>(self, stdin: I, stdout: O, stderr: E) -> AttachIo<C, I, O, E>
29 where
29 where
30 I: AsRawFd,
30 I: AsRawFd,
31 O: AsRawFd,
31 O: AsRawFd,
32 E: AsRawFd;
32 E: AsRawFd;
33
33
34 /// Changes the working directory of the server.
34 /// Changes the working directory of the server.
35 fn set_current_dir<P>(self, dir: P) -> OneShotRequest<C>
35 fn set_current_dir<P>(self, dir: P) -> OneShotRequest<C>
36 where
36 where
37 P: AsRef<Path>;
37 P: AsRef<Path>;
38
38
39 /// Updates the environment variables of the server.
39 /// Updates the environment variables of the server.
40 fn set_env_vars_os<I, P>(self, vars: I) -> OneShotRequest<C>
40 fn set_env_vars_os<I, P>(self, vars: I) -> OneShotRequest<C>
41 where
41 where
42 I: IntoIterator<Item = (P, P)>,
42 I: IntoIterator<Item = (P, P)>,
43 P: AsRef<OsStr>;
43 P: AsRef<OsStr>;
44
44
45 /// Changes the process title of the server.
46 fn set_process_name<P>(self, name: P) -> OneShotRequest<C>
47 where
48 P: AsRef<OsStr>;
49
45 /// Changes the umask of the server process.
50 /// Changes the umask of the server process.
46 fn set_umask(self, mask: u32) -> OneShotRequest<C>;
51 fn set_umask(self, mask: u32) -> OneShotRequest<C>;
47
52
48 /// Runs the specified Mercurial command with cHg extension.
53 /// Runs the specified Mercurial command with cHg extension.
49 fn run_command_chg<I, P, H>(self, handler: H, args: I) -> ChgRunCommand<C, H>
54 fn run_command_chg<I, P, H>(self, handler: H, args: I) -> ChgRunCommand<C, H>
50 where
55 where
51 I: IntoIterator<Item = P>,
56 I: IntoIterator<Item = P>,
52 P: AsRef<OsStr>,
57 P: AsRef<OsStr>,
53 H: SystemHandler;
58 H: SystemHandler;
54
59
55 /// Validates if the server can run Mercurial commands with the expected
60 /// Validates if the server can run Mercurial commands with the expected
56 /// configuration.
61 /// configuration.
57 ///
62 ///
58 /// The `args` should contain early command arguments such as `--config`
63 /// The `args` should contain early command arguments such as `--config`
59 /// and `-R`.
64 /// and `-R`.
60 ///
65 ///
61 /// Client-side environment must be sent prior to this request, by
66 /// Client-side environment must be sent prior to this request, by
62 /// `set_current_dir()` and `set_env_vars_os()`.
67 /// `set_current_dir()` and `set_env_vars_os()`.
63 fn validate<I, P>(self, args: I) -> OneShotQuery<C, fn(Bytes) -> io::Result<Vec<Instruction>>>
68 fn validate<I, P>(self, args: I) -> OneShotQuery<C, fn(Bytes) -> io::Result<Vec<Instruction>>>
64 where
69 where
65 I: IntoIterator<Item = P>,
70 I: IntoIterator<Item = P>,
66 P: AsRef<OsStr>;
71 P: AsRef<OsStr>;
67 }
72 }
68
73
69 impl<C> ChgClientExt<C> for Client<C>
74 impl<C> ChgClientExt<C> for Client<C>
70 where
75 where
71 C: Connection + AsRawFd,
76 C: Connection + AsRawFd,
72 {
77 {
73 fn attach_io<I, O, E>(self, stdin: I, stdout: O, stderr: E) -> AttachIo<C, I, O, E>
78 fn attach_io<I, O, E>(self, stdin: I, stdout: O, stderr: E) -> AttachIo<C, I, O, E>
74 where
79 where
75 I: AsRawFd,
80 I: AsRawFd,
76 O: AsRawFd,
81 O: AsRawFd,
77 E: AsRawFd,
82 E: AsRawFd,
78 {
83 {
79 AttachIo::with_client(self, stdin, stdout, Some(stderr))
84 AttachIo::with_client(self, stdin, stdout, Some(stderr))
80 }
85 }
81
86
82 fn set_current_dir<P>(self, dir: P) -> OneShotRequest<C>
87 fn set_current_dir<P>(self, dir: P) -> OneShotRequest<C>
83 where
88 where
84 P: AsRef<Path>,
89 P: AsRef<Path>,
85 {
90 {
86 OneShotRequest::start_with_args(self, b"chdir", dir.as_ref().as_os_str().as_bytes())
91 OneShotRequest::start_with_args(self, b"chdir", dir.as_ref().as_os_str().as_bytes())
87 }
92 }
88
93
89 fn set_env_vars_os<I, P>(self, vars: I) -> OneShotRequest<C>
94 fn set_env_vars_os<I, P>(self, vars: I) -> OneShotRequest<C>
90 where
95 where
91 I: IntoIterator<Item = (P, P)>,
96 I: IntoIterator<Item = (P, P)>,
92 P: AsRef<OsStr>,
97 P: AsRef<OsStr>,
93 {
98 {
94 OneShotRequest::start_with_args(self, b"setenv", message::pack_env_vars_os(vars))
99 OneShotRequest::start_with_args(self, b"setenv", message::pack_env_vars_os(vars))
95 }
100 }
96
101
102 fn set_process_name<P>(self, name: P) -> OneShotRequest<C>
103 where
104 P: AsRef<OsStr>,
105 {
106 OneShotRequest::start_with_args(self, b"setprocname", name.as_ref().as_bytes())
107 }
108
97 fn set_umask(self, mask: u32) -> OneShotRequest<C> {
109 fn set_umask(self, mask: u32) -> OneShotRequest<C> {
98 let mut args = BytesMut::with_capacity(mem::size_of_val(&mask));
110 let mut args = BytesMut::with_capacity(mem::size_of_val(&mask));
99 args.put_u32_be(mask);
111 args.put_u32_be(mask);
100 OneShotRequest::start_with_args(self, b"setumask2", args)
112 OneShotRequest::start_with_args(self, b"setumask2", args)
101 }
113 }
102
114
103 fn run_command_chg<I, P, H>(self, handler: H, args: I) -> ChgRunCommand<C, H>
115 fn run_command_chg<I, P, H>(self, handler: H, args: I) -> ChgRunCommand<C, H>
104 where
116 where
105 I: IntoIterator<Item = P>,
117 I: IntoIterator<Item = P>,
106 P: AsRef<OsStr>,
118 P: AsRef<OsStr>,
107 H: SystemHandler,
119 H: SystemHandler,
108 {
120 {
109 ChgRunCommand::with_client(self, handler, message::pack_args_os(args))
121 ChgRunCommand::with_client(self, handler, message::pack_args_os(args))
110 }
122 }
111
123
112 fn validate<I, P>(self, args: I) -> OneShotQuery<C, fn(Bytes) -> io::Result<Vec<Instruction>>>
124 fn validate<I, P>(self, args: I) -> OneShotQuery<C, fn(Bytes) -> io::Result<Vec<Instruction>>>
113 where
125 where
114 I: IntoIterator<Item = P>,
126 I: IntoIterator<Item = P>,
115 P: AsRef<OsStr>,
127 P: AsRef<OsStr>,
116 {
128 {
117 OneShotQuery::start_with_args(
129 OneShotQuery::start_with_args(
118 self,
130 self,
119 b"validate",
131 b"validate",
120 message::pack_args_os(args),
132 message::pack_args_os(args),
121 message::parse_instructions,
133 message::parse_instructions,
122 )
134 )
123 }
135 }
124 }
136 }
@@ -1,489 +1,500 b''
1 // Copyright 2011, 2018 Yuya Nishihara <yuya@tcha.org>
1 // Copyright 2011, 2018 Yuya Nishihara <yuya@tcha.org>
2 //
2 //
3 // This software may be used and distributed according to the terms of the
3 // This software may be used and distributed according to the terms of the
4 // GNU General Public License version 2 or any later version.
4 // GNU General Public License version 2 or any later version.
5
5
6 //! Utility for locating command-server process.
6 //! Utility for locating command-server process.
7
7
8 use futures::future::{self, Either, Loop};
8 use futures::future::{self, Either, Loop};
9 use std::env;
9 use std::env;
10 use std::ffi::{OsStr, OsString};
10 use std::ffi::{OsStr, OsString};
11 use std::fs::{self, DirBuilder};
11 use std::fs::{self, DirBuilder};
12 use std::io;
12 use std::io;
13 use std::os::unix::ffi::{OsStrExt, OsStringExt};
13 use std::os::unix::ffi::{OsStrExt, OsStringExt};
14 use std::os::unix::fs::{DirBuilderExt, MetadataExt};
14 use std::os::unix::fs::{DirBuilderExt, MetadataExt};
15 use std::path::{Path, PathBuf};
15 use std::path::{Path, PathBuf};
16 use std::process::{self, Command};
16 use std::process::{self, Command};
17 use std::time::Duration;
17 use std::time::Duration;
18 use tokio::prelude::*;
18 use tokio::prelude::*;
19 use tokio_hglib::UnixClient;
19 use tokio_hglib::UnixClient;
20 use tokio_process::{Child, CommandExt};
20 use tokio_process::{Child, CommandExt};
21 use tokio_timer;
21 use tokio_timer;
22
22
23 use super::clientext::ChgClientExt;
23 use super::clientext::ChgClientExt;
24 use super::message::{Instruction, ServerSpec};
24 use super::message::{Instruction, ServerSpec};
25 use super::procutil;
25 use super::procutil;
26
26
27 const REQUIRED_SERVER_CAPABILITIES: &[&str] = &[
27 const REQUIRED_SERVER_CAPABILITIES: &[&str] = &[
28 "attachio",
28 "attachio",
29 "chdir",
29 "chdir",
30 "runcommand",
30 "runcommand",
31 "setenv",
31 "setenv",
32 "setumask2",
32 "setumask2",
33 "validate",
33 "validate",
34 ];
34 ];
35
35
36 /// Helper to connect to and spawn a server process.
36 /// Helper to connect to and spawn a server process.
37 #[derive(Clone, Debug)]
37 #[derive(Clone, Debug)]
38 pub struct Locator {
38 pub struct Locator {
39 hg_command: OsString,
39 hg_command: OsString,
40 hg_early_args: Vec<OsString>,
40 hg_early_args: Vec<OsString>,
41 current_dir: PathBuf,
41 current_dir: PathBuf,
42 env_vars: Vec<(OsString, OsString)>,
42 env_vars: Vec<(OsString, OsString)>,
43 process_id: u32,
43 process_id: u32,
44 base_sock_path: PathBuf,
44 base_sock_path: PathBuf,
45 redirect_sock_path: Option<PathBuf>,
45 redirect_sock_path: Option<PathBuf>,
46 timeout: Duration,
46 timeout: Duration,
47 }
47 }
48
48
49 impl Locator {
49 impl Locator {
50 /// Creates locator capturing the current process environment.
50 /// Creates locator capturing the current process environment.
51 ///
51 ///
52 /// If no `$CHGSOCKNAME` is specified, the socket directory will be
52 /// If no `$CHGSOCKNAME` is specified, the socket directory will be
53 /// created as necessary.
53 /// created as necessary.
54 pub fn prepare_from_env() -> io::Result<Locator> {
54 pub fn prepare_from_env() -> io::Result<Locator> {
55 Ok(Locator {
55 Ok(Locator {
56 hg_command: default_hg_command(),
56 hg_command: default_hg_command(),
57 hg_early_args: Vec::new(),
57 hg_early_args: Vec::new(),
58 current_dir: env::current_dir()?,
58 current_dir: env::current_dir()?,
59 env_vars: env::vars_os().collect(),
59 env_vars: env::vars_os().collect(),
60 process_id: process::id(),
60 process_id: process::id(),
61 base_sock_path: prepare_server_socket_path()?,
61 base_sock_path: prepare_server_socket_path()?,
62 redirect_sock_path: None,
62 redirect_sock_path: None,
63 timeout: default_timeout(),
63 timeout: default_timeout(),
64 })
64 })
65 }
65 }
66
66
67 /// Temporary socket path for this client process.
67 /// Temporary socket path for this client process.
68 fn temp_sock_path(&self) -> PathBuf {
68 fn temp_sock_path(&self) -> PathBuf {
69 let src = self.base_sock_path.as_os_str().as_bytes();
69 let src = self.base_sock_path.as_os_str().as_bytes();
70 let mut buf = Vec::with_capacity(src.len() + 6); // "{src}.{pid}".len()
70 let mut buf = Vec::with_capacity(src.len() + 6); // "{src}.{pid}".len()
71 buf.extend_from_slice(src);
71 buf.extend_from_slice(src);
72 buf.extend_from_slice(format!(".{}", self.process_id).as_bytes());
72 buf.extend_from_slice(format!(".{}", self.process_id).as_bytes());
73 OsString::from_vec(buf).into()
73 OsString::from_vec(buf).into()
74 }
74 }
75
75
76 /// Specifies the arguments to be passed to the server at start.
76 /// Specifies the arguments to be passed to the server at start.
77 pub fn set_early_args<I, P>(&mut self, args: I)
77 pub fn set_early_args<I, P>(&mut self, args: I)
78 where
78 where
79 I: IntoIterator<Item = P>,
79 I: IntoIterator<Item = P>,
80 P: AsRef<OsStr>,
80 P: AsRef<OsStr>,
81 {
81 {
82 self.hg_early_args = args.into_iter().map(|a| a.as_ref().to_owned()).collect();
82 self.hg_early_args = args.into_iter().map(|a| a.as_ref().to_owned()).collect();
83 }
83 }
84
84
85 /// Connects to the server.
85 /// Connects to the server.
86 ///
86 ///
87 /// The server process will be spawned if not running.
87 /// The server process will be spawned if not running.
88 pub fn connect(self) -> impl Future<Item = (Self, UnixClient), Error = io::Error> {
88 pub fn connect(self) -> impl Future<Item = (Self, UnixClient), Error = io::Error> {
89 future::loop_fn((self, 0), |(loc, cnt)| {
89 future::loop_fn((self, 0), |(loc, cnt)| {
90 if cnt < 10 {
90 if cnt < 10 {
91 let fut = loc
91 let fut = loc
92 .try_connect()
92 .try_connect()
93 .and_then(|(loc, client)| {
93 .and_then(|(loc, client)| {
94 client
94 client
95 .validate(&loc.hg_early_args)
95 .validate(&loc.hg_early_args)
96 .map(|(client, instructions)| (loc, client, instructions))
96 .map(|(client, instructions)| (loc, client, instructions))
97 })
97 })
98 .and_then(move |(loc, client, instructions)| {
98 .and_then(move |(loc, client, instructions)| {
99 loc.run_instructions(client, instructions, cnt)
99 loc.run_instructions(client, instructions, cnt)
100 });
100 });
101 Either::A(fut)
101 Either::A(fut)
102 } else {
102 } else {
103 let msg = format!(
103 let msg = format!(
104 concat!(
104 concat!(
105 "too many redirections.\n",
105 "too many redirections.\n",
106 "Please make sure {:?} is not a wrapper which ",
106 "Please make sure {:?} is not a wrapper which ",
107 "changes sensitive environment variables ",
107 "changes sensitive environment variables ",
108 "before executing hg. If you have to use a ",
108 "before executing hg. If you have to use a ",
109 "wrapper, wrap chg instead of hg.",
109 "wrapper, wrap chg instead of hg.",
110 ),
110 ),
111 loc.hg_command
111 loc.hg_command
112 );
112 );
113 Either::B(future::err(io::Error::new(io::ErrorKind::Other, msg)))
113 Either::B(future::err(io::Error::new(io::ErrorKind::Other, msg)))
114 }
114 }
115 })
115 })
116 }
116 }
117
117
118 /// Runs instructions received from the server.
118 /// Runs instructions received from the server.
119 fn run_instructions(
119 fn run_instructions(
120 mut self,
120 mut self,
121 client: UnixClient,
121 client: UnixClient,
122 instructions: Vec<Instruction>,
122 instructions: Vec<Instruction>,
123 cnt: usize,
123 cnt: usize,
124 ) -> io::Result<Loop<(Self, UnixClient), (Self, usize)>> {
124 ) -> io::Result<Loop<(Self, UnixClient), (Self, usize)>> {
125 let mut reconnect = false;
125 let mut reconnect = false;
126 for inst in instructions {
126 for inst in instructions {
127 debug!("instruction: {:?}", inst);
127 debug!("instruction: {:?}", inst);
128 match inst {
128 match inst {
129 Instruction::Exit(_) => {
129 Instruction::Exit(_) => {
130 // Just returns the current connection to run the
130 // Just returns the current connection to run the
131 // unparsable command and report the error
131 // unparsable command and report the error
132 return Ok(Loop::Break((self, client)));
132 return Ok(Loop::Break((self, client)));
133 }
133 }
134 Instruction::Reconnect => {
134 Instruction::Reconnect => {
135 reconnect = true;
135 reconnect = true;
136 }
136 }
137 Instruction::Redirect(path) => {
137 Instruction::Redirect(path) => {
138 if path.parent() != self.base_sock_path.parent() {
138 if path.parent() != self.base_sock_path.parent() {
139 let msg = format!(
139 let msg = format!(
140 "insecure redirect instruction from server: {}",
140 "insecure redirect instruction from server: {}",
141 path.display()
141 path.display()
142 );
142 );
143 return Err(io::Error::new(io::ErrorKind::InvalidData, msg));
143 return Err(io::Error::new(io::ErrorKind::InvalidData, msg));
144 }
144 }
145 self.redirect_sock_path = Some(path);
145 self.redirect_sock_path = Some(path);
146 reconnect = true;
146 reconnect = true;
147 }
147 }
148 Instruction::Unlink(path) => {
148 Instruction::Unlink(path) => {
149 if path.parent() != self.base_sock_path.parent() {
149 if path.parent() != self.base_sock_path.parent() {
150 let msg = format!(
150 let msg = format!(
151 "insecure unlink instruction from server: {}",
151 "insecure unlink instruction from server: {}",
152 path.display()
152 path.display()
153 );
153 );
154 return Err(io::Error::new(io::ErrorKind::InvalidData, msg));
154 return Err(io::Error::new(io::ErrorKind::InvalidData, msg));
155 }
155 }
156 fs::remove_file(path).unwrap_or(()); // may race
156 fs::remove_file(path).unwrap_or(()); // may race
157 }
157 }
158 }
158 }
159 }
159 }
160
160
161 if reconnect {
161 if reconnect {
162 Ok(Loop::Continue((self, cnt + 1)))
162 Ok(Loop::Continue((self, cnt + 1)))
163 } else {
163 } else {
164 Ok(Loop::Break((self, client)))
164 Ok(Loop::Break((self, client)))
165 }
165 }
166 }
166 }
167
167
168 /// Tries to connect to the existing server, or spawns new if not running.
168 /// Tries to connect to the existing server, or spawns new if not running.
169 fn try_connect(self) -> impl Future<Item = (Self, UnixClient), Error = io::Error> {
169 fn try_connect(self) -> impl Future<Item = (Self, UnixClient), Error = io::Error> {
170 let sock_path = self
170 let sock_path = self
171 .redirect_sock_path
171 .redirect_sock_path
172 .as_ref()
172 .as_ref()
173 .unwrap_or(&self.base_sock_path)
173 .unwrap_or(&self.base_sock_path)
174 .clone();
174 .clone();
175 debug!("try connect to {}", sock_path.display());
175 debug!("try connect to {}", sock_path.display());
176 UnixClient::connect(sock_path)
176 UnixClient::connect(sock_path)
177 .then(|res| {
177 .then(|res| {
178 match res {
178 match res {
179 Ok(client) => Either::A(future::ok((self, client))),
179 Ok(client) => Either::A(future::ok((self, client))),
180 Err(_) => {
180 Err(_) => {
181 // Prevent us from being re-connected to the outdated
181 // Prevent us from being re-connected to the outdated
182 // master server: We were told by the server to redirect
182 // master server: We were told by the server to redirect
183 // to redirect_sock_path, which didn't work. We do not
183 // to redirect_sock_path, which didn't work. We do not
184 // want to connect to the same master server again
184 // want to connect to the same master server again
185 // because it would probably tell us the same thing.
185 // because it would probably tell us the same thing.
186 if self.redirect_sock_path.is_some() {
186 if self.redirect_sock_path.is_some() {
187 fs::remove_file(&self.base_sock_path).unwrap_or(());
187 fs::remove_file(&self.base_sock_path).unwrap_or(());
188 // may race
188 // may race
189 }
189 }
190 Either::B(self.spawn_connect())
190 Either::B(self.spawn_connect())
191 }
191 }
192 }
192 }
193 })
193 })
194 .and_then(|(loc, client)| {
194 .and_then(|(loc, client)| {
195 check_server_capabilities(client.server_spec())?;
195 check_server_capabilities(client.server_spec())?;
196 Ok((loc, client))
196 Ok((loc, client))
197 })
197 })
198 .and_then(|(loc, client)| {
198 .and_then(|(loc, client)| {
199 // It's purely optional, and the server might not support this command.
200 if client.server_spec().capabilities.contains("setprocname") {
201 let fut = client
202 .set_process_name(format!("chg[worker/{}]", loc.process_id))
203 .map(|client| (loc, client));
204 Either::A(fut)
205 } else {
206 Either::B(future::ok((loc, client)))
207 }
208 })
209 .and_then(|(loc, client)| {
199 client
210 client
200 .set_current_dir(&loc.current_dir)
211 .set_current_dir(&loc.current_dir)
201 .map(|client| (loc, client))
212 .map(|client| (loc, client))
202 })
213 })
203 .and_then(|(loc, client)| {
214 .and_then(|(loc, client)| {
204 client
215 client
205 .set_env_vars_os(loc.env_vars.iter().cloned())
216 .set_env_vars_os(loc.env_vars.iter().cloned())
206 .map(|client| (loc, client))
217 .map(|client| (loc, client))
207 })
218 })
208 }
219 }
209
220
210 /// Spawns new server process and connects to it.
221 /// Spawns new server process and connects to it.
211 ///
222 ///
212 /// The server will be spawned at the current working directory, then
223 /// The server will be spawned at the current working directory, then
213 /// chdir to "/", so that the server will load configs from the target
224 /// chdir to "/", so that the server will load configs from the target
214 /// repository.
225 /// repository.
215 fn spawn_connect(self) -> impl Future<Item = (Self, UnixClient), Error = io::Error> {
226 fn spawn_connect(self) -> impl Future<Item = (Self, UnixClient), Error = io::Error> {
216 let sock_path = self.temp_sock_path();
227 let sock_path = self.temp_sock_path();
217 debug!("start cmdserver at {}", sock_path.display());
228 debug!("start cmdserver at {}", sock_path.display());
218 Command::new(&self.hg_command)
229 Command::new(&self.hg_command)
219 .arg("serve")
230 .arg("serve")
220 .arg("--cmdserver")
231 .arg("--cmdserver")
221 .arg("chgunix")
232 .arg("chgunix")
222 .arg("--address")
233 .arg("--address")
223 .arg(&sock_path)
234 .arg(&sock_path)
224 .arg("--daemon-postexec")
235 .arg("--daemon-postexec")
225 .arg("chdir:/")
236 .arg("chdir:/")
226 .args(&self.hg_early_args)
237 .args(&self.hg_early_args)
227 .current_dir(&self.current_dir)
238 .current_dir(&self.current_dir)
228 .env_clear()
239 .env_clear()
229 .envs(self.env_vars.iter().cloned())
240 .envs(self.env_vars.iter().cloned())
230 .env("CHGINTERNALMARK", "")
241 .env("CHGINTERNALMARK", "")
231 .spawn_async()
242 .spawn_async()
232 .into_future()
243 .into_future()
233 .and_then(|server| self.connect_spawned(server, sock_path))
244 .and_then(|server| self.connect_spawned(server, sock_path))
234 .and_then(|(loc, client, sock_path)| {
245 .and_then(|(loc, client, sock_path)| {
235 debug!(
246 debug!(
236 "rename {} to {}",
247 "rename {} to {}",
237 sock_path.display(),
248 sock_path.display(),
238 loc.base_sock_path.display()
249 loc.base_sock_path.display()
239 );
250 );
240 fs::rename(&sock_path, &loc.base_sock_path)?;
251 fs::rename(&sock_path, &loc.base_sock_path)?;
241 Ok((loc, client))
252 Ok((loc, client))
242 })
253 })
243 }
254 }
244
255
245 /// Tries to connect to the just spawned server repeatedly until timeout
256 /// Tries to connect to the just spawned server repeatedly until timeout
246 /// exceeded.
257 /// exceeded.
247 fn connect_spawned(
258 fn connect_spawned(
248 self,
259 self,
249 server: Child,
260 server: Child,
250 sock_path: PathBuf,
261 sock_path: PathBuf,
251 ) -> impl Future<Item = (Self, UnixClient, PathBuf), Error = io::Error> {
262 ) -> impl Future<Item = (Self, UnixClient, PathBuf), Error = io::Error> {
252 debug!("try connect to {} repeatedly", sock_path.display());
263 debug!("try connect to {} repeatedly", sock_path.display());
253 let connect = future::loop_fn(sock_path, |sock_path| {
264 let connect = future::loop_fn(sock_path, |sock_path| {
254 UnixClient::connect(sock_path.clone()).then(|res| {
265 UnixClient::connect(sock_path.clone()).then(|res| {
255 match res {
266 match res {
256 Ok(client) => Either::A(future::ok(Loop::Break((client, sock_path)))),
267 Ok(client) => Either::A(future::ok(Loop::Break((client, sock_path)))),
257 Err(_) => {
268 Err(_) => {
258 // try again with slight delay
269 // try again with slight delay
259 let fut = tokio_timer::sleep(Duration::from_millis(10))
270 let fut = tokio_timer::sleep(Duration::from_millis(10))
260 .map(|()| Loop::Continue(sock_path))
271 .map(|()| Loop::Continue(sock_path))
261 .map_err(|err| io::Error::new(io::ErrorKind::Other, err));
272 .map_err(|err| io::Error::new(io::ErrorKind::Other, err));
262 Either::B(fut)
273 Either::B(fut)
263 }
274 }
264 }
275 }
265 })
276 })
266 });
277 });
267
278
268 // waits for either connection established or server failed to start
279 // waits for either connection established or server failed to start
269 connect
280 connect
270 .select2(server)
281 .select2(server)
271 .map_err(|res| res.split().0)
282 .map_err(|res| res.split().0)
272 .timeout(self.timeout)
283 .timeout(self.timeout)
273 .map_err(|err| {
284 .map_err(|err| {
274 err.into_inner().unwrap_or_else(|| {
285 err.into_inner().unwrap_or_else(|| {
275 io::Error::new(
286 io::Error::new(
276 io::ErrorKind::TimedOut,
287 io::ErrorKind::TimedOut,
277 "timed out while connecting to server",
288 "timed out while connecting to server",
278 )
289 )
279 })
290 })
280 })
291 })
281 .and_then(|res| {
292 .and_then(|res| {
282 match res {
293 match res {
283 Either::A(((client, sock_path), server)) => {
294 Either::A(((client, sock_path), server)) => {
284 server.forget(); // continue to run in background
295 server.forget(); // continue to run in background
285 Ok((self, client, sock_path))
296 Ok((self, client, sock_path))
286 }
297 }
287 Either::B((st, _)) => Err(io::Error::new(
298 Either::B((st, _)) => Err(io::Error::new(
288 io::ErrorKind::Other,
299 io::ErrorKind::Other,
289 format!("server exited too early: {}", st),
300 format!("server exited too early: {}", st),
290 )),
301 )),
291 }
302 }
292 })
303 })
293 }
304 }
294 }
305 }
295
306
296 /// Determines the server socket to connect to.
307 /// Determines the server socket to connect to.
297 ///
308 ///
298 /// If no `$CHGSOCKNAME` is specified, the socket directory will be created
309 /// If no `$CHGSOCKNAME` is specified, the socket directory will be created
299 /// as necessary.
310 /// as necessary.
300 fn prepare_server_socket_path() -> io::Result<PathBuf> {
311 fn prepare_server_socket_path() -> io::Result<PathBuf> {
301 if let Some(s) = env::var_os("CHGSOCKNAME") {
312 if let Some(s) = env::var_os("CHGSOCKNAME") {
302 Ok(PathBuf::from(s))
313 Ok(PathBuf::from(s))
303 } else {
314 } else {
304 let mut path = default_server_socket_dir();
315 let mut path = default_server_socket_dir();
305 create_secure_dir(&path)?;
316 create_secure_dir(&path)?;
306 path.push("server");
317 path.push("server");
307 Ok(path)
318 Ok(path)
308 }
319 }
309 }
320 }
310
321
311 /// Determines the default server socket path as follows.
322 /// Determines the default server socket path as follows.
312 ///
323 ///
313 /// 1. `$XDG_RUNTIME_DIR/chg`
324 /// 1. `$XDG_RUNTIME_DIR/chg`
314 /// 2. `$TMPDIR/chg$UID`
325 /// 2. `$TMPDIR/chg$UID`
315 /// 3. `/tmp/chg$UID`
326 /// 3. `/tmp/chg$UID`
316 pub fn default_server_socket_dir() -> PathBuf {
327 pub fn default_server_socket_dir() -> PathBuf {
317 // XDG_RUNTIME_DIR should be ignored if it has an insufficient permission.
328 // XDG_RUNTIME_DIR should be ignored if it has an insufficient permission.
318 // https://standards.freedesktop.org/basedir-spec/basedir-spec-latest.html
329 // https://standards.freedesktop.org/basedir-spec/basedir-spec-latest.html
319 if let Some(Ok(s)) = env::var_os("XDG_RUNTIME_DIR").map(check_secure_dir) {
330 if let Some(Ok(s)) = env::var_os("XDG_RUNTIME_DIR").map(check_secure_dir) {
320 let mut path = PathBuf::from(s);
331 let mut path = PathBuf::from(s);
321 path.push("chg");
332 path.push("chg");
322 path
333 path
323 } else {
334 } else {
324 let mut path = env::temp_dir();
335 let mut path = env::temp_dir();
325 path.push(format!("chg{}", procutil::get_effective_uid()));
336 path.push(format!("chg{}", procutil::get_effective_uid()));
326 path
337 path
327 }
338 }
328 }
339 }
329
340
330 /// Determines the default hg command.
341 /// Determines the default hg command.
331 pub fn default_hg_command() -> OsString {
342 pub fn default_hg_command() -> OsString {
332 // TODO: maybe allow embedding the path at compile time (or load from hgrc)
343 // TODO: maybe allow embedding the path at compile time (or load from hgrc)
333 env::var_os("CHGHG")
344 env::var_os("CHGHG")
334 .or(env::var_os("HG"))
345 .or(env::var_os("HG"))
335 .unwrap_or(OsStr::new("hg").to_owned())
346 .unwrap_or(OsStr::new("hg").to_owned())
336 }
347 }
337
348
338 fn default_timeout() -> Duration {
349 fn default_timeout() -> Duration {
339 let secs = env::var("CHGTIMEOUT")
350 let secs = env::var("CHGTIMEOUT")
340 .ok()
351 .ok()
341 .and_then(|s| s.parse().ok())
352 .and_then(|s| s.parse().ok())
342 .unwrap_or(60);
353 .unwrap_or(60);
343 Duration::from_secs(secs)
354 Duration::from_secs(secs)
344 }
355 }
345
356
346 /// Creates a directory which the other users cannot access to.
357 /// Creates a directory which the other users cannot access to.
347 ///
358 ///
348 /// If the directory already exists, tests its permission.
359 /// If the directory already exists, tests its permission.
349 fn create_secure_dir<P>(path: P) -> io::Result<()>
360 fn create_secure_dir<P>(path: P) -> io::Result<()>
350 where
361 where
351 P: AsRef<Path>,
362 P: AsRef<Path>,
352 {
363 {
353 DirBuilder::new()
364 DirBuilder::new()
354 .mode(0o700)
365 .mode(0o700)
355 .create(path.as_ref())
366 .create(path.as_ref())
356 .or_else(|err| {
367 .or_else(|err| {
357 if err.kind() == io::ErrorKind::AlreadyExists {
368 if err.kind() == io::ErrorKind::AlreadyExists {
358 check_secure_dir(path).map(|_| ())
369 check_secure_dir(path).map(|_| ())
359 } else {
370 } else {
360 Err(err)
371 Err(err)
361 }
372 }
362 })
373 })
363 }
374 }
364
375
365 fn check_secure_dir<P>(path: P) -> io::Result<P>
376 fn check_secure_dir<P>(path: P) -> io::Result<P>
366 where
377 where
367 P: AsRef<Path>,
378 P: AsRef<Path>,
368 {
379 {
369 let a = fs::symlink_metadata(path.as_ref())?;
380 let a = fs::symlink_metadata(path.as_ref())?;
370 if a.is_dir() && a.uid() == procutil::get_effective_uid() && (a.mode() & 0o777) == 0o700 {
381 if a.is_dir() && a.uid() == procutil::get_effective_uid() && (a.mode() & 0o777) == 0o700 {
371 Ok(path)
382 Ok(path)
372 } else {
383 } else {
373 Err(io::Error::new(io::ErrorKind::Other, "insecure directory"))
384 Err(io::Error::new(io::ErrorKind::Other, "insecure directory"))
374 }
385 }
375 }
386 }
376
387
377 fn check_server_capabilities(spec: &ServerSpec) -> io::Result<()> {
388 fn check_server_capabilities(spec: &ServerSpec) -> io::Result<()> {
378 let unsupported: Vec<_> = REQUIRED_SERVER_CAPABILITIES
389 let unsupported: Vec<_> = REQUIRED_SERVER_CAPABILITIES
379 .iter()
390 .iter()
380 .cloned()
391 .cloned()
381 .filter(|&s| !spec.capabilities.contains(s))
392 .filter(|&s| !spec.capabilities.contains(s))
382 .collect();
393 .collect();
383 if unsupported.is_empty() {
394 if unsupported.is_empty() {
384 Ok(())
395 Ok(())
385 } else {
396 } else {
386 let msg = format!(
397 let msg = format!(
387 "insufficient server capabilities: {}",
398 "insufficient server capabilities: {}",
388 unsupported.join(", ")
399 unsupported.join(", ")
389 );
400 );
390 Err(io::Error::new(io::ErrorKind::Other, msg))
401 Err(io::Error::new(io::ErrorKind::Other, msg))
391 }
402 }
392 }
403 }
393
404
394 /// Collects arguments which need to be passed to the server at start.
405 /// Collects arguments which need to be passed to the server at start.
395 pub fn collect_early_args<I, P>(args: I) -> Vec<OsString>
406 pub fn collect_early_args<I, P>(args: I) -> Vec<OsString>
396 where
407 where
397 I: IntoIterator<Item = P>,
408 I: IntoIterator<Item = P>,
398 P: AsRef<OsStr>,
409 P: AsRef<OsStr>,
399 {
410 {
400 let mut args_iter = args.into_iter();
411 let mut args_iter = args.into_iter();
401 let mut early_args = Vec::new();
412 let mut early_args = Vec::new();
402 while let Some(arg) = args_iter.next() {
413 while let Some(arg) = args_iter.next() {
403 let argb = arg.as_ref().as_bytes();
414 let argb = arg.as_ref().as_bytes();
404 if argb == b"--" {
415 if argb == b"--" {
405 break;
416 break;
406 } else if argb.starts_with(b"--") {
417 } else if argb.starts_with(b"--") {
407 let mut split = argb[2..].splitn(2, |&c| c == b'=');
418 let mut split = argb[2..].splitn(2, |&c| c == b'=');
408 match split.next().unwrap() {
419 match split.next().unwrap() {
409 b"traceback" => {
420 b"traceback" => {
410 if split.next().is_none() {
421 if split.next().is_none() {
411 early_args.push(arg.as_ref().to_owned());
422 early_args.push(arg.as_ref().to_owned());
412 }
423 }
413 }
424 }
414 b"config" | b"cwd" | b"repo" | b"repository" => {
425 b"config" | b"cwd" | b"repo" | b"repository" => {
415 if split.next().is_some() {
426 if split.next().is_some() {
416 // --<flag>=<val>
427 // --<flag>=<val>
417 early_args.push(arg.as_ref().to_owned());
428 early_args.push(arg.as_ref().to_owned());
418 } else {
429 } else {
419 // --<flag> <val>
430 // --<flag> <val>
420 args_iter.next().map(|val| {
431 args_iter.next().map(|val| {
421 early_args.push(arg.as_ref().to_owned());
432 early_args.push(arg.as_ref().to_owned());
422 early_args.push(val.as_ref().to_owned());
433 early_args.push(val.as_ref().to_owned());
423 });
434 });
424 }
435 }
425 }
436 }
426 _ => {}
437 _ => {}
427 }
438 }
428 } else if argb.starts_with(b"-R") {
439 } else if argb.starts_with(b"-R") {
429 if argb.len() > 2 {
440 if argb.len() > 2 {
430 // -R<val>
441 // -R<val>
431 early_args.push(arg.as_ref().to_owned());
442 early_args.push(arg.as_ref().to_owned());
432 } else {
443 } else {
433 // -R <val>
444 // -R <val>
434 args_iter.next().map(|val| {
445 args_iter.next().map(|val| {
435 early_args.push(arg.as_ref().to_owned());
446 early_args.push(arg.as_ref().to_owned());
436 early_args.push(val.as_ref().to_owned());
447 early_args.push(val.as_ref().to_owned());
437 });
448 });
438 }
449 }
439 }
450 }
440 }
451 }
441
452
442 early_args
453 early_args
443 }
454 }
444
455
445 #[cfg(test)]
456 #[cfg(test)]
446 mod tests {
457 mod tests {
447 use super::*;
458 use super::*;
448
459
449 #[test]
460 #[test]
450 fn collect_early_args_some() {
461 fn collect_early_args_some() {
451 assert!(collect_early_args(&[] as &[&OsStr]).is_empty());
462 assert!(collect_early_args(&[] as &[&OsStr]).is_empty());
452 assert!(collect_early_args(&["log"]).is_empty());
463 assert!(collect_early_args(&["log"]).is_empty());
453 assert_eq!(
464 assert_eq!(
454 collect_early_args(&["log", "-Ra", "foo"]),
465 collect_early_args(&["log", "-Ra", "foo"]),
455 os_string_vec_from(&[b"-Ra"])
466 os_string_vec_from(&[b"-Ra"])
456 );
467 );
457 assert_eq!(
468 assert_eq!(
458 collect_early_args(&["log", "-R", "repo", "", "--traceback", "a"]),
469 collect_early_args(&["log", "-R", "repo", "", "--traceback", "a"]),
459 os_string_vec_from(&[b"-R", b"repo", b"--traceback"])
470 os_string_vec_from(&[b"-R", b"repo", b"--traceback"])
460 );
471 );
461 assert_eq!(
472 assert_eq!(
462 collect_early_args(&["log", "--config", "diff.git=1", "-q"]),
473 collect_early_args(&["log", "--config", "diff.git=1", "-q"]),
463 os_string_vec_from(&[b"--config", b"diff.git=1"])
474 os_string_vec_from(&[b"--config", b"diff.git=1"])
464 );
475 );
465 assert_eq!(
476 assert_eq!(
466 collect_early_args(&["--cwd=..", "--repository", "r", "log"]),
477 collect_early_args(&["--cwd=..", "--repository", "r", "log"]),
467 os_string_vec_from(&[b"--cwd=..", b"--repository", b"r"])
478 os_string_vec_from(&[b"--cwd=..", b"--repository", b"r"])
468 );
479 );
469 assert_eq!(
480 assert_eq!(
470 collect_early_args(&["log", "--repo=r", "--repos", "a"]),
481 collect_early_args(&["log", "--repo=r", "--repos", "a"]),
471 os_string_vec_from(&[b"--repo=r"])
482 os_string_vec_from(&[b"--repo=r"])
472 );
483 );
473 }
484 }
474
485
475 #[test]
486 #[test]
476 fn collect_early_args_orphaned() {
487 fn collect_early_args_orphaned() {
477 assert!(collect_early_args(&["log", "-R"]).is_empty());
488 assert!(collect_early_args(&["log", "-R"]).is_empty());
478 assert!(collect_early_args(&["log", "--config"]).is_empty());
489 assert!(collect_early_args(&["log", "--config"]).is_empty());
479 }
490 }
480
491
481 #[test]
492 #[test]
482 fn collect_early_args_unwanted_value() {
493 fn collect_early_args_unwanted_value() {
483 assert!(collect_early_args(&["log", "--traceback="]).is_empty());
494 assert!(collect_early_args(&["log", "--traceback="]).is_empty());
484 }
495 }
485
496
486 fn os_string_vec_from(v: &[&[u8]]) -> Vec<OsString> {
497 fn os_string_vec_from(v: &[&[u8]]) -> Vec<OsString> {
487 v.iter().map(|s| OsStr::from_bytes(s).to_owned()).collect()
498 v.iter().map(|s| OsStr::from_bytes(s).to_owned()).collect()
488 }
499 }
489 }
500 }
General Comments 0
You need to be logged in to leave comments. Login now