summaryrefslogtreecommitdiffstats
path: root/src/status/stream.rs
blob: 85a25748c83326c71863a609be4f4e299b442656 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
use std::{
    pin::Pin,
    task::{ready, Context, Poll},
};

use tokio::sync::watch::{error::RecvError, Receiver};
use tokio_util::sync::ReusableBoxFuture;

use super::Status;

/// A stream of [`Status`] values backed by a `tokio::sync::watch` receiver.
///
/// See the `futures::Stream` implementation for the exact yielding rules.
pub struct StatusStream {
    /// Reusable boxed future for the in-flight wait. It resolves to the wait's
    /// result paired with the receiver itself, so the receiver can be handed
    /// to the next wait once this one finishes.
    inner: ReusableBoxFuture<'static, (Result<(), RecvError>, Receiver<Status>)>,
}

impl StatusStream {
    /// Wraps `rx` in a stream.
    ///
    /// The initial future completes immediately with `Ok(())`, so the first
    /// poll of the stream yields whatever value the receiver currently holds
    /// instead of waiting for a change.
    pub fn new(rx: Receiver<Status>) -> Self {
        let inner = ReusableBoxFuture::new(async move { (Ok(()), rx) });
        Self { inner }
    }

    /// Converts this stream into a [`NamedStatusStream`] labelled `name`,
    /// carrying over the in-flight future unchanged.
    pub fn named(self, name: String) -> NamedStatusStream {
        let Self { inner } = self;
        NamedStatusStream { name, inner }
    }
}

/// Yields a clone of the receiver's current [`Status`] every time the pending
/// wait resolves successfully, and `None` when the wait resolves with an error
/// (tokio documents `RecvError` as the sender having been dropped).
impl futures::Stream for StatusStream {
    type Item = Status;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let (outcome, mut rx) = ready!(self.inner.poll(cx));

        // Read the value (marking it as seen) before the receiver is moved
        // into the next wait.
        let item = match outcome {
            Ok(()) => Some((*rx.borrow_and_update()).clone()),
            Err(_) => None,
        };

        // The future has completed, so it must be replaced on every path,
        // including the error one, before the stream may be polled again.
        self.inner.set(async move { (rx.changed().await, rx) });

        Poll::Ready(item)
    }
}

impl From<Receiver<Status>> for StatusStream {
    fn from(value: Receiver<Status>) -> Self {
        StatusStream::new(value)
    }
}

/// Builds a [`StatusStream`] from a borrowed receiver by cloning it, leaving
/// the caller's receiver untouched.
impl From<&Receiver<Status>> for StatusStream {
    fn from(value: &Receiver<Status>) -> Self {
        let rx = value.clone();
        Self::new(rx)
    }
}

/// A stream of `(name, Status)` pairs backed by a `tokio::sync::watch`
/// receiver, where `name` is a fixed label attached to every yielded value.
///
/// See the `futures::Stream` implementation for the exact yielding rules.
pub struct NamedStatusStream {
    /// Label cloned into every item the stream yields.
    name: String,
    /// Reusable boxed future for the in-flight wait. It resolves to the wait's
    /// result paired with the receiver itself, so the receiver can be handed
    /// to the next wait once this one finishes.
    inner: ReusableBoxFuture<'static, (Result<(), RecvError>, Receiver<Status>)>,
}

impl NamedStatusStream {
    /// Wraps `rx` in a stream whose items are tagged with `name`.
    ///
    /// The initial future completes immediately with `Ok(())`, so the first
    /// poll of the stream yields whatever value the receiver currently holds
    /// instead of waiting for a change.
    pub fn new(name: String, rx: Receiver<Status>) -> Self {
        let inner = ReusableBoxFuture::new(async move { (Ok(()), rx) });
        Self { name, inner }
    }

    /// Drops the label and returns the underlying [`StatusStream`], carrying
    /// over the in-flight future unchanged.
    pub fn unnamed(self) -> StatusStream {
        let Self { inner, .. } = self;
        StatusStream { inner }
    }
}

/// Yields `(name, status)` every time the pending wait resolves successfully,
/// where `status` is a clone of the receiver's current value, and `None` when
/// the wait resolves with an error (tokio documents `RecvError` as the sender
/// having been dropped).
impl futures::Stream for NamedStatusStream {
    type Item = (String, Status);

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let (outcome, mut rx) = ready!(self.inner.poll(cx));

        // Read the value (marking it as seen) before the receiver is moved
        // into the next wait.
        let status = match outcome {
            Ok(()) => Some((*rx.borrow_and_update()).clone()),
            Err(_) => None,
        };

        // The future has completed, so it must be replaced on every path,
        // including the error one, before the stream may be polled again.
        self.inner.set(async move { (rx.changed().await, rx) });

        Poll::Ready(status.map(|status| (self.name.clone(), status)))
    }
}

impl From<(String, Receiver<Status>)> for NamedStatusStream {
    fn from((name, rx): (String, Receiver<Status>)) -> Self {
        NamedStatusStream::new(name, rx)
    }
}

/// Builds a [`NamedStatusStream`] from a pair of borrows by cloning both the
/// name and the receiver.
impl From<(&String, &Receiver<Status>)> for NamedStatusStream {
    fn from(value: (&String, &Receiver<Status>)) -> Self {
        let (name, rx) = value;
        Self::new(name.clone(), rx.clone())
    }
}

/// Builds a [`NamedStatusStream`] from a borrowed `(name, receiver)` pair by
/// cloning both elements.
impl From<&(String, Receiver<Status>)> for NamedStatusStream {
    fn from(value: &(String, Receiver<Status>)) -> Self {
        let (name, rx) = value;
        Self::new(name.clone(), rx.clone())
    }
}