diff options
author | Toby Vincent <tobyv@tobyvin.dev> | 2024-10-15 17:22:31 -0500 |
---|---|---|
committer | Toby Vincent <tobyv@tobyvin.dev> | 2024-10-15 17:22:31 -0500 |
commit | e5c1b2efb597bc7089e833bf570d8217c36f50c3 (patch) | |
tree | 5e9dd59db44d824e9df605154575eb3d1b4c79d1 /src/status/stream.rs | |
parent | fd13b0b215a0763fa065038e7add3922059a332c (diff) |
refactor: removed unused depsv0.1.0
Diffstat (limited to 'src/status/stream.rs')
-rw-r--r-- | src/status/stream.rs | 114 |
1 files changed, 114 insertions, 0 deletions
diff --git a/src/status/stream.rs b/src/status/stream.rs new file mode 100644 index 0000000..85a2574 --- /dev/null +++ b/src/status/stream.rs @@ -0,0 +1,114 @@ +use std::{ + pin::Pin, + task::{ready, Context, Poll}, +}; + +use tokio::sync::watch::{error::RecvError, Receiver}; +use tokio_util::sync::ReusableBoxFuture; + +use super::Status; + +pub struct StatusStream { + inner: ReusableBoxFuture<'static, (Result<(), RecvError>, Receiver<Status>)>, +} + +impl StatusStream { + pub fn new(rx: Receiver<Status>) -> Self { + Self { + inner: ReusableBoxFuture::new(async move { (Ok(()), rx) }), + } + } + + pub fn named(self, name: String) -> NamedStatusStream { + NamedStatusStream { + name, + inner: self.inner, + } + } +} + +impl futures::Stream for StatusStream { + type Item = Status; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + let (result, mut rx) = ready!(self.inner.poll(cx)); + match result { + Ok(_) => { + let received = (*rx.borrow_and_update()).clone(); + self.inner.set(async { (rx.changed().await, rx) }); + Poll::Ready(Some(received)) + } + Err(_) => { + self.inner.set(async { (rx.changed().await, rx) }); + Poll::Ready(None) + } + } + } +} + +impl From<Receiver<Status>> for StatusStream { + fn from(value: Receiver<Status>) -> Self { + StatusStream::new(value) + } +} + +impl From<&Receiver<Status>> for StatusStream { + fn from(value: &Receiver<Status>) -> Self { + value.to_owned().into() + } +} + +pub struct NamedStatusStream { + name: String, + inner: ReusableBoxFuture<'static, (Result<(), RecvError>, Receiver<Status>)>, +} + +impl NamedStatusStream { + pub fn new(name: String, rx: Receiver<Status>) -> Self { + Self { + name, + inner: ReusableBoxFuture::new(async move { (Ok(()), rx) }), + } + } + + pub fn unnamed(self) -> StatusStream { + StatusStream { inner: self.inner } + } +} + +impl futures::Stream for NamedStatusStream { + type Item = (String, Status); + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { + let (result, mut rx) = ready!(self.inner.poll(cx)); + match result { + Ok(_) => { + let received = (*rx.borrow_and_update()).clone(); + self.inner.set(async { (rx.changed().await, rx) }); + Poll::Ready(Some((self.name.to_owned(), received))) + } + Err(_) => { + self.inner.set(async { (rx.changed().await, rx) }); + Poll::Ready(None) + } + } + } +} + +impl From<(String, Receiver<Status>)> for NamedStatusStream { + fn from((name, rx): (String, Receiver<Status>)) -> Self { + NamedStatusStream::new(name, rx) + } +} + +impl From<(&String, &Receiver<Status>)> for NamedStatusStream { + fn from(value: (&String, &Receiver<Status>)) -> Self { + (value.0.to_owned(), value.1.to_owned()).into() + } +} + +impl From<&(String, Receiver<Status>)> for NamedStatusStream { + fn from(value: &(String, Receiver<Status>)) -> Self { + value.to_owned().into() + } +} |