diff --git a/rust/chg/src/lib.rs b/rust/chg/src/lib.rs --- a/rust/chg/src/lib.rs +++ b/rust/chg/src/lib.rs @@ -14,6 +14,7 @@ extern crate tokio_process; pub mod attachio; pub mod message; pub mod procutil; +pub mod runcommand; mod uihandler; pub use uihandler::{ChgUiHandler, SystemHandler}; diff --git a/rust/chg/src/runcommand.rs b/rust/chg/src/runcommand.rs new file mode 100644 --- /dev/null +++ b/rust/chg/src/runcommand.rs @@ -0,0 +1,163 @@ +// Copyright 2018 Yuya Nishihara +// +// This software may be used and distributed according to the terms of the +// GNU General Public License version 2 or any later version. + +//! Functions to run Mercurial command in cHg-aware command server. + +use bytes::Bytes; +use futures::future::IntoFuture; +use futures::{Async, Future, Poll}; +use std::io; +use std::mem; +use std::os::unix::io::AsRawFd; +use tokio_hglib::{Client, Connection}; +use tokio_hglib::codec::ChannelMessage; +use tokio_hglib::protocol::MessageLoop; + +use super::attachio::AttachIo; +use super::message::{self, CommandType}; +use super::uihandler::SystemHandler; + +enum AsyncS { + Ready(R), + NotReady(S), + PollAgain(S), +} + +enum CommandState + where C: Connection, + H: SystemHandler, +{ + Running(MessageLoop, H), + SpawningPager(Client, ::Future), + AttachingPager(AttachIo, H), + WaitingSystem(Client, ::Future), + Finished, +} + +type CommandPoll = io::Result<(AsyncS<(Client, H, i32), CommandState>)>; + +/// Future resolves to `(exit_code, client)`. +#[must_use = "futures do nothing unless polled"] +pub struct ChgRunCommand + where C: Connection, + H: SystemHandler, +{ + state: CommandState, +} + +impl ChgRunCommand + where C: Connection + AsRawFd, + H: SystemHandler, +{ + pub fn with_client(client: Client, handler: H, packed_args: Bytes) + -> ChgRunCommand { + let msg_loop = MessageLoop::start_with_args(client, b"runcommand", packed_args); + ChgRunCommand { + state: CommandState::Running(msg_loop, handler), + } + } +} + +impl Future for ChgRunCommand + where C: Connection + AsRawFd, + H: SystemHandler, +{ + type Item = (Client, H, i32); + type Error = io::Error; + + fn poll(&mut self) -> Poll { + loop { + let state = mem::replace(&mut self.state, CommandState::Finished); + match state.poll()? { + AsyncS::Ready((client, handler, code)) => { + return Ok(Async::Ready((client, handler, code))); + } + AsyncS::NotReady(newstate) => { + self.state = newstate; + return Ok(Async::NotReady); + } + AsyncS::PollAgain(newstate) => { + self.state = newstate; + } + } + } + } +} + +impl CommandState + where C: Connection + AsRawFd, + H: SystemHandler, +{ + fn poll(self) -> CommandPoll { + match self { + CommandState::Running(mut msg_loop, handler) => { + if let Async::Ready((client, msg)) = msg_loop.poll()? { + process_message(client, handler, msg) + } else { + Ok(AsyncS::NotReady(CommandState::Running(msg_loop, handler))) + } + } + CommandState::SpawningPager(client, mut fut) => { + if let Async::Ready((handler, pin)) = fut.poll()? { + let fut = AttachIo::with_client(client, io::stdin(), pin, None); + Ok(AsyncS::PollAgain(CommandState::AttachingPager(fut, handler))) + } else { + Ok(AsyncS::NotReady(CommandState::SpawningPager(client, fut))) + } + } + CommandState::AttachingPager(mut fut, handler) => { + if let Async::Ready(client) = fut.poll()? { + let msg_loop = MessageLoop::start(client, b""); // terminator + Ok(AsyncS::PollAgain(CommandState::Running(msg_loop, handler))) + } else { + Ok(AsyncS::NotReady(CommandState::AttachingPager(fut, handler))) + } + } + CommandState::WaitingSystem(client, mut fut) => { + if let Async::Ready((handler, code)) = fut.poll()? { + let data = message::pack_result_code(code); + let msg_loop = MessageLoop::resume_with_data(client, data); + Ok(AsyncS::PollAgain(CommandState::Running(msg_loop, handler))) + } else { + Ok(AsyncS::NotReady(CommandState::WaitingSystem(client, fut))) + } + } + CommandState::Finished => panic!("poll ChgRunCommand after it's done") + } + } +} + +fn process_message(client: Client, handler: H, msg: ChannelMessage) -> CommandPoll + where C: Connection, + H: SystemHandler, +{ + match msg { + ChannelMessage::Data(b'r', data) => { + let code = message::parse_result_code(data)?; + Ok(AsyncS::Ready((client, handler, code))) + } + ChannelMessage::Data(..) => { + // just ignores data sent to optional channel + let msg_loop = MessageLoop::resume(client); + Ok(AsyncS::PollAgain(CommandState::Running(msg_loop, handler))) + } + ChannelMessage::InputRequest(..) | ChannelMessage::LineRequest(..) => { + Err(io::Error::new(io::ErrorKind::InvalidData, "unsupported request")) + } + ChannelMessage::SystemRequest(data) => { + let (cmd_type, cmd_spec) = message::parse_command_spec(data)?; + match cmd_type { + CommandType::Pager => { + let fut = handler.spawn_pager(cmd_spec).into_future(); + Ok(AsyncS::PollAgain(CommandState::SpawningPager(client, fut))) + } + CommandType::System => { + let fut = handler.run_system(cmd_spec).into_future(); + Ok(AsyncS::PollAgain(CommandState::WaitingSystem(client, fut))) + } + } + } + } +}