From b94f8e694bf01f5dba9ce2c01f589463a3dfbc69 Mon Sep 17 00:00:00 2001 From: Toby Vincent Date: Wed, 9 Oct 2024 18:23:58 -0500 Subject: feat!: rewrite to use traits and streams --- src/service/tcp.rs | 75 ++++++++++++++++-------------------------------------- 1 file changed, 22 insertions(+), 53 deletions(-) (limited to 'src/service/tcp.rs') diff --git a/src/service/tcp.rs b/src/service/tcp.rs index 7b79afd..6556af0 100644 --- a/src/service/tcp.rs +++ b/src/service/tcp.rs @@ -1,15 +1,19 @@ use std::{fmt::Display, net::SocketAddr, time::Duration}; +use async_stream::try_stream; +use futures::Stream; use serde::Deserialize; -use tokio::{io::Interest, net::TcpSocket, sync::watch::Sender}; +use tokio::{io::Interest, net::TcpSocket}; -use crate::{Error, Status}; +use super::IntoService; -use super::ServiceSpawner; +pub(crate) type Error = std::io::Error; #[derive(Debug, Clone, Deserialize)] pub struct Tcp { pub address: SocketAddr, + #[serde(default = "super::default_interval")] + pub interval: Duration, } impl Display for Tcp { @@ -18,59 +22,24 @@ impl Display for Tcp { } } -impl ServiceSpawner for Tcp { - async fn spawn(self, tx: Sender) -> Result<(), Error> { - let mut interval = tokio::time::interval(Duration::from_secs(5)); +impl IntoService for Tcp { + type Error = Error; - loop { - interval.tick().await; + fn into_service(self) -> impl Stream> { + let mut interval = tokio::time::interval(self.interval); - let sock = TcpSocket::new_v4()?; - sock.set_keepalive(true)?; + try_stream! { + loop { + interval.tick().await; - match sock.connect(self.address).await { - Ok(conn) => { - // TODO: figure out how to wait for connection to close - conn.ready(Interest::READABLE).await?; - tx.send_if_modified(|s| s.update(Status::Ok)); - } - Err(err) => { - tx.send_if_modified(|s| s.update(err.into())); - } - }; - } - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[tokio::test] - #[tracing_test::traced_test] - #[ignore] - async fn test_tcp_watch() { - let (tx, mut rx) = tokio::sync::watch::channel(Status::Error(None)); + let sock = TcpSocket::new_v4()?; + sock.set_keepalive(true)?; - let tests = tokio::spawn(async move { - assert!(matches!(*rx.borrow_and_update(), Status::Error(None))); - - rx.changed().await.unwrap(); - assert!(matches!(*rx.borrow_and_update(), Status::Ok)); - - rx.changed().await.unwrap(); - assert_eq!( - *rx.borrow_and_update(), - Status::Error(Some(String::from("Disconnected"))) - ); - }); - - let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap(); - let address = listener.local_addr().unwrap(); - tokio::spawn(async move { Tcp { address }.spawn(tx).await }); - listener.accept().await.unwrap(); - drop(listener); - - tests.await.unwrap() + let conn = sock.connect(self.address).await?; + // TODO: figure out how to wait for connection to close + conn.ready(Interest::READABLE).await?; + yield (); + } + } } } -- cgit v1.2.3-70-g09d2