summaryrefslogtreecommitdiffstats
path: root/src/service
diff options
context:
space:
mode:
authorToby Vincent <tobyv@tobyvin.dev>2024-10-01 14:19:54 -0500
committerToby Vincent <tobyv@tobyvin.dev>2024-10-01 14:19:54 -0500
commit117d33fc478bf529094850b1fe40c558f04c9865 (patch)
tree223ad4091c71aa3f4f098524c7b27bf34298901f /src/service
parentfe16a923190243dfde5db6ceff2ef0bcf9158926 (diff)
feat: added command service
Diffstat (limited to 'src/service')
-rw-r--r--src/service/command.rs79
1 files changed, 79 insertions, 0 deletions
diff --git a/src/service/command.rs b/src/service/command.rs
new file mode 100644
index 0000000..41a79b3
--- /dev/null
+++ b/src/service/command.rs
@@ -0,0 +1,79 @@
+use std::{process::Stdio, time::Duration};
+
+use serde::Deserialize;
+use tokio::{
+ io::{AsyncBufReadExt, BufReader},
+ sync::watch::Sender,
+};
+
+use crate::{Error, Status};
+
+use super::ServiceSpawner;
+
+#[derive(Debug, Clone, Deserialize)]
+pub struct Command {
+ pub command: String,
+ pub args: Vec<String>,
+ pub interval: Option<Duration>,
+}
+
+impl Command {
+ async fn spawn_interval(
+ mut command: tokio::process::Command,
+ period: Duration,
+ tx: Sender<Status>,
+ ) -> Result<(), Error> {
+ let mut interval = tokio::time::interval(period);
+ loop {
+ interval.tick().await;
+
+ let status = command.output().await.map_or_else(Into::into, |o| {
+ if o.status.success() {
+ Status::Ok
+ } else {
+ let stdout = String::from_utf8_lossy(&o.stdout).trim().to_string();
+ Status::Error(Some(format!("Service state: {}", stdout)))
+ }
+ });
+
+ tx.send_if_modified(|s| s.update(status));
+ }
+ }
+
+ async fn spawn_persist(
+ mut command: tokio::process::Command,
+ tx: Sender<Status>,
+ ) -> Result<(), Error> {
+ let mut child = command.stdout(Stdio::piped()).spawn()?;
+ let mut stdout = BufReader::new(child.stdout.take().unwrap()).lines();
+
+ while let Some(line) = stdout.next_line().await? {
+ let status: Status = serde_json::from_str(&line)
+ .unwrap_or_else(|err| Status::Error(Some(format!("Serialization error: {err}"))));
+ tx.send_if_modified(|s| s.update(status));
+ }
+
+ let exit_status = child.wait().await?;
+ let status = match exit_status.code() {
+ Some(0) => Status::Ok,
+ Some(code) => Status::Error(Some(format!("Exited with status code: {code}"))),
+ None => Status::Error(Some("Process terminated by signal".to_string())),
+ };
+
+ tx.send_if_modified(|s| s.update(status));
+ Ok(())
+ }
+}
+
+impl ServiceSpawner for Command {
+ async fn spawn(self, tx: Sender<Status>) -> Result<(), Error> {
+ let mut command = tokio::process::Command::new(self.command);
+ command.args(self.args);
+
+ if let Some(period) = self.interval {
+ Self::spawn_interval(command, period, tx).await
+ } else {
+ Self::spawn_persist(command, tx).await
+ }
+ }
+}