diff --git a/rust/chg/src/locator.rs b/rust/chg/src/locator.rs
--- a/rust/chg/src/locator.rs
+++ b/rust/chg/src/locator.rs
@@ -21,10 +21,11 @@ use tokio_process::{Child, CommandExt};
 use tokio_timer;
 
 use super::clientext::ChgClientExt;
-use super::message::ServerSpec;
+use super::message::{Instruction, ServerSpec};
 use super::procutil;
 
-const REQUIRED_SERVER_CAPABILITIES: &[&str] = &["attachio", "chdir", "runcommand", "setenv"];
+const REQUIRED_SERVER_CAPABILITIES: &[&str] =
+    &["attachio", "chdir", "runcommand", "setenv", "validate"];
 
 /// Helper to connect to and spawn a server process.
 #[derive(Clone, Debug)]
@@ -35,6 +36,7 @@ pub struct Locator {
     env_vars: Vec<(OsString, OsString)>,
     process_id: u32,
     base_sock_path: PathBuf,
+    redirect_sock_path: Option<PathBuf>,
     timeout: Duration,
 }
 
@@ -51,6 +53,7 @@ impl Locator {
             env_vars: env::vars_os().collect(),
             process_id: process::id(),
             base_sock_path: prepare_server_socket_path()?,
+            redirect_sock_path: None,
             timeout: default_timeout(),
         })
     }
@@ -77,16 +80,110 @@ impl Locator {
     ///
     /// The server process will be spawned if not running.
     pub fn connect(self) -> impl Future<Item = (Self, UnixClient), Error = io::Error> {
-        self.try_connect()
+        future::loop_fn((self, 0), |(loc, cnt)| {
+            if cnt < 10 {
+                let fut = loc
+                    .try_connect()
+                    .and_then(|(loc, client)| {
+                        client
+                            .validate(&loc.hg_early_args)
+                            .map(|(client, instructions)| (loc, client, instructions))
+                    })
+                    .and_then(move |(loc, client, instructions)| {
+                        loc.run_instructions(client, instructions, cnt)
+                    });
+                Either::A(fut)
+            } else {
+                let msg = format!(
+                    concat!(
+                        "too many redirections.\n",
+                        "Please make sure {:?} is not a wrapper which ",
+                        "changes sensitive environment variables ",
+                        "before executing hg. If you have to use a ",
+                        "wrapper, wrap chg instead of hg.",
+                    ),
+                    loc.hg_command
+                );
+                Either::B(future::err(io::Error::new(io::ErrorKind::Other, msg)))
+            }
+        })
+    }
+
+    /// Runs instructions received from the server.
+    fn run_instructions(
+        mut self,
+        client: UnixClient,
+        instructions: Vec<Instruction>,
+        cnt: usize,
+    ) -> io::Result<Loop<(Self, UnixClient), (Self, usize)>> {
+        let mut reconnect = false;
+        for inst in instructions {
+            debug!("instruction: {:?}", inst);
+            match inst {
+                Instruction::Exit(_) => {
+                    // Just returns the current connection to run the
+                    // unparsable command and report the error
+                    return Ok(Loop::Break((self, client)));
+                }
+                Instruction::Reconnect => {
+                    reconnect = true;
+                }
+                Instruction::Redirect(path) => {
+                    if path.parent() != self.base_sock_path.parent() {
+                        let msg = format!(
+                            "insecure redirect instruction from server: {}",
+                            path.display()
+                        );
+                        return Err(io::Error::new(io::ErrorKind::InvalidData, msg));
+                    }
+                    self.redirect_sock_path = Some(path);
+                    reconnect = true;
+                }
+                Instruction::Unlink(path) => {
+                    if path.parent() != self.base_sock_path.parent() {
+                        let msg = format!(
+                            "insecure unlink instruction from server: {}",
+                            path.display()
+                        );
+                        return Err(io::Error::new(io::ErrorKind::InvalidData, msg));
+                    }
+                    fs::remove_file(path).unwrap_or(()); // may race
+                }
+            }
+        }
+
+        if reconnect {
+            Ok(Loop::Continue((self, cnt + 1)))
+        } else {
+            Ok(Loop::Break((self, client)))
+        }
     }
 
     /// Tries to connect to the existing server, or spawns new if not running.
     fn try_connect(self) -> impl Future<Item = (Self, UnixClient), Error = io::Error> {
-        debug!("try connect to {}", self.base_sock_path.display());
-        UnixClient::connect(self.base_sock_path.clone())
-            .then(|res| match res {
-                Ok(client) => Either::A(future::ok((self, client))),
-                Err(_) => Either::B(self.spawn_connect()),
+        let sock_path = self
+            .redirect_sock_path
+            .as_ref()
+            .unwrap_or(&self.base_sock_path)
+            .clone();
+        debug!("try connect to {}", sock_path.display());
+        UnixClient::connect(sock_path)
+            .then(|res| {
+                match res {
+                    Ok(client) => Either::A(future::ok((self, client))),
+                    Err(_) => {
+                        // Prevent us from being re-connected to the outdated
+                        // master server: We were told by the server to redirect
+                        // to redirect_sock_path, which didn't work. We do not
+                        // want to connect to the same master server again
+                        // because it would probably tell us the same thing.
+                        if self.redirect_sock_path.is_some() {
+                            fs::remove_file(&self.base_sock_path).unwrap_or(());
+                            // may race
+                        }
+                        Either::B(self.spawn_connect())
+                    }
+                }
             })
             .and_then(|(loc, client)| {
                 check_server_capabilities(client.server_spec())?;