From 8b9eb6eb88d871309348dff1527d69b4b32a98ec Mon Sep 17 00:00:00 2001 From: Toby Vincent Date: Sat, 12 Oct 2024 18:23:46 -0500 Subject: refactor: simplify service trait, again --- config.toml | 10 +++-- src/api.rs | 7 ++++ src/main.rs | 2 +- src/service.rs | 57 +++++---------------------- src/service/command.rs | 104 +++++++++++++++++++++---------------------------- src/service/http.rs | 51 ++++++++++++------------ src/service/tcp.rs | 42 ++++++++++---------- src/state.rs | 24 ++++-------- 8 files changed, 124 insertions(+), 173 deletions(-) diff --git a/config.toml b/config.toml index 555502f..e91e94f 100644 --- a/config.toml +++ b/config.toml @@ -3,12 +3,16 @@ address = "127.0.0.1:8080" [[services]] name = "tobyvin.dev" -http = { url = "https://tobyvin.dev" } +kind = "http" +url = "https://tobyvin.dev" [[services]] name = "test_cmd" -exec = { command = "stat", args = ["/tmp/test_file"] } +kind = "exec" +command = "stat" +args = ["/tmp/test_file"] [[services]] name = "dns" -tcp = { address = "10.42.0.1:53" } +kind = "tcp" +address = "10.42.0.1:53" diff --git a/src/api.rs b/src/api.rs index bab2043..57dc4b7 100644 --- a/src/api.rs +++ b/src/api.rs @@ -1,3 +1,5 @@ +use axum::{extract::State, routing::get, Json}; + use crate::AppState; pub mod services; @@ -7,4 +9,9 @@ pub fn router() -> axum::Router { axum::Router::new() .nest("/sse", sse::router()) .nest("/status", services::router()) + .route("/list", get(names)) +} + +pub async fn names(State(state): State) -> Json> { + Json(state.names()) } diff --git a/src/main.rs b/src/main.rs index d5a35b7..df69e16 100644 --- a/src/main.rs +++ b/src/main.rs @@ -30,7 +30,7 @@ async fn main() -> Result<(), Box> { router = router.layer(tower_http::trace::TraceLayer::new_for_http()); - let listener = tokio::net::TcpListener::bind(config.address).await.unwrap(); + let listener = tokio::net::TcpListener::bind(config.address).await?; tracing::info!("listening on {}", listener.local_addr().unwrap()); axum::serve(listener, router).await.map_err(Into::into) diff --git a/src/service.rs b/src/service.rs index 9ca9fc6..b67ba51 100644 --- a/src/service.rs +++ b/src/service.rs @@ -1,8 +1,4 @@ -use std::collections::HashMap; - -use futures::{StreamExt, TryStreamExt}; use serde::Deserialize; -use tokio_stream::{Stream, StreamMap}; use crate::Status; @@ -10,35 +6,11 @@ pub mod command; pub mod http; pub mod tcp; -pub type ServiceHandles = HashMap; - pub trait IntoService { - type Error: std::error::Error + Sync + Send + Sized; - - fn into_service(self) -> impl Stream> + Send; -} - -pub trait IntoServiceMap { - type Error: std::error::Error + Sync + Send + Sized; - - fn into_service_map(self) -> impl Stream)> + Send; -} - -impl IntoServiceMap for T -where - T: IntoIterator, - V: IntoService, - K: std::hash::Hash + std::cmp::Eq + std::clone::Clone + std::marker::Unpin + std::marker::Send, -{ - type Error = V::Error; - - fn into_service_map(self) -> impl Stream)> + Send { - let mut map = StreamMap::new(); - for (name, srv) in self.into_iter() { - map.insert(name, Box::pin(srv.into_service())); - } - map - } + fn into_service( + self, + tx: tokio::sync::watch::Sender, + ) -> impl std::future::Future; } pub fn default_interval() -> std::time::Duration { @@ -54,30 +26,19 @@ pub struct ServiceConfig { #[derive(Debug, Clone, Deserialize)] #[serde(rename_all = "lowercase")] +#[serde(tag = "kind")] pub enum ServiceKind { Http(http::Http), Tcp(tcp::Tcp), Exec(command::Command), } -#[derive(Debug, thiserror::Error)] -pub enum ServiceError { - #[error(transparent)] - Http(#[from] http::Error), - #[error(transparent)] - Tcp(#[from] tcp::Error), - #[error(transparent)] - Command(#[from] command::Error), -} - impl IntoService for ServiceKind { - type Error = ServiceError; - - fn into_service(self) -> impl Stream> + Send { + async fn into_service(self, tx: tokio::sync::watch::Sender) { match self { - ServiceKind::Http(h) => h.into_service().map_err(ServiceError::from).boxed(), - ServiceKind::Tcp(t) => t.into_service().map_err(ServiceError::from).boxed(), - ServiceKind::Exec(c) => c.into_service().map_err(ServiceError::from).boxed(), + ServiceKind::Http(h) => h.into_service(tx).await, + ServiceKind::Tcp(t) => t.into_service(tx).await, + ServiceKind::Exec(c) => c.into_service(tx).await, } } } diff --git a/src/service/command.rs b/src/service/command.rs index 3535ee2..6b44e1d 100644 --- a/src/service/command.rs +++ b/src/service/command.rs @@ -1,9 +1,12 @@ use std::{process::Stdio, time::Duration}; -use async_stream::stream; -use futures::{Stream, StreamExt}; -use serde::Deserialize; -use tokio::io::{AsyncBufReadExt, BufReader}; +use serde::{Deserialize, Serialize}; +use tokio::{ + io::{AsyncBufReadExt, BufReader}, + sync::watch::Sender, +}; + +use crate::Status; use super::IntoService; @@ -29,7 +32,7 @@ pub enum Error { NoStdout, } -#[derive(Debug, Clone, Deserialize)] +#[derive(Debug, Clone, Deserialize, Serialize)] pub struct Command { pub command: String, pub args: Vec, @@ -40,74 +43,57 @@ pub struct Command { } impl Command { - #[tracing::instrument] - fn persist( - mut interval: tokio::time::Interval, - mut command: tokio::process::Command, - ) -> impl Stream> { - stream! { - loop { - interval.tick().await; + async fn persist(&self, tx: Sender) -> Result<(), Error> { + let mut command = tokio::process::Command::new(&self.command); + command.args(&self.args); - let mut child = command - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .spawn()?; + let mut child = command + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn()?; - let mut stdout_reader = - BufReader::new(child.stdout.take().ok_or(Error::NoStdout)?).lines(); + let mut lines = BufReader::new(child.stdout.take().ok_or(Error::NoStdout)?).lines(); - while let Some(line) = stdout_reader.next_line().await? { - if "Ok" == line { - yield Ok(()); - } else { - yield Err(Error::Output(line)) - } - } + while let Some(line) = lines.next_line().await? { + let res = ("Ok" == line).then_some(()).ok_or(Error::Output(line)); + tx.send_if_modified(|s| s.update(res.into())); + } - match child.wait().await?.code() { - Some(0) => yield Ok(()), - Some(code) => yield Err(Error::PersistExitCode(code)), - None => yield Err(Error::Signal), - }; - } + match child.wait().await?.code() { + Some(0) => Ok(()), + Some(code) => Err(Error::PersistExitCode(code)), + None => Err(Error::Signal), } } - #[tracing::instrument] - fn interval( - mut interval: tokio::time::Interval, - mut command: tokio::process::Command, - ) -> impl Stream> { - stream! { - loop { - interval.tick().await; - let output = command.output().await?; - match output.status.code() { - Some(0) => yield Ok(()), - Some(code) => { - let stderr = String::from_utf8_lossy(&output.stderr).to_string(); - yield Err(Error::ExitCode { code, stderr }) - } - None => yield Err(Error::Signal), - } + async fn interval(&self) -> Result<(), Error> { + let mut command = tokio::process::Command::new(&self.command); + command.args(&self.args); + + let output = command.output().await?; + match output.status.code() { + Some(0) => Ok(()), + Some(code) => { + let stderr = String::from_utf8_lossy(&output.stderr).to_string(); + Err(Error::ExitCode { code, stderr }) } + None => Err(Error::Signal), } } } impl IntoService for Command { - type Error = Error; - - fn into_service(self) -> impl Stream> { - let interval = tokio::time::interval(self.interval); - let mut command = tokio::process::Command::new(self.command); - command.args(self.args); + async fn into_service(self, tx: tokio::sync::watch::Sender) { + let mut interval = tokio::time::interval(self.interval); + loop { + interval.tick().await; + let res = if self.persist { + self.persist(tx.clone()).await + } else { + self.interval().await + }; - if self.persist { - Self::persist(interval, command).boxed() - } else { - Self::interval(interval, command).boxed() + tx.send_if_modified(|s| s.update(res.into())); } } } diff --git a/src/service/http.rs b/src/service/http.rs index c4fcee7..bfba007 100644 --- a/src/service/http.rs +++ b/src/service/http.rs @@ -1,11 +1,12 @@ use std::{fmt::Display, time::Duration}; -use async_stream::try_stream; use axum::http::status::StatusCode; -use futures::Stream; -use serde::Deserialize; +use reqwest::Client; +use serde::{Deserialize, Serialize}; use url::Url; +use crate::Status; + use super::IntoService; #[derive(Debug, thiserror::Error)] @@ -16,7 +17,7 @@ pub enum Error { StatusCode(u16), } -#[derive(Debug, Clone, Deserialize)] +#[derive(Debug, Clone, Deserialize, Serialize)] pub struct Http { pub url: Url, #[serde(default)] @@ -29,32 +30,34 @@ pub struct Http { pub interval: Duration, } -impl IntoService for Http { - type Error = Error; +impl Http { + async fn check(&self) -> Result<(), Error> { + let client = match self.client.as_ref() { + Some(client) => client, + None => &Client::new(), + }; + let req = client + .request(self.method.into(), self.url.clone()) + .build()?; + let status_code = client.execute(req).await?.status().as_u16(); + (status_code == self.status_code) + .then_some(()) + .ok_or_else(|| Error::StatusCode(status_code)) + } +} - fn into_service(self) -> impl Stream> { +impl IntoService for Http { + async fn into_service(self, tx: tokio::sync::watch::Sender) { let mut interval = tokio::time::interval(self.interval); - - try_stream! { - let client = self.client.unwrap_or_default(); - let req = client.request(self.method.into(), self.url).build()?; - loop { - interval.tick().await; - let req = req - .try_clone() - .expect("Clone with no body should never fail"); - let status_code = client.execute(req).await?.status().as_u16(); - if status_code == self.status_code { - yield (); - } else { - Err(Error::StatusCode(status_code))? - } - } + loop { + interval.tick().await; + let res = self.check().await; + tx.send_if_modified(|s| s.update(res.into())); } } } -#[derive(Debug, Clone, Copy, Default, Deserialize)] +#[derive(Debug, Clone, Copy, Default, Deserialize, Serialize)] pub enum Method { #[serde(alias = "get", alias = "GET")] #[default] diff --git a/src/service/tcp.rs b/src/service/tcp.rs index 6556af0..e28b19d 100644 --- a/src/service/tcp.rs +++ b/src/service/tcp.rs @@ -1,15 +1,13 @@ use std::{fmt::Display, net::SocketAddr, time::Duration}; -use async_stream::try_stream; -use futures::Stream; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use tokio::{io::Interest, net::TcpSocket}; -use super::IntoService; +use crate::Status; -pub(crate) type Error = std::io::Error; +use super::IntoService; -#[derive(Debug, Clone, Deserialize)] +#[derive(Debug, Clone, Deserialize, Serialize)] pub struct Tcp { pub address: SocketAddr, #[serde(default = "super::default_interval")] @@ -22,24 +20,26 @@ impl Display for Tcp { } } -impl IntoService for Tcp { - type Error = Error; - - fn into_service(self) -> impl Stream> { - let mut interval = tokio::time::interval(self.interval); +impl Tcp { + async fn check(address: SocketAddr) -> Result<(), std::io::Error> { + let sock = TcpSocket::new_v4()?; + sock.set_keepalive(true)?; - try_stream! { - loop { - interval.tick().await; + let conn = sock.connect(address).await?; + // TODO: figure out how to wait for connection to close + conn.ready(Interest::READABLE).await?; + Ok(()) + } +} - let sock = TcpSocket::new_v4()?; - sock.set_keepalive(true)?; +impl IntoService for Tcp { + async fn into_service(self, tx: tokio::sync::watch::Sender) { + let mut interval = tokio::time::interval(self.interval); - let conn = sock.connect(self.address).await?; - // TODO: figure out how to wait for connection to close - conn.ready(Interest::READABLE).await?; - yield (); - } + loop { + interval.tick().await; + let res = Self::check(self.address).await; + tx.send_if_modified(|s| s.update(res.into())); } } } diff --git a/src/state.rs b/src/state.rs index c442d9d..857b8af 100644 --- a/src/state.rs +++ b/src/state.rs @@ -5,7 +5,7 @@ use tokio::sync::watch::Receiver; use tokio_stream::wrappers::WatchStream; use crate::{ - service::{IntoService, ServiceConfig, ServiceKind}, + service::{IntoService, ServiceConfig}, Status, }; @@ -22,30 +22,20 @@ impl AppState { .into_iter() .map(|ServiceConfig { name, kind }| { indexes.push(name.clone()); + tracing::debug!(name, "Added service"); let (tx, rx) = tokio::sync::watch::channel(Status::default()); - tokio::spawn(Self::spawn_service(kind, tx.clone())); + tokio::spawn(kind.into_service(tx.clone())); (name, rx) }) .collect(); + tracing::debug!(?indexes, "Finished spawning services"); + AppState { rx_map, indexes } } - #[tracing::instrument(skip(tx))] - async fn spawn_service(kind: ServiceKind, tx: tokio::sync::watch::Sender) { - let mut stream = kind.into_service(); - while let Some(res) = stream.next().await { - let status = res.into(); - tx.send_if_modified(|s| { - if *s != status { - tracing::debug!(?status, "Updated service status"); - *s = status; - true - } else { - false - } - }); - } + pub fn names(&self) -> Vec { + self.indexes.clone() } pub fn status(&self, k: &str) -> Option { -- cgit v1.2.3-70-g09d2