summaryrefslogtreecommitdiffstats
path: root/src/service/tcp.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/service/tcp.rs')
-rw-r--r--src/service/tcp.rs79
1 files changed, 72 insertions, 7 deletions
diff --git a/src/service/tcp.rs b/src/service/tcp.rs
index 87e696a..5ec5f36 100644
--- a/src/service/tcp.rs
+++ b/src/service/tcp.rs
@@ -1,12 +1,17 @@
-use std::fmt::Display;
+use std::{fmt::Display, net::SocketAddr, time::Duration};
+use futures::Stream;
use serde::Deserialize;
+use tokio::{io::Interest, net::TcpSocket, sync::watch::Sender};
+use tokio_stream::wrappers::WatchStream;
use crate::{Error, Status};
+use super::ServiceSpawner;
+
#[derive(Debug, Clone, Deserialize)]
pub struct Tcp {
- pub address: String,
+ pub address: SocketAddr,
}
impl Display for Tcp {
@@ -15,11 +20,71 @@ impl Display for Tcp {
}
}
+impl ServiceSpawner for Tcp {
+ #[tracing::instrument(skip(tx))]
+ async fn spawn(self, tx: Sender<Status>) -> Result<(), Error> {
+ let mut interval = tokio::time::interval(Duration::from_secs(5));
+
+ loop {
+ interval.tick().await;
+
+ let sock = TcpSocket::new_v4()?;
+ sock.set_keepalive(true)?;
+
+ match sock.connect(self.address).await {
+ Ok(conn) => {
+ tracing::info!("Connected");
+ tx.send_if_modified(|s| s.update(Status::Pass));
+ conn.ready(Interest::ERROR).await?;
+ tx.send_replace(Status::Fail(Some("Disconnected".into())));
+ tracing::info!("Disconnected");
+ }
+ Err(err) => {
+ tracing::error!("Failed to connect");
+ tx.send_if_modified(|s| s.update(err.into()));
+ }
+ };
+ }
+ }
+}
+
impl Tcp {
- pub async fn check(&self) -> Result<Status, Error> {
- Ok(std::net::TcpStream::connect(&self.address)
- .err()
- .map(Into::into)
- .unwrap_or_default())
+ pub fn into_stream(self) -> impl Stream<Item = Status> {
+ let (tx, rx) = tokio::sync::watch::channel(Status::default());
+ tokio::spawn(self.spawn(tx));
+ WatchStream::new(rx)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[tokio::test]
+ #[tracing_test::traced_test]
+ #[ignore]
+ async fn test_tcp_watch() {
+ let (tx, mut rx) = tokio::sync::watch::channel(Status::default());
+
+ let tests = tokio::spawn(async move {
+ assert!(matches!(*rx.borrow_and_update(), Status::Unknown));
+
+ rx.changed().await.unwrap();
+ assert!(matches!(*rx.borrow_and_update(), Status::Pass));
+
+ rx.changed().await.unwrap();
+ assert_eq!(
+ *rx.borrow_and_update(),
+ Status::Fail(Some(String::from("Disconnected")))
+ );
+ });
+
+ let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
+ let address = listener.local_addr().unwrap();
+ tokio::spawn(async move { Tcp { address }.spawn(tx).await });
+ listener.accept().await.unwrap();
+ drop(listener);
+
+ tests.await.unwrap()
}
}