summaryrefslogtreecommitdiffstats
path: root/src/status/stream.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/status/stream.rs')
-rw-r--r--src/status/stream.rs114
1 files changed, 114 insertions, 0 deletions
diff --git a/src/status/stream.rs b/src/status/stream.rs
new file mode 100644
index 0000000..85a2574
--- /dev/null
+++ b/src/status/stream.rs
@@ -0,0 +1,114 @@
+use std::{
+ pin::Pin,
+ task::{ready, Context, Poll},
+};
+
+use tokio::sync::watch::{error::RecvError, Receiver};
+use tokio_util::sync::ReusableBoxFuture;
+
+use super::Status;
+
+pub struct StatusStream {
+ inner: ReusableBoxFuture<'static, (Result<(), RecvError>, Receiver<Status>)>,
+}
+
+impl StatusStream {
+ pub fn new(rx: Receiver<Status>) -> Self {
+ Self {
+ inner: ReusableBoxFuture::new(async move { (Ok(()), rx) }),
+ }
+ }
+
+ pub fn named(self, name: String) -> NamedStatusStream {
+ NamedStatusStream {
+ name,
+ inner: self.inner,
+ }
+ }
+}
+
+impl futures::Stream for StatusStream {
+ type Item = Status;
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let (result, mut rx) = ready!(self.inner.poll(cx));
+ match result {
+ Ok(_) => {
+ let received = (*rx.borrow_and_update()).clone();
+ self.inner.set(async { (rx.changed().await, rx) });
+ Poll::Ready(Some(received))
+ }
+ Err(_) => {
+ self.inner.set(async { (rx.changed().await, rx) });
+ Poll::Ready(None)
+ }
+ }
+ }
+}
+
+impl From<Receiver<Status>> for StatusStream {
+ fn from(value: Receiver<Status>) -> Self {
+ StatusStream::new(value)
+ }
+}
+
+impl From<&Receiver<Status>> for StatusStream {
+ fn from(value: &Receiver<Status>) -> Self {
+ value.to_owned().into()
+ }
+}
+
+pub struct NamedStatusStream {
+ name: String,
+ inner: ReusableBoxFuture<'static, (Result<(), RecvError>, Receiver<Status>)>,
+}
+
+impl NamedStatusStream {
+ pub fn new(name: String, rx: Receiver<Status>) -> Self {
+ Self {
+ name,
+ inner: ReusableBoxFuture::new(async move { (Ok(()), rx) }),
+ }
+ }
+
+ pub fn unnamed(self) -> StatusStream {
+ StatusStream { inner: self.inner }
+ }
+}
+
+impl futures::Stream for NamedStatusStream {
+ type Item = (String, Status);
+
+ fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+ let (result, mut rx) = ready!(self.inner.poll(cx));
+ match result {
+ Ok(_) => {
+ let received = (*rx.borrow_and_update()).clone();
+ self.inner.set(async { (rx.changed().await, rx) });
+ Poll::Ready(Some((self.name.to_owned(), received)))
+ }
+ Err(_) => {
+ self.inner.set(async { (rx.changed().await, rx) });
+ Poll::Ready(None)
+ }
+ }
+ }
+}
+
+impl From<(String, Receiver<Status>)> for NamedStatusStream {
+ fn from((name, rx): (String, Receiver<Status>)) -> Self {
+ NamedStatusStream::new(name, rx)
+ }
+}
+
+impl From<(&String, &Receiver<Status>)> for NamedStatusStream {
+ fn from(value: (&String, &Receiver<Status>)) -> Self {
+ (value.0.to_owned(), value.1.to_owned()).into()
+ }
+}
+
+impl From<&(String, Receiver<Status>)> for NamedStatusStream {
+ fn from(value: &(String, Receiver<Status>)) -> Self {
+ value.to_owned().into()
+ }
+}