diff options
Diffstat (limited to 'src/service')
-rw-r--r-- | src/service/command.rs | 79 |
1 files changed, 79 insertions, 0 deletions
diff --git a/src/service/command.rs b/src/service/command.rs new file mode 100644 index 0000000..41a79b3 --- /dev/null +++ b/src/service/command.rs @@ -0,0 +1,79 @@ +use std::{process::Stdio, time::Duration}; + +use serde::Deserialize; +use tokio::{ + io::{AsyncBufReadExt, BufReader}, + sync::watch::Sender, +}; + +use crate::{Error, Status}; + +use super::ServiceSpawner; + +#[derive(Debug, Clone, Deserialize)] +pub struct Command { + pub command: String, + pub args: Vec<String>, + pub interval: Option<Duration>, +} + +impl Command { + async fn spawn_interval( + mut command: tokio::process::Command, + period: Duration, + tx: Sender<Status>, + ) -> Result<(), Error> { + let mut interval = tokio::time::interval(period); + loop { + interval.tick().await; + + let status = command.output().await.map_or_else(Into::into, |o| { + if o.status.success() { + Status::Ok + } else { + let stdout = String::from_utf8_lossy(&o.stdout).trim().to_string(); + Status::Error(Some(format!("Service state: {}", stdout))) + } + }); + + tx.send_if_modified(|s| s.update(status)); + } + } + + async fn spawn_persist( + mut command: tokio::process::Command, + tx: Sender<Status>, + ) -> Result<(), Error> { + let mut child = command.stdout(Stdio::piped()).spawn()?; + let mut stdout = BufReader::new(child.stdout.take().unwrap()).lines(); + + while let Some(line) = stdout.next_line().await? { + let status: Status = serde_json::from_str(&line) + .unwrap_or_else(|err| Status::Error(Some(format!("Serialization error: {err}")))); + tx.send_if_modified(|s| s.update(status)); + } + + let exit_status = child.wait().await?; + let status = match exit_status.code() { + Some(0) => Status::Ok, + Some(code) => Status::Error(Some(format!("Exited with status code: {code}"))), + None => Status::Error(Some("Process terminated by signal".to_string())), + }; + + tx.send_if_modified(|s| s.update(status)); + Ok(()) + } +} + +impl ServiceSpawner for Command { + async fn spawn(self, tx: Sender<Status>) -> Result<(), Error> { + let mut command = tokio::process::Command::new(self.command); + command.args(self.args); + + if let Some(period) = self.interval { + Self::spawn_interval(command, period, tx).await + } else { + Self::spawn_persist(command, tx).await + } + } +} |