##// END OF EJS Templates
rust-chg: use "crate::" to import local modules...
Yuya Nishihara -
r45180:6bef9d43 default
parent child Browse files
Show More
@@ -1,114 +1,114 b''
1 // Copyright 2018 Yuya Nishihara <yuya@tcha.org>
1 // Copyright 2018 Yuya Nishihara <yuya@tcha.org>
2 //
2 //
3 // This software may be used and distributed according to the terms of the
3 // This software may be used and distributed according to the terms of the
4 // GNU General Public License version 2 or any later version.
4 // GNU General Public License version 2 or any later version.
5
5
6 //! Functions to send client-side fds over the command server channel.
6 //! Functions to send client-side fds over the command server channel.
7
7
8 use futures::{try_ready, Async, Future, Poll};
8 use futures::{try_ready, Async, Future, Poll};
9 use std::io;
9 use std::io;
10 use std::os::unix::io::AsRawFd;
10 use std::os::unix::io::AsRawFd;
11 use tokio_hglib::codec::ChannelMessage;
11 use tokio_hglib::codec::ChannelMessage;
12 use tokio_hglib::protocol::MessageLoop;
12 use tokio_hglib::protocol::MessageLoop;
13 use tokio_hglib::{Client, Connection};
13 use tokio_hglib::{Client, Connection};
14
14
15 use super::message;
15 use crate::message;
16 use super::procutil;
16 use crate::procutil;
17
17
18 /// Future to send client-side fds over the command server channel.
18 /// Future to send client-side fds over the command server channel.
19 ///
19 ///
20 /// This works as follows:
20 /// This works as follows:
21 /// 1. Client sends "attachio" request.
21 /// 1. Client sends "attachio" request.
22 /// 2. Server sends back 1-byte input request.
22 /// 2. Server sends back 1-byte input request.
23 /// 3. Client sends fds with 1-byte dummy payload in response.
23 /// 3. Client sends fds with 1-byte dummy payload in response.
24 /// 4. Server returns the number of the fds received.
24 /// 4. Server returns the number of the fds received.
25 ///
25 ///
26 /// If the stderr is omitted, it will be redirected to the stdout. This
26 /// If the stderr is omitted, it will be redirected to the stdout. This
27 /// allows us to attach the pager stdin to both stdout and stderr, and
27 /// allows us to attach the pager stdin to both stdout and stderr, and
28 /// dispose of the client-side handle once attached.
28 /// dispose of the client-side handle once attached.
29 #[must_use = "futures do nothing unless polled"]
29 #[must_use = "futures do nothing unless polled"]
30 pub struct AttachIo<C, I, O, E>
30 pub struct AttachIo<C, I, O, E>
31 where
31 where
32 C: Connection,
32 C: Connection,
33 {
33 {
34 msg_loop: MessageLoop<C>,
34 msg_loop: MessageLoop<C>,
35 stdin: I,
35 stdin: I,
36 stdout: O,
36 stdout: O,
37 stderr: Option<E>,
37 stderr: Option<E>,
38 }
38 }
39
39
40 impl<C, I, O, E> AttachIo<C, I, O, E>
40 impl<C, I, O, E> AttachIo<C, I, O, E>
41 where
41 where
42 C: Connection + AsRawFd,
42 C: Connection + AsRawFd,
43 I: AsRawFd,
43 I: AsRawFd,
44 O: AsRawFd,
44 O: AsRawFd,
45 E: AsRawFd,
45 E: AsRawFd,
46 {
46 {
47 pub fn with_client(
47 pub fn with_client(
48 client: Client<C>,
48 client: Client<C>,
49 stdin: I,
49 stdin: I,
50 stdout: O,
50 stdout: O,
51 stderr: Option<E>,
51 stderr: Option<E>,
52 ) -> AttachIo<C, I, O, E> {
52 ) -> AttachIo<C, I, O, E> {
53 let msg_loop = MessageLoop::start(client, b"attachio");
53 let msg_loop = MessageLoop::start(client, b"attachio");
54 AttachIo {
54 AttachIo {
55 msg_loop,
55 msg_loop,
56 stdin,
56 stdin,
57 stdout,
57 stdout,
58 stderr,
58 stderr,
59 }
59 }
60 }
60 }
61 }
61 }
62
62
63 impl<C, I, O, E> Future for AttachIo<C, I, O, E>
63 impl<C, I, O, E> Future for AttachIo<C, I, O, E>
64 where
64 where
65 C: Connection + AsRawFd,
65 C: Connection + AsRawFd,
66 I: AsRawFd,
66 I: AsRawFd,
67 O: AsRawFd,
67 O: AsRawFd,
68 E: AsRawFd,
68 E: AsRawFd,
69 {
69 {
70 type Item = Client<C>;
70 type Item = Client<C>;
71 type Error = io::Error;
71 type Error = io::Error;
72
72
73 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
73 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
74 loop {
74 loop {
75 let (client, msg) = try_ready!(self.msg_loop.poll());
75 let (client, msg) = try_ready!(self.msg_loop.poll());
76 match msg {
76 match msg {
77 ChannelMessage::Data(b'r', data) => {
77 ChannelMessage::Data(b'r', data) => {
78 let fd_cnt = message::parse_result_code(data)?;
78 let fd_cnt = message::parse_result_code(data)?;
79 if fd_cnt == 3 {
79 if fd_cnt == 3 {
80 return Ok(Async::Ready(client));
80 return Ok(Async::Ready(client));
81 } else {
81 } else {
82 return Err(io::Error::new(
82 return Err(io::Error::new(
83 io::ErrorKind::InvalidData,
83 io::ErrorKind::InvalidData,
84 "unexpected attachio result",
84 "unexpected attachio result",
85 ));
85 ));
86 }
86 }
87 }
87 }
88 ChannelMessage::Data(..) => {
88 ChannelMessage::Data(..) => {
89 // just ignore data sent to uninteresting (optional) channel
89 // just ignore data sent to uninteresting (optional) channel
90 self.msg_loop = MessageLoop::resume(client);
90 self.msg_loop = MessageLoop::resume(client);
91 }
91 }
92 ChannelMessage::InputRequest(1) => {
92 ChannelMessage::InputRequest(1) => {
93 // this may fail with EWOULDBLOCK in theory, but the
93 // this may fail with EWOULDBLOCK in theory, but the
94 // payload is quite small, and the send buffer should
94 // payload is quite small, and the send buffer should
95 // be empty so the operation will complete immediately
95 // be empty so the operation will complete immediately
96 let sock_fd = client.as_raw_fd();
96 let sock_fd = client.as_raw_fd();
97 let ifd = self.stdin.as_raw_fd();
97 let ifd = self.stdin.as_raw_fd();
98 let ofd = self.stdout.as_raw_fd();
98 let ofd = self.stdout.as_raw_fd();
99 let efd = self.stderr.as_ref().map_or(ofd, |f| f.as_raw_fd());
99 let efd = self.stderr.as_ref().map_or(ofd, |f| f.as_raw_fd());
100 procutil::send_raw_fds(sock_fd, &[ifd, ofd, efd])?;
100 procutil::send_raw_fds(sock_fd, &[ifd, ofd, efd])?;
101 self.msg_loop = MessageLoop::resume(client);
101 self.msg_loop = MessageLoop::resume(client);
102 }
102 }
103 ChannelMessage::InputRequest(..)
103 ChannelMessage::InputRequest(..)
104 | ChannelMessage::LineRequest(..)
104 | ChannelMessage::LineRequest(..)
105 | ChannelMessage::SystemRequest(..) => {
105 | ChannelMessage::SystemRequest(..) => {
106 return Err(io::Error::new(
106 return Err(io::Error::new(
107 io::ErrorKind::InvalidData,
107 io::ErrorKind::InvalidData,
108 "unsupported request while attaching io",
108 "unsupported request while attaching io",
109 ));
109 ));
110 }
110 }
111 }
111 }
112 }
112 }
113 }
113 }
114 }
114 }
@@ -1,136 +1,136 b''
1 // Copyright 2018 Yuya Nishihara <yuya@tcha.org>
1 // Copyright 2018 Yuya Nishihara <yuya@tcha.org>
2 //
2 //
3 // This software may be used and distributed according to the terms of the
3 // This software may be used and distributed according to the terms of the
4 // GNU General Public License version 2 or any later version.
4 // GNU General Public License version 2 or any later version.
5
5
6 //! cHg extensions to command server client.
6 //! cHg extensions to command server client.
7
7
8 use bytes::{BufMut, Bytes, BytesMut};
8 use bytes::{BufMut, Bytes, BytesMut};
9 use std::ffi::OsStr;
9 use std::ffi::OsStr;
10 use std::io;
10 use std::io;
11 use std::mem;
11 use std::mem;
12 use std::os::unix::ffi::OsStrExt;
12 use std::os::unix::ffi::OsStrExt;
13 use std::os::unix::io::AsRawFd;
13 use std::os::unix::io::AsRawFd;
14 use std::path::Path;
14 use std::path::Path;
15 use tokio_hglib::protocol::{OneShotQuery, OneShotRequest};
15 use tokio_hglib::protocol::{OneShotQuery, OneShotRequest};
16 use tokio_hglib::{Client, Connection};
16 use tokio_hglib::{Client, Connection};
17
17
18 use super::attachio::AttachIo;
18 use crate::attachio::AttachIo;
19 use super::message::{self, Instruction};
19 use crate::message::{self, Instruction};
20 use super::runcommand::ChgRunCommand;
20 use crate::runcommand::ChgRunCommand;
21 use super::uihandler::SystemHandler;
21 use crate::uihandler::SystemHandler;
22
22
23 pub trait ChgClientExt<C>
23 pub trait ChgClientExt<C>
24 where
24 where
25 C: Connection + AsRawFd,
25 C: Connection + AsRawFd,
26 {
26 {
27 /// Attaches the client file descriptors to the server.
27 /// Attaches the client file descriptors to the server.
28 fn attach_io<I, O, E>(self, stdin: I, stdout: O, stderr: E) -> AttachIo<C, I, O, E>
28 fn attach_io<I, O, E>(self, stdin: I, stdout: O, stderr: E) -> AttachIo<C, I, O, E>
29 where
29 where
30 I: AsRawFd,
30 I: AsRawFd,
31 O: AsRawFd,
31 O: AsRawFd,
32 E: AsRawFd;
32 E: AsRawFd;
33
33
34 /// Changes the working directory of the server.
34 /// Changes the working directory of the server.
35 fn set_current_dir<P>(self, dir: P) -> OneShotRequest<C>
35 fn set_current_dir<P>(self, dir: P) -> OneShotRequest<C>
36 where
36 where
37 P: AsRef<Path>;
37 P: AsRef<Path>;
38
38
39 /// Updates the environment variables of the server.
39 /// Updates the environment variables of the server.
40 fn set_env_vars_os<I, P>(self, vars: I) -> OneShotRequest<C>
40 fn set_env_vars_os<I, P>(self, vars: I) -> OneShotRequest<C>
41 where
41 where
42 I: IntoIterator<Item = (P, P)>,
42 I: IntoIterator<Item = (P, P)>,
43 P: AsRef<OsStr>;
43 P: AsRef<OsStr>;
44
44
45 /// Changes the process title of the server.
45 /// Changes the process title of the server.
46 fn set_process_name<P>(self, name: P) -> OneShotRequest<C>
46 fn set_process_name<P>(self, name: P) -> OneShotRequest<C>
47 where
47 where
48 P: AsRef<OsStr>;
48 P: AsRef<OsStr>;
49
49
50 /// Changes the umask of the server process.
50 /// Changes the umask of the server process.
51 fn set_umask(self, mask: u32) -> OneShotRequest<C>;
51 fn set_umask(self, mask: u32) -> OneShotRequest<C>;
52
52
53 /// Runs the specified Mercurial command with cHg extension.
53 /// Runs the specified Mercurial command with cHg extension.
54 fn run_command_chg<I, P, H>(self, handler: H, args: I) -> ChgRunCommand<C, H>
54 fn run_command_chg<I, P, H>(self, handler: H, args: I) -> ChgRunCommand<C, H>
55 where
55 where
56 I: IntoIterator<Item = P>,
56 I: IntoIterator<Item = P>,
57 P: AsRef<OsStr>,
57 P: AsRef<OsStr>,
58 H: SystemHandler;
58 H: SystemHandler;
59
59
60 /// Validates if the server can run Mercurial commands with the expected
60 /// Validates if the server can run Mercurial commands with the expected
61 /// configuration.
61 /// configuration.
62 ///
62 ///
63 /// The `args` should contain early command arguments such as `--config`
63 /// The `args` should contain early command arguments such as `--config`
64 /// and `-R`.
64 /// and `-R`.
65 ///
65 ///
66 /// Client-side environment must be sent prior to this request, by
66 /// Client-side environment must be sent prior to this request, by
67 /// `set_current_dir()` and `set_env_vars_os()`.
67 /// `set_current_dir()` and `set_env_vars_os()`.
68 fn validate<I, P>(self, args: I) -> OneShotQuery<C, fn(Bytes) -> io::Result<Vec<Instruction>>>
68 fn validate<I, P>(self, args: I) -> OneShotQuery<C, fn(Bytes) -> io::Result<Vec<Instruction>>>
69 where
69 where
70 I: IntoIterator<Item = P>,
70 I: IntoIterator<Item = P>,
71 P: AsRef<OsStr>;
71 P: AsRef<OsStr>;
72 }
72 }
73
73
74 impl<C> ChgClientExt<C> for Client<C>
74 impl<C> ChgClientExt<C> for Client<C>
75 where
75 where
76 C: Connection + AsRawFd,
76 C: Connection + AsRawFd,
77 {
77 {
78 fn attach_io<I, O, E>(self, stdin: I, stdout: O, stderr: E) -> AttachIo<C, I, O, E>
78 fn attach_io<I, O, E>(self, stdin: I, stdout: O, stderr: E) -> AttachIo<C, I, O, E>
79 where
79 where
80 I: AsRawFd,
80 I: AsRawFd,
81 O: AsRawFd,
81 O: AsRawFd,
82 E: AsRawFd,
82 E: AsRawFd,
83 {
83 {
84 AttachIo::with_client(self, stdin, stdout, Some(stderr))
84 AttachIo::with_client(self, stdin, stdout, Some(stderr))
85 }
85 }
86
86
87 fn set_current_dir<P>(self, dir: P) -> OneShotRequest<C>
87 fn set_current_dir<P>(self, dir: P) -> OneShotRequest<C>
88 where
88 where
89 P: AsRef<Path>,
89 P: AsRef<Path>,
90 {
90 {
91 OneShotRequest::start_with_args(self, b"chdir", dir.as_ref().as_os_str().as_bytes())
91 OneShotRequest::start_with_args(self, b"chdir", dir.as_ref().as_os_str().as_bytes())
92 }
92 }
93
93
94 fn set_env_vars_os<I, P>(self, vars: I) -> OneShotRequest<C>
94 fn set_env_vars_os<I, P>(self, vars: I) -> OneShotRequest<C>
95 where
95 where
96 I: IntoIterator<Item = (P, P)>,
96 I: IntoIterator<Item = (P, P)>,
97 P: AsRef<OsStr>,
97 P: AsRef<OsStr>,
98 {
98 {
99 OneShotRequest::start_with_args(self, b"setenv", message::pack_env_vars_os(vars))
99 OneShotRequest::start_with_args(self, b"setenv", message::pack_env_vars_os(vars))
100 }
100 }
101
101
102 fn set_process_name<P>(self, name: P) -> OneShotRequest<C>
102 fn set_process_name<P>(self, name: P) -> OneShotRequest<C>
103 where
103 where
104 P: AsRef<OsStr>,
104 P: AsRef<OsStr>,
105 {
105 {
106 OneShotRequest::start_with_args(self, b"setprocname", name.as_ref().as_bytes())
106 OneShotRequest::start_with_args(self, b"setprocname", name.as_ref().as_bytes())
107 }
107 }
108
108
109 fn set_umask(self, mask: u32) -> OneShotRequest<C> {
109 fn set_umask(self, mask: u32) -> OneShotRequest<C> {
110 let mut args = BytesMut::with_capacity(mem::size_of_val(&mask));
110 let mut args = BytesMut::with_capacity(mem::size_of_val(&mask));
111 args.put_u32_be(mask);
111 args.put_u32_be(mask);
112 OneShotRequest::start_with_args(self, b"setumask2", args)
112 OneShotRequest::start_with_args(self, b"setumask2", args)
113 }
113 }
114
114
115 fn run_command_chg<I, P, H>(self, handler: H, args: I) -> ChgRunCommand<C, H>
115 fn run_command_chg<I, P, H>(self, handler: H, args: I) -> ChgRunCommand<C, H>
116 where
116 where
117 I: IntoIterator<Item = P>,
117 I: IntoIterator<Item = P>,
118 P: AsRef<OsStr>,
118 P: AsRef<OsStr>,
119 H: SystemHandler,
119 H: SystemHandler,
120 {
120 {
121 ChgRunCommand::with_client(self, handler, message::pack_args_os(args))
121 ChgRunCommand::with_client(self, handler, message::pack_args_os(args))
122 }
122 }
123
123
124 fn validate<I, P>(self, args: I) -> OneShotQuery<C, fn(Bytes) -> io::Result<Vec<Instruction>>>
124 fn validate<I, P>(self, args: I) -> OneShotQuery<C, fn(Bytes) -> io::Result<Vec<Instruction>>>
125 where
125 where
126 I: IntoIterator<Item = P>,
126 I: IntoIterator<Item = P>,
127 P: AsRef<OsStr>,
127 P: AsRef<OsStr>,
128 {
128 {
129 OneShotQuery::start_with_args(
129 OneShotQuery::start_with_args(
130 self,
130 self,
131 b"validate",
131 b"validate",
132 message::pack_args_os(args),
132 message::pack_args_os(args),
133 message::parse_instructions,
133 message::parse_instructions,
134 )
134 )
135 }
135 }
136 }
136 }
@@ -1,501 +1,501 b''
1 // Copyright 2011, 2018 Yuya Nishihara <yuya@tcha.org>
1 // Copyright 2011, 2018 Yuya Nishihara <yuya@tcha.org>
2 //
2 //
3 // This software may be used and distributed according to the terms of the
3 // This software may be used and distributed according to the terms of the
4 // GNU General Public License version 2 or any later version.
4 // GNU General Public License version 2 or any later version.
5
5
6 //! Utility for locating command-server process.
6 //! Utility for locating command-server process.
7
7
8 use futures::future::{self, Either, Loop};
8 use futures::future::{self, Either, Loop};
9 use log::debug;
9 use log::debug;
10 use std::env;
10 use std::env;
11 use std::ffi::{OsStr, OsString};
11 use std::ffi::{OsStr, OsString};
12 use std::fs::{self, DirBuilder};
12 use std::fs::{self, DirBuilder};
13 use std::io;
13 use std::io;
14 use std::os::unix::ffi::{OsStrExt, OsStringExt};
14 use std::os::unix::ffi::{OsStrExt, OsStringExt};
15 use std::os::unix::fs::{DirBuilderExt, MetadataExt};
15 use std::os::unix::fs::{DirBuilderExt, MetadataExt};
16 use std::path::{Path, PathBuf};
16 use std::path::{Path, PathBuf};
17 use std::process::{self, Command};
17 use std::process::{self, Command};
18 use std::time::Duration;
18 use std::time::Duration;
19 use tokio::prelude::*;
19 use tokio::prelude::*;
20 use tokio_hglib::UnixClient;
20 use tokio_hglib::UnixClient;
21 use tokio_process::{Child, CommandExt};
21 use tokio_process::{Child, CommandExt};
22 use tokio_timer;
22 use tokio_timer;
23
23
24 use super::clientext::ChgClientExt;
24 use crate::clientext::ChgClientExt;
25 use super::message::{Instruction, ServerSpec};
25 use crate::message::{Instruction, ServerSpec};
26 use super::procutil;
26 use crate::procutil;
27
27
28 const REQUIRED_SERVER_CAPABILITIES: &[&str] = &[
28 const REQUIRED_SERVER_CAPABILITIES: &[&str] = &[
29 "attachio",
29 "attachio",
30 "chdir",
30 "chdir",
31 "runcommand",
31 "runcommand",
32 "setenv",
32 "setenv",
33 "setumask2",
33 "setumask2",
34 "validate",
34 "validate",
35 ];
35 ];
36
36
37 /// Helper to connect to and spawn a server process.
37 /// Helper to connect to and spawn a server process.
38 #[derive(Clone, Debug)]
38 #[derive(Clone, Debug)]
39 pub struct Locator {
39 pub struct Locator {
40 hg_command: OsString,
40 hg_command: OsString,
41 hg_early_args: Vec<OsString>,
41 hg_early_args: Vec<OsString>,
42 current_dir: PathBuf,
42 current_dir: PathBuf,
43 env_vars: Vec<(OsString, OsString)>,
43 env_vars: Vec<(OsString, OsString)>,
44 process_id: u32,
44 process_id: u32,
45 base_sock_path: PathBuf,
45 base_sock_path: PathBuf,
46 redirect_sock_path: Option<PathBuf>,
46 redirect_sock_path: Option<PathBuf>,
47 timeout: Duration,
47 timeout: Duration,
48 }
48 }
49
49
50 impl Locator {
50 impl Locator {
51 /// Creates locator capturing the current process environment.
51 /// Creates locator capturing the current process environment.
52 ///
52 ///
53 /// If no `$CHGSOCKNAME` is specified, the socket directory will be
53 /// If no `$CHGSOCKNAME` is specified, the socket directory will be
54 /// created as necessary.
54 /// created as necessary.
55 pub fn prepare_from_env() -> io::Result<Locator> {
55 pub fn prepare_from_env() -> io::Result<Locator> {
56 Ok(Locator {
56 Ok(Locator {
57 hg_command: default_hg_command(),
57 hg_command: default_hg_command(),
58 hg_early_args: Vec::new(),
58 hg_early_args: Vec::new(),
59 current_dir: env::current_dir()?,
59 current_dir: env::current_dir()?,
60 env_vars: env::vars_os().collect(),
60 env_vars: env::vars_os().collect(),
61 process_id: process::id(),
61 process_id: process::id(),
62 base_sock_path: prepare_server_socket_path()?,
62 base_sock_path: prepare_server_socket_path()?,
63 redirect_sock_path: None,
63 redirect_sock_path: None,
64 timeout: default_timeout(),
64 timeout: default_timeout(),
65 })
65 })
66 }
66 }
67
67
68 /// Temporary socket path for this client process.
68 /// Temporary socket path for this client process.
69 fn temp_sock_path(&self) -> PathBuf {
69 fn temp_sock_path(&self) -> PathBuf {
70 let src = self.base_sock_path.as_os_str().as_bytes();
70 let src = self.base_sock_path.as_os_str().as_bytes();
71 let mut buf = Vec::with_capacity(src.len() + 6); // "{src}.{pid}".len()
71 let mut buf = Vec::with_capacity(src.len() + 6); // "{src}.{pid}".len()
72 buf.extend_from_slice(src);
72 buf.extend_from_slice(src);
73 buf.extend_from_slice(format!(".{}", self.process_id).as_bytes());
73 buf.extend_from_slice(format!(".{}", self.process_id).as_bytes());
74 OsString::from_vec(buf).into()
74 OsString::from_vec(buf).into()
75 }
75 }
76
76
77 /// Specifies the arguments to be passed to the server at start.
77 /// Specifies the arguments to be passed to the server at start.
78 pub fn set_early_args<I, P>(&mut self, args: I)
78 pub fn set_early_args<I, P>(&mut self, args: I)
79 where
79 where
80 I: IntoIterator<Item = P>,
80 I: IntoIterator<Item = P>,
81 P: AsRef<OsStr>,
81 P: AsRef<OsStr>,
82 {
82 {
83 self.hg_early_args = args.into_iter().map(|a| a.as_ref().to_owned()).collect();
83 self.hg_early_args = args.into_iter().map(|a| a.as_ref().to_owned()).collect();
84 }
84 }
85
85
86 /// Connects to the server.
86 /// Connects to the server.
87 ///
87 ///
88 /// The server process will be spawned if not running.
88 /// The server process will be spawned if not running.
89 pub fn connect(self) -> impl Future<Item = (Self, UnixClient), Error = io::Error> {
89 pub fn connect(self) -> impl Future<Item = (Self, UnixClient), Error = io::Error> {
90 future::loop_fn((self, 0), |(loc, cnt)| {
90 future::loop_fn((self, 0), |(loc, cnt)| {
91 if cnt < 10 {
91 if cnt < 10 {
92 let fut = loc
92 let fut = loc
93 .try_connect()
93 .try_connect()
94 .and_then(|(loc, client)| {
94 .and_then(|(loc, client)| {
95 client
95 client
96 .validate(&loc.hg_early_args)
96 .validate(&loc.hg_early_args)
97 .map(|(client, instructions)| (loc, client, instructions))
97 .map(|(client, instructions)| (loc, client, instructions))
98 })
98 })
99 .and_then(move |(loc, client, instructions)| {
99 .and_then(move |(loc, client, instructions)| {
100 loc.run_instructions(client, instructions, cnt)
100 loc.run_instructions(client, instructions, cnt)
101 });
101 });
102 Either::A(fut)
102 Either::A(fut)
103 } else {
103 } else {
104 let msg = format!(
104 let msg = format!(
105 concat!(
105 concat!(
106 "too many redirections.\n",
106 "too many redirections.\n",
107 "Please make sure {:?} is not a wrapper which ",
107 "Please make sure {:?} is not a wrapper which ",
108 "changes sensitive environment variables ",
108 "changes sensitive environment variables ",
109 "before executing hg. If you have to use a ",
109 "before executing hg. If you have to use a ",
110 "wrapper, wrap chg instead of hg.",
110 "wrapper, wrap chg instead of hg.",
111 ),
111 ),
112 loc.hg_command
112 loc.hg_command
113 );
113 );
114 Either::B(future::err(io::Error::new(io::ErrorKind::Other, msg)))
114 Either::B(future::err(io::Error::new(io::ErrorKind::Other, msg)))
115 }
115 }
116 })
116 })
117 }
117 }
118
118
119 /// Runs instructions received from the server.
119 /// Runs instructions received from the server.
120 fn run_instructions(
120 fn run_instructions(
121 mut self,
121 mut self,
122 client: UnixClient,
122 client: UnixClient,
123 instructions: Vec<Instruction>,
123 instructions: Vec<Instruction>,
124 cnt: usize,
124 cnt: usize,
125 ) -> io::Result<Loop<(Self, UnixClient), (Self, usize)>> {
125 ) -> io::Result<Loop<(Self, UnixClient), (Self, usize)>> {
126 let mut reconnect = false;
126 let mut reconnect = false;
127 for inst in instructions {
127 for inst in instructions {
128 debug!("instruction: {:?}", inst);
128 debug!("instruction: {:?}", inst);
129 match inst {
129 match inst {
130 Instruction::Exit(_) => {
130 Instruction::Exit(_) => {
131 // Just returns the current connection to run the
131 // Just returns the current connection to run the
132 // unparsable command and report the error
132 // unparsable command and report the error
133 return Ok(Loop::Break((self, client)));
133 return Ok(Loop::Break((self, client)));
134 }
134 }
135 Instruction::Reconnect => {
135 Instruction::Reconnect => {
136 reconnect = true;
136 reconnect = true;
137 }
137 }
138 Instruction::Redirect(path) => {
138 Instruction::Redirect(path) => {
139 if path.parent() != self.base_sock_path.parent() {
139 if path.parent() != self.base_sock_path.parent() {
140 let msg = format!(
140 let msg = format!(
141 "insecure redirect instruction from server: {}",
141 "insecure redirect instruction from server: {}",
142 path.display()
142 path.display()
143 );
143 );
144 return Err(io::Error::new(io::ErrorKind::InvalidData, msg));
144 return Err(io::Error::new(io::ErrorKind::InvalidData, msg));
145 }
145 }
146 self.redirect_sock_path = Some(path);
146 self.redirect_sock_path = Some(path);
147 reconnect = true;
147 reconnect = true;
148 }
148 }
149 Instruction::Unlink(path) => {
149 Instruction::Unlink(path) => {
150 if path.parent() != self.base_sock_path.parent() {
150 if path.parent() != self.base_sock_path.parent() {
151 let msg = format!(
151 let msg = format!(
152 "insecure unlink instruction from server: {}",
152 "insecure unlink instruction from server: {}",
153 path.display()
153 path.display()
154 );
154 );
155 return Err(io::Error::new(io::ErrorKind::InvalidData, msg));
155 return Err(io::Error::new(io::ErrorKind::InvalidData, msg));
156 }
156 }
157 fs::remove_file(path).unwrap_or(()); // may race
157 fs::remove_file(path).unwrap_or(()); // may race
158 }
158 }
159 }
159 }
160 }
160 }
161
161
162 if reconnect {
162 if reconnect {
163 Ok(Loop::Continue((self, cnt + 1)))
163 Ok(Loop::Continue((self, cnt + 1)))
164 } else {
164 } else {
165 Ok(Loop::Break((self, client)))
165 Ok(Loop::Break((self, client)))
166 }
166 }
167 }
167 }
168
168
169 /// Tries to connect to the existing server, or spawns new if not running.
169 /// Tries to connect to the existing server, or spawns new if not running.
170 fn try_connect(self) -> impl Future<Item = (Self, UnixClient), Error = io::Error> {
170 fn try_connect(self) -> impl Future<Item = (Self, UnixClient), Error = io::Error> {
171 let sock_path = self
171 let sock_path = self
172 .redirect_sock_path
172 .redirect_sock_path
173 .as_ref()
173 .as_ref()
174 .unwrap_or(&self.base_sock_path)
174 .unwrap_or(&self.base_sock_path)
175 .clone();
175 .clone();
176 debug!("try connect to {}", sock_path.display());
176 debug!("try connect to {}", sock_path.display());
177 UnixClient::connect(sock_path)
177 UnixClient::connect(sock_path)
178 .then(|res| {
178 .then(|res| {
179 match res {
179 match res {
180 Ok(client) => Either::A(future::ok((self, client))),
180 Ok(client) => Either::A(future::ok((self, client))),
181 Err(_) => {
181 Err(_) => {
182 // Prevent us from being re-connected to the outdated
182 // Prevent us from being re-connected to the outdated
183 // master server: We were told by the server to redirect
183 // master server: We were told by the server to redirect
184 // to redirect_sock_path, which didn't work. We do not
184 // to redirect_sock_path, which didn't work. We do not
185 // want to connect to the same master server again
185 // want to connect to the same master server again
186 // because it would probably tell us the same thing.
186 // because it would probably tell us the same thing.
187 if self.redirect_sock_path.is_some() {
187 if self.redirect_sock_path.is_some() {
188 fs::remove_file(&self.base_sock_path).unwrap_or(());
188 fs::remove_file(&self.base_sock_path).unwrap_or(());
189 // may race
189 // may race
190 }
190 }
191 Either::B(self.spawn_connect())
191 Either::B(self.spawn_connect())
192 }
192 }
193 }
193 }
194 })
194 })
195 .and_then(|(loc, client)| {
195 .and_then(|(loc, client)| {
196 check_server_capabilities(client.server_spec())?;
196 check_server_capabilities(client.server_spec())?;
197 Ok((loc, client))
197 Ok((loc, client))
198 })
198 })
199 .and_then(|(loc, client)| {
199 .and_then(|(loc, client)| {
200 // It's purely optional, and the server might not support this command.
200 // It's purely optional, and the server might not support this command.
201 if client.server_spec().capabilities.contains("setprocname") {
201 if client.server_spec().capabilities.contains("setprocname") {
202 let fut = client
202 let fut = client
203 .set_process_name(format!("chg[worker/{}]", loc.process_id))
203 .set_process_name(format!("chg[worker/{}]", loc.process_id))
204 .map(|client| (loc, client));
204 .map(|client| (loc, client));
205 Either::A(fut)
205 Either::A(fut)
206 } else {
206 } else {
207 Either::B(future::ok((loc, client)))
207 Either::B(future::ok((loc, client)))
208 }
208 }
209 })
209 })
210 .and_then(|(loc, client)| {
210 .and_then(|(loc, client)| {
211 client
211 client
212 .set_current_dir(&loc.current_dir)
212 .set_current_dir(&loc.current_dir)
213 .map(|client| (loc, client))
213 .map(|client| (loc, client))
214 })
214 })
215 .and_then(|(loc, client)| {
215 .and_then(|(loc, client)| {
216 client
216 client
217 .set_env_vars_os(loc.env_vars.iter().cloned())
217 .set_env_vars_os(loc.env_vars.iter().cloned())
218 .map(|client| (loc, client))
218 .map(|client| (loc, client))
219 })
219 })
220 }
220 }
221
221
222 /// Spawns new server process and connects to it.
222 /// Spawns new server process and connects to it.
223 ///
223 ///
224 /// The server will be spawned at the current working directory, then
224 /// The server will be spawned at the current working directory, then
225 /// chdir to "/", so that the server will load configs from the target
225 /// chdir to "/", so that the server will load configs from the target
226 /// repository.
226 /// repository.
227 fn spawn_connect(self) -> impl Future<Item = (Self, UnixClient), Error = io::Error> {
227 fn spawn_connect(self) -> impl Future<Item = (Self, UnixClient), Error = io::Error> {
228 let sock_path = self.temp_sock_path();
228 let sock_path = self.temp_sock_path();
229 debug!("start cmdserver at {}", sock_path.display());
229 debug!("start cmdserver at {}", sock_path.display());
230 Command::new(&self.hg_command)
230 Command::new(&self.hg_command)
231 .arg("serve")
231 .arg("serve")
232 .arg("--cmdserver")
232 .arg("--cmdserver")
233 .arg("chgunix")
233 .arg("chgunix")
234 .arg("--address")
234 .arg("--address")
235 .arg(&sock_path)
235 .arg(&sock_path)
236 .arg("--daemon-postexec")
236 .arg("--daemon-postexec")
237 .arg("chdir:/")
237 .arg("chdir:/")
238 .args(&self.hg_early_args)
238 .args(&self.hg_early_args)
239 .current_dir(&self.current_dir)
239 .current_dir(&self.current_dir)
240 .env_clear()
240 .env_clear()
241 .envs(self.env_vars.iter().cloned())
241 .envs(self.env_vars.iter().cloned())
242 .env("CHGINTERNALMARK", "")
242 .env("CHGINTERNALMARK", "")
243 .spawn_async()
243 .spawn_async()
244 .into_future()
244 .into_future()
245 .and_then(|server| self.connect_spawned(server, sock_path))
245 .and_then(|server| self.connect_spawned(server, sock_path))
246 .and_then(|(loc, client, sock_path)| {
246 .and_then(|(loc, client, sock_path)| {
247 debug!(
247 debug!(
248 "rename {} to {}",
248 "rename {} to {}",
249 sock_path.display(),
249 sock_path.display(),
250 loc.base_sock_path.display()
250 loc.base_sock_path.display()
251 );
251 );
252 fs::rename(&sock_path, &loc.base_sock_path)?;
252 fs::rename(&sock_path, &loc.base_sock_path)?;
253 Ok((loc, client))
253 Ok((loc, client))
254 })
254 })
255 }
255 }
256
256
257 /// Tries to connect to the just spawned server repeatedly until timeout
257 /// Tries to connect to the just spawned server repeatedly until timeout
258 /// exceeded.
258 /// exceeded.
259 fn connect_spawned(
259 fn connect_spawned(
260 self,
260 self,
261 server: Child,
261 server: Child,
262 sock_path: PathBuf,
262 sock_path: PathBuf,
263 ) -> impl Future<Item = (Self, UnixClient, PathBuf), Error = io::Error> {
263 ) -> impl Future<Item = (Self, UnixClient, PathBuf), Error = io::Error> {
264 debug!("try connect to {} repeatedly", sock_path.display());
264 debug!("try connect to {} repeatedly", sock_path.display());
265 let connect = future::loop_fn(sock_path, |sock_path| {
265 let connect = future::loop_fn(sock_path, |sock_path| {
266 UnixClient::connect(sock_path.clone()).then(|res| {
266 UnixClient::connect(sock_path.clone()).then(|res| {
267 match res {
267 match res {
268 Ok(client) => Either::A(future::ok(Loop::Break((client, sock_path)))),
268 Ok(client) => Either::A(future::ok(Loop::Break((client, sock_path)))),
269 Err(_) => {
269 Err(_) => {
270 // try again with slight delay
270 // try again with slight delay
271 let fut = tokio_timer::sleep(Duration::from_millis(10))
271 let fut = tokio_timer::sleep(Duration::from_millis(10))
272 .map(|()| Loop::Continue(sock_path))
272 .map(|()| Loop::Continue(sock_path))
273 .map_err(|err| io::Error::new(io::ErrorKind::Other, err));
273 .map_err(|err| io::Error::new(io::ErrorKind::Other, err));
274 Either::B(fut)
274 Either::B(fut)
275 }
275 }
276 }
276 }
277 })
277 })
278 });
278 });
279
279
280 // waits for either connection established or server failed to start
280 // waits for either connection established or server failed to start
281 connect
281 connect
282 .select2(server)
282 .select2(server)
283 .map_err(|res| res.split().0)
283 .map_err(|res| res.split().0)
284 .timeout(self.timeout)
284 .timeout(self.timeout)
285 .map_err(|err| {
285 .map_err(|err| {
286 err.into_inner().unwrap_or_else(|| {
286 err.into_inner().unwrap_or_else(|| {
287 io::Error::new(
287 io::Error::new(
288 io::ErrorKind::TimedOut,
288 io::ErrorKind::TimedOut,
289 "timed out while connecting to server",
289 "timed out while connecting to server",
290 )
290 )
291 })
291 })
292 })
292 })
293 .and_then(|res| {
293 .and_then(|res| {
294 match res {
294 match res {
295 Either::A(((client, sock_path), server)) => {
295 Either::A(((client, sock_path), server)) => {
296 server.forget(); // continue to run in background
296 server.forget(); // continue to run in background
297 Ok((self, client, sock_path))
297 Ok((self, client, sock_path))
298 }
298 }
299 Either::B((st, _)) => Err(io::Error::new(
299 Either::B((st, _)) => Err(io::Error::new(
300 io::ErrorKind::Other,
300 io::ErrorKind::Other,
301 format!("server exited too early: {}", st),
301 format!("server exited too early: {}", st),
302 )),
302 )),
303 }
303 }
304 })
304 })
305 }
305 }
306 }
306 }
307
307
308 /// Determines the server socket to connect to.
308 /// Determines the server socket to connect to.
309 ///
309 ///
310 /// If no `$CHGSOCKNAME` is specified, the socket directory will be created
310 /// If no `$CHGSOCKNAME` is specified, the socket directory will be created
311 /// as necessary.
311 /// as necessary.
312 fn prepare_server_socket_path() -> io::Result<PathBuf> {
312 fn prepare_server_socket_path() -> io::Result<PathBuf> {
313 if let Some(s) = env::var_os("CHGSOCKNAME") {
313 if let Some(s) = env::var_os("CHGSOCKNAME") {
314 Ok(PathBuf::from(s))
314 Ok(PathBuf::from(s))
315 } else {
315 } else {
316 let mut path = default_server_socket_dir();
316 let mut path = default_server_socket_dir();
317 create_secure_dir(&path)?;
317 create_secure_dir(&path)?;
318 path.push("server");
318 path.push("server");
319 Ok(path)
319 Ok(path)
320 }
320 }
321 }
321 }
322
322
323 /// Determines the default server socket path as follows.
323 /// Determines the default server socket path as follows.
324 ///
324 ///
325 /// 1. `$XDG_RUNTIME_DIR/chg`
325 /// 1. `$XDG_RUNTIME_DIR/chg`
326 /// 2. `$TMPDIR/chg$UID`
326 /// 2. `$TMPDIR/chg$UID`
327 /// 3. `/tmp/chg$UID`
327 /// 3. `/tmp/chg$UID`
328 pub fn default_server_socket_dir() -> PathBuf {
328 pub fn default_server_socket_dir() -> PathBuf {
329 // XDG_RUNTIME_DIR should be ignored if it has an insufficient permission.
329 // XDG_RUNTIME_DIR should be ignored if it has an insufficient permission.
330 // https://standards.freedesktop.org/basedir-spec/basedir-spec-latest.html
330 // https://standards.freedesktop.org/basedir-spec/basedir-spec-latest.html
331 if let Some(Ok(s)) = env::var_os("XDG_RUNTIME_DIR").map(check_secure_dir) {
331 if let Some(Ok(s)) = env::var_os("XDG_RUNTIME_DIR").map(check_secure_dir) {
332 let mut path = PathBuf::from(s);
332 let mut path = PathBuf::from(s);
333 path.push("chg");
333 path.push("chg");
334 path
334 path
335 } else {
335 } else {
336 let mut path = env::temp_dir();
336 let mut path = env::temp_dir();
337 path.push(format!("chg{}", procutil::get_effective_uid()));
337 path.push(format!("chg{}", procutil::get_effective_uid()));
338 path
338 path
339 }
339 }
340 }
340 }
341
341
342 /// Determines the default hg command.
342 /// Determines the default hg command.
343 pub fn default_hg_command() -> OsString {
343 pub fn default_hg_command() -> OsString {
344 // TODO: maybe allow embedding the path at compile time (or load from hgrc)
344 // TODO: maybe allow embedding the path at compile time (or load from hgrc)
345 env::var_os("CHGHG")
345 env::var_os("CHGHG")
346 .or(env::var_os("HG"))
346 .or(env::var_os("HG"))
347 .unwrap_or(OsStr::new("hg").to_owned())
347 .unwrap_or(OsStr::new("hg").to_owned())
348 }
348 }
349
349
350 fn default_timeout() -> Duration {
350 fn default_timeout() -> Duration {
351 let secs = env::var("CHGTIMEOUT")
351 let secs = env::var("CHGTIMEOUT")
352 .ok()
352 .ok()
353 .and_then(|s| s.parse().ok())
353 .and_then(|s| s.parse().ok())
354 .unwrap_or(60);
354 .unwrap_or(60);
355 Duration::from_secs(secs)
355 Duration::from_secs(secs)
356 }
356 }
357
357
358 /// Creates a directory which the other users cannot access to.
358 /// Creates a directory which the other users cannot access to.
359 ///
359 ///
360 /// If the directory already exists, tests its permission.
360 /// If the directory already exists, tests its permission.
361 fn create_secure_dir<P>(path: P) -> io::Result<()>
361 fn create_secure_dir<P>(path: P) -> io::Result<()>
362 where
362 where
363 P: AsRef<Path>,
363 P: AsRef<Path>,
364 {
364 {
365 DirBuilder::new()
365 DirBuilder::new()
366 .mode(0o700)
366 .mode(0o700)
367 .create(path.as_ref())
367 .create(path.as_ref())
368 .or_else(|err| {
368 .or_else(|err| {
369 if err.kind() == io::ErrorKind::AlreadyExists {
369 if err.kind() == io::ErrorKind::AlreadyExists {
370 check_secure_dir(path).map(|_| ())
370 check_secure_dir(path).map(|_| ())
371 } else {
371 } else {
372 Err(err)
372 Err(err)
373 }
373 }
374 })
374 })
375 }
375 }
376
376
377 fn check_secure_dir<P>(path: P) -> io::Result<P>
377 fn check_secure_dir<P>(path: P) -> io::Result<P>
378 where
378 where
379 P: AsRef<Path>,
379 P: AsRef<Path>,
380 {
380 {
381 let a = fs::symlink_metadata(path.as_ref())?;
381 let a = fs::symlink_metadata(path.as_ref())?;
382 if a.is_dir() && a.uid() == procutil::get_effective_uid() && (a.mode() & 0o777) == 0o700 {
382 if a.is_dir() && a.uid() == procutil::get_effective_uid() && (a.mode() & 0o777) == 0o700 {
383 Ok(path)
383 Ok(path)
384 } else {
384 } else {
385 Err(io::Error::new(io::ErrorKind::Other, "insecure directory"))
385 Err(io::Error::new(io::ErrorKind::Other, "insecure directory"))
386 }
386 }
387 }
387 }
388
388
389 fn check_server_capabilities(spec: &ServerSpec) -> io::Result<()> {
389 fn check_server_capabilities(spec: &ServerSpec) -> io::Result<()> {
390 let unsupported: Vec<_> = REQUIRED_SERVER_CAPABILITIES
390 let unsupported: Vec<_> = REQUIRED_SERVER_CAPABILITIES
391 .iter()
391 .iter()
392 .cloned()
392 .cloned()
393 .filter(|&s| !spec.capabilities.contains(s))
393 .filter(|&s| !spec.capabilities.contains(s))
394 .collect();
394 .collect();
395 if unsupported.is_empty() {
395 if unsupported.is_empty() {
396 Ok(())
396 Ok(())
397 } else {
397 } else {
398 let msg = format!(
398 let msg = format!(
399 "insufficient server capabilities: {}",
399 "insufficient server capabilities: {}",
400 unsupported.join(", ")
400 unsupported.join(", ")
401 );
401 );
402 Err(io::Error::new(io::ErrorKind::Other, msg))
402 Err(io::Error::new(io::ErrorKind::Other, msg))
403 }
403 }
404 }
404 }
405
405
406 /// Collects arguments which need to be passed to the server at start.
406 /// Collects arguments which need to be passed to the server at start.
407 pub fn collect_early_args<I, P>(args: I) -> Vec<OsString>
407 pub fn collect_early_args<I, P>(args: I) -> Vec<OsString>
408 where
408 where
409 I: IntoIterator<Item = P>,
409 I: IntoIterator<Item = P>,
410 P: AsRef<OsStr>,
410 P: AsRef<OsStr>,
411 {
411 {
412 let mut args_iter = args.into_iter();
412 let mut args_iter = args.into_iter();
413 let mut early_args = Vec::new();
413 let mut early_args = Vec::new();
414 while let Some(arg) = args_iter.next() {
414 while let Some(arg) = args_iter.next() {
415 let argb = arg.as_ref().as_bytes();
415 let argb = arg.as_ref().as_bytes();
416 if argb == b"--" {
416 if argb == b"--" {
417 break;
417 break;
418 } else if argb.starts_with(b"--") {
418 } else if argb.starts_with(b"--") {
419 let mut split = argb[2..].splitn(2, |&c| c == b'=');
419 let mut split = argb[2..].splitn(2, |&c| c == b'=');
420 match split.next().unwrap() {
420 match split.next().unwrap() {
421 b"traceback" => {
421 b"traceback" => {
422 if split.next().is_none() {
422 if split.next().is_none() {
423 early_args.push(arg.as_ref().to_owned());
423 early_args.push(arg.as_ref().to_owned());
424 }
424 }
425 }
425 }
426 b"config" | b"cwd" | b"repo" | b"repository" => {
426 b"config" | b"cwd" | b"repo" | b"repository" => {
427 if split.next().is_some() {
427 if split.next().is_some() {
428 // --<flag>=<val>
428 // --<flag>=<val>
429 early_args.push(arg.as_ref().to_owned());
429 early_args.push(arg.as_ref().to_owned());
430 } else {
430 } else {
431 // --<flag> <val>
431 // --<flag> <val>
432 args_iter.next().map(|val| {
432 args_iter.next().map(|val| {
433 early_args.push(arg.as_ref().to_owned());
433 early_args.push(arg.as_ref().to_owned());
434 early_args.push(val.as_ref().to_owned());
434 early_args.push(val.as_ref().to_owned());
435 });
435 });
436 }
436 }
437 }
437 }
438 _ => {}
438 _ => {}
439 }
439 }
440 } else if argb.starts_with(b"-R") {
440 } else if argb.starts_with(b"-R") {
441 if argb.len() > 2 {
441 if argb.len() > 2 {
442 // -R<val>
442 // -R<val>
443 early_args.push(arg.as_ref().to_owned());
443 early_args.push(arg.as_ref().to_owned());
444 } else {
444 } else {
445 // -R <val>
445 // -R <val>
446 args_iter.next().map(|val| {
446 args_iter.next().map(|val| {
447 early_args.push(arg.as_ref().to_owned());
447 early_args.push(arg.as_ref().to_owned());
448 early_args.push(val.as_ref().to_owned());
448 early_args.push(val.as_ref().to_owned());
449 });
449 });
450 }
450 }
451 }
451 }
452 }
452 }
453
453
454 early_args
454 early_args
455 }
455 }
456
456
457 #[cfg(test)]
457 #[cfg(test)]
458 mod tests {
458 mod tests {
459 use super::*;
459 use super::*;
460
460
461 #[test]
461 #[test]
462 fn collect_early_args_some() {
462 fn collect_early_args_some() {
463 assert!(collect_early_args(&[] as &[&OsStr]).is_empty());
463 assert!(collect_early_args(&[] as &[&OsStr]).is_empty());
464 assert!(collect_early_args(&["log"]).is_empty());
464 assert!(collect_early_args(&["log"]).is_empty());
465 assert_eq!(
465 assert_eq!(
466 collect_early_args(&["log", "-Ra", "foo"]),
466 collect_early_args(&["log", "-Ra", "foo"]),
467 os_string_vec_from(&[b"-Ra"])
467 os_string_vec_from(&[b"-Ra"])
468 );
468 );
469 assert_eq!(
469 assert_eq!(
470 collect_early_args(&["log", "-R", "repo", "", "--traceback", "a"]),
470 collect_early_args(&["log", "-R", "repo", "", "--traceback", "a"]),
471 os_string_vec_from(&[b"-R", b"repo", b"--traceback"])
471 os_string_vec_from(&[b"-R", b"repo", b"--traceback"])
472 );
472 );
473 assert_eq!(
473 assert_eq!(
474 collect_early_args(&["log", "--config", "diff.git=1", "-q"]),
474 collect_early_args(&["log", "--config", "diff.git=1", "-q"]),
475 os_string_vec_from(&[b"--config", b"diff.git=1"])
475 os_string_vec_from(&[b"--config", b"diff.git=1"])
476 );
476 );
477 assert_eq!(
477 assert_eq!(
478 collect_early_args(&["--cwd=..", "--repository", "r", "log"]),
478 collect_early_args(&["--cwd=..", "--repository", "r", "log"]),
479 os_string_vec_from(&[b"--cwd=..", b"--repository", b"r"])
479 os_string_vec_from(&[b"--cwd=..", b"--repository", b"r"])
480 );
480 );
481 assert_eq!(
481 assert_eq!(
482 collect_early_args(&["log", "--repo=r", "--repos", "a"]),
482 collect_early_args(&["log", "--repo=r", "--repos", "a"]),
483 os_string_vec_from(&[b"--repo=r"])
483 os_string_vec_from(&[b"--repo=r"])
484 );
484 );
485 }
485 }
486
486
487 #[test]
487 #[test]
488 fn collect_early_args_orphaned() {
488 fn collect_early_args_orphaned() {
489 assert!(collect_early_args(&["log", "-R"]).is_empty());
489 assert!(collect_early_args(&["log", "-R"]).is_empty());
490 assert!(collect_early_args(&["log", "--config"]).is_empty());
490 assert!(collect_early_args(&["log", "--config"]).is_empty());
491 }
491 }
492
492
493 #[test]
493 #[test]
494 fn collect_early_args_unwanted_value() {
494 fn collect_early_args_unwanted_value() {
495 assert!(collect_early_args(&["log", "--traceback="]).is_empty());
495 assert!(collect_early_args(&["log", "--traceback="]).is_empty());
496 }
496 }
497
497
498 fn os_string_vec_from(v: &[&[u8]]) -> Vec<OsString> {
498 fn os_string_vec_from(v: &[&[u8]]) -> Vec<OsString> {
499 v.iter().map(|s| OsStr::from_bytes(s).to_owned()).collect()
499 v.iter().map(|s| OsStr::from_bytes(s).to_owned()).collect()
500 }
500 }
501 }
501 }
@@ -1,171 +1,171 b''
1 // Copyright 2018 Yuya Nishihara <yuya@tcha.org>
1 // Copyright 2018 Yuya Nishihara <yuya@tcha.org>
2 //
2 //
3 // This software may be used and distributed according to the terms of the
3 // This software may be used and distributed according to the terms of the
4 // GNU General Public License version 2 or any later version.
4 // GNU General Public License version 2 or any later version.
5
5
6 //! Functions to run Mercurial command in cHg-aware command server.
6 //! Functions to run Mercurial command in cHg-aware command server.
7
7
8 use bytes::Bytes;
8 use bytes::Bytes;
9 use futures::future::IntoFuture;
9 use futures::future::IntoFuture;
10 use futures::{Async, Future, Poll};
10 use futures::{Async, Future, Poll};
11 use std::io;
11 use std::io;
12 use std::mem;
12 use std::mem;
13 use std::os::unix::io::AsRawFd;
13 use std::os::unix::io::AsRawFd;
14 use tokio_hglib::codec::ChannelMessage;
14 use tokio_hglib::codec::ChannelMessage;
15 use tokio_hglib::protocol::MessageLoop;
15 use tokio_hglib::protocol::MessageLoop;
16 use tokio_hglib::{Client, Connection};
16 use tokio_hglib::{Client, Connection};
17
17
18 use super::attachio::AttachIo;
18 use crate::attachio::AttachIo;
19 use super::message::{self, CommandType};
19 use crate::message::{self, CommandType};
20 use super::uihandler::SystemHandler;
20 use crate::uihandler::SystemHandler;
21
21
22 enum AsyncS<R, S> {
22 enum AsyncS<R, S> {
23 Ready(R),
23 Ready(R),
24 NotReady(S),
24 NotReady(S),
25 PollAgain(S),
25 PollAgain(S),
26 }
26 }
27
27
28 enum CommandState<C, H>
28 enum CommandState<C, H>
29 where
29 where
30 C: Connection,
30 C: Connection,
31 H: SystemHandler,
31 H: SystemHandler,
32 {
32 {
33 Running(MessageLoop<C>, H),
33 Running(MessageLoop<C>, H),
34 SpawningPager(Client<C>, <H::SpawnPagerResult as IntoFuture>::Future),
34 SpawningPager(Client<C>, <H::SpawnPagerResult as IntoFuture>::Future),
35 AttachingPager(AttachIo<C, io::Stdin, H::PagerStdin, H::PagerStdin>, H),
35 AttachingPager(AttachIo<C, io::Stdin, H::PagerStdin, H::PagerStdin>, H),
36 WaitingSystem(Client<C>, <H::RunSystemResult as IntoFuture>::Future),
36 WaitingSystem(Client<C>, <H::RunSystemResult as IntoFuture>::Future),
37 Finished,
37 Finished,
38 }
38 }
39
39
40 type CommandPoll<C, H> = io::Result<AsyncS<(Client<C>, H, i32), CommandState<C, H>>>;
40 type CommandPoll<C, H> = io::Result<AsyncS<(Client<C>, H, i32), CommandState<C, H>>>;
41
41
42 /// Future resolves to `(exit_code, client)`.
42 /// Future resolves to `(exit_code, client)`.
43 #[must_use = "futures do nothing unless polled"]
43 #[must_use = "futures do nothing unless polled"]
44 pub struct ChgRunCommand<C, H>
44 pub struct ChgRunCommand<C, H>
45 where
45 where
46 C: Connection,
46 C: Connection,
47 H: SystemHandler,
47 H: SystemHandler,
48 {
48 {
49 state: CommandState<C, H>,
49 state: CommandState<C, H>,
50 }
50 }
51
51
52 impl<C, H> ChgRunCommand<C, H>
52 impl<C, H> ChgRunCommand<C, H>
53 where
53 where
54 C: Connection + AsRawFd,
54 C: Connection + AsRawFd,
55 H: SystemHandler,
55 H: SystemHandler,
56 {
56 {
57 pub fn with_client(client: Client<C>, handler: H, packed_args: Bytes) -> ChgRunCommand<C, H> {
57 pub fn with_client(client: Client<C>, handler: H, packed_args: Bytes) -> ChgRunCommand<C, H> {
58 let msg_loop = MessageLoop::start_with_args(client, b"runcommand", packed_args);
58 let msg_loop = MessageLoop::start_with_args(client, b"runcommand", packed_args);
59 ChgRunCommand {
59 ChgRunCommand {
60 state: CommandState::Running(msg_loop, handler),
60 state: CommandState::Running(msg_loop, handler),
61 }
61 }
62 }
62 }
63 }
63 }
64
64
65 impl<C, H> Future for ChgRunCommand<C, H>
65 impl<C, H> Future for ChgRunCommand<C, H>
66 where
66 where
67 C: Connection + AsRawFd,
67 C: Connection + AsRawFd,
68 H: SystemHandler,
68 H: SystemHandler,
69 {
69 {
70 type Item = (Client<C>, H, i32);
70 type Item = (Client<C>, H, i32);
71 type Error = io::Error;
71 type Error = io::Error;
72
72
73 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
73 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
74 loop {
74 loop {
75 let state = mem::replace(&mut self.state, CommandState::Finished);
75 let state = mem::replace(&mut self.state, CommandState::Finished);
76 match state.poll()? {
76 match state.poll()? {
77 AsyncS::Ready((client, handler, code)) => {
77 AsyncS::Ready((client, handler, code)) => {
78 return Ok(Async::Ready((client, handler, code)));
78 return Ok(Async::Ready((client, handler, code)));
79 }
79 }
80 AsyncS::NotReady(newstate) => {
80 AsyncS::NotReady(newstate) => {
81 self.state = newstate;
81 self.state = newstate;
82 return Ok(Async::NotReady);
82 return Ok(Async::NotReady);
83 }
83 }
84 AsyncS::PollAgain(newstate) => {
84 AsyncS::PollAgain(newstate) => {
85 self.state = newstate;
85 self.state = newstate;
86 }
86 }
87 }
87 }
88 }
88 }
89 }
89 }
90 }
90 }
91
91
92 impl<C, H> CommandState<C, H>
92 impl<C, H> CommandState<C, H>
93 where
93 where
94 C: Connection + AsRawFd,
94 C: Connection + AsRawFd,
95 H: SystemHandler,
95 H: SystemHandler,
96 {
96 {
97 fn poll(self) -> CommandPoll<C, H> {
97 fn poll(self) -> CommandPoll<C, H> {
98 match self {
98 match self {
99 CommandState::Running(mut msg_loop, handler) => {
99 CommandState::Running(mut msg_loop, handler) => {
100 if let Async::Ready((client, msg)) = msg_loop.poll()? {
100 if let Async::Ready((client, msg)) = msg_loop.poll()? {
101 process_message(client, handler, msg)
101 process_message(client, handler, msg)
102 } else {
102 } else {
103 Ok(AsyncS::NotReady(CommandState::Running(msg_loop, handler)))
103 Ok(AsyncS::NotReady(CommandState::Running(msg_loop, handler)))
104 }
104 }
105 }
105 }
106 CommandState::SpawningPager(client, mut fut) => {
106 CommandState::SpawningPager(client, mut fut) => {
107 if let Async::Ready((handler, pin)) = fut.poll()? {
107 if let Async::Ready((handler, pin)) = fut.poll()? {
108 let fut = AttachIo::with_client(client, io::stdin(), pin, None);
108 let fut = AttachIo::with_client(client, io::stdin(), pin, None);
109 Ok(AsyncS::PollAgain(CommandState::AttachingPager(
109 Ok(AsyncS::PollAgain(CommandState::AttachingPager(
110 fut, handler,
110 fut, handler,
111 )))
111 )))
112 } else {
112 } else {
113 Ok(AsyncS::NotReady(CommandState::SpawningPager(client, fut)))
113 Ok(AsyncS::NotReady(CommandState::SpawningPager(client, fut)))
114 }
114 }
115 }
115 }
116 CommandState::AttachingPager(mut fut, handler) => {
116 CommandState::AttachingPager(mut fut, handler) => {
117 if let Async::Ready(client) = fut.poll()? {
117 if let Async::Ready(client) = fut.poll()? {
118 let msg_loop = MessageLoop::start(client, b""); // terminator
118 let msg_loop = MessageLoop::start(client, b""); // terminator
119 Ok(AsyncS::PollAgain(CommandState::Running(msg_loop, handler)))
119 Ok(AsyncS::PollAgain(CommandState::Running(msg_loop, handler)))
120 } else {
120 } else {
121 Ok(AsyncS::NotReady(CommandState::AttachingPager(fut, handler)))
121 Ok(AsyncS::NotReady(CommandState::AttachingPager(fut, handler)))
122 }
122 }
123 }
123 }
124 CommandState::WaitingSystem(client, mut fut) => {
124 CommandState::WaitingSystem(client, mut fut) => {
125 if let Async::Ready((handler, code)) = fut.poll()? {
125 if let Async::Ready((handler, code)) = fut.poll()? {
126 let data = message::pack_result_code(code);
126 let data = message::pack_result_code(code);
127 let msg_loop = MessageLoop::resume_with_data(client, data);
127 let msg_loop = MessageLoop::resume_with_data(client, data);
128 Ok(AsyncS::PollAgain(CommandState::Running(msg_loop, handler)))
128 Ok(AsyncS::PollAgain(CommandState::Running(msg_loop, handler)))
129 } else {
129 } else {
130 Ok(AsyncS::NotReady(CommandState::WaitingSystem(client, fut)))
130 Ok(AsyncS::NotReady(CommandState::WaitingSystem(client, fut)))
131 }
131 }
132 }
132 }
133 CommandState::Finished => panic!("poll ChgRunCommand after it's done"),
133 CommandState::Finished => panic!("poll ChgRunCommand after it's done"),
134 }
134 }
135 }
135 }
136 }
136 }
137
137
138 fn process_message<C, H>(client: Client<C>, handler: H, msg: ChannelMessage) -> CommandPoll<C, H>
138 fn process_message<C, H>(client: Client<C>, handler: H, msg: ChannelMessage) -> CommandPoll<C, H>
139 where
139 where
140 C: Connection,
140 C: Connection,
141 H: SystemHandler,
141 H: SystemHandler,
142 {
142 {
143 match msg {
143 match msg {
144 ChannelMessage::Data(b'r', data) => {
144 ChannelMessage::Data(b'r', data) => {
145 let code = message::parse_result_code(data)?;
145 let code = message::parse_result_code(data)?;
146 Ok(AsyncS::Ready((client, handler, code)))
146 Ok(AsyncS::Ready((client, handler, code)))
147 }
147 }
148 ChannelMessage::Data(..) => {
148 ChannelMessage::Data(..) => {
149 // just ignores data sent to optional channel
149 // just ignores data sent to optional channel
150 let msg_loop = MessageLoop::resume(client);
150 let msg_loop = MessageLoop::resume(client);
151 Ok(AsyncS::PollAgain(CommandState::Running(msg_loop, handler)))
151 Ok(AsyncS::PollAgain(CommandState::Running(msg_loop, handler)))
152 }
152 }
153 ChannelMessage::InputRequest(..) | ChannelMessage::LineRequest(..) => Err(io::Error::new(
153 ChannelMessage::InputRequest(..) | ChannelMessage::LineRequest(..) => Err(io::Error::new(
154 io::ErrorKind::InvalidData,
154 io::ErrorKind::InvalidData,
155 "unsupported request",
155 "unsupported request",
156 )),
156 )),
157 ChannelMessage::SystemRequest(data) => {
157 ChannelMessage::SystemRequest(data) => {
158 let (cmd_type, cmd_spec) = message::parse_command_spec(data)?;
158 let (cmd_type, cmd_spec) = message::parse_command_spec(data)?;
159 match cmd_type {
159 match cmd_type {
160 CommandType::Pager => {
160 CommandType::Pager => {
161 let fut = handler.spawn_pager(cmd_spec).into_future();
161 let fut = handler.spawn_pager(cmd_spec).into_future();
162 Ok(AsyncS::PollAgain(CommandState::SpawningPager(client, fut)))
162 Ok(AsyncS::PollAgain(CommandState::SpawningPager(client, fut)))
163 }
163 }
164 CommandType::System => {
164 CommandType::System => {
165 let fut = handler.run_system(cmd_spec).into_future();
165 let fut = handler.run_system(cmd_spec).into_future();
166 Ok(AsyncS::PollAgain(CommandState::WaitingSystem(client, fut)))
166 Ok(AsyncS::PollAgain(CommandState::WaitingSystem(client, fut)))
167 }
167 }
168 }
168 }
169 }
169 }
170 }
170 }
171 }
171 }
@@ -1,88 +1,88 b''
1 // Copyright 2018 Yuya Nishihara <yuya@tcha.org>
1 // Copyright 2018 Yuya Nishihara <yuya@tcha.org>
2 //
2 //
3 // This software may be used and distributed according to the terms of the
3 // This software may be used and distributed according to the terms of the
4 // GNU General Public License version 2 or any later version.
4 // GNU General Public License version 2 or any later version.
5
5
6 use futures::future::IntoFuture;
6 use futures::future::IntoFuture;
7 use futures::Future;
7 use futures::Future;
8 use std::io;
8 use std::io;
9 use std::os::unix::io::AsRawFd;
9 use std::os::unix::io::AsRawFd;
10 use std::os::unix::process::ExitStatusExt;
10 use std::os::unix::process::ExitStatusExt;
11 use std::process::{Command, Stdio};
11 use std::process::{Command, Stdio};
12 use tokio;
12 use tokio;
13 use tokio_process::{ChildStdin, CommandExt};
13 use tokio_process::{ChildStdin, CommandExt};
14
14
15 use super::message::CommandSpec;
15 use crate::message::CommandSpec;
16 use super::procutil;
16 use crate::procutil;
17
17
18 /// Callback to process shell command requests received from server.
18 /// Callback to process shell command requests received from server.
19 pub trait SystemHandler: Sized {
19 pub trait SystemHandler: Sized {
20 type PagerStdin: AsRawFd;
20 type PagerStdin: AsRawFd;
21 type SpawnPagerResult: IntoFuture<Item = (Self, Self::PagerStdin), Error = io::Error>;
21 type SpawnPagerResult: IntoFuture<Item = (Self, Self::PagerStdin), Error = io::Error>;
22 type RunSystemResult: IntoFuture<Item = (Self, i32), Error = io::Error>;
22 type RunSystemResult: IntoFuture<Item = (Self, i32), Error = io::Error>;
23
23
24 /// Handles pager command request.
24 /// Handles pager command request.
25 ///
25 ///
26 /// Returns the pipe to be attached to the server if the pager is spawned.
26 /// Returns the pipe to be attached to the server if the pager is spawned.
27 fn spawn_pager(self, spec: CommandSpec) -> Self::SpawnPagerResult;
27 fn spawn_pager(self, spec: CommandSpec) -> Self::SpawnPagerResult;
28
28
29 /// Handles system command request.
29 /// Handles system command request.
30 ///
30 ///
31 /// Returns command exit code (positive) or signal number (negative).
31 /// Returns command exit code (positive) or signal number (negative).
32 fn run_system(self, spec: CommandSpec) -> Self::RunSystemResult;
32 fn run_system(self, spec: CommandSpec) -> Self::RunSystemResult;
33 }
33 }
34
34
35 /// Default cHg implementation to process requests received from server.
35 /// Default cHg implementation to process requests received from server.
36 pub struct ChgUiHandler {}
36 pub struct ChgUiHandler {}
37
37
38 impl ChgUiHandler {
38 impl ChgUiHandler {
39 pub fn new() -> ChgUiHandler {
39 pub fn new() -> ChgUiHandler {
40 ChgUiHandler {}
40 ChgUiHandler {}
41 }
41 }
42 }
42 }
43
43
44 impl SystemHandler for ChgUiHandler {
44 impl SystemHandler for ChgUiHandler {
45 type PagerStdin = ChildStdin;
45 type PagerStdin = ChildStdin;
46 type SpawnPagerResult = io::Result<(Self, Self::PagerStdin)>;
46 type SpawnPagerResult = io::Result<(Self, Self::PagerStdin)>;
47 type RunSystemResult = Box<dyn Future<Item = (Self, i32), Error = io::Error> + Send>;
47 type RunSystemResult = Box<dyn Future<Item = (Self, i32), Error = io::Error> + Send>;
48
48
49 fn spawn_pager(self, spec: CommandSpec) -> Self::SpawnPagerResult {
49 fn spawn_pager(self, spec: CommandSpec) -> Self::SpawnPagerResult {
50 let mut pager = new_shell_command(&spec)
50 let mut pager = new_shell_command(&spec)
51 .stdin(Stdio::piped())
51 .stdin(Stdio::piped())
52 .spawn_async()?;
52 .spawn_async()?;
53 let pin = pager.stdin().take().unwrap();
53 let pin = pager.stdin().take().unwrap();
54 procutil::set_blocking_fd(pin.as_raw_fd())?;
54 procutil::set_blocking_fd(pin.as_raw_fd())?;
55 // TODO: if pager exits, notify the server with SIGPIPE immediately.
55 // TODO: if pager exits, notify the server with SIGPIPE immediately.
56 // otherwise the server won't get SIGPIPE if it does not write
56 // otherwise the server won't get SIGPIPE if it does not write
57 // anything. (issue5278)
57 // anything. (issue5278)
58 // kill(peerpid, SIGPIPE);
58 // kill(peerpid, SIGPIPE);
59 tokio::spawn(pager.map(|_| ()).map_err(|_| ())); // just ignore errors
59 tokio::spawn(pager.map(|_| ()).map_err(|_| ())); // just ignore errors
60 Ok((self, pin))
60 Ok((self, pin))
61 }
61 }
62
62
63 fn run_system(self, spec: CommandSpec) -> Self::RunSystemResult {
63 fn run_system(self, spec: CommandSpec) -> Self::RunSystemResult {
64 let fut = new_shell_command(&spec)
64 let fut = new_shell_command(&spec)
65 .spawn_async()
65 .spawn_async()
66 .into_future()
66 .into_future()
67 .flatten()
67 .flatten()
68 .map(|status| {
68 .map(|status| {
69 let code = status
69 let code = status
70 .code()
70 .code()
71 .or_else(|| status.signal().map(|n| -n))
71 .or_else(|| status.signal().map(|n| -n))
72 .expect("either exit code or signal should be set");
72 .expect("either exit code or signal should be set");
73 (self, code)
73 (self, code)
74 });
74 });
75 Box::new(fut)
75 Box::new(fut)
76 }
76 }
77 }
77 }
78
78
79 fn new_shell_command(spec: &CommandSpec) -> Command {
79 fn new_shell_command(spec: &CommandSpec) -> Command {
80 let mut builder = Command::new("/bin/sh");
80 let mut builder = Command::new("/bin/sh");
81 builder
81 builder
82 .arg("-c")
82 .arg("-c")
83 .arg(&spec.command)
83 .arg(&spec.command)
84 .current_dir(&spec.current_dir)
84 .current_dir(&spec.current_dir)
85 .env_clear()
85 .env_clear()
86 .envs(spec.envs.iter().cloned());
86 .envs(spec.envs.iter().cloned());
87 builder
87 builder
88 }
88 }
General Comments 0
You need to be logged in to leave comments. Login now