aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorToby Vincent <tobyv13@gmail.com>2022-04-22 01:53:42 -0500
committerToby Vincent <tobyv13@gmail.com>2022-04-22 01:53:42 -0500
commitb627ec4e0d9c8686d5d193b61e7e2496a1116f60 (patch)
tree87e24372a113a34d925e106b61a172865ba0311a
parente3fc06077d7334d627a86e38077386268a81376c (diff)
feat(zone): impl attach subcommand
-rw-r--r--zone/src/lib.rs58
-rw-r--r--zone/src/main.rs10
-rw-r--r--zone/src/ws.rs60
-rw-r--r--zone_core/src/container.rs52
-rw-r--r--zone_core/src/lib.rs8
-rw-r--r--zoned/src/error.rs12
-rw-r--r--zoned/src/http.rs46
-rw-r--r--zoned/src/ws.rs33
8 files changed, 194 insertions, 85 deletions
diff --git a/zone/src/lib.rs b/zone/src/lib.rs
index 48893e2..1f0b758 100644
--- a/zone/src/lib.rs
+++ b/zone/src/lib.rs
@@ -1,12 +1,16 @@
use anyhow::{Context, Result};
-use clap::{ArgEnum, Args, ErrorKind, IntoApp, Parser, Subcommand, ValueHint};
+use clap::{Args, ErrorKind, IntoApp, Parser, Subcommand, ValueHint};
use clap_complete::{generate, Shell};
-use log::LevelFilter;
+use futures::{SinkExt, StreamExt};
+use log::{debug, LevelFilter};
use reqwest::Url;
use std::{ffi::OsString, io, process::Command};
use tabled::{Style, Table};
+use tokio_tungstenite::{connect_async, tungstenite::Message};
use zone_core::Container;
+mod ws;
+
#[derive(Debug, Parser)]
#[clap(about, version)]
pub struct Cli {
@@ -47,6 +51,8 @@ pub enum Commands {
/// List existing containers
List(List),
+ Attach(Container),
+
/// Create a container
///
/// Create a new container from an existing template.
@@ -61,8 +67,6 @@ pub struct List {
pub filter: Option<String>,
}
-impl List {}
-
#[derive(Debug, Args)]
pub struct Create {
#[clap(short, long)]
@@ -72,12 +76,13 @@ pub struct Create {
}
impl Cli {
- pub fn run(self) -> Result<Option<String>> {
+ pub async fn run(self) -> Result<()> {
match self.command {
Commands::Completion { shell } => self.completion(shell),
Commands::List(ref list) => self.list(&list.filter),
Commands::External(args) => Cli::external(args),
Commands::Create(ref create) => self.create(create),
+ Commands::Attach(ref opts) => self.attach(opts).await,
}
}
@@ -92,33 +97,33 @@ impl Cli {
}
}
- fn external(args: Vec<OsString>) -> Result<Option<String>> {
+ fn external(args: Vec<OsString>) -> Result<()> {
Command::new(format!("zone-{:?}", &args[0]))
.args(&args[1..])
.spawn()
.unwrap_or_else(|_| {
- let mut app = Cli::into_app();
- app.error(
+ let mut cmd = Cli::command();
+ cmd.error(
ErrorKind::UnrecognizedSubcommand,
format!("Unrecognized subcommand '{:?}'", &args[0]),
)
.exit()
});
- Ok(None)
+ Ok(())
}
- fn completion(&self, gen: Shell) -> Result<Option<String>> {
+ fn completion(&self, gen: Shell) -> Result<()> {
eprintln!("Generating completion file for {:?}...", gen);
- let mut app = Cli::into_app();
- let bin_name = app.get_name().to_string();
+ let mut cmd = Cli::command();
+ let bin_name = cmd.get_name().to_string();
let buf = &mut io::stdout();
- generate(gen, &mut app, bin_name, buf);
+ generate(gen, &mut cmd, bin_name, buf);
- Ok(None)
+ Ok(())
}
- fn create(&self, create: &Create) -> Result<Option<String>> {
+ fn create(&self, create: &Create) -> Result<()> {
let client = reqwest::blocking::Client::new();
let mut url = self.endpoint.to_owned();
url.set_path("/container");
@@ -136,12 +141,11 @@ impl Cli {
.json::<Container>()
.context("Failed to parse json")?;
- Ok(Some(
- Table::new(&[container]).with(Style::NO_BORDER).to_string(),
- ))
+ println!("{}", Table::new(&[container]).with(Style::blank()));
+ Ok(())
}
- fn list(&self, query: &Option<String>) -> Result<Option<String>> {
+ fn list(&self, query: &Option<String>) -> Result<()> {
let mut url = self.endpoint.to_owned();
url.set_path("containers/list");
url.set_query(query.as_deref());
@@ -151,9 +155,17 @@ impl Cli {
.json::<Vec<Container>>()
.context("Failed to parse json")?;
- Ok(Some(
- Table::new(containers).with(Style::NO_BORDER).to_string(),
- ))
+ println!("{}", Table::new(containers).with(Style::blank()));
+ Ok(())
+ }
+
+ async fn attach(&self, container: &Container) -> Result<()> {
+ let (mut ws_stream, response) = connect_async(self.endpoint.to_owned()).await?;
+ debug!("{:?}", response);
+ ws_stream
+ .send(Message::Text(serde_json::to_string(container)?))
+ .await?;
+ ws::handle_ws(ws_stream).await
}
}
@@ -163,6 +175,6 @@ mod tests {
fn verify_app() {
use super::Cli;
use clap::IntoApp;
- Cli::into_app().debug_assert()
+ Cli::command().debug_assert()
}
}
diff --git a/zone/src/main.rs b/zone/src/main.rs
index 054633a..60df17d 100644
--- a/zone/src/main.rs
+++ b/zone/src/main.rs
@@ -1,8 +1,9 @@
+use anyhow::Result;
use clap::Parser;
-use log::error;
use zone::Cli;
-fn main() {
+#[tokio::main]
+async fn main() -> Result<()> {
let cli = Cli::parse();
simplelog::TermLogger::init(
@@ -13,8 +14,5 @@ fn main() {
)
.unwrap();
- if let Err(err) = Cli::parse().run() {
- error!("{}", err);
- std::process::exit(1);
- }
+ cli.run().await
}
diff --git a/zone/src/ws.rs b/zone/src/ws.rs
new file mode 100644
index 0000000..16ba8fd
--- /dev/null
+++ b/zone/src/ws.rs
@@ -0,0 +1,60 @@
+use anyhow::Result;
+use futures::{
+ stream::{SplitSink, SplitStream},
+ SinkExt, StreamExt,
+};
+use tokio::{
+ io::{AsyncReadExt, AsyncWriteExt, Stdin, Stdout},
+ net::TcpStream,
+ sync::mpsc,
+};
+use tokio_tungstenite::{tungstenite::Message, MaybeTlsStream, WebSocketStream};
+
+pub(crate) async fn handle_ws(ws_stream: WebSocketStream<MaybeTlsStream<TcpStream>>) -> Result<()> {
+ let (tx, rx) = mpsc::unbounded_channel();
+ let (sender, receiver) = ws_stream.split();
+
+ let stdin = tokio::io::stdin();
+ let stdout = tokio::io::stdout();
+
+ tokio::select! {
+ res = stdin_to_tx(tx, stdin) => res,
+ res = rx_to_ws(rx, sender) => res,
+ res = ws_to_stdout(receiver, stdout) => res,
+ }
+}
+
+async fn rx_to_ws(
+ mut rx: mpsc::UnboundedReceiver<Message>,
+ mut sender: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
+) -> Result<()> {
+ while let Some(msg) = rx.recv().await {
+ sender.send(msg).await?;
+ }
+ Ok(())
+}
+
+async fn stdin_to_tx(tx: mpsc::UnboundedSender<Message>, mut stdin: Stdin) -> Result<()> {
+ loop {
+ let mut buf = vec![0; 1024];
+ let n = match stdin.read(&mut buf).await {
+ Err(_) | Ok(0) => break,
+ Ok(n) => n,
+ };
+ buf.truncate(n);
+ tx.send(Message::binary(buf))?
+ }
+ Ok(())
+}
+
+async fn ws_to_stdout(
+ mut receiver: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
+ mut stdout: Stdout,
+) -> Result<()> {
+ while let Some(Ok(msg)) = receiver.next().await {
+ let data = msg.into_data();
+ stdout.write_all(&data).await?;
+ }
+
+ Ok(())
+}
diff --git a/zone_core/src/container.rs b/zone_core/src/container.rs
index 2020b20..330705e 100644
--- a/zone_core/src/container.rs
+++ b/zone_core/src/container.rs
@@ -10,7 +10,7 @@ pub use status::ContainerStatus;
mod status;
-#[derive(Debug, Default, Serialize, Deserialize, Builder, Tabled, Clone, Args)]
+#[derive(Debug, PartialEq, Default, Serialize, Deserialize, Builder, Tabled, Clone, Args)]
#[builder(
name = "ContainerOptions",
derive(Debug, Serialize, Deserialize),
@@ -26,9 +26,6 @@ pub struct Container {
#[tabled("User")]
pub user: String,
-
- #[tabled("Status")]
- pub status: ContainerStatus,
}
impl Container {
@@ -37,31 +34,50 @@ impl Container {
}
}
-#[derive(Debug, Serialize, Deserialize, Clone, Args)]
-pub struct CloneOptions {
- pub template: String,
- pub user: String,
+impl From<Container> for String {
+ fn from(val: Container) -> Self {
+ format!("{}-{}-{}", val.user, val.template, val.id)
+ }
}
-#[derive(Debug, Serialize, Deserialize, Clone, Args)]
-pub struct WebSocketOptions {
- pub id: u64,
- pub template: String,
- pub user: String,
+impl From<Container> for ContainerOptions {
+ fn from(val: Container) -> Self {
+ Self {
+ id: Some(val.id),
+ template: Some(val.template),
+ user: Some(val.user),
+ }
+ }
}
-impl From<WebSocketOptions> for String {
- fn from(val: WebSocketOptions) -> Self {
- format!("{}-{}-{}", val.user, val.template, val.id)
+impl From<&Container> for &ContainerOptions {
+ fn from(val: &Container) -> Self {
+ ContainerOptions {
+ id: Some(val.id.to_owned()),
+ template: Some(val.template.to_owned()),
+ user: Some(val.user.to_owned()),
+ }.into()
+ }
+}
+
+impl From<ContainerOptions> for &ContainerOptions {
+ fn from(val: ContainerOptions) -> Self {
+ val.into()
}
}
+#[derive(Debug, Serialize, Deserialize, Clone, Args)]
+pub struct CloneOptions {
+ pub template: String,
+ pub user: String,
+}
+
impl<T> FilterContainer for T
where
T: Iterator,
T::Item: TryInto<Container>,
{
- fn filter_container(&mut self, pred: ContainerOptions) -> Vec<Container> {
+ fn filter_container(&mut self, pred: &ContainerOptions) -> Vec<Container> {
self.filter_map(|c| -> Option<Container> { c.try_into().ok() })
.filter(|c| {
pred.id.map_or(false, |p| p == c.id)
@@ -107,7 +123,6 @@ impl TryFrom<zone_zfs::FileSystem> for Container {
id,
template,
user: user.to_string(),
- status: ContainerStatus::default(),
})
}
}
@@ -126,7 +141,6 @@ impl TryFrom<zone_nspawn::Container> for Container {
})?,
template: v[1].to_owned(),
user: v[0].to_owned(),
- status: ContainerStatus::Running,
})
}
}
diff --git a/zone_core/src/lib.rs b/zone_core/src/lib.rs
index e612766..17cc173 100644
--- a/zone_core/src/lib.rs
+++ b/zone_core/src/lib.rs
@@ -1,8 +1,6 @@
use std::net::{IpAddr, Ipv4Addr};
-pub use crate::container::{
- CloneOptions, Container, ContainerOptions, ContainerStatus, WebSocketOptions,
-};
+pub use crate::container::{CloneOptions, Container, ContainerOptions, ContainerStatus};
pub use crate::error::{Error, Result};
mod error;
@@ -11,8 +9,8 @@ pub static DEFAULT_IP_ADDRESS: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
pub static DEFAULT_PORT: u16 = 8000;
pub trait FilterContainer {
- fn filter_container(&mut self, predicate: ContainerOptions) -> Vec<Container>;
- fn find_container(&mut self, predicate: ContainerOptions) -> Option<Container> {
+ fn filter_container(&mut self, predicate: &ContainerOptions) -> Vec<Container>;
+ fn find_container(&mut self, predicate: &ContainerOptions) -> Option<Container> {
self.filter_container(predicate).into_iter().next()
}
}
diff --git a/zoned/src/error.rs b/zoned/src/error.rs
index 922a352..0c28e48 100644
--- a/zoned/src/error.rs
+++ b/zoned/src/error.rs
@@ -20,9 +20,15 @@ pub enum Error {
#[error("Container Error: {0:?}")]
Container(String),
+ #[error("WebSocket Error: {0:?}")]
+ WebSocket(String),
+
#[error("Container not found")]
NotFound,
+ #[error("Missing initialization message")]
+ WebSocketMissingInit,
+
#[error("ZFS Error: {source:?}")]
ZFS {
#[from]
@@ -59,6 +65,12 @@ pub enum Error {
source: std::io::Error,
},
+ #[error("Json Error: {source:?}")]
+ Json {
+ #[from]
+ source: serde_json::error::Error,
+ },
+
#[error("Send Error: {source:?}")]
Send {
#[from]
diff --git a/zoned/src/http.rs b/zoned/src/http.rs
index dda3296..8d388b1 100644
--- a/zoned/src/http.rs
+++ b/zoned/src/http.rs
@@ -7,9 +7,7 @@ use axum::{
};
use std::{process::Command, sync::Arc};
use tracing::{info, instrument, warn};
-use zone_core::{
- CloneOptions, Container, ContainerOptions, ContainerStatus, FilterContainer, WebSocketOptions,
-};
+use zone_core::{CloneOptions, Container, ContainerOptions, ContainerStatus, FilterContainer};
use crate::{ws, Error, Result, State};
@@ -19,7 +17,7 @@ pub fn build_routes() -> Router {
.route("/test", get(test_endpoint))
.route("/container", post(clone_container))
.route("/container/list?<container..>", get(container_list))
- .route("/ws", get(ws_upgrade))
+ .route("/container/attach", get(ws_upgrade))
}
/// # Test endpoint
@@ -35,19 +33,22 @@ async fn test_endpoint(Extension(state): Extension<Arc<State>>) -> Json<String>
/// Returns a list of containers based on the query.
#[instrument(err, ret)]
async fn container_list(
- container: Option<Query<ContainerOptions>>,
+ Query(params): Query<ContainerOptions>,
Extension(state): Extension<Arc<State>>,
) -> Result<Json<Vec<Container>>> {
- let mut containers = state.nspawn.list()?.into_iter().filter_map(|c| {
- Container::try_from(c)
- .map_err(|err| warn!("Ignoring invalid nspawn container {:?}", err))
- .ok()
- });
+ let containers = state
+ .nspawn
+ .list()?
+ .into_iter()
+ .filter_map(|c| {
+ Container::try_from(c)
+ .map_err(|err| warn!("Ignoring invalid nspawn container: {:?}", err))
+ .ok()
+ })
+ .filter_container(&params)
+ .into();
- match container {
- Some(Query(params)) => Ok(containers.filter_container(params).into()),
- _ => Ok(containers.collect::<Vec<_>>().into()),
- }
+ Ok(containers)
}
/// Create container
@@ -57,7 +58,7 @@ async fn container_list(
async fn clone_container(
Json(container): Json<CloneOptions>,
Extension(state): Extension<Arc<State>>,
-) -> Result<Json<Container>> {
+) -> Result<Json<(Container, ContainerStatus)>> {
let predicate = Container::builder()
.user(container.user.to_owned())
.template(container.template.to_owned())
@@ -67,7 +68,7 @@ async fn clone_container(
.zfs
.get_file_systems()?
.into_iter()
- .filter_container(predicate)
+ .filter_container(&predicate)
.into_iter()
.max_by_key(|c| c.id)
.map_or(0, |c| c.id + 1);
@@ -92,13 +93,13 @@ async fn clone_container(
let name = format!("{}-{}-{}", container.template, container.user, id);
state.nspawn.create(root, name)?;
-
- Ok(Json::from(Container {
+ let container = Container {
id,
template: container.template,
user: container.user,
- status: ContainerStatus::Running,
- }))
+ };
+
+ Ok(Json::from((container, ContainerStatus::Running)))
}
/// Upgrade to websocket
@@ -107,14 +108,13 @@ async fn clone_container(
#[instrument(ret, skip_all)]
async fn ws_upgrade(
ws: WebSocketUpgrade,
- Json(options): Json<WebSocketOptions>,
- Extension(state): Extension<Arc<State>>,
user_agent: Option<TypedHeader<headers::UserAgent>>,
+ Extension(state): Extension<Arc<State>>,
) -> impl IntoResponse {
let agent = user_agent.map_or("Unknown".to_string(), |u| u.to_string());
info!(%agent, "Client connected");
- ws.on_upgrade(|socket| ws::handler(socket, options, state))
+ ws.on_upgrade(|socket| ws::handler(socket, state))
}
#[cfg(test)]
diff --git a/zoned/src/ws.rs b/zoned/src/ws.rs
index b7714da..1f5a6fc 100644
--- a/zoned/src/ws.rs
+++ b/zoned/src/ws.rs
@@ -13,9 +13,9 @@ use tokio::{
};
use tracing::{instrument, warn};
use wspty::PtyMaster;
-use zone_core::WebSocketOptions;
+use zone_core::{Container, FilterContainer};
-use crate::{Result, State};
+use crate::{Error, Result, State};
#[derive(Deserialize, Debug)]
struct WindowSize {
@@ -23,17 +23,32 @@ struct WindowSize {
rows: u16,
}
-#[instrument(err, skip_all)]
-pub async fn handler(
- ws_stream: WebSocket,
- options: WebSocketOptions,
- state: Arc<State>,
-) -> Result<()> {
+// #[instrument(err, skip_all)]
+pub async fn handler(mut ws_stream: WebSocket, state: Arc<State>) -> Result<()> {
+ let msg = loop {
+ match ws_stream.next().await {
+ Some(Ok(Message::Text(msg))) => break msg,
+ Some(Ok(_)) => return Err(Error::WebSocketMissingInit),
+ Some(Err(err)) => return Err(err.into()),
+ None => continue,
+ };
+ };
+
+ let container =
+ serde_json::from_str::<Container>(&msg).map_err(|_| Error::WebSocketMissingInit)?;
+
+ state
+ .nspawn
+ .list()?
+ .into_iter()
+ .find_container((&container).into())
+ .ok_or(Error::NotFound)?;
+
let (sender, receiver) = ws_stream.split();
let (kill_tx, kill_rx) = mpsc::unbounded_channel();
let (tx, rx) = mpsc::channel(1024);
- let pty = state.nspawn.attach(options.into(), kill_rx).await?;
+ let pty = state.nspawn.attach(container.into(), kill_rx).await?;
tokio::select! {
res = msg_handler(receiver, pty.clone(), tx.clone(), kill_tx) => res,