1use std::error::Error as StdError;
2use std::future::Future;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5use std::time::Duration;
6
7use bytes::Bytes;
8use futures_util::ready;
9use h2::server::{Connection, Handshake, SendResponse};
10use h2::{Reason, RecvStream};
11use http::{Method, Request};
12use pin_project_lite::pin_project;
13
14use super::{ping, PipeToSendStream, SendBuf};
15use crate::body::{Body, Incoming as IncomingBody};
16use crate::common::date;
17use crate::common::io::Compat;
18use crate::common::time::Time;
19use crate::ext::Protocol;
20use crate::headers;
21use crate::proto::h2::ping::Recorder;
22use crate::proto::h2::{H2Upgraded, UpgradedSendStream};
23use crate::proto::Dispatched;
24use crate::rt::bounds::Http2ServerConnExec;
25use crate::rt::{Read, Write};
26use crate::service::HttpService;
27
28use crate::upgrade::{OnUpgrade, Pending, Upgraded};
29use crate::Response;
30
31const DEFAULT_CONN_WINDOW: u32 = 1024 * 1024; const DEFAULT_STREAM_WINDOW: u32 = 1024 * 1024; const DEFAULT_MAX_FRAME_SIZE: u32 = 1024 * 16; const DEFAULT_MAX_SEND_BUF_SIZE: usize = 1024 * 400; const DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE: u32 = 1024 * 16; const DEFAULT_MAX_LOCAL_ERROR_RESET_STREAMS: usize = 1024;
43
44#[derive(Clone, Debug)]
45pub(crate) struct Config {
46 pub(crate) adaptive_window: bool,
47 pub(crate) initial_conn_window_size: u32,
48 pub(crate) initial_stream_window_size: u32,
49 pub(crate) max_frame_size: u32,
50 pub(crate) enable_connect_protocol: bool,
51 pub(crate) max_concurrent_streams: Option<u32>,
52 pub(crate) max_pending_accept_reset_streams: Option<usize>,
53 pub(crate) max_local_error_reset_streams: Option<usize>,
54 pub(crate) keep_alive_interval: Option<Duration>,
55 pub(crate) keep_alive_timeout: Duration,
56 pub(crate) max_send_buffer_size: usize,
57 pub(crate) max_header_list_size: u32,
58 pub(crate) date_header: bool,
59}
60
61impl Default for Config {
62 fn default() -> Config {
63 Config {
64 adaptive_window: false,
65 initial_conn_window_size: DEFAULT_CONN_WINDOW,
66 initial_stream_window_size: DEFAULT_STREAM_WINDOW,
67 max_frame_size: DEFAULT_MAX_FRAME_SIZE,
68 enable_connect_protocol: false,
69 max_concurrent_streams: Some(200),
70 max_pending_accept_reset_streams: None,
71 max_local_error_reset_streams: Some(DEFAULT_MAX_LOCAL_ERROR_RESET_STREAMS),
72 keep_alive_interval: None,
73 keep_alive_timeout: Duration::from_secs(20),
74 max_send_buffer_size: DEFAULT_MAX_SEND_BUF_SIZE,
75 max_header_list_size: DEFAULT_SETTINGS_MAX_HEADER_LIST_SIZE,
76 date_header: true,
77 }
78 }
79}
80
81pin_project! {
82 pub(crate) struct Server<T, S, B, E>
83 where
84 S: HttpService<IncomingBody>,
85 B: Body,
86 {
87 exec: E,
88 timer: Time,
89 service: S,
90 state: State<T, B>,
91 date_header: bool,
92 close_pending: bool
93 }
94}
95
96enum State<T, B>
97where
98 B: Body,
99{
100 Handshaking {
101 ping_config: ping::Config,
102 hs: Handshake<Compat<T>, SendBuf<B::Data>>,
103 },
104 Serving(Serving<T, B>),
105}
106
107struct Serving<T, B>
108where
109 B: Body,
110{
111 ping: Option<(ping::Recorder, ping::Ponger)>,
112 conn: Connection<Compat<T>, SendBuf<B::Data>>,
113 closing: Option<crate::Error>,
114 date_header: bool,
115}
116
117impl<T, S, B, E> Server<T, S, B, E>
118where
119 T: Read + Write + Unpin,
120 S: HttpService<IncomingBody, ResBody = B>,
121 S::Error: Into<Box<dyn StdError + Send + Sync>>,
122 B: Body + 'static,
123 E: Http2ServerConnExec<S::Future, B>,
124{
125 pub(crate) fn new(
126 io: T,
127 service: S,
128 config: &Config,
129 exec: E,
130 timer: Time,
131 ) -> Server<T, S, B, E> {
132 let mut builder = h2::server::Builder::default();
133 builder
134 .initial_window_size(config.initial_stream_window_size)
135 .initial_connection_window_size(config.initial_conn_window_size)
136 .max_frame_size(config.max_frame_size)
137 .max_header_list_size(config.max_header_list_size)
138 .max_local_error_reset_streams(config.max_local_error_reset_streams)
139 .max_send_buffer_size(config.max_send_buffer_size);
140 if let Some(max) = config.max_concurrent_streams {
141 builder.max_concurrent_streams(max);
142 }
143 if let Some(max) = config.max_pending_accept_reset_streams {
144 builder.max_pending_accept_reset_streams(max);
145 }
146 if config.enable_connect_protocol {
147 builder.enable_connect_protocol();
148 }
149 let handshake = builder.handshake(Compat::new(io));
150
151 let bdp = if config.adaptive_window {
152 Some(config.initial_stream_window_size)
153 } else {
154 None
155 };
156
157 let ping_config = ping::Config {
158 bdp_initial_window: bdp,
159 keep_alive_interval: config.keep_alive_interval,
160 keep_alive_timeout: config.keep_alive_timeout,
161 keep_alive_while_idle: true,
164 };
165
166 Server {
167 exec,
168 timer,
169 state: State::Handshaking {
170 ping_config,
171 hs: handshake,
172 },
173 service,
174 date_header: config.date_header,
175 close_pending: false,
176 }
177 }
178
179 pub(crate) fn graceful_shutdown(&mut self) {
180 trace!("graceful_shutdown");
181 match self.state {
182 State::Handshaking { .. } => {
183 self.close_pending = true;
184 }
185 State::Serving(ref mut srv) => {
186 if srv.closing.is_none() {
187 srv.conn.graceful_shutdown();
188 }
189 }
190 }
191 }
192}
193
194impl<T, S, B, E> Future for Server<T, S, B, E>
195where
196 T: Read + Write + Unpin,
197 S: HttpService<IncomingBody, ResBody = B>,
198 S::Error: Into<Box<dyn StdError + Send + Sync>>,
199 B: Body + 'static,
200 E: Http2ServerConnExec<S::Future, B>,
201{
202 type Output = crate::Result<Dispatched>;
203
204 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
205 let me = &mut *self;
206 loop {
207 let next = match me.state {
208 State::Handshaking {
209 ref mut hs,
210 ref ping_config,
211 } => {
212 let mut conn = ready!(Pin::new(hs).poll(cx).map_err(crate::Error::new_h2))?;
213 let ping = if ping_config.is_enabled() {
214 let pp = conn.ping_pong().expect("conn.ping_pong");
215 Some(ping::channel(pp, ping_config.clone(), me.timer.clone()))
216 } else {
217 None
218 };
219 State::Serving(Serving {
220 ping,
221 conn,
222 closing: None,
223 date_header: me.date_header,
224 })
225 }
226 State::Serving(ref mut srv) => {
227 if me.close_pending && srv.closing.is_none() {
229 srv.conn.graceful_shutdown();
230 }
231 ready!(srv.poll_server(cx, &mut me.service, &mut me.exec))?;
232 return Poll::Ready(Ok(Dispatched::Shutdown));
233 }
234 };
235 me.state = next;
236 }
237 }
238}
239
240impl<T, B> Serving<T, B>
241where
242 T: Read + Write + Unpin,
243 B: Body + 'static,
244{
245 fn poll_server<S, E>(
246 &mut self,
247 cx: &mut Context<'_>,
248 service: &mut S,
249 exec: &mut E,
250 ) -> Poll<crate::Result<()>>
251 where
252 S: HttpService<IncomingBody, ResBody = B>,
253 S::Error: Into<Box<dyn StdError + Send + Sync>>,
254 E: Http2ServerConnExec<S::Future, B>,
255 {
256 if self.closing.is_none() {
257 loop {
258 self.poll_ping(cx);
259
260 match ready!(self.conn.poll_accept(cx)) {
261 Some(Ok((req, mut respond))) => {
262 trace!("incoming request");
263 let content_length = headers::content_length_parse_all(req.headers());
264 let ping = self
265 .ping
266 .as_ref()
267 .map(|ping| ping.0.clone())
268 .unwrap_or_else(ping::disabled);
269
270 ping.record_non_data();
272
273 let is_connect = req.method() == Method::CONNECT;
274 let (mut parts, stream) = req.into_parts();
275 let (mut req, connect_parts) = if !is_connect {
276 (
277 Request::from_parts(
278 parts,
279 IncomingBody::h2(stream, content_length.into(), ping),
280 ),
281 None,
282 )
283 } else {
284 if content_length.map_or(false, |len| len != 0) {
285 warn!("h2 connect request with non-zero body not supported");
286 respond.send_reset(h2::Reason::INTERNAL_ERROR);
287 return Poll::Ready(Ok(()));
288 }
289 let (pending, upgrade) = crate::upgrade::pending();
290 debug_assert!(parts.extensions.get::<OnUpgrade>().is_none());
291 parts.extensions.insert(upgrade);
292 (
293 Request::from_parts(parts, IncomingBody::empty()),
294 Some(ConnectParts {
295 pending,
296 ping,
297 recv_stream: stream,
298 }),
299 )
300 };
301
302 if let Some(protocol) = req.extensions_mut().remove::<h2::ext::Protocol>() {
303 req.extensions_mut().insert(Protocol::from_inner(protocol));
304 }
305
306 let fut = H2Stream::new(
307 service.call(req),
308 connect_parts,
309 respond,
310 self.date_header,
311 );
312
313 exec.execute_h2stream(fut);
314 }
315 Some(Err(e)) => {
316 return Poll::Ready(Err(crate::Error::new_h2(e)));
317 }
318 None => {
319 if let Some((ref ping, _)) = self.ping {
321 ping.ensure_not_timed_out()?;
322 }
323
324 trace!("incoming connection complete");
325 return Poll::Ready(Ok(()));
326 }
327 }
328 }
329 }
330
331 debug_assert!(
332 self.closing.is_some(),
333 "poll_server broke loop without closing"
334 );
335
336 ready!(self.conn.poll_closed(cx).map_err(crate::Error::new_h2))?;
337
338 Poll::Ready(Err(self.closing.take().expect("polled after error")))
339 }
340
341 fn poll_ping(&mut self, cx: &mut Context<'_>) {
342 if let Some((_, ref mut estimator)) = self.ping {
343 match estimator.poll(cx) {
344 Poll::Ready(ping::Ponged::SizeUpdate(wnd)) => {
345 self.conn.set_target_window_size(wnd);
346 let _ = self.conn.set_initial_window_size(wnd);
347 }
348 Poll::Ready(ping::Ponged::KeepAliveTimedOut) => {
349 debug!("keep-alive timed out, closing connection");
350 self.conn.abrupt_shutdown(h2::Reason::NO_ERROR);
351 }
352 Poll::Pending => {}
353 }
354 }
355 }
356}
357
358pin_project! {
359 #[allow(missing_debug_implementations)]
360 pub struct H2Stream<F, B>
361 where
362 B: Body,
363 {
364 reply: SendResponse<SendBuf<B::Data>>,
365 #[pin]
366 state: H2StreamState<F, B>,
367 date_header: bool,
368 }
369}
370
371pin_project! {
372 #[project = H2StreamStateProj]
373 enum H2StreamState<F, B>
374 where
375 B: Body,
376 {
377 Service {
378 #[pin]
379 fut: F,
380 connect_parts: Option<ConnectParts>,
381 },
382 Body {
383 #[pin]
384 pipe: PipeToSendStream<B>,
385 },
386 }
387}
388
389struct ConnectParts {
390 pending: Pending,
391 ping: Recorder,
392 recv_stream: RecvStream,
393}
394
395impl<F, B> H2Stream<F, B>
396where
397 B: Body,
398{
399 fn new(
400 fut: F,
401 connect_parts: Option<ConnectParts>,
402 respond: SendResponse<SendBuf<B::Data>>,
403 date_header: bool,
404 ) -> H2Stream<F, B> {
405 H2Stream {
406 reply: respond,
407 state: H2StreamState::Service { fut, connect_parts },
408 date_header,
409 }
410 }
411}
412
413macro_rules! reply {
414 ($me:expr, $res:expr, $eos:expr) => {{
415 match $me.reply.send_response($res, $eos) {
416 Ok(tx) => tx,
417 Err(e) => {
418 debug!("send response error: {}", e);
419 $me.reply.send_reset(Reason::INTERNAL_ERROR);
420 return Poll::Ready(Err(crate::Error::new_h2(e)));
421 }
422 }
423 }};
424}
425
426impl<F, B, E> H2Stream<F, B>
427where
428 F: Future<Output = Result<Response<B>, E>>,
429 B: Body,
430 B::Data: 'static,
431 B::Error: Into<Box<dyn StdError + Send + Sync>>,
432 E: Into<Box<dyn StdError + Send + Sync>>,
433{
434 fn poll2(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
435 let mut me = self.project();
436 loop {
437 let next = match me.state.as_mut().project() {
438 H2StreamStateProj::Service {
439 fut: h,
440 connect_parts,
441 } => {
442 let res = match h.poll(cx) {
443 Poll::Ready(Ok(r)) => r,
444 Poll::Pending => {
445 if let Poll::Ready(reason) =
448 me.reply.poll_reset(cx).map_err(crate::Error::new_h2)?
449 {
450 debug!("stream received RST_STREAM: {:?}", reason);
451 return Poll::Ready(Err(crate::Error::new_h2(reason.into())));
452 }
453 return Poll::Pending;
454 }
455 Poll::Ready(Err(e)) => {
456 let err = crate::Error::new_user_service(e);
457 warn!("http2 service errored: {}", err);
458 me.reply.send_reset(err.h2_reason());
459 return Poll::Ready(Err(err));
460 }
461 };
462
463 let (head, body) = res.into_parts();
464 let mut res = ::http::Response::from_parts(head, ());
465 super::strip_connection_headers(res.headers_mut(), false);
466
467 if *me.date_header {
469 res.headers_mut()
470 .entry(::http::header::DATE)
471 .or_insert_with(date::update_and_header_value);
472 }
473
474 if let Some(connect_parts) = connect_parts.take() {
475 if res.status().is_success() {
476 if headers::content_length_parse_all(res.headers())
477 .map_or(false, |len| len != 0)
478 {
479 warn!("h2 successful response to CONNECT request with body not supported");
480 me.reply.send_reset(h2::Reason::INTERNAL_ERROR);
481 return Poll::Ready(Err(crate::Error::new_user_header()));
482 }
483 if res
484 .headers_mut()
485 .remove(::http::header::CONTENT_LENGTH)
486 .is_some()
487 {
488 warn!("successful response to CONNECT request disallows content-length header");
489 }
490 let send_stream = reply!(me, res, false);
491 connect_parts.pending.fulfill(Upgraded::new(
492 H2Upgraded {
493 ping: connect_parts.ping,
494 recv_stream: connect_parts.recv_stream,
495 send_stream: unsafe { UpgradedSendStream::new(send_stream) },
496 buf: Bytes::new(),
497 },
498 Bytes::new(),
499 ));
500 return Poll::Ready(Ok(()));
501 }
502 }
503
504 if !body.is_end_stream() {
505 if let Some(len) = body.size_hint().exact() {
507 headers::set_content_length_if_missing(res.headers_mut(), len);
508 }
509
510 let body_tx = reply!(me, res, false);
511 H2StreamState::Body {
512 pipe: PipeToSendStream::new(body, body_tx),
513 }
514 } else {
515 reply!(me, res, true);
516 return Poll::Ready(Ok(()));
517 }
518 }
519 H2StreamStateProj::Body { pipe } => {
520 return pipe.poll(cx);
521 }
522 };
523 me.state.set(next);
524 }
525 }
526}
527
528impl<F, B, E> Future for H2Stream<F, B>
529where
530 F: Future<Output = Result<Response<B>, E>>,
531 B: Body,
532 B::Data: 'static,
533 B::Error: Into<Box<dyn StdError + Send + Sync>>,
534 E: Into<Box<dyn StdError + Send + Sync>>,
535{
536 type Output = ();
537
538 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
539 self.poll2(cx).map(|res| {
540 if let Err(_e) = res {
541 debug!("stream error: {}", _e);
542 }
543 })
544 }
545}