hyper/proto/h2/
client.rs

1use std::{
2    convert::Infallible,
3    future::Future,
4    marker::PhantomData,
5    pin::Pin,
6    task::{Context, Poll},
7    time::Duration,
8};
9
10use crate::rt::{Read, Write};
11use bytes::Bytes;
12use futures_channel::mpsc::{Receiver, Sender};
13use futures_channel::{mpsc, oneshot};
14use futures_util::future::{Either, FusedFuture, FutureExt as _};
15use futures_util::ready;
16use futures_util::stream::{StreamExt as _, StreamFuture};
17use h2::client::{Builder, Connection, SendRequest};
18use h2::SendStream;
19use http::{Method, StatusCode};
20use pin_project_lite::pin_project;
21
22use super::ping::{Ponger, Recorder};
23use super::{ping, H2Upgraded, PipeToSendStream, SendBuf};
24use crate::body::{Body, Incoming as IncomingBody};
25use crate::client::dispatch::{Callback, SendWhen, TrySendError};
26use crate::common::io::Compat;
27use crate::common::time::Time;
28use crate::ext::Protocol;
29use crate::headers;
30use crate::proto::h2::UpgradedSendStream;
31use crate::proto::Dispatched;
32use crate::rt::bounds::Http2ClientConnExec;
33use crate::upgrade::Upgraded;
34use crate::{Request, Response};
35use h2::client::ResponseFuture;
36
37type ClientRx<B> = crate::client::dispatch::Receiver<Request<B>, Response<IncomingBody>>;
38
39///// An mpsc channel is used to help notify the `Connection` task when *all*
40///// other handles to it have been dropped, so that it can shutdown.
41type ConnDropRef = mpsc::Sender<Infallible>;
42
43///// A oneshot channel watches the `Connection` task, and when it completes,
44///// the "dispatch" task will be notified and can shutdown sooner.
45type ConnEof = oneshot::Receiver<Infallible>;
46
47// Our defaults are chosen for the "majority" case, which usually are not
48// resource constrained, and so the spec default of 64kb can be too limiting
49// for performance.
50const DEFAULT_CONN_WINDOW: u32 = 1024 * 1024 * 5; // 5mb
51const DEFAULT_STREAM_WINDOW: u32 = 1024 * 1024 * 2; // 2mb
52const DEFAULT_MAX_FRAME_SIZE: u32 = 1024 * 16; // 16kb
53const DEFAULT_MAX_SEND_BUF_SIZE: usize = 1024 * 1024; // 1mb
54const DEFAULT_MAX_HEADER_LIST_SIZE: u32 = 1024 * 16; // 16kb
55
56// The maximum number of concurrent streams that the client is allowed to open
57// before it receives the initial SETTINGS frame from the server.
58// This default value is derived from what the HTTP/2 spec recommends as the
59// minimum value that endpoints advertise to their peers. It means that using
60// this value will minimize the chance of the failure where the local endpoint
61// attempts to open too many streams and gets rejected by the remote peer with
62// the `REFUSED_STREAM` error.
63const DEFAULT_INITIAL_MAX_SEND_STREAMS: usize = 100;
64
65#[derive(Clone, Debug)]
66pub(crate) struct Config {
67    pub(crate) adaptive_window: bool,
68    pub(crate) initial_conn_window_size: u32,
69    pub(crate) initial_stream_window_size: u32,
70    pub(crate) initial_max_send_streams: usize,
71    pub(crate) max_frame_size: Option<u32>,
72    pub(crate) max_header_list_size: u32,
73    pub(crate) keep_alive_interval: Option<Duration>,
74    pub(crate) keep_alive_timeout: Duration,
75    pub(crate) keep_alive_while_idle: bool,
76    pub(crate) max_concurrent_reset_streams: Option<usize>,
77    pub(crate) max_send_buffer_size: usize,
78    pub(crate) max_pending_accept_reset_streams: Option<usize>,
79    pub(crate) header_table_size: Option<u32>,
80    pub(crate) max_concurrent_streams: Option<u32>,
81}
82
83impl Default for Config {
84    fn default() -> Config {
85        Config {
86            adaptive_window: false,
87            initial_conn_window_size: DEFAULT_CONN_WINDOW,
88            initial_stream_window_size: DEFAULT_STREAM_WINDOW,
89            initial_max_send_streams: DEFAULT_INITIAL_MAX_SEND_STREAMS,
90            max_frame_size: Some(DEFAULT_MAX_FRAME_SIZE),
91            max_header_list_size: DEFAULT_MAX_HEADER_LIST_SIZE,
92            keep_alive_interval: None,
93            keep_alive_timeout: Duration::from_secs(20),
94            keep_alive_while_idle: false,
95            max_concurrent_reset_streams: None,
96            max_send_buffer_size: DEFAULT_MAX_SEND_BUF_SIZE,
97            max_pending_accept_reset_streams: None,
98            header_table_size: None,
99            max_concurrent_streams: None,
100        }
101    }
102}
103
104fn new_builder(config: &Config) -> Builder {
105    let mut builder = Builder::default();
106    builder
107        .initial_max_send_streams(config.initial_max_send_streams)
108        .initial_window_size(config.initial_stream_window_size)
109        .initial_connection_window_size(config.initial_conn_window_size)
110        .max_header_list_size(config.max_header_list_size)
111        .max_send_buffer_size(config.max_send_buffer_size)
112        .enable_push(false);
113    if let Some(max) = config.max_frame_size {
114        builder.max_frame_size(max);
115    }
116    if let Some(max) = config.max_concurrent_reset_streams {
117        builder.max_concurrent_reset_streams(max);
118    }
119    if let Some(max) = config.max_pending_accept_reset_streams {
120        builder.max_pending_accept_reset_streams(max);
121    }
122    if let Some(size) = config.header_table_size {
123        builder.header_table_size(size);
124    }
125    if let Some(max) = config.max_concurrent_streams {
126        builder.max_concurrent_streams(max);
127    }
128    builder
129}
130
131fn new_ping_config(config: &Config) -> ping::Config {
132    ping::Config {
133        bdp_initial_window: if config.adaptive_window {
134            Some(config.initial_stream_window_size)
135        } else {
136            None
137        },
138        keep_alive_interval: config.keep_alive_interval,
139        keep_alive_timeout: config.keep_alive_timeout,
140        keep_alive_while_idle: config.keep_alive_while_idle,
141    }
142}
143
144pub(crate) async fn handshake<T, B, E>(
145    io: T,
146    req_rx: ClientRx<B>,
147    config: &Config,
148    mut exec: E,
149    timer: Time,
150) -> crate::Result<ClientTask<B, E, T>>
151where
152    T: Read + Write + Unpin,
153    B: Body + 'static,
154    B::Data: Send + 'static,
155    E: Http2ClientConnExec<B, T> + Unpin,
156    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
157{
158    let (h2_tx, mut conn) = new_builder(config)
159        .handshake::<_, SendBuf<B::Data>>(Compat::new(io))
160        .await
161        .map_err(crate::Error::new_h2)?;
162
163    // An mpsc channel is used entirely to detect when the
164    // 'Client' has been dropped. This is to get around a bug
165    // in h2 where dropping all SendRequests won't notify a
166    // parked Connection.
167    let (conn_drop_ref, rx) = mpsc::channel(1);
168    let (cancel_tx, conn_eof) = oneshot::channel();
169
170    let conn_drop_rx = rx.into_future();
171
172    let ping_config = new_ping_config(config);
173
174    let (conn, ping) = if ping_config.is_enabled() {
175        let pp = conn.ping_pong().expect("conn.ping_pong");
176        let (recorder, ponger) = ping::channel(pp, ping_config, timer);
177
178        let conn: Conn<_, B> = Conn::new(ponger, conn);
179        (Either::Left(conn), recorder)
180    } else {
181        (Either::Right(conn), ping::disabled())
182    };
183    let conn: ConnMapErr<T, B> = ConnMapErr {
184        conn,
185        is_terminated: false,
186    };
187
188    exec.execute_h2_future(H2ClientFuture::Task {
189        task: ConnTask::new(conn, conn_drop_rx, cancel_tx),
190    });
191
192    Ok(ClientTask {
193        ping,
194        conn_drop_ref,
195        conn_eof,
196        executor: exec,
197        h2_tx,
198        req_rx,
199        fut_ctx: None,
200        marker: PhantomData,
201    })
202}
203
204pin_project! {
205    struct Conn<T, B>
206    where
207        B: Body,
208    {
209        #[pin]
210        ponger: Ponger,
211        #[pin]
212        conn: Connection<Compat<T>, SendBuf<<B as Body>::Data>>,
213    }
214}
215
216impl<T, B> Conn<T, B>
217where
218    B: Body,
219    T: Read + Write + Unpin,
220{
221    fn new(ponger: Ponger, conn: Connection<Compat<T>, SendBuf<<B as Body>::Data>>) -> Self {
222        Conn { ponger, conn }
223    }
224}
225
226impl<T, B> Future for Conn<T, B>
227where
228    B: Body,
229    T: Read + Write + Unpin,
230{
231    type Output = Result<(), h2::Error>;
232
233    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
234        let mut this = self.project();
235        match this.ponger.poll(cx) {
236            Poll::Ready(ping::Ponged::SizeUpdate(wnd)) => {
237                this.conn.set_target_window_size(wnd);
238                this.conn.set_initial_window_size(wnd)?;
239            }
240            Poll::Ready(ping::Ponged::KeepAliveTimedOut) => {
241                debug!("connection keep-alive timed out");
242                return Poll::Ready(Ok(()));
243            }
244            Poll::Pending => {}
245        }
246
247        Pin::new(&mut this.conn).poll(cx)
248    }
249}
250
251pin_project! {
252    struct ConnMapErr<T, B>
253    where
254        B: Body,
255        T: Read,
256        T: Write,
257        T: Unpin,
258    {
259        #[pin]
260        conn: Either<Conn<T, B>, Connection<Compat<T>, SendBuf<<B as Body>::Data>>>,
261        #[pin]
262        is_terminated: bool,
263    }
264}
265
266impl<T, B> Future for ConnMapErr<T, B>
267where
268    B: Body,
269    T: Read + Write + Unpin,
270{
271    type Output = Result<(), ()>;
272
273    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
274        let mut this = self.project();
275
276        if *this.is_terminated {
277            return Poll::Pending;
278        }
279        let polled = this.conn.poll(cx);
280        if polled.is_ready() {
281            *this.is_terminated = true;
282        }
283        polled.map_err(|_e| {
284            debug!(error = %_e, "connection error");
285        })
286    }
287}
288
289impl<T, B> FusedFuture for ConnMapErr<T, B>
290where
291    B: Body,
292    T: Read + Write + Unpin,
293{
294    fn is_terminated(&self) -> bool {
295        self.is_terminated
296    }
297}
298
299pin_project! {
300    pub struct ConnTask<T, B>
301    where
302        B: Body,
303        T: Read,
304        T: Write,
305        T: Unpin,
306    {
307        #[pin]
308        drop_rx: StreamFuture<Receiver<Infallible>>,
309        #[pin]
310        cancel_tx: Option<oneshot::Sender<Infallible>>,
311        #[pin]
312        conn: ConnMapErr<T, B>,
313    }
314}
315
316impl<T, B> ConnTask<T, B>
317where
318    B: Body,
319    T: Read + Write + Unpin,
320{
321    fn new(
322        conn: ConnMapErr<T, B>,
323        drop_rx: StreamFuture<Receiver<Infallible>>,
324        cancel_tx: oneshot::Sender<Infallible>,
325    ) -> Self {
326        Self {
327            drop_rx,
328            cancel_tx: Some(cancel_tx),
329            conn,
330        }
331    }
332}
333
334impl<T, B> Future for ConnTask<T, B>
335where
336    B: Body,
337    T: Read + Write + Unpin,
338{
339    type Output = ();
340
341    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
342        let mut this = self.project();
343
344        if !this.conn.is_terminated() && this.conn.poll_unpin(cx).is_ready() {
345            // ok or err, the `conn` has finished.
346            return Poll::Ready(());
347        }
348
349        if !this.drop_rx.is_terminated() && this.drop_rx.poll_unpin(cx).is_ready() {
350            // mpsc has been dropped, hopefully polling
351            // the connection some more should start shutdown
352            // and then close.
353            trace!("send_request dropped, starting conn shutdown");
354            drop(this.cancel_tx.take().expect("ConnTask Future polled twice"));
355        }
356
357        Poll::Pending
358    }
359}
360
361pin_project! {
362    #[project = H2ClientFutureProject]
363    pub enum H2ClientFuture<B, T>
364    where
365        B: http_body::Body,
366        B: 'static,
367        B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
368        T: Read,
369        T: Write,
370        T: Unpin,
371    {
372        Pipe {
373            #[pin]
374            pipe: PipeMap<B>,
375        },
376        Send {
377            #[pin]
378            send_when: SendWhen<B>,
379        },
380        Task {
381            #[pin]
382            task: ConnTask<T, B>,
383        },
384    }
385}
386
387impl<B, T> Future for H2ClientFuture<B, T>
388where
389    B: http_body::Body + 'static,
390    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
391    T: Read + Write + Unpin,
392{
393    type Output = ();
394
395    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
396        let this = self.project();
397
398        match this {
399            H2ClientFutureProject::Pipe { pipe } => pipe.poll(cx),
400            H2ClientFutureProject::Send { send_when } => send_when.poll(cx),
401            H2ClientFutureProject::Task { task } => task.poll(cx),
402        }
403    }
404}
405
406struct FutCtx<B>
407where
408    B: Body,
409{
410    is_connect: bool,
411    eos: bool,
412    fut: ResponseFuture,
413    body_tx: SendStream<SendBuf<B::Data>>,
414    body: B,
415    cb: Callback<Request<B>, Response<IncomingBody>>,
416}
417
418impl<B: Body> Unpin for FutCtx<B> {}
419
420pub(crate) struct ClientTask<B, E, T>
421where
422    B: Body,
423    E: Unpin,
424{
425    ping: ping::Recorder,
426    conn_drop_ref: ConnDropRef,
427    conn_eof: ConnEof,
428    executor: E,
429    h2_tx: SendRequest<SendBuf<B::Data>>,
430    req_rx: ClientRx<B>,
431    fut_ctx: Option<FutCtx<B>>,
432    marker: PhantomData<T>,
433}
434
435impl<B, E, T> ClientTask<B, E, T>
436where
437    B: Body + 'static,
438    E: Http2ClientConnExec<B, T> + Unpin,
439    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
440    T: Read + Write + Unpin,
441{
442    pub(crate) fn is_extended_connect_protocol_enabled(&self) -> bool {
443        self.h2_tx.is_extended_connect_protocol_enabled()
444    }
445}
446
447pin_project! {
448    pub struct PipeMap<S>
449    where
450        S: Body,
451    {
452        #[pin]
453        pipe: PipeToSendStream<S>,
454        #[pin]
455        conn_drop_ref: Option<Sender<Infallible>>,
456        #[pin]
457        ping: Option<Recorder>,
458    }
459}
460
461impl<B> Future for PipeMap<B>
462where
463    B: http_body::Body,
464    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
465{
466    type Output = ();
467
468    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
469        let mut this = self.project();
470
471        match this.pipe.poll_unpin(cx) {
472            Poll::Ready(result) => {
473                if let Err(_e) = result {
474                    debug!("client request body error: {}", _e);
475                }
476                drop(this.conn_drop_ref.take().expect("Future polled twice"));
477                drop(this.ping.take().expect("Future polled twice"));
478                return Poll::Ready(());
479            }
480            Poll::Pending => (),
481        };
482        Poll::Pending
483    }
484}
485
486impl<B, E, T> ClientTask<B, E, T>
487where
488    B: Body + 'static + Unpin,
489    B::Data: Send,
490    E: Http2ClientConnExec<B, T> + Unpin,
491    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
492    T: Read + Write + Unpin,
493{
494    fn poll_pipe(&mut self, f: FutCtx<B>, cx: &mut Context<'_>) {
495        let ping = self.ping.clone();
496
497        let send_stream = if !f.is_connect {
498            if !f.eos {
499                let mut pipe = PipeToSendStream::new(f.body, f.body_tx);
500
501                // eagerly see if the body pipe is ready and
502                // can thus skip allocating in the executor
503                match Pin::new(&mut pipe).poll(cx) {
504                    Poll::Ready(_) => (),
505                    Poll::Pending => {
506                        let conn_drop_ref = self.conn_drop_ref.clone();
507                        // keep the ping recorder's knowledge of an
508                        // "open stream" alive while this body is
509                        // still sending...
510                        let ping = ping.clone();
511
512                        let pipe = PipeMap {
513                            pipe,
514                            conn_drop_ref: Some(conn_drop_ref),
515                            ping: Some(ping),
516                        };
517                        // Clear send task
518                        self.executor
519                            .execute_h2_future(H2ClientFuture::Pipe { pipe });
520                    }
521                }
522            }
523
524            None
525        } else {
526            Some(f.body_tx)
527        };
528
529        self.executor.execute_h2_future(H2ClientFuture::Send {
530            send_when: SendWhen {
531                when: ResponseFutMap {
532                    fut: f.fut,
533                    ping: Some(ping),
534                    send_stream: Some(send_stream),
535                },
536                call_back: Some(f.cb),
537            },
538        });
539    }
540}
541
542pin_project! {
543    pub(crate) struct ResponseFutMap<B>
544    where
545        B: Body,
546        B: 'static,
547    {
548        #[pin]
549        fut: ResponseFuture,
550        #[pin]
551        ping: Option<Recorder>,
552        #[pin]
553        send_stream: Option<Option<SendStream<SendBuf<<B as Body>::Data>>>>,
554    }
555}
556
557impl<B> Future for ResponseFutMap<B>
558where
559    B: Body + 'static,
560{
561    type Output = Result<Response<crate::body::Incoming>, (crate::Error, Option<Request<B>>)>;
562
563    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
564        let mut this = self.project();
565
566        let result = ready!(this.fut.poll(cx));
567
568        let ping = this.ping.take().expect("Future polled twice");
569        let send_stream = this.send_stream.take().expect("Future polled twice");
570
571        match result {
572            Ok(res) => {
573                // record that we got the response headers
574                ping.record_non_data();
575
576                let content_length = headers::content_length_parse_all(res.headers());
577                if let (Some(mut send_stream), StatusCode::OK) = (send_stream, res.status()) {
578                    if content_length.map_or(false, |len| len != 0) {
579                        warn!("h2 connect response with non-zero body not supported");
580
581                        send_stream.send_reset(h2::Reason::INTERNAL_ERROR);
582                        return Poll::Ready(Err((
583                            crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()),
584                            None::<Request<B>>,
585                        )));
586                    }
587                    let (parts, recv_stream) = res.into_parts();
588                    let mut res = Response::from_parts(parts, IncomingBody::empty());
589
590                    let (pending, on_upgrade) = crate::upgrade::pending();
591                    let io = H2Upgraded {
592                        ping,
593                        send_stream: unsafe { UpgradedSendStream::new(send_stream) },
594                        recv_stream,
595                        buf: Bytes::new(),
596                    };
597                    let upgraded = Upgraded::new(io, Bytes::new());
598
599                    pending.fulfill(upgraded);
600                    res.extensions_mut().insert(on_upgrade);
601
602                    Poll::Ready(Ok(res))
603                } else {
604                    let res = res.map(|stream| {
605                        let ping = ping.for_stream(&stream);
606                        IncomingBody::h2(stream, content_length.into(), ping)
607                    });
608                    Poll::Ready(Ok(res))
609                }
610            }
611            Err(err) => {
612                ping.ensure_not_timed_out().map_err(|e| (e, None))?;
613
614                debug!("client response error: {}", err);
615                Poll::Ready(Err((crate::Error::new_h2(err), None::<Request<B>>)))
616            }
617        }
618    }
619}
620
621impl<B, E, T> Future for ClientTask<B, E, T>
622where
623    B: Body + 'static + Unpin,
624    B::Data: Send,
625    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
626    E: Http2ClientConnExec<B, T> + Unpin,
627    T: Read + Write + Unpin,
628{
629    type Output = crate::Result<Dispatched>;
630
631    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
632        loop {
633            match ready!(self.h2_tx.poll_ready(cx)) {
634                Ok(()) => (),
635                Err(err) => {
636                    self.ping.ensure_not_timed_out()?;
637                    return if err.reason() == Some(::h2::Reason::NO_ERROR) {
638                        trace!("connection gracefully shutdown");
639                        Poll::Ready(Ok(Dispatched::Shutdown))
640                    } else {
641                        Poll::Ready(Err(crate::Error::new_h2(err)))
642                    };
643                }
644            };
645
646            // If we were waiting on pending open
647            // continue where we left off.
648            if let Some(f) = self.fut_ctx.take() {
649                self.poll_pipe(f, cx);
650                continue;
651            }
652
653            match self.req_rx.poll_recv(cx) {
654                Poll::Ready(Some((req, cb))) => {
655                    // check that future hasn't been canceled already
656                    if cb.is_canceled() {
657                        trace!("request callback is canceled");
658                        continue;
659                    }
660                    let (head, body) = req.into_parts();
661                    let mut req = ::http::Request::from_parts(head, ());
662                    super::strip_connection_headers(req.headers_mut(), true);
663                    if let Some(len) = body.size_hint().exact() {
664                        if len != 0 || headers::method_has_defined_payload_semantics(req.method()) {
665                            headers::set_content_length_if_missing(req.headers_mut(), len);
666                        }
667                    }
668
669                    let is_connect = req.method() == Method::CONNECT;
670                    let eos = body.is_end_stream();
671
672                    if is_connect
673                        && headers::content_length_parse_all(req.headers())
674                            .map_or(false, |len| len != 0)
675                    {
676                        warn!("h2 connect request with non-zero body not supported");
677                        cb.send(Err(TrySendError {
678                            error: crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()),
679                            message: None,
680                        }));
681                        continue;
682                    }
683
684                    if let Some(protocol) = req.extensions_mut().remove::<Protocol>() {
685                        req.extensions_mut().insert(protocol.into_inner());
686                    }
687
688                    let (fut, body_tx) = match self.h2_tx.send_request(req, !is_connect && eos) {
689                        Ok(ok) => ok,
690                        Err(err) => {
691                            debug!("client send request error: {}", err);
692                            cb.send(Err(TrySendError {
693                                error: crate::Error::new_h2(err),
694                                message: None,
695                            }));
696                            continue;
697                        }
698                    };
699
700                    let f = FutCtx {
701                        is_connect,
702                        eos,
703                        fut,
704                        body_tx,
705                        body,
706                        cb,
707                    };
708
709                    // Check poll_ready() again.
710                    // If the call to send_request() resulted in the new stream being pending open
711                    // we have to wait for the open to complete before accepting new requests.
712                    match self.h2_tx.poll_ready(cx) {
713                        Poll::Pending => {
714                            // Save Context
715                            self.fut_ctx = Some(f);
716                            return Poll::Pending;
717                        }
718                        Poll::Ready(Ok(())) => (),
719                        Poll::Ready(Err(err)) => {
720                            f.cb.send(Err(TrySendError {
721                                error: crate::Error::new_h2(err),
722                                message: None,
723                            }));
724                            continue;
725                        }
726                    }
727                    self.poll_pipe(f, cx);
728                    continue;
729                }
730
731                Poll::Ready(None) => {
732                    trace!("client::dispatch::Sender dropped");
733                    return Poll::Ready(Ok(Dispatched::Shutdown));
734                }
735
736                Poll::Pending => match ready!(Pin::new(&mut self.conn_eof).poll(cx)) {
737                    // As of Rust 1.82, this pattern is no longer needed, and emits a warning.
738                    // But we cannot remove it as long as MSRV is less than that.
739                    #[allow(unused)]
740                    Ok(never) => match never {},
741                    Err(_conn_is_eof) => {
742                        trace!("connection task is closed, closing dispatch task");
743                        return Poll::Ready(Ok(Dispatched::Shutdown));
744                    }
745                },
746            }
747        }
748    }
749}