From fe16a923190243dfde5db6ceff2ef0bcf9158926 Mon Sep 17 00:00:00 2001 From: Toby Vincent Date: Tue, 1 Oct 2024 13:15:24 -0500 Subject: feat: simplify service status type --- src/api.rs | 8 ++++---- src/lib.rs | 17 ++++++++++------- src/main.rs | 14 +++++--------- src/service.rs | 4 ++-- src/service/http.rs | 12 ++++-------- src/service/systemd.rs | 17 +++++++---------- src/service/tcp.rs | 10 +++++----- src/sse.rs | 11 ++++++++--- 8 files changed, 45 insertions(+), 48 deletions(-) (limited to 'src') diff --git a/src/api.rs b/src/api.rs index 1489c21..5a8deb6 100644 --- a/src/api.rs +++ b/src/api.rs @@ -45,12 +45,12 @@ pub async fn healthcheck(State(services): State) -> Health { let status = match checks .values() - .filter(|s| !matches!(s, Status::Pass)) + .filter(|s| !matches!(s, Status::Ok)) .count() { - 0 => Status::Pass, - 1 => Status::Fail(Some("1 issue detected".to_string())), - n => Status::Fail(Some(format!("{n} issues detected"))), + 0 => Status::Ok, + 1 => Status::Error(Some("1 issue detected".to_string())), + n => Status::Error(Some(format!("{n} issues detected"))), }; Health { status, checks } diff --git a/src/lib.rs b/src/lib.rs index d24f635..dc0efe7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -14,14 +14,17 @@ pub fn router() -> axum::Router { .nest("/sse", sse::router()) } -#[derive(Debug, Clone, Default, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] #[serde(rename_all = "lowercase", tag = "status", content = "output")] pub enum Status { - Pass, - Warn(Option), - Fail(Option), - #[default] - Unknown, + Ok, + Error(Option), +} + +impl Default for Status { + fn default() -> Self { + Status::Error(Some("Unknown".to_string())) + } } impl Status { @@ -36,7 +39,7 @@ impl Status { impl From for Status { fn from(value: T) -> Self { - Status::Fail(Some(value.to_string())) + Status::Error(Some(value.to_string())) } } diff --git a/src/main.rs b/src/main.rs index 99af338..fbf27cb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,7 @@ use std::{collections::HashMap, fs::File, path::PathBuf, sync::Arc}; use tower_http::services::ServeDir; use tracing::level_filters::LevelFilter; -use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter}; +use tracing_subscriber::EnvFilter; use statsrv::service::Service; @@ -12,14 +12,10 @@ const DEFAULT_CONFIG: &str = "./config.toml"; #[tokio::main] async fn main() -> Result<(), Box> { - tracing_subscriber::registry() - .with( - EnvFilter::builder() - .with_default_directive(LevelFilter::INFO.into()) - .from_env_lossy(), - ) - .with(tracing_subscriber::fmt::layer()) - .init(); + let filter = EnvFilter::builder() + .with_default_directive(LevelFilter::INFO.into()) + .from_env_lossy(); + tracing_subscriber::fmt().with_env_filter(filter).init(); let config = match Config::parse() { Ok(c) => c, diff --git a/src/service.rs b/src/service.rs index c45fcb1..8e0863c 100644 --- a/src/service.rs +++ b/src/service.rs @@ -6,7 +6,7 @@ use serde::Deserialize; use systemd::Systemd; use tcp::Tcp; use tokio::{ - sync::watch::{Receiver, Sender}, + sync::watch::{channel, Receiver, Sender}, task::JoinHandle, }; use tokio_stream::wrappers::WatchStream; @@ -34,7 +34,7 @@ pub struct ServiceHandle { impl ServiceHandle { pub fn new(service: impl ServiceSpawner) -> Self { - let (tx, rx) = tokio::sync::watch::channel(Status::default()); + let (tx, rx) = channel(Status::Error(None)); let handle = tokio::spawn(service.spawn(tx)); Self { handle, rx } } diff --git a/src/service/http.rs b/src/service/http.rs index 7c875b9..8950096 100644 --- a/src/service/http.rs +++ b/src/service/http.rs @@ -32,14 +32,10 @@ impl ServiceSpawner for Http { .try_clone() .expect("Clone with no body should never fail"); let resp = client.execute(req).await; - let status = match resp.map(|r| r.status().as_u16()) { - Ok(code) if code == self.status_code => Status::Pass, - Ok(code) => Status::Fail(Some(format!("Status code: {code}"))), - Err(err) => { - tracing::error!("HTTP request error: {err}"); - Status::Unknown - } - }; + let status = resp.map_or_else(Into::into, |r| match r.status().as_u16() { + c if c == self.status_code => Status::Ok, + c => Status::Error(Some(format!("Status code: {c}"))), + }); tx.send_if_modified(|s| s.update(status)); } diff --git a/src/service/systemd.rs b/src/service/systemd.rs index 90213a0..ee220b8 100644 --- a/src/service/systemd.rs +++ b/src/service/systemd.rs @@ -21,17 +21,14 @@ impl ServiceSpawner for Systemd { loop { interval.tick().await; - let status = match command.output() { - Ok(output) if output.status.success() => Status::Pass, - Ok(output) => { - let stdout = String::from_utf8_lossy(&output.stdout).trim().to_string(); - Status::Fail(Some(format!("Service state: {}", stdout))) + let status = command.output().map_or_else(Into::into, |o| { + if o.status.success() { + Status::Ok + } else { + let stdout = String::from_utf8_lossy(&o.stdout).trim().to_string(); + Status::Error(Some(format!("Service state: {}", stdout))) } - Err(err) => { - tracing::error!("Failed to spawn process: {err}"); - Status::Unknown - } - }; + }); tx.send_if_modified(|s| s.update(status)); } diff --git a/src/service/tcp.rs b/src/service/tcp.rs index 42791bc..7b79afd 100644 --- a/src/service/tcp.rs +++ b/src/service/tcp.rs @@ -32,7 +32,7 @@ impl ServiceSpawner for Tcp { Ok(conn) => { // TODO: figure out how to wait for connection to close conn.ready(Interest::READABLE).await?; - tx.send_if_modified(|s| s.update(Status::Pass)); + tx.send_if_modified(|s| s.update(Status::Ok)); } Err(err) => { tx.send_if_modified(|s| s.update(err.into())); @@ -50,18 +50,18 @@ mod tests { #[tracing_test::traced_test] #[ignore] async fn test_tcp_watch() { - let (tx, mut rx) = tokio::sync::watch::channel(Status::default()); + let (tx, mut rx) = tokio::sync::watch::channel(Status::Error(None)); let tests = tokio::spawn(async move { - assert!(matches!(*rx.borrow_and_update(), Status::Unknown)); + assert!(matches!(*rx.borrow_and_update(), Status::Error(None))); rx.changed().await.unwrap(); - assert!(matches!(*rx.borrow_and_update(), Status::Pass)); + assert!(matches!(*rx.borrow_and_update(), Status::Ok)); rx.changed().await.unwrap(); assert_eq!( *rx.borrow_and_update(), - Status::Fail(Some(String::from("Disconnected"))) + Status::Error(Some(String::from("Disconnected"))) ); }); diff --git a/src/sse.rs b/src/sse.rs index b4a8840..88befd1 100644 --- a/src/sse.rs +++ b/src/sse.rs @@ -13,10 +13,10 @@ use tokio_stream::StreamExt; use crate::{service::ServiceHandles, Error}; pub fn router() -> Router { - axum::Router::new().route("/:name", get(sse_handler)) + axum::Router::new().route("/:name", get(service_events)) } -pub async fn sse_handler( +pub async fn service_events( Path(name): Path, State(services): State, ) -> Result>>, Error> { @@ -24,7 +24,12 @@ pub async fn sse_handler( .get(&name) .ok_or_else(|| Error::ServiceNotFound(name))? .into_stream() - .map(|s| Event::default().json_data(s)); + .map(|s| match s { + crate::Status::Ok => Event::default().event("ok"), + crate::Status::Error(None) => Event::default().event("error"), + crate::Status::Error(Some(msg)) => Event::default().event("error").data(msg), + }) + .map(Ok); Ok(Sse::new(stream).keep_alive(KeepAlive::default())) } -- cgit v1.2.3-70-g09d2