azure_core/
response.rs

1use crate::{
2    error::{ErrorKind, ResultExt},
3    from_json,
4    headers::Headers,
5    StatusCode,
6};
7use bytes::Bytes;
8use futures::{Stream, StreamExt};
9use serde::de::DeserializeOwned;
10use std::{fmt::Debug, pin::Pin};
11
12#[cfg(not(target_arch = "wasm32"))]
13pub(crate) type PinnedStream = Pin<Box<dyn Stream<Item = crate::Result<Bytes>> + Send + Sync>>;
14#[cfg(target_arch = "wasm32")]
15pub(crate) type PinnedStream = Pin<Box<dyn Stream<Item = crate::Result<Bytes>>>>;
16
17/// An HTTP Response.
18pub struct Response {
19    status: StatusCode,
20    headers: Headers,
21    body: ResponseBody,
22}
23
24impl Response {
25    pub fn new(status: StatusCode, headers: Headers, stream: PinnedStream) -> Self {
26        Self {
27            status,
28            headers,
29            body: ResponseBody::new(stream),
30        }
31    }
32
33    /// Get the status code from the response.
34    pub fn status(&self) -> StatusCode {
35        self.status
36    }
37
38    /// Get the headers from the response.
39    pub fn headers(&self) -> &Headers {
40        &self.headers
41    }
42
43    /// Deconstruct the HTTP response into its components.
44    pub fn deconstruct(self) -> (StatusCode, Headers, ResponseBody) {
45        (self.status, self.headers, self.body)
46    }
47
48    /// Consume the HTTP response and return the HTTP body bytes.
49    pub fn into_body(self) -> ResponseBody {
50        self.body
51    }
52
53    pub async fn json<T>(self) -> crate::Result<T>
54    where
55        T: DeserializeOwned,
56    {
57        self.into_body().json().await
58    }
59
60    #[cfg(feature = "xml")]
61    pub async fn xml<T>(self) -> crate::Result<T>
62    where
63        T: DeserializeOwned,
64    {
65        self.into_body().xml().await
66    }
67}
68
69impl std::fmt::Debug for Response {
70    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
71        f.debug_struct("Response")
72            .field("status", &self.status)
73            .field("headers", &self.headers)
74            .field("body", &"<BODY>")
75            .finish()
76    }
77}
78
79/// A response with the body collected as bytes
80#[derive(Debug, Clone)]
81pub struct CollectedResponse {
82    status: StatusCode,
83    headers: Headers,
84    body: Bytes,
85}
86
87impl AsRef<[u8]> for CollectedResponse {
88    fn as_ref(&self) -> &[u8] {
89        self.body.as_ref()
90    }
91}
92
93impl CollectedResponse {
94    /// Create a new instance
95    pub fn new(status: StatusCode, headers: Headers, body: Bytes) -> Self {
96        Self {
97            status,
98            headers,
99            body,
100        }
101    }
102
103    /// Get the status
104    pub fn status(&self) -> &StatusCode {
105        &self.status
106    }
107
108    /// Get the headers
109    pub fn headers(&self) -> &Headers {
110        &self.headers
111    }
112
113    /// Get the body
114    pub fn body(&self) -> &Bytes {
115        &self.body
116    }
117
118    /// From a response
119    pub async fn from_response(response: Response) -> crate::Result<Self> {
120        let (status, headers, body) = response.deconstruct();
121        let body = body.collect().await?;
122        Ok(Self::new(status, headers, body))
123    }
124
125    pub fn json<T>(&self) -> crate::Result<T>
126    where
127        T: DeserializeOwned,
128    {
129        from_json(&self.body)
130    }
131
132    #[cfg(feature = "xml")]
133    pub fn xml<T>(&self) -> crate::Result<T>
134    where
135        T: DeserializeOwned,
136    {
137        crate::xml::read_xml(&self.body)
138    }
139}
140
141/// A response body stream
142///
143/// This body can either be streamed or collected into `Bytes`
144#[pin_project::pin_project]
145pub struct ResponseBody(#[pin] PinnedStream);
146
147impl ResponseBody {
148    fn new(stream: PinnedStream) -> Self {
149        Self(stream)
150    }
151
152    /// Collect the stream into a `Bytes` collection
153    pub async fn collect(mut self) -> crate::Result<Bytes> {
154        let mut final_result = Vec::new();
155
156        while let Some(res) = self.0.next().await {
157            final_result.extend(&res?);
158        }
159
160        Ok(final_result.into())
161    }
162
163    /// Collect the stream into a `String`
164    pub async fn collect_string(self) -> crate::Result<String> {
165        std::str::from_utf8(&self.collect().await?)
166            .context(
167                ErrorKind::DataConversion,
168                "response body was not utf-8 like expected",
169            )
170            .map(ToOwned::to_owned)
171    }
172
173    pub async fn json<T>(self) -> crate::Result<T>
174    where
175        T: DeserializeOwned,
176    {
177        let body = self.collect().await?;
178        from_json(body)
179    }
180
181    #[cfg(feature = "xml")]
182    pub async fn xml<T>(self) -> crate::Result<T>
183    where
184        T: DeserializeOwned,
185    {
186        let body = self.collect().await?;
187        crate::xml::read_xml(&body)
188    }
189}
190
191impl Stream for ResponseBody {
192    type Item = crate::Result<Bytes>;
193    fn poll_next(
194        self: Pin<&mut Self>,
195        cx: &mut std::task::Context<'_>,
196    ) -> std::task::Poll<Option<Self::Item>> {
197        let this = self.project();
198        this.0.poll_next(cx)
199    }
200}
201
202impl Debug for ResponseBody {
203    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
204        f.write_str("ResponseBody")
205    }
206}