Skip to main content

kube_client/client/
body.rs

1use std::{
2    error::Error as StdError,
3    fmt,
4    pin::{Pin, pin},
5    task::{Context, Poll},
6};
7
8use bytes::Bytes;
9use http_body::{Body as HttpBody, Frame, SizeHint};
10use http_body_util::{BodyExt, combinators::UnsyncBoxBody};
11
12/// A request body.
13pub struct Body {
14    kind: Kind,
15}
16
17impl fmt::Debug for Body {
18    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
19        let mut builder = f.debug_struct("Body");
20        match self.kind {
21            Kind::Once(_) => builder.field("kind", &"Once"),
22            Kind::Wrap(_) => builder.field("kind", &"Wrap"),
23        };
24        builder.finish()
25    }
26}
27
28enum Kind {
29    Once(Option<Bytes>),
30    Wrap(UnsyncBoxBody<Bytes, Box<dyn StdError + Send + Sync>>),
31}
32
33impl Body {
34    fn new(kind: Kind) -> Self {
35        Body { kind }
36    }
37
38    /// Create an empty body
39    pub fn empty() -> Self {
40        Self::new(Kind::Once(None))
41    }
42
43    // Create a body from an existing body
44    pub(crate) fn wrap_body<B>(body: B) -> Self
45    where
46        B: HttpBody<Data = Bytes> + Send + 'static,
47        B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
48    {
49        Body::new(Kind::Wrap(body.map_err(Into::into).boxed_unsync()))
50    }
51
52    /// Collect all the data frames and trailers of this request body and return the data frame
53    pub async fn collect_bytes(self) -> Result<Bytes, crate::Error> {
54        Ok(self.collect().await?.to_bytes())
55    }
56
57    /// Tries to clone a [`Body`].
58    ///
59    /// Returns the cloned `Body` when it's [`Kind::Once`].
60    pub fn try_clone(&self) -> Option<Self> {
61        match &self.kind {
62            Kind::Once(bytes) => Some(Self {
63                kind: Kind::Once(bytes.clone()),
64            }),
65            Kind::Wrap(..) => None,
66        }
67    }
68}
69
70impl From<Bytes> for Body {
71    fn from(bytes: Bytes) -> Self {
72        if bytes.is_empty() {
73            Self::empty()
74        } else {
75            Self::new(Kind::Once(Some(bytes)))
76        }
77    }
78}
79
80impl From<Vec<u8>> for Body {
81    fn from(vec: Vec<u8>) -> Self {
82        Self::from(Bytes::from(vec))
83    }
84}
85
86impl HttpBody for Body {
87    type Data = Bytes;
88    type Error = crate::Error;
89
90    fn poll_frame(
91        mut self: Pin<&mut Self>,
92        cx: &mut Context<'_>,
93    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
94        match &mut self.kind {
95            Kind::Once(val) => Poll::Ready(val.take().map(|bytes| Ok(Frame::data(bytes)))),
96            Kind::Wrap(body) => pin!(body).poll_frame(cx).map_err(crate::Error::Service),
97        }
98    }
99
100    fn size_hint(&self) -> SizeHint {
101        match &self.kind {
102            Kind::Once(Some(bytes)) => SizeHint::with_exact(bytes.len() as u64),
103            Kind::Once(None) => SizeHint::with_exact(0),
104            Kind::Wrap(body) => body.size_hint(),
105        }
106    }
107
108    fn is_end_stream(&self) -> bool {
109        match &self.kind {
110            Kind::Once(Some(bytes)) => bytes.is_empty(),
111            Kind::Once(None) => true,
112            Kind::Wrap(body) => body.is_end_stream(),
113        }
114    }
115}