Show More
@@ -5,7 +5,7 b'' | |||||
5 |
|
5 | |||
6 | mod attachio; |
|
6 | mod attachio; | |
7 | mod clientext; |
|
7 | mod clientext; | |
8 |
|
|
8 | pub mod locator; | |
9 | pub mod message; |
|
9 | pub mod message; | |
10 | pub mod procutil; |
|
10 | pub mod procutil; | |
11 | mod runcommand; |
|
11 | mod runcommand; |
@@ -5,7 +5,6 b'' | |||||
5 |
|
5 | |||
6 | //! Utility for locating command-server process. |
|
6 | //! Utility for locating command-server process. | |
7 |
|
7 | |||
8 | use futures::future::{self, Either, Loop}; |
|
|||
9 | use log::debug; |
|
8 | use log::debug; | |
10 | use std::env; |
|
9 | use std::env; | |
11 | use std::ffi::{OsStr, OsString}; |
|
10 | use std::ffi::{OsStr, OsString}; | |
@@ -14,14 +13,11 b' use std::io;' | |||||
14 | use std::os::unix::ffi::{OsStrExt, OsStringExt}; |
|
13 | use std::os::unix::ffi::{OsStrExt, OsStringExt}; | |
15 | use std::os::unix::fs::{DirBuilderExt, MetadataExt}; |
|
14 | use std::os::unix::fs::{DirBuilderExt, MetadataExt}; | |
16 | use std::path::{Path, PathBuf}; |
|
15 | use std::path::{Path, PathBuf}; | |
17 | use std::process; |
|
16 | use std::process::{self, Child, Command}; | |
18 | use std::time::Duration; |
|
17 | use std::time::{Duration, Instant}; | |
19 | use tokio::prelude::*; |
|
|||
20 | use tokio::process::{Child, Command}; |
|
|||
21 | use tokio::time; |
|
18 | use tokio::time; | |
22 | use tokio_hglib::UnixClient; |
|
|||
23 |
|
19 | |||
24 |
use crate::clientext::ChgClient |
|
20 | use crate::clientext::ChgClient; | |
25 | use crate::message::{Instruction, ServerSpec}; |
|
21 | use crate::message::{Instruction, ServerSpec}; | |
26 | use crate::procutil; |
|
22 | use crate::procutil; | |
27 |
|
23 | |||
@@ -82,21 +78,19 b' impl Locator {' | |||||
82 | /// Connects to the server. |
|
78 | /// Connects to the server. | |
83 | /// |
|
79 | /// | |
84 | /// The server process will be spawned if not running. |
|
80 | /// The server process will be spawned if not running. | |
85 | pub fn connect(self) -> impl Future<Item = (Self, UnixClient), Error = io::Error> { |
|
81 | pub async fn connect(&mut self) -> io::Result<ChgClient> { | |
86 | future::loop_fn((self, 0), |(loc, cnt)| { |
|
82 | for _cnt in 0..10 { | |
87 | if cnt < 10 { |
|
83 | let mut client = self.try_connect().await?; | |
88 | let fut = loc |
|
84 | let instructions = client.validate(&self.hg_early_args).await?; | |
89 | .try_connect() |
|
85 | let reconnect = self.run_instructions(&instructions)?; | |
90 | .and_then(|(loc, client)| { |
|
86 | if !reconnect { | |
91 |
|
|
87 | return Ok(client); | |
92 | .validate(&loc.hg_early_args) |
|
88 | } | |
93 | .map(|(client, instructions)| (loc, client, instructions)) |
|
89 | } | |
94 | }) |
|
90 | ||
95 | .and_then(move |(loc, client, instructions)| { |
|
91 | // TODO: unindent | |
96 | loc.run_instructions(client, instructions, cnt) |
|
92 | { | |
97 |
|
|
93 | { | |
98 | Either::A(fut) |
|
|||
99 | } else { |
|
|||
100 | let msg = format!( |
|
94 | let msg = format!( | |
101 | concat!( |
|
95 | concat!( | |
102 | "too many redirections.\n", |
|
96 | "too many redirections.\n", | |
@@ -105,20 +99,17 b' impl Locator {' | |||||
105 | "before executing hg. If you have to use a ", |
|
99 | "before executing hg. If you have to use a ", | |
106 | "wrapper, wrap chg instead of hg.", |
|
100 | "wrapper, wrap chg instead of hg.", | |
107 | ), |
|
101 | ), | |
108 |
|
|
102 | self.hg_command | |
109 | ); |
|
103 | ); | |
110 |
E |
|
104 | Err(io::Error::new(io::ErrorKind::Other, msg)) | |
111 | } |
|
105 | } | |
112 |
} |
|
106 | } | |
113 | } |
|
107 | } | |
114 |
|
108 | |||
115 | /// Runs instructions received from the server. |
|
109 | /// Runs instructions received from the server. | |
116 | fn run_instructions( |
|
110 | /// | |
117 | mut self, |
|
111 | /// Returns true if the client should try connecting to the other server. | |
118 | client: UnixClient, |
|
112 | fn run_instructions(&mut self, instructions: &[Instruction]) -> io::Result<bool> { | |
119 | instructions: Vec<Instruction>, |
|
|||
120 | cnt: usize, |
|
|||
121 | ) -> io::Result<Loop<(Self, UnixClient), (Self, usize)>> { |
|
|||
122 | let mut reconnect = false; |
|
113 | let mut reconnect = false; | |
123 | for inst in instructions { |
|
114 | for inst in instructions { | |
124 | debug!("instruction: {:?}", inst); |
|
115 | debug!("instruction: {:?}", inst); | |
@@ -126,7 +117,7 b' impl Locator {' | |||||
126 | Instruction::Exit(_) => { |
|
117 | Instruction::Exit(_) => { | |
127 | // Just returns the current connection to run the |
|
118 | // Just returns the current connection to run the | |
128 | // unparsable command and report the error |
|
119 | // unparsable command and report the error | |
129 |
return Ok( |
|
120 | return Ok(false); | |
130 | } |
|
121 | } | |
131 | Instruction::Reconnect => { |
|
122 | Instruction::Reconnect => { | |
132 | reconnect = true; |
|
123 | reconnect = true; | |
@@ -139,7 +130,7 b' impl Locator {' | |||||
139 | ); |
|
130 | ); | |
140 | return Err(io::Error::new(io::ErrorKind::InvalidData, msg)); |
|
131 | return Err(io::Error::new(io::ErrorKind::InvalidData, msg)); | |
141 | } |
|
132 | } | |
142 | self.redirect_sock_path = Some(path); |
|
133 | self.redirect_sock_path = Some(path.to_owned()); | |
143 | reconnect = true; |
|
134 | reconnect = true; | |
144 | } |
|
135 | } | |
145 | Instruction::Unlink(path) => { |
|
136 | Instruction::Unlink(path) => { | |
@@ -155,25 +146,22 b' impl Locator {' | |||||
155 | } |
|
146 | } | |
156 | } |
|
147 | } | |
157 |
|
148 | |||
158 |
|
|
149 | Ok(reconnect) | |
159 | Ok(Loop::Continue((self, cnt + 1))) |
|
|||
160 | } else { |
|
|||
161 | Ok(Loop::Break((self, client))) |
|
|||
162 | } |
|
|||
163 | } |
|
150 | } | |
164 |
|
151 | |||
165 | /// Tries to connect to the existing server, or spawns new if not running. |
|
152 | /// Tries to connect to the existing server, or spawns new if not running. | |
166 | fn try_connect(self) -> impl Future<Item = (Self, UnixClient), Error = io::Error> { |
|
153 | async fn try_connect(&mut self) -> io::Result<ChgClient> { | |
167 | let sock_path = self |
|
154 | let sock_path = self | |
168 | .redirect_sock_path |
|
155 | .redirect_sock_path | |
169 | .as_ref() |
|
156 | .as_ref() | |
170 | .unwrap_or(&self.base_sock_path) |
|
157 | .unwrap_or(&self.base_sock_path) | |
171 | .clone(); |
|
158 | .clone(); | |
172 | debug!("try connect to {}", sock_path.display()); |
|
159 | debug!("try connect to {}", sock_path.display()); | |
173 | UnixClient::connect(sock_path) |
|
160 | // TODO: unindent | |
174 | .then(|res| { |
|
161 | { | |
175 |
|
|
162 | { | |
176 | Ok(client) => Either::A(future::ok((self, client))), |
|
163 | let mut client = match ChgClient::connect(sock_path).await { | |
|
164 | Ok(client) => client, | |||
177 | Err(_) => { |
|
165 | Err(_) => { | |
178 | // Prevent us from being re-connected to the outdated |
|
166 | // Prevent us from being re-connected to the outdated | |
179 | // master server: We were told by the server to redirect |
|
167 | // master server: We were told by the server to redirect | |
@@ -184,35 +172,23 b' impl Locator {' | |||||
184 | fs::remove_file(&self.base_sock_path).unwrap_or(()); |
|
172 | fs::remove_file(&self.base_sock_path).unwrap_or(()); | |
185 | // may race |
|
173 | // may race | |
186 | } |
|
174 | } | |
187 |
|
|
175 | self.spawn_connect().await? | |
188 | } |
|
176 | } | |
189 | } |
|
177 | }; | |
190 | }) |
|
|||
191 | .and_then(|(loc, client)| { |
|
|||
192 | check_server_capabilities(client.server_spec())?; |
|
178 | check_server_capabilities(client.server_spec())?; | |
193 | Ok((loc, client)) |
|
|||
194 | }) |
|
|||
195 | .and_then(|(loc, client)| { |
|
|||
196 | // It's purely optional, and the server might not support this command. |
|
179 | // It's purely optional, and the server might not support this command. | |
197 | if client.server_spec().capabilities.contains("setprocname") { |
|
180 | if client.server_spec().capabilities.contains("setprocname") { | |
198 |
|
|
181 | client | |
199 |
.set_process_name(format!("chg[worker/{}]", |
|
182 | .set_process_name(format!("chg[worker/{}]", self.process_id)) | |
200 |
. |
|
183 | .await?; | |
201 | Either::A(fut) |
|
|||
202 | } else { |
|
|||
203 | Either::B(future::ok((loc, client))) |
|
|||
204 | } |
|
184 | } | |
205 | }) |
|
185 | client.set_current_dir(&self.current_dir).await?; | |
206 | .and_then(|(loc, client)| { |
|
|||
207 | client |
|
186 | client | |
208 | .set_current_dir(&loc.current_dir) |
|
187 | .set_env_vars_os(self.env_vars.iter().cloned()) | |
209 |
. |
|
188 | .await?; | |
210 |
|
|
189 | Ok(client) | |
211 | .and_then(|(loc, client)| { |
|
190 | } | |
212 | client |
|
191 | } | |
213 | .set_env_vars_os(loc.env_vars.iter().cloned()) |
|
|||
214 | .map(|client| (loc, client)) |
|
|||
215 | }) |
|
|||
216 | } |
|
192 | } | |
217 |
|
193 | |||
218 | /// Spawns new server process and connects to it. |
|
194 | /// Spawns new server process and connects to it. | |
@@ -220,10 +196,10 b' impl Locator {' | |||||
220 | /// The server will be spawned at the current working directory, then |
|
196 | /// The server will be spawned at the current working directory, then | |
221 | /// chdir to "/", so that the server will load configs from the target |
|
197 | /// chdir to "/", so that the server will load configs from the target | |
222 | /// repository. |
|
198 | /// repository. | |
223 | fn spawn_connect(self) -> impl Future<Item = (Self, UnixClient), Error = io::Error> { |
|
199 | async fn spawn_connect(&mut self) -> io::Result<ChgClient> { | |
224 | let sock_path = self.temp_sock_path(); |
|
200 | let sock_path = self.temp_sock_path(); | |
225 | debug!("start cmdserver at {}", sock_path.display()); |
|
201 | debug!("start cmdserver at {}", sock_path.display()); | |
226 | Command::new(&self.hg_command) |
|
202 | let server = Command::new(&self.hg_command) | |
227 | .arg("serve") |
|
203 | .arg("serve") | |
228 | .arg("--cmdserver") |
|
204 | .arg("--cmdserver") | |
229 | .arg("chgunix") |
|
205 | .arg("chgunix") | |
@@ -236,68 +212,54 b' impl Locator {' | |||||
236 | .env_clear() |
|
212 | .env_clear() | |
237 | .envs(self.env_vars.iter().cloned()) |
|
213 | .envs(self.env_vars.iter().cloned()) | |
238 | .env("CHGINTERNALMARK", "") |
|
214 | .env("CHGINTERNALMARK", "") | |
239 | .spawn() |
|
215 | .spawn()?; | |
240 | .into_future() |
|
216 | let client = self.connect_spawned(server, &sock_path).await?; | |
241 | .and_then(|server| self.connect_spawned(server, sock_path)) |
|
217 | // TODO: unindent | |
242 | .and_then(|(loc, client, sock_path)| { |
|
218 | { | |
|
219 | { | |||
243 | debug!( |
|
220 | debug!( | |
244 | "rename {} to {}", |
|
221 | "rename {} to {}", | |
245 | sock_path.display(), |
|
222 | sock_path.display(), | |
246 |
|
|
223 | self.base_sock_path.display() | |
247 | ); |
|
224 | ); | |
248 |
fs::rename(&sock_path, & |
|
225 | fs::rename(&sock_path, &self.base_sock_path)?; | |
249 |
Ok( |
|
226 | Ok(client) | |
250 |
} |
|
227 | } | |
|
228 | } | |||
251 | } |
|
229 | } | |
252 |
|
230 | |||
253 | /// Tries to connect to the just spawned server repeatedly until timeout |
|
231 | /// Tries to connect to the just spawned server repeatedly until timeout | |
254 | /// exceeded. |
|
232 | /// exceeded. | |
255 | fn connect_spawned( |
|
233 | async fn connect_spawned( | |
256 | self, |
|
234 | &mut self, | |
257 | server: Child, |
|
235 | mut server: Child, | |
258 |
sock_path: Path |
|
236 | sock_path: &Path, | |
259 | ) -> impl Future<Item = (Self, UnixClient, PathBuf), Error = io::Error> { |
|
237 | ) -> io::Result<ChgClient> { | |
260 | debug!("try connect to {} repeatedly", sock_path.display()); |
|
238 | debug!("try connect to {} repeatedly", sock_path.display()); | |
261 | let connect = future::loop_fn(sock_path, |sock_path| { |
|
|||
262 | UnixClient::connect(sock_path.clone()).then(|res| { |
|
|||
263 | match res { |
|
|||
264 | Ok(client) => Either::A(future::ok(Loop::Break((client, sock_path)))), |
|
|||
265 | Err(_) => { |
|
|||
266 | // try again with slight delay |
|
|||
267 | let fut = time::delay_for(Duration::from_millis(10)) |
|
|||
268 | .map(|()| Loop::Continue(sock_path)) |
|
|||
269 | .map_err(|err| io::Error::new(io::ErrorKind::Other, err)); |
|
|||
270 | Either::B(fut) |
|
|||
271 | } |
|
|||
272 | } |
|
|||
273 | }) |
|
|||
274 | }); |
|
|||
275 |
|
||||
276 | // waits for either connection established or server failed to start |
|
239 | // waits for either connection established or server failed to start | |
277 | connect |
|
240 | let start_time = Instant::now(); | |
278 | .select2(server) |
|
241 | while start_time.elapsed() < self.timeout { | |
279 | .map_err(|res| res.split().0) |
|
242 | if let Ok(client) = ChgClient::connect(&sock_path).await { | |
280 | .timeout(self.timeout) |
|
243 | // server handle is dropped here, but the detached process | |
281 | .map_err(|err| { |
|
244 | // will continue running in background | |
282 | err.into_inner().unwrap_or_else(|| { |
|
245 | return Ok(client); | |
283 | io::Error::new( |
|
246 | } | |
284 | io::ErrorKind::TimedOut, |
|
247 | ||
285 | "timed out while connecting to server", |
|
248 | if let Some(st) = server.try_wait()? { | |
286 | ) |
|
249 | return Err(io::Error::new( | |
287 | }) |
|
250 | io::ErrorKind::Other, | |
288 | }) |
|
251 | format!("server exited too early: {}", st), | |
289 | .and_then(|res| { |
|
252 | )); | |
290 |
|
|
253 | } | |
291 | Either::A(((client, sock_path), server)) => { |
|
254 | ||
292 | server.forget(); // continue to run in background |
|
255 | // try again with slight delay | |
293 | Ok((self, client, sock_path)) |
|
256 | time::delay_for(Duration::from_millis(10)).await; | |
294 |
|
|
257 | } | |
295 | Either::B((st, _)) => Err(io::Error::new( |
|
258 | ||
296 | io::ErrorKind::Other, |
|
259 | Err(io::Error::new( | |
297 | format!("server exited too early: {}", st), |
|
260 | io::ErrorKind::TimedOut, | |
298 | )), |
|
261 | "timed out while connecting to server", | |
299 | } |
|
262 | )) | |
300 | }) |
|
|||
301 | } |
|
263 | } | |
302 | } |
|
264 | } | |
303 |
|
265 |
General Comments 0
You need to be logged in to leave comments.
Login now