summaryrefslogtreecommitdiffstats
path: root/src/service/http.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/service/http.rs')
-rw-r--r--src/service/http.rs51
1 files changed, 27 insertions, 24 deletions
diff --git a/src/service/http.rs b/src/service/http.rs
index c4fcee7..bfba007 100644
--- a/src/service/http.rs
+++ b/src/service/http.rs
@@ -1,11 +1,12 @@
use std::{fmt::Display, time::Duration};
-use async_stream::try_stream;
use axum::http::status::StatusCode;
-use futures::Stream;
-use serde::Deserialize;
+use reqwest::Client;
+use serde::{Deserialize, Serialize};
use url::Url;
+use crate::Status;
+
use super::IntoService;
#[derive(Debug, thiserror::Error)]
@@ -16,7 +17,7 @@ pub enum Error {
StatusCode(u16),
}
-#[derive(Debug, Clone, Deserialize)]
+#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct Http {
pub url: Url,
#[serde(default)]
@@ -29,32 +30,34 @@ pub struct Http {
pub interval: Duration,
}
-impl IntoService for Http {
- type Error = Error;
+impl Http {
+ async fn check(&self) -> Result<(), Error> {
+ let client = match self.client.as_ref() {
+ Some(client) => client,
+ None => &Client::new(),
+ };
+ let req = client
+ .request(self.method.into(), self.url.clone())
+ .build()?;
+ let status_code = client.execute(req).await?.status().as_u16();
+ (status_code == self.status_code)
+ .then_some(())
+ .ok_or_else(|| Error::StatusCode(status_code))
+ }
+}
- fn into_service(self) -> impl Stream<Item = Result<(), Self::Error>> {
+impl IntoService for Http {
+ async fn into_service(self, tx: tokio::sync::watch::Sender<Status>) {
let mut interval = tokio::time::interval(self.interval);
-
- try_stream! {
- let client = self.client.unwrap_or_default();
- let req = client.request(self.method.into(), self.url).build()?;
- loop {
- interval.tick().await;
- let req = req
- .try_clone()
- .expect("Clone with no body should never fail");
- let status_code = client.execute(req).await?.status().as_u16();
- if status_code == self.status_code {
- yield ();
- } else {
- Err(Error::StatusCode(status_code))?
- }
- }
+ loop {
+ interval.tick().await;
+ let res = self.check().await;
+ tx.send_if_modified(|s| s.update(res.into()));
}
}
}
-#[derive(Debug, Clone, Copy, Default, Deserialize)]
+#[derive(Debug, Clone, Copy, Default, Deserialize, Serialize)]
pub enum Method {
#[serde(alias = "get", alias = "GET")]
#[default]