summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorToby Vincent <tobyv@tobyvin.dev>2024-10-15 17:22:31 -0500
committerToby Vincent <tobyv@tobyvin.dev>2024-10-15 17:22:31 -0500
commite5c1b2efb597bc7089e833bf570d8217c36f50c3 (patch)
tree5e9dd59db44d824e9df605154575eb3d1b4c79d1 /src
parentfd13b0b215a0763fa065038e7add3922059a332c (diff)
refactor: removed unused depsv0.1.0
Diffstat (limited to 'src')
-rw-r--r--src/api.rs7
-rw-r--r--src/api/services.rs4
-rw-r--r--src/api/sse.rs3
-rw-r--r--src/lib.rs4
-rw-r--r--src/service.rs11
-rw-r--r--src/service/command.rs11
-rw-r--r--src/service/http.rs4
-rw-r--r--src/service/tcp.rs4
-rw-r--r--src/state.rs24
-rw-r--r--src/status.rs7
-rw-r--r--src/status/stream.rs114
11 files changed, 145 insertions, 48 deletions
diff --git a/src/api.rs b/src/api.rs
index 57dc4b7..bab2043 100644
--- a/src/api.rs
+++ b/src/api.rs
@@ -1,5 +1,3 @@
-use axum::{extract::State, routing::get, Json};
-
use crate::AppState;
pub mod services;
@@ -9,9 +7,4 @@ pub fn router() -> axum::Router<AppState> {
axum::Router::new()
.nest("/sse", sse::router())
.nest("/status", services::router())
- .route("/list", get(names))
-}
-
-pub async fn names(State(state): State<AppState>) -> Json<Vec<String>> {
- Json(state.names())
}
diff --git a/src/api/services.rs b/src/api/services.rs
index aeca924..8c4860b 100644
--- a/src/api/services.rs
+++ b/src/api/services.rs
@@ -1,5 +1,3 @@
-use std::collections::HashMap;
-
use axum::{
extract::{Path, Query, State},
routing::get,
@@ -31,7 +29,7 @@ impl ServiceQuery {
pub async fn services(
Query(query): Query<ServiceQuery>,
State(state): State<AppState>,
-) -> Json<HashMap<String, Status>> {
+) -> Json<Vec<(String, Status)>> {
Json(
state
.statuses()
diff --git a/src/api/sse.rs b/src/api/sse.rs
index 5d913bb..4fe0bf4 100644
--- a/src/api/sse.rs
+++ b/src/api/sse.rs
@@ -9,8 +9,7 @@ use axum::{
routing::get,
Router,
};
-use futures::Stream;
-use tokio_stream::StreamExt;
+use futures::{Stream, StreamExt};
use crate::{AppState, Error};
diff --git a/src/lib.rs b/src/lib.rs
index 6a64876..a9550ba 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -11,5 +11,7 @@ pub mod state;
pub mod status;
pub fn router() -> axum::Router<AppState> {
- axum::Router::new().nest("/api/v1", api::router())
+ axum::Router::new()
+ .nest("/api/v1", api::router())
+ .fallback(|uri: axum::http::Uri| async { Error::RouteNotFound(uri) })
}
diff --git a/src/service.rs b/src/service.rs
index 74ecb1d..1df48be 100644
--- a/src/service.rs
+++ b/src/service.rs
@@ -1,20 +1,17 @@
use serde::Deserialize;
-use crate::Status;
+use crate::{status::Sender, Status};
pub mod command;
pub mod http;
pub mod tcp;
pub trait IntoService {
- fn into_service(
- self,
- tx: tokio::sync::watch::Sender<Status>,
- ) -> impl std::future::Future<Output = ()>;
+ fn into_service(self, tx: Sender) -> impl std::future::Future<Output = ()>;
}
impl IntoService for () {
- async fn into_service(self, tx: tokio::sync::watch::Sender<Status>) {
+ async fn into_service(self, tx: Sender) {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(3));
let mut status = Status::Ok;
@@ -50,7 +47,7 @@ pub enum ServiceKind {
}
impl IntoService for ServiceKind {
- async fn into_service(self, tx: tokio::sync::watch::Sender<Status>) {
+ async fn into_service(self, tx: Sender) {
match self {
ServiceKind::Test(()) => ().into_service(tx).await,
ServiceKind::Http(h) => h.into_service(tx).await,
diff --git a/src/service/command.rs b/src/service/command.rs
index 6b44e1d..7f3bd59 100644
--- a/src/service/command.rs
+++ b/src/service/command.rs
@@ -1,12 +1,9 @@
use std::{process::Stdio, time::Duration};
use serde::{Deserialize, Serialize};
-use tokio::{
- io::{AsyncBufReadExt, BufReader},
- sync::watch::Sender,
-};
+use tokio::io::{AsyncBufReadExt, BufReader};
-use crate::Status;
+use crate::status::Sender;
use super::IntoService;
@@ -43,7 +40,7 @@ pub struct Command {
}
impl Command {
- async fn persist(&self, tx: Sender<Status>) -> Result<(), Error> {
+ async fn persist(&self, tx: Sender) -> Result<(), Error> {
let mut command = tokio::process::Command::new(&self.command);
command.args(&self.args);
@@ -83,7 +80,7 @@ impl Command {
}
impl IntoService for Command {
- async fn into_service(self, tx: tokio::sync::watch::Sender<Status>) {
+ async fn into_service(self, tx: Sender) {
let mut interval = tokio::time::interval(self.interval);
loop {
interval.tick().await;
diff --git a/src/service/http.rs b/src/service/http.rs
index bfba007..5b0fdd5 100644
--- a/src/service/http.rs
+++ b/src/service/http.rs
@@ -5,7 +5,7 @@ use reqwest::Client;
use serde::{Deserialize, Serialize};
use url::Url;
-use crate::Status;
+use crate::status::Sender;
use super::IntoService;
@@ -47,7 +47,7 @@ impl Http {
}
impl IntoService for Http {
- async fn into_service(self, tx: tokio::sync::watch::Sender<Status>) {
+ async fn into_service(self, tx: Sender) {
let mut interval = tokio::time::interval(self.interval);
loop {
interval.tick().await;
diff --git a/src/service/tcp.rs b/src/service/tcp.rs
index e28b19d..974d2f6 100644
--- a/src/service/tcp.rs
+++ b/src/service/tcp.rs
@@ -3,7 +3,7 @@ use std::{fmt::Display, net::SocketAddr, time::Duration};
use serde::{Deserialize, Serialize};
use tokio::{io::Interest, net::TcpSocket};
-use crate::Status;
+use crate::status::Sender;
use super::IntoService;
@@ -33,7 +33,7 @@ impl Tcp {
}
impl IntoService for Tcp {
- async fn into_service(self, tx: tokio::sync::watch::Sender<Status>) {
+ async fn into_service(self, tx: Sender) {
let mut interval = tokio::time::interval(self.interval);
loop {
diff --git a/src/state.rs b/src/state.rs
index 857b8af..8c31065 100644
--- a/src/state.rs
+++ b/src/state.rs
@@ -1,17 +1,16 @@
use std::collections::HashMap;
-use futures::{Stream, StreamExt};
-use tokio::sync::watch::Receiver;
-use tokio_stream::wrappers::WatchStream;
+use futures::stream::SelectAll;
use crate::{
service::{IntoService, ServiceConfig},
+ status::{NamedStatusStream, Receiver, StatusStream},
Status,
};
#[derive(Clone)]
pub struct AppState {
- rx_map: HashMap<String, Receiver<Status>>,
+ rx_map: HashMap<String, Receiver>,
indexes: Vec<String>,
}
@@ -34,10 +33,6 @@ impl AppState {
AppState { rx_map, indexes }
}
- pub fn names(&self) -> Vec<String> {
- self.indexes.clone()
- }
-
pub fn status(&self, k: &str) -> Option<Status> {
self.rx_map.get(k).map(|rx| rx.borrow().clone())
}
@@ -53,16 +48,11 @@ impl AppState {
.collect()
}
- pub fn stream(&self, k: &str) -> Option<impl Stream<Item = Status>> {
- self.rx_map.get(k).cloned().map(WatchStream::new)
+ pub fn stream(&self, k: &str) -> Option<StatusStream> {
+ self.rx_map.get(k).map(Into::into)
}
- pub fn streams(&self) -> impl Stream<Item = (String, Status)> {
- let iter = self.indexes.iter().cloned().map(|s| {
- let stream = self.stream(&s).expect("Service was unexpectedly removed");
- stream.map(move |status| (s.to_owned(), status))
- });
-
- futures::stream::select_all(iter)
+ pub fn streams(&self) -> SelectAll<NamedStatusStream> {
+ futures::stream::select_all(self.rx_map.iter().map(Into::into))
}
}
diff --git a/src/status.rs b/src/status.rs
index 6e674c8..96f2cbd 100644
--- a/src/status.rs
+++ b/src/status.rs
@@ -1,6 +1,13 @@
use axum::response::sse::Event;
use serde::{Deserialize, Serialize};
+pub type Receiver = tokio::sync::watch::Receiver<Status>;
+pub type Sender = tokio::sync::watch::Sender<Status>;
+
+pub use stream::{NamedStatusStream, StatusStream};
+
+pub mod stream;
+
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
#[serde(rename_all = "lowercase", tag = "status", content = "output")]
pub enum Status {
diff --git a/src/status/stream.rs b/src/status/stream.rs
new file mode 100644
index 0000000..85a2574
--- /dev/null
+++ b/src/status/stream.rs
@@ -0,0 +1,114 @@
+use std::{
+ pin::Pin,
+ task::{ready, Context, Poll},
+};
+
+use tokio::sync::watch::{error::RecvError, Receiver};
+use tokio_util::sync::ReusableBoxFuture;
+
+use super::Status;
+
+pub struct StatusStream {
+ inner: ReusableBoxFuture<'static, (Result<(), RecvError>, Receiver<Status>)>,
+}
+
+impl StatusStream {
+ pub fn new(rx: Receiver<Status>) -> Self {
+ Self {
+ inner: ReusableBoxFuture::new(async move { (Ok(()), rx) }),
+ }
+ }
+
+ pub fn named(self, name: String) -> NamedStatusStream {
+ NamedStatusStream {
+ name,
+ inner: self.inner,
+ }
+ }
+}
+
+impl futures::Stream for StatusStream {
+ type Item = Status;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let (result, mut rx) = ready!(self.inner.poll(cx));
+ match result {
+ Ok(_) => {
+ let received = (*rx.borrow_and_update()).clone();
+ self.inner.set(async { (rx.changed().await, rx) });
+ Poll::Ready(Some(received))
+ }
+ Err(_) => {
+ self.inner.set(async { (rx.changed().await, rx) });
+ Poll::Ready(None)
+ }
+ }
+ }
+}
+
+impl From<Receiver<Status>> for StatusStream {
+ fn from(value: Receiver<Status>) -> Self {
+ StatusStream::new(value)
+ }
+}
+
+impl From<&Receiver<Status>> for StatusStream {
+ fn from(value: &Receiver<Status>) -> Self {
+ value.to_owned().into()
+ }
+}
+
+pub struct NamedStatusStream {
+ name: String,
+ inner: ReusableBoxFuture<'static, (Result<(), RecvError>, Receiver<Status>)>,
+}
+
+impl NamedStatusStream {
+ pub fn new(name: String, rx: Receiver<Status>) -> Self {
+ Self {
+ name,
+ inner: ReusableBoxFuture::new(async move { (Ok(()), rx) }),
+ }
+ }
+
+ pub fn unnamed(self) -> StatusStream {
+ StatusStream { inner: self.inner }
+ }
+}
+
+impl futures::Stream for NamedStatusStream {
+ type Item = (String, Status);
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let (result, mut rx) = ready!(self.inner.poll(cx));
+ match result {
+ Ok(_) => {
+ let received = (*rx.borrow_and_update()).clone();
+ self.inner.set(async { (rx.changed().await, rx) });
+ Poll::Ready(Some((self.name.to_owned(), received)))
+ }
+ Err(_) => {
+ self.inner.set(async { (rx.changed().await, rx) });
+ Poll::Ready(None)
+ }
+ }
+ }
+}
+
+impl From<(String, Receiver<Status>)> for NamedStatusStream {
+ fn from((name, rx): (String, Receiver<Status>)) -> Self {
+ NamedStatusStream::new(name, rx)
+ }
+}
+
+impl From<(&String, &Receiver<Status>)> for NamedStatusStream {
+ fn from(value: (&String, &Receiver<Status>)) -> Self {
+ (value.0.to_owned(), value.1.to_owned()).into()
+ }
+}
+
+impl From<&(String, Receiver<Status>)> for NamedStatusStream {
+ fn from(value: &(String, Receiver<Status>)) -> Self {
+ value.to_owned().into()
+ }
+}