From 0ea877c5d0de10b45768da80c658785835d625e6 Mon Sep 17 00:00:00 2001 From: Toby Vincent Date: Sat, 12 Oct 2024 12:58:51 -0500 Subject: fix: preserve configured service order --- src/main.rs | 6 ++--- src/service.rs | 21 +++++++++++------ src/state.rs | 71 +++++++++++++++++++++++++++++++--------------------------- 3 files changed, 55 insertions(+), 43 deletions(-) (limited to 'src') diff --git a/src/main.rs b/src/main.rs index 85ff708..d5a35b7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, fs::File, path::PathBuf}; +use std::{fs::File, path::PathBuf}; use tower_http::services::ServeDir; use tracing::level_filters::LevelFilter; @@ -21,7 +21,7 @@ async fn main() -> Result<(), Box> { } }; - let state = AppState::spawn_services(config.services); + let state = AppState::new(config.services); let mut router = statsrv::router().with_state(state); if let Some(path) = config.serve_dir { @@ -41,7 +41,7 @@ async fn main() -> Result<(), Box> { pub struct Config { pub serve_dir: Option, pub address: String, - pub services: HashMap, + pub services: Vec, } impl Config { diff --git a/src/service.rs b/src/service.rs index b10385a..9ca9fc6 100644 --- a/src/service.rs +++ b/src/service.rs @@ -46,11 +46,18 @@ pub fn default_interval() -> std::time::Duration { } #[derive(Debug, Clone, Deserialize)] -#[serde(untagged)] -pub enum ServiceConfig { +pub struct ServiceConfig { + pub name: String, + #[serde(flatten)] + pub kind: ServiceKind, +} + +#[derive(Debug, Clone, Deserialize)] +#[serde(rename_all = "lowercase")] +pub enum ServiceKind { Http(http::Http), Tcp(tcp::Tcp), - Command(command::Command), + Exec(command::Command), } #[derive(Debug, thiserror::Error)] @@ -63,14 +70,14 @@ pub enum ServiceError { Command(#[from] command::Error), } -impl IntoService for ServiceConfig { +impl IntoService for ServiceKind { type Error = ServiceError; fn into_service(self) -> impl Stream> + Send { match self { - ServiceConfig::Http(h) => h.into_service().map_err(ServiceError::from).boxed(), - ServiceConfig::Tcp(t) => t.into_service().map_err(ServiceError::from).boxed(), - ServiceConfig::Command(c) => c.into_service().map_err(ServiceError::from).boxed(), + ServiceKind::Http(h) => h.into_service().map_err(ServiceError::from).boxed(), + ServiceKind::Tcp(t) => t.into_service().map_err(ServiceError::from).boxed(), + ServiceKind::Exec(c) => c.into_service().map_err(ServiceError::from).boxed(), } } } diff --git a/src/state.rs b/src/state.rs index bbe235b..c442d9d 100644 --- a/src/state.rs +++ b/src/state.rs @@ -5,46 +5,47 @@ use tokio::sync::watch::Receiver; use tokio_stream::wrappers::WatchStream; use crate::{ - service::{IntoService, ServiceConfig}, + service::{IntoService, ServiceConfig, ServiceKind}, Status, }; #[derive(Clone)] pub struct AppState { rx_map: HashMap>, + indexes: Vec, } impl AppState { - pub fn spawn_services(configs: HashMap) -> AppState { - let mut rx_map = HashMap::new(); - let mut tx_map = HashMap::new(); - for name in configs.keys() { - let (tx, rx) = tokio::sync::watch::channel(Default::default()); - rx_map.insert(name.clone(), rx); - tx_map.insert(name.clone(), tx); - } + pub fn new(configs: Vec) -> AppState { + let mut indexes = Vec::new(); + let rx_map = configs + .into_iter() + .map(|ServiceConfig { name, kind }| { + indexes.push(name.clone()); + let (tx, rx) = tokio::sync::watch::channel(Status::default()); + tokio::spawn(Self::spawn_service(kind, tx.clone())); + (name, rx) + }) + .collect(); + + AppState { rx_map, indexes } + } - for (name, config) in configs { - let (tx, rx) = tokio::sync::watch::channel(Status::default()); - rx_map.insert(name.clone(), rx); - tokio::spawn(async move { - let mut stream = config.into_service(); - while let Some(res) = stream.next().await { - let status = res.into(); - tx.send_if_modified(|s| { - if *s != status { - tracing::debug!(name, ?status, "Updated service status"); - *s = status; - true - } else { - false - } - }); + #[tracing::instrument(skip(tx))] + async fn spawn_service(kind: ServiceKind, tx: tokio::sync::watch::Sender) { + let mut stream = kind.into_service(); + while let Some(res) = stream.next().await { + let status = res.into(); + tx.send_if_modified(|s| { + if *s != status { + tracing::debug!(?status, "Updated service status"); + *s = status; + true + } else { + false } }); } - - AppState { rx_map } } pub fn status(&self, k: &str) -> Option { @@ -52,9 +53,13 @@ impl AppState { } pub fn statuses(&self) -> Vec<(String, Status)> { - self.rx_map + self.indexes .iter() - .map(|(k, v)| (k.clone(), v.borrow().clone())) + .cloned() + .map(|s| { + let status = self.status(&s).expect("Service was unexpectedly removed"); + (s, status) + }) .collect() } @@ -63,10 +68,10 @@ impl AppState { } pub fn streams(&self) -> impl Stream { - let iter = - self.rx_map.clone().into_iter().map(|(name, rx)| { - WatchStream::new(rx).map(move |status| (name.to_owned(), status)) - }); + let iter = self.indexes.iter().cloned().map(|s| { + let stream = self.stream(&s).expect("Service was unexpectedly removed"); + stream.map(move |status| (s.to_owned(), status)) + }); futures::stream::select_all(iter) } -- cgit v1.2.3-70-g09d2