summaryrefslogtreecommitdiffstats
path: root/src/service/command.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/service/command.rs')
-rw-r--r--src/service/command.rs128
1 files changed, 81 insertions, 47 deletions
diff --git a/src/service/command.rs b/src/service/command.rs
index 41a79b3..3535ee2 100644
--- a/src/service/command.rs
+++ b/src/service/command.rs
@@ -1,79 +1,113 @@
use std::{process::Stdio, time::Duration};
+use async_stream::stream;
+use futures::{Stream, StreamExt};
use serde::Deserialize;
-use tokio::{
- io::{AsyncBufReadExt, BufReader},
- sync::watch::Sender,
-};
+use tokio::io::{AsyncBufReadExt, BufReader};
-use crate::{Error, Status};
+use super::IntoService;
-use super::ServiceSpawner;
+#[derive(Debug, thiserror::Error)]
+pub enum Error {
+ #[error("IO error: {0}")]
+ Io(#[from] std::io::Error),
+ #[error("Exited with status code: {code}\n{stderr}")]
+ ExitCode { code: i32, stderr: String },
+ #[error("Process terminated by signal")]
+ Signal,
+ #[error("Serialization error: {0}")]
+ Serialization(#[from] serde_json::error::Error),
+ #[error("{0}")]
+ Stderr(String),
+ #[error("{0}")]
+ Output(String),
+ #[error("Exited with status code: {0}")]
+ PersistExitCode(i32),
+ #[error("Failed to get stderr of child process")]
+ NoStderr,
+ #[error("Failed to get stdout of child process")]
+ NoStdout,
+}
#[derive(Debug, Clone, Deserialize)]
pub struct Command {
pub command: String,
pub args: Vec<String>,
- pub interval: Option<Duration>,
+ #[serde(default)]
+ pub persist: bool,
+ #[serde(default = "super::default_interval")]
+ pub interval: Duration,
}
impl Command {
- async fn spawn_interval(
+ #[tracing::instrument]
+ fn persist(
+ mut interval: tokio::time::Interval,
mut command: tokio::process::Command,
- period: Duration,
- tx: Sender<Status>,
- ) -> Result<(), Error> {
- let mut interval = tokio::time::interval(period);
- loop {
- interval.tick().await;
+ ) -> impl Stream<Item = Result<(), Error>> {
+ stream! {
+ loop {
+ interval.tick().await;
+
+ let mut child = command
+ .stdout(Stdio::piped())
+ .stderr(Stdio::piped())
+ .spawn()?;
- let status = command.output().await.map_or_else(Into::into, |o| {
- if o.status.success() {
- Status::Ok
- } else {
- let stdout = String::from_utf8_lossy(&o.stdout).trim().to_string();
- Status::Error(Some(format!("Service state: {}", stdout)))
+ let mut stdout_reader =
+ BufReader::new(child.stdout.take().ok_or(Error::NoStdout)?).lines();
+
+ while let Some(line) = stdout_reader.next_line().await? {
+ if "Ok" == line {
+ yield Ok(());
+ } else {
+ yield Err(Error::Output(line))
+ }
}
- });
- tx.send_if_modified(|s| s.update(status));
+ match child.wait().await?.code() {
+ Some(0) => yield Ok(()),
+ Some(code) => yield Err(Error::PersistExitCode(code)),
+ None => yield Err(Error::Signal),
+ };
+ }
}
}
- async fn spawn_persist(
+ #[tracing::instrument]
+ fn interval(
+ mut interval: tokio::time::Interval,
mut command: tokio::process::Command,
- tx: Sender<Status>,
- ) -> Result<(), Error> {
- let mut child = command.stdout(Stdio::piped()).spawn()?;
- let mut stdout = BufReader::new(child.stdout.take().unwrap()).lines();
-
- while let Some(line) = stdout.next_line().await? {
- let status: Status = serde_json::from_str(&line)
- .unwrap_or_else(|err| Status::Error(Some(format!("Serialization error: {err}"))));
- tx.send_if_modified(|s| s.update(status));
+ ) -> impl Stream<Item = Result<(), Error>> {
+ stream! {
+ loop {
+ interval.tick().await;
+ let output = command.output().await?;
+ match output.status.code() {
+ Some(0) => yield Ok(()),
+ Some(code) => {
+ let stderr = String::from_utf8_lossy(&output.stderr).to_string();
+ yield Err(Error::ExitCode { code, stderr })
+ }
+ None => yield Err(Error::Signal),
+ }
+ }
}
-
- let exit_status = child.wait().await?;
- let status = match exit_status.code() {
- Some(0) => Status::Ok,
- Some(code) => Status::Error(Some(format!("Exited with status code: {code}"))),
- None => Status::Error(Some("Process terminated by signal".to_string())),
- };
-
- tx.send_if_modified(|s| s.update(status));
- Ok(())
}
}
-impl ServiceSpawner for Command {
- async fn spawn(self, tx: Sender<Status>) -> Result<(), Error> {
+impl IntoService for Command {
+ type Error = Error;
+
+ fn into_service(self) -> impl Stream<Item = Result<(), Self::Error>> {
+ let interval = tokio::time::interval(self.interval);
let mut command = tokio::process::Command::new(self.command);
command.args(self.args);
- if let Some(period) = self.interval {
- Self::spawn_interval(command, period, tx).await
+ if self.persist {
+ Self::persist(interval, command).boxed()
} else {
- Self::spawn_persist(command, tx).await
+ Self::interval(interval, command).boxed()
}
}
}