From b94f8e694bf01f5dba9ce2c01f589463a3dfbc69 Mon Sep 17 00:00:00 2001 From: Toby Vincent Date: Wed, 9 Oct 2024 18:23:58 -0500 Subject: feat!: rewrite to use traits and streams --- src/service/command.rs | 128 +++++++++++++++++++++++++++++++------------------ 1 file changed, 81 insertions(+), 47 deletions(-) (limited to 'src/service/command.rs') diff --git a/src/service/command.rs b/src/service/command.rs index 41a79b3..3535ee2 100644 --- a/src/service/command.rs +++ b/src/service/command.rs @@ -1,79 +1,113 @@ use std::{process::Stdio, time::Duration}; +use async_stream::stream; +use futures::{Stream, StreamExt}; use serde::Deserialize; -use tokio::{ - io::{AsyncBufReadExt, BufReader}, - sync::watch::Sender, -}; +use tokio::io::{AsyncBufReadExt, BufReader}; -use crate::{Error, Status}; +use super::IntoService; -use super::ServiceSpawner; +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("IO error: {0}")] + Io(#[from] std::io::Error), + #[error("Exited with status code: {code}\n{stderr}")] + ExitCode { code: i32, stderr: String }, + #[error("Process terminated by signal")] + Signal, + #[error("Serialization error: {0}")] + Serialization(#[from] serde_json::error::Error), + #[error("{0}")] + Stderr(String), + #[error("{0}")] + Output(String), + #[error("Exited with status code: {0}")] + PersistExitCode(i32), + #[error("Failed to get stderr of child process")] + NoStderr, + #[error("Failed to get stdout of child process")] + NoStdout, +} #[derive(Debug, Clone, Deserialize)] pub struct Command { pub command: String, pub args: Vec, - pub interval: Option, + #[serde(default)] + pub persist: bool, + #[serde(default = "super::default_interval")] + pub interval: Duration, } impl Command { - async fn spawn_interval( + #[tracing::instrument] + fn persist( + mut interval: tokio::time::Interval, mut command: tokio::process::Command, - period: Duration, - tx: Sender, - ) -> Result<(), Error> { - let mut interval = tokio::time::interval(period); - loop { - interval.tick().await; + ) -> impl Stream> { + stream! { + loop { + interval.tick().await; + + let mut child = command + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn()?; - let status = command.output().await.map_or_else(Into::into, |o| { - if o.status.success() { - Status::Ok - } else { - let stdout = String::from_utf8_lossy(&o.stdout).trim().to_string(); - Status::Error(Some(format!("Service state: {}", stdout))) + let mut stdout_reader = + BufReader::new(child.stdout.take().ok_or(Error::NoStdout)?).lines(); + + while let Some(line) = stdout_reader.next_line().await? { + if "Ok" == line { + yield Ok(()); + } else { + yield Err(Error::Output(line)) + } } - }); - tx.send_if_modified(|s| s.update(status)); + match child.wait().await?.code() { + Some(0) => yield Ok(()), + Some(code) => yield Err(Error::PersistExitCode(code)), + None => yield Err(Error::Signal), + }; + } } } - async fn spawn_persist( + #[tracing::instrument] + fn interval( + mut interval: tokio::time::Interval, mut command: tokio::process::Command, - tx: Sender, - ) -> Result<(), Error> { - let mut child = command.stdout(Stdio::piped()).spawn()?; - let mut stdout = BufReader::new(child.stdout.take().unwrap()).lines(); - - while let Some(line) = stdout.next_line().await? { - let status: Status = serde_json::from_str(&line) - .unwrap_or_else(|err| Status::Error(Some(format!("Serialization error: {err}")))); - tx.send_if_modified(|s| s.update(status)); + ) -> impl Stream> { + stream! { + loop { + interval.tick().await; + let output = command.output().await?; + match output.status.code() { + Some(0) => yield Ok(()), + Some(code) => { + let stderr = String::from_utf8_lossy(&output.stderr).to_string(); + yield Err(Error::ExitCode { code, stderr }) + } + None => yield Err(Error::Signal), + } + } } - - let exit_status = child.wait().await?; - let status = match exit_status.code() { - Some(0) => Status::Ok, - Some(code) => Status::Error(Some(format!("Exited with status code: {code}"))), - None => Status::Error(Some("Process terminated by signal".to_string())), - }; - - tx.send_if_modified(|s| s.update(status)); - Ok(()) } } -impl ServiceSpawner for Command { - async fn spawn(self, tx: Sender) -> Result<(), Error> { +impl IntoService for Command { + type Error = Error; + + fn into_service(self) -> impl Stream> { + let interval = tokio::time::interval(self.interval); let mut command = tokio::process::Command::new(self.command); command.args(self.args); - if let Some(period) = self.interval { - Self::spawn_interval(command, period, tx).await + if self.persist { + Self::persist(interval, command).boxed() } else { - Self::spawn_persist(command, tx).await + Self::interval(interval, command).boxed() } } } -- cgit v1.2.3-70-g09d2