1use std::fmt;
2use std::fs::File;
3use std::future::Future;
4#[cfg(feature = "multipart")]
5use std::io::Cursor;
6use std::io::{self, Read};
7use std::mem::{self, MaybeUninit};
8use std::ptr;
910use bytes::Bytes;
11use futures_channel::mpsc;
1213use crate::async_impl;
1415/// The body of a `Request`.
16///
17/// In most cases, this is not needed directly, as the
18/// [`RequestBuilder.body`][builder] method uses `Into<Body>`, which allows
19/// passing many things (like a string or vector of bytes).
20///
21/// [builder]: ./struct.RequestBuilder.html#method.body
22#[derive(Debug)]
23pub struct Body {
24 kind: Kind,
25}
2627impl Body {
28/// Instantiate a `Body` from a reader.
29 ///
30 /// # Note
31 ///
32 /// While allowing for many types to be used, these bodies do not have
33 /// a way to reset to the beginning and be reused. This means that when
34 /// encountering a 307 or 308 status code, instead of repeating the
35 /// request at the new location, the `Response` will be returned with
36 /// the redirect status code set.
37 ///
38 /// ```rust
39 /// # use std::fs::File;
40 /// # use reqwest::blocking::Body;
41 /// # fn run() -> Result<(), Box<dyn std::error::Error>> {
42 /// let file = File::open("national_secrets.txt")?;
43 /// let body = Body::new(file);
44 /// # Ok(())
45 /// # }
46 /// ```
47 ///
48 /// If you have a set of bytes, like `String` or `Vec<u8>`, using the
49 /// `From` implementations for `Body` will store the data in a manner
50 /// it can be reused.
51 ///
52 /// ```rust
53 /// # use reqwest::blocking::Body;
54 /// # fn run() -> Result<(), Box<dyn std::error::Error>> {
55 /// let s = "A stringy body";
56 /// let body = Body::from(s);
57 /// # Ok(())
58 /// # }
59 /// ```
60pub fn new<R: Read + Send + 'static>(reader: R) -> Body {
61 Body {
62 kind: Kind::Reader(Box::from(reader), None),
63 }
64 }
6566/// Create a `Body` from a `Read` where the size is known in advance
67 /// but the data should not be fully loaded into memory. This will
68 /// set the `Content-Length` header and stream from the `Read`.
69 ///
70 /// ```rust
71 /// # use std::fs::File;
72 /// # use reqwest::blocking::Body;
73 /// # fn run() -> Result<(), Box<dyn std::error::Error>> {
74 /// let file = File::open("a_large_file.txt")?;
75 /// let file_size = file.metadata()?.len();
76 /// let body = Body::sized(file, file_size);
77 /// # Ok(())
78 /// # }
79 /// ```
80pub fn sized<R: Read + Send + 'static>(reader: R, len: u64) -> Body {
81 Body {
82 kind: Kind::Reader(Box::from(reader), Some(len)),
83 }
84 }
8586/// Returns the body as a byte slice if the body is already buffered in
87 /// memory. For streamed requests this method returns `None`.
88pub fn as_bytes(&self) -> Option<&[u8]> {
89match self.kind {
90 Kind::Reader(_, _) => None,
91 Kind::Bytes(ref bytes) => Some(bytes.as_ref()),
92 }
93 }
9495/// Converts streamed requests to their buffered equivalent and
96 /// returns a reference to the buffer. If the request is already
97 /// buffered, this has no effect.
98 ///
99 /// Be aware that for large requests this method is expensive
100 /// and may cause your program to run out of memory.
101pub fn buffer(&mut self) -> Result<&[u8], crate::Error> {
102match self.kind {
103 Kind::Reader(ref mut reader, maybe_len) => {
104let mut bytes = if let Some(len) = maybe_len {
105 Vec::with_capacity(len as usize)
106 } else {
107 Vec::new()
108 };
109 io::copy(reader, &mut bytes).map_err(crate::error::builder)?;
110self.kind = Kind::Bytes(bytes.into());
111self.buffer()
112 }
113 Kind::Bytes(ref bytes) => Ok(bytes.as_ref()),
114 }
115 }
116117#[cfg(feature = "multipart")]
118pub(crate) fn len(&self) -> Option<u64> {
119match self.kind {
120 Kind::Reader(_, len) => len,
121 Kind::Bytes(ref bytes) => Some(bytes.len() as u64),
122 }
123 }
124125#[cfg(feature = "multipart")]
126pub(crate) fn into_reader(self) -> Reader {
127match self.kind {
128 Kind::Reader(r, _) => Reader::Reader(r),
129 Kind::Bytes(b) => Reader::Bytes(Cursor::new(b)),
130 }
131 }
132133pub(crate) fn into_async(self) -> (Option<Sender>, async_impl::Body, Option<u64>) {
134match self.kind {
135 Kind::Reader(read, len) => {
136let (tx, rx) = mpsc::channel(0);
137let tx = Sender {
138 body: (read, len),
139 tx,
140 };
141 (Some(tx), async_impl::Body::stream(rx), len)
142 }
143 Kind::Bytes(chunk) => {
144let len = chunk.len() as u64;
145 (None, async_impl::Body::reusable(chunk), Some(len))
146 }
147 }
148 }
149150pub(crate) fn try_clone(&self) -> Option<Body> {
151self.kind.try_clone().map(|kind| Body { kind })
152 }
153}
154155enum Kind {
156 Reader(Box<dyn Read + Send>, Option<u64>),
157 Bytes(Bytes),
158}
159160impl Kind {
161fn try_clone(&self) -> Option<Kind> {
162match self {
163 Kind::Reader(..) => None,
164 Kind::Bytes(v) => Some(Kind::Bytes(v.clone())),
165 }
166 }
167}
168169impl From<Vec<u8>> for Body {
170#[inline]
171fn from(v: Vec<u8>) -> Body {
172 Body {
173 kind: Kind::Bytes(v.into()),
174 }
175 }
176}
177178impl From<String> for Body {
179#[inline]
180fn from(s: String) -> Body {
181 s.into_bytes().into()
182 }
183}
184185impl From<&'static [u8]> for Body {
186#[inline]
187fn from(s: &'static [u8]) -> Body {
188 Body {
189 kind: Kind::Bytes(Bytes::from_static(s)),
190 }
191 }
192}
193194impl From<&'static str> for Body {
195#[inline]
196fn from(s: &'static str) -> Body {
197 s.as_bytes().into()
198 }
199}
200201impl From<File> for Body {
202#[inline]
203fn from(f: File) -> Body {
204let len = f.metadata().map(|m| m.len()).ok();
205 Body {
206 kind: Kind::Reader(Box::new(f), len),
207 }
208 }
209}
210impl From<Bytes> for Body {
211#[inline]
212fn from(b: Bytes) -> Body {
213 Body {
214 kind: Kind::Bytes(b),
215 }
216 }
217}
218219impl fmt::Debug for Kind {
220fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
221match *self {
222 Kind::Reader(_, ref v) => f
223 .debug_struct("Reader")
224 .field("length", &DebugLength(v))
225 .finish(),
226 Kind::Bytes(ref v) => fmt::Debug::fmt(v, f),
227 }
228 }
229}
/// Debug helper for an optional length: prints the number when present and
/// `Unknown` otherwise.
struct DebugLength<'a>(&'a Option<u64>);

