summaryrefslogtreecommitdiffstats
path: root/src/service/http.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/service/http.rs')
-rw-r--r--src/service/http.rs54
1 files changed, 33 insertions, 21 deletions
diff --git a/src/service/http.rs b/src/service/http.rs
index 8950096..c4fcee7 100644
--- a/src/service/http.rs
+++ b/src/service/http.rs
@@ -1,13 +1,20 @@
use std::{fmt::Display, time::Duration};
+use async_stream::try_stream;
use axum::http::status::StatusCode;
+use futures::Stream;
use serde::Deserialize;
-use tokio::sync::watch::Sender;
use url::Url;
-use crate::{Error, Status};
+use super::IntoService;
-use super::ServiceSpawner;
+#[derive(Debug, thiserror::Error)]
+pub enum Error {
+ #[error("Request error: {0}")]
+ Reqwest(#[from] reqwest::Error),
+ #[error("Bad status code: {0}")]
+ StatusCode(u16),
+}
#[derive(Debug, Clone, Deserialize)]
pub struct Http {
@@ -18,26 +25,31 @@ pub struct Http {
pub status_code: StatusCode,
#[serde(skip, default)]
pub client: Option<reqwest::Client>,
+ #[serde(default = "super::default_interval")]
+ pub interval: Duration,
}
-impl ServiceSpawner for Http {
- async fn spawn(self, tx: Sender<Status>) -> Result<(), Error> {
- let client = self.client.unwrap_or_default();
- let request = client.request(self.method.into(), self.url).build()?;
-
- let mut interval = tokio::time::interval(Duration::from_secs(5));
- loop {
- interval.tick().await;
- let req = request
- .try_clone()
- .expect("Clone with no body should never fail");
- let resp = client.execute(req).await;
- let status = resp.map_or_else(Into::into, |r| match r.status().as_u16() {
- c if c == self.status_code => Status::Ok,
- c => Status::Error(Some(format!("Status code: {c}"))),
- });
-
- tx.send_if_modified(|s| s.update(status));
+impl IntoService for Http {
+ type Error = Error;
+
+ fn into_service(self) -> impl Stream<Item = Result<(), Self::Error>> {
+ let mut interval = tokio::time::interval(self.interval);
+
+ try_stream! {
+ let client = self.client.unwrap_or_default();
+ let req = client.request(self.method.into(), self.url).build()?;
+ loop {
+ interval.tick().await;
+ let req = req
+ .try_clone()
+ .expect("Clone with no body should never fail");
+ let status_code = client.execute(req).await?.status().as_u16();
+ if status_code == self.status_code {
+ yield ();
+ } else {
+ Err(Error::StatusCode(status_code))?
+ }
+ }
}
}
}