hyper/proto/h2/
server.rs

1use std::error::Error as StdError;
2use std::future::Future;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5use std::time::Duration;
6
7use bytes::Bytes;
8use futures_util::ready;
9use h2::server::{Connection, Handshake, SendResponse};
10use h2::{Reason, RecvStream};
11use http::{Method, Request};
12use pin_project_lite::pin_project;
13
14use super::{ping, PipeToSendStream, SendBuf};
15use crate::body::{Body, Incoming as IncomingBody};
16use crate::common::date;
17use crate::common::io::Compat;
18use crate::common::time::Time;
19use crate::ext::Protocol;
20use crate::headers;
21use crate::proto::h2::ping::Recorder;
22use crate::proto::h2::{H2Upgraded, UpgradedSendStream};
23use crate::proto::Dispatched;
24use crate::rt::bounds::Http2ServerConnExec;
25use crate::rt::{Read, Write};
26use crate::service::HttpService;
27
28use crate::upgrade::{OnUpgrade, Pending, Upgraded};
29use crate::Response;
30
31// Our defaults are chosen for the "majority" case, which usually are not
32// resource constrained, and so the spec default of 64kb can be too limiting
33// for performance.
34//
35// At the same time, a server more often has multiple clients connected, and
36// so is more likely to use more resources than a client would.
37const DEFAULT_CONN_WINDOW: u32 = 1024 * 1024; // 1mb
38const DEFAULT_STREAM_WINDOW: u32 = 1024 * 1024; // 1mb
39const DEFAULT_MAX_FRAME_SIZE: u32 = 1024 * 16; // 16kb
40const DEFAULT_MAX_SEND_BUF_SIZE: usize = 1024 * 400; // 400kb
41const DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE: u32 = 1024 * 16; // 16kb
42const DEFAULT_MAX_LOCAL_ERROR_RESET_STREAMS: usize = 1024;
43
44#[derive(Clone, Debug)]
45pub(crate) struct Config {
46    pub(crate) adaptive_window: bool,
47    pub(crate) initial_conn_window_size: u32,
48    pub(crate) initial_stream_window_size: u32,
49    pub(crate) max_frame_size: u32,
50    pub(crate) enable_connect_protocol: bool,
51    pub(crate) max_concurrent_streams: Option<u32>,
52    pub(crate) max_pending_accept_reset_streams: Option<usize>,
53    pub(crate) max_local_error_reset_streams: Option<usize>,
54    pub(crate) keep_alive_interval: Option<Duration>,
55    pub(crate) keep_alive_timeout: Duration,
56    pub(crate) max_send_buffer_size: usize,
57    pub(crate) max_header_list_size: u32,
58    pub(crate) date_header: bool,
59}
60
61impl Default for Config {
62    fn default() -> Config {
63        Config {
64            adaptive_window: false,
65            initial_conn_window_size: DEFAULT_CONN_WINDOW,
66            initial_stream_window_size: DEFAULT_STREAM_WINDOW,
67            max_frame_size: DEFAULT_MAX_FRAME_SIZE,
68            enable_connect_protocol: false,
69            max_concurrent_streams: Some(200),
70            max_pending_accept_reset_streams: None,
71            max_local_error_reset_streams: Some(DEFAULT_MAX_LOCAL_ERROR_RESET_STREAMS),
72            keep_alive_interval: None,
73            keep_alive_timeout: Duration::from_secs(20),
74            max_send_buffer_size: DEFAULT_MAX_SEND_BUF_SIZE,
75            max_header_list_size: DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE,
76            date_header: true,
77        }
78    }
79}
80
81pin_project! {
82    pub(crate) struct Server<T, S, B, E>
83    where
84        S: HttpService<IncomingBody>,
85        B: Body,
86    {
87        exec: E,
88        timer: Time,
89        service: S,
90        state: State<T, B>,
91        date_header: bool,
92        close_pending: bool
93    }
94}
95
96enum State<T, B>
97where
98    B: Body,
99{
100    Handshaking {
101        ping_config: ping::Config,
102        hs: Handshake<Compat<T>, SendBuf<B::Data>>,
103    },
104    Serving(Serving<T, B>),
105}
106
107struct Serving<T, B>
108where
109    B: Body,
110{
111    ping: Option<(ping::Recorder, ping::Ponger)>,
112    conn: Connection<Compat<T>, SendBuf<B::Data>>,
113    closing: Option<crate::Error>,
114    date_header: bool,
115}
116
117impl<T, S, B, E> Server<T, S, B, E>
118where
119    T: Read + Write + Unpin,
120    S: HttpService<IncomingBody, ResBody = B>,
121    S::Error: Into<Box<dyn StdError + Send + Sync>>,
122    B: Body + 'static,
123    E: Http2ServerConnExec<S::Future, B>,
124{
125    pub(crate) fn new(
126        io: T,
127        service: S,
128        config: &Config,
129        exec: E,
130        timer: Time,
131    ) -> Server<T, S, B, E> {
132        let mut builder = h2::server::Builder::default();
133        builder
134            .initial_window_size(config.initial_stream_window_size)
135            .initial_connection_window_size(config.initial_conn_window_size)
136            .max_frame_size(config.max_frame_size)
137            .max_header_list_size(config.max_header_list_size)
138            .max_local_error_reset_streams(config.max_local_error_reset_streams)
139            .max_send_buffer_size(config.max_send_buffer_size);
140        if let Some(max) = config.max_concurrent_streams {
141            builder.max_concurrent_streams(max);
142        }
143        if let Some(max) = config.max_pending_accept_reset_streams {
144            builder.max_pending_accept_reset_streams(max);
145        }
146        if config.enable_connect_protocol {
147            builder.enable_connect_protocol();
148        }
149        let handshake = builder.handshake(Compat::new(io));
150
151        let bdp = if config.adaptive_window {
152            Some(config.initial_stream_window_size)
153        } else {
154            None
155        };
156
157        let ping_config = ping::Config {
158            bdp_initial_window: bdp,
159            keep_alive_interval: config.keep_alive_interval,
160            keep_alive_timeout: config.keep_alive_timeout,
161            // If keep-alive is enabled for servers, always enabled while
162            // idle, so it can more aggressively close dead connections.
163            keep_alive_while_idle: true,
164        };
165
166        Server {
167            exec,
168            timer,
169            state: State::Handshaking {
170                ping_config,
171                hs: handshake,
172            },
173            service,
174            date_header: config.date_header,
175            close_pending: false,
176        }
177    }
178
179    pub(crate) fn graceful_shutdown(&mut self) {
180        trace!("graceful_shutdown");
181        match self.state {
182            State::Handshaking { .. } => {
183                self.close_pending = true;
184            }
185            State::Serving(ref mut srv) => {
186                if srv.closing.is_none() {
187                    srv.conn.graceful_shutdown();
188                }
189            }
190        }
191    }
192}
193
194impl<T, S, B, E> Future for Server<T, S, B, E>
195where
196    T: Read + Write + Unpin,
197    S: HttpService<IncomingBody, ResBody = B>,
198    S::Error: Into<Box<dyn StdError + Send + Sync>>,
199    B: Body + 'static,
200    E: Http2ServerConnExec<S::Future, B>,
201{
202    type Output = crate::Result<Dispatched>;
203
204    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
205        let me = &mut *self;
206        loop {
207            let next = match me.state {
208                State::Handshaking {
209                    ref mut hs,
210                    ref ping_config,
211                } => {
212                    let mut conn = ready!(Pin::new(hs).poll(cx).map_err(crate::Error::new_h2))?;
213                    let ping = if ping_config.is_enabled() {
214                        let pp = conn.ping_pong().expect("conn.ping_pong");
215                        Some(ping::channel(pp, ping_config.clone(), me.timer.clone()))
216                    } else {
217                        None
218                    };
219                    State::Serving(Serving {
220                        ping,
221                        conn,
222                        closing: None,
223                        date_header: me.date_header,
224                    })
225                }
226                State::Serving(ref mut srv) => {
227                    // graceful_shutdown was called before handshaking finished,
228                    if me.close_pending && srv.closing.is_none() {
229                        srv.conn.graceful_shutdown();
230                    }
231                    ready!(srv.poll_server(cx, &mut me.service, &mut me.exec))?;
232                    return Poll::Ready(Ok(Dispatched::Shutdown));
233                }
234            };
235            me.state = next;
236        }
237    }
238}
239
240impl<T, B> Serving<T, B>
241where
242    T: Read + Write + Unpin,
243    B: Body + 'static,
244{
245    fn poll_server<S, E>(
246        &mut self,
247        cx: &mut Context<'_>,
248        service: &mut S,
249        exec: &mut E,
250    ) -> Poll<crate::Result<()>>
251    where
252        S: HttpService<IncomingBody, ResBody = B>,
253        S::Error: Into<Box<dyn StdError + Send + Sync>>,
254        E: Http2ServerConnExec<S::Future, B>,
255    {
256        if self.closing.is_none() {
257            loop {
258                self.poll_ping(cx);
259
260                match ready!(self.conn.poll_accept(cx)) {
261                    Some(Ok((req, mut respond))) => {
262                        trace!("incoming request");
263                        let content_length = headers::content_length_parse_all(req.headers());
264                        let ping = self
265                            .ping
266                            .as_ref()
267                            .map(|ping| ping.0.clone())
268                            .unwrap_or_else(ping::disabled);
269
270                        // Record the headers received
271                        ping.record_non_data();
272
273                        let is_connect = req.method() == Method::CONNECT;
274                        let (mut parts, stream) = req.into_parts();
275                        let (mut req, connect_parts) = if !is_connect {
276                            (
277                                Request::from_parts(
278                                    parts,
279                                    IncomingBody::h2(stream, content_length.into(), ping),
280                                ),
281                                None,
282                            )
283                        } else {
284                            if content_length.map_or(false, |len| len != 0) {
285                                warn!("h2 connect request with non-zero body not supported");
286                                respond.send_reset(h2::Reason::INTERNAL_ERROR);
287                                return Poll::Ready(Ok(()));
288                            }
289                            let (pending, upgrade) = crate::upgrade::pending();
290                            debug_assert!(parts.extensions.get::<OnUpgrade>().is_none());
291                            parts.extensions.insert(upgrade);
292                            (
293                                Request::from_parts(parts, IncomingBody::empty()),
294                                Some(ConnectParts {
295                                    pending,
296                                    ping,
297                                    recv_stream: stream,
298                                }),
299                            )
300                        };
301
302                        if let Some(protocol) = req.extensions_mut().remove::<h2::ext::Protocol>() {
303                            req.extensions_mut().insert(Protocol::from_inner(protocol));
304                        }
305
306                        let fut = H2Stream::new(
307                            service.call(req),
308                            connect_parts,
309                            respond,
310                            self.date_header,
311                        );
312
313                        exec.execute_h2stream(fut);
314                    }
315                    Some(Err(e)) => {
316                        return Poll::Ready(Err(crate::Error::new_h2(e)));
317                    }
318                    None => {
319                        // no more incoming streams...
320                        if let Some((ref ping, _)) = self.ping {
321                            ping.ensure_not_timed_out()?;
322                        }
323
324                        trace!("incoming connection complete");
325                        return Poll::Ready(Ok(()));
326                    }
327                }
328            }
329        }
330
331        debug_assert!(
332            self.closing.is_some(),
333            "poll_server broke loop without closing"
334        );
335
336        ready!(self.conn.poll_closed(cx).map_err(crate::Error::new_h2))?;
337
338        Poll::Ready(Err(self.closing.take().expect("polled after error")))
339    }
340
341    fn poll_ping(&mut self, cx: &mut Context<'_>) {
342        if let Some((_, ref mut estimator)) = self.ping {
343            match estimator.poll(cx) {
344                Poll::Ready(ping::Ponged::SizeUpdate(wnd)) => {
345                    self.conn.set_target_window_size(wnd);
346                    let _ = self.conn.set_initial_window_size(wnd);
347                }
348                Poll::Ready(ping::Ponged::KeepAliveTimedOut) => {
349                    debug!("keep-alive timed out, closing connection");
350                    self.conn.abrupt_shutdown(h2::Reason::NO_ERROR);
351                }
352                Poll::Pending => {}
353            }
354        }
355    }
356}
357
358pin_project! {
359    #[allow(missing_debug_implementations)]
360    pub struct H2Stream<F, B>
361    where
362        B: Body,
363    {
364        reply: SendResponse<SendBuf<B::Data>>,
365        #[pin]
366        state: H2StreamState<F, B>,
367        date_header: bool,
368    }
369}
370
371pin_project! {
372    #[project = H2StreamStateProj]
373    enum H2StreamState<F, B>
374    where
375        B: Body,
376    {
377        Service {
378            #[pin]
379            fut: F,
380            connect_parts: Option<ConnectParts>,
381        },
382        Body {
383            #[pin]
384            pipe: PipeToSendStream<B>,
385        },
386    }
387}
388
389struct ConnectParts {
390    pending: Pending,
391    ping: Recorder,
392    recv_stream: RecvStream,
393}
394
395impl<F, B> H2Stream<F, B>
396where
397    B: Body,
398{
399    fn new(
400        fut: F,
401        connect_parts: Option<ConnectParts>,
402        respond: SendResponse<SendBuf<B::Data>>,
403        date_header: bool,
404    ) -> H2Stream<F, B> {
405        H2Stream {
406            reply: respond,
407            state: H2StreamState::Service { fut, connect_parts },
408            date_header,
409        }
410    }
411}
412
413macro_rules! reply {
414    ($me:expr, $res:expr, $eos:expr) => {{
415        match $me.reply.send_response($res, $eos) {
416            Ok(tx) => tx,
417            Err(e) => {
418                debug!("send response error: {}", e);
419                $me.reply.send_reset(Reason::INTERNAL_ERROR);
420                return Poll::Ready(Err(crate::Error::new_h2(e)));
421            }
422        }
423    }};
424}
425
426impl<F, B, E> H2Stream<F, B>
427where
428    F: Future<Output = Result<Response<B>, E>>,
429    B: Body,
430    B::Data: 'static,
431    B::Error: Into<Box<dyn StdError + Send + Sync>>,
432    E: Into<Box<dyn StdError + Send + Sync>>,
433{
434    fn poll2(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
435        let mut me = self.project();
436        loop {
437            let next = match me.state.as_mut().project() {
438                H2StreamStateProj::Service {
439                    fut: h,
440                    connect_parts,
441                } => {
442                    let res = match h.poll(cx) {
443                        Poll::Ready(Ok(r)) => r,
444                        Poll::Pending => {
445                            // Response is not yet ready, so we want to check if the client has sent a
446                            // RST_STREAM frame which would cancel the current request.
447                            if let Poll::Ready(reason) =
448                                me.reply.poll_reset(cx).map_err(crate::Error::new_h2)?
449                            {
450                                debug!("stream received RST_STREAM: {:?}", reason);
451                                return Poll::Ready(Err(crate::Error::new_h2(reason.into())));
452                            }
453                            return Poll::Pending;
454                        }
455                        Poll::Ready(Err(e)) => {
456                            let err = crate::Error::new_user_service(e);
457                            warn!("http2 service errored: {}", err);
458                            me.reply.send_reset(err.h2_reason());
459                            return Poll::Ready(Err(err));
460                        }
461                    };
462
463                    let (head, body) = res.into_parts();
464                    let mut res = ::http::Response::from_parts(head, ());
465                    super::strip_connection_headers(res.headers_mut(), false);
466
467                    // set Date header if it isn't already set if instructed
468                    if *me.date_header {
469                        res.headers_mut()
470                            .entry(::http::header::DATE)
471                            .or_insert_with(date::update_and_header_value);
472                    }
473
474                    if let Some(connect_parts) = connect_parts.take() {
475                        if res.status().is_success() {
476                            if headers::content_length_parse_all(res.headers())
477                                .map_or(false, |len| len != 0)
478                            {
479                                warn!("h2 successful response to CONNECT request with body not supported");
480                                me.reply.send_reset(h2::Reason::INTERNAL_ERROR);
481                                return Poll::Ready(Err(crate::Error::new_user_header()));
482                            }
483                            if res
484                                .headers_mut()
485                                .remove(::http::header::CONTENT_LENGTH)
486                                .is_some()
487                            {
488                                warn!("successful response to CONNECT request disallows content-length header");
489                            }
490                            let send_stream = reply!(me, res, false);
491                            connect_parts.pending.fulfill(Upgraded::new(
492                                H2Upgraded {
493                                    ping: connect_parts.ping,
494                                    recv_stream: connect_parts.recv_stream,
495                                    send_stream: unsafe { UpgradedSendStream::new(send_stream) },
496                                    buf: Bytes::new(),
497                                },
498                                Bytes::new(),
499                            ));
500                            return Poll::Ready(Ok(()));
501                        }
502                    }
503
504                    if !body.is_end_stream() {
505                        // automatically set Content-Length from body...
506                        if let Some(len) = body.size_hint().exact() {
507                            headers::set_content_length_if_missing(res.headers_mut(), len);
508                        }
509
510                        let body_tx = reply!(me, res, false);
511                        H2StreamState::Body {
512                            pipe: PipeToSendStream::new(body, body_tx),
513                        }
514                    } else {
515                        reply!(me, res, true);
516                        return Poll::Ready(Ok(()));
517                    }
518                }
519                H2StreamStateProj::Body { pipe } => {
520                    return pipe.poll(cx);
521                }
522            };
523            me.state.set(next);
524        }
525    }
526}
527
528impl<F, B, E> Future for H2Stream<F, B>
529where
530    F: Future<Output = Result<Response<B>, E>>,
531    B: Body,
532    B::Data: 'static,
533    B::Error: Into<Box<dyn StdError + Send + Sync>>,
534    E: Into<Box<dyn StdError + Send + Sync>>,
535{
536    type Output = ();
537
538    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
539        self.poll2(cx).map(|res| {
540            if let Err(_e) = res {
541                debug!("stream error: {}", _e);
542            }
543        })
544    }
545}