##// END OF EJS Templates
rust-chg: reimplement locator by using async/await and tokio-0.2...
Yuya Nishihara -
r45237:a347a329 default
parent child Browse files
Show More
@@ -5,7 +5,7 b''
5
5
6 mod attachio;
6 mod attachio;
7 mod clientext;
7 mod clientext;
8 //pub mod locator;
8 pub mod locator;
9 pub mod message;
9 pub mod message;
10 pub mod procutil;
10 pub mod procutil;
11 mod runcommand;
11 mod runcommand;
@@ -5,7 +5,6 b''
5
5
6 //! Utility for locating command-server process.
6 //! Utility for locating command-server process.
7
7
8 use futures::future::{self, Either, Loop};
9 use log::debug;
8 use log::debug;
10 use std::env;
9 use std::env;
11 use std::ffi::{OsStr, OsString};
10 use std::ffi::{OsStr, OsString};
@@ -14,14 +13,11 b' use std::io;'
14 use std::os::unix::ffi::{OsStrExt, OsStringExt};
13 use std::os::unix::ffi::{OsStrExt, OsStringExt};
15 use std::os::unix::fs::{DirBuilderExt, MetadataExt};
14 use std::os::unix::fs::{DirBuilderExt, MetadataExt};
16 use std::path::{Path, PathBuf};
15 use std::path::{Path, PathBuf};
17 use std::process;
16 use std::process::{self, Child, Command};
18 use std::time::Duration;
17 use std::time::{Duration, Instant};
19 use tokio::prelude::*;
20 use tokio::process::{Child, Command};
21 use tokio::time;
18 use tokio::time;
22 use tokio_hglib::UnixClient;
23
19
24 use crate::clientext::ChgClientExt;
20 use crate::clientext::ChgClient;
25 use crate::message::{Instruction, ServerSpec};
21 use crate::message::{Instruction, ServerSpec};
26 use crate::procutil;
22 use crate::procutil;
27
23
@@ -82,21 +78,19 b' impl Locator {'
82 /// Connects to the server.
78 /// Connects to the server.
83 ///
79 ///
84 /// The server process will be spawned if not running.
80 /// The server process will be spawned if not running.
85 pub fn connect(self) -> impl Future<Item = (Self, UnixClient), Error = io::Error> {
81 pub async fn connect(&mut self) -> io::Result<ChgClient> {
86 future::loop_fn((self, 0), |(loc, cnt)| {
82 for _cnt in 0..10 {
87 if cnt < 10 {
83 let mut client = self.try_connect().await?;
88 let fut = loc
84 let instructions = client.validate(&self.hg_early_args).await?;
89 .try_connect()
85 let reconnect = self.run_instructions(&instructions)?;
90 .and_then(|(loc, client)| {
86 if !reconnect {
91 client
87 return Ok(client);
92 .validate(&loc.hg_early_args)
88 }
93 .map(|(client, instructions)| (loc, client, instructions))
89 }
94 })
90
95 .and_then(move |(loc, client, instructions)| {
91 // TODO: unindent
96 loc.run_instructions(client, instructions, cnt)
92 {
97 });
93 {
98 Either::A(fut)
99 } else {
100 let msg = format!(
94 let msg = format!(
101 concat!(
95 concat!(
102 "too many redirections.\n",
96 "too many redirections.\n",
@@ -105,20 +99,17 b' impl Locator {'
105 "before executing hg. If you have to use a ",
99 "before executing hg. If you have to use a ",
106 "wrapper, wrap chg instead of hg.",
100 "wrapper, wrap chg instead of hg.",
107 ),
101 ),
108 loc.hg_command
102 self.hg_command
109 );
103 );
110 Either::B(future::err(io::Error::new(io::ErrorKind::Other, msg)))
104 Err(io::Error::new(io::ErrorKind::Other, msg))
111 }
105 }
112 })
106 }
113 }
107 }
114
108
115 /// Runs instructions received from the server.
109 /// Runs instructions received from the server.
116 fn run_instructions(
110 ///
117 mut self,
111 /// Returns true if the client should try connecting to the other server.
118 client: UnixClient,
112 fn run_instructions(&mut self, instructions: &[Instruction]) -> io::Result<bool> {
119 instructions: Vec<Instruction>,
120 cnt: usize,
121 ) -> io::Result<Loop<(Self, UnixClient), (Self, usize)>> {
122 let mut reconnect = false;
113 let mut reconnect = false;
123 for inst in instructions {
114 for inst in instructions {
124 debug!("instruction: {:?}", inst);
115 debug!("instruction: {:?}", inst);
@@ -126,7 +117,7 b' impl Locator {'
126 Instruction::Exit(_) => {
117 Instruction::Exit(_) => {
127 // Just returns the current connection to run the
118 // Just returns the current connection to run the
128 // unparsable command and report the error
119 // unparsable command and report the error
129 return Ok(Loop::Break((self, client)));
120 return Ok(false);
130 }
121 }
131 Instruction::Reconnect => {
122 Instruction::Reconnect => {
132 reconnect = true;
123 reconnect = true;
@@ -139,7 +130,7 b' impl Locator {'
139 );
130 );
140 return Err(io::Error::new(io::ErrorKind::InvalidData, msg));
131 return Err(io::Error::new(io::ErrorKind::InvalidData, msg));
141 }
132 }
142 self.redirect_sock_path = Some(path);
133 self.redirect_sock_path = Some(path.to_owned());
143 reconnect = true;
134 reconnect = true;
144 }
135 }
145 Instruction::Unlink(path) => {
136 Instruction::Unlink(path) => {
@@ -155,25 +146,22 b' impl Locator {'
155 }
146 }
156 }
147 }
157
148
158 if reconnect {
149 Ok(reconnect)
159 Ok(Loop::Continue((self, cnt + 1)))
160 } else {
161 Ok(Loop::Break((self, client)))
162 }
163 }
150 }
164
151
165 /// Tries to connect to the existing server, or spawns new if not running.
152 /// Tries to connect to the existing server, or spawns new if not running.
166 fn try_connect(self) -> impl Future<Item = (Self, UnixClient), Error = io::Error> {
153 async fn try_connect(&mut self) -> io::Result<ChgClient> {
167 let sock_path = self
154 let sock_path = self
168 .redirect_sock_path
155 .redirect_sock_path
169 .as_ref()
156 .as_ref()
170 .unwrap_or(&self.base_sock_path)
157 .unwrap_or(&self.base_sock_path)
171 .clone();
158 .clone();
172 debug!("try connect to {}", sock_path.display());
159 debug!("try connect to {}", sock_path.display());
173 UnixClient::connect(sock_path)
160 // TODO: unindent
174 .then(|res| {
161 {
175 match res {
162 {
176 Ok(client) => Either::A(future::ok((self, client))),
163 let mut client = match ChgClient::connect(sock_path).await {
164 Ok(client) => client,
177 Err(_) => {
165 Err(_) => {
178 // Prevent us from being re-connected to the outdated
166 // Prevent us from being re-connected to the outdated
179 // master server: We were told by the server to redirect
167 // master server: We were told by the server to redirect
@@ -184,35 +172,23 b' impl Locator {'
184 fs::remove_file(&self.base_sock_path).unwrap_or(());
172 fs::remove_file(&self.base_sock_path).unwrap_or(());
185 // may race
173 // may race
186 }
174 }
187 Either::B(self.spawn_connect())
175 self.spawn_connect().await?
188 }
176 }
189 }
177 };
190 })
191 .and_then(|(loc, client)| {
192 check_server_capabilities(client.server_spec())?;
178 check_server_capabilities(client.server_spec())?;
193 Ok((loc, client))
194 })
195 .and_then(|(loc, client)| {
196 // It's purely optional, and the server might not support this command.
179 // It's purely optional, and the server might not support this command.
197 if client.server_spec().capabilities.contains("setprocname") {
180 if client.server_spec().capabilities.contains("setprocname") {
198 let fut = client
181 client
199 .set_process_name(format!("chg[worker/{}]", loc.process_id))
182 .set_process_name(format!("chg[worker/{}]", self.process_id))
200 .map(|client| (loc, client));
183 .await?;
201 Either::A(fut)
202 } else {
203 Either::B(future::ok((loc, client)))
204 }
184 }
205 })
185 client.set_current_dir(&self.current_dir).await?;
206 .and_then(|(loc, client)| {
207 client
186 client
208 .set_current_dir(&loc.current_dir)
187 .set_env_vars_os(self.env_vars.iter().cloned())
209 .map(|client| (loc, client))
188 .await?;
210 })
189 Ok(client)
211 .and_then(|(loc, client)| {
190 }
212 client
191 }
213 .set_env_vars_os(loc.env_vars.iter().cloned())
214 .map(|client| (loc, client))
215 })
216 }
192 }
217
193
218 /// Spawns new server process and connects to it.
194 /// Spawns new server process and connects to it.
@@ -220,10 +196,10 b' impl Locator {'
220 /// The server will be spawned at the current working directory, then
196 /// The server will be spawned at the current working directory, then
221 /// chdir to "/", so that the server will load configs from the target
197 /// chdir to "/", so that the server will load configs from the target
222 /// repository.
198 /// repository.
223 fn spawn_connect(self) -> impl Future<Item = (Self, UnixClient), Error = io::Error> {
199 async fn spawn_connect(&mut self) -> io::Result<ChgClient> {
224 let sock_path = self.temp_sock_path();
200 let sock_path = self.temp_sock_path();
225 debug!("start cmdserver at {}", sock_path.display());
201 debug!("start cmdserver at {}", sock_path.display());
226 Command::new(&self.hg_command)
202 let server = Command::new(&self.hg_command)
227 .arg("serve")
203 .arg("serve")
228 .arg("--cmdserver")
204 .arg("--cmdserver")
229 .arg("chgunix")
205 .arg("chgunix")
@@ -236,68 +212,54 b' impl Locator {'
236 .env_clear()
212 .env_clear()
237 .envs(self.env_vars.iter().cloned())
213 .envs(self.env_vars.iter().cloned())
238 .env("CHGINTERNALMARK", "")
214 .env("CHGINTERNALMARK", "")
239 .spawn()
215 .spawn()?;
240 .into_future()
216 let client = self.connect_spawned(server, &sock_path).await?;
241 .and_then(|server| self.connect_spawned(server, sock_path))
217 // TODO: unindent
242 .and_then(|(loc, client, sock_path)| {
218 {
219 {
243 debug!(
220 debug!(
244 "rename {} to {}",
221 "rename {} to {}",
245 sock_path.display(),
222 sock_path.display(),
246 loc.base_sock_path.display()
223 self.base_sock_path.display()
247 );
224 );
248 fs::rename(&sock_path, &loc.base_sock_path)?;
225 fs::rename(&sock_path, &self.base_sock_path)?;
249 Ok((loc, client))
226 Ok(client)
250 })
227 }
228 }
251 }
229 }
252
230
253 /// Tries to connect to the just spawned server repeatedly until timeout
231 /// Tries to connect to the just spawned server repeatedly until timeout
254 /// exceeded.
232 /// exceeded.
255 fn connect_spawned(
233 async fn connect_spawned(
256 self,
234 &mut self,
257 server: Child,
235 mut server: Child,
258 sock_path: PathBuf,
236 sock_path: &Path,
259 ) -> impl Future<Item = (Self, UnixClient, PathBuf), Error = io::Error> {
237 ) -> io::Result<ChgClient> {
260 debug!("try connect to {} repeatedly", sock_path.display());
238 debug!("try connect to {} repeatedly", sock_path.display());
261 let connect = future::loop_fn(sock_path, |sock_path| {
262 UnixClient::connect(sock_path.clone()).then(|res| {
263 match res {
264 Ok(client) => Either::A(future::ok(Loop::Break((client, sock_path)))),
265 Err(_) => {
266 // try again with slight delay
267 let fut = time::delay_for(Duration::from_millis(10))
268 .map(|()| Loop::Continue(sock_path))
269 .map_err(|err| io::Error::new(io::ErrorKind::Other, err));
270 Either::B(fut)
271 }
272 }
273 })
274 });
275
276 // waits for either connection established or server failed to start
239 // waits for either connection established or server failed to start
277 connect
240 let start_time = Instant::now();
278 .select2(server)
241 while start_time.elapsed() < self.timeout {
279 .map_err(|res| res.split().0)
242 if let Ok(client) = ChgClient::connect(&sock_path).await {
280 .timeout(self.timeout)
243 // server handle is dropped here, but the detached process
281 .map_err(|err| {
244 // will continue running in background
282 err.into_inner().unwrap_or_else(|| {
245 return Ok(client);
283 io::Error::new(
246 }
284 io::ErrorKind::TimedOut,
247
285 "timed out while connecting to server",
248 if let Some(st) = server.try_wait()? {
286 )
249 return Err(io::Error::new(
287 })
250 io::ErrorKind::Other,
288 })
251 format!("server exited too early: {}", st),
289 .and_then(|res| {
252 ));
290 match res {
253 }
291 Either::A(((client, sock_path), server)) => {
254
292 server.forget(); // continue to run in background
255 // try again with slight delay
293 Ok((self, client, sock_path))
256 time::delay_for(Duration::from_millis(10)).await;
294 }
257 }
295 Either::B((st, _)) => Err(io::Error::new(
258
296 io::ErrorKind::Other,
259 Err(io::Error::new(
297 format!("server exited too early: {}", st),
260 io::ErrorKind::TimedOut,
298 )),
261 "timed out while connecting to server",
299 }
262 ))
300 })
301 }
263 }
302 }
264 }
303
265
General Comments 0
You need to be logged in to leave comments. Login now