From 117d33fc478bf529094850b1fe40c558f04c9865 Mon Sep 17 00:00:00 2001 From: Toby Vincent Date: Tue, 1 Oct 2024 14:19:54 -0500 Subject: feat: added command service --- Cargo.lock | 10 +++++++ Cargo.toml | 7 +---- src/service.rs | 1 + src/service/command.rs | 79 ++++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 91 insertions(+), 6 deletions(-) create mode 100644 src/service/command.rs diff --git a/Cargo.lock b/Cargo.lock index 9b36084..4f4b2d7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1087,6 +1087,15 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +[[package]] +name = "signal-hook-registry" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9e9e0b4211b72e7b8b6e85c807d36c212bdb33ea8587f7569562a84df5465b1" +dependencies = [ + "libc", +] + [[package]] name = "slab" version = "0.4.9" @@ -1265,6 +1274,7 @@ dependencies = [ "libc", "mio", "pin-project-lite", + "signal-hook-registry", "socket2", "tokio-macros", "windows-sys 0.52.0", diff --git a/Cargo.toml b/Cargo.toml index b70af54..b0e6613 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,12 +15,7 @@ reqwest = "0.12.7" serde = { version = "1.0.210", features = ["derive"] } serde_json = "1.0.128" thiserror = "1.0.63" -tokio = { version = "1.40.0", features = [ - "macros", - "rt-multi-thread", - "net", - "time", -] } +tokio = { version = "1.40.0", features = ["macros", "rt-multi-thread", "net", "time", "process"] } tokio-stream = { version = "0.1.16", features = ["sync"] } toml = "0.8.19" tower-http = { version = "0.6.1", features = ["fs", "trace"] } diff --git a/src/service.rs b/src/service.rs index 8e0863c..3e37503 100644 --- a/src/service.rs +++ b/src/service.rs @@ -16,6 +16,7 @@ use crate::{Error, Status}; pub mod http; pub mod systemd; pub mod tcp; +pub mod command; pub type ServiceHandles = Arc>; diff --git a/src/service/command.rs b/src/service/command.rs new file mode 100644 index 0000000..41a79b3 --- /dev/null +++ b/src/service/command.rs @@ -0,0 +1,79 @@ +use std::{process::Stdio, time::Duration}; + +use serde::Deserialize; +use tokio::{ + io::{AsyncBufReadExt, BufReader}, + sync::watch::Sender, +}; + +use crate::{Error, Status}; + +use super::ServiceSpawner; + +#[derive(Debug, Clone, Deserialize)] +pub struct Command { + pub command: String, + pub args: Vec, + pub interval: Option, +} + +impl Command { + async fn spawn_interval( + mut command: tokio::process::Command, + period: Duration, + tx: Sender, + ) -> Result<(), Error> { + let mut interval = tokio::time::interval(period); + loop { + interval.tick().await; + + let status = command.output().await.map_or_else(Into::into, |o| { + if o.status.success() { + Status::Ok + } else { + let stdout = String::from_utf8_lossy(&o.stdout).trim().to_string(); + Status::Error(Some(format!("Service state: {}", stdout))) + } + }); + + tx.send_if_modified(|s| s.update(status)); + } + } + + async fn spawn_persist( + mut command: tokio::process::Command, + tx: Sender, + ) -> Result<(), Error> { + let mut child = command.stdout(Stdio::piped()).spawn()?; + let mut stdout = BufReader::new(child.stdout.take().unwrap()).lines(); + + while let Some(line) = stdout.next_line().await? { + let status: Status = serde_json::from_str(&line) + .unwrap_or_else(|err| Status::Error(Some(format!("Serialization error: {err}")))); + tx.send_if_modified(|s| s.update(status)); + } + + let exit_status = child.wait().await?; + let status = match exit_status.code() { + Some(0) => Status::Ok, + Some(code) => Status::Error(Some(format!("Exited with status code: {code}"))), + None => Status::Error(Some("Process terminated by signal".to_string())), + }; + + tx.send_if_modified(|s| s.update(status)); + Ok(()) + } +} + +impl ServiceSpawner for Command { + async fn spawn(self, tx: Sender) -> Result<(), Error> { + let mut command = tokio::process::Command::new(self.command); + command.args(self.args); + + if let Some(period) = self.interval { + Self::spawn_interval(command, period, tx).await + } else { + Self::spawn_persist(command, tx).await + } + } +} -- cgit v1.2.3-70-g09d2