summaryrefslogtreecommitdiffstats
path: root/src/service/command.rs
diff options
context:
space:
mode:
authorToby Vincent <tobyv@tobyvin.dev>2024-10-12 18:23:46 -0500
committerToby Vincent <tobyv@tobyvin.dev>2024-10-12 18:23:46 -0500
commit8b9eb6eb88d871309348dff1527d69b4b32a98ec (patch)
tree4a8d25f3b0db4a6ff7c258f3ea7a508e6b68de82 /src/service/command.rs
parent0ea877c5d0de10b45768da80c658785835d625e6 (diff)
refactor: simplify service trait, again
Diffstat (limited to 'src/service/command.rs')
-rw-r--r--src/service/command.rs104
1 files changed, 45 insertions, 59 deletions
diff --git a/src/service/command.rs b/src/service/command.rs
index 3535ee2..6b44e1d 100644
--- a/src/service/command.rs
+++ b/src/service/command.rs
@@ -1,9 +1,12 @@
use std::{process::Stdio, time::Duration};
-use async_stream::stream;
-use futures::{Stream, StreamExt};
-use serde::Deserialize;
-use tokio::io::{AsyncBufReadExt, BufReader};
+use serde::{Deserialize, Serialize};
+use tokio::{
+ io::{AsyncBufReadExt, BufReader},
+ sync::watch::Sender,
+};
+
+use crate::Status;
use super::IntoService;
@@ -29,7 +32,7 @@ pub enum Error {
NoStdout,
}
-#[derive(Debug, Clone, Deserialize)]
+#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct Command {
pub command: String,
pub args: Vec<String>,
@@ -40,74 +43,57 @@ pub struct Command {
}
impl Command {
- #[tracing::instrument]
- fn persist(
- mut interval: tokio::time::Interval,
- mut command: tokio::process::Command,
- ) -> impl Stream<Item = Result<(), Error>> {
- stream! {
- loop {
- interval.tick().await;
+ async fn persist(&self, tx: Sender<Status>) -> Result<(), Error> {
+ let mut command = tokio::process::Command::new(&self.command);
+ command.args(&self.args);
- let mut child = command
- .stdout(Stdio::piped())
- .stderr(Stdio::piped())
- .spawn()?;
+ let mut child = command
+ .stdout(Stdio::piped())
+ .stderr(Stdio::piped())
+ .spawn()?;
- let mut stdout_reader =
- BufReader::new(child.stdout.take().ok_or(Error::NoStdout)?).lines();
+ let mut lines = BufReader::new(child.stdout.take().ok_or(Error::NoStdout)?).lines();
- while let Some(line) = stdout_reader.next_line().await? {
- if "Ok" == line {
- yield Ok(());
- } else {
- yield Err(Error::Output(line))
- }
- }
+ while let Some(line) = lines.next_line().await? {
+ let res = ("Ok" == line).then_some(()).ok_or(Error::Output(line));
+ tx.send_if_modified(|s| s.update(res.into()));
+ }
- match child.wait().await?.code() {
- Some(0) => yield Ok(()),
- Some(code) => yield Err(Error::PersistExitCode(code)),
- None => yield Err(Error::Signal),
- };
- }
+ match child.wait().await?.code() {
+ Some(0) => Ok(()),
+ Some(code) => Err(Error::PersistExitCode(code)),
+ None => Err(Error::Signal),
}
}
- #[tracing::instrument]
- fn interval(
- mut interval: tokio::time::Interval,
- mut command: tokio::process::Command,
- ) -> impl Stream<Item = Result<(), Error>> {
- stream! {
- loop {
- interval.tick().await;
- let output = command.output().await?;
- match output.status.code() {
- Some(0) => yield Ok(()),
- Some(code) => {
- let stderr = String::from_utf8_lossy(&output.stderr).to_string();
- yield Err(Error::ExitCode { code, stderr })
- }
- None => yield Err(Error::Signal),
- }
+ async fn interval(&self) -> Result<(), Error> {
+ let mut command = tokio::process::Command::new(&self.command);
+ command.args(&self.args);
+
+ let output = command.output().await?;
+ match output.status.code() {
+ Some(0) => Ok(()),
+ Some(code) => {
+ let stderr = String::from_utf8_lossy(&output.stderr).to_string();
+ Err(Error::ExitCode { code, stderr })
}
+ None => Err(Error::Signal),
}
}
}
impl IntoService for Command {
- type Error = Error;
-
- fn into_service(self) -> impl Stream<Item = Result<(), Self::Error>> {
- let interval = tokio::time::interval(self.interval);
- let mut command = tokio::process::Command::new(self.command);
- command.args(self.args);
+ async fn into_service(self, tx: tokio::sync::watch::Sender<Status>) {
+ let mut interval = tokio::time::interval(self.interval);
+ loop {
+ interval.tick().await;
+ let res = if self.persist {
+ self.persist(tx.clone()).await
+ } else {
+ self.interval().await
+ };
- if self.persist {
- Self::persist(interval, command).boxed()
- } else {
- Self::interval(interval, command).boxed()
+ tx.send_if_modified(|s| s.update(res.into()));
}
}
}