diff options
author | Toby Vincent <tobyv@tobyvin.dev> | 2024-09-28 00:54:46 -0500 |
---|---|---|
committer | Toby Vincent <tobyv@tobyvin.dev> | 2024-09-28 00:58:45 -0500 |
commit | cd774827dd14f68d8405c45d2d9da30b3fab050e (patch) | |
tree | a24e1cabb99170caa25edff53fc978111a1c9dd4 /src/service/tcp.rs | |
parent | 04c7f7609e5bc3fadf95c53b37a9e6e12c4e539c (diff) |
feat: refactor into pub-sub and impl SSE
Diffstat (limited to 'src/service/tcp.rs')
-rw-r--r-- | src/service/tcp.rs | 79 |
1 files changed, 72 insertions, 7 deletions
diff --git a/src/service/tcp.rs b/src/service/tcp.rs index 87e696a..5ec5f36 100644 --- a/src/service/tcp.rs +++ b/src/service/tcp.rs @@ -1,12 +1,17 @@ -use std::fmt::Display; +use std::{fmt::Display, net::SocketAddr, time::Duration}; +use futures::Stream; use serde::Deserialize; +use tokio::{io::Interest, net::TcpSocket, sync::watch::Sender}; +use tokio_stream::wrappers::WatchStream; use crate::{Error, Status}; +use super::ServiceSpawner; + #[derive(Debug, Clone, Deserialize)] pub struct Tcp { - pub address: String, + pub address: SocketAddr, } impl Display for Tcp { @@ -15,11 +20,71 @@ impl Display for Tcp { } } +impl ServiceSpawner for Tcp { + #[tracing::instrument(skip(tx))] + async fn spawn(self, tx: Sender<Status>) -> Result<(), Error> { + let mut interval = tokio::time::interval(Duration::from_secs(5)); + + loop { + interval.tick().await; + + let sock = TcpSocket::new_v4()?; + sock.set_keepalive(true)?; + + match sock.connect(self.address).await { + Ok(conn) => { + tracing::info!("Connected"); + tx.send_if_modified(|s| s.update(Status::Pass)); + conn.ready(Interest::ERROR).await?; + tx.send_replace(Status::Fail(Some("Disconnected".into()))); + tracing::info!("Disconnected"); + } + Err(err) => { + tracing::error!("Failed to connect"); + tx.send_if_modified(|s| s.update(err.into())); + } + }; + } + } +} + impl Tcp { - pub async fn check(&self) -> Result<Status, Error> { - Ok(std::net::TcpStream::connect(&self.address) - .err() - .map(Into::into) - .unwrap_or_default()) + pub fn into_stream(self) -> impl Stream<Item = Status> { + let (tx, rx) = tokio::sync::watch::channel(Status::default()); + tokio::spawn(self.spawn(tx)); + WatchStream::new(rx) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + #[tracing_test::traced_test] + #[ignore] + async fn test_tcp_watch() { + let (tx, mut rx) = tokio::sync::watch::channel(Status::default()); + + let tests = tokio::spawn(async move { + assert!(matches!(*rx.borrow_and_update(), Status::Unknown)); + + rx.changed().await.unwrap(); + assert!(matches!(*rx.borrow_and_update(), Status::Pass)); + + rx.changed().await.unwrap(); + assert_eq!( + *rx.borrow_and_update(), + Status::Fail(Some(String::from("Disconnected"))) + ); + }); + + let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); + let address = listener.local_addr().unwrap(); + tokio::spawn(async move { Tcp { address }.spawn(tx).await }); + listener.accept().await.unwrap(); + drop(listener); + + tests.await.unwrap() } } |