From e5c1b2efb597bc7089e833bf570d8217c36f50c3 Mon Sep 17 00:00:00 2001 From: Toby Vincent Date: Tue, 15 Oct 2024 17:22:31 -0500 Subject: refactor: removed unused deps --- src/status/stream.rs | 114 +++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 114 insertions(+) create mode 100644 src/status/stream.rs (limited to 'src/status/stream.rs') diff --git a/src/status/stream.rs b/src/status/stream.rs new file mode 100644 index 0000000..85a2574 --- /dev/null +++ b/src/status/stream.rs @@ -0,0 +1,114 @@ +use std::{ + pin::Pin, + task::{ready, Context, Poll}, +}; + +use tokio::sync::watch::{error::RecvError, Receiver}; +use tokio_util::sync::ReusableBoxFuture; + +use super::Status; + +pub struct StatusStream { + inner: ReusableBoxFuture<'static, (Result<(), RecvError>, Receiver)>, +} + +impl StatusStream { + pub fn new(rx: Receiver) -> Self { + Self { + inner: ReusableBoxFuture::new(async move { (Ok(()), rx) }), + } + } + + pub fn named(self, name: String) -> NamedStatusStream { + NamedStatusStream { + name, + inner: self.inner, + } + } +} + +impl futures::Stream for StatusStream { + type Item = Status; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let (result, mut rx) = ready!(self.inner.poll(cx)); + match result { + Ok(_) => { + let received = (*rx.borrow_and_update()).clone(); + self.inner.set(async { (rx.changed().await, rx) }); + Poll::Ready(Some(received)) + } + Err(_) => { + self.inner.set(async { (rx.changed().await, rx) }); + Poll::Ready(None) + } + } + } +} + +impl From> for StatusStream { + fn from(value: Receiver) -> Self { + StatusStream::new(value) + } +} + +impl From<&Receiver> for StatusStream { + fn from(value: &Receiver) -> Self { + value.to_owned().into() + } +} + +pub struct NamedStatusStream { + name: String, + inner: ReusableBoxFuture<'static, (Result<(), RecvError>, Receiver)>, +} + +impl NamedStatusStream { + pub fn new(name: String, rx: Receiver) -> Self { + Self { + name, + inner: ReusableBoxFuture::new(async move { (Ok(()), rx) }), + } + } + + pub fn unnamed(self) -> StatusStream { + StatusStream { inner: self.inner } + } +} + +impl futures::Stream for NamedStatusStream { + type Item = (String, Status); + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let (result, mut rx) = ready!(self.inner.poll(cx)); + match result { + Ok(_) => { + let received = (*rx.borrow_and_update()).clone(); + self.inner.set(async { (rx.changed().await, rx) }); + Poll::Ready(Some((self.name.to_owned(), received))) + } + Err(_) => { + self.inner.set(async { (rx.changed().await, rx) }); + Poll::Ready(None) + } + } + } +} + +impl From<(String, Receiver)> for NamedStatusStream { + fn from((name, rx): (String, Receiver)) -> Self { + NamedStatusStream::new(name, rx) + } +} + +impl From<(&String, &Receiver)> for NamedStatusStream { + fn from(value: (&String, &Receiver)) -> Self { + (value.0.to_owned(), value.1.to_owned()).into() + } +} + +impl From<&(String, Receiver)> for NamedStatusStream { + fn from(value: &(String, Receiver)) -> Self { + value.to_owned().into() + } +} -- cgit v1.2.3-70-g09d2