tokio_stream/stream_ext/
chunks_timeout.rs

1use crate::stream_ext::Fuse;
2use crate::Stream;
3use tokio::time::{sleep, Sleep};
4
5use core::future::Future;
6use core::pin::Pin;
7use core::task::{ready, Context, Poll};
8use pin_project_lite::pin_project;
9use std::time::Duration;
10
11pin_project! {
12    /// Stream returned by the [`chunks_timeout`](super::StreamExt::chunks_timeout) method.
13    #[must_use = "streams do nothing unless polled"]
14    #[derive(Debug)]
15    pub struct ChunksTimeout<S: Stream> {
16        #[pin]
17        stream: Fuse<S>,
18        #[pin]
19        deadline: Option<Sleep>,
20        duration: Duration,
21        items: Vec<S::Item>,
22        cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475
23    }
24}
25
26impl<S: Stream> ChunksTimeout<S> {
27    pub(super) fn new(stream: S, max_size: usize, duration: Duration) -> Self {
28        ChunksTimeout {
29            stream: Fuse::new(stream),
30            deadline: None,
31            duration,
32            items: Vec::with_capacity(max_size),
33            cap: max_size,
34        }
35    }
36
37    /// Consumes the [`ChunksTimeout`] and then returns all buffered items.
38    pub fn into_remainder(mut self: Pin<&mut Self>) -> Vec<S::Item> {
39        let me = self.as_mut().project();
40        std::mem::take(me.items)
41    }
42}
43
44impl<S: Stream> Stream for ChunksTimeout<S> {
45    type Item = Vec<S::Item>;
46
47    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
48        let mut me = self.as_mut().project();
49        loop {
50            match me.stream.as_mut().poll_next(cx) {
51                Poll::Pending => break,
52                Poll::Ready(Some(item)) => {
53                    if me.items.is_empty() {
54                        me.deadline.set(Some(sleep(*me.duration)));
55                        me.items.reserve_exact(*me.cap);
56                    }
57                    me.items.push(item);
58                    if me.items.len() >= *me.cap {
59                        return Poll::Ready(Some(std::mem::take(me.items)));
60                    }
61                }
62                Poll::Ready(None) => {
63                    // Returning Some here is only correct because we fuse the inner stream.
64                    let last = if me.items.is_empty() {
65                        None
66                    } else {
67                        Some(std::mem::take(me.items))
68                    };
69
70                    return Poll::Ready(last);
71                }
72            }
73        }
74
75        if !me.items.is_empty() {
76            if let Some(deadline) = me.deadline.as_pin_mut() {
77                ready!(deadline.poll(cx));
78            }
79            return Poll::Ready(Some(std::mem::take(me.items)));
80        }
81
82        Poll::Pending
83    }
84
85    fn size_hint(&self) -> (usize, Option<usize>) {
86        let chunk_len = if self.items.is_empty() { 0 } else { 1 };
87        let (lower, upper) = self.stream.size_hint();
88        let lower = (lower / self.cap).saturating_add(chunk_len);
89        let upper = upper.and_then(|x| x.checked_add(chunk_len));
90        (lower, upper)
91    }
92}