use std::collections::HashMap; use futures::{Stream, StreamExt}; use tokio::sync::watch::Receiver; use tokio_stream::wrappers::WatchStream; use crate::{ service::{IntoService, ServiceConfig}, Status, }; #[derive(Clone)] pub struct AppState { rx_map: HashMap>, indexes: Vec, } impl AppState { pub fn new(configs: Vec) -> AppState { let mut indexes = Vec::new(); let rx_map = configs .into_iter() .map(|ServiceConfig { name, kind }| { indexes.push(name.clone()); tracing::debug!(name, "Added service"); let (tx, rx) = tokio::sync::watch::channel(Status::default()); tokio::spawn(kind.into_service(tx.clone())); (name, rx) }) .collect(); tracing::debug!(?indexes, "Finished spawning services"); AppState { rx_map, indexes } } pub fn names(&self) -> Vec { self.indexes.clone() } pub fn status(&self, k: &str) -> Option { self.rx_map.get(k).map(|rx| rx.borrow().clone()) } pub fn statuses(&self) -> Vec<(String, Status)> { self.indexes .iter() .cloned() .map(|s| { let status = self.status(&s).expect("Service was unexpectedly removed"); (s, status) }) .collect() } pub fn stream(&self, k: &str) -> Option> { self.rx_map.get(k).cloned().map(WatchStream::new) } pub fn streams(&self) -> impl Stream { let iter = self.indexes.iter().cloned().map(|s| { let stream = self.stream(&s).expect("Service was unexpectedly removed"); stream.map(move |status| (s.to_owned(), status)) }); futures::stream::select_all(iter) } }