diff options
author | Toby Vincent <tobyv13@gmail.com> | 2022-04-22 01:53:42 -0500 |
---|---|---|
committer | Toby Vincent <tobyv13@gmail.com> | 2022-04-22 01:53:42 -0500 |
commit | b627ec4e0d9c8686d5d193b61e7e2496a1116f60 (patch) | |
tree | 87e24372a113a34d925e106b61a172865ba0311a /zoned/src | |
parent | e3fc06077d7334d627a86e38077386268a81376c (diff) |
feat(zone): impl attach subcommand
Diffstat (limited to 'zoned/src')
-rw-r--r-- | zoned/src/error.rs | 12 | ||||
-rw-r--r-- | zoned/src/http.rs | 46 | ||||
-rw-r--r-- | zoned/src/ws.rs | 33 |
3 files changed, 59 insertions, 32 deletions
diff --git a/zoned/src/error.rs b/zoned/src/error.rs index 922a352..0c28e48 100644 --- a/zoned/src/error.rs +++ b/zoned/src/error.rs @@ -20,9 +20,15 @@ pub enum Error { #[error("Container Error: {0:?}")] Container(String), + #[error("WebSocket Error: {0:?}")] + WebSocket(String), + #[error("Container not found")] NotFound, + #[error("Missing initialization message")] + WebSocketMissingInit, + #[error("ZFS Error: {source:?}")] ZFS { #[from] @@ -59,6 +65,12 @@ pub enum Error { source: std::io::Error, }, + #[error("Json Error: {source:?}")] + Json { + #[from] + source: serde_json::error::Error, + }, + #[error("Send Error: {source:?}")] Send { #[from] diff --git a/zoned/src/http.rs b/zoned/src/http.rs index dda3296..8d388b1 100644 --- a/zoned/src/http.rs +++ b/zoned/src/http.rs @@ -7,9 +7,7 @@ use axum::{ }; use std::{process::Command, sync::Arc}; use tracing::{info, instrument, warn}; -use zone_core::{ - CloneOptions, Container, ContainerOptions, ContainerStatus, FilterContainer, WebSocketOptions, -}; +use zone_core::{CloneOptions, Container, ContainerOptions, ContainerStatus, FilterContainer}; use crate::{ws, Error, Result, State}; @@ -19,7 +17,7 @@ pub fn build_routes() -> Router { .route("/test", get(test_endpoint)) .route("/container", post(clone_container)) .route("/container/list?<container..>", get(container_list)) - .route("/ws", get(ws_upgrade)) + .route("/container/attach", get(ws_upgrade)) } /// # Test endpoint @@ -35,19 +33,22 @@ async fn test_endpoint(Extension(state): Extension<Arc<State>>) -> Json<String> /// Returns a list of containers based on the query. #[instrument(err, ret)] async fn container_list( - container: Option<Query<ContainerOptions>>, + Query(params): Query<ContainerOptions>, Extension(state): Extension<Arc<State>>, ) -> Result<Json<Vec<Container>>> { - let mut containers = state.nspawn.list()?.into_iter().filter_map(|c| { - Container::try_from(c) - .map_err(|err| warn!("Ignoring invalid nspawn container {:?}", err)) - .ok() - }); + let containers = state + .nspawn + .list()? + .into_iter() + .filter_map(|c| { + Container::try_from(c) + .map_err(|err| warn!("Ignoring invalid nspawn container: {:?}", err)) + .ok() + }) + .filter_container(¶ms) + .into(); - match container { - Some(Query(params)) => Ok(containers.filter_container(params).into()), - _ => Ok(containers.collect::<Vec<_>>().into()), - } + Ok(containers) } /// Create container @@ -57,7 +58,7 @@ async fn container_list( async fn clone_container( Json(container): Json<CloneOptions>, Extension(state): Extension<Arc<State>>, -) -> Result<Json<Container>> { +) -> Result<Json<(Container, ContainerStatus)>> { let predicate = Container::builder() .user(container.user.to_owned()) .template(container.template.to_owned()) @@ -67,7 +68,7 @@ async fn clone_container( .zfs .get_file_systems()? .into_iter() - .filter_container(predicate) + .filter_container(&predicate) .into_iter() .max_by_key(|c| c.id) .map_or(0, |c| c.id + 1); @@ -92,13 +93,13 @@ async fn clone_container( let name = format!("{}-{}-{}", container.template, container.user, id); state.nspawn.create(root, name)?; - - Ok(Json::from(Container { + let container = Container { id, template: container.template, user: container.user, - status: ContainerStatus::Running, - })) + }; + + Ok(Json::from((container, ContainerStatus::Running))) } /// Upgrade to websocket @@ -107,14 +108,13 @@ async fn clone_container( #[instrument(ret, skip_all)] async fn ws_upgrade( ws: WebSocketUpgrade, - Json(options): Json<WebSocketOptions>, - Extension(state): Extension<Arc<State>>, user_agent: Option<TypedHeader<headers::UserAgent>>, + Extension(state): Extension<Arc<State>>, ) -> impl IntoResponse { let agent = user_agent.map_or("Unknown".to_string(), |u| u.to_string()); info!(%agent, "Client connected"); - ws.on_upgrade(|socket| ws::handler(socket, options, state)) + ws.on_upgrade(|socket| ws::handler(socket, state)) } #[cfg(test)] diff --git a/zoned/src/ws.rs b/zoned/src/ws.rs index b7714da..1f5a6fc 100644 --- a/zoned/src/ws.rs +++ b/zoned/src/ws.rs @@ -13,9 +13,9 @@ use tokio::{ }; use tracing::{instrument, warn}; use wspty::PtyMaster; -use zone_core::WebSocketOptions; +use zone_core::{Container, FilterContainer}; -use crate::{Result, State}; +use crate::{Error, Result, State}; #[derive(Deserialize, Debug)] struct WindowSize { @@ -23,17 +23,32 @@ struct WindowSize { rows: u16, } -#[instrument(err, skip_all)] -pub async fn handler( - ws_stream: WebSocket, - options: WebSocketOptions, - state: Arc<State>, -) -> Result<()> { +// #[instrument(err, skip_all)] +pub async fn handler(mut ws_stream: WebSocket, state: Arc<State>) -> Result<()> { + let msg = loop { + match ws_stream.next().await { + Some(Ok(Message::Text(msg))) => break msg, + Some(Ok(_)) => return Err(Error::WebSocketMissingInit), + Some(Err(err)) => return Err(err.into()), + None => continue, + }; + }; + + let container = + serde_json::from_str::<Container>(&msg).map_err(|_| Error::WebSocketMissingInit)?; + + state + .nspawn + .list()? + .into_iter() + .find_container((&container).into()) + .ok_or(Error::NotFound)?; + let (sender, receiver) = ws_stream.split(); let (kill_tx, kill_rx) = mpsc::unbounded_channel(); let (tx, rx) = mpsc::channel(1024); - let pty = state.nspawn.attach(options.into(), kill_rx).await?; + let pty = state.nspawn.attach(container.into(), kill_rx).await?; tokio::select! { res = msg_handler(receiver, pty.clone(), tx.clone(), kill_tx) => res, |