From b94f8e694bf01f5dba9ce2c01f589463a3dfbc69 Mon Sep 17 00:00:00 2001 From: Toby Vincent Date: Wed, 9 Oct 2024 18:23:58 -0500 Subject: feat!: rewrite to use traits and streams --- src/api/sse.rs | 45 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) create mode 100644 src/api/sse.rs (limited to 'src/api/sse.rs') diff --git a/src/api/sse.rs b/src/api/sse.rs new file mode 100644 index 0000000..5d913bb --- /dev/null +++ b/src/api/sse.rs @@ -0,0 +1,45 @@ +use std::convert::Infallible; + +use axum::{ + extract::{Path, State}, + response::{ + sse::{Event, KeepAlive}, + Sse, + }, + routing::get, + Router, +}; +use futures::Stream; +use tokio_stream::StreamExt; + +use crate::{AppState, Error}; + +pub fn router() -> Router { + axum::Router::new() + .route("/", get(events)) + .route("/:name", get(service_events)) +} + +pub async fn events( + State(state): State, +) -> Result>>, Error> { + let stream = state.streams().map(|(name, status)| { + let data = serde_json::to_string(&status)?; + Ok(Event::default().event(name).data(data)) + }); + + Ok(Sse::new(stream).keep_alive(KeepAlive::default())) +} + +pub async fn service_events( + Path(name): Path, + State(state): State, +) -> Result>>, Error> { + let stream = state + .stream(&name) + .ok_or_else(|| Error::ServiceNotFound(name))? + .map(Event::from) + .map(Ok); + + Ok(Sse::new(stream).keep_alive(KeepAlive::default())) +} -- cgit v1.2.3-70-g09d2