use std::collections::HashMap; use futures::{Stream, StreamExt}; use tokio::sync::watch::Receiver; use tokio_stream::wrappers::WatchStream; use crate::{ service::{IntoService, ServiceConfig}, Status, }; #[derive(Clone)] pub struct AppState { rx_map: HashMap>, } impl AppState { pub fn spawn_services(configs: HashMap) -> AppState { let mut rx_map = HashMap::new(); let mut tx_map = HashMap::new(); for name in configs.keys() { let (tx, rx) = tokio::sync::watch::channel(Default::default()); rx_map.insert(name.clone(), rx); tx_map.insert(name.clone(), tx); } for (name, config) in configs { let (tx, rx) = tokio::sync::watch::channel(Status::default()); rx_map.insert(name.clone(), rx); tokio::spawn(async move { let mut stream = config.into_service(); while let Some(res) = stream.next().await { let status = res.into(); tx.send_if_modified(|s| { if *s != status { tracing::debug!(name, ?status, "Updated service status"); *s = status; true } else { false } }); } }); } AppState { rx_map } } pub fn status(&self, k: &str) -> Option { self.rx_map.get(k).map(|rx| rx.borrow().clone()) } pub fn statuses(&self) -> Vec<(String, Status)> { self.rx_map .iter() .map(|(k, v)| (k.clone(), v.borrow().clone())) .collect() } pub fn stream(&self, k: &str) -> Option> { self.rx_map.get(k).cloned().map(WatchStream::new) } pub fn streams(&self) -> impl Stream { let iter = self.rx_map.clone().into_iter().map(|(name, rx)| { WatchStream::new(rx).map(move |status| (name.to_owned(), status)) }); futures::stream::select_all(iter) } }