reqwest/async_impl/
body.rs

1use std::fmt;
2use std::future::Future;
3use std::pin::Pin;
4use std::task::{ready, Context, Poll};
5use std::time::Duration;
6
7use bytes::Bytes;
8use http_body::Body as HttpBody;
9use http_body_util::combinators::BoxBody;
10//use sync_wrapper::SyncWrapper;
11use pin_project_lite::pin_project;
12#[cfg(feature = "stream")]
13use tokio::fs::File;
14use tokio::time::Sleep;
15#[cfg(feature = "stream")]
16use tokio_util::io::ReaderStream;
17
/// An asynchronous request body.
///
/// A `Body` either owns its complete contents as an in-memory buffer or wraps
/// a boxed streaming body.
pub struct Body {
    // The actual storage; see `Inner` for the two possible representations.
    inner: Inner,
}
22
/// The two storage strategies behind a `Body`.
enum Inner {
    /// The complete contents, held in memory. `Body::try_clone` and
    /// `Body::try_reuse` can hand out copies of the bytes for this variant.
    Reusable(Bytes),
    /// A boxed, type-erased body. `Body::as_bytes` and `Body::try_clone`
    /// return `None` for this variant, and `Body::try_reuse` yields no bytes.
    Streaming(BoxBody<Bytes, Box<dyn std::error::Error + Send + Sync>>),
}
27
pin_project! {
    /// A body with a total timeout.
    ///
    /// The timeout does not reset upon each chunk, but rather requires the whole
    /// body be streamed before the deadline is reached.
    pub(crate) struct TotalTimeoutBody<B> {
        // The wrapped body; marked `#[pin]` so it can be polled through the
        // pin projection.
        #[pin]
        inner: B,
        // The deadline timer. It is polled before `inner` on every
        // `poll_frame`; once it completes, a `TimedOut` body error is returned.
        timeout: Pin<Box<Sleep>>,
    }
}
39
pin_project! {
    /// A body with a per-read timeout.
    ///
    /// The timer is started when a frame is polled for and cleared again as
    /// soon as the inner body produces a result, so the limit applies to each
    /// wait for the next frame rather than to the body as a whole.
    pub(crate) struct ReadTimeoutBody<B> {
        // The wrapped body; marked `#[pin]` so it can be polled through the
        // pin projection.
        #[pin]
        inner: B,
        // The timer for the wait currently in progress; `None` while no wait
        // is being timed.
        #[pin]
        sleep: Option<Sleep>,
        // The duration used each time a new `sleep` is started.
        timeout: Duration,
    }
}
49
/// Converts any `impl Body` into an `impl Stream` of just its DATA frames.
///
/// Frames that do not carry data are skipped by the `Stream` implementation.
#[cfg(any(feature = "stream", feature = "multipart",))]
pub(crate) struct DataStream<B>(pub(crate) B);
53
54impl Body {
55    /// Returns a reference to the internal data of the `Body`.
56    ///
57    /// `None` is returned, if the underlying data is a stream.
58    pub fn as_bytes(&self) -> Option<&[u8]> {
59        match &self.inner {
60            Inner::Reusable(bytes) => Some(bytes.as_ref()),
61            Inner::Streaming(..) => None,
62        }
63    }
64
65    /// Wrap a futures `Stream` in a box inside `Body`.
66    ///
67    /// # Example
68    ///
69    /// ```
70    /// # use reqwest::Body;
71    /// # use futures_util;
72    /// # fn main() {
73    /// let chunks: Vec<Result<_, ::std::io::Error>> = vec![
74    ///     Ok("hello"),
75    ///     Ok(" "),
76    ///     Ok("world"),
77    /// ];
78    ///
79    /// let stream = futures_util::stream::iter(chunks);
80    ///
81    /// let body = Body::wrap_stream(stream);
82    /// # }
83    /// ```
84    ///
85    /// # Optional
86    ///
87    /// This requires the `stream` feature to be enabled.
88    #[cfg(feature = "stream")]
89    #[cfg_attr(docsrs, doc(cfg(feature = "stream")))]
90    pub fn wrap_stream<S>(stream: S) -> Body
91    where
92        S: futures_core::stream::TryStream + Send + 'static,
93        S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
94        Bytes: From<S::Ok>,
95    {
96        Body::stream(stream)
97    }
98
99    #[cfg(any(feature = "stream", feature = "multipart", feature = "blocking"))]
100    pub(crate) fn stream<S>(stream: S) -> Body
101    where
102        S: futures_core::stream::TryStream + Send + 'static,
103        S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
104        Bytes: From<S::Ok>,
105    {
106        use futures_util::TryStreamExt;
107        use http_body::Frame;
108        use http_body_util::StreamBody;
109
110        let body = http_body_util::BodyExt::boxed(StreamBody::new(sync_wrapper::SyncStream::new(
111            stream
112                .map_ok(|d| Frame::data(Bytes::from(d)))
113                .map_err(Into::into),
114        )));
115        Body {
116            inner: Inner::Streaming(body),
117        }
118    }
119
120    pub(crate) fn empty() -> Body {
121        Body::reusable(Bytes::new())
122    }
123
124    pub(crate) fn reusable(chunk: Bytes) -> Body {
125        Body {
126            inner: Inner::Reusable(chunk),
127        }
128    }
129
130    /// Wrap a [`HttpBody`] in a box inside `Body`.
131    ///
132    /// # Example
133    ///
134    /// ```
135    /// # use reqwest::Body;
136    /// # use futures_util;
137    /// # fn main() {
138    /// let content = "hello,world!".to_string();
139    ///
140    /// let body = Body::wrap(content);
141    /// # }
142    /// ```
143    pub fn wrap<B>(inner: B) -> Body
144    where
145        B: HttpBody + Send + Sync + 'static,
146        B::Data: Into<Bytes>,
147        B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
148    {
149        use http_body_util::BodyExt;
150
151        let boxed = IntoBytesBody { inner }.map_err(Into::into).boxed();
152
153        Body {
154            inner: Inner::Streaming(boxed),
155        }
156    }
157
158    pub(crate) fn try_reuse(self) -> (Option<Bytes>, Self) {
159        let reuse = match self.inner {
160            Inner::Reusable(ref chunk) => Some(chunk.clone()),
161            Inner::Streaming { .. } => None,
162        };
163
164        (reuse, self)
165    }
166
167    pub(crate) fn try_clone(&self) -> Option<Body> {
168        match self.inner {
169            Inner::Reusable(ref chunk) => Some(Body::reusable(chunk.clone())),
170            Inner::Streaming { .. } => None,
171        }
172    }
173
    /// Consumes the body and wraps it in a [`DataStream`].
    #[cfg(feature = "multipart")]
    pub(crate) fn into_stream(self) -> DataStream<Body> {
        DataStream(self)
    }
178
179    #[cfg(feature = "multipart")]
180    pub(crate) fn content_length(&self) -> Option<u64> {
181        match self.inner {
182            Inner::Reusable(ref bytes) => Some(bytes.len() as u64),
183            Inner::Streaming(ref body) => body.size_hint().exact(),
184        }
185    }
186}
187
188impl Default for Body {
189    #[inline]
190    fn default() -> Body {
191        Body::empty()
192    }
193}
194
195/*
196impl From<hyper::Body> for Body {
197    #[inline]
198    fn from(body: hyper::Body) -> Body {
199        Self {
200            inner: Inner::Streaming {
201                body: Box::pin(WrapHyper(body)),
202            },
203        }
204    }
205}
206*/
207
208impl From<Bytes> for Body {
209    #[inline]
210    fn from(bytes: Bytes) -> Body {
211        Body::reusable(bytes)
212    }
213}
214
215impl From<Vec<u8>> for Body {
216    #[inline]
217    fn from(vec: Vec<u8>) -> Body {
218        Body::reusable(vec.into())
219    }
220}
221
222impl From<&'static [u8]> for Body {
223    #[inline]
224    fn from(s: &'static [u8]) -> Body {
225        Body::reusable(Bytes::from_static(s))
226    }
227}
228
229impl From<String> for Body {
230    #[inline]
231    fn from(s: String) -> Body {
232        Body::reusable(s.into())
233    }
234}
235
236impl From<&'static str> for Body {
237    #[inline]
238    fn from(s: &'static str) -> Body {
239        s.as_bytes().into()
240    }
241}
242
#[cfg(feature = "stream")]
#[cfg_attr(docsrs, doc(cfg(feature = "stream")))]
impl From<File> for Body {
    /// Builds a streaming body that reads its contents from `file`.
    #[inline]
    fn from(file: File) -> Body {
        let chunks = ReaderStream::new(file);
        Body::wrap_stream(chunks)
    }
}
251
252impl fmt::Debug for Body {
253    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
254        f.debug_struct("Body").finish()
255    }
256}
257
258impl HttpBody for Body {
259    type Data = Bytes;
260    type Error = crate::Error;
261
262    fn poll_frame(
263        mut self: Pin<&mut Self>,
264        cx: &mut Context,
265    ) -> Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
266        match self.inner {
267            Inner::Reusable(ref mut bytes) => {
268                let out = bytes.split_off(0);
269                if out.is_empty() {
270                    Poll::Ready(None)
271                } else {
272                    Poll::Ready(Some(Ok(hyper::body::Frame::data(out))))
273                }
274            }
275            Inner::Streaming(ref mut body) => Poll::Ready(
276                ready!(Pin::new(body).poll_frame(cx))
277                    .map(|opt_chunk| opt_chunk.map_err(crate::error::body)),
278            ),
279        }
280    }
281
282    fn size_hint(&self) -> http_body::SizeHint {
283        match self.inner {
284            Inner::Reusable(ref bytes) => http_body::SizeHint::with_exact(bytes.len() as u64),
285            Inner::Streaming(ref body) => body.size_hint(),
286        }
287    }
288
289    fn is_end_stream(&self) -> bool {
290        match self.inner {
291            Inner::Reusable(ref bytes) => bytes.is_empty(),
292            Inner::Streaming(ref body) => body.is_end_stream(),
293        }
294    }
295}
296
297// ===== impl TotalTimeoutBody =====
298
/// Wraps `body` in a [`TotalTimeoutBody`] that uses `timeout` as its deadline
/// timer.
pub(crate) fn total_timeout<B>(body: B, timeout: Pin<Box<Sleep>>) -> TotalTimeoutBody<B> {
    TotalTimeoutBody {
        inner: body,
        timeout,
    }
}
305
/// Wraps `body` in a [`ReadTimeoutBody`] that allows `timeout` for each wait
/// for a frame.
pub(crate) fn with_read_timeout<B>(body: B, timeout: Duration) -> ReadTimeoutBody<B> {
    ReadTimeoutBody {
        inner: body,
        // No timer is running until the body is first polled.
        sleep: None,
        timeout,
    }
}
313
314impl<B> hyper::body::Body for TotalTimeoutBody<B>
315where
316    B: hyper::body::Body,
317    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
318{
319    type Data = B::Data;
320    type Error = crate::Error;
321
322    fn poll_frame(
323        self: Pin<&mut Self>,
324        cx: &mut Context,
325    ) -> Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
326        let this = self.project();
327        if let Poll::Ready(()) = this.timeout.as_mut().poll(cx) {
328            return Poll::Ready(Some(Err(crate::error::body(crate::error::TimedOut))));
329        }
330        Poll::Ready(
331            ready!(this.inner.poll_frame(cx))
332                .map(|opt_chunk| opt_chunk.map_err(crate::error::body)),
333        )
334    }
335
336    #[inline]
337    fn size_hint(&self) -> http_body::SizeHint {
338        self.inner.size_hint()
339    }
340
341    #[inline]
342    fn is_end_stream(&self) -> bool {
343        self.inner.is_end_stream()
344    }
345}
346
347impl<B> hyper::body::Body for ReadTimeoutBody<B>
348where
349    B: hyper::body::Body,
350    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
351{
352    type Data = B::Data;
353    type Error = crate::Error;
354
355    fn poll_frame(
356        self: Pin<&mut Self>,
357        cx: &mut Context,
358    ) -> Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
359        let mut this = self.project();
360
361        // Start the `Sleep` if not active.
362        let sleep_pinned = if let Some(some) = this.sleep.as_mut().as_pin_mut() {
363            some
364        } else {
365            this.sleep.set(Some(tokio::time::sleep(*this.timeout)));
366            this.sleep.as_mut().as_pin_mut().unwrap()
367        };
368
369        // Error if the timeout has expired.
370        if let Poll::Ready(()) = sleep_pinned.poll(cx) {
371            return Poll::Ready(Some(Err(crate::error::body(crate::error::TimedOut))));
372        }
373
374        let item = ready!(this.inner.poll_frame(cx))
375            .map(|opt_chunk| opt_chunk.map_err(crate::error::body));
376        // a ready frame means timeout is reset
377        this.sleep.set(None);
378        Poll::Ready(item)
379    }
380
381    #[inline]
382    fn size_hint(&self) -> http_body::SizeHint {
383        self.inner.size_hint()
384    }
385
386    #[inline]
387    fn is_end_stream(&self) -> bool {
388        self.inner.is_end_stream()
389    }
390}
391
/// The boxed body type returned by [`boxed`] and [`response`]: `Bytes` data
/// frames with a boxed `std::error::Error` as the error type.
pub(crate) type ResponseBody =
    http_body_util::combinators::BoxBody<Bytes, Box<dyn std::error::Error + Send + Sync>>;
