summaryrefslogtreecommitdiffstats
path: root/src/service.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/service.rs')
-rw-r--r--src/service.rs96
1 files changed, 52 insertions, 44 deletions
diff --git a/src/service.rs b/src/service.rs
index 3e37503..b10385a 100644
--- a/src/service.rs
+++ b/src/service.rs
@@ -1,68 +1,76 @@
-use std::{collections::HashMap, sync::Arc};
+use std::collections::HashMap;
-use futures::Stream;
-use http::Http;
+use futures::{StreamExt, TryStreamExt};
use serde::Deserialize;
-use systemd::Systemd;
-use tcp::Tcp;
-use tokio::{
- sync::watch::{channel, Receiver, Sender},
- task::JoinHandle,
-};
-use tokio_stream::wrappers::WatchStream;
+use tokio_stream::{Stream, StreamMap};
-use crate::{Error, Status};
+use crate::Status;
+pub mod command;
pub mod http;
-pub mod systemd;
pub mod tcp;
-pub mod command;
-pub type ServiceHandles = Arc<HashMap<String, ServiceHandle>>;
+pub type ServiceHandles = HashMap<String, Status>;
+
+pub trait IntoService {
+ type Error: std::error::Error + Sync + Send + Sized;
-pub trait ServiceSpawner {
- fn spawn(
- self,
- tx: Sender<Status>,
- ) -> impl std::future::Future<Output = Result<(), Error>> + std::marker::Send + 'static;
+ fn into_service(self) -> impl Stream<Item = Result<(), Self::Error>> + Send;
}
-#[derive(Debug)]
-pub struct ServiceHandle {
- pub handle: JoinHandle<Result<(), Error>>,
- pub rx: Receiver<Status>,
+pub trait IntoServiceMap<K> {
+ type Error: std::error::Error + Sync + Send + Sized;
+
+ fn into_service_map(self) -> impl Stream<Item = (K, Result<(), Self::Error>)> + Send;
}
-impl ServiceHandle {
- pub fn new(service: impl ServiceSpawner) -> Self {
- let (tx, rx) = channel(Status::Error(None));
- let handle = tokio::spawn(service.spawn(tx));
- Self { handle, rx }
- }
+impl<T, K, V> IntoServiceMap<K> for T
+where
+ T: IntoIterator<Item = (K, V)>,
+ V: IntoService,
+ K: std::hash::Hash + std::cmp::Eq + std::clone::Clone + std::marker::Unpin + std::marker::Send,
+{
+ type Error = V::Error;
- pub fn status(&self) -> Status {
- self.rx.borrow().clone()
+ fn into_service_map(self) -> impl Stream<Item = (K, Result<(), Self::Error>)> + Send {
+ let mut map = StreamMap::new();
+ for (name, srv) in self.into_iter() {
+ map.insert(name, Box::pin(srv.into_service()));
+ }
+ map
}
+}
- pub fn into_stream(&self) -> impl Stream<Item = Status> {
- WatchStream::new(self.rx.clone())
- }
+pub fn default_interval() -> std::time::Duration {
+ std::time::Duration::from_secs(5)
}
#[derive(Debug, Clone, Deserialize)]
#[serde(untagged)]
-pub enum Service {
- Http(Http),
- Tcp(Tcp),
- Systemd(Systemd),
+pub enum ServiceConfig {
+ Http(http::Http),
+ Tcp(tcp::Tcp),
+ Command(command::Command),
}
-impl From<Service> for ServiceHandle {
- fn from(value: Service) -> Self {
- match value {
- Service::Http(s) => ServiceHandle::new(s),
- Service::Tcp(s) => ServiceHandle::new(s),
- Service::Systemd(s) => ServiceHandle::new(s),
+#[derive(Debug, thiserror::Error)]
+pub enum ServiceError {
+ #[error(transparent)]
+ Http(#[from] http::Error),
+ #[error(transparent)]
+ Tcp(#[from] tcp::Error),
+ #[error(transparent)]
+ Command(#[from] command::Error),
+}
+
+impl IntoService for ServiceConfig {
+ type Error = ServiceError;
+
+ fn into_service(self) -> impl Stream<Item = Result<(), Self::Error>> + Send {
+ match self {
+ ServiceConfig::Http(h) => h.into_service().map_err(ServiceError::from).boxed(),
+ ServiceConfig::Tcp(t) => t.into_service().map_err(ServiceError::from).boxed(),
+ ServiceConfig::Command(c) => c.into_service().map_err(ServiceError::from).boxed(),
}
}
}