reqwest/async_impl/
body.rs

1use std::fmt;
2use std::future::Future;
3use std::pin::Pin;
4use std::task::{ready, Context, Poll};
5use std::time::Duration;
6
7use bytes::Bytes;
8use http_body::Body as HttpBody;
9use http_body_util::combinators::BoxBody;
10use pin_project_lite::pin_project;
11#[cfg(feature = "stream")]
12use tokio::fs::File;
13use tokio::time::Sleep;
14#[cfg(feature = "stream")]
15use tokio_util::io::ReaderStream;
16
17/// An asynchronous request body.
18pub struct Body {
19    inner: Inner,
20}
21
22enum Inner {
23    Reusable(Bytes),
24    Streaming(BoxBody<Bytes, Box<dyn std::error::Error + Send + Sync>>),
25}
26
27pin_project! {
28    /// A body with a total timeout.
29    ///
30    /// The timeout does not reset upon each chunk, but rather requires the whole
31    /// body be streamed before the deadline is reached.
32    pub(crate) struct TotalTimeoutBody<B> {
33        #[pin]
34        inner: B,
35        timeout: Pin<Box<Sleep>>,
36    }
37}
38
39pin_project! {
40    pub(crate) struct ReadTimeoutBody<B> {
41        #[pin]
42        inner: B,
43        #[pin]
44        sleep: Option<Sleep>,
45        timeout: Duration,
46    }
47}
48
49impl Body {
50    /// Returns a reference to the internal data of the `Body`.
51    ///
52    /// `None` is returned, if the underlying data is a stream.
53    pub fn as_bytes(&self) -> Option<&[u8]> {
54        match &self.inner {
55            Inner::Reusable(bytes) => Some(bytes.as_ref()),
56            Inner::Streaming(..) => None,
57        }
58    }
59
60    /// Wrap a futures `Stream` in a box inside `Body`.
61    ///
62    /// # Example
63    ///
64    /// ```
65    /// # use reqwest::Body;
66    /// # use futures_util;
67    /// # fn main() {
68    /// let chunks: Vec<Result<_, ::std::io::Error>> = vec![
69    ///     Ok("hello"),
70    ///     Ok(" "),
71    ///     Ok("world"),
72    /// ];
73    ///
74    /// let stream = futures_util::stream::iter(chunks);
75    ///
76    /// let body = Body::wrap_stream(stream);
77    /// # }
78    /// ```
79    ///
80    /// # Optional
81    ///
82    /// This requires the `stream` feature to be enabled.
83    #[cfg(feature = "stream")]
84    #[cfg_attr(docsrs, doc(cfg(feature = "stream")))]
85    pub fn wrap_stream<S>(stream: S) -> Body
86    where
87        S: futures_core::stream::TryStream + Send + 'static,
88        S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
89        Bytes: From<S::Ok>,
90    {
91        Body::stream(stream)
92    }
93
94    #[cfg(any(feature = "stream", feature = "multipart", feature = "blocking"))]
95    pub(crate) fn stream<S>(stream: S) -> Body
96    where
97        S: futures_core::stream::TryStream + Send + 'static,
98        S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
99        Bytes: From<S::Ok>,
100    {
101        use futures_util::TryStreamExt;
102        use http_body::Frame;
103        use http_body_util::StreamBody;
104
105        let body = http_body_util::BodyExt::boxed(StreamBody::new(sync_wrapper::SyncStream::new(
106            stream
107                .map_ok(|d| Frame::data(Bytes::from(d)))
108                .map_err(Into::into),
109        )));
110        Body {
111            inner: Inner::Streaming(body),
112        }
113    }
114
115    pub(crate) fn empty() -> Body {
116        Body::reusable(Bytes::new())
117    }
118
119    pub(crate) fn reusable(chunk: Bytes) -> Body {
120        Body {
121            inner: Inner::Reusable(chunk),
122        }
123    }
124
125    /// Wrap a [`HttpBody`] in a box inside `Body`.
126    ///
127    /// # Example
128    ///
129    /// ```
130    /// # use reqwest::Body;
131    /// # use futures_util;
132    /// # fn main() {
133    /// let content = "hello,world!".to_string();
134    ///
135    /// let body = Body::wrap(content);
136    /// # }
137    /// ```
138    pub fn wrap<B>(inner: B) -> Body
139    where
140        B: HttpBody + Send + Sync + 'static,
141        B::Data: Into<Bytes>,
142        B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
143    {
144        use http_body_util::BodyExt;
145
146        let boxed = IntoBytesBody { inner }.map_err(Into::into).boxed();
147
148        Body {
149            inner: Inner::Streaming(boxed),
150        }
151    }
152
153    pub(crate) fn try_clone(&self) -> Option<Body> {
154        match self.inner {
155            Inner::Reusable(ref chunk) => Some(Body::reusable(chunk.clone())),
156            Inner::Streaming { .. } => None,
157        }
158    }
159
160    #[cfg(feature = "multipart")]
161    pub(crate) fn content_length(&self) -> Option<u64> {
162        match self.inner {
163            Inner::Reusable(ref bytes) => Some(bytes.len() as u64),
164            Inner::Streaming(ref body) => body.size_hint().exact(),
165        }
166    }
167}
168
169impl Default for Body {
170    #[inline]
171    fn default() -> Body {
172        Body::empty()
173    }
174}
175
176/*
177impl From<hyper::Body> for Body {
178    #[inline]
179    fn from(body: hyper::Body) -> Body {
180        Self {
181            inner: Inner::Streaming {
182                body: Box::pin(WrapHyper(body)),
183            },
184        }
185    }
186}
187*/
188
189impl From<Bytes> for Body {
190    #[inline]
191    fn from(bytes: Bytes) -> Body {
192        Body::reusable(bytes)
193    }
194}
195
196impl From<Vec<u8>> for Body {
197    #[inline]
198    fn from(vec: Vec<u8>) -> Body {
199        Body::reusable(vec.into())
200    }
201}
202
203impl From<&'static [u8]> for Body {
204    #[inline]
205    fn from(s: &'static [u8]) -> Body {
206        Body::reusable(Bytes::from_static(s))
207    }
208}
209
210impl From<String> for Body {
211    #[inline]
212    fn from(s: String) -> Body {
213        Body::reusable(s.into())
214    }
215}
216
217impl From<&'static str> for Body {
218    #[inline]
219    fn from(s: &'static str) -> Body {
220        s.as_bytes().into()
221    }
222}
223
#[cfg(feature = "stream")]
#[cfg_attr(docsrs, doc(cfg(feature = "stream")))]
impl From<File> for Body {
    /// Streams the file's contents as the body by wrapping it in a
    /// `ReaderStream`.
    #[inline]
    fn from(file: File) -> Self {
        let reader = ReaderStream::new(file);
        Self::wrap_stream(reader)
    }
}
232
233impl fmt::Debug for Body {
234    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
235        f.debug_struct("Body").finish()
236    }
237}
238
239impl HttpBody for Body {
240    type Data = Bytes;
241    type Error = crate::Error;
242
243    fn poll_frame(
244        mut self: Pin<&mut Self>,
245        cx: &mut Context,
246    ) -> Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
247        match self.inner {
248            Inner::Reusable(ref mut bytes) => {
249                let out = bytes.split_off(0);
250                if out.is_empty() {
251                    Poll::Ready(None)
252                } else {
253                    Poll::Ready(Some(Ok(hyper::body::Frame::data(out))))
254                }
255            }
256            Inner::Streaming(ref mut body) => Poll::Ready(
257                ready!(Pin::new(body).poll_frame(cx))
258                    .map(|opt_chunk| opt_chunk.map_err(crate::error::body)),
259            ),
260        }
261    }
262
263    fn size_hint(&self) -> http_body::SizeHint {
264        match self.inner {
265            Inner::Reusable(ref bytes) => http_body::SizeHint::with_exact(bytes.len() as u64),
266            Inner::Streaming(ref body) => body.size_hint(),
267        }
268    }
269
270    fn is_end_stream(&self) -> bool {
271        match self.inner {
272            Inner::Reusable(ref bytes) => bytes.is_empty(),
273            Inner::Streaming(ref body) => body.is_end_stream(),
274        }
275    }
276}
277
278// ===== impl TotalTimeoutBody =====
279
280pub(crate) fn total_timeout<B>(body: B, timeout: Pin<Box<Sleep>>) -> TotalTimeoutBody<B> {
281    TotalTimeoutBody {
282        inner: body,
283        timeout,
284    }
285}
286
287pub(crate) fn with_read_timeout<B>(body: B, timeout: Duration) -> ReadTimeoutBody<B> {
288    ReadTimeoutBody {
289        inner: body,
290        sleep: None,
291        timeout,
292    }
293}
294
295impl<B> hyper::body::Body for TotalTimeoutBody<B>
296where
297    B: hyper::body::Body,
298    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
299{
300    type Data = B::Data;
301    type Error = crate::Error;
302
303    fn poll_frame(
304        self: Pin<&mut Self>,
305        cx: &mut Context,
306    ) -> Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
307        let this = self.project();
308        if let Poll::Ready(()) = this.timeout.as_mut().poll(cx) {
309            return Poll::Ready(Some(Err(crate::error::body(crate::error::TimedOut))));
310        }
311        Poll::Ready(
312            ready!(this.inner.poll_frame(cx))
313                .map(|opt_chunk| opt_chunk.map_err(crate::error::body)),
314        )
315    }
316
317    #[inline]
318    fn size_hint(&self) -> http_body::SizeHint {
319        self.inner.size_hint()
320    }
321
322    #[inline]
323    fn is_end_stream(&self) -> bool {
324        self.inner.is_end_stream()
325    }
326}
327
328impl<B> hyper::body::Body for ReadTimeoutBody<B>
329where
330    B: hyper::body::Body,
331    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
332{
333    type Data = B::Data;
334    type Error = crate::Error;
335
336    fn poll_frame(
337        self: Pin<&mut Self>,
338        cx: &mut Context,
339    ) -> Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
340        let mut this = self.project();
341
342        // Start the `Sleep` if not active.
343        let sleep_pinned = if let Some(some) = this.sleep.as_mut().as_pin_mut() {
344            some
345        } else {
346            this.sleep.set(Some(tokio::time::sleep(*this.timeout)));
347            this.sleep.as_mut().as_pin_mut().unwrap()
348        };
349
350        // Error if the timeout has expired.
351        if let Poll::Ready(()) = sleep_pinned.poll(cx) {
352            return Poll::Ready(Some(Err(crate::error::body(crate::error::TimedOut))));
353        }
354
355        let item = ready!(this.inner.poll_frame(cx))
356            .map(|opt_chunk| opt_chunk.map_err(crate::error::body));
357        // a ready frame means timeout is reset
358        this.sleep.set(None);
359        Poll::Ready(item)
360    }
361
362    #[inline]
363    fn size_hint(&self) -> http_body::SizeHint {
364        self.inner.size_hint()
365    }
366
367    #[inline]
368    fn is_end_stream(&self) -> bool {
369        self.inner.is_end_stream()
370    }
371}
372
373pub(crate) type ResponseBody =
374    http_body_util::combinators::BoxBody<Bytes, Box<dyn std::error::Error + Send + Sync>>;
375
376pub(crate) fn boxed<B>(body: B) -> ResponseBody
377where
378    B: hyper::body::Body<Data = Bytes> + Send + Sync + 'static,
379    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
380{
381    use http_body_util::BodyExt;
382
383    body.map_err(box_err).boxed()
384}
385
386pub(crate) fn response<B>(
387    body: B,
388    deadline: Option<Pin<Box<Sleep>>>,
389    read_timeout: Option<Duration>,
390) -> ResponseBody
391where
392    B: hyper::body::Body<Data = Bytes> + Send + Sync + 'static,
393    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
394{
395    use http_body_util::BodyExt;
396
397    match (deadline, read_timeout) {
398        (Some(total), Some(read)) => {
399            let body = with_read_timeout(body, read).map_err(box_err);
400            total_timeout(body, total).map_err(box_err).boxed()
401        }
402        (Some(total), None) => total_timeout(body, total).map_err(box_err).boxed(),
403        (None, Some(read)) => with_read_timeout(body, read).map_err(box_err).boxed(),
404        (None, None) => body.map_err(box_err).boxed(),
405    }
406}
407
/// Converts any boxable error into `Box<dyn Error + Send + Sync>`.
///
/// This is a named function so it can be passed directly to `map_err`.
fn box_err<E>(err: E) -> Box<dyn std::error::Error + Send + Sync>
where
    E: Into<Box<dyn std::error::Error + Send + Sync>>,
{
    Into::into(err)
}
414
415// ===== impl IntoBytesBody =====
416
417pin_project! {
418    struct IntoBytesBody<B> {
419        #[pin]
420        inner: B,
421    }
422}
423
424// We can't use `map_frame()` because that loses the hint data (for good reason).
425// But we aren't transforming the data.
426impl<B> hyper::body::Body for IntoBytesBody<B>
427where
428    B: hyper::body::Body,
429    B::Data: Into<Bytes>,
430{
431    type Data = Bytes;
432    type Error = B::Error;
433
434    fn poll_frame(
435        self: Pin<&mut Self>,
436        cx: &mut Context,
437    ) -> Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
438        match ready!(self.project().inner.poll_frame(cx)) {
439            Some(Ok(f)) => Poll::Ready(Some(Ok(f.map_data(Into::into)))),
440            Some(Err(e)) => Poll::Ready(Some(Err(e))),
441            None => Poll::Ready(None),
442        }
443    }
444
445    #[inline]
446    fn size_hint(&self) -> http_body::SizeHint {
447        self.inner.size_hint()
448    }
449
450    #[inline]
451    fn is_end_stream(&self) -> bool {
452        self.inner.is_end_stream()
453    }
454}
455
#[cfg(test)]
mod tests {
    use http_body::Body as _;

    use super::Body;

    /// A body built from a static slice exposes that same slice.
    #[test]
    fn test_as_bytes() {
        const PAYLOAD: &[u8] = b"Test body";

        let body = Body::from(PAYLOAD);
        assert_eq!(body.as_bytes(), Some(PAYLOAD));
    }

    /// Size hints and end-of-stream flags are exact for buffered bodies and
    /// survive being wrapped.
    #[test]
    fn body_exact_length() {
        let empty = Body::empty();
        assert!(empty.is_end_stream());
        assert_eq!(empty.size_hint().exact(), Some(0));

        let three_bytes = Body::reusable(bytes::Bytes::from("abc"));
        assert!(!three_bytes.is_end_stream());
        assert_eq!(three_bytes.size_hint().exact(), Some(3));

        // can delegate even when wrapped
        let wrapped = Body::wrap(empty);
        assert!(wrapped.is_end_stream());
        assert_eq!(wrapped.size_hint().exact(), Some(0));
    }
}