hyper/client/conn/http1.rs
1//! HTTP/1 client connections
2
3use std::error::Error as StdError;
4use std::fmt;
5use std::future::Future;
6use std::pin::Pin;
7use std::task::{Context, Poll};
8
9use crate::rt::{Read, Write};
10use bytes::Bytes;
11use futures_util::ready;
12use http::{Request, Response};
13use httparse::ParserConfig;
14
15use super::super::dispatch::{self, TrySendError};
16use crate::body::{Body, Incoming as IncomingBody};
17use crate::proto;
18
19type Dispatcher<T, B> =
20 proto::dispatch::Dispatcher<proto::dispatch::Client<B>, B, T, proto::h1::ClientTransaction>;
21
22/// The sender side of an established connection.
23pub struct SendRequest<B> {
24 dispatch: dispatch::Sender<Request<B>, Response<IncomingBody>>,
25}
26
27/// Deconstructed parts of a `Connection`.
28///
29/// This allows taking apart a `Connection` at a later time, in order to
30/// reclaim the IO object, and additional related pieces.
31#[derive(Debug)]
32#[non_exhaustive]
33pub struct Parts<T> {
34 /// The original IO object used in the handshake.
35 pub io: T,
36 /// A buffer of bytes that have been read but not processed as HTTP.
37 ///
38 /// For instance, if the `Connection` is used for an HTTP upgrade request,
39 /// it is possible the server sent back the first bytes of the new protocol
40 /// along with the response upgrade.
41 ///
42 /// You will want to check for any existing bytes if you plan to continue
43 /// communicating on the IO object.
44 pub read_buf: Bytes,
45}
46
47/// A future that processes all HTTP state for the IO object.
48///
49/// In most cases, this should just be spawned into an executor, so that it
50/// can process incoming and outgoing messages, notice hangups, and the like.
51///
52/// Instances of this type are typically created via the [`handshake`] function
53#[must_use = "futures do nothing unless polled"]
54pub struct Connection<T, B>
55where
56 T: Read + Write,
57 B: Body + 'static,
58{
59 inner: Dispatcher<T, B>,
60}
61
62impl<T, B> Connection<T, B>
63where
64 T: Read + Write + Unpin,
65 B: Body + 'static,
66 B::Error: Into<Box<dyn StdError + Send + Sync>>,
67{
68 /// Return the inner IO object, and additional information.
69 ///
70 /// Only works for HTTP/1 connections. HTTP/2 connections will panic.
71 pub fn into_parts(self) -> Parts<T> {
72 let (io, read_buf, _) = self.inner.into_inner();
73 Parts { io, read_buf }
74 }
75
76 /// Poll the connection for completion, but without calling `shutdown`
77 /// on the underlying IO.
78 ///
79 /// This is useful to allow running a connection while doing an HTTP
80 /// upgrade. Once the upgrade is completed, the connection would be "done",
81 /// but it is not desired to actually shutdown the IO object. Instead you
82 /// would take it back using `into_parts`.
83 ///
84 /// Use [`poll_fn`](https://docs.rs/futures/0.1.25/futures/future/fn.poll_fn.html)
85 /// and [`try_ready!`](https://docs.rs/futures/0.1.25/futures/macro.try_ready.html)
86 /// to work with this function; or use the `without_shutdown` wrapper.
87 pub fn poll_without_shutdown(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
88 self.inner.poll_without_shutdown(cx)
89 }
90
91 /// Prevent shutdown of the underlying IO object at the end of service the request,
92 /// instead run `into_parts`. This is a convenience wrapper over `poll_without_shutdown`.
93 pub async fn without_shutdown(self) -> crate::Result<Parts<T>> {
94 let mut conn = Some(self);
95 futures_util::future::poll_fn(move |cx| -> Poll<crate::Result<Parts<T>>> {
96 ready!(conn.as_mut().unwrap().poll_without_shutdown(cx))?;
97 Poll::Ready(Ok(conn.take().unwrap().into_parts()))
98 })
99 .await
100 }
101}
102
103/// A builder to configure an HTTP connection.
104///
105/// After setting options, the builder is used to create a handshake future.
106///
107/// **Note**: The default values of options are *not considered stable*. They
108/// are subject to change at any time.
109#[derive(Clone, Debug)]
110pub struct Builder {
111 h09_responses: bool,
112 h1_parser_config: ParserConfig,
113 h1_writev: Option<bool>,
114 h1_title_case_headers: bool,
115 h1_preserve_header_case: bool,
116 h1_max_headers: Option<usize>,
117 #[cfg(feature = "ffi")]
118 h1_preserve_header_order: bool,
119 h1_read_buf_exact_size: Option<usize>,
120 h1_max_buf_size: Option<usize>,
121}
122
123/// Returns a handshake future over some IO.
124///
125/// This is a shortcut for `Builder::new().handshake(io)`.
126/// See [`client::conn`](crate::client::conn) for more.
127pub async fn handshake<T, B>(io: T) -> crate::Result<(SendRequest<B>, Connection<T, B>)>
128where
129 T: Read + Write + Unpin,
130 B: Body + 'static,
131 B::Data: Send,
132 B::Error: Into<Box<dyn StdError + Send + Sync>>,
133{
134 Builder::new().handshake(io).await
135}
136
137// ===== impl SendRequest
138
139impl<B> SendRequest<B> {
140 /// Polls to determine whether this sender can be used yet for a request.
141 ///
142 /// If the associated connection is closed, this returns an Error.
143 pub fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
144 self.dispatch.poll_ready(cx)
145 }
146
147 /// Waits until the dispatcher is ready
148 ///
149 /// If the associated connection is closed, this returns an Error.
150 pub async fn ready(&mut self) -> crate::Result<()> {
151 futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await
152 }
153
154 /// Checks if the connection is currently ready to send a request.
155 ///
156 /// # Note
157 ///
158 /// This is mostly a hint. Due to inherent latency of networks, it is
159 /// possible that even after checking this is ready, sending a request
160 /// may still fail because the connection was closed in the meantime.
161 pub fn is_ready(&self) -> bool {
162 self.dispatch.is_ready()
163 }
164
165 /// Checks if the connection side has been closed.
166 pub fn is_closed(&self) -> bool {
167 self.dispatch.is_closed()
168 }
169}
170
171impl<B> SendRequest<B>
172where
173 B: Body + 'static,
174{
175 /// Sends a `Request` on the associated connection.
176 ///
177 /// Returns a future that if successful, yields the `Response`.
178 ///
179 /// `req` must have a `Host` header.
180 ///
181 /// # Uri
182 ///
183 /// The `Uri` of the request is serialized as-is.
184 ///
185 /// - Usually you want origin-form (`/path?query`).
186 /// - For sending to an HTTP proxy, you want to send in absolute-form
187 /// (`https://hyper.rs/guides`).
188 ///
189 /// This is however not enforced or validated and it is up to the user
190 /// of this method to ensure the `Uri` is correct for their intended purpose.
191 pub fn send_request(
192 &mut self,
193 req: Request<B>,
194 ) -> impl Future<Output = crate::Result<Response<IncomingBody>>> {
195 let sent = self.dispatch.send(req);
196
197 async move {
198 match sent {
199 Ok(rx) => match rx.await {
200 Ok(Ok(resp)) => Ok(resp),
201 Ok(Err(err)) => Err(err),
202 // this is definite bug if it happens, but it shouldn't happen!
203 Err(_canceled) => panic!("dispatch dropped without returning error"),
204 },
205 Err(_req) => {
206 debug!("connection was not ready");
207 Err(crate::Error::new_canceled().with("connection was not ready"))
208 }
209 }
210 }
211 }
212
213 /// Sends a `Request` on the associated connection.
214 ///
215 /// Returns a future that if successful, yields the `Response`.
216 ///
217 /// # Error
218 ///
219 /// If there was an error before trying to serialize the request to the
220 /// connection, the message will be returned as part of this error.
221 pub fn try_send_request(
222 &mut self,
223 req: Request<B>,
224 ) -> impl Future<Output = Result<Response<IncomingBody>, TrySendError<Request<B>>>> {
225 let sent = self.dispatch.try_send(req);
226 async move {
227 match sent {
228 Ok(rx) => match rx.await {
229 Ok(Ok(res)) => Ok(res),
230 Ok(Err(err)) => Err(err),
231 // this is definite bug if it happens, but it shouldn't happen!
232 Err(_) => panic!("dispatch dropped without returning error"),
233 },
234 Err(req) => {
235 debug!("connection was not ready");
236 let error = crate::Error::new_canceled().with("connection was not ready");
237 Err(TrySendError {
238 error,
239 message: Some(req),
240 })
241 }
242 }
243 }
244 }
245}
246
247impl<B> fmt::Debug for SendRequest<B> {
248 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
249 f.debug_struct("SendRequest").finish()
250 }
251}
252
253// ===== impl Connection
254
255impl<T, B> Connection<T, B>
256where
257 T: Read + Write + Unpin + Send,
258 B: Body + 'static,
259 B::Error: Into<Box<dyn StdError + Send + Sync>>,
260{
261 /// Enable this connection to support higher-level HTTP upgrades.
262 ///
263 /// See [the `upgrade` module](crate::upgrade) for more.
264 pub fn with_upgrades(self) -> upgrades::UpgradeableConnection<T, B> {
265 upgrades::UpgradeableConnection { inner: Some(self) }
266 }
267}
268
269impl<T, B> fmt::Debug for Connection<T, B>
270where
271 T: Read + Write + fmt::Debug,
272 B: Body + 'static,
273{
274 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
275 f.debug_struct("Connection").finish()
276 }
277}
278
279impl<T, B> Future for Connection<T, B>
280where
281 T: Read + Write + Unpin,
282 B: Body + 'static,
283 B::Data: Send,
284 B::Error: Into<Box<dyn StdError + Send + Sync>>,
285{
286 type Output = crate::Result<()>;
287
288 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
289 match ready!(Pin::new(&mut self.inner).poll(cx))? {
290 proto::Dispatched::Shutdown => Poll::Ready(Ok(())),
291 proto::Dispatched::Upgrade(pending) => {
292 // With no `Send` bound on `I`, we can't try to do
293 // upgrades here. In case a user was trying to use
294 // `upgrade` with this API, send a special
295 // error letting them know about that.
296 pending.manual();
297 Poll::Ready(Ok(()))
298 }
299 }
300 }
301}
302
303// ===== impl Builder
304
305impl Builder {
306 /// Creates a new connection builder.
307 #[inline]
308 pub fn new() -> Builder {
309 Builder {
310 h09_responses: false,
311 h1_writev: None,
312 h1_read_buf_exact_size: None,
313 h1_parser_config: Default::default(),
314 h1_title_case_headers: false,
315 h1_preserve_header_case: false,
316 h1_max_headers: None,
317 #[cfg(feature = "ffi")]
318 h1_preserve_header_order: false,
319 h1_max_buf_size: None,
320 }
321 }
322
323 /// Set whether HTTP/0.9 responses should be tolerated.
324 ///
325 /// Default is false.
326 pub fn http09_responses(&mut self, enabled: bool) -> &mut Builder {
327 self.h09_responses = enabled;
328 self
329 }
330
331 /// Set whether HTTP/1 connections will accept spaces between header names
332 /// and the colon that follow them in responses.
333 ///
334 /// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has
335 /// to say about it:
336 ///
337 /// > No whitespace is allowed between the header field-name and colon. In
338 /// > the past, differences in the handling of such whitespace have led to
339 /// > security vulnerabilities in request routing and response handling. A
340 /// > server MUST reject any received request message that contains
341 /// > whitespace between a header field-name and colon with a response code
342 /// > of 400 (Bad Request). A proxy MUST remove any such whitespace from a
343 /// > response message before forwarding the message downstream.
344 ///
345 /// Default is false.
346 ///
347 /// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4
348 pub fn allow_spaces_after_header_name_in_responses(&mut self, enabled: bool) -> &mut Builder {
349 self.h1_parser_config
350 .allow_spaces_after_header_name_in_responses(enabled);
351 self
352 }
353
354 /// Set whether HTTP/1 connections will accept obsolete line folding for
355 /// header values.
356 ///
357 /// Newline codepoints (`\r` and `\n`) will be transformed to spaces when
358 /// parsing.
359 ///
360 /// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has
361 /// to say about it:
362 ///
363 /// > A server that receives an obs-fold in a request message that is not
364 /// > within a message/http container MUST either reject the message by
365 /// > sending a 400 (Bad Request), preferably with a representation
366 /// > explaining that obsolete line folding is unacceptable, or replace
367 /// > each received obs-fold with one or more SP octets prior to
368 /// > interpreting the field value or forwarding the message downstream.
369 ///
370 /// > A proxy or gateway that receives an obs-fold in a response message
371 /// > that is not within a message/http container MUST either discard the
372 /// > message and replace it with a 502 (Bad Gateway) response, preferably
373 /// > with a representation explaining that unacceptable line folding was
374 /// > received, or replace each received obs-fold with one or more SP
375 /// > octets prior to interpreting the field value or forwarding the
376 /// > message downstream.
377 ///
378 /// > A user agent that receives an obs-fold in a response message that is
379 /// > not within a message/http container MUST replace each received
380 /// > obs-fold with one or more SP octets prior to interpreting the field
381 /// > value.
382 ///
383 /// Default is false.
384 ///
385 /// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4
386 pub fn allow_obsolete_multiline_headers_in_responses(&mut self, enabled: bool) -> &mut Builder {
387 self.h1_parser_config
388 .allow_obsolete_multiline_headers_in_responses(enabled);
389 self
390 }
391
392 /// Set whether HTTP/1 connections will silently ignored malformed header lines.
393 ///
394 /// If this is enabled and a header line does not start with a valid header
395 /// name, or does not include a colon at all, the line will be silently ignored
396 /// and no error will be reported.
397 ///
398 /// Default is false.
399 pub fn ignore_invalid_headers_in_responses(&mut self, enabled: bool) -> &mut Builder {
400 self.h1_parser_config
401 .ignore_invalid_headers_in_responses(enabled);
402 self
403 }
404
405 /// Set whether HTTP/1 connections should try to use vectored writes,
406 /// or always flatten into a single buffer.
407 ///
408 /// Note that setting this to false may mean more copies of body data,
409 /// but may also improve performance when an IO transport doesn't
410 /// support vectored writes well, such as most TLS implementations.
411 ///
412 /// Setting this to true will force hyper to use queued strategy
413 /// which may eliminate unnecessary cloning on some TLS backends
414 ///
415 /// Default is `auto`. In this mode hyper will try to guess which
416 /// mode to use
417 pub fn writev(&mut self, enabled: bool) -> &mut Builder {
418 self.h1_writev = Some(enabled);
419 self
420 }
421
422 /// Set whether HTTP/1 connections will write header names as title case at
423 /// the socket level.
424 ///
425 /// Default is false.
426 pub fn title_case_headers(&mut self, enabled: bool) -> &mut Builder {
427 self.h1_title_case_headers = enabled;
428 self
429 }
430
431 /// Set whether to support preserving original header cases.
432 ///
433 /// Currently, this will record the original cases received, and store them
434 /// in a private extension on the `Response`. It will also look for and use
435 /// such an extension in any provided `Request`.
436 ///
437 /// Since the relevant extension is still private, there is no way to
438 /// interact with the original cases. The only effect this can have now is
439 /// to forward the cases in a proxy-like fashion.
440 ///
441 /// Default is false.
442 pub fn preserve_header_case(&mut self, enabled: bool) -> &mut Builder {
443 self.h1_preserve_header_case = enabled;
444 self
445 }
446
447 /// Set the maximum number of headers.
448 ///
449 /// When a response is received, the parser will reserve a buffer to store headers for optimal
450 /// performance.
451 ///
452 /// If client receives more headers than the buffer size, the error "message header too large"
453 /// is returned.
454 ///
455 /// Note that headers is allocated on the stack by default, which has higher performance. After
456 /// setting this value, headers will be allocated in heap memory, that is, heap memory
457 /// allocation will occur for each response, and there will be a performance drop of about 5%.
458 ///
459 /// Default is 100.
460 pub fn max_headers(&mut self, val: usize) -> &mut Self {
461 self.h1_max_headers = Some(val);
462 self
463 }
464
465 /// Set whether to support preserving original header order.
466 ///
467 /// Currently, this will record the order in which headers are received, and store this
468 /// ordering in a private extension on the `Response`. It will also look for and use
469 /// such an extension in any provided `Request`.
470 ///
471 /// Default is false.
472 #[cfg(feature = "ffi")]
473 pub fn preserve_header_order(&mut self, enabled: bool) -> &mut Builder {
474 self.h1_preserve_header_order = enabled;
475 self
476 }
477
478 /// Sets the exact size of the read buffer to *always* use.
479 ///
480 /// Note that setting this option unsets the `max_buf_size` option.
481 ///
482 /// Default is an adaptive read buffer.
483 pub fn read_buf_exact_size(&mut self, sz: Option<usize>) -> &mut Builder {
484 self.h1_read_buf_exact_size = sz;
485 self.h1_max_buf_size = None;
486 self
487 }
488
489 /// Set the maximum buffer size for the connection.
490 ///
491 /// Default is ~400kb.
492 ///
493 /// Note that setting this option unsets the `read_exact_buf_size` option.
494 ///
495 /// # Panics
496 ///
497 /// The minimum value allowed is 8192. This method panics if the passed `max` is less than the minimum.
498 pub fn max_buf_size(&mut self, max: usize) -> &mut Self {
499 assert!(
500 max >= proto::h1::MINIMUM_MAX_BUFFER_SIZE,
501 "the max_buf_size cannot be smaller than the minimum that h1 specifies."
502 );
503
504 self.h1_max_buf_size = Some(max);
505 self.h1_read_buf_exact_size = None;
506 self
507 }
508
509 /// Constructs a connection with the configured options and IO.
510 /// See [`client::conn`](crate::client::conn) for more.
511 ///
512 /// Note, if [`Connection`] is not `await`-ed, [`SendRequest`] will
513 /// do nothing.
514 pub fn handshake<T, B>(
515 &self,
516 io: T,
517 ) -> impl Future<Output = crate::Result<(SendRequest<B>, Connection<T, B>)>>
518 where
519 T: Read + Write + Unpin,
520 B: Body + 'static,
521 B::Data: Send,
522 B::Error: Into<Box<dyn StdError + Send + Sync>>,
523 {
524 let opts = self.clone();
525
526 async move {
527 trace!("client handshake HTTP/1");
528
529 let (tx, rx) = dispatch::channel();
530 let mut conn = proto::Conn::new(io);
531 conn.set_h1_parser_config(opts.h1_parser_config);
532 if let Some(writev) = opts.h1_writev {
533 if writev {
534 conn.set_write_strategy_queue();
535 } else {
536 conn.set_write_strategy_flatten();
537 }
538 }
539 if opts.h1_title_case_headers {
540 conn.set_title_case_headers();
541 }
542 if opts.h1_preserve_header_case {
543 conn.set_preserve_header_case();
544 }
545 if let Some(max_headers) = opts.h1_max_headers {
546 conn.set_http1_max_headers(max_headers);
547 }
548 #[cfg(feature = "ffi")]
549 if opts.h1_preserve_header_order {
550 conn.set_preserve_header_order();
551 }
552
553 if opts.h09_responses {
554 conn.set_h09_responses();
555 }
556
557 if let Some(sz) = opts.h1_read_buf_exact_size {
558 conn.set_read_buf_exact_size(sz);
559 }
560 if let Some(max) = opts.h1_max_buf_size {
561 conn.set_max_buf_size(max);
562 }
563 let cd = proto::h1::dispatch::Client::new(rx);
564 let proto = proto::h1::Dispatcher::new(cd, conn);
565
566 Ok((SendRequest { dispatch: tx }, Connection { inner: proto }))
567 }
568 }
569}
570
571mod upgrades {
572 use crate::upgrade::Upgraded;
573
574 use super::*;
575
576 // A future binding a connection with a Service with Upgrade support.
577 //
578 // This type is unnameable outside the crate.
579 #[must_use = "futures do nothing unless polled"]
580 #[allow(missing_debug_implementations)]
581 pub struct UpgradeableConnection<T, B>
582 where
583 T: Read + Write + Unpin + Send + 'static,
584 B: Body + 'static,
585 B::Error: Into<Box<dyn StdError + Send + Sync>>,
586 {
587 pub(super) inner: Option<Connection<T, B>>,
588 }
589
590 impl<I, B> Future for UpgradeableConnection<I, B>
591 where
592 I: Read + Write + Unpin + Send + 'static,
593 B: Body + 'static,
594 B::Data: Send,
595 B::Error: Into<Box<dyn StdError + Send + Sync>>,
596 {
597 type Output = crate::Result<()>;
598
599 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
600 match ready!(Pin::new(&mut self.inner.as_mut().unwrap().inner).poll(cx)) {
601 Ok(proto::Dispatched::Shutdown) => Poll::Ready(Ok(())),
602 Ok(proto::Dispatched::Upgrade(pending)) => {
603 let Parts { io, read_buf } = self.inner.take().unwrap().into_parts();
604 pending.fulfill(Upgraded::new(io, read_buf));
605 Poll::Ready(Ok(()))
606 }
607 Err(e) => Poll::Ready(Err(e)),
608 }
609 }
610 }
611}