diff options
author | Toby Vincent <tobyv13@gmail.com> | 2022-04-22 01:53:42 -0500 |
---|---|---|
committer | Toby Vincent <tobyv13@gmail.com> | 2022-04-22 01:53:42 -0500 |
commit | b627ec4e0d9c8686d5d193b61e7e2496a1116f60 (patch) | |
tree | 87e24372a113a34d925e106b61a172865ba0311a | |
parent | e3fc06077d7334d627a86e38077386268a81376c (diff) |
feat(zone): impl attach subcommand
-rw-r--r-- | zone/src/lib.rs | 58 | ||||
-rw-r--r-- | zone/src/main.rs | 10 | ||||
-rw-r--r-- | zone/src/ws.rs | 60 | ||||
-rw-r--r-- | zone_core/src/container.rs | 52 | ||||
-rw-r--r-- | zone_core/src/lib.rs | 8 | ||||
-rw-r--r-- | zoned/src/error.rs | 12 | ||||
-rw-r--r-- | zoned/src/http.rs | 46 | ||||
-rw-r--r-- | zoned/src/ws.rs | 33 |
8 files changed, 194 insertions, 85 deletions
diff --git a/zone/src/lib.rs b/zone/src/lib.rs index 48893e2..1f0b758 100644 --- a/zone/src/lib.rs +++ b/zone/src/lib.rs @@ -1,12 +1,16 @@ use anyhow::{Context, Result}; -use clap::{ArgEnum, Args, ErrorKind, IntoApp, Parser, Subcommand, ValueHint}; +use clap::{Args, ErrorKind, IntoApp, Parser, Subcommand, ValueHint}; use clap_complete::{generate, Shell}; -use log::LevelFilter; +use futures::{SinkExt, StreamExt}; +use log::{debug, LevelFilter}; use reqwest::Url; use std::{ffi::OsString, io, process::Command}; use tabled::{Style, Table}; +use tokio_tungstenite::{connect_async, tungstenite::Message}; use zone_core::Container; +mod ws; + #[derive(Debug, Parser)] #[clap(about, version)] pub struct Cli { @@ -47,6 +51,8 @@ pub enum Commands { /// List existing containers List(List), + Attach(Container), + /// Create a container /// /// Create a new container from an existing template. @@ -61,8 +67,6 @@ pub struct List { pub filter: Option<String>, } -impl List {} - #[derive(Debug, Args)] pub struct Create { #[clap(short, long)] @@ -72,12 +76,13 @@ pub struct Create { } impl Cli { - pub fn run(self) -> Result<Option<String>> { + pub async fn run(self) -> Result<()> { match self.command { Commands::Completion { shell } => self.completion(shell), Commands::List(ref list) => self.list(&list.filter), Commands::External(args) => Cli::external(args), Commands::Create(ref create) => self.create(create), + Commands::Attach(ref opts) => self.attach(opts).await, } } @@ -92,33 +97,33 @@ impl Cli { } } - fn external(args: Vec<OsString>) -> Result<Option<String>> { + fn external(args: Vec<OsString>) -> Result<()> { Command::new(format!("zone-{:?}", &args[0])) .args(&args[1..]) .spawn() .unwrap_or_else(|_| { - let mut app = Cli::into_app(); - app.error( + let mut cmd = Cli::command(); + cmd.error( ErrorKind::UnrecognizedSubcommand, format!("Unrecognized subcommand '{:?}'", &args[0]), ) .exit() }); - Ok(None) + Ok(()) } - fn completion(&self, gen: Shell) -> Result<Option<String>> { + fn completion(&self, gen: Shell) -> Result<()> { eprintln!("Generating completion file for {:?}...", gen); - let mut app = Cli::into_app(); - let bin_name = app.get_name().to_string(); + let mut cmd = Cli::command(); + let bin_name = cmd.get_name().to_string(); let buf = &mut io::stdout(); - generate(gen, &mut app, bin_name, buf); + generate(gen, &mut cmd, bin_name, buf); - Ok(None) + Ok(()) } - fn create(&self, create: &Create) -> Result<Option<String>> { + fn create(&self, create: &Create) -> Result<()> { let client = reqwest::blocking::Client::new(); let mut url = self.endpoint.to_owned(); url.set_path("/container"); @@ -136,12 +141,11 @@ impl Cli { .json::<Container>() .context("Failed to parse json")?; - Ok(Some( - Table::new(&[container]).with(Style::NO_BORDER).to_string(), - )) + println!("{}", Table::new(&[container]).with(Style::blank())); + Ok(()) } - fn list(&self, query: &Option<String>) -> Result<Option<String>> { + fn list(&self, query: &Option<String>) -> Result<()> { let mut url = self.endpoint.to_owned(); url.set_path("containers/list"); url.set_query(query.as_deref()); @@ -151,9 +155,17 @@ impl Cli { .json::<Vec<Container>>() .context("Failed to parse json")?; - Ok(Some( - Table::new(containers).with(Style::NO_BORDER).to_string(), - )) + println!("{}", Table::new(containers).with(Style::blank())); + Ok(()) + } + + async fn attach(&self, container: &Container) -> Result<()> { + let (mut ws_stream, response) = connect_async(self.endpoint.to_owned()).await?; + debug!("{:?}", response); + ws_stream + .send(Message::Text(serde_json::to_string(container)?)) + .await?; + ws::handle_ws(ws_stream).await } } @@ -163,6 +175,6 @@ mod tests { fn verify_app() { use super::Cli; use clap::IntoApp; - Cli::into_app().debug_assert() + Cli::command().debug_assert() } } diff --git a/zone/src/main.rs b/zone/src/main.rs index 054633a..60df17d 100644 --- a/zone/src/main.rs +++ b/zone/src/main.rs @@ -1,8 +1,9 @@ +use anyhow::Result; use clap::Parser; -use log::error; use zone::Cli; -fn main() { +#[tokio::main] +async fn main() -> Result<()> { let cli = Cli::parse(); simplelog::TermLogger::init( @@ -13,8 +14,5 @@ fn main() { ) .unwrap(); - if let Err(err) = Cli::parse().run() { - error!("{}", err); - std::process::exit(1); - } + cli.run().await } diff --git a/zone/src/ws.rs b/zone/src/ws.rs new file mode 100644 index 0000000..16ba8fd --- /dev/null +++ b/zone/src/ws.rs @@ -0,0 +1,60 @@ +use anyhow::Result; +use futures::{ + stream::{SplitSink, SplitStream}, + SinkExt, StreamExt, +}; +use tokio::{ + io::{AsyncReadExt, AsyncWriteExt, Stdin, Stdout}, + net::TcpStream, + sync::mpsc, +}; +use tokio_tungstenite::{tungstenite::Message, MaybeTlsStream, WebSocketStream}; + +pub(crate) async fn handle_ws(ws_stream: WebSocketStream<MaybeTlsStream<TcpStream>>) -> Result<()> { + let (tx, rx) = mpsc::unbounded_channel(); + let (sender, receiver) = ws_stream.split(); + + let stdin = tokio::io::stdin(); + let stdout = tokio::io::stdout(); + + tokio::select! { + res = stdin_to_tx(tx, stdin) => res, + res = rx_to_ws(rx, sender) => res, + res = ws_to_stdout(receiver, stdout) => res, + } +} + +async fn rx_to_ws( + mut rx: mpsc::UnboundedReceiver<Message>, + mut sender: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>, +) -> Result<()> { + while let Some(msg) = rx.recv().await { + sender.send(msg).await?; + } + Ok(()) +} + +async fn stdin_to_tx(tx: mpsc::UnboundedSender<Message>, mut stdin: Stdin) -> Result<()> { + loop { + let mut buf = vec![0; 1024]; + let n = match stdin.read(&mut buf).await { + Err(_) | Ok(0) => break, + Ok(n) => n, + }; + buf.truncate(n); + tx.send(Message::binary(buf))? + } + Ok(()) +} + +async fn ws_to_stdout( + mut receiver: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>, + mut stdout: Stdout, +) -> Result<()> { + while let Some(Ok(msg)) = receiver.next().await { + let data = msg.into_data(); + stdout.write_all(&data).await?; + } + + Ok(()) +} diff --git a/zone_core/src/container.rs b/zone_core/src/container.rs index 2020b20..330705e 100644 --- a/zone_core/src/container.rs +++ b/zone_core/src/container.rs @@ -10,7 +10,7 @@ pub use status::ContainerStatus; mod status; -#[derive(Debug, Default, Serialize, Deserialize, Builder, Tabled, Clone, Args)] +#[derive(Debug, PartialEq, Default, Serialize, Deserialize, Builder, Tabled, Clone, Args)] #[builder( name = "ContainerOptions", derive(Debug, Serialize, Deserialize), @@ -26,9 +26,6 @@ pub struct Container { #[tabled("User")] pub user: String, - - #[tabled("Status")] - pub status: ContainerStatus, } impl Container { @@ -37,31 +34,50 @@ impl Container { } } -#[derive(Debug, Serialize, Deserialize, Clone, Args)] -pub struct CloneOptions { - pub template: String, - pub user: String, +impl From<Container> for String { + fn from(val: Container) -> Self { + format!("{}-{}-{}", val.user, val.template, val.id) + } } -#[derive(Debug, Serialize, Deserialize, Clone, Args)] -pub struct WebSocketOptions { - pub id: u64, - pub template: String, - pub user: String, +impl From<Container> for ContainerOptions { + fn from(val: Container) -> Self { + Self { + id: Some(val.id), + template: Some(val.template), + user: Some(val.user), + } + } } -impl From<WebSocketOptions> for String { - fn from(val: WebSocketOptions) -> Self { - format!("{}-{}-{}", val.user, val.template, val.id) +impl From<&Container> for &ContainerOptions { + fn from(val: &Container) -> Self { + ContainerOptions { + id: Some(val.id.to_owned()), + template: Some(val.template.to_owned()), + user: Some(val.user.to_owned()), + }.into() + } +} + +impl From<ContainerOptions> for &ContainerOptions { + fn from(val: ContainerOptions) -> Self { + val.into() } } +#[derive(Debug, Serialize, Deserialize, Clone, Args)] +pub struct CloneOptions { + pub template: String, + pub user: String, +} + impl<T> FilterContainer for T where T: Iterator, T::Item: TryInto<Container>, { - fn filter_container(&mut self, pred: ContainerOptions) -> Vec<Container> { + fn filter_container(&mut self, pred: &ContainerOptions) -> Vec<Container> { self.filter_map(|c| -> Option<Container> { c.try_into().ok() }) .filter(|c| { pred.id.map_or(false, |p| p == c.id) @@ -107,7 +123,6 @@ impl TryFrom<zone_zfs::FileSystem> for Container { id, template, user: user.to_string(), - status: ContainerStatus::default(), }) } } @@ -126,7 +141,6 @@ impl TryFrom<zone_nspawn::Container> for Container { })?, template: v[1].to_owned(), user: v[0].to_owned(), - status: ContainerStatus::Running, }) } } diff --git a/zone_core/src/lib.rs b/zone_core/src/lib.rs index e612766..17cc173 100644 --- a/zone_core/src/lib.rs +++ b/zone_core/src/lib.rs @@ -1,8 +1,6 @@ use std::net::{IpAddr, Ipv4Addr}; -pub use crate::container::{ - CloneOptions, Container, ContainerOptions, ContainerStatus, WebSocketOptions, -}; +pub use crate::container::{CloneOptions, Container, ContainerOptions, ContainerStatus}; pub use crate::error::{Error, Result}; mod error; @@ -11,8 +9,8 @@ pub static DEFAULT_IP_ADDRESS: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)); pub static DEFAULT_PORT: u16 = 8000; pub trait FilterContainer { - fn filter_container(&mut self, predicate: ContainerOptions) -> Vec<Container>; - fn find_container(&mut self, predicate: ContainerOptions) -> Option<Container> { + fn filter_container(&mut self, predicate: &ContainerOptions) -> Vec<Container>; + fn find_container(&mut self, predicate: &ContainerOptions) -> Option<Container> { self.filter_container(predicate).into_iter().next() } } diff --git a/zoned/src/error.rs b/zoned/src/error.rs index 922a352..0c28e48 100644 --- a/zoned/src/error.rs +++ b/zoned/src/error.rs @@ -20,9 +20,15 @@ pub enum Error { #[error("Container Error: {0:?}")] Container(String), + #[error("WebSocket Error: {0:?}")] + WebSocket(String), + #[error("Container not found")] NotFound, + #[error("Missing initialization message")] + WebSocketMissingInit, + #[error("ZFS Error: {source:?}")] ZFS { #[from] @@ -59,6 +65,12 @@ pub enum Error { source: std::io::Error, }, + #[error("Json Error: {source:?}")] + Json { + #[from] + source: serde_json::error::Error, + }, + #[error("Send Error: {source:?}")] Send { #[from] diff --git a/zoned/src/http.rs b/zoned/src/http.rs index dda3296..8d388b1 100644 --- a/zoned/src/http.rs +++ b/zoned/src/http.rs @@ -7,9 +7,7 @@ use axum::{ }; use std::{process::Command, sync::Arc}; use tracing::{info, instrument, warn}; -use zone_core::{ - CloneOptions, Container, ContainerOptions, ContainerStatus, FilterContainer, WebSocketOptions, -}; +use zone_core::{CloneOptions, Container, ContainerOptions, ContainerStatus, FilterContainer}; use crate::{ws, Error, Result, State}; @@ -19,7 +17,7 @@ pub fn build_routes() -> Router { .route("/test", get(test_endpoint)) .route("/container", post(clone_container)) .route("/container/list?<container..>", get(container_list)) - .route("/ws", get(ws_upgrade)) + .route("/container/attach", get(ws_upgrade)) } /// # Test endpoint @@ -35,19 +33,22 @@ async fn test_endpoint(Extension(state): Extension<Arc<State>>) -> Json<String> /// Returns a list of containers based on the query. #[instrument(err, ret)] async fn container_list( - container: Option<Query<ContainerOptions>>, + Query(params): Query<ContainerOptions>, Extension(state): Extension<Arc<State>>, ) -> Result<Json<Vec<Container>>> { - let mut containers = state.nspawn.list()?.into_iter().filter_map(|c| { - Container::try_from(c) - .map_err(|err| warn!("Ignoring invalid nspawn container {:?}", err)) - .ok() - }); + let containers = state + .nspawn + .list()? + .into_iter() + .filter_map(|c| { + Container::try_from(c) + .map_err(|err| warn!("Ignoring invalid nspawn container: {:?}", err)) + .ok() + }) + .filter_container(¶ms) + .into(); - match container { - Some(Query(params)) => Ok(containers.filter_container(params).into()), - _ => Ok(containers.collect::<Vec<_>>().into()), - } + Ok(containers) } /// Create container @@ -57,7 +58,7 @@ async fn container_list( async fn clone_container( Json(container): Json<CloneOptions>, Extension(state): Extension<Arc<State>>, -) -> Result<Json<Container>> { +) -> Result<Json<(Container, ContainerStatus)>> { let predicate = Container::builder() .user(container.user.to_owned()) .template(container.template.to_owned()) @@ -67,7 +68,7 @@ async fn clone_container( .zfs .get_file_systems()? .into_iter() - .filter_container(predicate) + .filter_container(&predicate) .into_iter() .max_by_key(|c| c.id) .map_or(0, |c| c.id + 1); @@ -92,13 +93,13 @@ async fn clone_container( let name = format!("{}-{}-{}", container.template, container.user, id); state.nspawn.create(root, name)?; - - Ok(Json::from(Container { + let container = Container { id, template: container.template, user: container.user, - status: ContainerStatus::Running, - })) + }; + + Ok(Json::from((container, ContainerStatus::Running))) } /// Upgrade to websocket @@ -107,14 +108,13 @@ async fn clone_container( #[instrument(ret, skip_all)] async fn ws_upgrade( ws: WebSocketUpgrade, - Json(options): Json<WebSocketOptions>, - Extension(state): Extension<Arc<State>>, user_agent: Option<TypedHeader<headers::UserAgent>>, + Extension(state): Extension<Arc<State>>, ) -> impl IntoResponse { let agent = user_agent.map_or("Unknown".to_string(), |u| u.to_string()); info!(%agent, "Client connected"); - ws.on_upgrade(|socket| ws::handler(socket, options, state)) + ws.on_upgrade(|socket| ws::handler(socket, state)) } #[cfg(test)] diff --git a/zoned/src/ws.rs b/zoned/src/ws.rs index b7714da..1f5a6fc 100644 --- a/zoned/src/ws.rs +++ b/zoned/src/ws.rs @@ -13,9 +13,9 @@ use tokio::{ }; use tracing::{instrument, warn}; use wspty::PtyMaster; -use zone_core::WebSocketOptions; +use zone_core::{Container, FilterContainer}; -use crate::{Result, State}; +use crate::{Error, Result, State}; #[derive(Deserialize, Debug)] struct WindowSize { @@ -23,17 +23,32 @@ struct WindowSize { rows: u16, } -#[instrument(err, skip_all)] -pub async fn handler( - ws_stream: WebSocket, - options: WebSocketOptions, - state: Arc<State>, -) -> Result<()> { +// #[instrument(err, skip_all)] +pub async fn handler(mut ws_stream: WebSocket, state: Arc<State>) -> Result<()> { + let msg = loop { + match ws_stream.next().await { + Some(Ok(Message::Text(msg))) => break msg, + Some(Ok(_)) => return Err(Error::WebSocketMissingInit), + Some(Err(err)) => return Err(err.into()), + None => continue, + }; + }; + + let container = + serde_json::from_str::<Container>(&msg).map_err(|_| Error::WebSocketMissingInit)?; + + state + .nspawn + .list()? + .into_iter() + .find_container((&container).into()) + .ok_or(Error::NotFound)?; + let (sender, receiver) = ws_stream.split(); let (kill_tx, kill_rx) = mpsc::unbounded_channel(); let (tx, rx) = mpsc::channel(1024); - let pty = state.nspawn.attach(options.into(), kill_rx).await?; + let pty = state.nspawn.attach(container.into(), kill_rx).await?; tokio::select! { res = msg_handler(receiver, pty.clone(), tx.clone(), kill_tx) => res, |