##// END OF EJS Templates
rust-chg: reimplement run_command operation as async function...
Yuya Nishihara -
r45235:94cace4b default
parent child Browse files
Show More
@@ -8,7 +8,7 b' mod attachio;'
8 8 //pub mod locator;
9 9 pub mod message;
10 10 pub mod procutil;
11 //mod runcommand;
11 mod runcommand;
12 12 mod uihandler;
13 13
14 14 //pub use clientext::ChgClientExt;
@@ -6,164 +6,56 b''
6 6 //! Functions to run Mercurial command in cHg-aware command server.
7 7
8 8 use bytes::Bytes;
9 use futures::future::IntoFuture;
10 use futures::{Async, Future, Poll};
11 9 use std::io;
12 use std::mem;
13 10 use std::os::unix::io::AsRawFd;
14 11 use tokio_hglib::codec::ChannelMessage;
15 use tokio_hglib::protocol::MessageLoop;
16 use tokio_hglib::{Client, Connection};
12 use tokio_hglib::{Connection, Protocol};
17 13
18 use crate::attachio::AttachIo;
14 use crate::attachio;
19 15 use crate::message::{self, CommandType};
20 16 use crate::uihandler::SystemHandler;
21 17
22 enum AsyncS<R, S> {
23 Ready(R),
24 NotReady(S),
25 PollAgain(S),
26 }
27
28 enum CommandState<C, H>
29 where
30 C: Connection,
31 H: SystemHandler,
32 {
33 Running(MessageLoop<C>, H),
34 SpawningPager(Client<C>, <H::SpawnPagerResult as IntoFuture>::Future),
35 AttachingPager(AttachIo<C, io::Stdin, H::PagerStdin, H::PagerStdin>, H),
36 WaitingSystem(Client<C>, <H::RunSystemResult as IntoFuture>::Future),
37 Finished,
38 }
39
40 type CommandPoll<C, H> = io::Result<AsyncS<(Client<C>, H, i32), CommandState<C, H>>>;
41
42 /// Future resolves to `(exit_code, client)`.
43 #[must_use = "futures do nothing unless polled"]
44 pub struct ChgRunCommand<C, H>
45 where
46 C: Connection,
47 H: SystemHandler,
48 {
49 state: CommandState<C, H>,
50 }
51
52 impl<C, H> ChgRunCommand<C, H>
53 where
54 C: Connection + AsRawFd,
55 H: SystemHandler,
56 {
57 pub fn with_client(client: Client<C>, handler: H, packed_args: Bytes) -> ChgRunCommand<C, H> {
58 let msg_loop = MessageLoop::start_with_args(client, b"runcommand", packed_args);
59 ChgRunCommand {
60 state: CommandState::Running(msg_loop, handler),
61 }
62 }
63 }
64
65 impl<C, H> Future for ChgRunCommand<C, H>
66 where
67 C: Connection + AsRawFd,
68 H: SystemHandler,
69 {
70 type Item = (Client<C>, H, i32);
71 type Error = io::Error;
72
73 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
74 loop {
75 let state = mem::replace(&mut self.state, CommandState::Finished);
76 match state.poll()? {
77 AsyncS::Ready((client, handler, code)) => {
78 return Ok(Async::Ready((client, handler, code)));
79 }
80 AsyncS::NotReady(newstate) => {
81 self.state = newstate;
82 return Ok(Async::NotReady);
83 }
84 AsyncS::PollAgain(newstate) => {
85 self.state = newstate;
86 }
87 }
88 }
89 }
90 }
91
92 impl<C, H> CommandState<C, H>
93 where
94 C: Connection + AsRawFd,
95 H: SystemHandler,
96 {
97 fn poll(self) -> CommandPoll<C, H> {
98 match self {
99 CommandState::Running(mut msg_loop, handler) => {
100 if let Async::Ready((client, msg)) = msg_loop.poll()? {
101 process_message(client, handler, msg)
102 } else {
103 Ok(AsyncS::NotReady(CommandState::Running(msg_loop, handler)))
104 }
105 }
106 CommandState::SpawningPager(client, mut fut) => {
107 if let Async::Ready((handler, pin)) = fut.poll()? {
108 let fut = AttachIo::with_client(client, io::stdin(), pin, None);
109 Ok(AsyncS::PollAgain(CommandState::AttachingPager(
110 fut, handler,
111 )))
112 } else {
113 Ok(AsyncS::NotReady(CommandState::SpawningPager(client, fut)))
114 }
115 }
116 CommandState::AttachingPager(mut fut, handler) => {
117 if let Async::Ready(client) = fut.poll()? {
118 let msg_loop = MessageLoop::start(client, b""); // terminator
119 Ok(AsyncS::PollAgain(CommandState::Running(msg_loop, handler)))
120 } else {
121 Ok(AsyncS::NotReady(CommandState::AttachingPager(fut, handler)))
122 }
123 }
124 CommandState::WaitingSystem(client, mut fut) => {
125 if let Async::Ready((handler, code)) = fut.poll()? {
126 let data = message::pack_result_code(code);
127 let msg_loop = MessageLoop::resume_with_data(client, data);
128 Ok(AsyncS::PollAgain(CommandState::Running(msg_loop, handler)))
129 } else {
130 Ok(AsyncS::NotReady(CommandState::WaitingSystem(client, fut)))
131 }
132 }
133 CommandState::Finished => panic!("poll ChgRunCommand after it's done"),
134 }
135 }
136 }
137
138 fn process_message<C, H>(client: Client<C>, handler: H, msg: ChannelMessage) -> CommandPoll<C, H>
139 where
140 C: Connection,
141 H: SystemHandler,
142 {
143 {
144 match msg {
18 /// Runs the given Mercurial command in cHg-aware command server, and
19 /// fetches the result code.
20 ///
21 /// This is a subset of tokio-hglib's `run_command()` with the additional
22 /// SystemRequest support.
23 pub async fn run_command(
24 proto: &mut Protocol<impl Connection + AsRawFd>,
25 handler: &mut impl SystemHandler,
26 packed_args: impl Into<Bytes>,
27 ) -> io::Result<i32> {
28 proto
29 .send_command_with_args("runcommand", packed_args)
30 .await?;
31 loop {
32 match proto.fetch_response().await? {
145 33 ChannelMessage::Data(b'r', data) => {
146 let code = message::parse_result_code(data)?;
147 Ok(AsyncS::Ready((client, handler, code)))
34 return message::parse_result_code(data);
148 35 }
149 36 ChannelMessage::Data(..) => {
150 37 // just ignores data sent to optional channel
151 let msg_loop = MessageLoop::resume(client);
152 Ok(AsyncS::PollAgain(CommandState::Running(msg_loop, handler)))
153 38 }
154 ChannelMessage::InputRequest(..) | ChannelMessage::LineRequest(..) => Err(
155 io::Error::new(io::ErrorKind::InvalidData, "unsupported request"),
156 ),
39 ChannelMessage::InputRequest(..) | ChannelMessage::LineRequest(..) => {
40 return Err(io::Error::new(
41 io::ErrorKind::InvalidData,
42 "unsupported request",
43 ));
44 }
157 45 ChannelMessage::SystemRequest(data) => {
158 46 let (cmd_type, cmd_spec) = message::parse_command_spec(data)?;
159 47 match cmd_type {
160 48 CommandType::Pager => {
161 let fut = handler.spawn_pager(cmd_spec).into_future();
162 Ok(AsyncS::PollAgain(CommandState::SpawningPager(client, fut)))
49 // server spins new command loop while pager request is
50 // in progress, which can be terminated by "" command.
51 let pin = handler.spawn_pager(&cmd_spec).await?;
52 attachio::attach_io(proto, &io::stdin(), &pin, &pin).await?;
53 proto.send_command("").await?; // terminator
163 54 }
164 55 CommandType::System => {
165 let fut = handler.run_system(cmd_spec).into_future();
166 Ok(AsyncS::PollAgain(CommandState::WaitingSystem(client, fut)))
56 let code = handler.run_system(&cmd_spec).await?;
57 let data = message::pack_result_code(code);
58 proto.send_data(data).await?;
167 59 }
168 60 }
169 61 }
General Comments 0
You need to be logged in to leave comments. Login now