From b94f8e694bf01f5dba9ce2c01f589463a3dfbc69 Mon Sep 17 00:00:00 2001 From: Toby Vincent Date: Wed, 9 Oct 2024 18:23:58 -0500 Subject: feat!: rewrite to use traits and streams --- src/service/http.rs | 54 ++++++++++++++++++++++++++++++++--------------------- 1 file changed, 33 insertions(+), 21 deletions(-) (limited to 'src/service/http.rs') diff --git a/src/service/http.rs b/src/service/http.rs index 8950096..c4fcee7 100644 --- a/src/service/http.rs +++ b/src/service/http.rs @@ -1,13 +1,20 @@ use std::{fmt::Display, time::Duration}; +use async_stream::try_stream; use axum::http::status::StatusCode; +use futures::Stream; use serde::Deserialize; -use tokio::sync::watch::Sender; use url::Url; -use crate::{Error, Status}; +use super::IntoService; -use super::ServiceSpawner; +#[derive(Debug, thiserror::Error)] +pub enum Error { + #[error("Request error: {0}")] + Reqwest(#[from] reqwest::Error), + #[error("Bad status code: {0}")] + StatusCode(u16), +} #[derive(Debug, Clone, Deserialize)] pub struct Http { @@ -18,26 +25,31 @@ pub struct Http { pub status_code: StatusCode, #[serde(skip, default)] pub client: Option, + #[serde(default = "super::default_interval")] + pub interval: Duration, } -impl ServiceSpawner for Http { - async fn spawn(self, tx: Sender) -> Result<(), Error> { - let client = self.client.unwrap_or_default(); - let request = client.request(self.method.into(), self.url).build()?; - - let mut interval = tokio::time::interval(Duration::from_secs(5)); - loop { - interval.tick().await; - let req = request - .try_clone() - .expect("Clone with no body should never fail"); - let resp = client.execute(req).await; - let status = resp.map_or_else(Into::into, |r| match r.status().as_u16() { - c if c == self.status_code => Status::Ok, - c => Status::Error(Some(format!("Status code: {c}"))), - }); - - tx.send_if_modified(|s| s.update(status)); +impl IntoService for Http { + type Error = Error; + + fn into_service(self) -> impl Stream> { + let mut interval = tokio::time::interval(self.interval); + + try_stream! { + let client = self.client.unwrap_or_default(); + let req = client.request(self.method.into(), self.url).build()?; + loop { + interval.tick().await; + let req = req + .try_clone() + .expect("Clone with no body should never fail"); + let status_code = client.execute(req).await?.status().as_u16(); + if status_code == self.status_code { + yield (); + } else { + Err(Error::StatusCode(status_code))? + } + } } } } -- cgit v1.2.3-70-g09d2