use anyhow::Context; use axum::{ extract::{ ws::{Message, WebSocket, WebSocketUpgrade}, Extension, Query, TypedHeader, }, headers, response::IntoResponse, routing::{get, post}, Json, Router, }; use bytes::BytesMut; use futures::{ stream::{SplitSink, SplitStream, StreamExt}, SinkExt, }; use serde::Deserialize; use std::sync::Arc; use tokio::{ io::{AsyncReadExt, AsyncWriteExt}, process::Command, sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, }; use tracing::{debug, error, info, warn}; use wspty::{PtyCommand, PtyMaster}; use zone_core::{Container, ContainerOptions, FilterContainer}; use zone_nspawn::NSpawn; use crate::{Error, Result, State}; #[derive(Deserialize, Debug)] struct WindowSize { cols: u16, rows: u16, } pub fn build_routes() -> Router { Router::new() .route("/test", get(test_endpoint)) .route("/container", post(clone_container)) .route("/container/list?", get(container_list)) .route("/ws", get(ws_handler)) } /// # Test endpoint /// /// Returns a list of containers based on the query. async fn test_endpoint(Extension(state): Extension>) -> Json { Json(state.zfs.config.pool_name.to_owned()) } /// List containers /// /// Returns a list of containers based on the query. async fn container_list( container: Option>, ) -> Result>> { let mut containers = NSpawn::get_containers()?.into_iter().filter_map(|c| { Container::try_from(c) .map_err(|err| warn!("Ignoring invalid nspawn container {:?}", err)) .ok() }); match container { Some(Query(params)) => Ok(containers.filter_container(params).into()), _ => Ok(containers.collect::>().into()), } } /// Create container /// /// Creates a new container volume from the provided container json data async fn clone_container( Json(container): Json, Extension(state): Extension>, ) -> Result> { state .zfs .clone_from_latest( format!("{}-{}", container.user, container.id).into(), container.template.into(), )? .try_into() .map_err(Error::from) .map(Container::into) } async fn ws_handler( ws: WebSocketUpgrade, user_agent: Option>, Extension(state): Extension>, ) -> impl IntoResponse { info!("Client connected"); if let Some(TypedHeader(user_agent)) = user_agent { debug!("`{}` connected", user_agent.as_str()); } ws.on_upgrade(|socket| websocket(socket, state)) } async fn websocket(ws_stream: WebSocket, _state: Arc) -> Result<()> { debug!("Handling websocket!"); let (sender, receiver) = unbounded_channel(); let ws_sender = sender.clone(); let (ws_outgoing, ws_incoming) = ws_stream.split(); let mut cmd = Command::new("bash"); cmd.arg("-l").env("TERM", "xterm-256color"); let mut pty_cmd = PtyCommand::from(cmd); let (stop_sender, stop_receiver) = unbounded_channel(); let pty_master = pty_cmd.run(stop_receiver).await?; let pty_shell_writer = pty_master.clone(); let pty_shell_reader = pty_master.clone(); let res = tokio::select! { res = handle_websocket_incoming(ws_incoming, pty_shell_writer, sender, stop_sender) => res, res = handle_pty_incoming(pty_shell_reader, ws_sender) => res, res = write_to_websocket(ws_outgoing, receiver) => res, }; debug!("res = {:?}", res); Ok(()) } async fn handle_websocket_incoming( mut incoming: SplitStream, mut pty_shell_writer: PtyMaster, websocket_sender: UnboundedSender, stop_sender: UnboundedSender<()>, ) -> Result<()> { while let Some(Ok(msg)) = incoming.next().await { match msg { Message::Binary(data) => match data[0] { 0 => { if data.len().gt(&0) { pty_shell_writer.write_all(&data[1..]).await?; } } 1 => { let resize_msg: WindowSize = serde_json::from_slice(&data[1..]).context("Failed to convert")?; pty_shell_writer.resize(resize_msg.cols, resize_msg.rows)?; } 2 => { websocket_sender .send(Message::Binary(vec![1u8])) .context("Failed to send")?; } _ => (), }, Message::Ping(data) => websocket_sender .send(Message::Pong(data)) .context("Failed to send")?, _ => (), }; } let _ = stop_sender .send(()) .map_err(|e| debug!("failed to send stop signal: {:?}", e)); Ok(()) } async fn handle_pty_incoming( mut pty_shell_reader: PtyMaster, websocket_sender: UnboundedSender, ) -> Result<()> { let fut = async move { let mut buffer = BytesMut::with_capacity(1024); buffer.resize(1024, 0u8); loop { buffer[0] = 0u8; let mut tail = &mut buffer[1..]; let n = pty_shell_reader.read_buf(&mut tail).await?; if n == 0 { break; } match websocket_sender.send(Message::Binary(buffer[..n + 1].to_vec())) { Ok(_) => (), Err(e) => anyhow::bail!("failed to send msg to client: {:?}", e), } } Ok::<(), anyhow::Error>(()) }; fut.await.map_err(|e| { error!("handle pty incoming error: {:?}", &e); e.into() }) } async fn write_to_websocket( mut outgoing: SplitSink, mut receiver: UnboundedReceiver, ) -> Result<()> { while let Some(msg) = receiver.recv().await { outgoing.send(msg).await?; } Ok(()) } #[cfg(test)] mod tests { #[test] fn hello_world() { // use super::*; assert!("true" == "true"); } }