// tokio_stream/stream_ext/chunks_timeout.rs
use crate::stream_ext::Fuse;
2use crate::Stream;
3use tokio::time::{sleep, Sleep};
4
5use core::future::Future;
6use core::pin::Pin;
7use core::task::{ready, Context, Poll};
8use pin_project_lite::pin_project;
9use std::time::Duration;
10
11pin_project! {
12 #[must_use = "streams do nothing unless polled"]
14 #[derive(Debug)]
15 pub struct ChunksTimeout<S: Stream> {
16 #[pin]
17 stream: Fuse<S>,
18 #[pin]
19 deadline: Option<Sleep>,
20 duration: Duration,
21 items: Vec<S::Item>,
22 cap: usize, }
24}
25
26impl<S: Stream> ChunksTimeout<S> {
27 pub(super) fn new(stream: S, max_size: usize, duration: Duration) -> Self {
28 ChunksTimeout {
29 stream: Fuse::new(stream),
30 deadline: None,
31 duration,
32 items: Vec::with_capacity(max_size),
33 cap: max_size,
34 }
35 }
36
37 pub fn into_remainder(mut self: Pin<&mut Self>) -> Vec<S::Item> {
39 let me = self.as_mut().project();
40 std::mem::take(me.items)
41 }
42}
43
44impl<S: Stream> Stream for ChunksTimeout<S> {
45 type Item = Vec<S::Item>;
46
47 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
48 let mut me = self.as_mut().project();
49 loop {
50 match me.stream.as_mut().poll_next(cx) {
51 Poll::Pending => break,
52 Poll::Ready(Some(item)) => {
53 if me.items.is_empty() {
54 me.deadline.set(Some(sleep(*me.duration)));
55 me.items.reserve_exact(*me.cap);
56 }
57 me.items.push(item);
58 if me.items.len() >= *me.cap {
59 return Poll::Ready(Some(std::mem::take(me.items)));
60 }
61 }
62 Poll::Ready(None) => {
63 let last = if me.items.is_empty() {
65 None
66 } else {
67 Some(std::mem::take(me.items))
68 };
69
70 return Poll::Ready(last);
71 }
72 }
73 }
74
75 if !me.items.is_empty() {
76 if let Some(deadline) = me.deadline.as_pin_mut() {
77 ready!(deadline.poll(cx));
78 }
79 return Poll::Ready(Some(std::mem::take(me.items)));
80 }
81
82 Poll::Pending
83 }
84
85 fn size_hint(&self) -> (usize, Option<usize>) {
86 let chunk_len = if self.items.is_empty() { 0 } else { 1 };
87 let (lower, upper) = self.stream.size_hint();
88 let lower = (lower / self.cap).saturating_add(chunk_len);
89 let upper = upper.and_then(|x| x.checked_add(chunk_len));
90 (lower, upper)
91 }
92}