tokio_stream/wrappers/mpsc_unbounded.rs
1use crate::Stream;
2use std::pin::Pin;
3use std::task::{Context, Poll};
4use tokio::sync::mpsc::UnboundedReceiver;
5
6/// A wrapper around [`tokio::sync::mpsc::UnboundedReceiver`] that implements [`Stream`].
7///
8/// # Example
9///
10/// ```
11/// use tokio::sync::mpsc;
12/// use tokio_stream::wrappers::UnboundedReceiverStream;
13/// use tokio_stream::StreamExt;
14///
15/// # #[tokio::main(flavor = "current_thread")]
16/// # async fn main() -> Result<(), tokio::sync::mpsc::error::SendError<u8>> {
17/// let (tx, rx) = mpsc::unbounded_channel();
18/// tx.send(10)?;
19/// tx.send(20)?;
20/// # // prevent the doc test from hanging
21/// drop(tx);
22///
23/// let mut stream = UnboundedReceiverStream::new(rx);
24/// assert_eq!(stream.next().await, Some(10));
25/// assert_eq!(stream.next().await, Some(20));
26/// assert_eq!(stream.next().await, None);
27/// # Ok(())
28/// # }
29/// ```
30///
31/// [`tokio::sync::mpsc::UnboundedReceiver`]: struct@tokio::sync::mpsc::UnboundedReceiver
32/// [`Stream`]: trait@crate::Stream
33#[derive(Debug)]
34pub struct UnboundedReceiverStream<T> {
35 inner: UnboundedReceiver<T>,
36}
37
38impl<T> UnboundedReceiverStream<T> {
39 /// Create a new `UnboundedReceiverStream`.
40 pub fn new(recv: UnboundedReceiver<T>) -> Self {
41 Self { inner: recv }
42 }
43
44 /// Get back the inner `UnboundedReceiver`.
45 pub fn into_inner(self) -> UnboundedReceiver<T> {
46 self.inner
47 }
48
49 /// Closes the receiving half of a channel without dropping it.
50 ///
51 /// This prevents any further messages from being sent on the channel while
52 /// still enabling the receiver to drain messages that are buffered.
53 pub fn close(&mut self) {
54 self.inner.close();
55 }
56}
57
58impl<T> Stream for UnboundedReceiverStream<T> {
59 type Item = T;
60
61 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
62 self.inner.poll_recv(cx)
63 }
64
65 /// Returns the bounds of the stream based on the underlying receiver.
66 ///
67 /// For open channels, it returns `(receiver.len(), None)`.
68 ///
69 /// For closed channels, it returns `(receiver.len(), receiver.len())`.
70 fn size_hint(&self) -> (usize, Option<usize>) {
71 if self.inner.is_closed() {
72 let len = self.inner.len();
73 (len, Some(len))
74 } else {
75 (self.inner.len(), None)
76 }
77 }
78}
79
80impl<T> AsRef<UnboundedReceiver<T>> for UnboundedReceiverStream<T> {
81 fn as_ref(&self) -> &UnboundedReceiver<T> {
82 &self.inner
83 }
84}
85
86impl<T> AsMut<UnboundedReceiver<T>> for UnboundedReceiverStream<T> {
87 fn as_mut(&mut self) -> &mut UnboundedReceiver<T> {
88 &mut self.inner
89 }
90}
91
92impl<T> From<UnboundedReceiver<T>> for UnboundedReceiverStream<T> {
93 fn from(recv: UnboundedReceiver<T>) -> Self {
94 Self::new(recv)
95 }
96}