From e5c1b2efb597bc7089e833bf570d8217c36f50c3 Mon Sep 17 00:00:00 2001 From: Toby Vincent Date: Tue, 15 Oct 2024 17:22:31 -0500 Subject: refactor: removed unused deps --- src/api.rs | 7 --- src/api/services.rs | 4 +- src/api/sse.rs | 3 +- src/lib.rs | 4 +- src/service.rs | 11 ++--- src/service/command.rs | 11 ++--- src/service/http.rs | 4 +- src/service/tcp.rs | 4 +- src/state.rs | 24 +++-------- src/status.rs | 7 +++ src/status/stream.rs | 114 +++++++++++++++++++++++++++++++++++++++++++++++++ 11 files changed, 145 insertions(+), 48 deletions(-) create mode 100644 src/status/stream.rs (limited to 'src') diff --git a/src/api.rs b/src/api.rs index 57dc4b7..bab2043 100644 --- a/src/api.rs +++ b/src/api.rs @@ -1,5 +1,3 @@ -use axum::{extract::State, routing::get, Json}; - use crate::AppState; pub mod services; @@ -9,9 +7,4 @@ pub fn router() -> axum::Router { axum::Router::new() .nest("/sse", sse::router()) .nest("/status", services::router()) - .route("/list", get(names)) -} - -pub async fn names(State(state): State) -> Json> { - Json(state.names()) } diff --git a/src/api/services.rs b/src/api/services.rs index aeca924..8c4860b 100644 --- a/src/api/services.rs +++ b/src/api/services.rs @@ -1,5 +1,3 @@ -use std::collections::HashMap; - use axum::{ extract::{Path, Query, State}, routing::get, @@ -31,7 +29,7 @@ impl ServiceQuery { pub async fn services( Query(query): Query, State(state): State, -) -> Json> { +) -> Json> { Json( state .statuses() diff --git a/src/api/sse.rs b/src/api/sse.rs index 5d913bb..4fe0bf4 100644 --- a/src/api/sse.rs +++ b/src/api/sse.rs @@ -9,8 +9,7 @@ use axum::{ routing::get, Router, }; -use futures::Stream; -use tokio_stream::StreamExt; +use futures::{Stream, StreamExt}; use crate::{AppState, Error}; diff --git a/src/lib.rs b/src/lib.rs index 6a64876..a9550ba 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,5 +11,7 @@ pub mod state; pub mod status; pub fn router() -> axum::Router { - axum::Router::new().nest("/api/v1", api::router()) + axum::Router::new() + .nest("/api/v1", api::router()) + .fallback(|uri: axum::http::Uri| async { Error::RouteNotFound(uri) }) } diff --git a/src/service.rs b/src/service.rs index 74ecb1d..1df48be 100644 --- a/src/service.rs +++ b/src/service.rs @@ -1,20 +1,17 @@ use serde::Deserialize; -use crate::Status; +use crate::{status::Sender, Status}; pub mod command; pub mod http; pub mod tcp; pub trait IntoService { - fn into_service( - self, - tx: tokio::sync::watch::Sender, - ) -> impl std::future::Future; + fn into_service(self, tx: Sender) -> impl std::future::Future; } impl IntoService for () { - async fn into_service(self, tx: tokio::sync::watch::Sender) { + async fn into_service(self, tx: Sender) { let mut interval = tokio::time::interval(std::time::Duration::from_secs(3)); let mut status = Status::Ok; @@ -50,7 +47,7 @@ pub enum ServiceKind { } impl IntoService for ServiceKind { - async fn into_service(self, tx: tokio::sync::watch::Sender) { + async fn into_service(self, tx: Sender) { match self { ServiceKind::Test(()) => ().into_service(tx).await, ServiceKind::Http(h) => h.into_service(tx).await, diff --git a/src/service/command.rs b/src/service/command.rs index 6b44e1d..7f3bd59 100644 --- a/src/service/command.rs +++ b/src/service/command.rs @@ -1,12 +1,9 @@ use std::{process::Stdio, time::Duration}; use serde::{Deserialize, Serialize}; -use tokio::{ - io::{AsyncBufReadExt, BufReader}, - sync::watch::Sender, -}; +use tokio::io::{AsyncBufReadExt, BufReader}; -use crate::Status; +use crate::status::Sender; use super::IntoService; @@ -43,7 +40,7 @@ pub struct Command { } impl Command { - async fn persist(&self, tx: Sender) -> Result<(), Error> { + async fn persist(&self, tx: Sender) -> Result<(), Error> { let mut command = tokio::process::Command::new(&self.command); command.args(&self.args); @@ -83,7 +80,7 @@ impl Command { } impl IntoService for Command { - async fn into_service(self, tx: tokio::sync::watch::Sender) { + async fn into_service(self, tx: Sender) { let mut interval = tokio::time::interval(self.interval); loop { interval.tick().await; diff --git a/src/service/http.rs b/src/service/http.rs index bfba007..5b0fdd5 100644 --- a/src/service/http.rs +++ b/src/service/http.rs @@ -5,7 +5,7 @@ use reqwest::Client; use serde::{Deserialize, Serialize}; use url::Url; -use crate::Status; +use crate::status::Sender; use super::IntoService; @@ -47,7 +47,7 @@ impl Http { } impl IntoService for Http { - async fn into_service(self, tx: tokio::sync::watch::Sender) { + async fn into_service(self, tx: Sender) { let mut interval = tokio::time::interval(self.interval); loop { interval.tick().await; diff --git a/src/service/tcp.rs b/src/service/tcp.rs index e28b19d..974d2f6 100644 --- a/src/service/tcp.rs +++ b/src/service/tcp.rs @@ -3,7 +3,7 @@ use std::{fmt::Display, net::SocketAddr, time::Duration}; use serde::{Deserialize, Serialize}; use tokio::{io::Interest, net::TcpSocket}; -use crate::Status; +use crate::status::Sender; use super::IntoService; @@ -33,7 +33,7 @@ impl Tcp { } impl IntoService for Tcp { - async fn into_service(self, tx: tokio::sync::watch::Sender) { + async fn into_service(self, tx: Sender) { let mut interval = tokio::time::interval(self.interval); loop { diff --git a/src/state.rs b/src/state.rs index 857b8af..8c31065 100644 --- a/src/state.rs +++ b/src/state.rs @@ -1,17 +1,16 @@ use std::collections::HashMap; -use futures::{Stream, StreamExt}; -use tokio::sync::watch::Receiver; -use tokio_stream::wrappers::WatchStream; +use futures::stream::SelectAll; use crate::{ service::{IntoService, ServiceConfig}, + status::{NamedStatusStream, Receiver, StatusStream}, Status, }; #[derive(Clone)] pub struct AppState { - rx_map: HashMap>, + rx_map: HashMap, indexes: Vec, } @@ -34,10 +33,6 @@ impl AppState { AppState { rx_map, indexes } } - pub fn names(&self) -> Vec { - self.indexes.clone() - } - pub fn status(&self, k: &str) -> Option { self.rx_map.get(k).map(|rx| rx.borrow().clone()) } @@ -53,16 +48,11 @@ impl AppState { .collect() } - pub fn stream(&self, k: &str) -> Option> { - self.rx_map.get(k).cloned().map(WatchStream::new) + pub fn stream(&self, k: &str) -> Option { + self.rx_map.get(k).map(Into::into) } - pub fn streams(&self) -> impl Stream { - let iter = self.indexes.iter().cloned().map(|s| { - let stream = self.stream(&s).expect("Service was unexpectedly removed"); - stream.map(move |status| (s.to_owned(), status)) - }); - - futures::stream::select_all(iter) + pub fn streams(&self) -> SelectAll { + futures::stream::select_all(self.rx_map.iter().map(Into::into)) } } diff --git a/src/status.rs b/src/status.rs index 6e674c8..96f2cbd 100644 --- a/src/status.rs +++ b/src/status.rs @@ -1,6 +1,13 @@ use axum::response::sse::Event; use serde::{Deserialize, Serialize}; +pub type Receiver = tokio::sync::watch::Receiver; +pub type Sender = tokio::sync::watch::Sender; + +pub use stream::{NamedStatusStream, StatusStream}; + +pub mod stream; + #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] #[serde(rename_all = "lowercase", tag = "status", content = "output")] pub enum Status { diff --git a/src/status/stream.rs b/src/status/stream.rs new file mode 100644 index 0000000..85a2574 --- /dev/null +++ b/src/status/stream.rs @@ -0,0 +1,114 @@ +use std::{ + pin::Pin, + task::{ready, Context, Poll}, +}; + +use tokio::sync::watch::{error::RecvError, Receiver}; +use tokio_util::sync::ReusableBoxFuture; + +use super::Status; + +pub struct StatusStream { + inner: ReusableBoxFuture<'static, (Result<(), RecvError>, Receiver)>, +} + +impl StatusStream { + pub fn new(rx: Receiver) -> Self { + Self { + inner: ReusableBoxFuture::new(async move { (Ok(()), rx) }), + } + } + + pub fn named(self, name: String) -> NamedStatusStream { + NamedStatusStream { + name, + inner: self.inner, + } + } +} + +impl futures::Stream for StatusStream { + type Item = Status; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let (result, mut rx) = ready!(self.inner.poll(cx)); + match result { + Ok(_) => { + let received = (*rx.borrow_and_update()).clone(); + self.inner.set(async { (rx.changed().await, rx) }); + Poll::Ready(Some(received)) + } + Err(_) => { + self.inner.set(async { (rx.changed().await, rx) }); + Poll::Ready(None) + } + } + } +} + +impl From> for StatusStream { + fn from(value: Receiver) -> Self { + StatusStream::new(value) + } +} + +impl From<&Receiver> for StatusStream { + fn from(value: &Receiver) -> Self { + value.to_owned().into() + } +} + +pub struct NamedStatusStream { + name: String, + inner: ReusableBoxFuture<'static, (Result<(), RecvError>, Receiver)>, +} + +impl NamedStatusStream { + pub fn new(name: String, rx: Receiver) -> Self { + Self { + name, + inner: ReusableBoxFuture::new(async move { (Ok(()), rx) }), + } + } + + pub fn unnamed(self) -> StatusStream { + StatusStream { inner: self.inner } + } +} + +impl futures::Stream for NamedStatusStream { + type Item = (String, Status); + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let (result, mut rx) = ready!(self.inner.poll(cx)); + match result { + Ok(_) => { + let received = (*rx.borrow_and_update()).clone(); + self.inner.set(async { (rx.changed().await, rx) }); + Poll::Ready(Some((self.name.to_owned(), received))) + } + Err(_) => { + self.inner.set(async { (rx.changed().await, rx) }); + Poll::Ready(None) + } + } + } +} + +impl From<(String, Receiver)> for NamedStatusStream { + fn from((name, rx): (String, Receiver)) -> Self { + NamedStatusStream::new(name, rx) + } +} + +impl From<(&String, &Receiver)> for NamedStatusStream { + fn from(value: (&String, &Receiver)) -> Self { + (value.0.to_owned(), value.1.to_owned()).into() + } +} + +impl From<&(String, Receiver)> for NamedStatusStream { + fn from(value: &(String, Receiver)) -> Self { + value.to_owned().into() + } +} -- cgit v1.2.3-70-g09d2