From b94f8e694bf01f5dba9ce2c01f589463a3dfbc69 Mon Sep 17 00:00:00 2001 From: Toby Vincent Date: Wed, 9 Oct 2024 18:23:58 -0500 Subject: feat!: rewrite to use traits and streams --- src/state.rs | 73 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 73 insertions(+) create mode 100644 src/state.rs (limited to 'src/state.rs') diff --git a/src/state.rs b/src/state.rs new file mode 100644 index 0000000..bbe235b --- /dev/null +++ b/src/state.rs @@ -0,0 +1,73 @@ +use std::collections::HashMap; + +use futures::{Stream, StreamExt}; +use tokio::sync::watch::Receiver; +use tokio_stream::wrappers::WatchStream; + +use crate::{ + service::{IntoService, ServiceConfig}, + Status, +}; + +#[derive(Clone)] +pub struct AppState { + rx_map: HashMap>, +} + +impl AppState { + pub fn spawn_services(configs: HashMap) -> AppState { + let mut rx_map = HashMap::new(); + let mut tx_map = HashMap::new(); + for name in configs.keys() { + let (tx, rx) = tokio::sync::watch::channel(Default::default()); + rx_map.insert(name.clone(), rx); + tx_map.insert(name.clone(), tx); + } + + for (name, config) in configs { + let (tx, rx) = tokio::sync::watch::channel(Status::default()); + rx_map.insert(name.clone(), rx); + tokio::spawn(async move { + let mut stream = config.into_service(); + while let Some(res) = stream.next().await { + let status = res.into(); + tx.send_if_modified(|s| { + if *s != status { + tracing::debug!(name, ?status, "Updated service status"); + *s = status; + true + } else { + false + } + }); + } + }); + } + + AppState { rx_map } + } + + pub fn status(&self, k: &str) -> Option { + self.rx_map.get(k).map(|rx| rx.borrow().clone()) + } + + pub fn statuses(&self) -> Vec<(String, Status)> { + self.rx_map + .iter() + .map(|(k, v)| (k.clone(), v.borrow().clone())) + .collect() + } + + pub fn stream(&self, k: &str) -> Option> { + self.rx_map.get(k).cloned().map(WatchStream::new) + } + + pub fn streams(&self) -> impl Stream { + let iter = + self.rx_map.clone().into_iter().map(|(name, rx)| { + WatchStream::new(rx).map(move |status| (name.to_owned(), status)) + }); + + futures::stream::select_all(iter) + } +} -- cgit v1.2.3-70-g09d2