From 8b9eb6eb88d871309348dff1527d69b4b32a98ec Mon Sep 17 00:00:00 2001 From: Toby Vincent Date: Sat, 12 Oct 2024 18:23:46 -0500 Subject: refactor: simplify service trait, again --- src/service/command.rs | 104 +++++++++++++++++++++---------------------------- 1 file changed, 45 insertions(+), 59 deletions(-) (limited to 'src/service/command.rs') diff --git a/src/service/command.rs b/src/service/command.rs index 3535ee2..6b44e1d 100644 --- a/src/service/command.rs +++ b/src/service/command.rs @@ -1,9 +1,12 @@ use std::{process::Stdio, time::Duration}; -use async_stream::stream; -use futures::{Stream, StreamExt}; -use serde::Deserialize; -use tokio::io::{AsyncBufReadExt, BufReader}; +use serde::{Deserialize, Serialize}; +use tokio::{ + io::{AsyncBufReadExt, BufReader}, + sync::watch::Sender, +}; + +use crate::Status; use super::IntoService; @@ -29,7 +32,7 @@ pub enum Error { NoStdout, } -#[derive(Debug, Clone, Deserialize)] +#[derive(Debug, Clone, Deserialize, Serialize)] pub struct Command { pub command: String, pub args: Vec, @@ -40,74 +43,57 @@ pub struct Command { } impl Command { - #[tracing::instrument] - fn persist( - mut interval: tokio::time::Interval, - mut command: tokio::process::Command, - ) -> impl Stream> { - stream! { - loop { - interval.tick().await; + async fn persist(&self, tx: Sender) -> Result<(), Error> { + let mut command = tokio::process::Command::new(&self.command); + command.args(&self.args); - let mut child = command - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .spawn()?; + let mut child = command + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn()?; - let mut stdout_reader = - BufReader::new(child.stdout.take().ok_or(Error::NoStdout)?).lines(); + let mut lines = BufReader::new(child.stdout.take().ok_or(Error::NoStdout)?).lines(); - while let Some(line) = stdout_reader.next_line().await? { - if "Ok" == line { - yield Ok(()); - } else { - yield Err(Error::Output(line)) - } - } + while let Some(line) = lines.next_line().await? { + let res = ("Ok" == line).then_some(()).ok_or(Error::Output(line)); + tx.send_if_modified(|s| s.update(res.into())); + } - match child.wait().await?.code() { - Some(0) => yield Ok(()), - Some(code) => yield Err(Error::PersistExitCode(code)), - None => yield Err(Error::Signal), - }; - } + match child.wait().await?.code() { + Some(0) => Ok(()), + Some(code) => Err(Error::PersistExitCode(code)), + None => Err(Error::Signal), } } - #[tracing::instrument] - fn interval( - mut interval: tokio::time::Interval, - mut command: tokio::process::Command, - ) -> impl Stream> { - stream! { - loop { - interval.tick().await; - let output = command.output().await?; - match output.status.code() { - Some(0) => yield Ok(()), - Some(code) => { - let stderr = String::from_utf8_lossy(&output.stderr).to_string(); - yield Err(Error::ExitCode { code, stderr }) - } - None => yield Err(Error::Signal), - } + async fn interval(&self) -> Result<(), Error> { + let mut command = tokio::process::Command::new(&self.command); + command.args(&self.args); + + let output = command.output().await?; + match output.status.code() { + Some(0) => Ok(()), + Some(code) => { + let stderr = String::from_utf8_lossy(&output.stderr).to_string(); + Err(Error::ExitCode { code, stderr }) } + None => Err(Error::Signal), } } } impl IntoService for Command { - type Error = Error; - - fn into_service(self) -> impl Stream> { - let interval = tokio::time::interval(self.interval); - let mut command = tokio::process::Command::new(self.command); - command.args(self.args); + async fn into_service(self, tx: tokio::sync::watch::Sender) { + let mut interval = tokio::time::interval(self.interval); + loop { + interval.tick().await; + let res = if self.persist { + self.persist(tx.clone()).await + } else { + self.interval().await + }; - if self.persist { - Self::persist(interval, command).boxed() - } else { - Self::interval(interval, command).boxed() + tx.send_if_modified(|s| s.update(res.into())); } } } -- cgit v1.2.3-70-g09d2