1use std::{
2 convert::Infallible,
3 future::Future,
4 marker::PhantomData,
5 pin::Pin,
6 task::{Context, Poll},
7 time::Duration,
8};
9
10use crate::rt::{Read, Write};
11use bytes::Bytes;
12use futures_channel::mpsc::{Receiver, Sender};
13use futures_channel::{mpsc, oneshot};
14use futures_util::future::{Either, FusedFuture, FutureExt as _};
15use futures_util::ready;
16use futures_util::stream::{StreamExt as _, StreamFuture};
17use h2::client::{Builder, Connection, SendRequest};
18use h2::SendStream;
19use http::{Method, StatusCode};
20use pin_project_lite::pin_project;
21
22use super::ping::{Ponger, Recorder};
23use super::{ping, H2Upgraded, PipeToSendStream, SendBuf};
24use crate::body::{Body, Incoming as IncomingBody};
25use crate::client::dispatch::{Callback, SendWhen, TrySendError};
26use crate::common::io::Compat;
27use crate::common::time::Time;
28use crate::ext::Protocol;
29use crate::headers;
30use crate::proto::h2::UpgradedSendStream;
31use crate::proto::Dispatched;
32use crate::rt::bounds::Http2ClientConnExec;
33use crate::upgrade::Upgraded;
34use crate::{Request, Response};
35use h2::client::ResponseFuture;
36
37type ClientRx<B> = crate::client::dispatch::Receiver<Request<B>, Response<IncomingBody>>;
38
39type ConnDropRef = mpsc::Sender<Infallible>;
42
43type ConnEof = oneshot::Receiver<Infallible>;
46
/// Default for `Config::initial_conn_window_size` (5 MiB).
const DEFAULT_CONN_WINDOW: u32 = 5 * 1024 * 1024;
/// Default for `Config::initial_stream_window_size` (2 MiB).
const DEFAULT_STREAM_WINDOW: u32 = 2 * 1024 * 1024;
/// Default for `Config::max_frame_size` (16 KiB).
const DEFAULT_MAX_FRAME_SIZE: u32 = 16 * 1024;
/// Default for `Config::max_send_buffer_size` (1 MiB).
const DEFAULT_MAX_SEND_BUF_SIZE: usize = 1024 * 1024;
/// Default for `Config::max_header_list_size` (16 KiB).
const DEFAULT_MAX_HEADER_LIST_SIZE: u32 = 16 * 1024;
/// Default for `Config::initial_max_send_streams`.
const DEFAULT_INITIAL_MAX_SEND_STREAMS: usize = 100;
64
65#[derive(Clone, Debug)]
66pub(crate) struct Config {
67 pub(crate) adaptive_window: bool,
68 pub(crate) initial_conn_window_size: u32,
69 pub(crate) initial_stream_window_size: u32,
70 pub(crate) initial_max_send_streams: usize,
71 pub(crate) max_frame_size: Option<u32>,
72 pub(crate) max_header_list_size: u32,
73 pub(crate) keep_alive_interval: Option<Duration>,
74 pub(crate) keep_alive_timeout: Duration,
75 pub(crate) keep_alive_while_idle: bool,
76 pub(crate) max_concurrent_reset_streams: Option<usize>,
77 pub(crate) max_send_buffer_size: usize,
78 pub(crate) max_pending_accept_reset_streams: Option<usize>,
79 pub(crate) header_table_size: Option<u32>,
80 pub(crate) max_concurrent_streams: Option<u32>,
81}
82
83impl Default for Config {
84 fn default() -> Config {
85 Config {
86 adaptive_window: false,
87 initial_conn_window_size: DEFAULT_CONN_WINDOW,
88 initial_stream_window_size: DEFAULT_STREAM_WINDOW,
89 initial_max_send_streams: DEFAULT_INITIAL_MAX_SEND_STREAMS,
90 max_frame_size: Some(DEFAULT_MAX_FRAME_SIZE),
91 max_header_list_size: DEFAULT_MAX_HEADER_LIST_SIZE,
92 keep_alive_interval: None,
93 keep_alive_timeout: Duration::from_secs(20),
94 keep_alive_while_idle: false,
95 max_concurrent_reset_streams: None,
96 max_send_buffer_size: DEFAULT_MAX_SEND_BUF_SIZE,
97 max_pending_accept_reset_streams: None,
98 header_table_size: None,
99 max_concurrent_streams: None,
100 }
101 }
102}
103
104fn new_builder(config: &Config) -> Builder {
105 let mut builder = Builder::default();
106 builder
107 .initial_max_send_streams(config.initial_max_send_streams)
108 .initial_window_size(config.initial_stream_window_size)
109 .initial_connection_window_size(config.initial_conn_window_size)
110 .max_header_list_size(config.max_header_list_size)
111 .max_send_buffer_size(config.max_send_buffer_size)
112 .enable_push(false);
113 if let Some(max) = config.max_frame_size {
114 builder.max_frame_size(max);
115 }
116 if let Some(max) = config.max_concurrent_reset_streams {
117 builder.max_concurrent_reset_streams(max);
118 }
119 if let Some(max) = config.max_pending_accept_reset_streams {
120 builder.max_pending_accept_reset_streams(max);
121 }
122 if let Some(size) = config.header_table_size {
123 builder.header_table_size(size);
124 }
125 if let Some(max) = config.max_concurrent_streams {
126 builder.max_concurrent_streams(max);
127 }
128 builder
129}
130
131fn new_ping_config(config: &Config) -> ping::Config {
132 ping::Config {
133 bdp_initial_window: if config.adaptive_window {
134 Some(config.initial_stream_window_size)
135 } else {
136 None
137 },
138 keep_alive_interval: config.keep_alive_interval,
139 keep_alive_timeout: config.keep_alive_timeout,
140 keep_alive_while_idle: config.keep_alive_while_idle,
141 }
142}
143
144pub(crate) async fn handshake<T, B, E>(
145 io: T,
146 req_rx: ClientRx<B>,
147 config: &Config,
148 mut exec: E,
149 timer: Time,
150) -> crate::Result<ClientTask<B, E, T>>
151where
152 T: Read + Write + Unpin,
153 B: Body + 'static,
154 B::Data: Send + 'static,
155 E: Http2ClientConnExec<B, T> + Unpin,
156 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
157{
158 let (h2_tx, mut conn) = new_builder(config)
159 .handshake::<_, SendBuf<B::Data>>(Compat::new(io))
160 .await
161 .map_err(crate::Error::new_h2)?;
162
163 let (conn_drop_ref, rx) = mpsc::channel(1);
168 let (cancel_tx, conn_eof) = oneshot::channel();
169
170 let conn_drop_rx = rx.into_future();
171
172 let ping_config = new_ping_config(config);
173
174 let (conn, ping) = if ping_config.is_enabled() {
175 let pp = conn.ping_pong().expect("conn.ping_pong");
176 let (recorder, ponger) = ping::channel(pp, ping_config, timer);
177
178 let conn: Conn<_, B> = Conn::new(ponger, conn);
179 (Either::Left(conn), recorder)
180 } else {
181 (Either::Right(conn), ping::disabled())
182 };
183 let conn: ConnMapErr<T, B> = ConnMapErr {
184 conn,
185 is_terminated: false,
186 };
187
188 exec.execute_h2_future(H2ClientFuture::Task {
189 task: ConnTask::new(conn, conn_drop_rx, cancel_tx),
190 });
191
192 Ok(ClientTask {
193 ping,
194 conn_drop_ref,
195 conn_eof,
196 executor: exec,
197 h2_tx,
198 req_rx,
199 fut_ctx: None,
200 marker: PhantomData,
201 })
202}
203
204pin_project! {
205 struct Conn<T, B>
206 where
207 B: Body,
208 {
209 #[pin]
210 ponger: Ponger,
211 #[pin]
212 conn: Connection<Compat<T>, SendBuf<<B as Body>::Data>>,
213 }
214}
215
216impl<T, B> Conn<T, B>
217where
218 B: Body,
219 T: Read + Write + Unpin,
220{
221 fn new(ponger: Ponger, conn: Connection<Compat<T>, SendBuf<<B as Body>::Data>>) -> Self {
222 Conn { ponger, conn }
223 }
224}
225
226impl<T, B> Future for Conn<T, B>
227where
228 B: Body,
229 T: Read + Write + Unpin,
230{
231 type Output = Result<(), h2::Error>;
232
233 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
234 let mut this = self.project();
235 match this.ponger.poll(cx) {
236 Poll::Ready(ping::Ponged::SizeUpdate(wnd)) => {
237 this.conn.set_target_window_size(wnd);
238 this.conn.set_initial_window_size(wnd)?;
239 }
240 Poll::Ready(ping::Ponged::KeepAliveTimedOut) => {
241 debug!("connection keep-alive timed out");
242 return Poll::Ready(Ok(()));
243 }
244 Poll::Pending => {}
245 }
246
247 Pin::new(&mut this.conn).poll(cx)
248 }
249}
250
251pin_project! {
252 struct ConnMapErr<T, B>
253 where
254 B: Body,
255 T: Read,
256 T: Write,
257 T: Unpin,
258 {
259 #[pin]
260 conn: Either<Conn<T, B>, Connection<Compat<T>, SendBuf<<B as Body>::Data>>>,
261 #[pin]
262 is_terminated: bool,
263 }
264}
265
266impl<T, B> Future for ConnMapErr<T, B>
267where
268 B: Body,
269 T: Read + Write + Unpin,
270{
271 type Output = Result<(), ()>;
272
273 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
274 let mut this = self.project();
275
276 if *this.is_terminated {
277 return Poll::Pending;
278 }
279 let polled = this.conn.poll(cx);
280 if polled.is_ready() {
281 *this.is_terminated = true;
282 }
283 polled.map_err(|_e| {
284 debug!(error = %_e, "connection error");
285 })
286 }
287}
288
289impl<T, B> FusedFuture for ConnMapErr<T, B>
290where
291 B: Body,
292 T: Read + Write + Unpin,
293{
294 fn is_terminated(&self) -> bool {
295 self.is_terminated
296 }
297}
298
299pin_project! {
300 pub struct ConnTask<T, B>
301 where
302 B: Body,
303 T: Read,
304 T: Write,
305 T: Unpin,
306 {
307 #[pin]
308 drop_rx: StreamFuture<Receiver<Infallible>>,
309 #[pin]
310 cancel_tx: Option<oneshot::Sender<Infallible>>,
311 #[pin]
312 conn: ConnMapErr<T, B>,
313 }
314}
315
316impl<T, B> ConnTask<T, B>
317where
318 B: Body,
319 T: Read + Write + Unpin,
320{
321 fn new(
322 conn: ConnMapErr<T, B>,
323 drop_rx: StreamFuture<Receiver<Infallible>>,
324 cancel_tx: oneshot::Sender<Infallible>,
325 ) -> Self {
326 Self {
327 drop_rx,
328 cancel_tx: Some(cancel_tx),
329 conn,
330 }
331 }
332}
333
334impl<T, B> Future for ConnTask<T, B>
335where
336 B: Body,
337 T: Read + Write + Unpin,
338{
339 type Output = ();
340
341 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
342 let mut this = self.project();
343
344 if !this.conn.is_terminated() && this.conn.poll_unpin(cx).is_ready() {
345 return Poll::Ready(());
347 }
348
349 if !this.drop_rx.is_terminated() && this.drop_rx.poll_unpin(cx).is_ready() {
350 trace!("send_request dropped, starting conn shutdown");
354 drop(this.cancel_tx.take().expect("ConnTask Future polled twice"));
355 }
356
357 Poll::Pending
358 }
359}
360
361pin_project! {
362 #[project = H2ClientFutureProject]
363 pub enum H2ClientFuture<B, T>
364 where
365 B: http_body::Body,
366 B: 'static,
367 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
368 T: Read,
369 T: Write,
370 T: Unpin,
371 {
372 Pipe {
373 #[pin]
374 pipe: PipeMap<B>,
375 },
376 Send {
377 #[pin]
378 send_when: SendWhen<B>,
379 },
380 Task {
381 #[pin]
382 task: ConnTask<T, B>,
383 },
384 }
385}
386
387impl<B, T> Future for H2ClientFuture<B, T>
388where
389 B: http_body::Body + 'static,
390 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
391 T: Read + Write + Unpin,
392{
393 type Output = ();
394
395 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
396 let this = self.project();
397
398 match this {
399 H2ClientFutureProject::Pipe { pipe } => pipe.poll(cx),
400 H2ClientFutureProject::Send { send_when } => send_when.poll(cx),
401 H2ClientFutureProject::Task { task } => task.poll(cx),
402 }
403 }
404}
405
406struct FutCtx<B>
407where
408 B: Body,
409{
410 is_connect: bool,
411 eos: bool,
412 fut: ResponseFuture,
413 body_tx: SendStream<SendBuf<B::Data>>,
414 body: B,
415 cb: Callback<Request<B>, Response<IncomingBody>>,
416}
417
418impl<B: Body> Unpin for FutCtx<B> {}
419
420pub(crate) struct ClientTask<B, E, T>
421where
422 B: Body,
423 E: Unpin,
424{
425 ping: ping::Recorder,
426 conn_drop_ref: ConnDropRef,
427 conn_eof: ConnEof,
428 executor: E,
429 h2_tx: SendRequest<SendBuf<B::Data>>,
430 req_rx: ClientRx<B>,
431 fut_ctx: Option<FutCtx<B>>,
432 marker: PhantomData<T>,
433}
434
435impl<B, E, T> ClientTask<B, E, T>
436where
437 B: Body + 'static,
438 E: Http2ClientConnExec<B, T> + Unpin,
439 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
440 T: Read + Write + Unpin,
441{
442 pub(crate) fn is_extended_connect_protocol_enabled(&self) -> bool {
443 self.h2_tx.is_extended_connect_protocol_enabled()
444 }
445}
446
447pin_project! {
448 pub struct PipeMap<S>
449 where
450 S: Body,
451 {
452 #[pin]
453 pipe: PipeToSendStream<S>,
454 #[pin]
455 conn_drop_ref: Option<Sender<Infallible>>,
456 #[pin]
457 ping: Option<Recorder>,
458 }
459}
460
461impl<B> Future for PipeMap<B>
462where
463 B: http_body::Body,
464 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
465{
466 type Output = ();
467
468 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
469 let mut this = self.project();
470
471 match this.pipe.poll_unpin(cx) {
472 Poll::Ready(result) => {
473 if let Err(_e) = result {
474 debug!("client request body error: {}", _e);
475 }
476 drop(this.conn_drop_ref.take().expect("Future polled twice"));
477 drop(this.ping.take().expect("Future polled twice"));
478 return Poll::Ready(());
479 }
480 Poll::Pending => (),
481 };
482 Poll::Pending
483 }
484}
485
486impl<B, E, T> ClientTask<B, E, T>
487where
488 B: Body + 'static + Unpin,
489 B::Data: Send,
490 E: Http2ClientConnExec<B, T> + Unpin,
491 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
492 T: Read + Write + Unpin,
493{
494 fn poll_pipe(&mut self, f: FutCtx<B>, cx: &mut Context<'_>) {
495 let ping = self.ping.clone();
496
497 let send_stream = if !f.is_connect {
498 if !f.eos {
499 let mut pipe = PipeToSendStream::new(f.body, f.body_tx);
500
501 match Pin::new(&mut pipe).poll(cx) {
504 Poll::Ready(_) => (),
505 Poll::Pending => {
506 let conn_drop_ref = self.conn_drop_ref.clone();
507 let ping = ping.clone();
511
512 let pipe = PipeMap {
513 pipe,
514 conn_drop_ref: Some(conn_drop_ref),
515 ping: Some(ping),
516 };
517 self.executor
519 .execute_h2_future(H2ClientFuture::Pipe { pipe });
520 }
521 }
522 }
523
524 None
525 } else {
526 Some(f.body_tx)
527 };
528
529 self.executor.execute_h2_future(H2ClientFuture::Send {
530 send_when: SendWhen {
531 when: ResponseFutMap {
532 fut: f.fut,
533 ping: Some(ping),
534 send_stream: Some(send_stream),
535 },
536 call_back: Some(f.cb),
537 },
538 });
539 }
540}
541
542pin_project! {
543 pub(crate) struct ResponseFutMap<B>
544 where
545 B: Body,
546 B: 'static,
547 {
548 #[pin]
549 fut: ResponseFuture,
550 #[pin]
551 ping: Option<Recorder>,
552 #[pin]
553 send_stream: Option<Option<SendStream<SendBuf<<B as Body>::Data>>>>,
554 }
555}
556
557impl<B> Future for ResponseFutMap<B>
558where
559 B: Body + 'static,
560{
561 type Output = Result<Response<crate::body::Incoming>, (crate::Error, Option<Request<B>>)>;
562
563 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
564 let mut this = self.project();
565
566 let result = ready!(this.fut.poll(cx));
567
568 let ping = this.ping.take().expect("Future polled twice");
569 let send_stream = this.send_stream.take().expect("Future polled twice");
570
571 match result {
572 Ok(res) => {
573 ping.record_non_data();
575
576 let content_length = headers::content_length_parse_all(res.headers());
577 if let (Some(mut send_stream), StatusCode::OK) = (send_stream, res.status()) {
578 if content_length.map_or(false, |len| len != 0) {
579 warn!("h2 connect response with non-zero body not supported");
580
581 send_stream.send_reset(h2::Reason::INTERNAL_ERROR);
582 return Poll::Ready(Err((
583 crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()),
584 None::<Request<B>>,
585 )));
586 }
587 let (parts, recv_stream) = res.into_parts();
588 let mut res = Response::from_parts(parts, IncomingBody::empty());
589
590 let (pending, on_upgrade) = crate::upgrade::pending();
591 let io = H2Upgraded {
592 ping,
593 send_stream: unsafe { UpgradedSendStream::new(send_stream) },
594 recv_stream,
595 buf: Bytes::new(),
596 };
597 let upgraded = Upgraded::new(io, Bytes::new());
598
599 pending.fulfill(upgraded);
600 res.extensions_mut().insert(on_upgrade);
601
602 Poll::Ready(Ok(res))
603 } else {
604 let res = res.map(|stream| {
605 let ping = ping.for_stream(&stream);
606 IncomingBody::h2(stream, content_length.into(), ping)
607 });
608 Poll::Ready(Ok(res))
609 }
610 }
611 Err(err) => {
612 ping.ensure_not_timed_out().map_err(|e| (e, None))?;
613
614 debug!("client response error: {}", err);
615 Poll::Ready(Err((crate::Error::new_h2(err), None::<Request<B>>)))
616 }
617 }
618 }
619}
620
621impl<B, E, T> Future for ClientTask<B, E, T>
622where
623 B: Body + 'static + Unpin,
624 B::Data: Send,
625 B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
626 E: Http2ClientConnExec<B, T> + Unpin,
627 T: Read + Write + Unpin,
628{
629 type Output = crate::Result<Dispatched>;
630
631 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
632 loop {
633 match ready!(self.h2_tx.poll_ready(cx)) {
634 Ok(()) => (),
635 Err(err) => {
636 self.ping.ensure_not_timed_out()?;
637 return if err.reason() == Some(::h2::Reason::NO_ERROR) {
638 trace!("connection gracefully shutdown");
639 Poll::Ready(Ok(Dispatched::Shutdown))
640 } else {
641 Poll::Ready(Err(crate::Error::new_h2(err)))
642 };
643 }
644 };
645
646 if let Some(f) = self.fut_ctx.take() {
649 self.poll_pipe(f, cx);
650 continue;
651 }
652
653 match self.req_rx.poll_recv(cx) {
654 Poll::Ready(Some((req, cb))) => {
655 if cb.is_canceled() {
657 trace!("request callback is canceled");
658 continue;
659 }
660 let (head, body) = req.into_parts();
661 let mut req = ::http::Request::from_parts(head, ());
662 super::strip_connection_headers(req.headers_mut(), true);
663 if let Some(len) = body.size_hint().exact() {
664 if len != 0 || headers::method_has_defined_payload_semantics(req.method()) {
665 headers::set_content_length_if_missing(req.headers_mut(), len);
666 }
667 }
668
669 let is_connect = req.method() == Method::CONNECT;
670 let eos = body.is_end_stream();
671
672 if is_connect
673 && headers::content_length_parse_all(req.headers())
674 .map_or(false, |len| len != 0)
675 {
676 warn!("h2 connect request with non-zero body not supported");
677 cb.send(Err(TrySendError {
678 error: crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()),
679 message: None,
680 }));
681 continue;
682 }
683
684 if let Some(protocol) = req.extensions_mut().remove::<Protocol>() {
685 req.extensions_mut().insert(protocol.into_inner());
686 }
687
688 let (fut, body_tx) = match self.h2_tx.send_request(req, !is_connect && eos) {
689 Ok(ok) => ok,
690 Err(err) => {
691 debug!("client send request error: {}", err);
692 cb.send(Err(TrySendError {
693 error: crate::Error::new_h2(err),
694 message: None,
695 }));
696 continue;
697 }
698 };
699
700 let f = FutCtx {
701 is_connect,
702 eos,
703 fut,
704 body_tx,
705 body,
706 cb,
707 };
708
709 match self.h2_tx.poll_ready(cx) {
713 Poll::Pending => {
714 self.fut_ctx = Some(f);
716 return Poll::Pending;
717 }
718 Poll::Ready(Ok(())) => (),
719 Poll::Ready(Err(err)) => {
720 f.cb.send(Err(TrySendError {
721 error: crate::Error::new_h2(err),
722 message: None,
723 }));
724 continue;
725 }
726 }
727 self.poll_pipe(f, cx);
728 continue;
729 }
730
731 Poll::Ready(None) => {
732 trace!("client::dispatch::Sender dropped");
733 return Poll::Ready(Ok(Dispatched::Shutdown));
734 }
735
736 Poll::Pending => match ready!(Pin::new(&mut self.conn_eof).poll(cx)) {
737 #[allow(unused)]
740 Ok(never) => match never {},
741 Err(_conn_is_eof) => {
742 trace!("connection task is closed, closing dispatch task");
743 return Poll::Ready(Ok(Dispatched::Shutdown));
744 }
745 },
746 }
747 }
748 }
749}