##// END OF EJS Templates
rust-chg: add state machine to handle "runcommand" request with cHg extension...
Yuya Nishihara -
r40011:571d8eb3 default
parent child Browse files
Show More
@@ -0,0 +1,163
1 // Copyright 2018 Yuya Nishihara <yuya@tcha.org>
2 //
3 // This software may be used and distributed according to the terms of the
4 // GNU General Public License version 2 or any later version.
5
6 //! Functions to run Mercurial command in cHg-aware command server.
7
8 use bytes::Bytes;
9 use futures::future::IntoFuture;
10 use futures::{Async, Future, Poll};
11 use std::io;
12 use std::mem;
13 use std::os::unix::io::AsRawFd;
14 use tokio_hglib::{Client, Connection};
15 use tokio_hglib::codec::ChannelMessage;
16 use tokio_hglib::protocol::MessageLoop;
17
18 use super::attachio::AttachIo;
19 use super::message::{self, CommandType};
20 use super::uihandler::SystemHandler;
21
22 enum AsyncS<R, S> {
23 Ready(R),
24 NotReady(S),
25 PollAgain(S),
26 }
27
28 enum CommandState<C, H>
29 where C: Connection,
30 H: SystemHandler,
31 {
32 Running(MessageLoop<C>, H),
33 SpawningPager(Client<C>, <H::SpawnPagerResult as IntoFuture>::Future),
34 AttachingPager(AttachIo<C, io::Stdin, H::PagerStdin, H::PagerStdin>, H),
35 WaitingSystem(Client<C>, <H::RunSystemResult as IntoFuture>::Future),
36 Finished,
37 }
38
39 type CommandPoll<C, H> = io::Result<(AsyncS<(Client<C>, H, i32), CommandState<C, H>>)>;
40
41 /// Future resolves to `(exit_code, client)`.
42 #[must_use = "futures do nothing unless polled"]
43 pub struct ChgRunCommand<C, H>
44 where C: Connection,
45 H: SystemHandler,
46 {
47 state: CommandState<C, H>,
48 }
49
50 impl<C, H> ChgRunCommand<C, H>
51 where C: Connection + AsRawFd,
52 H: SystemHandler,
53 {
54 pub fn with_client(client: Client<C>, handler: H, packed_args: Bytes)
55 -> ChgRunCommand<C, H> {
56 let msg_loop = MessageLoop::start_with_args(client, b"runcommand", packed_args);
57 ChgRunCommand {
58 state: CommandState::Running(msg_loop, handler),
59 }
60 }
61 }
62
63 impl<C, H> Future for ChgRunCommand<C, H>
64 where C: Connection + AsRawFd,
65 H: SystemHandler,
66 {
67 type Item = (Client<C>, H, i32);
68 type Error = io::Error;
69
70 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
71 loop {
72 let state = mem::replace(&mut self.state, CommandState::Finished);
73 match state.poll()? {
74 AsyncS::Ready((client, handler, code)) => {
75 return Ok(Async::Ready((client, handler, code)));
76 }
77 AsyncS::NotReady(newstate) => {
78 self.state = newstate;
79 return Ok(Async::NotReady);
80 }
81 AsyncS::PollAgain(newstate) => {
82 self.state = newstate;
83 }
84 }
85 }
86 }
87 }
88
89 impl<C, H> CommandState<C, H>
90 where C: Connection + AsRawFd,
91 H: SystemHandler,
92 {
93 fn poll(self) -> CommandPoll<C, H> {
94 match self {
95 CommandState::Running(mut msg_loop, handler) => {
96 if let Async::Ready((client, msg)) = msg_loop.poll()? {
97 process_message(client, handler, msg)
98 } else {
99 Ok(AsyncS::NotReady(CommandState::Running(msg_loop, handler)))
100 }
101 }
102 CommandState::SpawningPager(client, mut fut) => {
103 if let Async::Ready((handler, pin)) = fut.poll()? {
104 let fut = AttachIo::with_client(client, io::stdin(), pin, None);
105 Ok(AsyncS::PollAgain(CommandState::AttachingPager(fut, handler)))
106 } else {
107 Ok(AsyncS::NotReady(CommandState::SpawningPager(client, fut)))
108 }
109 }
110 CommandState::AttachingPager(mut fut, handler) => {
111 if let Async::Ready(client) = fut.poll()? {
112 let msg_loop = MessageLoop::start(client, b""); // terminator
113 Ok(AsyncS::PollAgain(CommandState::Running(msg_loop, handler)))
114 } else {
115 Ok(AsyncS::NotReady(CommandState::AttachingPager(fut, handler)))
116 }
117 }
118 CommandState::WaitingSystem(client, mut fut) => {
119 if let Async::Ready((handler, code)) = fut.poll()? {
120 let data = message::pack_result_code(code);
121 let msg_loop = MessageLoop::resume_with_data(client, data);
122 Ok(AsyncS::PollAgain(CommandState::Running(msg_loop, handler)))
123 } else {
124 Ok(AsyncS::NotReady(CommandState::WaitingSystem(client, fut)))
125 }
126 }
127 CommandState::Finished => panic!("poll ChgRunCommand after it's done")
128 }
129 }
130 }
131
132 fn process_message<C, H>(client: Client<C>, handler: H, msg: ChannelMessage) -> CommandPoll<C, H>
133 where C: Connection,
134 H: SystemHandler,
135 {
136 match msg {
137 ChannelMessage::Data(b'r', data) => {
138 let code = message::parse_result_code(data)?;
139 Ok(AsyncS::Ready((client, handler, code)))
140 }
141 ChannelMessage::Data(..) => {
142 // just ignores data sent to optional channel
143 let msg_loop = MessageLoop::resume(client);
144 Ok(AsyncS::PollAgain(CommandState::Running(msg_loop, handler)))
145 }
146 ChannelMessage::InputRequest(..) | ChannelMessage::LineRequest(..) => {
147 Err(io::Error::new(io::ErrorKind::InvalidData, "unsupported request"))
148 }
149 ChannelMessage::SystemRequest(data) => {
150 let (cmd_type, cmd_spec) = message::parse_command_spec(data)?;
151 match cmd_type {
152 CommandType::Pager => {
153 let fut = handler.spawn_pager(cmd_spec).into_future();
154 Ok(AsyncS::PollAgain(CommandState::SpawningPager(client, fut)))
155 }
156 CommandType::System => {
157 let fut = handler.run_system(cmd_spec).into_future();
158 Ok(AsyncS::PollAgain(CommandState::WaitingSystem(client, fut)))
159 }
160 }
161 }
162 }
163 }
@@ -1,19 +1,20
1 1 // Copyright 2018 Yuya Nishihara <yuya@tcha.org>
2 2 //
3 3 // This software may be used and distributed according to the terms of the
4 4 // GNU General Public License version 2 or any later version.
5 5
6 6 extern crate bytes;
7 7 #[macro_use]
8 8 extern crate futures;
9 9 extern crate libc;
10 10 extern crate tokio;
11 11 extern crate tokio_hglib;
12 12 extern crate tokio_process;
13 13
14 14 pub mod attachio;
15 15 pub mod message;
16 16 pub mod procutil;
17 pub mod runcommand;
17 18 mod uihandler;
18 19
19 20 pub use uihandler::{ChgUiHandler, SystemHandler};
General Comments 0
You need to be logged in to leave comments. Login now