diff options
-rw-r--r-- | Cargo.lock | 10 | ||||
-rw-r--r-- | Cargo.toml | 7 | ||||
-rw-r--r-- | src/service.rs | 1 | ||||
-rw-r--r-- | src/service/command.rs | 79 |
4 files changed, 91 insertions, 6 deletions
@@ -1088,6 +1088,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" [[package]] +name = "signal-hook-registry" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9e9e0b4211b72e7b8b6e85c807d36c212bdb33ea8587f7569562a84df5465b1" +dependencies = [ + "libc", +] + +[[package]] name = "slab" version = "0.4.9" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1265,6 +1274,7 @@ dependencies = [ "libc", "mio", "pin-project-lite", + "signal-hook-registry", "socket2", "tokio-macros", "windows-sys 0.52.0", @@ -15,12 +15,7 @@ reqwest = "0.12.7" serde = { version = "1.0.210", features = ["derive"] } serde_json = "1.0.128" thiserror = "1.0.63" -tokio = { version = "1.40.0", features = [ - "macros", - "rt-multi-thread", - "net", - "time", -] } +tokio = { version = "1.40.0", features = ["macros", "rt-multi-thread", "net", "time", "process"] } tokio-stream = { version = "0.1.16", features = ["sync"] } toml = "0.8.19" tower-http = { version = "0.6.1", features = ["fs", "trace"] } diff --git a/src/service.rs b/src/service.rs index 8e0863c..3e37503 100644 --- a/src/service.rs +++ b/src/service.rs @@ -16,6 +16,7 @@ use crate::{Error, Status}; pub mod http; pub mod systemd; pub mod tcp; +pub mod command; pub type ServiceHandles = Arc<HashMap<String, ServiceHandle>>; diff --git a/src/service/command.rs b/src/service/command.rs new file mode 100644 index 0000000..41a79b3 --- /dev/null +++ b/src/service/command.rs @@ -0,0 +1,79 @@ +use std::{process::Stdio, time::Duration}; + +use serde::Deserialize; +use tokio::{ + io::{AsyncBufReadExt, BufReader}, + sync::watch::Sender, +}; + +use crate::{Error, Status}; + +use super::ServiceSpawner; + +#[derive(Debug, Clone, Deserialize)] +pub struct Command { + pub command: String, + pub args: Vec<String>, + pub interval: Option<Duration>, +} + +impl Command { + async fn spawn_interval( + mut command: tokio::process::Command, + period: Duration, + tx: Sender<Status>, + ) -> Result<(), Error> { + let mut interval = tokio::time::interval(period); + loop { + interval.tick().await; + + let status = command.output().await.map_or_else(Into::into, |o| { + if o.status.success() { + Status::Ok + } else { + let stdout = String::from_utf8_lossy(&o.stdout).trim().to_string(); + Status::Error(Some(format!("Service state: {}", stdout))) + } + }); + + tx.send_if_modified(|s| s.update(status)); + } + } + + async fn spawn_persist( + mut command: tokio::process::Command, + tx: Sender<Status>, + ) -> Result<(), Error> { + let mut child = command.stdout(Stdio::piped()).spawn()?; + let mut stdout = BufReader::new(child.stdout.take().unwrap()).lines(); + + while let Some(line) = stdout.next_line().await? { + let status: Status = serde_json::from_str(&line) + .unwrap_or_else(|err| Status::Error(Some(format!("Serialization error: {err}")))); + tx.send_if_modified(|s| s.update(status)); + } + + let exit_status = child.wait().await?; + let status = match exit_status.code() { + Some(0) => Status::Ok, + Some(code) => Status::Error(Some(format!("Exited with status code: {code}"))), + None => Status::Error(Some("Process terminated by signal".to_string())), + }; + + tx.send_if_modified(|s| s.update(status)); + Ok(()) + } +} + +impl ServiceSpawner for Command { + async fn spawn(self, tx: Sender<Status>) -> Result<(), Error> { + let mut command = tokio::process::Command::new(self.command); + command.args(self.args); + + if let Some(period) = self.interval { + Self::spawn_interval(command, period, tx).await + } else { + Self::spawn_persist(command, tx).await + } + } +} |