use std::{process::Stdio, time::Duration}; use serde::Deserialize; use tokio::{ io::{AsyncBufReadExt, BufReader}, sync::watch::Sender, }; use crate::{Error, Status}; use super::ServiceSpawner; #[derive(Debug, Clone, Deserialize)] pub struct Command { pub command: String, pub args: Vec, pub interval: Option, } impl Command { async fn spawn_interval( mut command: tokio::process::Command, period: Duration, tx: Sender, ) -> Result<(), Error> { let mut interval = tokio::time::interval(period); loop { interval.tick().await; let status = command.output().await.map_or_else(Into::into, |o| { if o.status.success() { Status::Ok } else { let stdout = String::from_utf8_lossy(&o.stdout).trim().to_string(); Status::Error(Some(format!("Service state: {}", stdout))) } }); tx.send_if_modified(|s| s.update(status)); } } async fn spawn_persist( mut command: tokio::process::Command, tx: Sender, ) -> Result<(), Error> { let mut child = command.stdout(Stdio::piped()).spawn()?; let mut stdout = BufReader::new(child.stdout.take().unwrap()).lines(); while let Some(line) = stdout.next_line().await? { let status: Status = serde_json::from_str(&line) .unwrap_or_else(|err| Status::Error(Some(format!("Serialization error: {err}")))); tx.send_if_modified(|s| s.update(status)); } let exit_status = child.wait().await?; let status = match exit_status.code() { Some(0) => Status::Ok, Some(code) => Status::Error(Some(format!("Exited with status code: {code}"))), None => Status::Error(Some("Process terminated by signal".to_string())), }; tx.send_if_modified(|s| s.update(status)); Ok(()) } } impl ServiceSpawner for Command { async fn spawn(self, tx: Sender) -> Result<(), Error> { let mut command = tokio::process::Command::new(self.command); command.args(self.args); if let Some(period) = self.interval { Self::spawn_interval(command, period, tx).await } else { Self::spawn_persist(command, tx).await } } }