1use std::fmt;
2use std::future::Future;
3use std::pin::Pin;
4use std::task::{ready, Context, Poll};
5use std::time::Duration;
6
7use bytes::Bytes;
8use http_body::Body as HttpBody;
9use http_body_util::combinators::BoxBody;
10use pin_project_lite::pin_project;
12#[cfg(feature = "stream")]
13use tokio::fs::File;
14use tokio::time::Sleep;
15#[cfg(feature = "stream")]
16use tokio_util::io::ReaderStream;
17
/// An HTTP body that is either a fully buffered chunk of bytes or a boxed
/// streaming body (see `Inner`).
pub struct Body {
    // Concrete representation; values are built through the constructors and
    // `From` impls defined in this file.
    inner: Inner,
}
22
/// Internal representation of a `Body`.
enum Inner {
    /// The whole payload held as a single `Bytes` value. This is the only
    /// variant that `try_clone` and `try_reuse` can duplicate.
    Reusable(Bytes),
    /// A type-erased body yielding `Bytes` data with boxed errors.
    Streaming(BoxBody<Bytes, Box<dyn std::error::Error + Send + Sync>>),
}
27
pin_project! {
    /// A body paired with one `Sleep` deadline that covers the whole body.
    ///
    /// The deadline is supplied by the caller and is never reset by this type;
    /// once it completes, polling for a frame yields a timeout error.
    pub(crate) struct TotalTimeoutBody<B> {
        // The wrapped body; structurally pinned so it can be polled in place.
        #[pin]
        inner: B,
        // Already boxed and pinned, so it needs no `#[pin]` projection.
        timeout: Pin<Box<Sleep>>,
    }
}
39
pin_project! {
    /// A body with a per-frame timeout.
    ///
    /// A `Sleep` of length `timeout` is armed while waiting for a frame and is
    /// cleared as soon as the inner body produces one.
    pub(crate) struct ReadTimeoutBody<B> {
        // The wrapped body; structurally pinned so it can be polled in place.
        #[pin]
        inner: B,
        // `None` until the next poll arms a fresh timer.
        #[pin]
        sleep: Option<Sleep>,
        // Duration used each time the timer is (re)armed.
        timeout: Duration,
    }
}
49
/// Adapter that exposes a body as a `Stream` of its data frames.
///
/// Frames that do not carry data are skipped by the `Stream` impl in this file.
#[cfg(any(feature = "stream", feature = "multipart",))]
pub(crate) struct DataStream<B>(pub(crate) B);
53
54impl Body {
55 pub fn as_bytes(&self) -> Option<&[u8]> {
59 match &self.inner {
60 Inner::Reusable(bytes) => Some(bytes.as_ref()),
61 Inner::Streaming(..) => None,
62 }
63 }
64
65 #[cfg(feature = "stream")]
89 #[cfg_attr(docsrs, doc(cfg(feature = "stream")))]
90 pub fn wrap_stream<S>(stream: S) -> Body
91 where
92 S: futures_core::stream::TryStream + Send + 'static,
93 S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
94 Bytes: From<S::Ok>,
95 {
96 Body::stream(stream)
97 }
98
99 #[cfg(any(feature = "stream", feature = "multipart", feature = "blocking"))]
100 pub(crate) fn stream<S>(stream: S) -> Body
101 where
102 S: futures_core::stream::TryStream + Send + 'static,
103 S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
104 Bytes: From<S::Ok>,
105 {
106 use futures_util::TryStreamExt;
107 use http_body::Frame;
108 use http_body_util::StreamBody;
109
110 let body = http_body_util::BodyExt::boxed(StreamBody::new(sync_wrapper::SyncStream::new(
111 stream
112 .map_ok(|d| Frame::data(Bytes::from(d)))
113 .map_err(Into::into),
114 )));
115 Body {
116 inner: Inner::Streaming(body),
117 }
118 }
119
120 pub(crate) fn empty() -> Body {
121 Body::reusable(Bytes::new())
122 }
123
124 pub(crate) fn reusable(chunk: Bytes) -> Body {
125 Body {
126 inner: Inner::Reusable(chunk),
127 }
128 }
129
130 pub fn wrap<B>(inner: B) -> Body
144 where
145 B: HttpBody + Send + Sync + 'static,
146 B::Data: Into<Bytes>,
147 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
148 {
149 use http_body_util::BodyExt;
150
151 let boxed = IntoBytesBody { inner }.map_err(Into::into).boxed();
152
153 Body {
154 inner: Inner::Streaming(boxed),
155 }
156 }
157
158 pub(crate) fn try_reuse(self) -> (Option<Bytes>, Self) {
159 let reuse = match self.inner {
160 Inner::Reusable(ref chunk) => Some(chunk.clone()),
161 Inner::Streaming { .. } => None,
162 };
163
164 (reuse, self)
165 }
166
167 pub(crate) fn try_clone(&self) -> Option<Body> {
168 match self.inner {
169 Inner::Reusable(ref chunk) => Some(Body::reusable(chunk.clone())),
170 Inner::Streaming { .. } => None,
171 }
172 }
173
174 #[cfg(feature = "multipart")]
175 pub(crate) fn into_stream(self) -> DataStream<Body> {
176 DataStream(self)
177 }
178
179 #[cfg(feature = "multipart")]
180 pub(crate) fn content_length(&self) -> Option<u64> {
181 match self.inner {
182 Inner::Reusable(ref bytes) => Some(bytes.len() as u64),
183 Inner::Streaming(ref body) => body.size_hint().exact(),
184 }
185 }
186}
187
188impl Default for Body {
189 #[inline]
190 fn default() -> Body {
191 Body::empty()
192 }
193}
194
195impl From<Bytes> for Body {
209 #[inline]
210 fn from(bytes: Bytes) -> Body {
211 Body::reusable(bytes)
212 }
213}
214
215impl From<Vec<u8>> for Body {
216 #[inline]
217 fn from(vec: Vec<u8>) -> Body {
218 Body::reusable(vec.into())
219 }
220}
221
222impl From<&'static [u8]> for Body {
223 #[inline]
224 fn from(s: &'static [u8]) -> Body {
225 Body::reusable(Bytes::from_static(s))
226 }
227}
228
229impl From<String> for Body {
230 #[inline]
231 fn from(s: String) -> Body {
232 Body::reusable(s.into())
233 }
234}
235
236impl From<&'static str> for Body {
237 #[inline]
238 fn from(s: &'static str) -> Body {
239 s.as_bytes().into()
240 }
241}
242
/// Streams the contents of a `tokio::fs::File` as the body by reading it
/// through a `ReaderStream`.
#[cfg(feature = "stream")]
#[cfg_attr(docsrs, doc(cfg(feature = "stream")))]
impl From<File> for Body {
    #[inline]
    fn from(file: File) -> Body {
        let chunks = ReaderStream::new(file);
        Self::wrap_stream(chunks)
    }
}
251
252impl fmt::Debug for Body {
253 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
254 f.debug_struct("Body").finish()
255 }
256}
257
258impl HttpBody for Body {
259 type Data = Bytes;
260 type Error = crate::Error;
261
262 fn poll_frame(
263 mut self: Pin<&mut Self>,
264 cx: &mut Context,
265 ) -> Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
266 match self.inner {
267 Inner::Reusable(ref mut bytes) => {
268 let out = bytes.split_off(0);
269 if out.is_empty() {
270 Poll::Ready(None)
271 } else {
272 Poll::Ready(Some(Ok(hyper::body::Frame::data(out))))
273 }
274 }
275 Inner::Streaming(ref mut body) => Poll::Ready(
276 ready!(Pin::new(body).poll_frame(cx))
277 .map(|opt_chunk| opt_chunk.map_err(crate::error::body)),
278 ),
279 }
280 }
281
282 fn size_hint(&self) -> http_body::SizeHint {
283 match self.inner {
284 Inner::Reusable(ref bytes) => http_body::SizeHint::with_exact(bytes.len() as u64),
285 Inner::Streaming(ref body) => body.size_hint(),
286 }
287 }
288
289 fn is_end_stream(&self) -> bool {
290 match self.inner {
291 Inner::Reusable(ref bytes) => bytes.is_empty(),
292 Inner::Streaming(ref body) => body.is_end_stream(),
293 }
294 }
295}
296
297pub(crate) fn total_timeout<B>(body: B, timeout: Pin<Box<Sleep>>) -> TotalTimeoutBody<B> {
300 TotalTimeoutBody {
301 inner: body,
302 timeout,
303 }
304}
305
306pub(crate) fn with_read_timeout<B>(body: B, timeout: Duration) -> ReadTimeoutBody<B> {
307 ReadTimeoutBody {
308 inner: body,
309 sleep: None,
310 timeout,
311 }
312}
313
314impl<B> hyper::body::Body for TotalTimeoutBody<B>
315where
316 B: hyper::body::Body,
317 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
318{
319 type Data = B::Data;
320 type Error = crate::Error;
321
322 fn poll_frame(
323 self: Pin<&mut Self>,
324 cx: &mut Context,
325 ) -> Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
326 let this = self.project();
327 if let Poll::Ready(()) = this.timeout.as_mut().poll(cx) {
328 return Poll::Ready(Some(Err(crate::error::body(crate::error::TimedOut))));
329 }
330 Poll::Ready(
331 ready!(this.inner.poll_frame(cx))
332 .map(|opt_chunk| opt_chunk.map_err(crate::error::body)),
333 )
334 }
335
336 #[inline]
337 fn size_hint(&self) -> http_body::SizeHint {
338 self.inner.size_hint()
339 }
340
341 #[inline]
342 fn is_end_stream(&self) -> bool {
343 self.inner.is_end_stream()
344 }
345}
346
347impl<B> hyper::body::Body for ReadTimeoutBody<B>
348where
349 B: hyper::body::Body,
350 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
351{
352 type Data = B::Data;
353 type Error = crate::Error;
354
355 fn poll_frame(
356 self: Pin<&mut Self>,
357 cx: &mut Context,
358 ) -> Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
359 let mut this = self.project();
360
361 let sleep_pinned = if let Some(some) = this.sleep.as_mut().as_pin_mut() {
363 some
364 } else {
365 this.sleep.set(Some(tokio::time::sleep(*this.timeout)));
366 this.sleep.as_mut().as_pin_mut().unwrap()
367 };
368
369 if let Poll::Ready(()) = sleep_pinned.poll(cx) {
371 return Poll::Ready(Some(Err(crate::error::body(crate::error::TimedOut))));
372 }
373
374 let item = ready!(this.inner.poll_frame(cx))
375 .map(|opt_chunk| opt_chunk.map_err(crate::error::body));
376 this.sleep.set(None);
378 Poll::Ready(item)
379 }
380
381 #[inline]
382 fn size_hint(&self) -> http_body::SizeHint {
383 self.inner.size_hint()
384 }
385
386 #[inline]
387 fn is_end_stream(&self) -> bool {
388 self.inner.is_end_stream()
389 }
390}
391
392pub(crate) type ResponseBody =
393 http_body_util::combinators::BoxBody<Bytes, Box<dyn std::error::Error + Send + Sync>>;
394
395pub(crate) fn boxed<B>(body: B) -> ResponseBody
396where
397 B: hyper::body::Body<Data = Bytes> + Send + Sync + 'static,
398 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
399{
400 use http_body_util::BodyExt;
401
402 body.map_err(box_err).boxed()
403}
404
405pub(crate) fn response<B>(
406 body: B,
407 deadline: Option<Pin<Box<Sleep>>>,
408 read_timeout: Option<Duration>,
409) -> ResponseBody
410where
411 B: hyper::body::Body<Data = Bytes> + Send + Sync + 'static,
412 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
413{
414 use http_body_util::BodyExt;
415
416 match (deadline, read_timeout) {
417 (Some(total), Some(read)) => {
418 let body = with_read_timeout(body, read).map_err(box_err);
419 total_timeout(body, total).map_err(box_err).boxed()
420 }
421 (Some(total), None) => total_timeout(body, total).map_err(box_err).boxed(),
422 (None, Some(read)) => with_read_timeout(body, read).map_err(box_err).boxed(),
423 (None, None) => body.map_err(box_err).boxed(),
424 }
425}
426
/// Converts any error that can become a boxed `std::error::Error` into that
/// boxed form. Used as a nameable function wherever `map_err` needs one.
fn box_err<E: Into<Box<dyn std::error::Error + Send + Sync>>>(
    err: E,
) -> Box<dyn std::error::Error + Send + Sync> {
    Into::into(err)
}
433
/// Yields only the data frames of the wrapped body; other frames are dropped
/// and polling continues until data, an error, or end-of-stream is reached.
#[cfg(any(feature = "stream", feature = "multipart",))]
impl<B> futures_core::Stream for DataStream<B>
where
    B: HttpBody<Data = Bytes> + Unpin,
{
    type Item = Result<Bytes, B::Error>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        loop {
            match ready!(Pin::new(&mut self.0).poll_frame(cx)) {
                Some(Ok(frame)) => {
                    // A frame without data is skipped; poll again for the next one.
                    if let Ok(data) = frame.into_data() {
                        return Poll::Ready(Some(Ok(data)));
                    }
                }
                Some(Err(err)) => return Poll::Ready(Some(Err(err))),
                None => return Poll::Ready(None),
            }
        }
    }
}
460
pin_project! {
    /// Wrapper around a body that converts each data frame's payload into
    /// `Bytes` (see its `Body` impl in this file).
    struct IntoBytesBody<B> {
        // The wrapped body; structurally pinned so it can be polled in place.
        #[pin]
        inner: B,
    }
}
469
470impl<B> hyper::body::Body for IntoBytesBody<B>
473where
474 B: hyper::body::Body,
475 B::Data: Into<Bytes>,
476{
477 type Data = Bytes;
478 type Error = B::Error;
479
480 fn poll_frame(
481 self: Pin<&mut Self>,
482 cx: &mut Context,
483 ) -> Poll<Option<Result<hyper::body::Frame<Self::Data>, Self::Error>>> {
484 match ready!(self.project().inner.poll_frame(cx)) {
485 Some(Ok(f)) => Poll::Ready(Some(Ok(f.map_data(Into::into)))),
486 Some(Err(e)) => Poll::Ready(Some(Err(e))),
487 None => Poll::Ready(None),
488 }
489 }
490
491 #[inline]
492 fn size_hint(&self) -> http_body::SizeHint {
493 self.inner.size_hint()
494 }
495
496 #[inline]
497 fn is_end_stream(&self) -> bool {
498 self.inner.is_end_stream()
499 }
500}
501
#[cfg(test)]
mod tests {
    use http_body::Body as _;

    use super::Body;

    /// Checks a body's end-of-stream flag and exact size hint together.
    fn assert_shape(body: &Body, at_end: bool, exact_len: u64) {
        assert_eq!(body.is_end_stream(), at_end);
        assert_eq!(body.size_hint().exact(), Some(exact_len));
    }

    /// A body built from a static slice exposes those bytes via `as_bytes`.
    #[test]
    fn test_as_bytes() {
        let payload: &'static [u8] = b"Test body";
        let body = Body::from(payload);
        assert_eq!(body.as_bytes(), Some(payload));
    }

    /// Buffered bodies, and a wrapped empty body, report exact lengths and
    /// the correct end-of-stream state.
    #[test]
    fn body_exact_length() {
        let empty_body = Body::empty();
        assert_shape(&empty_body, true, 0);

        let bytes_body = Body::reusable("abc".into());
        assert_shape(&bytes_body, false, 3);

        // Wrapping goes through the streaming variant; the hints must survive it.
        let stream_body = Body::wrap(empty_body);
        assert_shape(&stream_body, true, 0);
    }
}