summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorToby Vincent <tobyv@tobyvin.dev>2024-10-15 17:22:31 -0500
committerToby Vincent <tobyv@tobyvin.dev>2024-10-15 17:22:31 -0500
commite5c1b2efb597bc7089e833bf570d8217c36f50c3 (patch)
tree5e9dd59db44d824e9df605154575eb3d1b4c79d1
parentfd13b0b215a0763fa065038e7add3922059a332c (diff)
refactor: removed unused depsv0.1.0
-rw-r--r--Cargo.lock69
-rw-r--r--Cargo.toml8
-rw-r--r--assets/index.js22
-rw-r--r--src/api.rs7
-rw-r--r--src/api/services.rs4
-rw-r--r--src/api/sse.rs3
-rw-r--r--src/lib.rs4
-rw-r--r--src/service.rs11
-rw-r--r--src/service/command.rs11
-rw-r--r--src/service/http.rs4
-rw-r--r--src/service/tcp.rs4
-rw-r--r--src/state.rs24
-rw-r--r--src/status.rs7
-rw-r--r--src/status/stream.rs114
14 files changed, 159 insertions, 133 deletions
diff --git a/Cargo.lock b/Cargo.lock
index 215b879..8607bc2 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -27,28 +27,6 @@ dependencies = [
]
[[package]]
-name = "async-stream"
-version = "0.3.5"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "cd56dd203fef61ac097dd65721a419ddccb106b2d2b70ba60a6b529f03961a51"
-dependencies = [
- "async-stream-impl",
- "futures-core",
- "pin-project-lite",
-]
-
-[[package]]
-name = "async-stream-impl"
-version = "0.3.5"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193"
-dependencies = [
- "proc-macro2",
- "quote",
- "syn",
-]
-
-[[package]]
name = "async-trait"
version = "0.1.83"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -384,9 +362,9 @@ dependencies = [
[[package]]
name = "hashbrown"
-version = "0.14.5"
+version = "0.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1"
+checksum = "1e087f84d4f86bf4b218b927129862374b72199ae7d8657835f1e89000eea4fb"
[[package]]
name = "hermit-abi"
@@ -531,9 +509,9 @@ dependencies = [
[[package]]
name = "indexmap"
-version = "2.5.0"
+version = "2.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "68b900aa2f7301e21c36462b170ee99994de34dff39a4a6a528e80e7376d07e5"
+checksum = "707907fe3c25f5424cce2cb7e1cbcafee6bdbe735ca90ef77c29e84591e5b9da"
dependencies = [
"equivalent",
"hashbrown",
@@ -1103,22 +1081,18 @@ checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67"
name = "statsrv"
version = "0.1.0"
dependencies = [
- "async-stream",
"axum",
"futures",
- "futures-util",
- "hyper-util",
"reqwest",
"serde",
"serde_json",
"thiserror",
"tokio",
- "tokio-stream",
+ "tokio-util",
"toml",
"tower-http",
"tracing",
"tracing-subscriber",
- "tracing-test",
"url",
]
@@ -1283,18 +1257,6 @@ dependencies = [
]
[[package]]
-name = "tokio-stream"
-version = "0.1.16"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4f4e6ce100d0eb49a2734f8c0812bcd324cf357d21810932c5df6b96ef2b86f1"
-dependencies = [
- "futures-core",
- "pin-project-lite",
- "tokio",
- "tokio-util",
-]
-
-[[package]]
name = "tokio-util"
version = "0.7.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -1457,27 +1419,6 @@ dependencies = [
]
[[package]]
-name = "tracing-test"
-version = "0.2.5"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "557b891436fe0d5e0e363427fc7f217abf9ccd510d5136549847bdcbcd011d68"
-dependencies = [
- "tracing-core",
- "tracing-subscriber",
- "tracing-test-macro",
-]
-
-[[package]]
-name = "tracing-test-macro"
-version = "0.2.5"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "04659ddb06c87d233c566112c1c9c5b9e98256d9af50ec3bc9c8327f873a7568"
-dependencies = [
- "quote",
- "syn",
-]
-
-[[package]]
name = "try-lock"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
diff --git a/Cargo.toml b/Cargo.toml
index ff7fccc..5dba372 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -7,11 +7,8 @@ authors = ["Toby Vincent <tobyv@tobyvin.dev>"]
description = "Minimal and flexable service status API"
[dependencies]
-async-stream = "0.3.5"
axum = "0.7.6"
futures = "0.3.30"
-futures-util = "0.3.30"
-hyper-util = { version = "0.1.9", features = ["tokio"] }
reqwest = "0.12.7"
serde = { version = "1.0.210", features = ["derive"] }
serde_json = "1.0.128"
@@ -23,12 +20,9 @@ tokio = { version = "1.40.0", features = [
"time",
"process",
] }
-tokio-stream = { version = "0.1.16", features = ["io-util", "sync"] }
+tokio-util = "0.7.12"
toml = "0.8.19"
tower-http = { version = "0.6.1", features = ["fs", "trace"] }
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
url = { version = "2.5.2", features = ["serde"] }
-
-[dev-dependencies]
-tracing-test = "0.2.5"
diff --git a/assets/index.js b/assets/index.js
index a4e5c2b..4b3cbbb 100644
--- a/assets/index.js
+++ b/assets/index.js
@@ -1,8 +1,7 @@
const serviceMap = new Map();
-const elementMap = new Map();
async function getServices() {
- const url = "api/v1/list";
+ const url = "api/v1/status";
try {
const response = await fetch(url);
if (!response.ok) {
@@ -18,14 +17,13 @@ async function getServices() {
}
function updateStatus() {
- const statusElm = document.getElementById("status");
const issuesElm = document.getElementById("issues");
const issues = [...serviceMap.values()].filter((s) => !s).length;
issuesElm.textContent = `${issues} issue(s) detected`;
if (issues) {
- statusElm.setAttribute("class", "error");
+ issuesElm.setAttribute("class", "error");
} else {
- statusElm.setAttribute("class", "ok");
+ issuesElm.setAttribute("class", "ok");
}
}
@@ -70,19 +68,15 @@ function createStatusTbody(name, data) {
}
getServices().then((services) => {
+ const table = document.getElementById("services");
const evtSource = new EventSource("/api/v1/sse");
- evtSource.onmessage = (event) => {
- console.log(event.data);
- };
-
- for (const service of services) {
- const table = document.getElementById("services");
- let tbody = createStatusTbody(service, { "data": "ok" });
+ for (const [name, status] of services) {
+ let tbody = createStatusTbody(name, status);
table.appendChild(tbody);
- evtSource.addEventListener(service, (event) => {
+ evtSource.addEventListener(name, (event) => {
data = JSON.parse(event.data);
- tbodyNew = createStatusTbody(service, data);
+ tbodyNew = createStatusTbody(name, data);
tbody.replaceWith(tbodyNew);
tbody = tbodyNew;
updateStatus();
diff --git a/src/api.rs b/src/api.rs
index 57dc4b7..bab2043 100644
--- a/src/api.rs
+++ b/src/api.rs
@@ -1,5 +1,3 @@
-use axum::{extract::State, routing::get, Json};
-
use crate::AppState;
pub mod services;
@@ -9,9 +7,4 @@ pub fn router() -> axum::Router<AppState> {
axum::Router::new()
.nest("/sse", sse::router())
.nest("/status", services::router())
- .route("/list", get(names))
-}
-
-pub async fn names(State(state): State<AppState>) -> Json<Vec<String>> {
- Json(state.names())
}
diff --git a/src/api/services.rs b/src/api/services.rs
index aeca924..8c4860b 100644
--- a/src/api/services.rs
+++ b/src/api/services.rs
@@ -1,5 +1,3 @@
-use std::collections::HashMap;
-
use axum::{
extract::{Path, Query, State},
routing::get,
@@ -31,7 +29,7 @@ impl ServiceQuery {
pub async fn services(
Query(query): Query<ServiceQuery>,
State(state): State<AppState>,
-) -> Json<HashMap<String, Status>> {
+) -> Json<Vec<(String, Status)>> {
Json(
state
.statuses()
diff --git a/src/api/sse.rs b/src/api/sse.rs
index 5d913bb..4fe0bf4 100644
--- a/src/api/sse.rs
+++ b/src/api/sse.rs
@@ -9,8 +9,7 @@ use axum::{
routing::get,
Router,
};
-use futures::Stream;
-use tokio_stream::StreamExt;
+use futures::{Stream, StreamExt};
use crate::{AppState, Error};
diff --git a/src/lib.rs b/src/lib.rs
index 6a64876..a9550ba 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -11,5 +11,7 @@ pub mod state;
pub mod status;
pub fn router() -> axum::Router<AppState> {
- axum::Router::new().nest("/api/v1", api::router())
+ axum::Router::new()
+ .nest("/api/v1", api::router())
+ .fallback(|uri: axum::http::Uri| async { Error::RouteNotFound(uri) })
}
diff --git a/src/service.rs b/src/service.rs
index 74ecb1d..1df48be 100644
--- a/src/service.rs
+++ b/src/service.rs
@@ -1,20 +1,17 @@
use serde::Deserialize;
-use crate::Status;
+use crate::{status::Sender, Status};
pub mod command;
pub mod http;
pub mod tcp;
pub trait IntoService {
- fn into_service(
- self,
- tx: tokio::sync::watch::Sender<Status>,
- ) -> impl std::future::Future<Output = ()>;
+ fn into_service(self, tx: Sender) -> impl std::future::Future<Output = ()>;
}
impl IntoService for () {
- async fn into_service(self, tx: tokio::sync::watch::Sender<Status>) {
+ async fn into_service(self, tx: Sender) {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(3));
let mut status = Status::Ok;
@@ -50,7 +47,7 @@ pub enum ServiceKind {
}
impl IntoService for ServiceKind {
- async fn into_service(self, tx: tokio::sync::watch::Sender<Status>) {
+ async fn into_service(self, tx: Sender) {
match self {
ServiceKind::Test(()) => ().into_service(tx).await,
ServiceKind::Http(h) => h.into_service(tx).await,
diff --git a/src/service/command.rs b/src/service/command.rs
index 6b44e1d..7f3bd59 100644
--- a/src/service/command.rs
+++ b/src/service/command.rs
@@ -1,12 +1,9 @@
use std::{process::Stdio, time::Duration};
use serde::{Deserialize, Serialize};
-use tokio::{
- io::{AsyncBufReadExt, BufReader},
- sync::watch::Sender,
-};
+use tokio::io::{AsyncBufReadExt, BufReader};
-use crate::Status;
+use crate::status::Sender;
use super::IntoService;
@@ -43,7 +40,7 @@ pub struct Command {
}
impl Command {
- async fn persist(&self, tx: Sender<Status>) -> Result<(), Error> {
+ async fn persist(&self, tx: Sender) -> Result<(), Error> {
let mut command = tokio::process::Command::new(&self.command);
command.args(&self.args);
@@ -83,7 +80,7 @@ impl Command {
}
impl IntoService for Command {
- async fn into_service(self, tx: tokio::sync::watch::Sender<Status>) {
+ async fn into_service(self, tx: Sender) {
let mut interval = tokio::time::interval(self.interval);
loop {
interval.tick().await;
diff --git a/src/service/http.rs b/src/service/http.rs
index bfba007..5b0fdd5 100644
--- a/src/service/http.rs
+++ b/src/service/http.rs
@@ -5,7 +5,7 @@ use reqwest::Client;
use serde::{Deserialize, Serialize};
use url::Url;
-use crate::Status;
+use crate::status::Sender;
use super::IntoService;
@@ -47,7 +47,7 @@ impl Http {
}
impl IntoService for Http {
- async fn into_service(self, tx: tokio::sync::watch::Sender<Status>) {
+ async fn into_service(self, tx: Sender) {
let mut interval = tokio::time::interval(self.interval);
loop {
interval.tick().await;
diff --git a/src/service/tcp.rs b/src/service/tcp.rs
index e28b19d..974d2f6 100644
--- a/src/service/tcp.rs
+++ b/src/service/tcp.rs
@@ -3,7 +3,7 @@ use std::{fmt::Display, net::SocketAddr, time::Duration};
use serde::{Deserialize, Serialize};
use tokio::{io::Interest, net::TcpSocket};
-use crate::Status;
+use crate::status::Sender;
use super::IntoService;
@@ -33,7 +33,7 @@ impl Tcp {
}
impl IntoService for Tcp {
- async fn into_service(self, tx: tokio::sync::watch::Sender<Status>) {
+ async fn into_service(self, tx: Sender) {
let mut interval = tokio::time::interval(self.interval);
loop {
diff --git a/src/state.rs b/src/state.rs
index 857b8af..8c31065 100644
--- a/src/state.rs
+++ b/src/state.rs
@@ -1,17 +1,16 @@
use std::collections::HashMap;
-use futures::{Stream, StreamExt};
-use tokio::sync::watch::Receiver;
-use tokio_stream::wrappers::WatchStream;
+use futures::stream::SelectAll;
use crate::{
service::{IntoService, ServiceConfig},
+ status::{NamedStatusStream, Receiver, StatusStream},
Status,
};
#[derive(Clone)]
pub struct AppState {
- rx_map: HashMap<String, Receiver<Status>>,
+ rx_map: HashMap<String, Receiver>,
indexes: Vec<String>,
}
@@ -34,10 +33,6 @@ impl AppState {
AppState { rx_map, indexes }
}
- pub fn names(&self) -> Vec<String> {
- self.indexes.clone()
- }
-
pub fn status(&self, k: &str) -> Option<Status> {
self.rx_map.get(k).map(|rx| rx.borrow().clone())
}
@@ -53,16 +48,11 @@ impl AppState {
.collect()
}
- pub fn stream(&self, k: &str) -> Option<impl Stream<Item = Status>> {
- self.rx_map.get(k).cloned().map(WatchStream::new)
+ pub fn stream(&self, k: &str) -> Option<StatusStream> {
+ self.rx_map.get(k).map(Into::into)
}
- pub fn streams(&self) -> impl Stream<Item = (String, Status)> {
- let iter = self.indexes.iter().cloned().map(|s| {
- let stream = self.stream(&s).expect("Service was unexpectedly removed");
- stream.map(move |status| (s.to_owned(), status))
- });
-
- futures::stream::select_all(iter)
+ pub fn streams(&self) -> SelectAll<NamedStatusStream> {
+ futures::stream::select_all(self.rx_map.iter().map(Into::into))
}
}
diff --git a/src/status.rs b/src/status.rs
index 6e674c8..96f2cbd 100644
--- a/src/status.rs
+++ b/src/status.rs
@@ -1,6 +1,13 @@
use axum::response::sse::Event;
use serde::{Deserialize, Serialize};
+pub type Receiver = tokio::sync::watch::Receiver<Status>;
+pub type Sender = tokio::sync::watch::Sender<Status>;
+
+pub use stream::{NamedStatusStream, StatusStream};
+
+pub mod stream;
+
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
#[serde(rename_all = "lowercase", tag = "status", content = "output")]
pub enum Status {
diff --git a/src/status/stream.rs b/src/status/stream.rs
new file mode 100644
index 0000000..85a2574
--- /dev/null
+++ b/src/status/stream.rs
@@ -0,0 +1,114 @@
+use std::{
+ pin::Pin,
+ task::{ready, Context, Poll},
+};
+
+use tokio::sync::watch::{error::RecvError, Receiver};
+use tokio_util::sync::ReusableBoxFuture;
+
+use super::Status;
+
+pub struct StatusStream {
+ inner: ReusableBoxFuture<'static, (Result<(), RecvError>, Receiver<Status>)>,
+}
+
+impl StatusStream {
+ pub fn new(rx: Receiver<Status>) -> Self {
+ Self {
+ inner: ReusableBoxFuture::new(async move { (Ok(()), rx) }),
+ }
+ }
+
+ pub fn named(self, name: String) -> NamedStatusStream {
+ NamedStatusStream {
+ name,
+ inner: self.inner,
+ }
+ }
+}
+
+impl futures::Stream for StatusStream {
+ type Item = Status;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let (result, mut rx) = ready!(self.inner.poll(cx));
+ match result {
+ Ok(_) => {
+ let received = (*rx.borrow_and_update()).clone();
+ self.inner.set(async { (rx.changed().await, rx) });
+ Poll::Ready(Some(received))
+ }
+ Err(_) => {
+ self.inner.set(async { (rx.changed().await, rx) });
+ Poll::Ready(None)
+ }
+ }
+ }
+}
+
+impl From<Receiver<Status>> for StatusStream {
+ fn from(value: Receiver<Status>) -> Self {
+ StatusStream::new(value)
+ }
+}
+
+impl From<&Receiver<Status>> for StatusStream {
+ fn from(value: &Receiver<Status>) -> Self {
+ value.to_owned().into()
+ }
+}
+
+pub struct NamedStatusStream {
+ name: String,
+ inner: ReusableBoxFuture<'static, (Result<(), RecvError>, Receiver<Status>)>,
+}
+
+impl NamedStatusStream {
+ pub fn new(name: String, rx: Receiver<Status>) -> Self {
+ Self {
+ name,
+ inner: ReusableBoxFuture::new(async move { (Ok(()), rx) }),
+ }
+ }
+
+ pub fn unnamed(self) -> StatusStream {
+ StatusStream { inner: self.inner }
+ }
+}
+
+impl futures::Stream for NamedStatusStream {
+ type Item = (String, Status);
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let (result, mut rx) = ready!(self.inner.poll(cx));
+ match result {
+ Ok(_) => {
+ let received = (*rx.borrow_and_update()).clone();
+ self.inner.set(async { (rx.changed().await, rx) });
+ Poll::Ready(Some((self.name.to_owned(), received)))
+ }
+ Err(_) => {
+ self.inner.set(async { (rx.changed().await, rx) });
+ Poll::Ready(None)
+ }
+ }
+ }
+}
+
+impl From<(String, Receiver<Status>)> for NamedStatusStream {
+ fn from((name, rx): (String, Receiver<Status>)) -> Self {
+ NamedStatusStream::new(name, rx)
+ }
+}
+
+impl From<(&String, &Receiver<Status>)> for NamedStatusStream {
+ fn from(value: (&String, &Receiver<Status>)) -> Self {
+ (value.0.to_owned(), value.1.to_owned()).into()
+ }
+}
+
+impl From<&(String, Receiver<Status>)> for NamedStatusStream {
+ fn from(value: &(String, Receiver<Status>)) -> Self {
+ value.to_owned().into()
+ }
+}