394
395pub(crate) fn boxed<B>(body: B) -> ResponseBody
396where
397    B: hyper::body::Body<Data = Bytes> + Send + Sync + 'static,
398    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
399{
400    use http_body_util::BodyExt;
401
402    body.map_err(box_err).boxed()
403}
404
405pub(crate) fn response<B>(
406    body: B,
407    deadline: Option<Pin<Box<Sleep>>>,
408    read_timeout: Option<Duration>,
409) -> ResponseBody
410where
411    B: hyper::body::Body<Data = Bytes> + Send + Sync + 'static,
412    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
413{
414    use http_body_util::BodyExt;
415
416    match (deadline, read_timeout) {
417        (Some(total), Some(read)) => {
418            let body = with_read_timeout(body, read).map_err(box_err);
419            total_timeout(body, total).map_err(box_err).boxed()
420        }
421        (Some(total), None) => total_timeout(body, total).map_err(box_err).boxed(),
422        (None, Some(read)) => with_read_timeout(body, read).map_err(box_err).boxed(),
423        (None, None) => body.map_err(box_err).boxed(),
424    }
425}
426
/// Converts any compatible error into a boxed `std::error::Error`.
///
/// Exists as a named function so it can be passed directly to `map_err`.
fn box_err<E>(err: E) -> Box<dyn std::error::Error + Send + Sync>
where
    E: Into<Box<dyn std::error::Error + Send + Sync>>,
{
    Into::into(err)
}
433
434// ===== impl DataStream =====
435
#[cfg(any(feature = "stream", feature = "multipart",))]
impl<B> futures_core::Stream for DataStream<B>
where
    B: HttpBody<Data = Bytes> + Unpin,
{
    type Item = Result<Bytes, B::Error>;

    /// Yields the payload of the next data frame of the wrapped body.
    ///
    /// Frames that are not data frames are dropped and the body is polled
    /// again; errors and the end of the body are passed through.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        loop {
            match ready!(Pin::new(&mut self.0).poll_frame(cx)) {
                Some(Ok(frame)) => {
                    if let Ok(data) = frame.into_data() {
                        return Poll::Ready(Some(Ok(data)));
                    }
                    // Not a data frame: fall through and poll for the next one.
                }
                Some(Err(err)) => return Poll::Ready(Some(Err(err))),
                None => return Poll::Ready(None),
            }
        }
    }
}
460
461// ===== impl IntoBytesBody =====
462
pin_project! {
    /// Adapter that converts the data frames of the wrapped body into `Bytes`.
    struct IntoBytesBody<B> {
        // The wrapped body; marked `#[pin]` so it can be polled through the
        // pin projection.
        #[pin]
        inner: B,
    }
}
469
470// We can't use `map_frame()` because that loses the hint data (for good reason).
471// But we aren't transforming the data.
472impl<B> hyper::body::Body for IntoBytesBody<B>
473where
474    B: hyper::body::Body,
475    B::Data: Into<Bytes>,
476{
477    type Data = Bytes;
478    type Error = B::Error;
479
480    fn poll_frame(
481        self: Pin<&mut Self>,
482        cx: &mut Context,
483    ) -> Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
484        match ready!(self.project().inner.poll_frame(cx)) {
485            Some(Ok(f)) => Poll::Ready(Some(Ok(f.map_data(Into::into)))),
486            Some(Err(e)) => Poll::Ready(Some(Err(e))),
487            None => Poll::Ready(None),
488        }
489    }
490
491    #[inline]
492    fn size_hint(&self) -> http_body::SizeHint {
493        self.inner.size_hint()
494    }
495
496    #[inline]
497    fn is_end_stream(&self) -> bool {
498        self.inner.is_end_stream()
499    }
500}
501
#[cfg(test)]
mod tests {
    use http_body::Body as _;

    use super::Body;

    /// A body built from a static slice exposes exactly those bytes.
    #[test]
    fn test_as_bytes() {
        let expected: &'static [u8] = b"Test body";
        let body = Body::from(expected);
        assert_eq!(body.as_bytes(), Some(expected));
    }

    /// Buffered bodies report an exact length, and a wrapped body forwards
    /// the hints of the body inside it.
    #[test]
    fn body_exact_length() {
        // An empty buffer: already finished, exactly zero bytes.
        let empty = Body::empty();
        assert!(empty.is_end_stream());
        assert_eq!(empty.size_hint().exact(), Some(0));

        // A non-empty buffer: not finished, exactly as long as its contents.
        let buffered = Body::reusable("abc".into());
        assert!(!buffered.is_end_stream());
        assert_eq!(buffered.size_hint().exact(), Some(3));

        // can delegate even when wrapped
        let wrapped = Body::wrap(empty);
        assert!(wrapped.is_end_stream());
        assert_eq!(wrapped.size_hint().exact(), Some(0));
    }
}