summaryrefslogtreecommitdiffstats
path: root/src/service
diff options
context:
space:
mode:
authorToby Vincent <tobyv@tobyvin.dev>2024-10-12 18:23:46 -0500
committerToby Vincent <tobyv@tobyvin.dev>2024-10-12 18:23:46 -0500
commit8b9eb6eb88d871309348dff1527d69b4b32a98ec (patch)
tree4a8d25f3b0db4a6ff7c258f3ea7a508e6b68de82 /src/service
parent0ea877c5d0de10b45768da80c658785835d625e6 (diff)
refactor: simplify service trait, again
Diffstat (limited to 'src/service')
-rw-r--r--src/service/command.rs104
-rw-r--r--src/service/http.rs51
-rw-r--r--src/service/tcp.rs42
3 files changed, 93 insertions, 104 deletions
diff --git a/src/service/command.rs b/src/service/command.rs
index 3535ee2..6b44e1d 100644
--- a/src/service/command.rs
+++ b/src/service/command.rs
@@ -1,9 +1,12 @@
use std::{process::Stdio, time::Duration};
-use async_stream::stream;
-use futures::{Stream, StreamExt};
-use serde::Deserialize;
-use tokio::io::{AsyncBufReadExt, BufReader};
+use serde::{Deserialize, Serialize};
+use tokio::{
+ io::{AsyncBufReadExt, BufReader},
+ sync::watch::Sender,
+};
+
+use crate::Status;
use super::IntoService;
@@ -29,7 +32,7 @@ pub enum Error {
NoStdout,
}
-#[derive(Debug, Clone, Deserialize)]
+#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct Command {
pub command: String,
pub args: Vec<String>,
@@ -40,74 +43,57 @@ pub struct Command {
}
impl Command {
- #[tracing::instrument]
- fn persist(
- mut interval: tokio::time::Interval,
- mut command: tokio::process::Command,
- ) -> impl Stream<Item = Result<(), Error>> {
- stream! {
- loop {
- interval.tick().await;
+ async fn persist(&self, tx: Sender<Status>) -> Result<(), Error> {
+ let mut command = tokio::process::Command::new(&self.command);
+ command.args(&self.args);
- let mut child = command
- .stdout(Stdio::piped())
- .stderr(Stdio::piped())
- .spawn()?;
+ let mut child = command
+ .stdout(Stdio::piped())
+ .stderr(Stdio::piped())
+ .spawn()?;
- let mut stdout_reader =
- BufReader::new(child.stdout.take().ok_or(Error::NoStdout)?).lines();
+ let mut lines = BufReader::new(child.stdout.take().ok_or(Error::NoStdout)?).lines();
- while let Some(line) = stdout_reader.next_line().await? {
- if "Ok" == line {
- yield Ok(());
- } else {
- yield Err(Error::Output(line))
- }
- }
+ while let Some(line) = lines.next_line().await? {
+ let res = ("Ok" == line).then_some(()).ok_or(Error::Output(line));
+ tx.send_if_modified(|s| s.update(res.into()));
+ }
- match child.wait().await?.code() {
- Some(0) => yield Ok(()),
- Some(code) => yield Err(Error::PersistExitCode(code)),
- None => yield Err(Error::Signal),
- };
- }
+ match child.wait().await?.code() {
+ Some(0) => Ok(()),
+ Some(code) => Err(Error::PersistExitCode(code)),
+ None => Err(Error::Signal),
}
}
- #[tracing::instrument]
- fn interval(
- mut interval: tokio::time::Interval,
- mut command: tokio::process::Command,
- ) -> impl Stream<Item = Result<(), Error>> {
- stream! {
- loop {
- interval.tick().await;
- let output = command.output().await?;
- match output.status.code() {
- Some(0) => yield Ok(()),
- Some(code) => {
- let stderr = String::from_utf8_lossy(&output.stderr).to_string();
- yield Err(Error::ExitCode { code, stderr })
- }
- None => yield Err(Error::Signal),
- }
+ async fn interval(&self) -> Result<(), Error> {
+ let mut command = tokio::process::Command::new(&self.command);
+ command.args(&self.args);
+
+ let output = command.output().await?;
+ match output.status.code() {
+ Some(0) => Ok(()),
+ Some(code) => {
+ let stderr = String::from_utf8_lossy(&output.stderr).to_string();
+ Err(Error::ExitCode { code, stderr })
}
+ None => Err(Error::Signal),
}
}
}
impl IntoService for Command {
- type Error = Error;
-
- fn into_service(self) -> impl Stream<Item = Result<(), Self::Error>> {
- let interval = tokio::time::interval(self.interval);
- let mut command = tokio::process::Command::new(self.command);
- command.args(self.args);
+ async fn into_service(self, tx: tokio::sync::watch::Sender<Status>) {
+ let mut interval = tokio::time::interval(self.interval);
+ loop {
+ interval.tick().await;
+ let res = if self.persist {
+ self.persist(tx.clone()).await
+ } else {
+ self.interval().await
+ };
- if self.persist {
- Self::persist(interval, command).boxed()
- } else {
- Self::interval(interval, command).boxed()
+ tx.send_if_modified(|s| s.update(res.into()));
}
}
}
diff --git a/src/service/http.rs b/src/service/http.rs
index c4fcee7..bfba007 100644
--- a/src/service/http.rs
+++ b/src/service/http.rs
@@ -1,11 +1,12 @@
use std::{fmt::Display, time::Duration};
-use async_stream::try_stream;
use axum::http::status::StatusCode;
-use futures::Stream;
-use serde::Deserialize;
+use reqwest::Client;
+use serde::{Deserialize, Serialize};
use url::Url;
+use crate::Status;
+
use super::IntoService;
#[derive(Debug, thiserror::Error)]
@@ -16,7 +17,7 @@ pub enum Error {
StatusCode(u16),
}
-#[derive(Debug, Clone, Deserialize)]
+#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct Http {
pub url: Url,
#[serde(default)]
@@ -29,32 +30,34 @@ pub struct Http {
pub interval: Duration,
}
-impl IntoService for Http {
- type Error = Error;
+impl Http {
+ async fn check(&self) -> Result<(), Error> {
+ let client = match self.client.as_ref() {
+ Some(client) => client,
+ None => &Client::new(),
+ };
+ let req = client
+ .request(self.method.into(), self.url.clone())
+ .build()?;
+ let status_code = client.execute(req).await?.status().as_u16();
+ (status_code == self.status_code)
+ .then_some(())
+ .ok_or_else(|| Error::StatusCode(status_code))
+ }
+}
- fn into_service(self) -> impl Stream<Item = Result<(), Self::Error>> {
+impl IntoService for Http {
+ async fn into_service(self, tx: tokio::sync::watch::Sender<Status>) {
let mut interval = tokio::time::interval(self.interval);
-
- try_stream! {
- let client = self.client.unwrap_or_default();
- let req = client.request(self.method.into(), self.url).build()?;
- loop {
- interval.tick().await;
- let req = req
- .try_clone()
- .expect("Clone with no body should never fail");
- let status_code = client.execute(req).await?.status().as_u16();
- if status_code == self.status_code {
- yield ();
- } else {
- Err(Error::StatusCode(status_code))?
- }
- }
+ loop {
+ interval.tick().await;
+ let res = self.check().await;
+ tx.send_if_modified(|s| s.update(res.into()));
}
}
}
-#[derive(Debug, Clone, Copy, Default, Deserialize)]
+#[derive(Debug, Clone, Copy, Default, Deserialize, Serialize)]
pub enum Method {
#[serde(alias = "get", alias = "GET")]
#[default]
diff --git a/src/service/tcp.rs b/src/service/tcp.rs
index 6556af0..e28b19d 100644
--- a/src/service/tcp.rs
+++ b/src/service/tcp.rs
@@ -1,15 +1,13 @@
use std::{fmt::Display, net::SocketAddr, time::Duration};
-use async_stream::try_stream;
-use futures::Stream;
-use serde::Deserialize;
+use serde::{Deserialize, Serialize};
use tokio::{io::Interest, net::TcpSocket};
-use super::IntoService;
+use crate::Status;
-pub(crate) type Error = std::io::Error;
+use super::IntoService;
-#[derive(Debug, Clone, Deserialize)]
+#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct Tcp {
pub address: SocketAddr,
#[serde(default = "super::default_interval")]
@@ -22,24 +20,26 @@ impl Display for Tcp {
}
}
-impl IntoService for Tcp {
- type Error = Error;
-
- fn into_service(self) -> impl Stream<Item = Result<(), Self::Error>> {
- let mut interval = tokio::time::interval(self.interval);
+impl Tcp {
+ async fn check(address: SocketAddr) -> Result<(), std::io::Error> {
+ let sock = TcpSocket::new_v4()?;
+ sock.set_keepalive(true)?;
- try_stream! {
- loop {
- interval.tick().await;
+ let conn = sock.connect(address).await?;
+ // TODO: figure out how to wait for connection to close
+ conn.ready(Interest::READABLE).await?;
+ Ok(())
+ }
+}
- let sock = TcpSocket::new_v4()?;
- sock.set_keepalive(true)?;
+impl IntoService for Tcp {
+ async fn into_service(self, tx: tokio::sync::watch::Sender<Status>) {
+ let mut interval = tokio::time::interval(self.interval);
- let conn = sock.connect(self.address).await?;
- // TODO: figure out how to wait for connection to close
- conn.ready(Interest::READABLE).await?;
- yield ();
- }
+ loop {
+ interval.tick().await;
+ let res = Self::check(self.address).await;
+ tx.send_if_modified(|s| s.update(res.into()));
}
}
}