h2/
client.rs

1//! Client implementation of the HTTP/2 protocol.
2//!
3//! # Getting started
4//!
5//! Running an HTTP/2 client requires the caller to establish the underlying
6//! connection as well as get the connection to a state that is ready to begin
7//! the HTTP/2 handshake. See [here](../index.html#handshake) for more
8//! details.
9//!
10//! This could be as basic as using Tokio's [`TcpStream`] to connect to a remote
11//! host, but usually it means using either ALPN or HTTP/1.1 protocol upgrades.
12//!
13//! Once a connection is obtained, it is passed to [`handshake`], which will
14//! begin the [HTTP/2 handshake]. This returns a future that completes once
15//! the handshake process is performed and HTTP/2 streams may be initialized.
16//!
17//! [`handshake`] uses default configuration values. There are a number of
18//! settings that can be changed by using [`Builder`] instead.
19//!
20//! Once the handshake future completes, the caller is provided with a
21//! [`Connection`] instance and a [`SendRequest`] instance. The [`Connection`]
22//! instance is used to drive the connection (see [Managing the connection]).
23//! The [`SendRequest`] instance is used to initialize new streams (see [Making
24//! requests]).
25//!
26//! # Making requests
27//!
28//! Requests are made using the [`SendRequest`] handle provided by the handshake
29//! future. Once a request is submitted, an HTTP/2 stream is initialized and
30//! the request is sent to the server.
31//!
32//! A request body and request trailers are sent using [`SendRequest`] and the
33//! server's response is returned once the [`ResponseFuture`] future completes.
34//! Both the [`SendStream`] and [`ResponseFuture`] instances are returned by
35//! [`SendRequest::send_request`] and are tied to the HTTP/2 stream
36//! initialized by the sent request.
37//!
38//! The [`SendRequest::poll_ready`] function returns `Ready` when a new HTTP/2
39//! stream can be created, i.e. as long as the current number of active streams
40//! is below [`MAX_CONCURRENT_STREAMS`]. If a new stream cannot be created, the
41//! caller will be notified once an existing stream closes, freeing capacity for
42//! the caller.  The caller should use [`SendRequest::poll_ready`] to check for
43//! capacity before sending a request to the server.
44//!
45//! [`SendRequest`] enforces the [`MAX_CONCURRENT_STREAMS`] setting. The user
46//! must not send a request if `poll_ready` does not return `Ready`. Attempting
47//! to do so will result in an [`Error`] being returned.
48//!
49//! # Managing the connection
50//!
51//! The [`Connection`] instance is used to manage connection state. The caller
52//! is required to call [`Connection::poll`] in order to advance state.
53//! [`SendRequest::send_request`] and other functions have no effect unless
54//! [`Connection::poll`] is called.
55//!
56//! The [`Connection`] instance should only be dropped once [`Connection::poll`]
57//! returns `Ready`. At this point, the underlying socket has been closed and no
58//! further work needs to be done.
59//!
60//! The easiest way to ensure that the [`Connection`] instance gets polled is to
61//! submit the [`Connection`] instance to an [executor]. The executor will then
62//! manage polling the connection until the connection is complete.
63//! Alternatively, the caller can call `poll` manually.
64//!
65//! # Example
66//!
67//! ```rust, no_run
68//!
69//! use h2::client;
70//!
71//! use http::{Request, Method};
72//! use std::error::Error;
73//! use tokio::net::TcpStream;
74//!
75//! #[tokio::main]
76//! pub async fn main() -> Result<(), Box<dyn Error>> {
77//!     // Establish TCP connection to the server.
78//!     let tcp = TcpStream::connect("127.0.0.1:5928").await?;
79//!     let (h2, connection) = client::handshake(tcp).await?;
80//!     tokio::spawn(async move {
81//!         connection.await.unwrap();
82//!     });
83//!
84//!     let mut h2 = h2.ready().await?;
85//!     // Prepare the HTTP request to send to the server.
86//!     let request = Request::builder()
87//!                     .method(Method::GET)
88//!                     .uri("https://www.example.com/")
89//!                     .body(())
90//!                     .unwrap();
91//!
92//!     // Send the request. The second tuple item allows the caller
93//!     // to stream a request body.
94//!     let (response, _) = h2.send_request(request, true).unwrap();
95//!
96//!     let (head, mut body) = response.await?.into_parts();
97//!
98//!     println!("Received response: {:?}", head);
99//!
100//!     // The `flow_control` handle allows the caller to manage
101//!     // flow control.
102//!     //
103//!     // Whenever data is received, the caller is responsible for
104//!     // releasing capacity back to the server once it has freed
105//!     // the data from memory.
106//!     let mut flow_control = body.flow_control().clone();
107//!
108//!     while let Some(chunk) = body.data().await {
109//!         let chunk = chunk?;
110//!         println!("RX: {:?}", chunk);
111//!
112//!         // Let the server send more data.
113//!         let _ = flow_control.release_capacity(chunk.len());
114//!     }
115//!
116//!     Ok(())
117//! }
118//! ```
119//!
120//! [`TcpStream`]: https://docs.rs/tokio-core/0.1/tokio_core/net/struct.TcpStream.html
121//! [`handshake`]: fn.handshake.html
122//! [executor]: https://docs.rs/futures/0.1/futures/future/trait.Executor.html
123//! [`SendRequest`]: struct.SendRequest.html
124//! [`SendStream`]: ../struct.SendStream.html
125//! [Making requests]: #making-requests
126//! [Managing the connection]: #managing-the-connection
127//! [`Connection`]: struct.Connection.html
128//! [`Connection::poll`]: struct.Connection.html#method.poll
129//! [`SendRequest::send_request`]: struct.SendRequest.html#method.send_request
130//! [`MAX_CONCURRENT_STREAMS`]: http://httpwg.org/specs/rfc7540.html#SettingValues
131//! [`SendRequest`]: struct.SendRequest.html
132//! [`ResponseFuture`]: struct.ResponseFuture.html
133//! [`SendRequest::poll_ready`]: struct.SendRequest.html#method.poll_ready
134//! [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
135//! [`Builder`]: struct.Builder.html
136//! [`Error`]: ../struct.Error.html
137
138use crate::codec::{Codec, SendError, UserError};
139use crate::ext::Protocol;
140use crate::frame::{Headers, Pseudo, Reason, Settings, StreamId};
141use crate::proto::{self, Error};
142use crate::{FlowControl, PingPong, RecvStream, SendStream};
143
144use bytes::{Buf, Bytes};
145use http::{uri, HeaderMap, Method, Request, Response, Version};
146use std::fmt;
147use std::future::Future;
148use std::pin::Pin;
149use std::task::{Context, Poll};
150use std::time::Duration;
151use std::usize;
152use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt};
153use tracing::Instrument;
154
155/// Initializes new HTTP/2 streams on a connection by sending a request.
156///
157/// This type does no work itself. Instead, it is a handle to the inner
158/// connection state held by [`Connection`]. If the associated connection
159/// instance is dropped, all `SendRequest` functions will return [`Error`].
160///
161/// [`SendRequest`] instances are able to move to and operate on separate tasks
162/// / threads than their associated [`Connection`] instance. Internally, there
163/// is a buffer used to stage requests before they get written to the
164/// connection. There is no guarantee that requests get written to the
165/// connection in FIFO order as HTTP/2 prioritization logic can play a role.
166///
167/// [`SendRequest`] implements [`Clone`], enabling the creation of many
168/// instances that are backed by a single connection.
169///
170/// See [module] level documentation for more details.
171///
172/// [module]: index.html
173/// [`Connection`]: struct.Connection.html
174/// [`Clone`]: https://doc.rust-lang.org/std/clone/trait.Clone.html
175/// [`Error`]: ../struct.Error.html
176pub struct SendRequest<B: Buf> {
177    inner: proto::Streams<B, Peer>,
178    pending: Option<proto::OpaqueStreamRef>,
179}
180
181/// Returns a `SendRequest` instance once it is ready to send at least one
182/// request.
183#[derive(Debug)]
184pub struct ReadySendRequest<B: Buf> {
185    inner: Option<SendRequest<B>>,
186}
187
188/// Manages all state associated with an HTTP/2 client connection.
189///
190/// A `Connection` is backed by an I/O resource (usually a TCP socket) and
191/// implements the HTTP/2 client logic for that connection. It is responsible
192/// for driving the internal state forward, performing the work requested of the
193/// associated handles ([`SendRequest`], [`ResponseFuture`], [`SendStream`],
194/// [`RecvStream`]).
195///
196/// `Connection` values are created by calling [`handshake`]. Once a
197/// `Connection` value is obtained, the caller must repeatedly call [`poll`]
198/// until `Ready` is returned. The easiest way to do this is to submit the
199/// `Connection` instance to an [executor].
200///
201/// [module]: index.html
202/// [`handshake`]: fn.handshake.html
203/// [`SendRequest`]: struct.SendRequest.html
204/// [`ResponseFuture`]: struct.ResponseFuture.html
205/// [`SendStream`]: ../struct.SendStream.html
206/// [`RecvStream`]: ../struct.RecvStream.html
207/// [`poll`]: #method.poll
208/// [executor]: https://docs.rs/futures/0.1/futures/future/trait.Executor.html
209///
210/// # Examples
211///
212/// ```
213/// # use tokio::io::{AsyncRead, AsyncWrite};
214/// # use h2::client;
215/// # use h2::client::*;
216/// #
217/// # async fn doc<T>(my_io: T) -> Result<(), h2::Error>
218/// # where T: AsyncRead + AsyncWrite + Send + Unpin + 'static,
219/// # {
220///     let (send_request, connection) = client::handshake(my_io).await?;
221///     // Submit the connection handle to an executor.
222///     tokio::spawn(async { connection.await.expect("connection failed"); });
223///
224///     // Now, use `send_request` to initialize HTTP/2 streams.
225///     // ...
226/// # Ok(())
227/// # }
228/// #
229/// # pub fn main() {}
230/// ```
231#[must_use = "futures do nothing unless polled"]
232pub struct Connection<T, B: Buf = Bytes> {
233    inner: proto::Connection<T, Peer, B>,
234}
235
236/// A future of an HTTP response.
237#[derive(Debug)]
238#[must_use = "futures do nothing unless polled"]
239pub struct ResponseFuture {
240    inner: proto::OpaqueStreamRef,
241    push_promise_consumed: bool,
242}
243
244/// A future of a pushed HTTP response.
245///
246/// We have to differentiate between pushed and non pushed because of the spec
247/// <https://httpwg.org/specs/rfc7540.html#PUSH_PROMISE>
248/// > PUSH_PROMISE frames MUST only be sent on a peer-initiated stream
249/// > that is in either the "open" or "half-closed (remote)" state.
250#[derive(Debug)]
251#[must_use = "futures do nothing unless polled"]
252pub struct PushedResponseFuture {
253    inner: ResponseFuture,
254}
255
256/// A pushed response and corresponding request headers
257#[derive(Debug)]
258pub struct PushPromise {
259    /// The request headers
260    request: Request<()>,
261
262    /// The pushed response
263    response: PushedResponseFuture,
264}
265
266/// A stream of pushed responses and corresponding promised requests
267#[derive(Debug)]
268#[must_use = "streams do nothing unless polled"]
269pub struct PushPromises {
270    inner: proto::OpaqueStreamRef,
271}
272
273/// Builds client connections with custom configuration values.
274///
275/// Methods can be chained in order to set the configuration values.
276///
277/// The client is constructed by calling [`handshake`] and passing the I/O
278/// handle that will back the HTTP/2 server.
279///
280/// New instances of `Builder` are obtained via [`Builder::new`].
281///
282/// See function level documentation for details on the various client
283/// configuration settings.
284///
285/// [`Builder::new`]: struct.Builder.html#method.new
286/// [`handshake`]: struct.Builder.html#method.handshake
287///
288/// # Examples
289///
290/// ```
291/// # use tokio::io::{AsyncRead, AsyncWrite};
292/// # use h2::client::*;
293/// # use bytes::Bytes;
294/// #
295/// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
296///     -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
297/// # {
298/// // `client_fut` is a future representing the completion of the HTTP/2
299/// // handshake.
300/// let client_fut = Builder::new()
301///     .initial_window_size(1_000_000)
302///     .max_concurrent_streams(1000)
303///     .handshake(my_io);
304/// # client_fut.await
305/// # }
306/// #
307/// # pub fn main() {}
308/// ```
309#[derive(Clone, Debug)]
310pub struct Builder {
311    /// Time to keep locally reset streams around before reaping.
312    reset_stream_duration: Duration,
313
314    /// Initial maximum number of locally initiated (send) streams.
315    /// After receiving a SETTINGS frame from the remote peer,
316    /// the connection will overwrite this value with the
317    /// MAX_CONCURRENT_STREAMS specified in the frame.
318    /// If no value is advertised by the remote peer in the initial SETTINGS
319    /// frame, it will be set to usize::MAX.
320    initial_max_send_streams: usize,
321
322    /// Initial target window size for new connections.
323    initial_target_connection_window_size: Option<u32>,
324
325    /// Maximum amount of bytes to "buffer" for writing per stream.
326    max_send_buffer_size: usize,
327
328    /// Maximum number of locally reset streams to keep at a time.
329    reset_stream_max: usize,
330
331    /// Maximum number of remotely reset streams to allow in the pending
332    /// accept queue.
333    pending_accept_reset_stream_max: usize,
334
335    /// Initial `Settings` frame to send as part of the handshake.
336    settings: Settings,
337
338    /// The stream ID of the first (lowest) stream. Subsequent streams will use
339    /// monotonically increasing stream IDs.
340    stream_id: StreamId,
341
342    /// Maximum number of locally reset streams due to protocol error across
343    /// the lifetime of the connection.
344    ///
345    /// When this gets exceeded, we issue GOAWAYs.
346    local_max_error_reset_streams: Option<usize>,
347}
348
349#[derive(Debug)]
350pub(crate) struct Peer;
351
352// ===== impl SendRequest =====
353
354impl<B> SendRequest<B>
355where
356    B: Buf,
357{
358    /// Returns `Ready` when the connection can initialize a new HTTP/2
359    /// stream.
360    ///
361    /// This function must return `Ready` before `send_request` is called. When
362    /// `Poll::Pending` is returned, the task will be notified once the readiness
363    /// state changes.
364    ///
365    /// See [module] level docs for more details.
366    ///
367    /// [module]: index.html
368    pub fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), crate::Error>> {
369        ready!(self.inner.poll_pending_open(cx, self.pending.as_ref()))?;
370        self.pending = None;
371        Poll::Ready(Ok(()))
372    }
373
374    /// Consumes `self`, returning a future that returns `self` back once it is
375    /// ready to send a request.
376    ///
377    /// This function should be called before calling `send_request`.
378    ///
379    /// This is a functional combinator for [`poll_ready`]. The returned future
380    /// will call `SendStream::poll_ready` until `Ready`, then returns `self` to
381    /// the caller.
382    ///
383    /// # Examples
384    ///
385    /// ```rust
386    /// # use h2::client::*;
387    /// # use http::*;
388    /// # async fn doc(send_request: SendRequest<&'static [u8]>)
389    /// # {
390    /// // First, wait until the `send_request` handle is ready to send a new
391    /// // request
392    /// let mut send_request = send_request.ready().await.unwrap();
393    /// // Use `send_request` here.
394    /// # }
395    /// # pub fn main() {}
396    /// ```
397    ///
398    /// See [module] level docs for more details.
399    ///
400    /// [`poll_ready`]: #method.poll_ready
401    /// [module]: index.html
402    pub fn ready(self) -> ReadySendRequest<B> {
403        ReadySendRequest { inner: Some(self) }
404    }
405
406    /// Sends a HTTP/2 request to the server.
407    ///
408    /// `send_request` initializes a new HTTP/2 stream on the associated
409    /// connection, then sends the given request using this new stream. Only the
410    /// request head is sent.
411    ///
412    /// On success, a [`ResponseFuture`] instance and [`SendStream`] instance
413    /// are returned. The [`ResponseFuture`] instance is used to get the
414    /// server's response and the [`SendStream`] instance is used to send a
415    /// request body or trailers to the server over the same HTTP/2 stream.
416    ///
417    /// To send a request body or trailers, set `end_of_stream` to `false`.
418    /// Then, use the returned [`SendStream`] instance to stream request body
419    /// chunks or send trailers. If `end_of_stream` is **not** set to `false`
420    /// then attempting to call [`SendStream::send_data`] or
421    /// [`SendStream::send_trailers`] will result in an error.
422    ///
423    /// If no request body or trailers are to be sent, set `end_of_stream` to
424    /// `true` and drop the returned [`SendStream`] instance.
425    ///
426    /// # A note on HTTP versions
427    ///
428    /// The provided `Request` will be encoded differently depending on the
429    /// value of its version field. If the version is set to 2.0, then the
430    /// request is encoded as per the specification recommends.
431    ///
432    /// If the version is set to a lower value, then the request is encoded to
433    /// preserve the characteristics of HTTP 1.1 and lower. Specifically, host
434    /// headers are permitted and the `:authority` pseudo header is not
435    /// included.
436    ///
437    /// The caller should always set the request's version field to 2.0 unless
438    /// specifically transmitting an HTTP 1.1 request over 2.0.
439    ///
440    /// # Examples
441    ///
442    /// Sending a request with no body
443    ///
444    /// ```rust
445    /// # use h2::client::*;
446    /// # use http::*;
447    /// # async fn doc(send_request: SendRequest<&'static [u8]>)
448    /// # {
449    /// // First, wait until the `send_request` handle is ready to send a new
450    /// // request
451    /// let mut send_request = send_request.ready().await.unwrap();
452    /// // Prepare the HTTP request to send to the server.
453    /// let request = Request::get("https://www.example.com/")
454    ///     .body(())
455    ///     .unwrap();
456    ///
457    /// // Send the request to the server. Since we are not sending a
458    /// // body or trailers, we can drop the `SendStream` instance.
459    /// let (response, _) = send_request.send_request(request, true).unwrap();
460    /// let response = response.await.unwrap();
461    /// // Process the response
462    /// # }
463    /// # pub fn main() {}
464    /// ```
465    ///
466    /// Sending a request with a body and trailers
467    ///
468    /// ```rust
469    /// # use h2::client::*;
470    /// # use http::*;
471    /// # async fn doc(send_request: SendRequest<&'static [u8]>)
472    /// # {
473    /// // First, wait until the `send_request` handle is ready to send a new
474    /// // request
475    /// let mut send_request = send_request.ready().await.unwrap();
476    ///
477    /// // Prepare the HTTP request to send to the server.
478    /// let request = Request::get("https://www.example.com/")
479    ///     .body(())
480    ///     .unwrap();
481    ///
482    /// // Send the request to the server. If we are not sending a
483    /// // body or trailers, we can drop the `SendStream` instance.
484    /// let (response, mut send_stream) = send_request
485    ///     .send_request(request, false).unwrap();
486    ///
487    /// // At this point, one option would be to wait for send capacity.
488    /// // Doing so would allow us to not hold data in memory that
489    /// // cannot be sent. However, this is not a requirement, so this
490    /// // example will skip that step. See `SendStream` documentation
491    /// // for more details.
492    /// send_stream.send_data(b"hello", false).unwrap();
493    /// send_stream.send_data(b"world", false).unwrap();
494    ///
495    /// // Send the trailers.
496    /// let mut trailers = HeaderMap::new();
497    /// trailers.insert(
498    ///     header::HeaderName::from_bytes(b"my-trailer").unwrap(),
499    ///     header::HeaderValue::from_bytes(b"hello").unwrap());
500    ///
501    /// send_stream.send_trailers(trailers).unwrap();
502    ///
503    /// let response = response.await.unwrap();
504    /// // Process the response
505    /// # }
506    /// # pub fn main() {}
507    /// ```
508    ///
509    /// [`ResponseFuture`]: struct.ResponseFuture.html
510    /// [`SendStream`]: ../struct.SendStream.html
511    /// [`SendStream::send_data`]: ../struct.SendStream.html#method.send_data
512    /// [`SendStream::send_trailers`]: ../struct.SendStream.html#method.send_trailers
513    pub fn send_request(
514        &mut self,
515        request: Request<()>,
516        end_of_stream: bool,
517    ) -> Result<(ResponseFuture, SendStream<B>), crate::Error> {
518        self.inner
519            .send_request(request, end_of_stream, self.pending.as_ref())
520            .map_err(Into::into)
521            .map(|(stream, is_full)| {
522                if stream.is_pending_open() && is_full {
523                    // Only prevent sending another request when the request queue
524                    // is not full.
525                    self.pending = Some(stream.clone_to_opaque());
526                }
527
528                let response = ResponseFuture {
529                    inner: stream.clone_to_opaque(),
530                    push_promise_consumed: false,
531                };
532
533                let stream = SendStream::new(stream);
534
535                (response, stream)
536            })
537    }
538
539    /// Returns whether the [extended CONNECT protocol][1] is enabled or not.
540    ///
541    /// This setting is configured by the server peer by sending the
542    /// [`SETTINGS_ENABLE_CONNECT_PROTOCOL` parameter][2] in a `SETTINGS` frame.
543    /// This method returns the currently acknowledged value received from the
544    /// remote.
545    ///
546    /// [1]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
547    /// [2]: https://datatracker.ietf.org/doc/html/rfc8441#section-3
548    pub fn is_extended_connect_protocol_enabled(&self) -> bool {
549        self.inner.is_extended_connect_protocol_enabled()
550    }
551}
552
553impl<B> fmt::Debug for SendRequest<B>
554where
555    B: Buf,
556{
557    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
558        fmt.debug_struct("SendRequest").finish()
559    }
560}
561
562impl<B> Clone for SendRequest<B>
563where
564    B: Buf,
565{
566    fn clone(&self) -> Self {
567        SendRequest {
568            inner: self.inner.clone(),
569            pending: None,
570        }
571    }
572}
573
574#[cfg(feature = "unstable")]
575impl<B> SendRequest<B>
576where
577    B: Buf,
578{
579    /// Returns the number of active streams.
580    ///
581    /// An active stream is a stream that has not yet transitioned to a closed
582    /// state.
583    pub fn num_active_streams(&self) -> usize {
584        self.inner.num_active_streams()
585    }
586
587    /// Returns the number of streams that are held in memory.
588    ///
589    /// A wired stream is a stream that is either active or is closed but must
590    /// stay in memory for some reason. For example, there are still outstanding
591    /// userspace handles pointing to the slot.
592    pub fn num_wired_streams(&self) -> usize {
593        self.inner.num_wired_streams()
594    }
595}
596
597// ===== impl ReadySendRequest =====
598
599impl<B> Future for ReadySendRequest<B>
600where
601    B: Buf,
602{
603    type Output = Result<SendRequest<B>, crate::Error>;
604
605    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
606        match &mut self.inner {
607            Some(send_request) => {
608                ready!(send_request.poll_ready(cx))?;
609            }
610            None => panic!("called `poll` after future completed"),
611        }
612
613        Poll::Ready(Ok(self.inner.take().unwrap()))
614    }
615}
616
617// ===== impl Builder =====
618
619impl Builder {
620    /// Returns a new client builder instance initialized with default
621    /// configuration values.
622    ///
623    /// Configuration methods can be chained on the return value.
624    ///
625    /// # Examples
626    ///
627    /// ```
628    /// # use tokio::io::{AsyncRead, AsyncWrite};
629    /// # use h2::client::*;
630    /// # use bytes::Bytes;
631    /// #
632    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
633    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
634    /// # {
635    /// // `client_fut` is a future representing the completion of the HTTP/2
636    /// // handshake.
637    /// let client_fut = Builder::new()
638    ///     .initial_window_size(1_000_000)
639    ///     .max_concurrent_streams(1000)
640    ///     .handshake(my_io);
641    /// # client_fut.await
642    /// # }
643    /// #
644    /// # pub fn main() {}
645    /// ```
646    pub fn new() -> Builder {
647        Builder {
648            max_send_buffer_size: proto::DEFAULT_MAX_SEND_BUFFER_SIZE,
649            reset_stream_duration: Duration::from_secs(proto::DEFAULT_RESET_STREAM_SECS),
650            reset_stream_max: proto::DEFAULT_RESET_STREAM_MAX,
651            pending_accept_reset_stream_max: proto::DEFAULT_REMOTE_RESET_STREAM_MAX,
652            initial_target_connection_window_size: None,
653            initial_max_send_streams: usize::MAX,
654            settings: Default::default(),
655            stream_id: 1.into(),
656            local_max_error_reset_streams: Some(proto::DEFAULT_LOCAL_RESET_COUNT_MAX),
657        }
658    }
659
660    /// Indicates the initial window size (in octets) for stream-level
661    /// flow control for received data.
662    ///
663    /// The initial window of a stream is used as part of flow control. For more
664    /// details, see [`FlowControl`].
665    ///
666    /// The default value is 65,535.
667    ///
668    /// [`FlowControl`]: ../struct.FlowControl.html
669    ///
670    /// # Examples
671    ///
672    /// ```
673    /// # use tokio::io::{AsyncRead, AsyncWrite};
674    /// # use h2::client::*;
675    /// # use bytes::Bytes;
676    /// #
677    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
678    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
679    /// # {
680    /// // `client_fut` is a future representing the completion of the HTTP/2
681    /// // handshake.
682    /// let client_fut = Builder::new()
683    ///     .initial_window_size(1_000_000)
684    ///     .handshake(my_io);
685    /// # client_fut.await
686    /// # }
687    /// #
688    /// # pub fn main() {}
689    /// ```
690    pub fn initial_window_size(&mut self, size: u32) -> &mut Self {
691        self.settings.set_initial_window_size(Some(size));
692        self
693    }
694
695    /// Indicates the initial window size (in octets) for connection-level flow control
696    /// for received data.
697    ///
698    /// The initial window of a connection is used as part of flow control. For more details,
699    /// see [`FlowControl`].
700    ///
701    /// The default value is 65,535.
702    ///
703    /// [`FlowControl`]: ../struct.FlowControl.html
704    ///
705    /// # Examples
706    ///
707    /// ```
708    /// # use tokio::io::{AsyncRead, AsyncWrite};
709    /// # use h2::client::*;
710    /// # use bytes::Bytes;
711    /// #
712    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
713    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
714    /// # {
715    /// // `client_fut` is a future representing the completion of the HTTP/2
716    /// // handshake.
717    /// let client_fut = Builder::new()
718    ///     .initial_connection_window_size(1_000_000)
719    ///     .handshake(my_io);
720    /// # client_fut.await
721    /// # }
722    /// #
723    /// # pub fn main() {}
724    /// ```
725    pub fn initial_connection_window_size(&mut self, size: u32) -> &mut Self {
726        self.initial_target_connection_window_size = Some(size);
727        self
728    }
729
730    /// Indicates the size (in octets) of the largest HTTP/2 frame payload that the
731    /// configured client is able to accept.
732    ///
733    /// The sender may send data frames that are **smaller** than this value,
734    /// but any data larger than `max` will be broken up into multiple `DATA`
735    /// frames.
736    ///
737    /// The value **must** be between 16,384 and 16,777,215. The default value is 16,384.
738    ///
739    /// # Examples
740    ///
741    /// ```
742    /// # use tokio::io::{AsyncRead, AsyncWrite};
743    /// # use h2::client::*;
744    /// # use bytes::Bytes;
745    /// #
746    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
747    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
748    /// # {
749    /// // `client_fut` is a future representing the completion of the HTTP/2
750    /// // handshake.
751    /// let client_fut = Builder::new()
752    ///     .max_frame_size(1_000_000)
753    ///     .handshake(my_io);
754    /// # client_fut.await
755    /// # }
756    /// #
757    /// # pub fn main() {}
758    /// ```
759    ///
760    /// # Panics
761    ///
762    /// This function panics if `max` is not within the legal range specified
763    /// above.
764    pub fn max_frame_size(&mut self, max: u32) -> &mut Self {
765        self.settings.set_max_frame_size(Some(max));
766        self
767    }
768
769    /// Sets the max size of received header frames.
770    ///
771    /// This advisory setting informs a peer of the maximum size of header list
772    /// that the sender is prepared to accept, in octets. The value is based on
773    /// the uncompressed size of header fields, including the length of the name
774    /// and value in octets plus an overhead of 32 octets for each header field.
775    ///
776    /// This setting is also used to limit the maximum amount of data that is
777    /// buffered to decode HEADERS frames.
778    ///
779    /// # Examples
780    ///
781    /// ```
782    /// # use tokio::io::{AsyncRead, AsyncWrite};
783    /// # use h2::client::*;
784    /// # use bytes::Bytes;
785    /// #
786    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
787    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
788    /// # {
789    /// // `client_fut` is a future representing the completion of the HTTP/2
790    /// // handshake.
791    /// let client_fut = Builder::new()
792    ///     .max_header_list_size(16 * 1024)
793    ///     .handshake(my_io);
794    /// # client_fut.await
795    /// # }
796    /// #
797    /// # pub fn main() {}
798    /// ```
799    pub fn max_header_list_size(&mut self, max: u32) -> &mut Self {
800        self.settings.set_max_header_list_size(Some(max));
801        self
802    }
803
804    /// Sets the maximum number of concurrent streams.
805    ///
806    /// The maximum concurrent streams setting only controls the maximum number
807    /// of streams that can be initiated by the remote peer. In other words,
808    /// when this setting is set to 100, this does not limit the number of
809    /// concurrent streams that can be created by the caller.
810    ///
811    /// It is recommended that this value be no smaller than 100, so as to not
812    /// unnecessarily limit parallelism. However, any value is legal, including
813    /// 0. If `max` is set to 0, then the remote will not be permitted to
814    /// initiate streams.
815    ///
816    /// Note that streams in the reserved state, i.e., push promises that have
817    /// been reserved but the stream has not started, do not count against this
818    /// setting.
819    ///
820    /// Also note that if the remote *does* exceed the value set here, it is not
821    /// a protocol level error. Instead, the `h2` library will immediately reset
822    /// the stream.
823    ///
824    /// See [Section 5.1.2] in the HTTP/2 spec for more details.
825    ///
826    /// [Section 5.1.2]: https://http2.github.io/http2-spec/#rfc.section.5.1.2
827    ///
828    /// # Examples
829    ///
830    /// ```
831    /// # use tokio::io::{AsyncRead, AsyncWrite};
832    /// # use h2::client::*;
833    /// # use bytes::Bytes;
834    /// #
835    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
836    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
837    /// # {
838    /// // `client_fut` is a future representing the completion of the HTTP/2
839    /// // handshake.
840    /// let client_fut = Builder::new()
841    ///     .max_concurrent_streams(1000)
842    ///     .handshake(my_io);
843    /// # client_fut.await
844    /// # }
845    /// #
846    /// # pub fn main() {}
847    /// ```
848    pub fn max_concurrent_streams(&mut self, max: u32) -> &mut Self {
849        self.settings.set_max_concurrent_streams(Some(max));
850        self
851    }
852
853    /// Sets the initial maximum of locally initiated (send) streams.
854    ///
855    /// The initial settings will be overwritten by the remote peer when
856    /// the SETTINGS frame is received. The new value will be set to the
857    /// `max_concurrent_streams()` from the frame. If no value is advertised in
858    /// the initial SETTINGS frame from the remote peer as part of
859    /// [HTTP/2 Connection Preface], `usize::MAX` will be set.
860    ///
861    /// This setting prevents the caller from exceeding this number of
862    /// streams that are counted towards the concurrency limit.
863    ///
864    /// Sending streams past the limit returned by the peer will be treated
865    /// as a stream error of type PROTOCOL_ERROR or REFUSED_STREAM.
866    ///
867    /// See [Section 5.1.2] in the HTTP/2 spec for more details.
868    ///
869    /// The default value is `usize::MAX`.
870    ///
871    /// [HTTP/2 Connection Preface]: https://httpwg.org/specs/rfc9113.html#preface
872    /// [Section 5.1.2]: https://httpwg.org/specs/rfc9113.html#rfc.section.5.1.2
873    ///
874    /// # Examples
875    ///
876    /// ```
877    /// # use tokio::io::{AsyncRead, AsyncWrite};
878    /// # use h2::client::*;
879    /// # use bytes::Bytes;
880    /// #
881    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
882    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
883    /// # {
884    /// // `client_fut` is a future representing the completion of the HTTP/2
885    /// // handshake.
886    /// let client_fut = Builder::new()
887    ///     .initial_max_send_streams(1000)
888    ///     .handshake(my_io);
889    /// # client_fut.await
890    /// # }
891    /// #
892    /// # pub fn main() {}
893    /// ```
894    pub fn initial_max_send_streams(&mut self, initial: usize) -> &mut Self {
895        self.initial_max_send_streams = initial;
896        self
897    }
898
899    /// Sets the maximum number of concurrent locally reset streams.
900    ///
901    /// When a stream is explicitly reset, the HTTP/2 specification requires
902    /// that any further frames received for that stream must be ignored for
903    /// "some time".
904    ///
905    /// In order to satisfy the specification, internal state must be maintained
906    /// to implement the behavior. This state grows linearly with the number of
907    /// streams that are locally reset.
908    ///
909    /// The `max_concurrent_reset_streams` setting configures sets an upper
910    /// bound on the amount of state that is maintained. When this max value is
911    /// reached, the oldest reset stream is purged from memory.
912    ///
913    /// Once the stream has been fully purged from memory, any additional frames
914    /// received for that stream will result in a connection level protocol
915    /// error, forcing the connection to terminate.
916    ///
917    /// The default value is 10.
918    ///
919    /// # Examples
920    ///
921    /// ```
922    /// # use tokio::io::{AsyncRead, AsyncWrite};
923    /// # use h2::client::*;
924    /// # use bytes::Bytes;
925    /// #
926    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
927    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
928    /// # {
929    /// // `client_fut` is a future representing the completion of the HTTP/2
930    /// // handshake.
931    /// let client_fut = Builder::new()
932    ///     .max_concurrent_reset_streams(1000)
933    ///     .handshake(my_io);
934    /// # client_fut.await
935    /// # }
936    /// #
937    /// # pub fn main() {}
938    /// ```
939    pub fn max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self {
940        self.reset_stream_max = max;
941        self
942    }
943
944    /// Sets the duration to remember locally reset streams.
945    ///
946    /// When a stream is explicitly reset, the HTTP/2 specification requires
947    /// that any further frames received for that stream must be ignored for
948    /// "some time".
949    ///
950    /// In order to satisfy the specification, internal state must be maintained
951    /// to implement the behavior. This state grows linearly with the number of
952    /// streams that are locally reset.
953    ///
954    /// The `reset_stream_duration` setting configures the max amount of time
955    /// this state will be maintained in memory. Once the duration elapses, the
956    /// stream state is purged from memory.
957    ///
958    /// Once the stream has been fully purged from memory, any additional frames
959    /// received for that stream will result in a connection level protocol
960    /// error, forcing the connection to terminate.
961    ///
962    /// The default value is 30 seconds.
963    ///
964    /// # Examples
965    ///
966    /// ```
967    /// # use tokio::io::{AsyncRead, AsyncWrite};
968    /// # use h2::client::*;
969    /// # use std::time::Duration;
970    /// # use bytes::Bytes;
971    /// #
972    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
973    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
974    /// # {
975    /// // `client_fut` is a future representing the completion of the HTTP/2
976    /// // handshake.
977    /// let client_fut = Builder::new()
978    ///     .reset_stream_duration(Duration::from_secs(10))
979    ///     .handshake(my_io);
980    /// # client_fut.await
981    /// # }
982    /// #
983    /// # pub fn main() {}
984    /// ```
985    pub fn reset_stream_duration(&mut self, dur: Duration) -> &mut Self {
986        self.reset_stream_duration = dur;
987        self
988    }
989
990    /// Sets the maximum number of local resets due to protocol errors made by the remote end.
991    ///
992    /// Invalid frames and many other protocol errors will lead to resets being generated for those streams.
993    /// Too many of these often indicate a malicious client, and there are attacks which can abuse this to DOS servers.
994    /// This limit protects against these DOS attacks by limiting the amount of resets we can be forced to generate.
995    ///
996    /// When the number of local resets exceeds this threshold, the client will close the connection.
997    ///
998    /// If you really want to disable this, supply [`Option::None`] here.
999    /// Disabling this is not recommended and may expose you to DOS attacks.
1000    ///
1001    /// The default value is currently 1024, but could change.
1002    pub fn max_local_error_reset_streams(&mut self, max: Option<usize>) -> &mut Self {
1003        self.local_max_error_reset_streams = max;
1004        self
1005    }
1006
1007    /// Sets the maximum number of pending-accept remotely-reset streams.
1008    ///
1009    /// Streams that have been received by the peer, but not accepted by the
1010    /// user, can also receive a RST_STREAM. This is a legitimate pattern: one
1011    /// could send a request and then shortly after, realize it is not needed,
1012    /// sending a CANCEL.
1013    ///
1014    /// However, since those streams are now "closed", they don't count towards
1015    /// the max concurrent streams. So, they will sit in the accept queue,
1016    /// using memory.
1017    ///
1018    /// When the number of remotely-reset streams sitting in the pending-accept
1019    /// queue reaches this maximum value, a connection error with the code of
1020    /// `ENHANCE_YOUR_CALM` will be sent to the peer, and returned by the
1021    /// `Future`.
1022    ///
1023    /// The default value is currently 20, but could change.
1024    ///
1025    /// # Examples
1026    ///
1027    /// ```
1028    /// # use tokio::io::{AsyncRead, AsyncWrite};
1029    /// # use h2::client::*;
1030    /// # use bytes::Bytes;
1031    /// #
1032    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1033    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
1034    /// # {
1035    /// // `client_fut` is a future representing the completion of the HTTP/2
1036    /// // handshake.
1037    /// let client_fut = Builder::new()
1038    ///     .max_pending_accept_reset_streams(100)
1039    ///     .handshake(my_io);
1040    /// # client_fut.await
1041    /// # }
1042    /// #
1043    /// # pub fn main() {}
1044    /// ```
1045    pub fn max_pending_accept_reset_streams(&mut self, max: usize) -> &mut Self {
1046        self.pending_accept_reset_stream_max = max;
1047        self
1048    }
1049
1050    /// Sets the maximum send buffer size per stream.
1051    ///
1052    /// Once a stream has buffered up to (or over) the maximum, the stream's
1053    /// flow control will not "poll" additional capacity. Once bytes for the
1054    /// stream have been written to the connection, the send buffer capacity
1055    /// will be freed up again.
1056    ///
1057    /// The default is currently ~400KB, but may change.
1058    ///
1059    /// # Panics
1060    ///
1061    /// This function panics if `max` is larger than `u32::MAX`.
1062    pub fn max_send_buffer_size(&mut self, max: usize) -> &mut Self {
1063        assert!(max <= std::u32::MAX as usize);
1064        self.max_send_buffer_size = max;
1065        self
1066    }
1067
1068    /// Enables or disables server push promises.
1069    ///
1070    /// This value is included in the initial SETTINGS handshake.
1071    /// Setting this value to value to
1072    /// false in the initial SETTINGS handshake guarantees that the remote server
1073    /// will never send a push promise.
1074    ///
1075    /// This setting can be changed during the life of a single HTTP/2
1076    /// connection by sending another settings frame updating the value.
1077    ///
1078    /// Default value: `true`.
1079    ///
1080    /// # Examples
1081    ///
1082    /// ```
1083    /// # use tokio::io::{AsyncRead, AsyncWrite};
1084    /// # use h2::client::*;
1085    /// # use std::time::Duration;
1086    /// # use bytes::Bytes;
1087    /// #
1088    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1089    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
1090    /// # {
1091    /// // `client_fut` is a future representing the completion of the HTTP/2
1092    /// // handshake.
1093    /// let client_fut = Builder::new()
1094    ///     .enable_push(false)
1095    ///     .handshake(my_io);
1096    /// # client_fut.await
1097    /// # }
1098    /// #
1099    /// # pub fn main() {}
1100    /// ```
1101    pub fn enable_push(&mut self, enabled: bool) -> &mut Self {
1102        self.settings.set_enable_push(enabled);
1103        self
1104    }
1105
1106    /// Sets the header table size.
1107    ///
1108    /// This setting informs the peer of the maximum size of the header compression
1109    /// table used to encode header blocks, in octets. The encoder may select any value
1110    /// equal to or less than the header table size specified by the sender.
1111    ///
1112    /// The default value is 4,096.
1113    ///
1114    /// # Examples
1115    ///
1116    /// ```
1117    /// # use tokio::io::{AsyncRead, AsyncWrite};
1118    /// # use h2::client::*;
1119    /// # use bytes::Bytes;
1120    /// #
1121    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1122    /// # -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
1123    /// # {
1124    /// // `client_fut` is a future representing the completion of the HTTP/2
1125    /// // handshake.
1126    /// let client_fut = Builder::new()
1127    ///     .header_table_size(1_000_000)
1128    ///     .handshake(my_io);
1129    /// # client_fut.await
1130    /// # }
1131    /// #
1132    /// # pub fn main() {}
1133    /// ```
1134    pub fn header_table_size(&mut self, size: u32) -> &mut Self {
1135        self.settings.set_header_table_size(Some(size));
1136        self
1137    }
1138
1139    /// Sets the first stream ID to something other than 1.
1140    #[cfg(feature = "unstable")]
1141    pub fn initial_stream_id(&mut self, stream_id: u32) -> &mut Self {
1142        self.stream_id = stream_id.into();
1143        assert!(
1144            self.stream_id.is_client_initiated(),
1145            "stream id must be odd"
1146        );
1147        self
1148    }
1149
1150    /// Creates a new configured HTTP/2 client backed by `io`.
1151    ///
1152    /// It is expected that `io` already be in an appropriate state to commence
1153    /// the [HTTP/2 handshake]. The handshake is completed once both the connection
1154    /// preface and the initial settings frame is sent by the client.
1155    ///
1156    /// The handshake future does not wait for the initial settings frame from the
1157    /// server.
1158    ///
1159    /// Returns a future which resolves to the [`Connection`] / [`SendRequest`]
1160    /// tuple once the HTTP/2 handshake has been completed.
1161    ///
1162    /// This function also allows the caller to configure the send payload data
1163    /// type. See [Outbound data type] for more details.
1164    ///
1165    /// [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
1166    /// [`Connection`]: struct.Connection.html
1167    /// [`SendRequest`]: struct.SendRequest.html
1168    /// [Outbound data type]: ../index.html#outbound-data-type.
1169    ///
1170    /// # Examples
1171    ///
1172    /// Basic usage:
1173    ///
1174    /// ```
1175    /// # use tokio::io::{AsyncRead, AsyncWrite};
1176    /// # use h2::client::*;
1177    /// # use bytes::Bytes;
1178    /// #
1179    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1180    ///     -> Result<((SendRequest<Bytes>, Connection<T, Bytes>)), h2::Error>
1181    /// # {
1182    /// // `client_fut` is a future representing the completion of the HTTP/2
1183    /// // handshake.
1184    /// let client_fut = Builder::new()
1185    ///     .handshake(my_io);
1186    /// # client_fut.await
1187    /// # }
1188    /// #
1189    /// # pub fn main() {}
1190    /// ```
1191    ///
1192    /// Configures the send-payload data type. In this case, the outbound data
1193    /// type will be `&'static [u8]`.
1194    ///
1195    /// ```
1196    /// # use tokio::io::{AsyncRead, AsyncWrite};
1197    /// # use h2::client::*;
1198    /// #
1199    /// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T)
1200    /// # -> Result<((SendRequest<&'static [u8]>, Connection<T, &'static [u8]>)), h2::Error>
1201    /// # {
1202    /// // `client_fut` is a future representing the completion of the HTTP/2
1203    /// // handshake.
1204    /// let client_fut = Builder::new()
1205    ///     .handshake::<_, &'static [u8]>(my_io);
1206    /// # client_fut.await
1207    /// # }
1208    /// #
1209    /// # pub fn main() {}
1210    /// ```
1211    pub fn handshake<T, B>(
1212        &self,
1213        io: T,
1214    ) -> impl Future<Output = Result<(SendRequest<B>, Connection<T, B>), crate::Error>>
1215    where
1216        T: AsyncRead + AsyncWrite + Unpin,
1217        B: Buf,
1218    {
1219        Connection::handshake2(io, self.clone())
1220    }
1221}
1222
1223impl Default for Builder {
1224    fn default() -> Builder {
1225        Builder::new()
1226    }
1227}
1228
1229/// Creates a new configured HTTP/2 client with default configuration
1230/// values backed by `io`.
1231///
1232/// It is expected that `io` already be in an appropriate state to commence
1233/// the [HTTP/2 handshake]. See [Handshake] for more details.
1234///
1235/// Returns a future which resolves to the [`Connection`] / [`SendRequest`]
1236/// tuple once the HTTP/2 handshake has been completed. The returned
1237/// [`Connection`] instance will be using default configuration values. Use
1238/// [`Builder`] to customize the configuration values used by a [`Connection`]
1239/// instance.
1240///
1241/// [HTTP/2 handshake]: http://httpwg.org/specs/rfc7540.html#ConnectionHeader
1242/// [Handshake]: ../index.html#handshake
1243/// [`Connection`]: struct.Connection.html
1244/// [`SendRequest`]: struct.SendRequest.html
1245///
1246/// # Examples
1247///
1248/// ```
1249/// # use tokio::io::{AsyncRead, AsyncWrite};
1250/// # use h2::client;
1251/// # use h2::client::*;
1252/// #
1253/// # async fn doc<T: AsyncRead + AsyncWrite + Unpin>(my_io: T) -> Result<(), h2::Error>
1254/// # {
1255/// let (send_request, connection) = client::handshake(my_io).await?;
1256/// // The HTTP/2 handshake has completed, now start polling
1257/// // `connection` and use `send_request` to send requests to the
1258/// // server.
1259/// # Ok(())
1260/// # }
1261/// #
1262/// # pub fn main() {}
1263/// ```
1264pub async fn handshake<T>(io: T) -> Result<(SendRequest<Bytes>, Connection<T, Bytes>), crate::Error>
1265where
1266    T: AsyncRead + AsyncWrite + Unpin,
1267{
1268    let builder = Builder::new();
1269    builder
1270        .handshake(io)
1271        .instrument(tracing::trace_span!("client_handshake"))
1272        .await
1273}
1274
1275// ===== impl Connection =====
1276
1277async fn bind_connection<T>(io: &mut T) -> Result<(), crate::Error>
1278where
1279    T: AsyncRead + AsyncWrite + Unpin,
1280{
1281    tracing::debug!("binding client connection");
1282
1283    let msg: &'static [u8] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";
1284    io.write_all(msg).await.map_err(crate::Error::from_io)?;
1285
1286    tracing::debug!("client connection bound");
1287
1288    Ok(())
1289}
1290
1291impl<T, B> Connection<T, B>
1292where
1293    T: AsyncRead + AsyncWrite + Unpin,
1294    B: Buf,
1295{
1296    async fn handshake2(
1297        mut io: T,
1298        builder: Builder,
1299    ) -> Result<(SendRequest<B>, Connection<T, B>), crate::Error> {
1300        bind_connection(&mut io).await?;
1301
1302        // Create the codec
1303        let mut codec = Codec::new(io);
1304
1305        if let Some(max) = builder.settings.max_frame_size() {
1306            codec.set_max_recv_frame_size(max as usize);
1307        }
1308
1309        if let Some(max) = builder.settings.max_header_list_size() {
1310            codec.set_max_recv_header_list_size(max as usize);
1311        }
1312
1313        // Send initial settings frame
1314        codec
1315            .buffer(builder.settings.clone().into())
1316            .expect("invalid SETTINGS frame");
1317
1318        let inner = proto::Connection::new(
1319            codec,
1320            proto::Config {
1321                next_stream_id: builder.stream_id,
1322                initial_max_send_streams: builder.initial_max_send_streams,
1323                max_send_buffer_size: builder.max_send_buffer_size,
1324                reset_stream_duration: builder.reset_stream_duration,
1325                reset_stream_max: builder.reset_stream_max,
1326                remote_reset_stream_max: builder.pending_accept_reset_stream_max,
1327                local_error_reset_streams_max: builder.local_max_error_reset_streams,
1328                settings: builder.settings.clone(),
1329            },
1330        );
1331        let send_request = SendRequest {
1332            inner: inner.streams().clone(),
1333            pending: None,
1334        };
1335
1336        let mut connection = Connection { inner };
1337        if let Some(sz) = builder.initial_target_connection_window_size {
1338            connection.set_target_window_size(sz);
1339        }
1340
1341        Ok((send_request, connection))
1342    }
1343
1344    /// Sets the target window size for the whole connection.
1345    ///
1346    /// If `size` is greater than the current value, then a `WINDOW_UPDATE`
1347    /// frame will be immediately sent to the remote, increasing the connection
1348    /// level window by `size - current_value`.
1349    ///
1350    /// If `size` is less than the current value, nothing will happen
1351    /// immediately. However, as window capacity is released by
1352    /// [`FlowControl`] instances, no `WINDOW_UPDATE` frames will be sent
1353    /// out until the number of "in flight" bytes drops below `size`.
1354    ///
1355    /// The default value is 65,535.
1356    ///
1357    /// See [`FlowControl`] documentation for more details.
1358    ///
1359    /// [`FlowControl`]: ../struct.FlowControl.html
1360    /// [library level]: ../index.html#flow-control
1361    pub fn set_target_window_size(&mut self, size: u32) {
1362        assert!(size <= proto::MAX_WINDOW_SIZE);
1363        self.inner.set_target_window_size(size);
1364    }
1365
1366    /// Set a new `INITIAL_WINDOW_SIZE` setting (in octets) for stream-level
1367    /// flow control for received data.
1368    ///
1369    /// The `SETTINGS` will be sent to the remote, and only applied once the
1370    /// remote acknowledges the change.
1371    ///
1372    /// This can be used to increase or decrease the window size for existing
1373    /// streams.
1374    ///
1375    /// # Errors
1376    ///
1377    /// Returns an error if a previous call is still pending acknowledgement
1378    /// from the remote endpoint.
1379    pub fn set_initial_window_size(&mut self, size: u32) -> Result<(), crate::Error> {
1380        assert!(size <= proto::MAX_WINDOW_SIZE);
1381        self.inner.set_initial_window_size(size)?;
1382        Ok(())
1383    }
1384
1385    /// Takes a `PingPong` instance from the connection.
1386    ///
1387    /// # Note
1388    ///
1389    /// This may only be called once. Calling multiple times will return `None`.
1390    pub fn ping_pong(&mut self) -> Option<PingPong> {
1391        self.inner.take_user_pings().map(PingPong::new)
1392    }
1393
1394    /// Returns the maximum number of concurrent streams that may be initiated
1395    /// by this client.
1396    ///
1397    /// This limit is configured by the server peer by sending the
1398    /// [`SETTINGS_MAX_CONCURRENT_STREAMS` parameter][1] in a `SETTINGS` frame.
1399    /// This method returns the currently acknowledged value received from the
1400    /// remote.
1401    ///
1402    /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2
1403    pub fn max_concurrent_send_streams(&self) -> usize {
1404        self.inner.max_send_streams()
1405    }
1406    /// Returns the maximum number of concurrent streams that may be initiated
1407    /// by the server on this connection.
1408    ///
1409    /// This returns the value of the [`SETTINGS_MAX_CONCURRENT_STREAMS`
1410    /// parameter][1] sent in a `SETTINGS` frame that has been
1411    /// acknowledged by the remote peer. The value to be sent is configured by
1412    /// the [`Builder::max_concurrent_streams`][2] method before handshaking
1413    /// with the remote peer.
1414    ///
1415    /// [1]: https://tools.ietf.org/html/rfc7540#section-5.1.2
1416    /// [2]: ../struct.Builder.html#method.max_concurrent_streams
1417    pub fn max_concurrent_recv_streams(&self) -> usize {
1418        self.inner.max_recv_streams()
1419    }
1420}
1421
1422impl<T, B> Future for Connection<T, B>
1423where
1424    T: AsyncRead + AsyncWrite + Unpin,
1425    B: Buf,
1426{
1427    type Output = Result<(), crate::Error>;
1428
1429    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1430        self.inner.maybe_close_connection_if_no_streams();
1431        let result = self.inner.poll(cx).map_err(Into::into);
1432        if result.is_pending() && !self.inner.has_streams_or_other_references() {
1433            tracing::trace!("last stream closed during poll, wake again");
1434            cx.waker().wake_by_ref();
1435        }
1436        result
1437    }
1438}
1439
1440impl<T, B> fmt::Debug for Connection<T, B>
1441where
1442    T: AsyncRead + AsyncWrite,
1443    T: fmt::Debug,
1444    B: fmt::Debug + Buf,
1445{
1446    fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
1447        fmt::Debug::fmt(&self.inner, fmt)
1448    }
1449}
1450
1451// ===== impl ResponseFuture =====
1452
1453impl Future for ResponseFuture {
1454    type Output = Result<Response<RecvStream>, crate::Error>;
1455
1456    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1457        let (parts, _) = ready!(self.inner.poll_response(cx))?.into_parts();
1458        let body = RecvStream::new(FlowControl::new(self.inner.clone()));
1459
1460        Poll::Ready(Ok(Response::from_parts(parts, body)))
1461    }
1462}
1463
1464impl ResponseFuture {
1465    /// Returns the stream ID of the response stream.
1466    ///
1467    /// # Panics
1468    ///
1469    /// If the lock on the stream store has been poisoned.
1470    pub fn stream_id(&self) -> crate::StreamId {
1471        crate::StreamId::from_internal(self.inner.stream_id())
1472    }
1473    /// Returns a stream of PushPromises
1474    ///
1475    /// # Panics
1476    ///
1477    /// If this method has been called before
1478    /// or the stream was itself was pushed
1479    pub fn push_promises(&mut self) -> PushPromises {
1480        if self.push_promise_consumed {
1481            panic!("Reference to push promises stream taken!");
1482        }
1483        self.push_promise_consumed = true;
1484        PushPromises {
1485            inner: self.inner.clone(),
1486        }
1487    }
1488}
1489
1490// ===== impl PushPromises =====
1491
1492impl PushPromises {
1493    /// Get the next `PushPromise`.
1494    pub async fn push_promise(&mut self) -> Option<Result<PushPromise, crate::Error>> {
1495        crate::poll_fn(move |cx| self.poll_push_promise(cx)).await
1496    }
1497
1498    #[doc(hidden)]
1499    pub fn poll_push_promise(
1500        &mut self,
1501        cx: &mut Context<'_>,
1502    ) -> Poll<Option<Result<PushPromise, crate::Error>>> {
1503        match self.inner.poll_pushed(cx) {
1504            Poll::Ready(Some(Ok((request, response)))) => {
1505                let response = PushedResponseFuture {
1506                    inner: ResponseFuture {
1507                        inner: response,
1508                        push_promise_consumed: false,
1509                    },
1510                };
1511                Poll::Ready(Some(Ok(PushPromise { request, response })))
1512            }
1513            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e.into()))),
1514            Poll::Ready(None) => Poll::Ready(None),
1515            Poll::Pending => Poll::Pending,
1516        }
1517    }
1518}
1519
1520#[cfg(feature = "stream")]
1521impl futures_core::Stream for PushPromises {
1522    type Item = Result<PushPromise, crate::Error>;
1523
1524    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1525        self.poll_push_promise(cx)
1526    }
1527}
1528
1529// ===== impl PushPromise =====
1530
1531impl PushPromise {
1532    /// Returns a reference to the push promise's request headers.
1533    pub fn request(&self) -> &Request<()> {
1534        &self.request
1535    }
1536
1537    /// Returns a mutable reference to the push promise's request headers.
1538    pub fn request_mut(&mut self) -> &mut Request<()> {
1539        &mut self.request
1540    }
1541
1542    /// Consumes `self`, returning the push promise's request headers and
1543    /// response future.
1544    pub fn into_parts(self) -> (Request<()>, PushedResponseFuture) {
1545        (self.request, self.response)
1546    }
1547}
1548
1549// ===== impl PushedResponseFuture =====
1550
1551impl Future for PushedResponseFuture {
1552    type Output = Result<Response<RecvStream>, crate::Error>;
1553
1554    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1555        Pin::new(&mut self.inner).poll(cx)
1556    }
1557}
1558
1559impl PushedResponseFuture {
1560    /// Returns the stream ID of the response stream.
1561    ///
1562    /// # Panics
1563    ///
1564    /// If the lock on the stream store has been poisoned.
1565    pub fn stream_id(&self) -> crate::StreamId {
1566        self.inner.stream_id()
1567    }
1568}
1569
1570// ===== impl Peer =====
1571
1572impl Peer {
1573    pub fn convert_send_message(
1574        id: StreamId,
1575        request: Request<()>,
1576        protocol: Option<Protocol>,
1577        end_of_stream: bool,
1578    ) -> Result<Headers, SendError> {
1579        use http::request::Parts;
1580
1581        let (
1582            Parts {
1583                method,
1584                uri,
1585                headers,
1586                version,
1587                ..
1588            },
1589            _,
1590        ) = request.into_parts();
1591
1592        let is_connect = method == Method::CONNECT;
1593
1594        // Build the set pseudo header set. All requests will include `method`
1595        // and `path`.
1596        let mut pseudo = Pseudo::request(method, uri, protocol);
1597
1598        if pseudo.scheme.is_none() {
1599            // If the scheme is not set, then there are a two options.
1600            //
1601            // 1) Authority is not set. In this case, a request was issued with
1602            //    a relative URI. This is permitted **only** when forwarding
1603            //    HTTP 1.x requests. If the HTTP version is set to 2.0, then
1604            //    this is an error.
1605            //
1606            // 2) Authority is set, then the HTTP method *must* be CONNECT.
1607            //
1608            // It is not possible to have a scheme but not an authority set (the
1609            // `http` crate does not allow it).
1610            //
1611            if pseudo.authority.is_none() {
1612                if version == Version::HTTP_2 {
1613                    return Err(UserError::MissingUriSchemeAndAuthority.into());
1614                } else {
1615                    // This is acceptable as per the above comment. However,
1616                    // HTTP/2 requires that a scheme is set. Since we are
1617                    // forwarding an HTTP 1.1 request, the scheme is set to
1618                    // "http".
1619                    pseudo.set_scheme(uri::Scheme::HTTP);
1620                }
1621            } else if !is_connect {
1622                // TODO: Error
1623            }
1624        }
1625
1626        // Create the HEADERS frame
1627        let mut frame = Headers::new(id, pseudo, headers);
1628
1629        if end_of_stream {
1630            frame.set_end_stream()
1631        }
1632
1633        Ok(frame)
1634    }
1635}
1636
1637impl proto::Peer for Peer {
1638    type Poll = Response<()>;
1639
1640    const NAME: &'static str = "Client";
1641
1642    fn r#dyn() -> proto::DynPeer {
1643        proto::DynPeer::Client
1644    }
1645
1646    /*
1647    fn is_server() -> bool {
1648        false
1649    }
1650    */
1651
1652    fn convert_poll_message(
1653        pseudo: Pseudo,
1654        fields: HeaderMap,
1655        stream_id: StreamId,
1656    ) -> Result<Self::Poll, Error> {
1657        let mut b = Response::builder();
1658
1659        b = b.version(Version::HTTP_2);
1660
1661        if let Some(status) = pseudo.status {
1662            b = b.status(status);
1663        }
1664
1665        let mut response = match b.body(()) {
1666            Ok(response) => response,
1667            Err(_) => {
1668                // TODO: Should there be more specialized handling for different
1669                // kinds of errors
1670                return Err(Error::library_reset(stream_id, Reason::PROTOCOL_ERROR));
1671            }
1672        };
1673
1674        *response.headers_mut() = fields;
1675
1676        Ok(response)
1677    }
1678}