From 0ea877c5d0de10b45768da80c658785835d625e6 Mon Sep 17 00:00:00 2001 From: Toby Vincent Date: Sat, 12 Oct 2024 12:58:51 -0500 Subject: fix: preserve configured service order --- src/state.rs | 71 ++++++++++++++++++++++++++++++++---------------------------- 1 file changed, 38 insertions(+), 33 deletions(-) (limited to 'src/state.rs') diff --git a/src/state.rs b/src/state.rs index bbe235b..c442d9d 100644 --- a/src/state.rs +++ b/src/state.rs @@ -5,46 +5,47 @@ use tokio::sync::watch::Receiver; use tokio_stream::wrappers::WatchStream; use crate::{ - service::{IntoService, ServiceConfig}, + service::{IntoService, ServiceConfig, ServiceKind}, Status, }; #[derive(Clone)] pub struct AppState { rx_map: HashMap>, + indexes: Vec, } impl AppState { - pub fn spawn_services(configs: HashMap) -> AppState { - let mut rx_map = HashMap::new(); - let mut tx_map = HashMap::new(); - for name in configs.keys() { - let (tx, rx) = tokio::sync::watch::channel(Default::default()); - rx_map.insert(name.clone(), rx); - tx_map.insert(name.clone(), tx); - } + pub fn new(configs: Vec) -> AppState { + let mut indexes = Vec::new(); + let rx_map = configs + .into_iter() + .map(|ServiceConfig { name, kind }| { + indexes.push(name.clone()); + let (tx, rx) = tokio::sync::watch::channel(Status::default()); + tokio::spawn(Self::spawn_service(kind, tx.clone())); + (name, rx) + }) + .collect(); + + AppState { rx_map, indexes } + } - for (name, config) in configs { - let (tx, rx) = tokio::sync::watch::channel(Status::default()); - rx_map.insert(name.clone(), rx); - tokio::spawn(async move { - let mut stream = config.into_service(); - while let Some(res) = stream.next().await { - let status = res.into(); - tx.send_if_modified(|s| { - if *s != status { - tracing::debug!(name, ?status, "Updated service status"); - *s = status; - true - } else { - false - } - }); + #[tracing::instrument(skip(tx))] + async fn spawn_service(kind: ServiceKind, tx: tokio::sync::watch::Sender) { + let mut stream = kind.into_service(); + while let Some(res) = stream.next().await { + let status = res.into(); + tx.send_if_modified(|s| { + if *s != status { + tracing::debug!(?status, "Updated service status"); + *s = status; + true + } else { + false } }); } - - AppState { rx_map } } pub fn status(&self, k: &str) -> Option { @@ -52,9 +53,13 @@ impl AppState { } pub fn statuses(&self) -> Vec<(String, Status)> { - self.rx_map + self.indexes .iter() - .map(|(k, v)| (k.clone(), v.borrow().clone())) + .cloned() + .map(|s| { + let status = self.status(&s).expect("Service was unexpectedly removed"); + (s, status) + }) .collect() } @@ -63,10 +68,10 @@ impl AppState { } pub fn streams(&self) -> impl Stream { - let iter = - self.rx_map.clone().into_iter().map(|(name, rx)| { - WatchStream::new(rx).map(move |status| (name.to_owned(), status)) - }); + let iter = self.indexes.iter().cloned().map(|s| { + let stream = self.stream(&s).expect("Service was unexpectedly removed"); + stream.map(move |status| (s.to_owned(), status)) + }); futures::stream::select_all(iter) } -- cgit v1.2.3-70-g09d2