summaryrefslogtreecommitdiffstats
path: root/src/state.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/state.rs')
-rw-r--r--src/state.rs71
1 files changed, 38 insertions, 33 deletions
diff --git a/src/state.rs b/src/state.rs
index bbe235b..c442d9d 100644
--- a/src/state.rs
+++ b/src/state.rs
@@ -5,46 +5,47 @@ use tokio::sync::watch::Receiver;
use tokio_stream::wrappers::WatchStream;
use crate::{
- service::{IntoService, ServiceConfig},
+ service::{IntoService, ServiceConfig, ServiceKind},
Status,
};
#[derive(Clone)]
pub struct AppState {
rx_map: HashMap<String, Receiver<Status>>,
+ indexes: Vec<String>,
}
impl AppState {
- pub fn spawn_services(configs: HashMap<String, ServiceConfig>) -> AppState {
- let mut rx_map = HashMap::new();
- let mut tx_map = HashMap::new();
- for name in configs.keys() {
- let (tx, rx) = tokio::sync::watch::channel(Default::default());
- rx_map.insert(name.clone(), rx);
- tx_map.insert(name.clone(), tx);
- }
+ pub fn new(configs: Vec<ServiceConfig>) -> AppState {
+ let mut indexes = Vec::new();
+ let rx_map = configs
+ .into_iter()
+ .map(|ServiceConfig { name, kind }| {
+ indexes.push(name.clone());
+ let (tx, rx) = tokio::sync::watch::channel(Status::default());
+ tokio::spawn(Self::spawn_service(kind, tx.clone()));
+ (name, rx)
+ })
+ .collect();
+
+ AppState { rx_map, indexes }
+ }
- for (name, config) in configs {
- let (tx, rx) = tokio::sync::watch::channel(Status::default());
- rx_map.insert(name.clone(), rx);
- tokio::spawn(async move {
- let mut stream = config.into_service();
- while let Some(res) = stream.next().await {
- let status = res.into();
- tx.send_if_modified(|s| {
- if *s != status {
- tracing::debug!(name, ?status, "Updated service status");
- *s = status;
- true
- } else {
- false
- }
- });
+ #[tracing::instrument(skip(tx))]
+ async fn spawn_service(kind: ServiceKind, tx: tokio::sync::watch::Sender<Status>) {
+ let mut stream = kind.into_service();
+ while let Some(res) = stream.next().await {
+ let status = res.into();
+ tx.send_if_modified(|s| {
+ if *s != status {
+ tracing::debug!(?status, "Updated service status");
+ *s = status;
+ true
+ } else {
+ false
}
});
}
-
- AppState { rx_map }
}
pub fn status(&self, k: &str) -> Option<Status> {
@@ -52,9 +53,13 @@ impl AppState {
}
pub fn statuses(&self) -> Vec<(String, Status)> {
- self.rx_map
+ self.indexes
.iter()
- .map(|(k, v)| (k.clone(), v.borrow().clone()))
+ .cloned()
+ .map(|s| {
+ let status = self.status(&s).expect("Service was unexpectedly removed");
+ (s, status)
+ })
.collect()
}
@@ -63,10 +68,10 @@ impl AppState {
}
pub fn streams(&self) -> impl Stream<Item = (String, Status)> {
- let iter =
- self.rx_map.clone().into_iter().map(|(name, rx)| {
- WatchStream::new(rx).map(move |status| (name.to_owned(), status))
- });
+ let iter = self.indexes.iter().cloned().map(|s| {
+ let stream = self.stream(&s).expect("Service was unexpectedly removed");
+ stream.map(move |status| (s.to_owned(), status))
+ });
futures::stream::select_all(iter)
}