From b94f8e694bf01f5dba9ce2c01f589463a3dfbc69 Mon Sep 17 00:00:00 2001 From: Toby Vincent Date: Wed, 9 Oct 2024 18:23:58 -0500 Subject: feat!: rewrite to use traits and streams --- src/service.rs | 96 +++++++++++++++++++++++++++++++--------------------------- 1 file changed, 52 insertions(+), 44 deletions(-) (limited to 'src/service.rs') diff --git a/src/service.rs b/src/service.rs index 3e37503..b10385a 100644 --- a/src/service.rs +++ b/src/service.rs @@ -1,68 +1,76 @@ -use std::{collections::HashMap, sync::Arc}; +use std::collections::HashMap; -use futures::Stream; -use http::Http; +use futures::{StreamExt, TryStreamExt}; use serde::Deserialize; -use systemd::Systemd; -use tcp::Tcp; -use tokio::{ - sync::watch::{channel, Receiver, Sender}, - task::JoinHandle, -}; -use tokio_stream::wrappers::WatchStream; +use tokio_stream::{Stream, StreamMap}; -use crate::{Error, Status}; +use crate::Status; +pub mod command; pub mod http; -pub mod systemd; pub mod tcp; -pub mod command; -pub type ServiceHandles = Arc>; +pub type ServiceHandles = HashMap; + +pub trait IntoService { + type Error: std::error::Error + Sync + Send + Sized; -pub trait ServiceSpawner { - fn spawn( - self, - tx: Sender, - ) -> impl std::future::Future> + std::marker::Send + 'static; + fn into_service(self) -> impl Stream> + Send; } -#[derive(Debug)] -pub struct ServiceHandle { - pub handle: JoinHandle>, - pub rx: Receiver, +pub trait IntoServiceMap { + type Error: std::error::Error + Sync + Send + Sized; + + fn into_service_map(self) -> impl Stream)> + Send; } -impl ServiceHandle { - pub fn new(service: impl ServiceSpawner) -> Self { - let (tx, rx) = channel(Status::Error(None)); - let handle = tokio::spawn(service.spawn(tx)); - Self { handle, rx } - } +impl IntoServiceMap for T +where + T: IntoIterator, + V: IntoService, + K: std::hash::Hash + std::cmp::Eq + std::clone::Clone + std::marker::Unpin + std::marker::Send, +{ + type Error = V::Error; - pub fn status(&self) -> Status { - self.rx.borrow().clone() + fn into_service_map(self) -> impl Stream)> + Send { + let mut map = StreamMap::new(); + for (name, srv) in self.into_iter() { + map.insert(name, Box::pin(srv.into_service())); + } + map } +} - pub fn into_stream(&self) -> impl Stream { - WatchStream::new(self.rx.clone()) - } +pub fn default_interval() -> std::time::Duration { + std::time::Duration::from_secs(5) } #[derive(Debug, Clone, Deserialize)] #[serde(untagged)] -pub enum Service { - Http(Http), - Tcp(Tcp), - Systemd(Systemd), +pub enum ServiceConfig { + Http(http::Http), + Tcp(tcp::Tcp), + Command(command::Command), } -impl From for ServiceHandle { - fn from(value: Service) -> Self { - match value { - Service::Http(s) => ServiceHandle::new(s), - Service::Tcp(s) => ServiceHandle::new(s), - Service::Systemd(s) => ServiceHandle::new(s), +#[derive(Debug, thiserror::Error)] +pub enum ServiceError { + #[error(transparent)] + Http(#[from] http::Error), + #[error(transparent)] + Tcp(#[from] tcp::Error), + #[error(transparent)] + Command(#[from] command::Error), +} + +impl IntoService for ServiceConfig { + type Error = ServiceError; + + fn into_service(self) -> impl Stream> + Send { + match self { + ServiceConfig::Http(h) => h.into_service().map_err(ServiceError::from).boxed(), + ServiceConfig::Tcp(t) => t.into_service().map_err(ServiceError::from).boxed(), + ServiceConfig::Command(c) => c.into_service().map_err(ServiceError::from).boxed(), } } } -- cgit v1.2.3-70-g09d2