summaryrefslogtreecommitdiffstats
path: root/src/state.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/state.rs')
-rw-r--r--src/state.rs24
1 files changed, 7 insertions, 17 deletions
diff --git a/src/state.rs b/src/state.rs
index 857b8af..8c31065 100644
--- a/src/state.rs
+++ b/src/state.rs
@@ -1,17 +1,16 @@
use std::collections::HashMap;
-use futures::{Stream, StreamExt};
-use tokio::sync::watch::Receiver;
-use tokio_stream::wrappers::WatchStream;
+use futures::stream::SelectAll;
use crate::{
service::{IntoService, ServiceConfig},
+ status::{NamedStatusStream, Receiver, StatusStream},
Status,
};
#[derive(Clone)]
pub struct AppState {
- rx_map: HashMap<String, Receiver<Status>>,
+ rx_map: HashMap<String, Receiver>,
indexes: Vec<String>,
}
@@ -34,10 +33,6 @@ impl AppState {
AppState { rx_map, indexes }
}
- pub fn names(&self) -> Vec<String> {
- self.indexes.clone()
- }
-
pub fn status(&self, k: &str) -> Option<Status> {
self.rx_map.get(k).map(|rx| rx.borrow().clone())
}
@@ -53,16 +48,11 @@ impl AppState {
.collect()
}
- pub fn stream(&self, k: &str) -> Option<impl Stream<Item = Status>> {
- self.rx_map.get(k).cloned().map(WatchStream::new)
+ pub fn stream(&self, k: &str) -> Option<StatusStream> {
+ self.rx_map.get(k).map(Into::into)
}
- pub fn streams(&self) -> impl Stream<Item = (String, Status)> {
- let iter = self.indexes.iter().cloned().map(|s| {
- let stream = self.stream(&s).expect("Service was unexpectedly removed");
- stream.map(move |status| (s.to_owned(), status))
- });
-
- futures::stream::select_all(iter)
+ pub fn streams(&self) -> SelectAll<NamedStatusStream> {
+ futures::stream::select_all(self.rx_map.iter().map(Into::into))
}
}