From e5c1b2efb597bc7089e833bf570d8217c36f50c3 Mon Sep 17 00:00:00 2001 From: Toby Vincent Date: Tue, 15 Oct 2024 17:22:31 -0500 Subject: refactor: removed unused deps --- Cargo.lock | 69 +++--------------------------- Cargo.toml | 8 +--- assets/index.js | 22 ++++------ src/api.rs | 7 --- src/api/services.rs | 4 +- src/api/sse.rs | 3 +- src/lib.rs | 4 +- src/service.rs | 11 ++--- src/service/command.rs | 11 ++--- src/service/http.rs | 4 +- src/service/tcp.rs | 4 +- src/state.rs | 24 +++-------- src/status.rs | 7 +++ src/status/stream.rs | 114 +++++++++++++++++++++++++++++++++++++++++++++++++ 14 files changed, 159 insertions(+), 133 deletions(-) create mode 100644 src/status/stream.rs diff --git a/Cargo.lock b/Cargo.lock index 215b879..8607bc2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -26,28 +26,6 @@ dependencies = [ "memchr", ] -[[package]] -name = "async-stream" -version = "0.3.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51" -dependencies = [ - "async-stream-impl", - "futures-core", - "pin-project-lite", -] - -[[package]] -name = "async-stream-impl" -version = "0.3.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "async-trait" version = "0.1.83" @@ -384,9 +362,9 @@ dependencies = [ [[package]] name = "hashbrown" -version = "0.14.5" +version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" +checksum = "1e087f84d4f86bf4b218b927129862374b72199ae7d8657835f1e89000eea4fb" [[package]] name = "hermit-abi" @@ -531,9 +509,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.5.0" +version = "2.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68b900aa2f7301e21c36462b170ee99994de34dff39a4a6a528e80e7376d07e5" +checksum = "707907fe3c25f5424cce2cb7e1cbcafee6bdbe735ca90ef77c29e84591e5b9da" dependencies = [ "equivalent", "hashbrown", @@ -1103,22 +1081,18 @@ checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" name = "statsrv" version = "0.1.0" dependencies = [ - "async-stream", "axum", "futures", - "futures-util", - "hyper-util", "reqwest", "serde", "serde_json", "thiserror", "tokio", - "tokio-stream", + "tokio-util", "toml", "tower-http", "tracing", "tracing-subscriber", - "tracing-test", "url", ] @@ -1282,18 +1256,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "tokio-stream" -version = "0.1.16" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4f4e6ce100d0eb49a2734f8c0812bcd324cf357d21810932c5df6b96ef2b86f1" -dependencies = [ - "futures-core", - "pin-project-lite", - "tokio", - "tokio-util", -] - [[package]] name = "tokio-util" version = "0.7.12" @@ -1456,27 +1418,6 @@ dependencies = [ "tracing-log", ] -[[package]] -name = "tracing-test" -version = "0.2.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "557b891436fe0d5e0e363427fc7f217abf9ccd510d5136549847bdcbcd011d68" -dependencies = [ - "tracing-core", - "tracing-subscriber", - "tracing-test-macro", -] - -[[package]] -name = "tracing-test-macro" -version = "0.2.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04659ddb06c87d233c566112c1c9c5b9e98256d9af50ec3bc9c8327f873a7568" -dependencies = [ - "quote", - "syn", -] - [[package]] name = "try-lock" version = "0.2.5" diff --git a/Cargo.toml b/Cargo.toml index ff7fccc..5dba372 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,11 +7,8 @@ authors = ["Toby Vincent "] description = "Minimal and flexable service status API" [dependencies] -async-stream = "0.3.5" axum = "0.7.6" futures = "0.3.30" -futures-util = "0.3.30" -hyper-util = { version = "0.1.9", features = ["tokio"] } reqwest = "0.12.7" serde = { version = "1.0.210", features = ["derive"] } serde_json = "1.0.128" @@ -23,12 +20,9 @@ tokio = { version = "1.40.0", features = [ "time", "process", ] } -tokio-stream = { version = "0.1.16", features = ["io-util", "sync"] } +tokio-util = "0.7.12" toml = "0.8.19" tower-http = { version = "0.6.1", features = ["fs", "trace"] } tracing = "0.1.40" tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } url = { version = "2.5.2", features = ["serde"] } - -[dev-dependencies] -tracing-test = "0.2.5" diff --git a/assets/index.js b/assets/index.js index a4e5c2b..4b3cbbb 100644 --- a/assets/index.js +++ b/assets/index.js @@ -1,8 +1,7 @@ const serviceMap = new Map(); -const elementMap = new Map(); async function getServices() { - const url = "api/v1/list"; + const url = "api/v1/status"; try { const response = await fetch(url); if (!response.ok) { @@ -18,14 +17,13 @@ async function getServices() { } function updateStatus() { - const statusElm = document.getElementById("status"); const issuesElm = document.getElementById("issues"); const issues = [...serviceMap.values()].filter((s) => !s).length; issuesElm.textContent = `${issues} issue(s) detected`; if (issues) { - statusElm.setAttribute("class", "error"); + issuesElm.setAttribute("class", "error"); } else { - statusElm.setAttribute("class", "ok"); + issuesElm.setAttribute("class", "ok"); } } @@ -70,19 +68,15 @@ function createStatusTbody(name, data) { } getServices().then((services) => { + const table = document.getElementById("services"); const evtSource = new EventSource("/api/v1/sse"); - evtSource.onmessage = (event) => { - console.log(event.data); - }; - - for (const service of services) { - const table = document.getElementById("services"); - let tbody = createStatusTbody(service, { "data": "ok" }); + for (const [name, status] of services) { + let tbody = createStatusTbody(name, status); table.appendChild(tbody); - evtSource.addEventListener(service, (event) => { + evtSource.addEventListener(name, (event) => { data = JSON.parse(event.data); - tbodyNew = createStatusTbody(service, data); + tbodyNew = createStatusTbody(name, data); tbody.replaceWith(tbodyNew); tbody = tbodyNew; updateStatus(); diff --git a/src/api.rs b/src/api.rs index 57dc4b7..bab2043 100644 --- a/src/api.rs +++ b/src/api.rs @@ -1,5 +1,3 @@ -use axum::{extract::State, routing::get, Json}; - use crate::AppState; pub mod services; @@ -9,9 +7,4 @@ pub fn router() -> axum::Router { axum::Router::new() .nest("/sse", sse::router()) .nest("/status", services::router()) - .route("/list", get(names)) -} - -pub async fn names(State(state): State) -> Json> { - Json(state.names()) } diff --git a/src/api/services.rs b/src/api/services.rs index aeca924..8c4860b 100644 --- a/src/api/services.rs +++ b/src/api/services.rs @@ -1,5 +1,3 @@ -use std::collections::HashMap; - use axum::{ extract::{Path, Query, State}, routing::get, @@ -31,7 +29,7 @@ impl ServiceQuery { pub async fn services( Query(query): Query, State(state): State, -) -> Json> { +) -> Json> { Json( state .statuses() diff --git a/src/api/sse.rs b/src/api/sse.rs index 5d913bb..4fe0bf4 100644 --- a/src/api/sse.rs +++ b/src/api/sse.rs @@ -9,8 +9,7 @@ use axum::{ routing::get, Router, }; -use futures::Stream; -use tokio_stream::StreamExt; +use futures::{Stream, StreamExt}; use crate::{AppState, Error}; diff --git a/src/lib.rs b/src/lib.rs index 6a64876..a9550ba 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,5 +11,7 @@ pub mod state; pub mod status; pub fn router() -> axum::Router { - axum::Router::new().nest("/api/v1", api::router()) + axum::Router::new() + .nest("/api/v1", api::router()) + .fallback(|uri: axum::http::Uri| async { Error::RouteNotFound(uri) }) } diff --git a/src/service.rs b/src/service.rs index 74ecb1d..1df48be 100644 --- a/src/service.rs +++ b/src/service.rs @@ -1,20 +1,17 @@ use serde::Deserialize; -use crate::Status; +use crate::{status::Sender, Status}; pub mod command; pub mod http; pub mod tcp; pub trait IntoService { - fn into_service( - self, - tx: tokio::sync::watch::Sender, - ) -> impl std::future::Future; + fn into_service(self, tx: Sender) -> impl std::future::Future; } impl IntoService for () { - async fn into_service(self, tx: tokio::sync::watch::Sender) { + async fn into_service(self, tx: Sender) { let mut interval = tokio::time::interval(std::time::Duration::from_secs(3)); let mut status = Status::Ok; @@ -50,7 +47,7 @@ pub enum ServiceKind { } impl IntoService for ServiceKind { - async fn into_service(self, tx: tokio::sync::watch::Sender) { + async fn into_service(self, tx: Sender) { match self { ServiceKind::Test(()) => ().into_service(tx).await, ServiceKind::Http(h) => h.into_service(tx).await, diff --git a/src/service/command.rs b/src/service/command.rs index 6b44e1d..7f3bd59 100644 --- a/src/service/command.rs +++ b/src/service/command.rs @@ -1,12 +1,9 @@ use std::{process::Stdio, time::Duration}; use serde::{Deserialize, Serialize}; -use tokio::{ - io::{AsyncBufReadExt, BufReader}, - sync::watch::Sender, -}; +use tokio::io::{AsyncBufReadExt, BufReader}; -use crate::Status; +use crate::status::Sender; use super::IntoService; @@ -43,7 +40,7 @@ pub struct Command { } impl Command { - async fn persist(&self, tx: Sender) -> Result<(), Error> { + async fn persist(&self, tx: Sender) -> Result<(), Error> { let mut command = tokio::process::Command::new(&self.command); command.args(&self.args); @@ -83,7 +80,7 @@ impl Command { } impl IntoService for Command { - async fn into_service(self, tx: tokio::sync::watch::Sender) { + async fn into_service(self, tx: Sender) { let mut interval = tokio::time::interval(self.interval); loop { interval.tick().await; diff --git a/src/service/http.rs b/src/service/http.rs index bfba007..5b0fdd5 100644 --- a/src/service/http.rs +++ b/src/service/http.rs @@ -5,7 +5,7 @@ use reqwest::Client; use serde::{Deserialize, Serialize}; use url::Url; -use crate::Status; +use crate::status::Sender; use super::IntoService; @@ -47,7 +47,7 @@ impl Http { } impl IntoService for Http { - async fn into_service(self, tx: tokio::sync::watch::Sender) { + async fn into_service(self, tx: Sender) { let mut interval = tokio::time::interval(self.interval); loop { interval.tick().await; diff --git a/src/service/tcp.rs b/src/service/tcp.rs index e28b19d..974d2f6 100644 --- a/src/service/tcp.rs +++ b/src/service/tcp.rs @@ -3,7 +3,7 @@ use std::{fmt::Display, net::SocketAddr, time::Duration}; use serde::{Deserialize, Serialize}; use tokio::{io::Interest, net::TcpSocket}; -use crate::Status; +use crate::status::Sender; use super::IntoService; @@ -33,7 +33,7 @@ impl Tcp { } impl IntoService for Tcp { - async fn into_service(self, tx: tokio::sync::watch::Sender) { + async fn into_service(self, tx: Sender) { let mut interval = tokio::time::interval(self.interval); loop { diff --git a/src/state.rs b/src/state.rs index 857b8af..8c31065 100644 --- a/src/state.rs +++ b/src/state.rs @@ -1,17 +1,16 @@ use std::collections::HashMap; -use futures::{Stream, StreamExt}; -use tokio::sync::watch::Receiver; -use tokio_stream::wrappers::WatchStream; +use futures::stream::SelectAll; use crate::{ service::{IntoService, ServiceConfig}, + status::{NamedStatusStream, Receiver, StatusStream}, Status, }; #[derive(Clone)] pub struct AppState { - rx_map: HashMap>, + rx_map: HashMap, indexes: Vec, } @@ -34,10 +33,6 @@ impl AppState { AppState { rx_map, indexes } } - pub fn names(&self) -> Vec { - self.indexes.clone() - } - pub fn status(&self, k: &str) -> Option { self.rx_map.get(k).map(|rx| rx.borrow().clone()) } @@ -53,16 +48,11 @@ impl AppState { .collect() } - pub fn stream(&self, k: &str) -> Option> { - self.rx_map.get(k).cloned().map(WatchStream::new) + pub fn stream(&self, k: &str) -> Option { + self.rx_map.get(k).map(Into::into) } - pub fn streams(&self) -> impl Stream { - let iter = self.indexes.iter().cloned().map(|s| { - let stream = self.stream(&s).expect("Service was unexpectedly removed"); - stream.map(move |status| (s.to_owned(), status)) - }); - - futures::stream::select_all(iter) + pub fn streams(&self) -> SelectAll { + futures::stream::select_all(self.rx_map.iter().map(Into::into)) } } diff --git a/src/status.rs b/src/status.rs index 6e674c8..96f2cbd 100644 --- a/src/status.rs +++ b/src/status.rs @@ -1,6 +1,13 @@ use axum::response::sse::Event; use serde::{Deserialize, Serialize}; +pub type Receiver = tokio::sync::watch::Receiver; +pub type Sender = tokio::sync::watch::Sender; + +pub use stream::{NamedStatusStream, StatusStream}; + +pub mod stream; + #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)] #[serde(rename_all = "lowercase", tag = "status", content = "output")] pub enum Status { diff --git a/src/status/stream.rs b/src/status/stream.rs new file mode 100644 index 0000000..85a2574 --- /dev/null +++ b/src/status/stream.rs @@ -0,0 +1,114 @@ +use std::{ + pin::Pin, + task::{ready, Context, Poll}, +}; + +use tokio::sync::watch::{error::RecvError, Receiver}; +use tokio_util::sync::ReusableBoxFuture; + +use super::Status; + +pub struct StatusStream { + inner: ReusableBoxFuture<'static, (Result<(), RecvError>, Receiver)>, +} + +impl StatusStream { + pub fn new(rx: Receiver) -> Self { + Self { + inner: ReusableBoxFuture::new(async move { (Ok(()), rx) }), + } + } + + pub fn named(self, name: String) -> NamedStatusStream { + NamedStatusStream { + name, + inner: self.inner, + } + } +} + +impl futures::Stream for StatusStream { + type Item = Status; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let (result, mut rx) = ready!(self.inner.poll(cx)); + match result { + Ok(_) => { + let received = (*rx.borrow_and_update()).clone(); + self.inner.set(async { (rx.changed().await, rx) }); + Poll::Ready(Some(received)) + } + Err(_) => { + self.inner.set(async { (rx.changed().await, rx) }); + Poll::Ready(None) + } + } + } +} + +impl From> for StatusStream { + fn from(value: Receiver) -> Self { + StatusStream::new(value) + } +} + +impl From<&Receiver> for StatusStream { + fn from(value: &Receiver) -> Self { + value.to_owned().into() + } +} + +pub struct NamedStatusStream { + name: String, + inner: ReusableBoxFuture<'static, (Result<(), RecvError>, Receiver)>, +} + +impl NamedStatusStream { + pub fn new(name: String, rx: Receiver) -> Self { + Self { + name, + inner: ReusableBoxFuture::new(async move { (Ok(()), rx) }), + } + } + + pub fn unnamed(self) -> StatusStream { + StatusStream { inner: self.inner } + } +} + +impl futures::Stream for NamedStatusStream { + type Item = (String, Status); + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let (result, mut rx) = ready!(self.inner.poll(cx)); + match result { + Ok(_) => { + let received = (*rx.borrow_and_update()).clone(); + self.inner.set(async { (rx.changed().await, rx) }); + Poll::Ready(Some((self.name.to_owned(), received))) + } + Err(_) => { + self.inner.set(async { (rx.changed().await, rx) }); + Poll::Ready(None) + } + } + } +} + +impl From<(String, Receiver)> for NamedStatusStream { + fn from((name, rx): (String, Receiver)) -> Self { + NamedStatusStream::new(name, rx) + } +} + +impl From<(&String, &Receiver)> for NamedStatusStream { + fn from(value: (&String, &Receiver)) -> Self { + (value.0.to_owned(), value.1.to_owned()).into() + } +} + +impl From<&(String, Receiver)> for NamedStatusStream { + fn from(value: &(String, Receiver)) -> Self { + value.to_owned().into() + } +} -- cgit v1.2.3-70-g09d2