##// END OF EJS Templates
rust-chg: reimplement run_command operation as async function...
Yuya Nishihara -
r45235:94cace4b default
parent child Browse files
Show More
@@ -8,7 +8,7 b' mod attachio;'
8 //pub mod locator;
8 //pub mod locator;
9 pub mod message;
9 pub mod message;
10 pub mod procutil;
10 pub mod procutil;
11 //mod runcommand;
11 mod runcommand;
12 mod uihandler;
12 mod uihandler;
13
13
14 //pub use clientext::ChgClientExt;
14 //pub use clientext::ChgClientExt;
@@ -6,164 +6,56 b''
6 //! Functions to run Mercurial command in cHg-aware command server.
6 //! Functions to run Mercurial command in cHg-aware command server.
7
7
8 use bytes::Bytes;
8 use bytes::Bytes;
9 use futures::future::IntoFuture;
10 use futures::{Async, Future, Poll};
11 use std::io;
9 use std::io;
12 use std::mem;
13 use std::os::unix::io::AsRawFd;
10 use std::os::unix::io::AsRawFd;
14 use tokio_hglib::codec::ChannelMessage;
11 use tokio_hglib::codec::ChannelMessage;
15 use tokio_hglib::protocol::MessageLoop;
12 use tokio_hglib::{Connection, Protocol};
16 use tokio_hglib::{Client, Connection};
17
13
18 use crate::attachio::AttachIo;
14 use crate::attachio;
19 use crate::message::{self, CommandType};
15 use crate::message::{self, CommandType};
20 use crate::uihandler::SystemHandler;
16 use crate::uihandler::SystemHandler;
21
17
22 enum AsyncS<R, S> {
18 /// Runs the given Mercurial command in cHg-aware command server, and
23 Ready(R),
19 /// fetches the result code.
24 NotReady(S),
20 ///
25 PollAgain(S),
21 /// This is a subset of tokio-hglib's `run_command()` with the additional
26 }
22 /// SystemRequest support.
27
23 pub async fn run_command(
28 enum CommandState<C, H>
24 proto: &mut Protocol<impl Connection + AsRawFd>,
29 where
25 handler: &mut impl SystemHandler,
30 C: Connection,
26 packed_args: impl Into<Bytes>,
31 H: SystemHandler,
27 ) -> io::Result<i32> {
32 {
28 proto
33 Running(MessageLoop<C>, H),
29 .send_command_with_args("runcommand", packed_args)
34 SpawningPager(Client<C>, <H::SpawnPagerResult as IntoFuture>::Future),
30 .await?;
35 AttachingPager(AttachIo<C, io::Stdin, H::PagerStdin, H::PagerStdin>, H),
31 loop {
36 WaitingSystem(Client<C>, <H::RunSystemResult as IntoFuture>::Future),
32 match proto.fetch_response().await? {
37 Finished,
38 }
39
40 type CommandPoll<C, H> = io::Result<AsyncS<(Client<C>, H, i32), CommandState<C, H>>>;
41
42 /// Future resolves to `(exit_code, client)`.
43 #[must_use = "futures do nothing unless polled"]
44 pub struct ChgRunCommand<C, H>
45 where
46 C: Connection,
47 H: SystemHandler,
48 {
49 state: CommandState<C, H>,
50 }
51
52 impl<C, H> ChgRunCommand<C, H>
53 where
54 C: Connection + AsRawFd,
55 H: SystemHandler,
56 {
57 pub fn with_client(client: Client<C>, handler: H, packed_args: Bytes) -> ChgRunCommand<C, H> {
58 let msg_loop = MessageLoop::start_with_args(client, b"runcommand", packed_args);
59 ChgRunCommand {
60 state: CommandState::Running(msg_loop, handler),
61 }
62 }
63 }
64
65 impl<C, H> Future for ChgRunCommand<C, H>
66 where
67 C: Connection + AsRawFd,
68 H: SystemHandler,
69 {
70 type Item = (Client<C>, H, i32);
71 type Error = io::Error;
72
73 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
74 loop {
75 let state = mem::replace(&mut self.state, CommandState::Finished);
76 match state.poll()? {
77 AsyncS::Ready((client, handler, code)) => {
78 return Ok(Async::Ready((client, handler, code)));
79 }
80 AsyncS::NotReady(newstate) => {
81 self.state = newstate;
82 return Ok(Async::NotReady);
83 }
84 AsyncS::PollAgain(newstate) => {
85 self.state = newstate;
86 }
87 }
88 }
89 }
90 }
91
92 impl<C, H> CommandState<C, H>
93 where
94 C: Connection + AsRawFd,
95 H: SystemHandler,
96 {
97 fn poll(self) -> CommandPoll<C, H> {
98 match self {
99 CommandState::Running(mut msg_loop, handler) => {
100 if let Async::Ready((client, msg)) = msg_loop.poll()? {
101 process_message(client, handler, msg)
102 } else {
103 Ok(AsyncS::NotReady(CommandState::Running(msg_loop, handler)))
104 }
105 }
106 CommandState::SpawningPager(client, mut fut) => {
107 if let Async::Ready((handler, pin)) = fut.poll()? {
108 let fut = AttachIo::with_client(client, io::stdin(), pin, None);
109 Ok(AsyncS::PollAgain(CommandState::AttachingPager(
110 fut, handler,
111 )))
112 } else {
113 Ok(AsyncS::NotReady(CommandState::SpawningPager(client, fut)))
114 }
115 }
116 CommandState::AttachingPager(mut fut, handler) => {
117 if let Async::Ready(client) = fut.poll()? {
118 let msg_loop = MessageLoop::start(client, b""); // terminator
119 Ok(AsyncS::PollAgain(CommandState::Running(msg_loop, handler)))
120 } else {
121 Ok(AsyncS::NotReady(CommandState::AttachingPager(fut, handler)))
122 }
123 }
124 CommandState::WaitingSystem(client, mut fut) => {
125 if let Async::Ready((handler, code)) = fut.poll()? {
126 let data = message::pack_result_code(code);
127 let msg_loop = MessageLoop::resume_with_data(client, data);
128 Ok(AsyncS::PollAgain(CommandState::Running(msg_loop, handler)))
129 } else {
130 Ok(AsyncS::NotReady(CommandState::WaitingSystem(client, fut)))
131 }
132 }
133 CommandState::Finished => panic!("poll ChgRunCommand after it's done"),
134 }
135 }
136 }
137
138 fn process_message<C, H>(client: Client<C>, handler: H, msg: ChannelMessage) -> CommandPoll<C, H>
139 where
140 C: Connection,
141 H: SystemHandler,
142 {
143 {
144 match msg {
145 ChannelMessage::Data(b'r', data) => {
33 ChannelMessage::Data(b'r', data) => {
146 let code = message::parse_result_code(data)?;
34 return message::parse_result_code(data);
147 Ok(AsyncS::Ready((client, handler, code)))
148 }
35 }
149 ChannelMessage::Data(..) => {
36 ChannelMessage::Data(..) => {
150 // just ignores data sent to optional channel
37 // just ignores data sent to optional channel
151 let msg_loop = MessageLoop::resume(client);
152 Ok(AsyncS::PollAgain(CommandState::Running(msg_loop, handler)))
153 }
38 }
154 ChannelMessage::InputRequest(..) | ChannelMessage::LineRequest(..) => Err(
39 ChannelMessage::InputRequest(..) | ChannelMessage::LineRequest(..) => {
155 io::Error::new(io::ErrorKind::InvalidData, "unsupported request"),
40 return Err(io::Error::new(
156 ),
41 io::ErrorKind::InvalidData,
42 "unsupported request",
43 ));
44 }
157 ChannelMessage::SystemRequest(data) => {
45 ChannelMessage::SystemRequest(data) => {
158 let (cmd_type, cmd_spec) = message::parse_command_spec(data)?;
46 let (cmd_type, cmd_spec) = message::parse_command_spec(data)?;
159 match cmd_type {
47 match cmd_type {
160 CommandType::Pager => {
48 CommandType::Pager => {
161 let fut = handler.spawn_pager(cmd_spec).into_future();
49 // server spins new command loop while pager request is
162 Ok(AsyncS::PollAgain(CommandState::SpawningPager(client, fut)))
50 // in progress, which can be terminated by "" command.
51 let pin = handler.spawn_pager(&cmd_spec).await?;
52 attachio::attach_io(proto, &io::stdin(), &pin, &pin).await?;
53 proto.send_command("").await?; // terminator
163 }
54 }
164 CommandType::System => {
55 CommandType::System => {
165 let fut = handler.run_system(cmd_spec).into_future();
56 let code = handler.run_system(&cmd_spec).await?;
166 Ok(AsyncS::PollAgain(CommandState::WaitingSystem(client, fut)))
57 let data = message::pack_result_code(code);
58 proto.send_data(data).await?;
167 }
59 }
168 }
60 }
169 }
61 }
General Comments 0
You need to be logged in to leave comments. Login now