tokio_stream/stream_ext/
collect.rs

1use crate::Stream;
2
3use core::future::Future;
4use core::marker::{PhantomData, PhantomPinned};
5use core::mem;
6use core::pin::Pin;
7use core::task::{ready, Context, Poll};
8use pin_project_lite::pin_project;
9
10// Do not export this struct until `FromStream` can be unsealed.
11pin_project! {
12    /// Future returned by the [`collect`](super::StreamExt::collect) method.
13    #[must_use = "futures do nothing unless you `.await` or poll them"]
14    #[derive(Debug)]
15    pub struct Collect<T, U, C>
16    {
17        #[pin]
18        stream: T,
19        collection: C,
20        _output: PhantomData<U>,
21        // Make this future `!Unpin` for compatibility with async trait methods.
22        #[pin]
23        _pin: PhantomPinned,
24    }
25}
26
27/// Convert from a [`Stream`].
28///
29/// This trait is not intended to be used directly. Instead, call
30/// [`StreamExt::collect()`](super::StreamExt::collect).
31///
32/// # Implementing
33///
34/// Currently, this trait may not be implemented by third parties. The trait is
35/// sealed in order to make changes in the future. Stabilization is pending
36/// enhancements to the Rust language.
37pub trait FromStream<T>: sealed::FromStreamPriv<T> {}
38
39impl<T, U> Collect<T, U, U::InternalCollection>
40where
41    T: Stream,
42    U: FromStream<T::Item>,
43{
44    pub(super) fn new(stream: T) -> Collect<T, U, U::InternalCollection> {
45        let (lower, upper) = stream.size_hint();
46        let collection = U::initialize(sealed::Internal, lower, upper);
47
48        Collect {
49            stream,
50            collection,
51            _output: PhantomData,
52            _pin: PhantomPinned,
53        }
54    }
55}
56
57impl<T, U> Future for Collect<T, U, U::InternalCollection>
58where
59    T: Stream,
60    U: FromStream<T::Item>,
61{
62    type Output = U;
63
64    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<U> {
65        use Poll::Ready;
66
67        loop {
68            let me = self.as_mut().project();
69
70            let item = match ready!(me.stream.poll_next(cx)) {
71                Some(item) => item,
72                None => {
73                    return Ready(U::finalize(sealed::Internal, me.collection));
74                }
75            };
76
77            if !U::extend(sealed::Internal, me.collection, item) {
78                return Ready(U::finalize(sealed::Internal, me.collection));
79            }
80        }
81    }
82}
83
84// ===== FromStream implementations
85
86impl FromStream<()> for () {}
87
88impl sealed::FromStreamPriv<()> for () {
89    type InternalCollection = ();
90
91    fn initialize(_: sealed::Internal, _lower: usize, _upper: Option<usize>) {}
92
93    fn extend(_: sealed::Internal, _collection: &mut (), _item: ()) -> bool {
94        true
95    }
96
97    fn finalize(_: sealed::Internal, _collection: &mut ()) {}
98}
99
100impl<T: AsRef<str>> FromStream<T> for String {}
101
102impl<T: AsRef<str>> sealed::FromStreamPriv<T> for String {
103    type InternalCollection = String;
104
105    fn initialize(_: sealed::Internal, _lower: usize, _upper: Option<usize>) -> String {
106        String::new()
107    }
108
109    fn extend(_: sealed::Internal, collection: &mut String, item: T) -> bool {
110        collection.push_str(item.as_ref());
111        true
112    }
113
114    fn finalize(_: sealed::Internal, collection: &mut String) -> String {
115        mem::take(collection)
116    }
117}
118
119impl<T> FromStream<T> for Vec<T> {}
120
121impl<T> sealed::FromStreamPriv<T> for Vec<T> {
122    type InternalCollection = Vec<T>;
123
124    fn initialize(_: sealed::Internal, lower: usize, _upper: Option<usize>) -> Vec<T> {
125        Vec::with_capacity(lower)
126    }
127
128    fn extend(_: sealed::Internal, collection: &mut Vec<T>, item: T) -> bool {
129        collection.push(item);
130        true
131    }
132
133    fn finalize(_: sealed::Internal, collection: &mut Vec<T>) -> Vec<T> {
134        mem::take(collection)
135    }
136}
137
138impl<T> FromStream<T> for Box<[T]> {}
139
140impl<T> sealed::FromStreamPriv<T> for Box<[T]> {
141    type InternalCollection = Vec<T>;
142
143    fn initialize(_: sealed::Internal, lower: usize, upper: Option<usize>) -> Vec<T> {
144        <Vec<T> as sealed::FromStreamPriv<T>>::initialize(sealed::Internal, lower, upper)
145    }
146
147    fn extend(_: sealed::Internal, collection: &mut Vec<T>, item: T) -> bool {
148        <Vec<T> as sealed::FromStreamPriv<T>>::extend(sealed::Internal, collection, item)
149    }
150
151    fn finalize(_: sealed::Internal, collection: &mut Vec<T>) -> Box<[T]> {
152        <Vec<T> as sealed::FromStreamPriv<T>>::finalize(sealed::Internal, collection)
153            .into_boxed_slice()
154    }
155}
156
157impl<T, U, E> FromStream<Result<T, E>> for Result<U, E> where U: FromStream<T> {}
158
159impl<T, U, E> sealed::FromStreamPriv<Result<T, E>> for Result<U, E>
160where
161    U: FromStream<T>,
162{
163    type InternalCollection = Result<U::InternalCollection, E>;
164
165    fn initialize(
166        _: sealed::Internal,
167        lower: usize,
168        upper: Option<usize>,
169    ) -> Result<U::InternalCollection, E> {
170        Ok(U::initialize(sealed::Internal, lower, upper))
171    }
172
173    fn extend(
174        _: sealed::Internal,
175        collection: &mut Self::InternalCollection,
176        item: Result<T, E>,
177    ) -> bool {
178        assert!(collection.is_ok());
179        match item {
180            Ok(item) => {
181                let collection = collection.as_mut().ok().expect("invalid state");
182                U::extend(sealed::Internal, collection, item)
183            }
184            Err(err) => {
185                *collection = Err(err);
186                false
187            }
188        }
189    }
190
191    fn finalize(_: sealed::Internal, collection: &mut Self::InternalCollection) -> Result<U, E> {
192        if let Ok(collection) = collection.as_mut() {
193            Ok(U::finalize(sealed::Internal, collection))
194        } else {
195            let res = mem::replace(collection, Ok(U::initialize(sealed::Internal, 0, Some(0))));
196
197            Err(res.map(drop).unwrap_err())
198        }
199    }
200}
201
pub(crate) mod sealed {
    /// Crate-private half of `FromStream`: the three steps used to build a
    /// value of the implementing type from items delivered one at a time.
    #[doc(hidden)]
    pub trait FromStreamPriv<T> {
        /// Intermediate state held while items are still being collected.
        ///
        /// The name of this type is internal and cannot be relied upon.
        type InternalCollection;

        /// Creates the intermediate collection.
        ///
        /// `lower` and `upper` are passed through for the implementation to
        /// use as a sizing hint.
        fn initialize(
            internal: Internal,
            lower: usize,
            upper: Option<usize>,
        ) -> Self::InternalCollection;

        /// Adds one received item to the collection.
        ///
        /// Returns `true` to keep streaming, or `false` to complete the
        /// collection.
        fn extend(internal: Internal, collection: &mut Self::InternalCollection, item: T) -> bool;

        /// Converts the intermediate collection into the target type.
        fn finalize(internal: Internal, collection: &mut Self::InternalCollection) -> Self;
    }

    // Token taken by every `FromStreamPriv` method. It lives in a
    // `pub(crate)` module, so code outside this crate cannot name it.
    // It carries no data, so a `Debug` impl would add nothing.
    #[allow(missing_debug_implementations)]
    pub struct Internal;
}