summaryrefslogtreecommitdiffstats
path: root/src/service.rs
diff options
context:
space:
mode:
authorToby Vincent <tobyv@tobyvin.dev>2024-09-28 00:54:46 -0500
committerToby Vincent <tobyv@tobyvin.dev>2024-09-28 00:58:45 -0500
commitcd774827dd14f68d8405c45d2d9da30b3fab050e (patch)
treea24e1cabb99170caa25edff53fc978111a1c9dd4 /src/service.rs
parent04c7f7609e5bc3fadf95c53b37a9e6e12c4e539c (diff)
feat: refactor into pub-sub and impl SSE
Diffstat (limited to 'src/service.rs')
-rw-r--r--src/service.rs106
1 files changed, 34 insertions, 72 deletions
diff --git a/src/service.rs b/src/service.rs
index bae6867..c45fcb1 100644
--- a/src/service.rs
+++ b/src/service.rs
@@ -1,10 +1,15 @@
-use std::{collections::HashMap, fmt::Display};
+use std::{collections::HashMap, sync::Arc};
-use futures::{stream::FuturesOrdered, TryStreamExt};
+use futures::Stream;
use http::Http;
use serde::Deserialize;
use systemd::Systemd;
use tcp::Tcp;
+use tokio::{
+ sync::watch::{Receiver, Sender},
+ task::JoinHandle,
+};
+use tokio_stream::wrappers::WatchStream;
use crate::{Error, Status};
@@ -12,67 +17,34 @@ pub mod http;
pub mod systemd;
pub mod tcp;
-#[derive(Debug, Clone, Deserialize)]
-pub struct Services {
- #[serde(flatten)]
- inner: HashMap<String, Service>,
- #[serde(skip, default = "Services::default_client")]
- client: reqwest::Client,
-}
+pub type ServiceHandles = Arc<HashMap<String, ServiceHandle>>;
-impl Services {
- pub fn new(services: HashMap<String, Service>) -> Self {
- let client = reqwest::Client::new();
- Self {
- inner: services,
- client,
- }
- }
-
- fn default_client() -> reqwest::Client {
- reqwest::Client::new()
- }
+pub trait ServiceSpawner {
+ fn spawn(
+ self,
+ tx: Sender<Status>,
+ ) -> impl std::future::Future<Output = Result<(), Error>> + std::marker::Send + 'static;
+}
- pub async fn check(&self) -> Result<HashMap<String, Status>, Error> {
- let checks = self
- .inner
- .values()
- .map(|service| service.check(self.client.clone()))
- .collect::<FuturesOrdered<_>>()
- .try_collect::<Vec<_>>()
- .await?;
+#[derive(Debug)]
+pub struct ServiceHandle {
+ pub handle: JoinHandle<Result<(), Error>>,
+ pub rx: Receiver<Status>,
+}
- Ok(self
- .inner
- .keys()
- .cloned()
- .zip(checks)
- .collect::<HashMap<_, _>>())
+impl ServiceHandle {
+ pub fn new(service: impl ServiceSpawner) -> Self {
+ let (tx, rx) = tokio::sync::watch::channel(Status::default());
+ let handle = tokio::spawn(service.spawn(tx));
+ Self { handle, rx }
}
- pub async fn check_one(&self, name: &str) -> Option<Result<Status, Error>> {
- Some(self.inner.get(name)?.check(self.client.clone()).await)
+ pub fn status(&self) -> Status {
+ self.rx.borrow().clone()
}
- pub async fn check_filtered<P>(&self, mut predicate: P) -> Result<HashMap<String, Status>, Error>
- where
- P: FnMut(&String) -> bool,
- {
- let checks = self
- .inner
- .iter()
- .filter_map(|(s, service)| predicate(s).then_some(service))
- .map(|service| service.check(self.client.clone()))
- .collect::<FuturesOrdered<_>>()
- .try_collect::<Vec<_>>()
- .await?;
-
- Ok(self
- .inner
- .keys()
- .cloned()
- .zip(checks)
- .collect::<HashMap<_, _>>())
+ pub fn into_stream(&self) -> impl Stream<Item = Status> {
+ WatchStream::new(self.rx.clone())
}
}
@@ -84,22 +56,12 @@ pub enum Service {
Systemd(Systemd),
}
-impl Service {
- pub async fn check(&self, client: reqwest::Client) -> Result<Status, Error> {
- match self {
- Service::Http(http) => http.check(client).await,
- Service::Tcp(tcp) => tcp.check().await,
- Service::Systemd(systemd) => systemd.check().await,
- }
- }
-}
-
-impl Display for Service {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- match self {
- Service::Http(http) => http.fmt(f),
- Service::Tcp(tcp) => tcp.fmt(f),
- Service::Systemd(systemd) => systemd.fmt(f),
+impl From<Service> for ServiceHandle {
+ fn from(value: Service) -> Self {
+ match value {
+ Service::Http(s) => ServiceHandle::new(s),
+ Service::Tcp(s) => ServiceHandle::new(s),
+ Service::Systemd(s) => ServiceHandle::new(s),
}
}
}