runcommand.rs
163 lines
| 5.8 KiB
| application/rls-services+xml
|
RustLexer
Yuya Nishihara
|
r40011 | // Copyright 2018 Yuya Nishihara <yuya@tcha.org> | ||
// | ||||
// This software may be used and distributed according to the terms of the | ||||
// GNU General Public License version 2 or any later version. | ||||
//! Functions to run a Mercurial command in a cHg-aware command server.
use bytes::Bytes; | ||||
use futures::future::IntoFuture; | ||||
use futures::{Async, Future, Poll}; | ||||
use std::io; | ||||
use std::mem; | ||||
use std::os::unix::io::AsRawFd; | ||||
use tokio_hglib::{Client, Connection}; | ||||
use tokio_hglib::codec::ChannelMessage; | ||||
use tokio_hglib::protocol::MessageLoop; | ||||
use super::attachio::AttachIo; | ||||
use super::message::{self, CommandType}; | ||||
use super::uihandler::SystemHandler; | ||||
/// Outcome of polling a single state of the command state machine.
///
/// Unlike `futures::Async`, the non-ready variants hand back the state to
/// continue from, and distinguish between "wait to be notified" and
/// "poll the new state right away".
enum AsyncS<R, S> {
    /// The operation finished with the result `R`.
    Ready(R),
    /// Nothing more can be done now; `S` is the state to resume from later.
    NotReady(S),
    /// The state changed to `S`, which should be polled again immediately.
    PollAgain(S),
}
enum CommandState<C, H> | ||||
where C: Connection, | ||||
H: SystemHandler, | ||||
{ | ||||
Running(MessageLoop<C>, H), | ||||
SpawningPager(Client<C>, <H::SpawnPagerResult as IntoFuture>::Future), | ||||
AttachingPager(AttachIo<C, io::Stdin, H::PagerStdin, H::PagerStdin>, H), | ||||
WaitingSystem(Client<C>, <H::RunSystemResult as IntoFuture>::Future), | ||||
Finished, | ||||
} | ||||
type CommandPoll<C, H> = io::Result<(AsyncS<(Client<C>, H, i32), CommandState<C, H>>)>; | ||||
/// Future resolves to `(exit_code, client)`. | ||||
#[must_use = "futures do nothing unless polled"] | ||||
pub struct ChgRunCommand<C, H> | ||||
where C: Connection, | ||||
H: SystemHandler, | ||||
{ | ||||
state: CommandState<C, H>, | ||||
} | ||||
impl<C, H> ChgRunCommand<C, H> | ||||
where C: Connection + AsRawFd, | ||||
H: SystemHandler, | ||||
{ | ||||
pub fn with_client(client: Client<C>, handler: H, packed_args: Bytes) | ||||
-> ChgRunCommand<C, H> { | ||||
let msg_loop = MessageLoop::start_with_args(client, b"runcommand", packed_args); | ||||
ChgRunCommand { | ||||
state: CommandState::Running(msg_loop, handler), | ||||
} | ||||
} | ||||
} | ||||
impl<C, H> Future for ChgRunCommand<C, H> | ||||
where C: Connection + AsRawFd, | ||||
H: SystemHandler, | ||||
{ | ||||
type Item = (Client<C>, H, i32); | ||||
type Error = io::Error; | ||||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { | ||||
loop { | ||||
let state = mem::replace(&mut self.state, CommandState::Finished); | ||||
match state.poll()? { | ||||
AsyncS::Ready((client, handler, code)) => { | ||||
return Ok(Async::Ready((client, handler, code))); | ||||
} | ||||
AsyncS::NotReady(newstate) => { | ||||
self.state = newstate; | ||||
return Ok(Async::NotReady); | ||||
} | ||||
AsyncS::PollAgain(newstate) => { | ||||
self.state = newstate; | ||||
} | ||||
} | ||||
} | ||||
} | ||||
} | ||||
impl<C, H> CommandState<C, H> | ||||
where C: Connection + AsRawFd, | ||||
H: SystemHandler, | ||||
{ | ||||
fn poll(self) -> CommandPoll<C, H> { | ||||
match self { | ||||
CommandState::Running(mut msg_loop, handler) => { | ||||
if let Async::Ready((client, msg)) = msg_loop.poll()? { | ||||
process_message(client, handler, msg) | ||||
} else { | ||||
Ok(AsyncS::NotReady(CommandState::Running(msg_loop, handler))) | ||||
} | ||||
} | ||||
CommandState::SpawningPager(client, mut fut) => { | ||||
if let Async::Ready((handler, pin)) = fut.poll()? { | ||||
let fut = AttachIo::with_client(client, io::stdin(), pin, None); | ||||
Ok(AsyncS::PollAgain(CommandState::AttachingPager(fut, handler))) | ||||
} else { | ||||
Ok(AsyncS::NotReady(CommandState::SpawningPager(client, fut))) | ||||
} | ||||
} | ||||
CommandState::AttachingPager(mut fut, handler) => { | ||||
if let Async::Ready(client) = fut.poll()? { | ||||
let msg_loop = MessageLoop::start(client, b""); // terminator | ||||
Ok(AsyncS::PollAgain(CommandState::Running(msg_loop, handler))) | ||||
} else { | ||||
Ok(AsyncS::NotReady(CommandState::AttachingPager(fut, handler))) | ||||
} | ||||
} | ||||
CommandState::WaitingSystem(client, mut fut) => { | ||||
if let Async::Ready((handler, code)) = fut.poll()? { | ||||
let data = message::pack_result_code(code); | ||||
let msg_loop = MessageLoop::resume_with_data(client, data); | ||||
Ok(AsyncS::PollAgain(CommandState::Running(msg_loop, handler))) | ||||
} else { | ||||
Ok(AsyncS::NotReady(CommandState::WaitingSystem(client, fut))) | ||||
} | ||||
} | ||||
CommandState::Finished => panic!("poll ChgRunCommand after it's done") | ||||
} | ||||
} | ||||
} | ||||
fn process_message<C, H>(client: Client<C>, handler: H, msg: ChannelMessage) -> CommandPoll<C, H> | ||||
where C: Connection, | ||||
H: SystemHandler, | ||||
{ | ||||
match msg { | ||||
ChannelMessage::Data(b'r', data) => { | ||||
let code = message::parse_result_code(data)?; | ||||
Ok(AsyncS::Ready((client, handler, code))) | ||||
} | ||||
ChannelMessage::Data(..) => { | ||||
// just ignores data sent to optional channel | ||||
let msg_loop = MessageLoop::resume(client); | ||||
Ok(AsyncS::PollAgain(CommandState::Running(msg_loop, handler))) | ||||
} | ||||
ChannelMessage::InputRequest(..) | ChannelMessage::LineRequest(..) => { | ||||
Err(io::Error::new(io::ErrorKind::InvalidData, "unsupported request")) | ||||
} | ||||
ChannelMessage::SystemRequest(data) => { | ||||
let (cmd_type, cmd_spec) = message::parse_command_spec(data)?; | ||||
match cmd_type { | ||||
CommandType::Pager => { | ||||
let fut = handler.spawn_pager(cmd_spec).into_future(); | ||||
Ok(AsyncS::PollAgain(CommandState::SpawningPager(client, fut))) | ||||
} | ||||
CommandType::System => { | ||||
let fut = handler.run_system(cmd_spec).into_future(); | ||||
Ok(AsyncS::PollAgain(CommandState::WaitingSystem(client, fut))) | ||||
} | ||||
} | ||||
} | ||||
} | ||||
} | ||||