1use std::fmt;
2use std::future::Future;
3use std::pin::Pin;
4use std::task::{ready, Context, Poll};
5use std::time::Duration;
6
7use bytes::Bytes;
8use http_body::Body as HttpBody;
9use http_body_util::combinators::BoxBody;
10use pin_project_lite::pin_project;
11#[cfg(feature = "stream")]
12use tokio::fs::File;
13use tokio::time::Sleep;
14#[cfg(feature = "stream")]
15use tokio_util::io::ReaderStream;
16
17pub struct Body {
19 inner: Inner,
20}
21
22enum Inner {
23 Reusable(Bytes),
24 Streaming(BoxBody<Bytes, Box<dyn std::error::Error + Send + Sync>>),
25}
26
27pin_project! {
28 pub(crate) struct TotalTimeoutBody<B> {
33 #[pin]
34 inner: B,
35 timeout: Pin<Box<Sleep>>,
36 }
37}
38
39pin_project! {
40 pub(crate) struct ReadTimeoutBody<B> {
41 #[pin]
42 inner: B,
43 #[pin]
44 sleep: Option<Sleep>,
45 timeout: Duration,
46 }
47}
48
49impl Body {
50 pub fn as_bytes(&self) -> Option<&[u8]> {
54 match &self.inner {
55 Inner::Reusable(bytes) => Some(bytes.as_ref()),
56 Inner::Streaming(..) => None,
57 }
58 }
59
60 #[cfg(feature = "stream")]
84 #[cfg_attr(docsrs, doc(cfg(feature = "stream")))]
85 pub fn wrap_stream<S>(stream: S) -> Body
86 where
87 S: futures_core::stream::TryStream + Send + 'static,
88 S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
89 Bytes: From<S::Ok>,
90 {
91 Body::stream(stream)
92 }
93
94 #[cfg(any(feature = "stream", feature = "multipart", feature = "blocking"))]
95 pub(crate) fn stream<S>(stream: S) -> Body
96 where
97 S: futures_core::stream::TryStream + Send + 'static,
98 S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
99 Bytes: From<S::Ok>,
100 {
101 use futures_util::TryStreamExt;
102 use http_body::Frame;
103 use http_body_util::StreamBody;
104
105 let body = http_body_util::BodyExt::boxed(StreamBody::new(sync_wrapper::SyncStream::new(
106 stream
107 .map_ok(|d| Frame::data(Bytes::from(d)))
108 .map_err(Into::into),
109 )));
110 Body {
111 inner: Inner::Streaming(body),
112 }
113 }
114
115 pub(crate) fn empty() -> Body {
116 Body::reusable(Bytes::new())
117 }
118
119 pub(crate) fn reusable(chunk: Bytes) -> Body {
120 Body {
121 inner: Inner::Reusable(chunk),
122 }
123 }
124
125 pub fn wrap<B>(inner: B) -> Body
139 where
140 B: HttpBody + Send + Sync + 'static,
141 B::Data: Into<Bytes>,
142 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
143 {
144 use http_body_util::BodyExt;
145
146 let boxed = IntoBytesBody { inner }.map_err(Into::into).boxed();
147
148 Body {
149 inner: Inner::Streaming(boxed),
150 }
151 }
152
153 pub(crate) fn try_clone(&self) -> Option<Body> {
154 match self.inner {
155 Inner::Reusable(ref chunk) => Some(Body::reusable(chunk.clone())),
156 Inner::Streaming { .. } => None,
157 }
158 }
159
160 #[cfg(feature = "multipart")]
161 pub(crate) fn content_length(&self) -> Option<u64> {
162 match self.inner {
163 Inner::Reusable(ref bytes) => Some(bytes.len() as u64),
164 Inner::Streaming(ref body) => body.size_hint().exact(),
165 }
166 }
167}
168
169impl Default for Body {
170 #[inline]
171 fn default() -> Body {
172 Body::empty()
173 }
174}
175
176impl From<Bytes> for Body {
190 #[inline]
191 fn from(bytes: Bytes) -> Body {
192 Body::reusable(bytes)
193 }
194}
195
196impl From<Vec<u8>> for Body {
197 #[inline]
198 fn from(vec: Vec<u8>) -> Body {
199 Body::reusable(vec.into())
200 }
201}
202
203impl From<&'static [u8]> for Body {
204 #[inline]
205 fn from(s: &'static [u8]) -> Body {
206 Body::reusable(Bytes::from_static(s))
207 }
208}
209
210impl From<String> for Body {
211 #[inline]
212 fn from(s: String) -> Body {
213 Body::reusable(s.into())
214 }
215}
216
217impl From<&'static str> for Body {
218 #[inline]
219 fn from(s: &'static str) -> Body {
220 s.as_bytes().into()
221 }
222}
223
/// Builds a streaming body from an async file by wrapping it in a
/// `ReaderStream` and handing that to `Body::wrap_stream`.
#[cfg(feature = "stream")]
#[cfg_attr(docsrs, doc(cfg(feature = "stream")))]
impl From<File> for Body {
    #[inline]
    fn from(file: File) -> Body {
        let chunks = ReaderStream::new(file);
        Body::wrap_stream(chunks)
    }
}
232
233impl fmt::Debug for Body {
234 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
235 f.debug_struct("Body").finish()
236 }
237}
238
239impl HttpBody for Body {
240 type Data = Bytes;
241 type Error = crate::Error;
242
243 fn poll_frame(
244 mut self: Pin<&mut Self>,
245 cx: &mut Context,
246 ) -> Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
247 match self.inner {
248 Inner::Reusable(ref mut bytes) => {
249 let out = bytes.split_off(0);
250 if out.is_empty() {
251 Poll::Ready(None)
252 } else {
253 Poll::Ready(Some(Ok(hyper::body::Frame::data(out))))
254 }
255 }
256 Inner::Streaming(ref mut body) => Poll::Ready(
257 ready!(Pin::new(body).poll_frame(cx))
258 .map(|opt_chunk| opt_chunk.map_err(crate::error::body)),
259 ),
260 }
261 }
262
263 fn size_hint(&self) -> http_body::SizeHint {
264 match self.inner {
265 Inner::Reusable(ref bytes) => http_body::SizeHint::with_exact(bytes.len() as u64),
266 Inner::Streaming(ref body) => body.size_hint(),
267 }
268 }
269
270 fn is_end_stream(&self) -> bool {
271 match self.inner {
272 Inner::Reusable(ref bytes) => bytes.is_empty(),
273 Inner::Streaming(ref body) => body.is_end_stream(),
274 }
275 }
276}
277
278pub(crate) fn total_timeout<B>(body: B, timeout: Pin<Box<Sleep>>) -> TotalTimeoutBody<B> {
281 TotalTimeoutBody {
282 inner: body,
283 timeout,
284 }
285}
286
287pub(crate) fn with_read_timeout<B>(body: B, timeout: Duration) -> ReadTimeoutBody<B> {
288 ReadTimeoutBody {
289 inner: body,
290 sleep: None,
291 timeout,
292 }
293}
294
295impl<B> hyper::body::Body for TotalTimeoutBody<B>
296where
297 B: hyper::body::Body,
298 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
299{
300 type Data = B::Data;
301 type Error = crate::Error;
302
303 fn poll_frame(
304 self: Pin<&mut Self>,
305 cx: &mut Context,
306 ) -> Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
307 let this = self.project();
308 if let Poll::Ready(()) = this.timeout.as_mut().poll(cx) {
309 return Poll::Ready(Some(Err(crate::error::body(crate::error::TimedOut))));
310 }
311 Poll::Ready(
312 ready!(this.inner.poll_frame(cx))
313 .map(|opt_chunk| opt_chunk.map_err(crate::error::body)),
314 )
315 }
316
317 #[inline]
318 fn size_hint(&self) -> http_body::SizeHint {
319 self.inner.size_hint()
320 }
321
322 #[inline]
323 fn is_end_stream(&self) -> bool {
324 self.inner.is_end_stream()
325 }
326}
327
328impl<B> hyper::body::Body for ReadTimeoutBody<B>
329where
330 B: hyper::body::Body,
331 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
332{
333 type Data = B::Data;
334 type Error = crate::Error;
335
336 fn poll_frame(
337 self: Pin<&mut Self>,
338 cx: &mut Context,
339 ) -> Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
340 let mut this = self.project();
341
342 let sleep_pinned = if let Some(some) = this.sleep.as_mut().as_pin_mut() {
344 some
345 } else {
346 this.sleep.set(Some(tokio::time::sleep(*this.timeout)));
347 this.sleep.as_mut().as_pin_mut().unwrap()
348 };
349
350 if let Poll::Ready(()) = sleep_pinned.poll(cx) {
352 return Poll::Ready(Some(Err(crate::error::body(crate::error::TimedOut))));
353 }
354
355 let item = ready!(this.inner.poll_frame(cx))
356 .map(|opt_chunk| opt_chunk.map_err(crate::error::body));
357 this.sleep.set(None);
359 Poll::Ready(item)
360 }
361
362 #[inline]
363 fn size_hint(&self) -> http_body::SizeHint {
364 self.inner.size_hint()
365 }
366
367 #[inline]
368 fn is_end_stream(&self) -> bool {
369 self.inner.is_end_stream()
370 }
371}
372
373pub(crate) type ResponseBody =
374 http_body_util::combinators::BoxBody<Bytes, Box<dyn std::error::Error + Send + Sync>>;
375
376pub(crate) fn boxed<B>(body: B) -> ResponseBody
377where
378 B: hyper::body::Body<Data = Bytes> + Send + Sync + 'static,
379 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
380{
381 use http_body_util::BodyExt;
382
383 body.map_err(box_err).boxed()
384}
385
386pub(crate) fn response<B>(
387 body: B,
388 deadline: Option<Pin<Box<Sleep>>>,
389 read_timeout: Option<Duration>,
390) -> ResponseBody
391where
392 B: hyper::body::Body<Data = Bytes> + Send + Sync + 'static,
393 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
394{
395 use http_body_util::BodyExt;
396
397 match (deadline, read_timeout) {
398 (Some(total), Some(read)) => {
399 let body = with_read_timeout(body, read).map_err(box_err);
400 total_timeout(body, total).map_err(box_err).boxed()
401 }
402 (Some(total), None) => total_timeout(body, total).map_err(box_err).boxed(),
403 (None, Some(read)) => with_read_timeout(body, read).map_err(box_err).boxed(),
404 (None, None) => body.map_err(box_err).boxed(),
405 }
406}
407
/// Converts any error-like value into a boxed, thread-safe error.
///
/// A named function is convenient here because it can be passed directly to
/// `map_err` with the boxed target type already fixed.
fn box_err<E>(err: E) -> Box<dyn std::error::Error + Send + Sync>
where
    E: Into<Box<dyn std::error::Error + Send + Sync>>,
{
    let boxed: Box<dyn std::error::Error + Send + Sync> = Into::into(err);
    boxed
}
414
415pin_project! {
418 struct IntoBytesBody<B> {
419 #[pin]
420 inner: B,
421 }
422}
423
424impl<B> hyper::body::Body for IntoBytesBody<B>
427where
428 B: hyper::body::Body,
429 B::Data: Into<Bytes>,
430{
431 type Data = Bytes;
432 type Error = B::Error;
433
434 fn poll_frame(
435 self: Pin<&mut Self>,
436 cx: &mut Context,
437 ) -> Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
438 match ready!(self.project().inner.poll_frame(cx)) {
439 Some(Ok(f)) => Poll::Ready(Some(Ok(f.map_data(Into::into)))),
440 Some(Err(e)) => Poll::Ready(Some(Err(e))),
441 None => Poll::Ready(None),
442 }
443 }
444
445 #[inline]
446 fn size_hint(&self) -> http_body::SizeHint {
447 self.inner.size_hint()
448 }
449
450 #[inline]
451 fn is_end_stream(&self) -> bool {
452 self.inner.is_end_stream()
453 }
454}
455
#[cfg(test)]
mod tests {
    use http_body::Body as _;

    use super::Body;

    /// A body built from a static slice exposes exactly those bytes.
    #[test]
    fn test_as_bytes() {
        let payload: &'static [u8] = b"Test body";
        let body = Body::from(payload);
        assert_eq!(body.as_bytes(), Some(payload));
    }

    /// Size hints and end-of-stream flags are exact for buffered bodies and
    /// are preserved when a body is re-wrapped as a stream.
    #[test]
    fn body_exact_length() {
        // An empty body is already finished and has length zero.
        let empty = Body::empty();
        assert!(empty.is_end_stream());
        assert_eq!(empty.size_hint().exact(), Some(0));

        // A non-empty buffered body reports its byte count and is not finished.
        let buffered = Body::reusable("abc".into());
        assert!(!buffered.is_end_stream());
        assert_eq!(buffered.size_hint().exact(), Some(3));

        // Wrapping the empty body keeps both properties intact.
        let wrapped = Body::wrap(empty);
        assert!(wrapped.is_end_stream());
        assert_eq!(wrapped.size_hint().exact(), Some(0));
    }
}