summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorToby Vincent <tobyv@tobyvin.dev>2024-10-01 14:19:54 -0500
committerToby Vincent <tobyv@tobyvin.dev>2024-10-01 14:19:54 -0500
commit117d33fc478bf529094850b1fe40c558f04c9865 (patch)
tree223ad4091c71aa3f4f098524c7b27bf34298901f
parentfe16a923190243dfde5db6ceff2ef0bcf9158926 (diff)
feat: added command service
-rw-r--r--Cargo.lock10
-rw-r--r--Cargo.toml7
-rw-r--r--src/service.rs1
-rw-r--r--src/service/command.rs79
4 files changed, 91 insertions, 6 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 9b36084..4f4b2d7 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1088,6 +1088,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64"
[[package]]
+name = "signal-hook-registry"
+version = "1.4.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a9e9e0b4211b72e7b8b6e85c807d36c212bdb33ea8587f7569562a84df5465b1"
+dependencies = [
+ "libc",
+]
+
+[[package]]
name = "slab"
version = "0.4.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1265,6 +1274,7 @@ dependencies = [
"libc",
"mio",
"pin-project-lite",
+ "signal-hook-registry",
"socket2",
"tokio-macros",
"windows-sys 0.52.0",
diff --git a/Cargo.toml b/Cargo.toml
index b70af54..b0e6613 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -15,12 +15,7 @@ reqwest = "0.12.7"
serde = { version = "1.0.210", features = ["derive"] }
serde_json = "1.0.128"
thiserror = "1.0.63"
-tokio = { version = "1.40.0", features = [
- "macros",
- "rt-multi-thread",
- "net",
- "time",
-] }
+tokio = { version = "1.40.0", features = ["macros", "rt-multi-thread", "net", "time", "process"] }
tokio-stream = { version = "0.1.16", features = ["sync"] }
toml = "0.8.19"
tower-http = { version = "0.6.1", features = ["fs", "trace"] }
diff --git a/src/service.rs b/src/service.rs
index 8e0863c..3e37503 100644
--- a/src/service.rs
+++ b/src/service.rs
@@ -16,6 +16,7 @@ use crate::{Error, Status};
pub mod http;
pub mod systemd;
pub mod tcp;
+pub mod command;
pub type ServiceHandles = Arc<HashMap<String, ServiceHandle>>;
diff --git a/src/service/command.rs b/src/service/command.rs
new file mode 100644
index 0000000..41a79b3
--- /dev/null
+++ b/src/service/command.rs
@@ -0,0 +1,79 @@
+use std::{process::Stdio, time::Duration};
+
+use serde::Deserialize;
+use tokio::{
+ io::{AsyncBufReadExt, BufReader},
+ sync::watch::Sender,
+};
+
+use crate::{Error, Status};
+
+use super::ServiceSpawner;
+
+#[derive(Debug, Clone, Deserialize)]
+pub struct Command {
+ pub command: String,
+ pub args: Vec<String>,
+ pub interval: Option<Duration>,
+}
+
+impl Command {
+ async fn spawn_interval(
+ mut command: tokio::process::Command,
+ period: Duration,
+ tx: Sender<Status>,
+ ) -> Result<(), Error> {
+ let mut interval = tokio::time::interval(period);
+ loop {
+ interval.tick().await;
+
+ let status = command.output().await.map_or_else(Into::into, |o| {
+ if o.status.success() {
+ Status::Ok
+ } else {
+ let stdout = String::from_utf8_lossy(&o.stdout).trim().to_string();
+ Status::Error(Some(format!("Service state: {}", stdout)))
+ }
+ });
+
+ tx.send_if_modified(|s| s.update(status));
+ }
+ }
+
+ async fn spawn_persist(
+ mut command: tokio::process::Command,
+ tx: Sender<Status>,
+ ) -> Result<(), Error> {
+ let mut child = command.stdout(Stdio::piped()).spawn()?;
+ let mut stdout = BufReader::new(child.stdout.take().unwrap()).lines();
+
+ while let Some(line) = stdout.next_line().await? {
+ let status: Status = serde_json::from_str(&line)
+ .unwrap_or_else(|err| Status::Error(Some(format!("Serialization error: {err}"))));
+ tx.send_if_modified(|s| s.update(status));
+ }
+
+ let exit_status = child.wait().await?;
+ let status = match exit_status.code() {
+ Some(0) => Status::Ok,
+ Some(code) => Status::Error(Some(format!("Exited with status code: {code}"))),
+ None => Status::Error(Some("Process terminated by signal".to_string())),
+ };
+
+ tx.send_if_modified(|s| s.update(status));
+ Ok(())
+ }
+}
+
+impl ServiceSpawner for Command {
+ async fn spawn(self, tx: Sender<Status>) -> Result<(), Error> {
+ let mut command = tokio::process::Command::new(self.command);
+ command.args(self.args);
+
+ if let Some(period) = self.interval {
+ Self::spawn_interval(command, period, tx).await
+ } else {
+ Self::spawn_persist(command, tx).await
+ }
+ }
+}