From 117d33fc478bf529094850b1fe40c558f04c9865 Mon Sep 17 00:00:00 2001 From: Toby Vincent Date: Tue, 1 Oct 2024 14:19:54 -0500 Subject: feat: added command service --- src/service.rs | 1 + src/service/command.rs | 79 ++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 80 insertions(+) create mode 100644 src/service/command.rs (limited to 'src') diff --git a/src/service.rs b/src/service.rs index 8e0863c..3e37503 100644 --- a/src/service.rs +++ b/src/service.rs @@ -16,6 +16,7 @@ use crate::{Error, Status}; pub mod http; pub mod systemd; pub mod tcp; +pub mod command; pub type ServiceHandles = Arc>; diff --git a/src/service/command.rs b/src/service/command.rs new file mode 100644 index 0000000..41a79b3 --- /dev/null +++ b/src/service/command.rs @@ -0,0 +1,79 @@ +use std::{process::Stdio, time::Duration}; + +use serde::Deserialize; +use tokio::{ + io::{AsyncBufReadExt, BufReader}, + sync::watch::Sender, +}; + +use crate::{Error, Status}; + +use super::ServiceSpawner; + +#[derive(Debug, Clone, Deserialize)] +pub struct Command { + pub command: String, + pub args: Vec, + pub interval: Option, +} + +impl Command { + async fn spawn_interval( + mut command: tokio::process::Command, + period: Duration, + tx: Sender, + ) -> Result<(), Error> { + let mut interval = tokio::time::interval(period); + loop { + interval.tick().await; + + let status = command.output().await.map_or_else(Into::into, |o| { + if o.status.success() { + Status::Ok + } else { + let stdout = String::from_utf8_lossy(&o.stdout).trim().to_string(); + Status::Error(Some(format!("Service state: {}", stdout))) + } + }); + + tx.send_if_modified(|s| s.update(status)); + } + } + + async fn spawn_persist( + mut command: tokio::process::Command, + tx: Sender, + ) -> Result<(), Error> { + let mut child = command.stdout(Stdio::piped()).spawn()?; + let mut stdout = BufReader::new(child.stdout.take().unwrap()).lines(); + + while let Some(line) = stdout.next_line().await? { + let status: Status = serde_json::from_str(&line) + .unwrap_or_else(|err| Status::Error(Some(format!("Serialization error: {err}")))); + tx.send_if_modified(|s| s.update(status)); + } + + let exit_status = child.wait().await?; + let status = match exit_status.code() { + Some(0) => Status::Ok, + Some(code) => Status::Error(Some(format!("Exited with status code: {code}"))), + None => Status::Error(Some("Process terminated by signal".to_string())), + }; + + tx.send_if_modified(|s| s.update(status)); + Ok(()) + } +} + +impl ServiceSpawner for Command { + async fn spawn(self, tx: Sender) -> Result<(), Error> { + let mut command = tokio::process::Command::new(self.command); + command.args(self.args); + + if let Some(period) = self.interval { + Self::spawn_interval(command, period, tx).await + } else { + Self::spawn_persist(command, tx).await + } + } +} -- cgit v1.2.3-70-g09d2