##// END OF EJS Templates
rust-chg: indent process_message() to prepare mass rewrite to futures-0.3...
Yuya Nishihara -
r45185:f8780482 default
parent child Browse files
Show More
@@ -1,171 +1,172
1 // Copyright 2018 Yuya Nishihara <yuya@tcha.org>
1 // Copyright 2018 Yuya Nishihara <yuya@tcha.org>
2 //
2 //
3 // This software may be used and distributed according to the terms of the
3 // This software may be used and distributed according to the terms of the
4 // GNU General Public License version 2 or any later version.
4 // GNU General Public License version 2 or any later version.
5
5
6 //! Functions to run Mercurial command in cHg-aware command server.
6 //! Functions to run Mercurial command in cHg-aware command server.
7
7
8 use bytes::Bytes;
8 use bytes::Bytes;
9 use futures::future::IntoFuture;
9 use futures::future::IntoFuture;
10 use futures::{Async, Future, Poll};
10 use futures::{Async, Future, Poll};
11 use std::io;
11 use std::io;
12 use std::mem;
12 use std::mem;
13 use std::os::unix::io::AsRawFd;
13 use std::os::unix::io::AsRawFd;
14 use tokio_hglib::codec::ChannelMessage;
14 use tokio_hglib::codec::ChannelMessage;
15 use tokio_hglib::protocol::MessageLoop;
15 use tokio_hglib::protocol::MessageLoop;
16 use tokio_hglib::{Client, Connection};
16 use tokio_hglib::{Client, Connection};
17
17
18 use crate::attachio::AttachIo;
18 use crate::attachio::AttachIo;
19 use crate::message::{self, CommandType};
19 use crate::message::{self, CommandType};
20 use crate::uihandler::SystemHandler;
20 use crate::uihandler::SystemHandler;
21
21
22 enum AsyncS<R, S> {
22 enum AsyncS<R, S> {
23 Ready(R),
23 Ready(R),
24 NotReady(S),
24 NotReady(S),
25 PollAgain(S),
25 PollAgain(S),
26 }
26 }
27
27
28 enum CommandState<C, H>
28 enum CommandState<C, H>
29 where
29 where
30 C: Connection,
30 C: Connection,
31 H: SystemHandler,
31 H: SystemHandler,
32 {
32 {
33 Running(MessageLoop<C>, H),
33 Running(MessageLoop<C>, H),
34 SpawningPager(Client<C>, <H::SpawnPagerResult as IntoFuture>::Future),
34 SpawningPager(Client<C>, <H::SpawnPagerResult as IntoFuture>::Future),
35 AttachingPager(AttachIo<C, io::Stdin, H::PagerStdin, H::PagerStdin>, H),
35 AttachingPager(AttachIo<C, io::Stdin, H::PagerStdin, H::PagerStdin>, H),
36 WaitingSystem(Client<C>, <H::RunSystemResult as IntoFuture>::Future),
36 WaitingSystem(Client<C>, <H::RunSystemResult as IntoFuture>::Future),
37 Finished,
37 Finished,
38 }
38 }
39
39
40 type CommandPoll<C, H> = io::Result<AsyncS<(Client<C>, H, i32), CommandState<C, H>>>;
40 type CommandPoll<C, H> = io::Result<AsyncS<(Client<C>, H, i32), CommandState<C, H>>>;
41
41
42 /// Future resolves to `(exit_code, client)`.
42 /// Future resolves to `(exit_code, client)`.
43 #[must_use = "futures do nothing unless polled"]
43 #[must_use = "futures do nothing unless polled"]
44 pub struct ChgRunCommand<C, H>
44 pub struct ChgRunCommand<C, H>
45 where
45 where
46 C: Connection,
46 C: Connection,
47 H: SystemHandler,
47 H: SystemHandler,
48 {
48 {
49 state: CommandState<C, H>,
49 state: CommandState<C, H>,
50 }
50 }
51
51
52 impl<C, H> ChgRunCommand<C, H>
52 impl<C, H> ChgRunCommand<C, H>
53 where
53 where
54 C: Connection + AsRawFd,
54 C: Connection + AsRawFd,
55 H: SystemHandler,
55 H: SystemHandler,
56 {
56 {
57 pub fn with_client(client: Client<C>, handler: H, packed_args: Bytes) -> ChgRunCommand<C, H> {
57 pub fn with_client(client: Client<C>, handler: H, packed_args: Bytes) -> ChgRunCommand<C, H> {
58 let msg_loop = MessageLoop::start_with_args(client, b"runcommand", packed_args);
58 let msg_loop = MessageLoop::start_with_args(client, b"runcommand", packed_args);
59 ChgRunCommand {
59 ChgRunCommand {
60 state: CommandState::Running(msg_loop, handler),
60 state: CommandState::Running(msg_loop, handler),
61 }
61 }
62 }
62 }
63 }
63 }
64
64
65 impl<C, H> Future for ChgRunCommand<C, H>
65 impl<C, H> Future for ChgRunCommand<C, H>
66 where
66 where
67 C: Connection + AsRawFd,
67 C: Connection + AsRawFd,
68 H: SystemHandler,
68 H: SystemHandler,
69 {
69 {
70 type Item = (Client<C>, H, i32);
70 type Item = (Client<C>, H, i32);
71 type Error = io::Error;
71 type Error = io::Error;
72
72
73 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
73 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
74 loop {
74 loop {
75 let state = mem::replace(&mut self.state, CommandState::Finished);
75 let state = mem::replace(&mut self.state, CommandState::Finished);
76 match state.poll()? {
76 match state.poll()? {
77 AsyncS::Ready((client, handler, code)) => {
77 AsyncS::Ready((client, handler, code)) => {
78 return Ok(Async::Ready((client, handler, code)));
78 return Ok(Async::Ready((client, handler, code)));
79 }
79 }
80 AsyncS::NotReady(newstate) => {
80 AsyncS::NotReady(newstate) => {
81 self.state = newstate;
81 self.state = newstate;
82 return Ok(Async::NotReady);
82 return Ok(Async::NotReady);
83 }
83 }
84 AsyncS::PollAgain(newstate) => {
84 AsyncS::PollAgain(newstate) => {
85 self.state = newstate;
85 self.state = newstate;
86 }
86 }
87 }
87 }
88 }
88 }
89 }
89 }
90 }
90 }
91
91
92 impl<C, H> CommandState<C, H>
92 impl<C, H> CommandState<C, H>
93 where
93 where
94 C: Connection + AsRawFd,
94 C: Connection + AsRawFd,
95 H: SystemHandler,
95 H: SystemHandler,
96 {
96 {
97 fn poll(self) -> CommandPoll<C, H> {
97 fn poll(self) -> CommandPoll<C, H> {
98 match self {
98 match self {
99 CommandState::Running(mut msg_loop, handler) => {
99 CommandState::Running(mut msg_loop, handler) => {
100 if let Async::Ready((client, msg)) = msg_loop.poll()? {
100 if let Async::Ready((client, msg)) = msg_loop.poll()? {
101 process_message(client, handler, msg)
101 process_message(client, handler, msg)
102 } else {
102 } else {
103 Ok(AsyncS::NotReady(CommandState::Running(msg_loop, handler)))
103 Ok(AsyncS::NotReady(CommandState::Running(msg_loop, handler)))
104 }
104 }
105 }
105 }
106 CommandState::SpawningPager(client, mut fut) => {
106 CommandState::SpawningPager(client, mut fut) => {
107 if let Async::Ready((handler, pin)) = fut.poll()? {
107 if let Async::Ready((handler, pin)) = fut.poll()? {
108 let fut = AttachIo::with_client(client, io::stdin(), pin, None);
108 let fut = AttachIo::with_client(client, io::stdin(), pin, None);
109 Ok(AsyncS::PollAgain(CommandState::AttachingPager(
109 Ok(AsyncS::PollAgain(CommandState::AttachingPager(
110 fut, handler,
110 fut, handler,
111 )))
111 )))
112 } else {
112 } else {
113 Ok(AsyncS::NotReady(CommandState::SpawningPager(client, fut)))
113 Ok(AsyncS::NotReady(CommandState::SpawningPager(client, fut)))
114 }
114 }
115 }
115 }
116 CommandState::AttachingPager(mut fut, handler) => {
116 CommandState::AttachingPager(mut fut, handler) => {
117 if let Async::Ready(client) = fut.poll()? {
117 if let Async::Ready(client) = fut.poll()? {
118 let msg_loop = MessageLoop::start(client, b""); // terminator
118 let msg_loop = MessageLoop::start(client, b""); // terminator
119 Ok(AsyncS::PollAgain(CommandState::Running(msg_loop, handler)))
119 Ok(AsyncS::PollAgain(CommandState::Running(msg_loop, handler)))
120 } else {
120 } else {
121 Ok(AsyncS::NotReady(CommandState::AttachingPager(fut, handler)))
121 Ok(AsyncS::NotReady(CommandState::AttachingPager(fut, handler)))
122 }
122 }
123 }
123 }
124 CommandState::WaitingSystem(client, mut fut) => {
124 CommandState::WaitingSystem(client, mut fut) => {
125 if let Async::Ready((handler, code)) = fut.poll()? {
125 if let Async::Ready((handler, code)) = fut.poll()? {
126 let data = message::pack_result_code(code);
126 let data = message::pack_result_code(code);
127 let msg_loop = MessageLoop::resume_with_data(client, data);
127 let msg_loop = MessageLoop::resume_with_data(client, data);
128 Ok(AsyncS::PollAgain(CommandState::Running(msg_loop, handler)))
128 Ok(AsyncS::PollAgain(CommandState::Running(msg_loop, handler)))
129 } else {
129 } else {
130 Ok(AsyncS::NotReady(CommandState::WaitingSystem(client, fut)))
130 Ok(AsyncS::NotReady(CommandState::WaitingSystem(client, fut)))
131 }
131 }
132 }
132 }
133 CommandState::Finished => panic!("poll ChgRunCommand after it's done"),
133 CommandState::Finished => panic!("poll ChgRunCommand after it's done"),
134 }
134 }
135 }
135 }
136 }
136 }
137
137
138 fn process_message<C, H>(client: Client<C>, handler: H, msg: ChannelMessage) -> CommandPoll<C, H>
138 fn process_message<C, H>(client: Client<C>, handler: H, msg: ChannelMessage) -> CommandPoll<C, H>
139 where
139 where
140 C: Connection,
140 C: Connection,
141 H: SystemHandler,
141 H: SystemHandler,
142 {
142 {
143 match msg {
143 {
144 ChannelMessage::Data(b'r', data) => {
144 match msg {
145 let code = message::parse_result_code(data)?;
145 ChannelMessage::Data(b'r', data) => {
146 Ok(AsyncS::Ready((client, handler, code)))
146 let code = message::parse_result_code(data)?;
147 }
147 Ok(AsyncS::Ready((client, handler, code)))
148 ChannelMessage::Data(..) => {
148 }
149 // just ignores data sent to optional channel
149 ChannelMessage::Data(..) => {
150 let msg_loop = MessageLoop::resume(client);
150 // just ignores data sent to optional channel
151 Ok(AsyncS::PollAgain(CommandState::Running(msg_loop, handler)))
151 let msg_loop = MessageLoop::resume(client);
152 }
152 Ok(AsyncS::PollAgain(CommandState::Running(msg_loop, handler)))
153 ChannelMessage::InputRequest(..) | ChannelMessage::LineRequest(..) => Err(io::Error::new(
153 }
154 io::ErrorKind::InvalidData,
154 ChannelMessage::InputRequest(..) | ChannelMessage::LineRequest(..) => Err(
155 "unsupported request",
155 io::Error::new(io::ErrorKind::InvalidData, "unsupported request"),
156 )),
156 ),
157 ChannelMessage::SystemRequest(data) => {
157 ChannelMessage::SystemRequest(data) => {
158 let (cmd_type, cmd_spec) = message::parse_command_spec(data)?;
158 let (cmd_type, cmd_spec) = message::parse_command_spec(data)?;
159 match cmd_type {
159 match cmd_type {
160 CommandType::Pager => {
160 CommandType::Pager => {
161 let fut = handler.spawn_pager(cmd_spec).into_future();
161 let fut = handler.spawn_pager(cmd_spec).into_future();
162 Ok(AsyncS::PollAgain(CommandState::SpawningPager(client, fut)))
162 Ok(AsyncS::PollAgain(CommandState::SpawningPager(client, fut)))
163 }
163 }
164 CommandType::System => {
164 CommandType::System => {
165 let fut = handler.run_system(cmd_spec).into_future();
165 let fut = handler.run_system(cmd_spec).into_future();
166 Ok(AsyncS::PollAgain(CommandState::WaitingSystem(client, fut)))
166 Ok(AsyncS::PollAgain(CommandState::WaitingSystem(client, fut)))
167 }
167 }
168 }
168 }
169 }
169 }
170 }
170 }
171 }
171 }
172 }
General Comments 0
You need to be logged in to leave comments. Login now