diff options
Diffstat (limited to 'src/service.rs')
-rw-r--r-- | src/service.rs | 106 |
1 files changed, 34 insertions, 72 deletions
diff --git a/src/service.rs b/src/service.rs index bae6867..c45fcb1 100644 --- a/src/service.rs +++ b/src/service.rs @@ -1,10 +1,15 @@ -use std::{collections::HashMap, fmt::Display}; +use std::{collections::HashMap, sync::Arc}; -use futures::{stream::FuturesOrdered, TryStreamExt}; +use futures::Stream; use http::Http; use serde::Deserialize; use systemd::Systemd; use tcp::Tcp; +use tokio::{ + sync::watch::{Receiver, Sender}, + task::JoinHandle, +}; +use tokio_stream::wrappers::WatchStream; use crate::{Error, Status}; @@ -12,67 +17,34 @@ pub mod http; pub mod systemd; pub mod tcp; -#[derive(Debug, Clone, Deserialize)] -pub struct Services { - #[serde(flatten)] - inner: HashMap<String, Service>, - #[serde(skip, default = "Services::default_client")] - client: reqwest::Client, -} +pub type ServiceHandles = Arc<HashMap<String, ServiceHandle>>; -impl Services { - pub fn new(services: HashMap<String, Service>) -> Self { - let client = reqwest::Client::new(); - Self { - inner: services, - client, - } - } - - fn default_client() -> reqwest::Client { - reqwest::Client::new() - } +pub trait ServiceSpawner { + fn spawn( + self, + tx: Sender<Status>, + ) -> impl std::future::Future<Output = Result<(), Error>> + std::marker::Send + 'static; +} - pub async fn check(&self) -> Result<HashMap<String, Status>, Error> { - let checks = self - .inner - .values() - .map(|service| service.check(self.client.clone())) - .collect::<FuturesOrdered<_>>() - .try_collect::<Vec<_>>() - .await?; +#[derive(Debug)] +pub struct ServiceHandle { + pub handle: JoinHandle<Result<(), Error>>, + pub rx: Receiver<Status>, +} - Ok(self - .inner - .keys() - .cloned() - .zip(checks) - .collect::<HashMap<_, _>>()) +impl ServiceHandle { + pub fn new(service: impl ServiceSpawner) -> Self { + let (tx, rx) = tokio::sync::watch::channel(Status::default()); + let handle = tokio::spawn(service.spawn(tx)); + Self { handle, rx } } - pub async fn check_one(&self, name: &str) -> Option<Result<Status, Error>> { - Some(self.inner.get(name)?.check(self.client.clone()).await) + pub fn status(&self) -> Status { + self.rx.borrow().clone() } - pub async fn check_filtered<P>(&self, mut predicate: P) -> Result<HashMap<String, Status>, Error> - where - P: FnMut(&String) -> bool, - { - let checks = self - .inner - .iter() - .filter_map(|(s, service)| predicate(s).then_some(service)) - .map(|service| service.check(self.client.clone())) - .collect::<FuturesOrdered<_>>() - .try_collect::<Vec<_>>() - .await?; - - Ok(self - .inner - .keys() - .cloned() - .zip(checks) - .collect::<HashMap<_, _>>()) + pub fn into_stream(&self) -> impl Stream<Item = Status> { + WatchStream::new(self.rx.clone()) } } @@ -84,22 +56,12 @@ pub enum Service { Systemd(Systemd), } -impl Service { - pub async fn check(&self, client: reqwest::Client) -> Result<Status, Error> { - match self { - Service::Http(http) => http.check(client).await, - Service::Tcp(tcp) => tcp.check().await, - Service::Systemd(systemd) => systemd.check().await, - } - } -} - -impl Display for Service { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - Service::Http(http) => http.fmt(f), - Service::Tcp(tcp) => tcp.fmt(f), - Service::Systemd(systemd) => systemd.fmt(f), +impl From<Service> for ServiceHandle { + fn from(value: Service) -> Self { + match value { + Service::Http(s) => ServiceHandle::new(s), + Service::Tcp(s) => ServiceHandle::new(s), + Service::Systemd(s) => ServiceHandle::new(s), } } } |