impl<'a> fmt::Debug for DebugLength<'a> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        if let Some(len) = self.0 {
            fmt::Debug::fmt(len, f)
        } else {
            f.write_str("Unknown")
        }
    }
}
/// A `Read` view over either kind of body content.
#[cfg(feature = "multipart")]
pub(crate) enum Reader {
    /// Content pulled from a boxed reader.
    Reader(Box<dyn Read + Send>),
    /// Content served from in-memory bytes through a cursor.
    Bytes(Cursor<Bytes>),
}

#[cfg(feature = "multipart")]
impl Read for Reader {
    /// Forwards the read to whichever source this value wraps.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        match self {
            Reader::Bytes(cursor) => cursor.read(buf),
            Reader::Reader(inner) => inner.read(buf),
        }
    }
}
/// The sending half of a streamed body: it owns the reader and the channel
/// through which the chunks read from it are handed over.
pub(crate) struct Sender {
    // The reader to pull bytes from, paired with the declared length of the
    // body when one is known.
    body: (Box<dyn Read + Send>, Option<u64>),
    // Channel end that carries each chunk read, or `Abort` if reading failed.
    tx: mpsc::Sender<Result<Bytes, Abort>>,
}
/// Error value placed on the body channel when reading the request body
/// fails, so that the receiving side sees an error instead of a clean end.
#[derive(Debug)]
struct Abort;

