summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/api.rs7
-rw-r--r--src/main.rs2
-rw-r--r--src/service.rs57
-rw-r--r--src/service/command.rs104
-rw-r--r--src/service/http.rs51
-rw-r--r--src/service/tcp.rs42
-rw-r--r--src/state.rs24
7 files changed, 117 insertions, 170 deletions
diff --git a/src/api.rs b/src/api.rs
index bab2043..57dc4b7 100644
--- a/src/api.rs
+++ b/src/api.rs
@@ -1,3 +1,5 @@
+use axum::{extract::State, routing::get, Json};
+
use crate::AppState;
pub mod services;
@@ -7,4 +9,9 @@ pub fn router() -> axum::Router<AppState> {
axum::Router::new()
.nest("/sse", sse::router())
.nest("/status", services::router())
+ .route("/list", get(names))
+}
+
+pub async fn names(State(state): State<AppState>) -> Json<Vec<String>> {
+ Json(state.names())
}
diff --git a/src/main.rs b/src/main.rs
index d5a35b7..df69e16 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -30,7 +30,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
router = router.layer(tower_http::trace::TraceLayer::new_for_http());
- let listener = tokio::net::TcpListener::bind(config.address).await.unwrap();
+ let listener = tokio::net::TcpListener::bind(config.address).await?;
tracing::info!("listening on {}", listener.local_addr().unwrap());
axum::serve(listener, router).await.map_err(Into::into)
diff --git a/src/service.rs b/src/service.rs
index 9ca9fc6..b67ba51 100644
--- a/src/service.rs
+++ b/src/service.rs
@@ -1,8 +1,4 @@
-use std::collections::HashMap;
-
-use futures::{StreamExt, TryStreamExt};
use serde::Deserialize;
-use tokio_stream::{Stream, StreamMap};
use crate::Status;
@@ -10,35 +6,11 @@ pub mod command;
pub mod http;
pub mod tcp;
-pub type ServiceHandles = HashMap<String, Status>;
-
pub trait IntoService {
- type Error: std::error::Error + Sync + Send + Sized;
-
- fn into_service(self) -> impl Stream<Item = Result<(), Self::Error>> + Send;
-}
-
-pub trait IntoServiceMap<K> {
- type Error: std::error::Error + Sync + Send + Sized;
-
- fn into_service_map(self) -> impl Stream<Item = (K, Result<(), Self::Error>)> + Send;
-}
-
-impl<T, K, V> IntoServiceMap<K> for T
-where
- T: IntoIterator<Item = (K, V)>,
- V: IntoService,
- K: std::hash::Hash + std::cmp::Eq + std::clone::Clone + std::marker::Unpin + std::marker::Send,
-{
- type Error = V::Error;
-
- fn into_service_map(self) -> impl Stream<Item = (K, Result<(), Self::Error>)> + Send {
- let mut map = StreamMap::new();
- for (name, srv) in self.into_iter() {
- map.insert(name, Box::pin(srv.into_service()));
- }
- map
- }
+ fn into_service(
+ self,
+ tx: tokio::sync::watch::Sender<Status>,
+ ) -> impl std::future::Future<Output = ()>;
}
pub fn default_interval() -> std::time::Duration {
@@ -54,30 +26,19 @@ pub struct ServiceConfig {
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "lowercase")]
+#[serde(tag = "kind")]
pub enum ServiceKind {
Http(http::Http),
Tcp(tcp::Tcp),
Exec(command::Command),
}
-#[derive(Debug, thiserror::Error)]
-pub enum ServiceError {
- #[error(transparent)]
- Http(#[from] http::Error),
- #[error(transparent)]
- Tcp(#[from] tcp::Error),
- #[error(transparent)]
- Command(#[from] command::Error),
-}
-
impl IntoService for ServiceKind {
- type Error = ServiceError;
-
- fn into_service(self) -> impl Stream<Item = Result<(), Self::Error>> + Send {
+ async fn into_service(self, tx: tokio::sync::watch::Sender<Status>) {
match self {
- ServiceKind::Http(h) => h.into_service().map_err(ServiceError::from).boxed(),
- ServiceKind::Tcp(t) => t.into_service().map_err(ServiceError::from).boxed(),
- ServiceKind::Exec(c) => c.into_service().map_err(ServiceError::from).boxed(),
+ ServiceKind::Http(h) => h.into_service(tx).await,
+ ServiceKind::Tcp(t) => t.into_service(tx).await,
+ ServiceKind::Exec(c) => c.into_service(tx).await,
}
}
}
diff --git a/src/service/command.rs b/src/service/command.rs
index 3535ee2..6b44e1d 100644
--- a/src/service/command.rs
+++ b/src/service/command.rs
@@ -1,9 +1,12 @@
use std::{process::Stdio, time::Duration};
-use async_stream::stream;
-use futures::{Stream, StreamExt};
-use serde::Deserialize;
-use tokio::io::{AsyncBufReadExt, BufReader};
+use serde::{Deserialize, Serialize};
+use tokio::{
+ io::{AsyncBufReadExt, BufReader},
+ sync::watch::Sender,
+};
+
+use crate::Status;
use super::IntoService;
@@ -29,7 +32,7 @@ pub enum Error {
NoStdout,
}
-#[derive(Debug, Clone, Deserialize)]
+#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct Command {
pub command: String,
pub args: Vec<String>,
@@ -40,74 +43,57 @@ pub struct Command {
}
impl Command {
- #[tracing::instrument]
- fn persist(
- mut interval: tokio::time::Interval,
- mut command: tokio::process::Command,
- ) -> impl Stream<Item = Result<(), Error>> {
- stream! {
- loop {
- interval.tick().await;
+ async fn persist(&self, tx: Sender<Status>) -> Result<(), Error> {
+ let mut command = tokio::process::Command::new(&self.command);
+ command.args(&self.args);
- let mut child = command
- .stdout(Stdio::piped())
- .stderr(Stdio::piped())
- .spawn()?;
+ let mut child = command
+ .stdout(Stdio::piped())
+ .stderr(Stdio::piped())
+ .spawn()?;
- let mut stdout_reader =
- BufReader::new(child.stdout.take().ok_or(Error::NoStdout)?).lines();
+ let mut lines = BufReader::new(child.stdout.take().ok_or(Error::NoStdout)?).lines();
- while let Some(line) = stdout_reader.next_line().await? {
- if "Ok" == line {
- yield Ok(());
- } else {
- yield Err(Error::Output(line))
- }
- }
+ while let Some(line) = lines.next_line().await? {
+ let res = ("Ok" == line).then_some(()).ok_or(Error::Output(line));
+ tx.send_if_modified(|s| s.update(res.into()));
+ }
- match child.wait().await?.code() {
- Some(0) => yield Ok(()),
- Some(code) => yield Err(Error::PersistExitCode(code)),
- None => yield Err(Error::Signal),
- };
- }
+ match child.wait().await?.code() {
+ Some(0) => Ok(()),
+ Some(code) => Err(Error::PersistExitCode(code)),
+ None => Err(Error::Signal),
}
}
- #[tracing::instrument]
- fn interval(
- mut interval: tokio::time::Interval,
- mut command: tokio::process::Command,
- ) -> impl Stream<Item = Result<(), Error>> {
- stream! {
- loop {
- interval.tick().await;
- let output = command.output().await?;
- match output.status.code() {
- Some(0) => yield Ok(()),
- Some(code) => {
- let stderr = String::from_utf8_lossy(&output.stderr).to_string();
- yield Err(Error::ExitCode { code, stderr })
- }
- None => yield Err(Error::Signal),
- }
+ async fn interval(&self) -> Result<(), Error> {
+ let mut command = tokio::process::Command::new(&self.command);
+ command.args(&self.args);
+
+ let output = command.output().await?;
+ match output.status.code() {
+ Some(0) => Ok(()),
+ Some(code) => {
+ let stderr = String::from_utf8_lossy(&output.stderr).to_string();
+ Err(Error::ExitCode { code, stderr })
}
+ None => Err(Error::Signal),
}
}
}
impl IntoService for Command {
- type Error = Error;
-
- fn into_service(self) -> impl Stream<Item = Result<(), Self::Error>> {
- let interval = tokio::time::interval(self.interval);
- let mut command = tokio::process::Command::new(self.command);
- command.args(self.args);
+ async fn into_service(self, tx: tokio::sync::watch::Sender<Status>) {
+ let mut interval = tokio::time::interval(self.interval);
+ loop {
+ interval.tick().await;
+ let res = if self.persist {
+ self.persist(tx.clone()).await
+ } else {
+ self.interval().await
+ };
- if self.persist {
- Self::persist(interval, command).boxed()
- } else {
- Self::interval(interval, command).boxed()
+ tx.send_if_modified(|s| s.update(res.into()));
}
}
}
diff --git a/src/service/http.rs b/src/service/http.rs
index c4fcee7..bfba007 100644
--- a/src/service/http.rs
+++ b/src/service/http.rs
@@ -1,11 +1,12 @@
use std::{fmt::Display, time::Duration};
-use async_stream::try_stream;
use axum::http::status::StatusCode;
-use futures::Stream;
-use serde::Deserialize;
+use reqwest::Client;
+use serde::{Deserialize, Serialize};
use url::Url;
+use crate::Status;
+
use super::IntoService;
#[derive(Debug, thiserror::Error)]
@@ -16,7 +17,7 @@ pub enum Error {
StatusCode(u16),
}
-#[derive(Debug, Clone, Deserialize)]
+#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct Http {
pub url: Url,
#[serde(default)]
@@ -29,32 +30,34 @@ pub struct Http {
pub interval: Duration,
}
-impl IntoService for Http {
- type Error = Error;
+impl Http {
+ async fn check(&self) -> Result<(), Error> {
+ let client = match self.client.as_ref() {
+ Some(client) => client,
+ None => &Client::new(),
+ };
+ let req = client
+ .request(self.method.into(), self.url.clone())
+ .build()?;
+ let status_code = client.execute(req).await?.status().as_u16();
+ (status_code == self.status_code)
+ .then_some(())
+ .ok_or_else(|| Error::StatusCode(status_code))
+ }
+}
- fn into_service(self) -> impl Stream<Item = Result<(), Self::Error>> {
+impl IntoService for Http {
+ async fn into_service(self, tx: tokio::sync::watch::Sender<Status>) {
let mut interval = tokio::time::interval(self.interval);
-
- try_stream! {
- let client = self.client.unwrap_or_default();
- let req = client.request(self.method.into(), self.url).build()?;
- loop {
- interval.tick().await;
- let req = req
- .try_clone()
- .expect("Clone with no body should never fail");
- let status_code = client.execute(req).await?.status().as_u16();
- if status_code == self.status_code {
- yield ();
- } else {
- Err(Error::StatusCode(status_code))?
- }
- }
+ loop {
+ interval.tick().await;
+ let res = self.check().await;
+ tx.send_if_modified(|s| s.update(res.into()));
}
}
}
-#[derive(Debug, Clone, Copy, Default, Deserialize)]
+#[derive(Debug, Clone, Copy, Default, Deserialize, Serialize)]
pub enum Method {
#[serde(alias = "get", alias = "GET")]
#[default]
diff --git a/src/service/tcp.rs b/src/service/tcp.rs
index 6556af0..e28b19d 100644
--- a/src/service/tcp.rs
+++ b/src/service/tcp.rs
@@ -1,15 +1,13 @@
use std::{fmt::Display, net::SocketAddr, time::Duration};
-use async_stream::try_stream;
-use futures::Stream;
-use serde::Deserialize;
+use serde::{Deserialize, Serialize};
use tokio::{io::Interest, net::TcpSocket};
-use super::IntoService;
+use crate::Status;
-pub(crate) type Error = std::io::Error;
+use super::IntoService;
-#[derive(Debug, Clone, Deserialize)]
+#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct Tcp {
pub address: SocketAddr,
#[serde(default = "super::default_interval")]
@@ -22,24 +20,26 @@ impl Display for Tcp {
}
}
-impl IntoService for Tcp {
- type Error = Error;
-
- fn into_service(self) -> impl Stream<Item = Result<(), Self::Error>> {
- let mut interval = tokio::time::interval(self.interval);
+impl Tcp {
+ async fn check(address: SocketAddr) -> Result<(), std::io::Error> {
+ let sock = TcpSocket::new_v4()?;
+ sock.set_keepalive(true)?;
- try_stream! {
- loop {
- interval.tick().await;
+ let conn = sock.connect(address).await?;
+ // TODO: figure out how to wait for connection to close
+ conn.ready(Interest::READABLE).await?;
+ Ok(())
+ }
+}
- let sock = TcpSocket::new_v4()?;
- sock.set_keepalive(true)?;
+impl IntoService for Tcp {
+ async fn into_service(self, tx: tokio::sync::watch::Sender<Status>) {
+ let mut interval = tokio::time::interval(self.interval);
- let conn = sock.connect(self.address).await?;
- // TODO: figure out how to wait for connection to close
- conn.ready(Interest::READABLE).await?;
- yield ();
- }
+ loop {
+ interval.tick().await;
+ let res = Self::check(self.address).await;
+ tx.send_if_modified(|s| s.update(res.into()));
}
}
}
diff --git a/src/state.rs b/src/state.rs
index c442d9d..857b8af 100644
--- a/src/state.rs
+++ b/src/state.rs
@@ -5,7 +5,7 @@ use tokio::sync::watch::Receiver;
use tokio_stream::wrappers::WatchStream;
use crate::{
- service::{IntoService, ServiceConfig, ServiceKind},
+ service::{IntoService, ServiceConfig},
Status,
};
@@ -22,30 +22,20 @@ impl AppState {
.into_iter()
.map(|ServiceConfig { name, kind }| {
indexes.push(name.clone());
+ tracing::debug!(name, "Added service");
let (tx, rx) = tokio::sync::watch::channel(Status::default());
- tokio::spawn(Self::spawn_service(kind, tx.clone()));
+ tokio::spawn(kind.into_service(tx.clone()));
(name, rx)
})
.collect();
+ tracing::debug!(?indexes, "Finished spawning services");
+
AppState { rx_map, indexes }
}
- #[tracing::instrument(skip(tx))]
- async fn spawn_service(kind: ServiceKind, tx: tokio::sync::watch::Sender<Status>) {
- let mut stream = kind.into_service();
- while let Some(res) = stream.next().await {
- let status = res.into();
- tx.send_if_modified(|s| {
- if *s != status {
- tracing::debug!(?status, "Updated service status");
- *s = status;
- true
- } else {
- false
- }
- });
- }
+ pub fn names(&self) -> Vec<String> {
+ self.indexes.clone()
}
pub fn status(&self, k: &str) -> Option<Status> {