summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--config.toml15
-rw-r--r--src/main.rs6
-rw-r--r--src/service.rs21
-rw-r--r--src/state.rs71
4 files changed, 66 insertions, 47 deletions
diff --git a/config.toml b/config.toml
index 00a0866..555502f 100644
--- a/config.toml
+++ b/config.toml
@@ -1,7 +1,14 @@
serve_dir = "assets"
address = "127.0.0.1:8080"
-[services]
-"tobyvin.dev" = { url = "https://tobyvin.dev" }
-test_cmd = { command = "stat", args = ["/tmp/test_file"] }
-dns = { address = "10.42.0.1:53" }
+[[services]]
+name = "tobyvin.dev"
+http = { url = "https://tobyvin.dev" }
+
+[[services]]
+name = "test_cmd"
+exec = { command = "stat", args = ["/tmp/test_file"] }
+
+[[services]]
+name = "dns"
+tcp = { address = "10.42.0.1:53" }
diff --git a/src/main.rs b/src/main.rs
index 85ff708..d5a35b7 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,4 +1,4 @@
-use std::{collections::HashMap, fs::File, path::PathBuf};
+use std::{fs::File, path::PathBuf};
use tower_http::services::ServeDir;
use tracing::level_filters::LevelFilter;
@@ -21,7 +21,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
};
- let state = AppState::spawn_services(config.services);
+ let state = AppState::new(config.services);
let mut router = statsrv::router().with_state(state);
if let Some(path) = config.serve_dir {
@@ -41,7 +41,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
pub struct Config {
pub serve_dir: Option<PathBuf>,
pub address: String,
- pub services: HashMap<String, ServiceConfig>,
+ pub services: Vec<ServiceConfig>,
}
impl Config {
diff --git a/src/service.rs b/src/service.rs
index b10385a..9ca9fc6 100644
--- a/src/service.rs
+++ b/src/service.rs
@@ -46,11 +46,18 @@ pub fn default_interval() -> std::time::Duration {
}
#[derive(Debug, Clone, Deserialize)]
-#[serde(untagged)]
-pub enum ServiceConfig {
+pub struct ServiceConfig {
+ pub name: String,
+ #[serde(flatten)]
+ pub kind: ServiceKind,
+}
+
+#[derive(Debug, Clone, Deserialize)]
+#[serde(rename_all = "lowercase")]
+pub enum ServiceKind {
Http(http::Http),
Tcp(tcp::Tcp),
- Command(command::Command),
+ Exec(command::Command),
}
#[derive(Debug, thiserror::Error)]
@@ -63,14 +70,14 @@ pub enum ServiceError {
Command(#[from] command::Error),
}
-impl IntoService for ServiceConfig {
+impl IntoService for ServiceKind {
type Error = ServiceError;
fn into_service(self) -> impl Stream<Item = Result<(), Self::Error>> + Send {
match self {
- ServiceConfig::Http(h) => h.into_service().map_err(ServiceError::from).boxed(),
- ServiceConfig::Tcp(t) => t.into_service().map_err(ServiceError::from).boxed(),
- ServiceConfig::Command(c) => c.into_service().map_err(ServiceError::from).boxed(),
+ ServiceKind::Http(h) => h.into_service().map_err(ServiceError::from).boxed(),
+ ServiceKind::Tcp(t) => t.into_service().map_err(ServiceError::from).boxed(),
+ ServiceKind::Exec(c) => c.into_service().map_err(ServiceError::from).boxed(),
}
}
}
diff --git a/src/state.rs b/src/state.rs
index bbe235b..c442d9d 100644
--- a/src/state.rs
+++ b/src/state.rs
@@ -5,46 +5,47 @@ use tokio::sync::watch::Receiver;
use tokio_stream::wrappers::WatchStream;
use crate::{
- service::{IntoService, ServiceConfig},
+ service::{IntoService, ServiceConfig, ServiceKind},
Status,
};
#[derive(Clone)]
pub struct AppState {
rx_map: HashMap<String, Receiver<Status>>,
+ indexes: Vec<String>,
}
impl AppState {
- pub fn spawn_services(configs: HashMap<String, ServiceConfig>) -> AppState {
- let mut rx_map = HashMap::new();
- let mut tx_map = HashMap::new();
- for name in configs.keys() {
- let (tx, rx) = tokio::sync::watch::channel(Default::default());
- rx_map.insert(name.clone(), rx);
- tx_map.insert(name.clone(), tx);
- }
+ pub fn new(configs: Vec<ServiceConfig>) -> AppState {
+ let mut indexes = Vec::new();
+ let rx_map = configs
+ .into_iter()
+ .map(|ServiceConfig { name, kind }| {
+ indexes.push(name.clone());
+ let (tx, rx) = tokio::sync::watch::channel(Status::default());
+ tokio::spawn(Self::spawn_service(kind, tx.clone()));
+ (name, rx)
+ })
+ .collect();
+
+ AppState { rx_map, indexes }
+ }
- for (name, config) in configs {
- let (tx, rx) = tokio::sync::watch::channel(Status::default());
- rx_map.insert(name.clone(), rx);
- tokio::spawn(async move {
- let mut stream = config.into_service();
- while let Some(res) = stream.next().await {
- let status = res.into();
- tx.send_if_modified(|s| {
- if *s != status {
- tracing::debug!(name, ?status, "Updated service status");
- *s = status;
- true
- } else {
- false
- }
- });
+ #[tracing::instrument(skip(tx))]
+ async fn spawn_service(kind: ServiceKind, tx: tokio::sync::watch::Sender<Status>) {
+ let mut stream = kind.into_service();
+ while let Some(res) = stream.next().await {
+ let status = res.into();
+ tx.send_if_modified(|s| {
+ if *s != status {
+ tracing::debug!(?status, "Updated service status");
+ *s = status;
+ true
+ } else {
+ false
}
});
}
-
- AppState { rx_map }
}
pub fn status(&self, k: &str) -> Option<Status> {
@@ -52,9 +53,13 @@ impl AppState {
}
pub fn statuses(&self) -> Vec<(String, Status)> {
- self.rx_map
+ self.indexes
.iter()
- .map(|(k, v)| (k.clone(), v.borrow().clone()))
+ .cloned()
+ .map(|s| {
+ let status = self.status(&s).expect("Service was unexpectedly removed");
+ (s, status)
+ })
.collect()
}
@@ -63,10 +68,10 @@ impl AppState {
}
pub fn streams(&self) -> impl Stream<Item = (String, Status)> {
- let iter =
- self.rx_map.clone().into_iter().map(|(name, rx)| {
- WatchStream::new(rx).map(move |status| (name.to_owned(), status))
- });
+ let iter = self.indexes.iter().cloned().map(|s| {
+ let stream = self.stream(&s).expect("Service was unexpectedly removed");
+ stream.map(move |status| (s.to_owned(), status))
+ });
futures::stream::select_all(iter)
}