summaryrefslogtreecommitdiffstats
path: root/src/service/tcp.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/service/tcp.rs')
-rw-r--r--src/service/tcp.rs75
1 files changed, 22 insertions, 53 deletions
diff --git a/src/service/tcp.rs b/src/service/tcp.rs
index 7b79afd..6556af0 100644
--- a/src/service/tcp.rs
+++ b/src/service/tcp.rs
@@ -1,15 +1,19 @@
use std::{fmt::Display, net::SocketAddr, time::Duration};
+use async_stream::try_stream;
+use futures::Stream;
use serde::Deserialize;
-use tokio::{io::Interest, net::TcpSocket, sync::watch::Sender};
+use tokio::{io::Interest, net::TcpSocket};
-use crate::{Error, Status};
+use super::IntoService;
-use super::ServiceSpawner;
+pub(crate) type Error = std::io::Error;
#[derive(Debug, Clone, Deserialize)]
pub struct Tcp {
pub address: SocketAddr,
+ #[serde(default = "super::default_interval")]
+ pub interval: Duration,
}
impl Display for Tcp {
@@ -18,59 +22,24 @@ impl Display for Tcp {
}
}
-impl ServiceSpawner for Tcp {
- async fn spawn(self, tx: Sender<Status>) -> Result<(), Error> {
- let mut interval = tokio::time::interval(Duration::from_secs(5));
+impl IntoService for Tcp {
+ type Error = Error;
- loop {
- interval.tick().await;
+ fn into_service(self) -> impl Stream<Item = Result<(), Self::Error>> {
+ let mut interval = tokio::time::interval(self.interval);
- let sock = TcpSocket::new_v4()?;
- sock.set_keepalive(true)?;
+ try_stream! {
+ loop {
+ interval.tick().await;
- match sock.connect(self.address).await {
- Ok(conn) => {
- // TODO: figure out how to wait for connection to close
- conn.ready(Interest::READABLE).await?;
- tx.send_if_modified(|s| s.update(Status::Ok));
- }
- Err(err) => {
- tx.send_if_modified(|s| s.update(err.into()));
- }
- };
- }
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
-
- #[tokio::test]
- #[tracing_test::traced_test]
- #[ignore]
- async fn test_tcp_watch() {
- let (tx, mut rx) = tokio::sync::watch::channel(Status::Error(None));
+ let sock = TcpSocket::new_v4()?;
+ sock.set_keepalive(true)?;
- let tests = tokio::spawn(async move {
- assert!(matches!(*rx.borrow_and_update(), Status::Error(None)));
-
- rx.changed().await.unwrap();
- assert!(matches!(*rx.borrow_and_update(), Status::Ok));
-
- rx.changed().await.unwrap();
- assert_eq!(
- *rx.borrow_and_update(),
- Status::Error(Some(String::from("Disconnected")))
- );
- });
-
- let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
- let address = listener.local_addr().unwrap();
- tokio::spawn(async move { Tcp { address }.spawn(tx).await });
- listener.accept().await.unwrap();
- drop(listener);
-
- tests.await.unwrap()
+ let conn = sock.connect(self.address).await?;
+ // TODO: figure out how to wait for connection to close
+ conn.ready(Interest::READABLE).await?;
+ yield ();
+ }
+ }
}
}