1use crate::Stream;
2
3use core::future::Future;
4use core::marker::{PhantomData, PhantomPinned};
5use core::mem;
6use core::pin::Pin;
7use core::task::{ready, Context, Poll};
8use pin_project_lite::pin_project;
9
10pin_project! {
12 #[must_use = "futures do nothing unless you `.await` or poll them"]
14 #[derive(Debug)]
15 pub struct Collect<T, U, C>
16 {
17 #[pin]
18 stream: T,
19 collection: C,
20 _output: PhantomData<U>,
21 #[pin]
23 _pin: PhantomPinned,
24 }
25}
26
27pub trait FromStream<T>: sealed::FromStreamPriv<T> {}
38
39impl<T, U> Collect<T, U, U::InternalCollection>
40where
41 T: Stream,
42 U: FromStream<T::Item>,
43{
44 pub(super) fn new(stream: T) -> Collect<T, U, U::InternalCollection> {
45 let (lower, upper) = stream.size_hint();
46 let collection = U::initialize(sealed::Internal, lower, upper);
47
48 Collect {
49 stream,
50 collection,
51 _output: PhantomData,
52 _pin: PhantomPinned,
53 }
54 }
55}
56
57impl<T, U> Future for Collect<T, U, U::InternalCollection>
58where
59 T: Stream,
60 U: FromStream<T::Item>,
61{
62 type Output = U;
63
64 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<U> {
65 use Poll::Ready;
66
67 loop {
68 let me = self.as_mut().project();
69
70 let item = match ready!(me.stream.poll_next(cx)) {
71 Some(item) => item,
72 None => {
73 return Ready(U::finalize(sealed::Internal, me.collection));
74 }
75 };
76
77 if !U::extend(sealed::Internal, me.collection, item) {
78 return Ready(U::finalize(sealed::Internal, me.collection));
79 }
80 }
81 }
82}
83
84impl FromStream<()> for () {}
87
88impl sealed::FromStreamPriv<()> for () {
89 type InternalCollection = ();
90
91 fn initialize(_: sealed::Internal, _lower: usize, _upper: Option<usize>) {}
92
93 fn extend(_: sealed::Internal, _collection: &mut (), _item: ()) -> bool {
94 true
95 }
96
97 fn finalize(_: sealed::Internal, _collection: &mut ()) {}
98}
99
100impl<T: AsRef<str>> FromStream<T> for String {}
101
102impl<T: AsRef<str>> sealed::FromStreamPriv<T> for String {
103 type InternalCollection = String;
104
105 fn initialize(_: sealed::Internal, _lower: usize, _upper: Option<usize>) -> String {
106 String::new()
107 }
108
109 fn extend(_: sealed::Internal, collection: &mut String, item: T) -> bool {
110 collection.push_str(item.as_ref());
111 true
112 }
113
114 fn finalize(_: sealed::Internal, collection: &mut String) -> String {
115 mem::take(collection)
116 }
117}
118
119impl<T> FromStream<T> for Vec<T> {}
120
121impl<T> sealed::FromStreamPriv<T> for Vec<T> {
122 type InternalCollection = Vec<T>;
123
124 fn initialize(_: sealed::Internal, lower: usize, _upper: Option<usize>) -> Vec<T> {
125 Vec::with_capacity(lower)
126 }
127
128 fn extend(_: sealed::Internal, collection: &mut Vec<T>, item: T) -> bool {
129 collection.push(item);
130 true
131 }
132
133 fn finalize(_: sealed::Internal, collection: &mut Vec<T>) -> Vec<T> {
134 mem::take(collection)
135 }
136}
137
138impl<T> FromStream<T> for Box<[T]> {}
139
140impl<T> sealed::FromStreamPriv<T> for Box<[T]> {
141 type InternalCollection = Vec<T>;
142
143 fn initialize(_: sealed::Internal, lower: usize, upper: Option<usize>) -> Vec<T> {
144 <Vec<T> as sealed::FromStreamPriv<T>>::initialize(sealed::Internal, lower, upper)
145 }
146
147 fn extend(_: sealed::Internal, collection: &mut Vec<T>, item: T) -> bool {
148 <Vec<T> as sealed::FromStreamPriv<T>>::extend(sealed::Internal, collection, item)
149 }
150
151 fn finalize(_: sealed::Internal, collection: &mut Vec<T>) -> Box<[T]> {
152 <Vec<T> as sealed::FromStreamPriv<T>>::finalize(sealed::Internal, collection)
153 .into_boxed_slice()
154 }
155}
156
157impl<T, U, E> FromStream<Result<T, E>> for Result<U, E> where U: FromStream<T> {}
158
159impl<T, U, E> sealed::FromStreamPriv<Result<T, E>> for Result<U, E>
160where
161 U: FromStream<T>,
162{
163 type InternalCollection = Result<U::InternalCollection, E>;
164
165 fn initialize(
166 _: sealed::Internal,
167 lower: usize,
168 upper: Option<usize>,
169 ) -> Result<U::InternalCollection, E> {
170 Ok(U::initialize(sealed::Internal, lower, upper))
171 }
172
173 fn extend(
174 _: sealed::Internal,
175 collection: &mut Self::InternalCollection,
176 item: Result<T, E>,
177 ) -> bool {
178 assert!(collection.is_ok());
179 match item {
180 Ok(item) => {
181 let collection = collection.as_mut().ok().expect("invalid state");
182 U::extend(sealed::Internal, collection, item)
183 }
184 Err(err) => {
185 *collection = Err(err);
186 false
187 }
188 }
189 }
190
191 fn finalize(_: sealed::Internal, collection: &mut Self::InternalCollection) -> Result<U, E> {
192 if let Ok(collection) = collection.as_mut() {
193 Ok(U::finalize(sealed::Internal, collection))
194 } else {
195 let res = mem::replace(collection, Ok(U::initialize(sealed::Internal, 0, Some(0))));
196
197 Err(res.map(drop).unwrap_err())
198 }
199 }
200}
201
pub(crate) mod sealed {
    /// Three-step protocol used to build a `Self` out of items of type `T`:
    /// `initialize` once, `extend` once per item, then `finalize`.
    #[doc(hidden)]
    pub trait FromStreamPriv<T> {
        /// Intermediate accumulator that holds the items while collection is
        /// in progress.
        type InternalCollection;

        /// Creates the accumulator. `lower` and `upper` are the size-hint
        /// bounds of the source being collected.
        fn initialize(
            _: Internal,
            lower: usize,
            upper: Option<usize>,
        ) -> Self::InternalCollection;

        /// Adds one item to the accumulator.
        ///
        /// Returns `true` to keep receiving items and `false` to request that
        /// collection stop immediately.
        fn extend(_: Internal, collection: &mut Self::InternalCollection, item: T) -> bool;

        /// Converts the accumulator into the final value. Only a mutable
        /// reference is supplied, so implementations move the contents out
        /// rather than consuming the accumulator.
        fn finalize(_: Internal, collection: &mut Self::InternalCollection) -> Self;
    }

    /// Token taken by every `FromStreamPriv` method; a caller has to be able
    /// to name this type in order to invoke them.
    // Pure marker with no state, so a `Debug` impl would carry no information.
    #[allow(missing_debug_implementations)]
    pub struct Internal;
}