aboutsummaryrefslogtreecommitdiffstats
path: root/zoned/src/api.rs
diff options
context:
space:
mode:
authorToby Vincent <tobyv13@gmail.com>2022-03-24 22:17:49 -0500
committerToby Vincent <tobyv13@gmail.com>2022-03-24 22:17:49 -0500
commit40d1c9be82be7008eae59963a8df899174094102 (patch)
tree03e633e1b12f246c6156e87f3b551a4b87d0d7c9 /zoned/src/api.rs
parent6d61152a981f6d4568836458d8e566f7b991a9bc (diff)
refactor: extract websocket into it's own module
Diffstat (limited to 'zoned/src/api.rs')
-rw-r--r--zoned/src/api.rs139
1 files changed, 16 insertions, 123 deletions
diff --git a/zoned/src/api.rs b/zoned/src/api.rs
index 160ab15..5236764 100644
--- a/zoned/src/api.rs
+++ b/zoned/src/api.rs
@@ -1,47 +1,30 @@
-use anyhow::Context;
use axum::{
- extract::{
- ws::{Message, WebSocket, WebSocketUpgrade},
- Extension, Query, TypedHeader,
- },
+ extract::{ws::WebSocketUpgrade, Extension, Query, TypedHeader},
headers,
response::IntoResponse,
routing::{get, post},
Json, Router,
};
-use bytes::BytesMut;
-use futures::{stream::StreamExt, SinkExt};
-use serde::Deserialize;
use std::sync::Arc;
-use tokio::{
- io::{AsyncReadExt, AsyncWriteExt},
- process::Command,
- sync::mpsc::unbounded_channel,
-};
-use tracing::{debug, error, info, warn};
-use wspty::PtyCommand;
+use tracing::{info, instrument, warn};
use zone_core::{Container, ContainerOptions, FilterContainer};
use zone_nspawn::NSpawn;
-use crate::{Error, Result, State};
-
-#[derive(Deserialize, Debug)]
-struct WindowSize {
- cols: u16,
- rows: u16,
-}
+use crate::{ws, Error, Result, State};
+#[instrument()]
pub fn build_routes() -> Router {
Router::new()
.route("/test", get(test_endpoint))
.route("/container", post(clone_container))
.route("/container/list?<container..>", get(container_list))
- .route("/ws", get(ws_handler))
+ .route("/ws", get(ws_upgrade))
}
/// # Test endpoint
///
/// Returns a list of containers based on the query.
+#[instrument(ret, skip(state))]
async fn test_endpoint(Extension(state): Extension<Arc<State>>) -> Json<String> {
Json(state.zfs.config.pool_name.to_owned())
}
@@ -49,6 +32,7 @@ async fn test_endpoint(Extension(state): Extension<Arc<State>>) -> Json<String>
/// List containers
///
/// Returns a list of containers based on the query.
+#[instrument(err, ret)]
async fn container_list(
container: Option<Query<ContainerOptions>>,
) -> Result<Json<Vec<Container>>> {
@@ -67,6 +51,7 @@ async fn container_list(
/// Create container
///
/// Creates a new container volume from the provided container json data
+#[instrument(err, ret, skip(state))]
async fn clone_container(
Json(container): Json<Container>,
Extension(state): Extension<Arc<State>>,
@@ -82,111 +67,19 @@ async fn clone_container(
.map(Container::into)
}
-async fn ws_handler(
+/// Upgrade to websocket
+///
+/// Creates a new container volume from the provided container json data
+#[instrument(ret, skip_all)]
+async fn ws_upgrade(
ws: WebSocketUpgrade,
user_agent: Option<TypedHeader<headers::UserAgent>>,
Extension(state): Extension<Arc<State>>,
) -> impl IntoResponse {
- info!("Client connected");
-
- if let Some(TypedHeader(user_agent)) = user_agent {
- debug!("`{}` connected", user_agent.as_str());
- }
-
- ws.on_upgrade(|socket| websocket(socket, state))
-}
-
-async fn websocket(ws_stream: WebSocket, _state: Arc<State>) {
- debug!("Handling websocket!");
-
- let (mut sender, mut receiver) = ws_stream.split();
-
- let (tx, mut rx) = unbounded_channel();
- let ws_tx = tx.clone();
+ let ua = user_agent.map_or("Unknown".to_string(), |u| u.to_string());
+ info!(%ua, "Client connected");
- let mut cmd = Command::new("bash");
-
- cmd.arg("-l").env("TERM", "xterm-256color");
-
- let mut pty_cmd = PtyCommand::from(cmd);
- let (kill_tx, kill_rx) = unbounded_channel();
-
- let (mut pty_write, mut pty_read) = match pty_cmd.run(kill_rx).await {
- Ok(pty) => (pty.clone(), pty),
- Err(err) => {
- error!(?err);
- return;
- }
- };
-
- let recv_task = tokio::spawn(async move {
- while let Some(Ok(msg)) = receiver.next().await {
- match msg {
- Message::Binary(data) => match data[0] {
- 0 => {
- if data.len().gt(&0) {
- pty_write.write_all(&data[1..]).await?;
- }
- }
- 1 => {
- let resize_msg: WindowSize =
- serde_json::from_slice(&data[1..]).context("Failed to convert")?;
- pty_write.resize(resize_msg.cols, resize_msg.rows)?;
- }
- 2 => {
- tx.send(Message::Binary(vec![1u8]))
- .context("Failed to send")?;
- }
- _ => (),
- },
- Message::Ping(data) => tx.send(Message::Pong(data)).context("Failed to send")?,
- _ => (),
- };
- }
- let _ = kill_tx
- .send(())
- .map_err(|e| debug!("failed to send stop signal: {:?}", e));
- Ok(())
- });
-
- let read_task = tokio::spawn(async move {
- let fut = async move {
- let mut buffer = BytesMut::with_capacity(1024);
- buffer.resize(1024, 0u8);
- loop {
- buffer[0] = 0u8;
- let mut tail = &mut buffer[1..];
- let n = pty_read.read_buf(&mut tail).await?;
- if n == 0 {
- break;
- }
- match ws_tx.send(Message::Binary(buffer[..n + 1].to_vec())) {
- Ok(_) => (),
- Err(e) => anyhow::bail!("failed to send msg to client: {:?}", e),
- }
- }
- Ok::<(), anyhow::Error>(())
- };
- fut.await.map_err(|e| {
- error!("handle pty incoming error: {:?}", &e);
- e.into()
- })
- });
-
- let send_task = tokio::spawn(async move {
- while let Some(msg) = rx.recv().await {
- sender.send(msg).await?;
- }
- Result::Ok(())
- });
-
- if let Err(err) = tokio::select! {
- res = recv_task => res,
- res = read_task => res,
- res = send_task => res,
- } {
- error!(?err);
- }
+ ws.on_upgrade(|socket| ws::handler(socket, state))
}
#[cfg(test)]