aboutsummaryrefslogtreecommitdiffstats
path: root/zoned/src
diff options
context:
space:
mode:
authorToby Vincent <tobyv13@gmail.com>2022-04-22 01:53:42 -0500
committerToby Vincent <tobyv13@gmail.com>2022-04-22 01:53:42 -0500
commitb627ec4e0d9c8686d5d193b61e7e2496a1116f60 (patch)
tree87e24372a113a34d925e106b61a172865ba0311a /zoned/src
parente3fc06077d7334d627a86e38077386268a81376c (diff)
feat(zone): impl attach subcommand
Diffstat (limited to 'zoned/src')
-rw-r--r--zoned/src/error.rs12
-rw-r--r--zoned/src/http.rs46
-rw-r--r--zoned/src/ws.rs33
3 files changed, 59 insertions, 32 deletions
diff --git a/zoned/src/error.rs b/zoned/src/error.rs
index 922a352..0c28e48 100644
--- a/zoned/src/error.rs
+++ b/zoned/src/error.rs
@@ -20,9 +20,15 @@ pub enum Error {
#[error("Container Error: {0:?}")]
Container(String),
+ #[error("WebSocket Error: {0:?}")]
+ WebSocket(String),
+
#[error("Container not found")]
NotFound,
+ #[error("Missing initialization message")]
+ WebSocketMissingInit,
+
#[error("ZFS Error: {source:?}")]
ZFS {
#[from]
@@ -59,6 +65,12 @@ pub enum Error {
source: std::io::Error,
},
+ #[error("Json Error: {source:?}")]
+ Json {
+ #[from]
+ source: serde_json::error::Error,
+ },
+
#[error("Send Error: {source:?}")]
Send {
#[from]
diff --git a/zoned/src/http.rs b/zoned/src/http.rs
index dda3296..8d388b1 100644
--- a/zoned/src/http.rs
+++ b/zoned/src/http.rs
@@ -7,9 +7,7 @@ use axum::{
};
use std::{process::Command, sync::Arc};
use tracing::{info, instrument, warn};
-use zone_core::{
- CloneOptions, Container, ContainerOptions, ContainerStatus, FilterContainer, WebSocketOptions,
-};
+use zone_core::{CloneOptions, Container, ContainerOptions, ContainerStatus, FilterContainer};
use crate::{ws, Error, Result, State};
@@ -19,7 +17,7 @@ pub fn build_routes() -> Router {
.route("/test", get(test_endpoint))
.route("/container", post(clone_container))
.route("/container/list?<container..>", get(container_list))
- .route("/ws", get(ws_upgrade))
+ .route("/container/attach", get(ws_upgrade))
}
/// # Test endpoint
@@ -35,19 +33,22 @@ async fn test_endpoint(Extension(state): Extension<Arc<State>>) -> Json<String>
/// Returns a list of containers based on the query.
#[instrument(err, ret)]
async fn container_list(
- container: Option<Query<ContainerOptions>>,
+ Query(params): Query<ContainerOptions>,
Extension(state): Extension<Arc<State>>,
) -> Result<Json<Vec<Container>>> {
- let mut containers = state.nspawn.list()?.into_iter().filter_map(|c| {
- Container::try_from(c)
- .map_err(|err| warn!("Ignoring invalid nspawn container {:?}", err))
- .ok()
- });
+ let containers = state
+ .nspawn
+ .list()?
+ .into_iter()
+ .filter_map(|c| {
+ Container::try_from(c)
+ .map_err(|err| warn!("Ignoring invalid nspawn container: {:?}", err))
+ .ok()
+ })
+ .filter_container(&params)
+ .into();
- match container {
- Some(Query(params)) => Ok(containers.filter_container(params).into()),
- _ => Ok(containers.collect::<Vec<_>>().into()),
- }
+ Ok(containers)
}
/// Create container
@@ -57,7 +58,7 @@ async fn container_list(
async fn clone_container(
Json(container): Json<CloneOptions>,
Extension(state): Extension<Arc<State>>,
-) -> Result<Json<Container>> {
+) -> Result<Json<(Container, ContainerStatus)>> {
let predicate = Container::builder()
.user(container.user.to_owned())
.template(container.template.to_owned())
@@ -67,7 +68,7 @@ async fn clone_container(
.zfs
.get_file_systems()?
.into_iter()
- .filter_container(predicate)
+ .filter_container(&predicate)
.into_iter()
.max_by_key(|c| c.id)
.map_or(0, |c| c.id + 1);
@@ -92,13 +93,13 @@ async fn clone_container(
let name = format!("{}-{}-{}", container.template, container.user, id);
state.nspawn.create(root, name)?;
-
- Ok(Json::from(Container {
+ let container = Container {
id,
template: container.template,
user: container.user,
- status: ContainerStatus::Running,
- }))
+ };
+
+ Ok(Json::from((container, ContainerStatus::Running)))
}
/// Upgrade to websocket
@@ -107,14 +108,13 @@ async fn clone_container(
#[instrument(ret, skip_all)]
async fn ws_upgrade(
ws: WebSocketUpgrade,
- Json(options): Json<WebSocketOptions>,
- Extension(state): Extension<Arc<State>>,
user_agent: Option<TypedHeader<headers::UserAgent>>,
+ Extension(state): Extension<Arc<State>>,
) -> impl IntoResponse {
let agent = user_agent.map_or("Unknown".to_string(), |u| u.to_string());
info!(%agent, "Client connected");
- ws.on_upgrade(|socket| ws::handler(socket, options, state))
+ ws.on_upgrade(|socket| ws::handler(socket, state))
}
#[cfg(test)]
diff --git a/zoned/src/ws.rs b/zoned/src/ws.rs
index b7714da..1f5a6fc 100644
--- a/zoned/src/ws.rs
+++ b/zoned/src/ws.rs
@@ -13,9 +13,9 @@ use tokio::{
};
use tracing::{instrument, warn};
use wspty::PtyMaster;
-use zone_core::WebSocketOptions;
+use zone_core::{Container, FilterContainer};
-use crate::{Result, State};
+use crate::{Error, Result, State};
#[derive(Deserialize, Debug)]
struct WindowSize {
@@ -23,17 +23,32 @@ struct WindowSize {
rows: u16,
}
-#[instrument(err, skip_all)]
-pub async fn handler(
- ws_stream: WebSocket,
- options: WebSocketOptions,
- state: Arc<State>,
-) -> Result<()> {
+// #[instrument(err, skip_all)]
+pub async fn handler(mut ws_stream: WebSocket, state: Arc<State>) -> Result<()> {
+ let msg = loop {
+ match ws_stream.next().await {
+ Some(Ok(Message::Text(msg))) => break msg,
+ Some(Ok(_)) => return Err(Error::WebSocketMissingInit),
+ Some(Err(err)) => return Err(err.into()),
+ None => continue,
+ };
+ };
+
+ let container =
+ serde_json::from_str::<Container>(&msg).map_err(|_| Error::WebSocketMissingInit)?;
+
+ state
+ .nspawn
+ .list()?
+ .into_iter()
+ .find_container((&container).into())
+ .ok_or(Error::NotFound)?;
+
let (sender, receiver) = ws_stream.split();
let (kill_tx, kill_rx) = mpsc::unbounded_channel();
let (tx, rx) = mpsc::channel(1024);
- let pty = state.nspawn.attach(options.into(), kill_rx).await?;
+ let pty = state.nspawn.attach(container.into(), kill_rx).await?;
tokio::select! {
res = msg_handler(receiver, pty.clone(), tx.clone(), kill_tx) => res,