use std::{ pin::Pin, task::{ready, Context, Poll}, }; use tokio::sync::watch::{error::RecvError, Receiver}; use tokio_util::sync::ReusableBoxFuture; use super::Status; pub struct StatusStream { inner: ReusableBoxFuture<'static, (Result<(), RecvError>, Receiver)>, } impl StatusStream { pub fn new(rx: Receiver) -> Self { Self { inner: ReusableBoxFuture::new(async move { (Ok(()), rx) }), } } pub fn named(self, name: String) -> NamedStatusStream { NamedStatusStream { name, inner: self.inner, } } } impl futures::Stream for StatusStream { type Item = Status; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let (result, mut rx) = ready!(self.inner.poll(cx)); match result { Ok(_) => { let received = (*rx.borrow_and_update()).clone(); self.inner.set(async { (rx.changed().await, rx) }); Poll::Ready(Some(received)) } Err(_) => { self.inner.set(async { (rx.changed().await, rx) }); Poll::Ready(None) } } } } impl From> for StatusStream { fn from(value: Receiver) -> Self { StatusStream::new(value) } } impl From<&Receiver> for StatusStream { fn from(value: &Receiver) -> Self { value.to_owned().into() } } pub struct NamedStatusStream { name: String, inner: ReusableBoxFuture<'static, (Result<(), RecvError>, Receiver)>, } impl NamedStatusStream { pub fn new(name: String, rx: Receiver) -> Self { Self { name, inner: ReusableBoxFuture::new(async move { (Ok(()), rx) }), } } pub fn unnamed(self) -> StatusStream { StatusStream { inner: self.inner } } } impl futures::Stream for NamedStatusStream { type Item = (String, Status); fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let (result, mut rx) = ready!(self.inner.poll(cx)); match result { Ok(_) => { let received = (*rx.borrow_and_update()).clone(); self.inner.set(async { (rx.changed().await, rx) }); Poll::Ready(Some((self.name.to_owned(), received))) } Err(_) => { self.inner.set(async { (rx.changed().await, rx) }); Poll::Ready(None) } } } } impl From<(String, Receiver)> for NamedStatusStream { fn from((name, rx): (String, Receiver)) -> Self { NamedStatusStream::new(name, rx) } } impl From<(&String, &Receiver)> for NamedStatusStream { fn from(value: (&String, &Receiver)) -> Self { (value.0.to_owned(), value.1.to_owned()).into() } } impl From<&(String, Receiver)> for NamedStatusStream { fn from(value: &(String, Receiver)) -> Self { value.to_owned().into() } }