impl fmt::Display for Abort {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "abort request body")
    }
}

impl std::error::Error for Abort {}
273274async fn send_future(sender: Sender) -> Result<(), crate::Error> {
275use bytes::{BufMut, BytesMut};
276use futures_util::SinkExt;
277use std::cmp;
278279let con_len = sender.body.1;
280let cap = cmp::min(sender.body.1.unwrap_or(8192), 8192);
281let mut written = 0;
282let mut buf = BytesMut::zeroed(cap as usize);
283 buf.clear();
284let mut body = sender.body.0;
285// Put in an option so that it can be consumed on error to call abort()
286let mut tx = Some(sender.tx);
287288loop {
289if Some(written) == con_len {
290// Written up to content-length, so stop.
291return Ok(());
292 }
293294// The input stream is read only if the buffer is empty so
295 // that there is only one read in the buffer at any time.
296 //
297 // We need to know whether there is any data to send before
298 // we check the transmission channel (with poll_ready below)
299 // because sometimes the receiver disappears as soon as it
300 // considers the data is completely transmitted, which may
301 // be true.
302 //
303 // The use case is a web server that closes its
304 // input stream as soon as the data received is valid JSON.
305 // This behaviour is questionable, but it exists and the
306 // fact is that there is actually no remaining data to read.
307if buf.is_empty() {
308if buf.capacity() == buf.len() {
309 buf.reserve(8192);
310// zero out the reserved memory
311let uninit = buf.spare_capacity_mut();
312let uninit_len = uninit.len();
313unsafe {
314 ptr::write_bytes(uninit.as_mut_ptr().cast::<u8>(), 0, uninit_len);
315 }
316 }
317318let bytes = unsafe {
319 mem::transmute::<&mut [MaybeUninit<u8>], &mut [u8]>(buf.spare_capacity_mut())
320 };
321match body.read(bytes) {
322Ok(0) => {
323// The buffer was empty and nothing's left to
324 // read. Return.
325return Ok(());
326 }
327Ok(n) => unsafe {
328 buf.advance_mut(n);
329 },
330Err(e) => {
331let _ = tx
332 .take()
333 .expect("tx only taken on error")
334 .clone()
335 .try_send(Err(Abort));
336return Err(crate::error::body(e));
337 }
338 }
339 }
340341// The only way to get here is when the buffer is not empty.
342 // We can check the transmission channel
343344let buf_len = buf.len() as u64;
345 tx.as_mut()
346 .expect("tx only taken on error")
347 .send(Ok(buf.split().freeze()))
348 .await
349.map_err(crate::error::body)?;
350351 written += buf_len;
352 }
353}
impl Sender {
    /// Consumes the sender and returns the future produced by `send_future`,
    /// which resolves to `Ok(())` or to a `crate::Error`.
    // A `Future` that may do blocking read calls.
    // As a `Future`, this integrates easily with `wait::timeout`.
    pub(crate) fn send(self) -> impl Future<Output = Result<(), crate::Error>> {
        send_future(self)
    }
}
// useful for tests, but not publicly exposed
/// Drains `body` into a `String`, whether it is reader-backed or buffered.
///
/// Fails with the underlying I/O error, which includes the case where the
/// content is not valid UTF-8.
#[cfg(test)]
pub(crate) fn read_to_string(mut body: Body) -> io::Result<String> {
    let mut text = String::new();
    let outcome = match body.kind {
        Kind::Reader(ref mut source, _) => source.read_to_string(&mut text),
        Kind::Bytes(ref data) => {
            // Deref the buffered bytes to a plain slice, which is itself
            // a reader.
            let mut slice: &[u8] = data;
            slice.read_to_string(&mut text)
        }
    };
    outcome?;
    Ok(text)
}