1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
|
use std::{
pin::Pin,
task::{ready, Context, Poll},
};
use tokio::sync::watch::{error::RecvError, Receiver};
use tokio_util::sync::ReusableBoxFuture;
use super::Status;
/// A [`futures::Stream`] of [`Status`] values backed by a `tokio` watch
/// [`Receiver`].
///
/// The receiver is not stored directly: it lives inside the boxed future and
/// is handed back each time that future completes.
pub struct StatusStream {
// Resolves to the outcome of waiting for the next change, paired with the
// receiver itself so it can be moved into the next wait.
inner: ReusableBoxFuture<'static, (Result<(), RecvError>, Receiver<Status>)>,
}
impl StatusStream {
pub fn new(rx: Receiver<Status>) -> Self {
Self {
inner: ReusableBoxFuture::new(async move { (Ok(()), rx) }),
}
}
pub fn named(self, name: String) -> NamedStatusStream {
NamedStatusStream {
name,
inner: self.inner,
}
}
}
impl futures::Stream for StatusStream {
    type Item = Status;

    /// Drives the pending wait and turns its outcome into a stream item.
    ///
    /// * `Ok` from the wait: the receiver's current value is cloned and
    ///   yielded as `Some(status)`.
    /// * `Err` from the wait: the stream yields `None`.
    ///
    /// In both cases a fresh `changed()` wait owning the receiver is
    /// installed before returning, so the stream may be polled again.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let (outcome, mut receiver) = ready!(self.inner.poll(cx));

        // Take the snapshot before the receiver is moved into the next wait.
        let next_item = match outcome {
            Ok(()) => Some((*receiver.borrow_and_update()).clone()),
            Err(_) => None,
        };

        // Re-arm: the new future owns the receiver and returns it on completion.
        self.inner
            .set(async move { (receiver.changed().await, receiver) });

        Poll::Ready(next_item)
    }
}
impl From<Receiver<Status>> for StatusStream {
fn from(value: Receiver<Status>) -> Self {
StatusStream::new(value)
}
}
/// Builds a stream from a borrowed receiver by cloning it; the caller keeps
/// the original receiver.
impl From<&Receiver<Status>> for StatusStream {
    fn from(rx: &Receiver<Status>) -> Self {
        Self::new(rx.clone())
    }
}
/// A [`futures::Stream`] of `(name, Status)` pairs backed by a `tokio` watch
/// [`Receiver`].
///
/// Every yielded status is paired with a clone of `name`.
pub struct NamedStatusStream {
// Label cloned into every item this stream yields.
name: String,
// Resolves to the outcome of waiting for the next change, paired with the
// receiver itself so it can be moved into the next wait.
inner: ReusableBoxFuture<'static, (Result<(), RecvError>, Receiver<Status>)>,
}
impl NamedStatusStream {
pub fn new(name: String, rx: Receiver<Status>) -> Self {
Self {
name,
inner: ReusableBoxFuture::new(async move { (Ok(()), rx) }),
}
}
pub fn unnamed(self) -> StatusStream {
StatusStream { inner: self.inner }
}
}
impl futures::Stream for NamedStatusStream {
    type Item = (String, Status);

    /// Drives the pending wait and turns its outcome into a stream item.
    ///
    /// * `Ok` from the wait: the receiver's current value is cloned and
    ///   yielded together with a clone of the stream's name.
    /// * `Err` from the wait: the stream yields `None`.
    ///
    /// In both cases a fresh `changed()` wait owning the receiver is
    /// installed before returning, so the stream may be polled again.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let (outcome, mut receiver) = ready!(self.inner.poll(cx));

        // Take the snapshot before the receiver is moved into the next wait.
        let next_item = match outcome {
            Ok(()) => {
                let snapshot = (*receiver.borrow_and_update()).clone();
                Some((self.name.clone(), snapshot))
            }
            Err(_) => None,
        };

        // Re-arm: the new future owns the receiver and returns it on completion.
        self.inner
            .set(async move { (receiver.changed().await, receiver) });

        Poll::Ready(next_item)
    }
}
impl From<(String, Receiver<Status>)> for NamedStatusStream {
fn from((name, rx): (String, Receiver<Status>)) -> Self {
NamedStatusStream::new(name, rx)
}
}
/// Builds a named stream from a pair of borrows, cloning both the name and
/// the receiver.
impl From<(&String, &Receiver<Status>)> for NamedStatusStream {
    fn from((name, rx): (&String, &Receiver<Status>)) -> Self {
        Self::from((name.clone(), rx.clone()))
    }
}
/// Builds a named stream from a borrowed `(name, receiver)` pair by cloning
/// the whole pair.
impl From<&(String, Receiver<Status>)> for NamedStatusStream {
    fn from(pair: &(String, Receiver<Status>)) -> Self {
        Self::from(pair.clone())
    }
}
|