diff options
Diffstat (limited to 'src/state.rs')
-rw-r--r-- | src/state.rs | 73 |
1 files changed, 73 insertions, 0 deletions
diff --git a/src/state.rs b/src/state.rs new file mode 100644 index 0000000..bbe235b --- /dev/null +++ b/src/state.rs @@ -0,0 +1,73 @@ +use std::collections::HashMap; + +use futures::{Stream, StreamExt}; +use tokio::sync::watch::Receiver; +use tokio_stream::wrappers::WatchStream; + +use crate::{ + service::{IntoService, ServiceConfig}, + Status, +}; + +#[derive(Clone)] +pub struct AppState { + rx_map: HashMap<String, Receiver<Status>>, +} + +impl AppState { + pub fn spawn_services(configs: HashMap<String, ServiceConfig>) -> AppState { + let mut rx_map = HashMap::new(); + let mut tx_map = HashMap::new(); + for name in configs.keys() { + let (tx, rx) = tokio::sync::watch::channel(Default::default()); + rx_map.insert(name.clone(), rx); + tx_map.insert(name.clone(), tx); + } + + for (name, config) in configs { + let (tx, rx) = tokio::sync::watch::channel(Status::default()); + rx_map.insert(name.clone(), rx); + tokio::spawn(async move { + let mut stream = config.into_service(); + while let Some(res) = stream.next().await { + let status = res.into(); + tx.send_if_modified(|s| { + if *s != status { + tracing::debug!(name, ?status, "Updated service status"); + *s = status; + true + } else { + false + } + }); + } + }); + } + + AppState { rx_map } + } + + pub fn status(&self, k: &str) -> Option<Status> { + self.rx_map.get(k).map(|rx| rx.borrow().clone()) + } + + pub fn statuses(&self) -> Vec<(String, Status)> { + self.rx_map + .iter() + .map(|(k, v)| (k.clone(), v.borrow().clone())) + .collect() + } + + pub fn stream(&self, k: &str) -> Option<impl Stream<Item = Status>> { + self.rx_map.get(k).cloned().map(WatchStream::new) + } + + pub fn streams(&self) -> impl Stream<Item = (String, Status)> { + let iter = + self.rx_map.clone().into_iter().map(|(name, rx)| { + WatchStream::new(rx).map(move |status| (name.to_owned(), status)) + }); + + futures::stream::select_all(iter) + } +} |