summaryrefslogtreecommitdiffstats
path: root/src/api/sse.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/api/sse.rs')
-rw-r--r--src/api/sse.rs45
1 files changed, 45 insertions, 0 deletions
diff --git a/src/api/sse.rs b/src/api/sse.rs
new file mode 100644
index 0000000..5d913bb
--- /dev/null
+++ b/src/api/sse.rs
@@ -0,0 +1,45 @@
+use std::convert::Infallible;
+
+use axum::{
+ extract::{Path, State},
+ response::{
+ sse::{Event, KeepAlive},
+ Sse,
+ },
+ routing::get,
+ Router,
+};
+use futures::Stream;
+use tokio_stream::StreamExt;
+
+use crate::{AppState, Error};
+
+pub fn router() -> Router<AppState> {
+ axum::Router::new()
+ .route("/", get(events))
+ .route("/:name", get(service_events))
+}
+
+pub async fn events(
+ State(state): State<AppState>,
+) -> Result<Sse<impl Stream<Item = Result<Event, Error>>>, Error> {
+ let stream = state.streams().map(|(name, status)| {
+ let data = serde_json::to_string(&status)?;
+ Ok(Event::default().event(name).data(data))
+ });
+
+ Ok(Sse::new(stream).keep_alive(KeepAlive::default()))
+}
+
+pub async fn service_events(
+ Path(name): Path<String>,
+ State(state): State<AppState>,
+) -> Result<Sse<impl Stream<Item = Result<Event, Infallible>>>, Error> {
+ let stream = state
+ .stream(&name)
+ .ok_or_else(|| Error::ServiceNotFound(name))?
+ .map(Event::from)
+ .map(Ok);
+
+ Ok(Sse::new(stream).keep_alive(KeepAlive::default()))
+}