summaryrefslogtreecommitdiffstats
path: root/src/state.rs
diff options
context:
space:
mode:
authorToby Vincent <tobyv@tobyvin.dev>2024-10-09 18:23:58 -0500
committerToby Vincent <tobyv@tobyvin.dev>2024-10-09 18:23:58 -0500
commitb94f8e694bf01f5dba9ce2c01f589463a3dfbc69 (patch)
treec787530e63fb510db31533166edf1b9ff54be62a /src/state.rs
parent117d33fc478bf529094850b1fe40c558f04c9865 (diff)
feat!: rewrite to use traits and streams
Diffstat (limited to 'src/state.rs')
-rw-r--r--src/state.rs73
1 files changed, 73 insertions, 0 deletions
diff --git a/src/state.rs b/src/state.rs
new file mode 100644
index 0000000..bbe235b
--- /dev/null
+++ b/src/state.rs
@@ -0,0 +1,73 @@
+use std::collections::HashMap;
+
+use futures::{Stream, StreamExt};
+use tokio::sync::watch::Receiver;
+use tokio_stream::wrappers::WatchStream;
+
+use crate::{
+ service::{IntoService, ServiceConfig},
+ Status,
+};
+
+#[derive(Clone)]
+pub struct AppState {
+ rx_map: HashMap<String, Receiver<Status>>,
+}
+
+impl AppState {
+ pub fn spawn_services(configs: HashMap<String, ServiceConfig>) -> AppState {
+ let mut rx_map = HashMap::new();
+ let mut tx_map = HashMap::new();
+ for name in configs.keys() {
+ let (tx, rx) = tokio::sync::watch::channel(Default::default());
+ rx_map.insert(name.clone(), rx);
+ tx_map.insert(name.clone(), tx);
+ }
+
+ for (name, config) in configs {
+ let (tx, rx) = tokio::sync::watch::channel(Status::default());
+ rx_map.insert(name.clone(), rx);
+ tokio::spawn(async move {
+ let mut stream = config.into_service();
+ while let Some(res) = stream.next().await {
+ let status = res.into();
+ tx.send_if_modified(|s| {
+ if *s != status {
+ tracing::debug!(name, ?status, "Updated service status");
+ *s = status;
+ true
+ } else {
+ false
+ }
+ });
+ }
+ });
+ }
+
+ AppState { rx_map }
+ }
+
+ pub fn status(&self, k: &str) -> Option<Status> {
+ self.rx_map.get(k).map(|rx| rx.borrow().clone())
+ }
+
+ pub fn statuses(&self) -> Vec<(String, Status)> {
+ self.rx_map
+ .iter()
+ .map(|(k, v)| (k.clone(), v.borrow().clone()))
+ .collect()
+ }
+
+ pub fn stream(&self, k: &str) -> Option<impl Stream<Item = Status>> {
+ self.rx_map.get(k).cloned().map(WatchStream::new)
+ }
+
+ pub fn streams(&self) -> impl Stream<Item = (String, Status)> {
+ let iter =
+ self.rx_map.clone().into_iter().map(|(name, rx)| {
+ WatchStream::new(rx).map(move |status| (name.to_owned(), status))
+ });
+
+ futures::stream::select_all(iter)
+ }
+}