##// END OF EJS Templates
rust-chg: indent process_message() to prepare mass rewrite to futures-0.3...
Yuya Nishihara -
r45185:f8780482 default
parent child Browse files
Show More
@@ -1,171 +1,172
1 1 // Copyright 2018 Yuya Nishihara <yuya@tcha.org>
2 2 //
3 3 // This software may be used and distributed according to the terms of the
4 4 // GNU General Public License version 2 or any later version.
5 5
6 6 //! Functions to run Mercurial command in cHg-aware command server.
7 7
8 8 use bytes::Bytes;
9 9 use futures::future::IntoFuture;
10 10 use futures::{Async, Future, Poll};
11 11 use std::io;
12 12 use std::mem;
13 13 use std::os::unix::io::AsRawFd;
14 14 use tokio_hglib::codec::ChannelMessage;
15 15 use tokio_hglib::protocol::MessageLoop;
16 16 use tokio_hglib::{Client, Connection};
17 17
18 18 use crate::attachio::AttachIo;
19 19 use crate::message::{self, CommandType};
20 20 use crate::uihandler::SystemHandler;
21 21
22 22 enum AsyncS<R, S> {
23 23 Ready(R),
24 24 NotReady(S),
25 25 PollAgain(S),
26 26 }
27 27
28 28 enum CommandState<C, H>
29 29 where
30 30 C: Connection,
31 31 H: SystemHandler,
32 32 {
33 33 Running(MessageLoop<C>, H),
34 34 SpawningPager(Client<C>, <H::SpawnPagerResult as IntoFuture>::Future),
35 35 AttachingPager(AttachIo<C, io::Stdin, H::PagerStdin, H::PagerStdin>, H),
36 36 WaitingSystem(Client<C>, <H::RunSystemResult as IntoFuture>::Future),
37 37 Finished,
38 38 }
39 39
40 40 type CommandPoll<C, H> = io::Result<AsyncS<(Client<C>, H, i32), CommandState<C, H>>>;
41 41
42 42 /// Future resolves to `(exit_code, client)`.
43 43 #[must_use = "futures do nothing unless polled"]
44 44 pub struct ChgRunCommand<C, H>
45 45 where
46 46 C: Connection,
47 47 H: SystemHandler,
48 48 {
49 49 state: CommandState<C, H>,
50 50 }
51 51
52 52 impl<C, H> ChgRunCommand<C, H>
53 53 where
54 54 C: Connection + AsRawFd,
55 55 H: SystemHandler,
56 56 {
57 57 pub fn with_client(client: Client<C>, handler: H, packed_args: Bytes) -> ChgRunCommand<C, H> {
58 58 let msg_loop = MessageLoop::start_with_args(client, b"runcommand", packed_args);
59 59 ChgRunCommand {
60 60 state: CommandState::Running(msg_loop, handler),
61 61 }
62 62 }
63 63 }
64 64
65 65 impl<C, H> Future for ChgRunCommand<C, H>
66 66 where
67 67 C: Connection + AsRawFd,
68 68 H: SystemHandler,
69 69 {
70 70 type Item = (Client<C>, H, i32);
71 71 type Error = io::Error;
72 72
73 73 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
74 74 loop {
75 75 let state = mem::replace(&mut self.state, CommandState::Finished);
76 76 match state.poll()? {
77 77 AsyncS::Ready((client, handler, code)) => {
78 78 return Ok(Async::Ready((client, handler, code)));
79 79 }
80 80 AsyncS::NotReady(newstate) => {
81 81 self.state = newstate;
82 82 return Ok(Async::NotReady);
83 83 }
84 84 AsyncS::PollAgain(newstate) => {
85 85 self.state = newstate;
86 86 }
87 87 }
88 88 }
89 89 }
90 90 }
91 91
92 92 impl<C, H> CommandState<C, H>
93 93 where
94 94 C: Connection + AsRawFd,
95 95 H: SystemHandler,
96 96 {
97 97 fn poll(self) -> CommandPoll<C, H> {
98 98 match self {
99 99 CommandState::Running(mut msg_loop, handler) => {
100 100 if let Async::Ready((client, msg)) = msg_loop.poll()? {
101 101 process_message(client, handler, msg)
102 102 } else {
103 103 Ok(AsyncS::NotReady(CommandState::Running(msg_loop, handler)))
104 104 }
105 105 }
106 106 CommandState::SpawningPager(client, mut fut) => {
107 107 if let Async::Ready((handler, pin)) = fut.poll()? {
108 108 let fut = AttachIo::with_client(client, io::stdin(), pin, None);
109 109 Ok(AsyncS::PollAgain(CommandState::AttachingPager(
110 110 fut, handler,
111 111 )))
112 112 } else {
113 113 Ok(AsyncS::NotReady(CommandState::SpawningPager(client, fut)))
114 114 }
115 115 }
116 116 CommandState::AttachingPager(mut fut, handler) => {
117 117 if let Async::Ready(client) = fut.poll()? {
118 118 let msg_loop = MessageLoop::start(client, b""); // terminator
119 119 Ok(AsyncS::PollAgain(CommandState::Running(msg_loop, handler)))
120 120 } else {
121 121 Ok(AsyncS::NotReady(CommandState::AttachingPager(fut, handler)))
122 122 }
123 123 }
124 124 CommandState::WaitingSystem(client, mut fut) => {
125 125 if let Async::Ready((handler, code)) = fut.poll()? {
126 126 let data = message::pack_result_code(code);
127 127 let msg_loop = MessageLoop::resume_with_data(client, data);
128 128 Ok(AsyncS::PollAgain(CommandState::Running(msg_loop, handler)))
129 129 } else {
130 130 Ok(AsyncS::NotReady(CommandState::WaitingSystem(client, fut)))
131 131 }
132 132 }
133 133 CommandState::Finished => panic!("poll ChgRunCommand after it's done"),
134 134 }
135 135 }
136 136 }
137 137
138 138 fn process_message<C, H>(client: Client<C>, handler: H, msg: ChannelMessage) -> CommandPoll<C, H>
139 139 where
140 140 C: Connection,
141 141 H: SystemHandler,
142 142 {
143 match msg {
144 ChannelMessage::Data(b'r', data) => {
145 let code = message::parse_result_code(data)?;
146 Ok(AsyncS::Ready((client, handler, code)))
147 }
148 ChannelMessage::Data(..) => {
149 // just ignores data sent to optional channel
150 let msg_loop = MessageLoop::resume(client);
151 Ok(AsyncS::PollAgain(CommandState::Running(msg_loop, handler)))
152 }
153 ChannelMessage::InputRequest(..) | ChannelMessage::LineRequest(..) => Err(io::Error::new(
154 io::ErrorKind::InvalidData,
155 "unsupported request",
156 )),
157 ChannelMessage::SystemRequest(data) => {
158 let (cmd_type, cmd_spec) = message::parse_command_spec(data)?;
159 match cmd_type {
160 CommandType::Pager => {
161 let fut = handler.spawn_pager(cmd_spec).into_future();
162 Ok(AsyncS::PollAgain(CommandState::SpawningPager(client, fut)))
163 }
164 CommandType::System => {
165 let fut = handler.run_system(cmd_spec).into_future();
166 Ok(AsyncS::PollAgain(CommandState::WaitingSystem(client, fut)))
143 {
144 match msg {
145 ChannelMessage::Data(b'r', data) => {
146 let code = message::parse_result_code(data)?;
147 Ok(AsyncS::Ready((client, handler, code)))
148 }
149 ChannelMessage::Data(..) => {
150 // just ignores data sent to optional channel
151 let msg_loop = MessageLoop::resume(client);
152 Ok(AsyncS::PollAgain(CommandState::Running(msg_loop, handler)))
153 }
154 ChannelMessage::InputRequest(..) | ChannelMessage::LineRequest(..) => Err(
155 io::Error::new(io::ErrorKind::InvalidData, "unsupported request"),
156 ),
157 ChannelMessage::SystemRequest(data) => {
158 let (cmd_type, cmd_spec) = message::parse_command_spec(data)?;
159 match cmd_type {
160 CommandType::Pager => {
161 let fut = handler.spawn_pager(cmd_spec).into_future();
162 Ok(AsyncS::PollAgain(CommandState::SpawningPager(client, fut)))
163 }
164 CommandType::System => {
165 let fut = handler.run_system(cmd_spec).into_future();
166 Ok(AsyncS::PollAgain(CommandState::WaitingSystem(client, fut)))
167 }
167 168 }
168 169 }
169 170 }
170 171 }
171 172 }
General Comments 0
You need to be logged in to leave comments. Login now