summaryrefslogtreecommitdiffstats
path: root/src/service/tcp.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/service/tcp.rs')
-rw-r--r--src/service/tcp.rs42
1 files changed, 21 insertions, 21 deletions
diff --git a/src/service/tcp.rs b/src/service/tcp.rs
index 6556af0..e28b19d 100644
--- a/src/service/tcp.rs
+++ b/src/service/tcp.rs
@@ -1,15 +1,13 @@
use std::{fmt::Display, net::SocketAddr, time::Duration};
-use async_stream::try_stream;
-use futures::Stream;
-use serde::Deserialize;
+use serde::{Deserialize, Serialize};
use tokio::{io::Interest, net::TcpSocket};
-use super::IntoService;
+use crate::Status;
-pub(crate) type Error = std::io::Error;
+use super::IntoService;
-#[derive(Debug, Clone, Deserialize)]
+#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct Tcp {
pub address: SocketAddr,
#[serde(default = "super::default_interval")]
@@ -22,24 +20,26 @@ impl Display for Tcp {
}
}
-impl IntoService for Tcp {
- type Error = Error;
-
- fn into_service(self) -> impl Stream<Item = Result<(), Self::Error>> {
- let mut interval = tokio::time::interval(self.interval);
+impl Tcp {
+ async fn check(address: SocketAddr) -> Result<(), std::io::Error> {
+ let sock = TcpSocket::new_v4()?;
+ sock.set_keepalive(true)?;
- try_stream! {
- loop {
- interval.tick().await;
+ let conn = sock.connect(address).await?;
+ // TODO: figure out how to wait for connection to close
+ conn.ready(Interest::READABLE).await?;
+ Ok(())
+ }
+}
- let sock = TcpSocket::new_v4()?;
- sock.set_keepalive(true)?;
+impl IntoService for Tcp {
+ async fn into_service(self, tx: tokio::sync::watch::Sender<Status>) {
+ let mut interval = tokio::time::interval(self.interval);
- let conn = sock.connect(self.address).await?;
- // TODO: figure out how to wait for connection to close
- conn.ready(Interest::READABLE).await?;
- yield ();
- }
+ loop {
+ interval.tick().await;
+ let res = Self::check(self.address).await;
+ tx.send_if_modified(|s| s.update(res.into()));
}
}
}