diff options
author | Toby Vincent <tobyv@tobyvin.dev> | 2024-10-12 18:23:46 -0500 |
---|---|---|
committer | Toby Vincent <tobyv@tobyvin.dev> | 2024-10-12 18:23:46 -0500 |
commit | 8b9eb6eb88d871309348dff1527d69b4b32a98ec (patch) | |
tree | 4a8d25f3b0db4a6ff7c258f3ea7a508e6b68de82 /src/service | |
parent | 0ea877c5d0de10b45768da80c658785835d625e6 (diff) |
refactor: simplify service trait, again
Diffstat (limited to 'src/service')
-rw-r--r-- | src/service/command.rs | 104 | ||||
-rw-r--r-- | src/service/http.rs | 51 | ||||
-rw-r--r-- | src/service/tcp.rs | 42 |
3 files changed, 93 insertions, 104 deletions
diff --git a/src/service/command.rs b/src/service/command.rs index 3535ee2..6b44e1d 100644 --- a/src/service/command.rs +++ b/src/service/command.rs @@ -1,9 +1,12 @@ use std::{process::Stdio, time::Duration}; -use async_stream::stream; -use futures::{Stream, StreamExt}; -use serde::Deserialize; -use tokio::io::{AsyncBufReadExt, BufReader}; +use serde::{Deserialize, Serialize}; +use tokio::{ + io::{AsyncBufReadExt, BufReader}, + sync::watch::Sender, +}; + +use crate::Status; use super::IntoService; @@ -29,7 +32,7 @@ pub enum Error { NoStdout, } -#[derive(Debug, Clone, Deserialize)] +#[derive(Debug, Clone, Deserialize, Serialize)] pub struct Command { pub command: String, pub args: Vec<String>, @@ -40,74 +43,57 @@ pub struct Command { } impl Command { - #[tracing::instrument] - fn persist( - mut interval: tokio::time::Interval, - mut command: tokio::process::Command, - ) -> impl Stream<Item = Result<(), Error>> { - stream! { - loop { - interval.tick().await; + async fn persist(&self, tx: Sender<Status>) -> Result<(), Error> { + let mut command = tokio::process::Command::new(&self.command); + command.args(&self.args); - let mut child = command - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .spawn()?; + let mut child = command + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn()?; - let mut stdout_reader = - BufReader::new(child.stdout.take().ok_or(Error::NoStdout)?).lines(); + let mut lines = BufReader::new(child.stdout.take().ok_or(Error::NoStdout)?).lines(); - while let Some(line) = stdout_reader.next_line().await? { - if "Ok" == line { - yield Ok(()); - } else { - yield Err(Error::Output(line)) - } - } + while let Some(line) = lines.next_line().await? { + let res = ("Ok" == line).then_some(()).ok_or(Error::Output(line)); + tx.send_if_modified(|s| s.update(res.into())); + } - match child.wait().await?.code() { - Some(0) => yield Ok(()), - Some(code) => yield Err(Error::PersistExitCode(code)), - None => yield Err(Error::Signal), - }; - } + match child.wait().await?.code() { + Some(0) => Ok(()), + Some(code) => Err(Error::PersistExitCode(code)), + None => Err(Error::Signal), } } - #[tracing::instrument] - fn interval( - mut interval: tokio::time::Interval, - mut command: tokio::process::Command, - ) -> impl Stream<Item = Result<(), Error>> { - stream! { - loop { - interval.tick().await; - let output = command.output().await?; - match output.status.code() { - Some(0) => yield Ok(()), - Some(code) => { - let stderr = String::from_utf8_lossy(&output.stderr).to_string(); - yield Err(Error::ExitCode { code, stderr }) - } - None => yield Err(Error::Signal), - } + async fn interval(&self) -> Result<(), Error> { + let mut command = tokio::process::Command::new(&self.command); + command.args(&self.args); + + let output = command.output().await?; + match output.status.code() { + Some(0) => Ok(()), + Some(code) => { + let stderr = String::from_utf8_lossy(&output.stderr).to_string(); + Err(Error::ExitCode { code, stderr }) } + None => Err(Error::Signal), } } } impl IntoService for Command { - type Error = Error; - - fn into_service(self) -> impl Stream<Item = Result<(), Self::Error>> { - let interval = tokio::time::interval(self.interval); - let mut command = tokio::process::Command::new(self.command); - command.args(self.args); + async fn into_service(self, tx: tokio::sync::watch::Sender<Status>) { + let mut interval = tokio::time::interval(self.interval); + loop { + interval.tick().await; + let res = if self.persist { + self.persist(tx.clone()).await + } else { + self.interval().await + }; - if self.persist { - Self::persist(interval, command).boxed() - } else { - Self::interval(interval, command).boxed() + tx.send_if_modified(|s| s.update(res.into())); } } } diff --git a/src/service/http.rs b/src/service/http.rs index c4fcee7..bfba007 100644 --- a/src/service/http.rs +++ b/src/service/http.rs @@ -1,11 +1,12 @@ use std::{fmt::Display, time::Duration}; -use async_stream::try_stream; use axum::http::status::StatusCode; -use futures::Stream; -use serde::Deserialize; +use reqwest::Client; +use serde::{Deserialize, Serialize}; use url::Url; +use crate::Status; + use super::IntoService; #[derive(Debug, thiserror::Error)] @@ -16,7 +17,7 @@ pub enum Error { StatusCode(u16), } -#[derive(Debug, Clone, Deserialize)] +#[derive(Debug, Clone, Deserialize, Serialize)] pub struct Http { pub url: Url, #[serde(default)] @@ -29,32 +30,34 @@ pub struct Http { pub interval: Duration, } -impl IntoService for Http { - type Error = Error; +impl Http { + async fn check(&self) -> Result<(), Error> { + let client = match self.client.as_ref() { + Some(client) => client, + None => &Client::new(), + }; + let req = client + .request(self.method.into(), self.url.clone()) + .build()?; + let status_code = client.execute(req).await?.status().as_u16(); + (status_code == self.status_code) + .then_some(()) + .ok_or_else(|| Error::StatusCode(status_code)) + } +} - fn into_service(self) -> impl Stream<Item = Result<(), Self::Error>> { +impl IntoService for Http { + async fn into_service(self, tx: tokio::sync::watch::Sender<Status>) { let mut interval = tokio::time::interval(self.interval); - - try_stream! { - let client = self.client.unwrap_or_default(); - let req = client.request(self.method.into(), self.url).build()?; - loop { - interval.tick().await; - let req = req - .try_clone() - .expect("Clone with no body should never fail"); - let status_code = client.execute(req).await?.status().as_u16(); - if status_code == self.status_code { - yield (); - } else { - Err(Error::StatusCode(status_code))? - } - } + loop { + interval.tick().await; + let res = self.check().await; + tx.send_if_modified(|s| s.update(res.into())); } } } -#[derive(Debug, Clone, Copy, Default, Deserialize)] +#[derive(Debug, Clone, Copy, Default, Deserialize, Serialize)] pub enum Method { #[serde(alias = "get", alias = "GET")] #[default] diff --git a/src/service/tcp.rs b/src/service/tcp.rs index 6556af0..e28b19d 100644 --- a/src/service/tcp.rs +++ b/src/service/tcp.rs @@ -1,15 +1,13 @@ use std::{fmt::Display, net::SocketAddr, time::Duration}; -use async_stream::try_stream; -use futures::Stream; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use tokio::{io::Interest, net::TcpSocket}; -use super::IntoService; +use crate::Status; -pub(crate) type Error = std::io::Error; +use super::IntoService; -#[derive(Debug, Clone, Deserialize)] +#[derive(Debug, Clone, Deserialize, Serialize)] pub struct Tcp { pub address: SocketAddr, #[serde(default = "super::default_interval")] @@ -22,24 +20,26 @@ impl Display for Tcp { } } -impl IntoService for Tcp { - type Error = Error; - - fn into_service(self) -> impl Stream<Item = Result<(), Self::Error>> { - let mut interval = tokio::time::interval(self.interval); +impl Tcp { + async fn check(address: SocketAddr) -> Result<(), std::io::Error> { + let sock = TcpSocket::new_v4()?; + sock.set_keepalive(true)?; - try_stream! { - loop { - interval.tick().await; + let conn = sock.connect(address).await?; + // TODO: figure out how to wait for connection to close + conn.ready(Interest::READABLE).await?; + Ok(()) + } +} - let sock = TcpSocket::new_v4()?; - sock.set_keepalive(true)?; +impl IntoService for Tcp { + async fn into_service(self, tx: tokio::sync::watch::Sender<Status>) { + let mut interval = tokio::time::interval(self.interval); - let conn = sock.connect(self.address).await?; - // TODO: figure out how to wait for connection to close - conn.ready(Interest::READABLE).await?; - yield (); - } + loop { + interval.tick().await; + let res = Self::check(self.address).await; + tx.send_if_modified(|s| s.update(res.into())); } } } |