From b94f8e694bf01f5dba9ce2c01f589463a3dfbc69 Mon Sep 17 00:00:00 2001 From: Toby Vincent Date: Wed, 9 Oct 2024 18:23:58 -0500 Subject: feat!: rewrite to use traits and streams --- src/api/services.rs | 52 +++++++++++++++++++++++++++------------------------- src/api/sse.rs | 45 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 72 insertions(+), 25 deletions(-) create mode 100644 src/api/sse.rs (limited to 'src/api') diff --git a/src/api/services.rs b/src/api/services.rs index 132ecb1..aeca924 100644 --- a/src/api/services.rs +++ b/src/api/services.rs @@ -2,48 +2,50 @@ use std::collections::HashMap; use axum::{ extract::{Path, Query, State}, - Json, Router, + routing::get, + Json, }; -use axum_extra::routing::Resource; use serde::{Deserialize, Serialize}; -use crate::{service::ServiceHandles, Error, Status}; +use crate::{AppState, Error, Status}; + +pub fn router() -> axum::Router { + axum::Router::new() + .route("/", get(services)) + .route("/:name", get(service)) +} #[derive(Debug, Clone, Default, Serialize, Deserialize)] pub struct ServiceQuery { pub name: Option, - pub state: Option, + pub status: Option, } -pub fn router() -> Router { - Resource::named("services").index(index).show(show).into() +impl ServiceQuery { + pub fn filter(&self, value: &(String, Status)) -> bool { + !self.name.as_ref().is_some_and(|n| *n != value.0) + && !self.status.as_ref().is_some_and(|s| *s != value.1) + } } -pub async fn index( +pub async fn services( Query(query): Query, - State(services): State, + State(state): State, ) -> Json> { - let map = match query.name { - Some(n) => services - .iter() - .filter(|(name, _)| n == **name) - .map(|(name, srv)| (name.clone(), srv.status())) + Json( + state + .statuses() + .into_iter() + .filter(|item| query.filter(item)) .collect(), - None => services - .iter() - .map(|(name, srv)| (name.clone(), srv.status())) - .collect(), - }; - - Json(map) + ) } -pub async fn show( +pub async fn service( Path(name): Path, - State(services): State, + State(state): State, ) -> Result { - services - .get(&name) - .map(|s| s.status()) + state + .status(&name) .ok_or_else(|| Error::ServiceNotFound(name)) } diff --git a/src/api/sse.rs b/src/api/sse.rs new file mode 100644 index 0000000..5d913bb --- /dev/null +++ b/src/api/sse.rs @@ -0,0 +1,45 @@ +use std::convert::Infallible; + +use axum::{ + extract::{Path, State}, + response::{ + sse::{Event, KeepAlive}, + Sse, + }, + routing::get, + Router, +}; +use futures::Stream; +use tokio_stream::StreamExt; + +use crate::{AppState, Error}; + +pub fn router() -> Router { + axum::Router::new() + .route("/", get(events)) + .route("/:name", get(service_events)) +} + +pub async fn events( + State(state): State, +) -> Result>>, Error> { + let stream = state.streams().map(|(name, status)| { + let data = serde_json::to_string(&status)?; + Ok(Event::default().event(name).data(data)) + }); + + Ok(Sse::new(stream).keep_alive(KeepAlive::default())) +} + +pub async fn service_events( + Path(name): Path, + State(state): State, +) -> Result>>, Error> { + let stream = state + .stream(&name) + .ok_or_else(|| Error::ServiceNotFound(name))? + .map(Event::from) + .map(Ok); + + Ok(Sse::new(stream).keep_alive(KeepAlive::default())) +} -- cgit v1.2.3-70-g09d2