Skip to main content

tonic/transport/server/
mod.rs

1//! Server implementation and builder.
2
3mod conn;
4mod display_error_stack;
5mod incoming;
6mod io_stream;
7mod service;
8#[cfg(feature = "_tls-any")]
9mod tls;
10#[cfg(unix)]
11mod unix;
12
13use tokio_stream::StreamExt as _;
14use tracing::{debug, trace};
15
16#[cfg(feature = "router")]
17use crate::{server::NamedService, service::Routes};
18
19#[cfg(feature = "router")]
20use std::convert::Infallible;
21
22pub use conn::{Connected, TcpConnectInfo};
23use hyper_util::{
24    rt::{TokioExecutor, TokioIo, TokioTimer},
25    server::conn::auto::{Builder as ConnectionBuilder, HttpServerConnExec},
26    service::TowerToHyperService,
27};
28#[cfg(feature = "_tls-any")]
29pub use tls::ServerTlsConfig;
30
31#[cfg(feature = "_tls-any")]
32pub use conn::TlsConnectInfo;
33
34#[cfg(feature = "_tls-any")]
35use self::service::TlsAcceptor;
36
37#[cfg(unix)]
38pub use unix::UdsConnectInfo;
39
40pub use incoming::TcpIncoming;
41
42#[cfg(feature = "_tls-any")]
43use crate::transport::Error;
44
45use self::service::{ConnectInfoLayer, ServerIo};
46use super::service::GrpcTimeout;
47use crate::body::Body;
48use crate::service::RecoverErrorLayer;
49use crate::transport::server::display_error_stack::DisplayErrorStack;
50use bytes::Bytes;
51use http::{Request, Response};
52use http_body_util::BodyExt;
53use hyper::{body::Incoming, service::Service as HyperService};
54use pin_project::pin_project;
55use std::{
56    fmt,
57    future::{self, Future},
58    marker::PhantomData,
59    net::SocketAddr,
60    pin::{Pin, pin},
61    sync::Arc,
62    task::{Context, Poll, ready},
63    time::Duration,
64};
65use tokio::io::{AsyncRead, AsyncWrite};
66use tokio_stream::Stream;
67use tower::{
68    Service, ServiceBuilder, ServiceExt,
69    layer::Layer,
70    layer::util::{Identity, Stack},
71    limit::concurrency::ConcurrencyLimitLayer,
72    load_shed::LoadShedLayer,
73    util::BoxCloneService,
74};
75
76type BoxService = tower::util::BoxCloneService<Request<Body>, Response<Body>, crate::BoxError>;
77type TraceInterceptor = Arc<dyn Fn(&http::Request<()>) -> tracing::Span + Send + Sync + 'static>;
78
79const DEFAULT_HTTP2_KEEPALIVE_TIMEOUT: Duration = Duration::from_secs(20);
80
81/// A default batteries included `transport` server.
82///
83/// This provides an easy builder pattern style builder [`Server`] on top of
84/// `hyper` connections. This builder exposes easy configuration parameters
85/// for providing a fully featured http2 based gRPC server. This should provide
86/// a very good out of the box http2 server for use with tonic but is also a
87/// reference implementation that should be a good starting point for anyone
88/// wanting to create a more complex and/or specific implementation.
89#[derive(Clone)]
90pub struct Server<L = Identity> {
91    trace_interceptor: Option<TraceInterceptor>,
92    concurrency_limit: Option<usize>,
93    load_shed: bool,
94    timeout: Option<Duration>,
95    #[cfg(feature = "_tls-any")]
96    tls: Option<TlsAcceptor>,
97    init_stream_window_size: Option<u32>,
98    init_connection_window_size: Option<u32>,
99    max_concurrent_streams: Option<u32>,
100    tcp_keepalive: Option<Duration>,
101    tcp_keepalive_interval: Option<Duration>,
102    tcp_keepalive_retries: Option<u32>,
103    tcp_nodelay: bool,
104    http2_keepalive_interval: Option<Duration>,
105    http2_keepalive_timeout: Duration,
106    http2_adaptive_window: Option<bool>,
107    http2_max_pending_accept_reset_streams: Option<usize>,
108    http2_max_local_error_reset_streams: Option<usize>,
109    http2_max_header_list_size: Option<u32>,
110    max_frame_size: Option<u32>,
111    accept_http1: bool,
112    service_builder: ServiceBuilder<L>,
113    max_connection_age: Option<Duration>,
114    max_connection_age_grace: Option<Duration>,
115}
116
117impl Default for Server<Identity> {
118    fn default() -> Self {
119        Self {
120            trace_interceptor: None,
121            concurrency_limit: None,
122            load_shed: false,
123            timeout: None,
124            #[cfg(feature = "_tls-any")]
125            tls: None,
126            init_stream_window_size: None,
127            init_connection_window_size: None,
128            max_concurrent_streams: None,
129            tcp_keepalive: None,
130            tcp_keepalive_interval: None,
131            tcp_keepalive_retries: None,
132            tcp_nodelay: true,
133            http2_keepalive_interval: None,
134            http2_keepalive_timeout: DEFAULT_HTTP2_KEEPALIVE_TIMEOUT,
135            http2_adaptive_window: None,
136            http2_max_pending_accept_reset_streams: None,
137            http2_max_local_error_reset_streams: None,
138            http2_max_header_list_size: None,
139            max_frame_size: None,
140            accept_http1: false,
141            service_builder: Default::default(),
142            max_connection_age: None,
143            max_connection_age_grace: None,
144        }
145    }
146}
147
148/// A stack based [`Service`] router.
149#[cfg(feature = "router")]
150#[derive(Clone, Debug)]
151pub struct Router<L = Identity> {
152    server: Server<L>,
153    routes: Routes,
154}
155
156impl Server {
157    /// Create a new server builder that can configure a [`Server`].
158    pub fn builder() -> Self {
159        Self::default()
160    }
161}
162
163impl<L> Server<L> {
164    /// Configure TLS for this server.
165    #[cfg(feature = "_tls-any")]
166    pub fn tls_config(self, tls_config: ServerTlsConfig) -> Result<Self, Error> {
167        Ok(Server {
168            tls: Some(tls_config.tls_acceptor().map_err(Error::from_source)?),
169            ..self
170        })
171    }
172
173    /// Set the concurrency limit applied to on requests inbound per connection.
174    ///
175    /// # Example
176    ///
177    /// ```
178    /// # use tonic::transport::Server;
179    /// # use tower_service::Service;
180    /// # let builder = Server::builder();
181    /// builder.concurrency_limit_per_connection(32);
182    /// ```
183    #[must_use]
184    pub fn concurrency_limit_per_connection(self, limit: usize) -> Self {
185        Server {
186            concurrency_limit: Some(limit),
187            ..self
188        }
189    }
190
191    /// Enable or disable load shedding. The default is disabled.
192    ///
193    /// When load shedding is enabled, if the service responds with not ready
194    /// the request will immediately be rejected with a
195    /// [`resource_exhausted`](https://docs.rs/tonic/latest/tonic/struct.Status.html#method.resource_exhausted) error.
196    /// The default is to buffer requests. This is especially useful in combination with
197    /// setting a concurrency limit per connection.
198    ///
199    /// # Example
200    ///
201    /// ```
202    /// # use tonic::transport::Server;
203    /// # use tower_service::Service;
204    /// # let builder = Server::builder();
205    /// builder.load_shed(true);
206    /// ```
207    #[must_use]
208    pub fn load_shed(self, load_shed: bool) -> Self {
209        Server { load_shed, ..self }
210    }
211
212    /// Set a timeout on for all request handlers.
213    ///
214    /// # Example
215    ///
216    /// ```
217    /// # use tonic::transport::Server;
218    /// # use tower_service::Service;
219    /// # use std::time::Duration;
220    /// # let builder = Server::builder();
221    /// builder.timeout(Duration::from_secs(30));
222    /// ```
223    #[must_use]
224    pub fn timeout(self, timeout: Duration) -> Self {
225        Server {
226            timeout: Some(timeout),
227            ..self
228        }
229    }
230
231    /// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2
232    /// stream-level flow control.
233    ///
234    /// Default is 65,535
235    ///
236    /// [spec]: https://httpwg.org/specs/rfc9113.html#InitialWindowSize
237    #[must_use]
238    pub fn initial_stream_window_size(self, sz: impl Into<Option<u32>>) -> Self {
239        Server {
240            init_stream_window_size: sz.into(),
241            ..self
242        }
243    }
244
245    /// Sets the max connection-level flow control for HTTP2
246    ///
247    /// Default is 65,535
248    #[must_use]
249    pub fn initial_connection_window_size(self, sz: impl Into<Option<u32>>) -> Self {
250        Server {
251            init_connection_window_size: sz.into(),
252            ..self
253        }
254    }
255
256    /// Sets the [`SETTINGS_MAX_CONCURRENT_STREAMS`][spec] option for HTTP2
257    /// connections.
258    ///
259    /// Default is no limit (`None`).
260    ///
261    /// [spec]: https://httpwg.org/specs/rfc9113.html#n-stream-concurrency
262    #[must_use]
263    pub fn max_concurrent_streams(self, max: impl Into<Option<u32>>) -> Self {
264        Server {
265            max_concurrent_streams: max.into(),
266            ..self
267        }
268    }
269
270    /// Sets the maximum time option in milliseconds that a connection may exist
271    ///
272    /// Default is no limit (`None`).
273    ///
274    /// # Example
275    ///
276    /// ```
277    /// # use tonic::transport::Server;
278    /// # use tower_service::Service;
279    /// # use std::time::Duration;
280    /// # let builder = Server::builder();
281    /// builder.max_connection_age(Duration::from_secs(60));
282    /// ```
283    #[must_use]
284    pub fn max_connection_age(self, max_connection_age: Duration) -> Self {
285        Server {
286            max_connection_age: Some(max_connection_age),
287            ..self
288        }
289    }
290
291    /// Sets the maximum duration that a connection may continue to exist
292    /// **after** the graceful shutdown period (`max_connection_age`) has elapsed.
293    ///
294    /// This timeout only takes effect *after* a connection has exceeded its
295    /// configured `max_connection_age`. Once that happens, the server will begin
296    /// graceful shutdown for the connection. If the connection does not close
297    /// gracefully within the `max_connection_age_grace` duration, the server will then
298    /// forcefully terminate it.
299    ///
300    /// If no `max_connection_age` is configured, this forced shutdown timeout will
301    /// **never trigger**, because the server will not know when to begin the
302    /// graceful shutdown phase.
303    ///
304    /// Default is no limit (`None`).
305    ///
306    /// ```
307    /// # use tonic::transport::Server;
308    /// # use tower_service::Service;
309    /// # use std::time::Duration;
310    /// # let builder = Server::builder();
311    /// builder.max_connection_age_grace(Duration::from_secs(60));
312    /// ```
313    #[must_use]
314    pub fn max_connection_age_grace(self, max_connection_age_grace: Duration) -> Self {
315        Server {
316            max_connection_age_grace: Some(max_connection_age_grace),
317            ..self
318        }
319    }
320
321    /// Set whether HTTP2 Ping frames are enabled on accepted connections.
322    ///
323    /// If `None` is specified, HTTP2 keepalive is disabled, otherwise the duration
324    /// specified will be the time interval between HTTP2 Ping frames.
325    /// The timeout for receiving an acknowledgement of the keepalive ping
326    /// can be set with [`Server::http2_keepalive_timeout`].
327    ///
328    /// Default is no HTTP2 keepalive (`None`)
329    ///
330    #[must_use]
331    pub fn http2_keepalive_interval(self, http2_keepalive_interval: Option<Duration>) -> Self {
332        Server {
333            http2_keepalive_interval,
334            ..self
335        }
336    }
337
338    /// Sets a timeout for receiving an acknowledgement of the keepalive ping.
339    ///
340    /// If the ping is not acknowledged within the timeout, the connection will be closed.
341    /// Does nothing if http2_keep_alive_interval is disabled.
342    ///
343    /// Default is 20 seconds.
344    ///
345    #[must_use]
346    pub fn http2_keepalive_timeout(mut self, http2_keepalive_timeout: Option<Duration>) -> Self {
347        if let Some(timeout) = http2_keepalive_timeout {
348            self.http2_keepalive_timeout = timeout;
349        }
350        self
351    }
352
353    /// Sets whether to use an adaptive flow control. Defaults to false.
354    /// Enabling this will override the limits set in http2_initial_stream_window_size and
355    /// http2_initial_connection_window_size.
356    #[must_use]
357    pub fn http2_adaptive_window(self, enabled: Option<bool>) -> Self {
358        Server {
359            http2_adaptive_window: enabled,
360            ..self
361        }
362    }
363
364    /// Configures the maximum number of pending reset streams allowed before a GOAWAY will be sent.
365    ///
366    /// This will default to whatever the default in h2 is. As of v0.3.17, it is 20.
367    ///
368    /// See <https://github.com/hyperium/hyper/issues/2877> for more information.
369    #[must_use]
370    pub fn http2_max_pending_accept_reset_streams(self, max: Option<usize>) -> Self {
371        Server {
372            http2_max_pending_accept_reset_streams: max,
373            ..self
374        }
375    }
376
377    /// Configures the maximum number of local reset streams allowed before a GOAWAY will be sent.
378    ///
379    /// This will default to whatever the default in hyper is.
380    #[must_use]
381    pub fn http2_max_local_error_reset_streams(self, max: Option<usize>) -> Self {
382        Server {
383            http2_max_local_error_reset_streams: max,
384            ..self
385        }
386    }
387
388    /// Set whether TCP keepalive messages are enabled on accepted connections.
389    ///
390    /// If `None` is specified, keepalive is disabled, otherwise the duration
391    /// specified will be the time to remain idle before sending TCP keepalive
392    /// probes.
393    ///
394    /// Important: This setting is ignored when using `serve_with_incoming`.
395    ///
396    /// Default is no keepalive (`None`)
397    ///
398    #[must_use]
399    pub fn tcp_keepalive(self, tcp_keepalive: Option<Duration>) -> Self {
400        Server {
401            tcp_keepalive,
402            ..self
403        }
404    }
405
406    /// Set the value of `TCP_KEEPINTVL` option for accepted connections.
407    ///
408    /// This option specifies the time interval between subsequent keepalive probes.
409    /// This setting only takes effect if [`tcp_keepalive`](Self::tcp_keepalive) is also set.
410    ///
411    /// Important: This setting is ignored when using `serve_with_incoming`.
412    ///
413    /// Default is `None` (system default).
414    ///
415    /// Note: This option is only available on some platforms (Linux, macOS, Windows, etc.).
416    #[must_use]
417    pub fn tcp_keepalive_interval(self, tcp_keepalive_interval: Option<Duration>) -> Self {
418        Server {
419            tcp_keepalive_interval,
420            ..self
421        }
422    }
423
424    /// Set the value of `TCP_KEEPCNT` option for accepted connections.
425    ///
426    /// This option specifies the maximum number of keepalive probes that should be sent
427    /// before dropping the connection.
428    /// This setting only takes effect if [`tcp_keepalive`](Self::tcp_keepalive) is also set.
429    ///
430    /// Important: This setting is ignored when using `serve_with_incoming`.
431    ///
432    /// Default is `None` (system default).
433    ///
434    /// Note: This option is only available on some platforms (Linux, macOS, Windows, etc.).
435    #[must_use]
436    pub fn tcp_keepalive_retries(self, tcp_keepalive_retries: Option<u32>) -> Self {
437        Server {
438            tcp_keepalive_retries,
439            ..self
440        }
441    }
442
443    /// Set the value of `TCP_NODELAY` option for accepted connections. Enabled by default.
444    ///
445    /// Important: This setting is ignored when using `serve_with_incoming`.
446    #[must_use]
447    pub fn tcp_nodelay(self, enabled: bool) -> Self {
448        Server {
449            tcp_nodelay: enabled,
450            ..self
451        }
452    }
453
454    /// Sets the max size of received header frames.
455    ///
456    /// This will default to whatever the default in hyper is. As of v1.4.1, it is 16 KiB.
457    #[must_use]
458    pub fn http2_max_header_list_size(self, max: impl Into<Option<u32>>) -> Self {
459        Server {
460            http2_max_header_list_size: max.into(),
461            ..self
462        }
463    }
464
465    /// Sets the maximum frame size to use for HTTP2.
466    ///
467    /// Passing `None` will do nothing.
468    ///
469    /// If not set, will default from underlying transport.
470    #[must_use]
471    pub fn max_frame_size(self, frame_size: impl Into<Option<u32>>) -> Self {
472        Server {
473            max_frame_size: frame_size.into(),
474            ..self
475        }
476    }
477
478    /// Allow this server to accept http1 requests.
479    ///
480    /// Accepting http1 requests is only useful when developing `grpc-web`
481    /// enabled services. If this setting is set to `true` but services are
482    /// not correctly configured to handle grpc-web requests, your server may
483    /// return confusing (but correct) protocol errors.
484    ///
485    /// Default is `false`.
486    #[must_use]
487    pub fn accept_http1(self, accept_http1: bool) -> Self {
488        Server {
489            accept_http1,
490            ..self
491        }
492    }
493
494    /// Intercept inbound headers and add a [`tracing::Span`] to each response future.
495    #[must_use]
496    pub fn trace_fn<F>(self, f: F) -> Self
497    where
498        F: Fn(&http::Request<()>) -> tracing::Span + Send + Sync + 'static,
499    {
500        Server {
501            trace_interceptor: Some(Arc::new(f)),
502            ..self
503        }
504    }
505
506    /// Create a router with the `S` typed service as the first service.
507    ///
508    /// This will clone the `Server` builder and create a router that will
509    /// route around different services.
510    #[cfg(feature = "router")]
511    pub fn add_service<S>(&mut self, svc: S) -> Router<L>
512    where
513        S: Service<Request<Body>, Error = Infallible>
514            + NamedService
515            + Clone
516            + Send
517            + Sync
518            + 'static,
519        S::Response: axum::response::IntoResponse,
520        S::Future: Send + 'static,
521        L: Clone,
522    {
523        Router::new(self.clone(), Routes::new(svc))
524    }
525
526    /// Create a router with the optional `S` typed service as the first service.
527    ///
528    /// This will clone the `Server` builder and create a router that will
529    /// route around different services.
530    ///
531    /// # Note
532    /// Even when the argument given is `None` this will capture *all* requests to this service name.
533    /// As a result, one cannot use this to toggle between two identically named implementations.
534    #[cfg(feature = "router")]
535    pub fn add_optional_service<S>(&mut self, svc: Option<S>) -> Router<L>
536    where
537        S: Service<Request<Body>, Error = Infallible>
538            + NamedService
539            + Clone
540            + Send
541            + Sync
542            + 'static,
543        S::Response: axum::response::IntoResponse,
544        S::Future: Send + 'static,
545        L: Clone,
546    {
547        let routes = svc.map(Routes::new).unwrap_or_default();
548        Router::new(self.clone(), routes)
549    }
550
551    /// Create a router with given [`Routes`].
552    ///
553    /// This will clone the `Server` builder and create a router that will
554    /// route around different services that were already added to the provided `routes`.
555    #[cfg(feature = "router")]
556    pub fn add_routes(&mut self, routes: Routes) -> Router<L>
557    where
558        L: Clone,
559    {
560        Router::new(self.clone(), routes)
561    }
562
563    /// Set the [Tower] [`Layer`] all services will be wrapped in.
564    ///
565    /// This enables using middleware from the [Tower ecosystem][eco].
566    ///
567    /// # Example
568    ///
569    /// ```
570    /// # use tonic::transport::Server;
571    /// # use tower_service::Service;
572    /// use tower::timeout::TimeoutLayer;
573    /// use std::time::Duration;
574    ///
575    /// # let mut builder = Server::builder();
576    /// builder.layer(TimeoutLayer::new(Duration::from_secs(30)));
577    /// ```
578    ///
579    /// Note that timeouts should be set using [`Server::timeout`]. `TimeoutLayer` is only used
580    /// here as an example.
581    ///
582    /// You can build more complex layers using [`ServiceBuilder`]. Those layers can include
583    /// [interceptors]:
584    ///
585    /// ```
586    /// # use tonic::transport::Server;
587    /// # use tower_service::Service;
588    /// use tower::ServiceBuilder;
589    /// use std::time::Duration;
590    /// use tonic::{Request, Status, service::InterceptorLayer};
591    ///
592    /// fn auth_interceptor(request: Request<()>) -> Result<Request<()>, Status> {
593    ///     if valid_credentials(&request) {
594    ///         Ok(request)
595    ///     } else {
596    ///         Err(Status::unauthenticated("invalid credentials"))
597    ///     }
598    /// }
599    ///
600    /// fn valid_credentials(request: &Request<()>) -> bool {
601    ///     // ...
602    ///     # true
603    /// }
604    ///
605    /// fn some_other_interceptor(request: Request<()>) -> Result<Request<()>, Status> {
606    ///     Ok(request)
607    /// }
608    ///
609    /// let layer = ServiceBuilder::new()
610    ///     .load_shed()
611    ///     .timeout(Duration::from_secs(30))
612    ///     .layer(InterceptorLayer::new(auth_interceptor))
613    ///     .layer(InterceptorLayer::new(some_other_interceptor))
614    ///     .into_inner();
615    ///
616    /// Server::builder().layer(layer);
617    /// ```
618    ///
619    /// [Tower]: https://github.com/tower-rs/tower
620    /// [`Layer`]: tower::layer::Layer
621    /// [eco]: https://github.com/tower-rs
622    /// [`ServiceBuilder`]: tower::ServiceBuilder
623    /// [interceptors]: crate::service::Interceptor
624    pub fn layer<NewLayer>(self, new_layer: NewLayer) -> Server<Stack<NewLayer, L>> {
625        Server {
626            service_builder: self.service_builder.layer(new_layer),
627            trace_interceptor: self.trace_interceptor,
628            concurrency_limit: self.concurrency_limit,
629            load_shed: self.load_shed,
630            timeout: self.timeout,
631            #[cfg(feature = "_tls-any")]
632            tls: self.tls,
633            init_stream_window_size: self.init_stream_window_size,
634            init_connection_window_size: self.init_connection_window_size,
635            max_concurrent_streams: self.max_concurrent_streams,
636            tcp_keepalive: self.tcp_keepalive,
637            tcp_keepalive_interval: self.tcp_keepalive_interval,
638            tcp_keepalive_retries: self.tcp_keepalive_retries,
639            tcp_nodelay: self.tcp_nodelay,
640            http2_keepalive_interval: self.http2_keepalive_interval,
641            http2_keepalive_timeout: self.http2_keepalive_timeout,
642            http2_adaptive_window: self.http2_adaptive_window,
643            http2_max_pending_accept_reset_streams: self.http2_max_pending_accept_reset_streams,
644            http2_max_header_list_size: self.http2_max_header_list_size,
645            http2_max_local_error_reset_streams: self.http2_max_local_error_reset_streams,
646            max_frame_size: self.max_frame_size,
647            accept_http1: self.accept_http1,
648            max_connection_age: self.max_connection_age,
649            max_connection_age_grace: self.max_connection_age_grace,
650        }
651    }
652
653    fn bind_incoming(&self, addr: SocketAddr) -> Result<TcpIncoming, super::Error> {
654        Ok(TcpIncoming::bind(addr)
655            .map_err(super::Error::from_source)?
656            .with_nodelay(Some(self.tcp_nodelay))
657            .with_keepalive(self.tcp_keepalive)
658            .with_keepalive_interval(self.tcp_keepalive_interval)
659            .with_keepalive_retries(self.tcp_keepalive_retries))
660    }
661
662    /// Serve the service.
663    pub async fn serve<S, ResBody>(self, addr: SocketAddr, svc: S) -> Result<(), super::Error>
664    where
665        L: Layer<S>,
666        L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
667        <<L as Layer<S>>::Service as Service<Request<Body>>>::Future: Send,
668        <<L as Layer<S>>::Service as Service<Request<Body>>>::Error:
669            Into<crate::BoxError> + Send + 'static,
670        ResBody: http_body::Body<Data = Bytes> + Send + 'static,
671        ResBody::Error: Into<crate::BoxError>,
672    {
673        let incoming = self.bind_incoming(addr)?;
674        self.serve_with_incoming(svc, incoming).await
675    }
676
677    /// Serve the service with the shutdown signal.
678    pub async fn serve_with_shutdown<S, F, ResBody>(
679        self,
680        addr: SocketAddr,
681        svc: S,
682        signal: F,
683    ) -> Result<(), super::Error>
684    where
685        L: Layer<S>,
686        L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
687        <<L as Layer<S>>::Service as Service<Request<Body>>>::Future: Send,
688        <<L as Layer<S>>::Service as Service<Request<Body>>>::Error:
689            Into<crate::BoxError> + Send + 'static,
690        F: Future<Output = ()>,
691        ResBody: http_body::Body<Data = Bytes> + Send + 'static,
692        ResBody::Error: Into<crate::BoxError>,
693    {
694        let incoming = self.bind_incoming(addr)?;
695        self.serve_with_incoming_shutdown(svc, incoming, signal)
696            .await
697    }
698
699    /// Serve the service on the provided incoming stream.
700    ///
701    /// The `tcp_nodelay` and `tcp_keepalive` settings are ignored when using this method.
702    pub async fn serve_with_incoming<S, I, IO, IE, ResBody>(
703        self,
704        svc: S,
705        incoming: I,
706    ) -> Result<(), super::Error>
707    where
708        L: Layer<S>,
709        L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
710        <<L as Layer<S>>::Service as Service<Request<Body>>>::Future: Send,
711        <<L as Layer<S>>::Service as Service<Request<Body>>>::Error:
712            Into<crate::BoxError> + Send + 'static,
713        I: Stream<Item = Result<IO, IE>>,
714        IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static,
715        IE: Into<crate::BoxError>,
716        ResBody: http_body::Body<Data = Bytes> + Send + 'static,
717        ResBody::Error: Into<crate::BoxError>,
718    {
719        self.serve_internal(svc, incoming, Option::<future::Ready<()>>::None)
720            .await
721    }
722
723    /// Serve the service with the signal on the provided incoming stream.
724    pub async fn serve_with_incoming_shutdown<S, I, F, IO, IE, ResBody>(
725        self,
726        svc: S,
727        incoming: I,
728        signal: F,
729    ) -> Result<(), super::Error>
730    where
731        L: Layer<S>,
732        L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
733        <<L as Layer<S>>::Service as Service<Request<Body>>>::Future: Send,
734        <<L as Layer<S>>::Service as Service<Request<Body>>>::Error:
735            Into<crate::BoxError> + Send + 'static,
736        I: Stream<Item = Result<IO, IE>>,
737        IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static,
738        IE: Into<crate::BoxError>,
739        F: Future<Output = ()>,
740        ResBody: http_body::Body<Data = Bytes> + Send + 'static,
741        ResBody::Error: Into<crate::BoxError>,
742    {
743        self.serve_internal(svc, incoming, Some(signal)).await
744    }
745
746    async fn serve_internal<S, I, F, IO, IE, ResBody>(
747        self,
748        svc: S,
749        incoming: I,
750        signal: Option<F>,
751    ) -> Result<(), super::Error>
752    where
753        L: Layer<S>,
754        L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
755        <<L as Layer<S>>::Service as Service<Request<Body>>>::Future: Send,
756        <<L as Layer<S>>::Service as Service<Request<Body>>>::Error:
757            Into<crate::BoxError> + Send + 'static,
758        I: Stream<Item = Result<IO, IE>>,
759        IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static,
760        IE: Into<crate::BoxError>,
761        F: Future<Output = ()>,
762        ResBody: http_body::Body<Data = Bytes> + Send + 'static,
763        ResBody::Error: Into<crate::BoxError>,
764    {
765        let trace_interceptor = self.trace_interceptor.clone();
766        let concurrency_limit = self.concurrency_limit;
767        let load_shed = self.load_shed;
768        let init_connection_window_size = self.init_connection_window_size;
769        let init_stream_window_size = self.init_stream_window_size;
770        let max_concurrent_streams = self.max_concurrent_streams;
771        let timeout = self.timeout;
772        let max_header_list_size = self.http2_max_header_list_size;
773        let max_frame_size = self.max_frame_size;
774        let http2_only = !self.accept_http1;
775
776        let http2_keepalive_interval = self.http2_keepalive_interval;
777        let http2_keepalive_timeout = self.http2_keepalive_timeout;
778        let http2_adaptive_window = self.http2_adaptive_window;
779        let http2_max_pending_accept_reset_streams = self.http2_max_pending_accept_reset_streams;
780        let http2_max_local_error_reset_streams = self.http2_max_local_error_reset_streams;
781        let max_connection_age = self.max_connection_age;
782        let max_connection_age_grace = self.max_connection_age_grace;
783
784        let svc = self.service_builder.service(svc);
785
786        let incoming = io_stream::ServerIoStream::new(
787            incoming,
788            #[cfg(feature = "_tls-any")]
789            self.tls,
790        );
791        let mut svc = MakeSvc {
792            inner: svc,
793            concurrency_limit,
794            load_shed,
795            timeout,
796            trace_interceptor,
797            _io: PhantomData,
798        };
799
800        let server = {
801            let mut builder = ConnectionBuilder::new(TokioExecutor::new());
802
803            if http2_only {
804                builder = builder.http2_only();
805            }
806
807            builder
808                .http2()
809                .timer(TokioTimer::new())
810                .initial_connection_window_size(init_connection_window_size)
811                .initial_stream_window_size(init_stream_window_size)
812                .max_concurrent_streams(max_concurrent_streams)
813                .keep_alive_interval(http2_keepalive_interval)
814                .keep_alive_timeout(http2_keepalive_timeout)
815                .adaptive_window(http2_adaptive_window.unwrap_or_default())
816                .max_pending_accept_reset_streams(http2_max_pending_accept_reset_streams)
817                .max_local_error_reset_streams(http2_max_local_error_reset_streams)
818                .max_frame_size(max_frame_size);
819
820            if let Some(max_header_list_size) = max_header_list_size {
821                builder.http2().max_header_list_size(max_header_list_size);
822            }
823
824            builder
825        };
826
827        let (signal_tx, signal_rx) = tokio::sync::watch::channel(());
828        let signal_tx = Arc::new(signal_tx);
829
830        let graceful = signal.is_some();
831        let mut sig = pin!(Fuse { inner: signal });
832        let mut incoming = pin!(incoming);
833
834        loop {
835            tokio::select! {
836                _ = &mut sig => {
837                    trace!("signal received, shutting down");
838                    break;
839                },
840                io = incoming.next() => {
841                    let io = match io {
842                        Some(Ok(io)) => io,
843                        Some(Err(e)) => {
844                            trace!("error accepting connection: {}", DisplayErrorStack(&*e));
845                            continue;
846                        },
847                        None => {
848                            break
849                        },
850                    };
851
852                    trace!("connection accepted");
853
854                    let req_svc = svc
855                        .call(&io)
856                        .await
857                        .map_err(super::Error::from_source)?;
858
859                    let hyper_io = TokioIo::new(io);
860                    let hyper_svc = TowerToHyperService::new(req_svc.map_request(|req: Request<Incoming>| req.map(Body::new)));
861
862                    serve_connection(hyper_io, hyper_svc, server.clone(), graceful.then(|| signal_rx.clone()), max_connection_age, max_connection_age_grace);
863                }
864            }
865        }
866
867        if graceful {
868            let _ = signal_tx.send(());
869            drop(signal_rx);
870            trace!(
871                "waiting for {} connections to close",
872                signal_tx.receiver_count()
873            );
874
875            // Wait for all connections to close
876            signal_tx.closed().await;
877        }
878
879        Ok(())
880    }
881}
882
883enum TimeoutAction {
884    GracefulShutdown,
885    ForcefulShutdown,
886}
887
888async fn connection_timeout_future(
889    max_connection_age: Option<Duration>,
890    max_connection_age_grace: Option<Duration>,
891) -> TimeoutAction {
892    if let Some(age) = max_connection_age {
893        tokio::time::sleep(age).await;
894
895        if let Some(grace) = max_connection_age_grace {
896            tokio::time::sleep(grace).await;
897            TimeoutAction::ForcefulShutdown
898        } else {
899            TimeoutAction::GracefulShutdown
900        }
901    } else {
902        future::pending().await
903    }
904}
905
906// This is moved to its own function as a way to get around
907// https://github.com/rust-lang/rust/issues/102211
908fn serve_connection<B, IO, S, E>(
909    hyper_io: IO,
910    hyper_svc: S,
911    builder: ConnectionBuilder<E>,
912    mut watcher: Option<tokio::sync::watch::Receiver<()>>,
913    max_connection_age: Option<Duration>,
914    max_connection_age_grace: Option<Duration>,
915) where
916    B: http_body::Body + Send + 'static,
917    B::Data: Send,
918    B::Error: Into<Box<dyn std::error::Error + Send + Sync>> + Send + Sync,
919    IO: hyper::rt::Read + hyper::rt::Write + Unpin + Send + 'static,
920    S: HyperService<Request<Incoming>, Response = Response<B>> + Clone + Send + 'static,
921    S::Future: Send + 'static,
922    S::Error: Into<Box<dyn std::error::Error + Send + Sync>> + Send,
923    E: HttpServerConnExec<S::Future, B> + Send + Sync + 'static,
924{
925    tokio::spawn(async move {
926        {
927            let mut sig = pin!(Fuse {
928                inner: watcher.as_mut().map(|w| w.changed()),
929            });
930
931            let mut conn = pin!(builder.serve_connection(hyper_io, hyper_svc));
932
933            let mut connection_timeout = pin!(connection_timeout_future(
934                max_connection_age,
935                max_connection_age_grace,
936            ));
937
938            loop {
939                tokio::select! {
940                    rv = &mut conn => {
941                        if let Err(err) = rv {
942                            debug!("failed serving connection: {}", DisplayErrorStack(&*err));
943                        }
944                        break;
945                    },
946                    timeout_action = &mut connection_timeout => {
947                        match timeout_action {
948                            TimeoutAction::GracefulShutdown => {
949                                conn.as_mut().graceful_shutdown();
950                            },
951                            TimeoutAction::ForcefulShutdown => {
952                                debug!("forcefully closed connection");
953                                break;
954                            }
955                        }
956                    },
957                    _ = &mut sig => {
958                        conn.as_mut().graceful_shutdown();
959                    },
960                }
961            }
962        }
963
964        drop(watcher);
965        trace!("connection closed");
966    });
967}
968
969#[cfg(feature = "router")]
970impl<L> Router<L> {
971    pub(crate) fn new(server: Server<L>, routes: Routes) -> Self {
972        Self { server, routes }
973    }
974}
975
976#[cfg(feature = "router")]
977impl<L> Router<L> {
978    /// Add a new service to this router.
979    pub fn add_service<S>(mut self, svc: S) -> Self
980    where
981        S: Service<Request<Body>, Error = Infallible>
982            + NamedService
983            + Clone
984            + Send
985            + Sync
986            + 'static,
987        S::Response: axum::response::IntoResponse,
988        S::Future: Send + 'static,
989    {
990        self.routes = self.routes.add_service(svc);
991        self
992    }
993
994    /// Add a new optional service to this router.
995    ///
996    /// # Note
997    /// Even when the argument given is `None` this will capture *all* requests to this service name.
998    /// As a result, one cannot use this to toggle between two identically named implementations.
999    pub fn add_optional_service<S>(mut self, svc: Option<S>) -> Self
1000    where
1001        S: Service<Request<Body>, Error = Infallible>
1002            + NamedService
1003            + Clone
1004            + Send
1005            + Sync
1006            + 'static,
1007        S::Response: axum::response::IntoResponse,
1008        S::Future: Send + 'static,
1009    {
1010        if let Some(svc) = svc {
1011            self.routes = self.routes.add_service(svc);
1012        }
1013        self
1014    }
1015
1016    /// Consume this [`Server`] creating a future that will execute the server
1017    /// on [tokio]'s default executor.
1018    ///
1019    /// [`Server`]: struct.Server.html
1020    /// [tokio]: https://docs.rs/tokio
1021    pub async fn serve<ResBody>(self, addr: SocketAddr) -> Result<(), super::Error>
1022    where
1023        L: Layer<Routes> + Clone,
1024        L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
1025        <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Future: Send,
1026        <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Error:
1027            Into<crate::BoxError> + Send,
1028        ResBody: http_body::Body<Data = Bytes> + Send + 'static,
1029        ResBody::Error: Into<crate::BoxError>,
1030    {
1031        self.server.serve(addr, self.routes.prepare()).await
1032    }
1033
1034    /// Consume this [`Server`] creating a future that will execute the server
1035    /// on [tokio]'s default executor. And shutdown when the provided signal
1036    /// is received.
1037    ///
1038    /// [`Server`]: struct.Server.html
1039    /// [tokio]: https://docs.rs/tokio
1040    pub async fn serve_with_shutdown<F: Future<Output = ()>, ResBody>(
1041        self,
1042        addr: SocketAddr,
1043        signal: F,
1044    ) -> Result<(), super::Error>
1045    where
1046        L: Layer<Routes>,
1047        L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
1048        <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Future: Send,
1049        <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Error:
1050            Into<crate::BoxError> + Send,
1051        ResBody: http_body::Body<Data = Bytes> + Send + 'static,
1052        ResBody::Error: Into<crate::BoxError>,
1053    {
1054        self.server
1055            .serve_with_shutdown(addr, self.routes.prepare(), signal)
1056            .await
1057    }
1058
1059    /// Consume this [`Server`] creating a future that will execute the server
1060    /// on the provided incoming stream of `AsyncRead + AsyncWrite`.
1061    ///
1062    /// This method discards any provided [`Server`] TCP configuration.
1063    ///
1064    /// [`Server`]: struct.Server.html
1065    pub async fn serve_with_incoming<I, IO, IE, ResBody>(
1066        self,
1067        incoming: I,
1068    ) -> Result<(), super::Error>
1069    where
1070        I: Stream<Item = Result<IO, IE>>,
1071        IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static,
1072        IE: Into<crate::BoxError>,
1073        L: Layer<Routes>,
1074        L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
1075        <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Future: Send,
1076        <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Error:
1077            Into<crate::BoxError> + Send,
1078        ResBody: http_body::Body<Data = Bytes> + Send + 'static,
1079        ResBody::Error: Into<crate::BoxError>,
1080    {
1081        self.server
1082            .serve_with_incoming(self.routes.prepare(), incoming)
1083            .await
1084    }
1085
1086    /// Consume this [`Server`] creating a future that will execute the server
1087    /// on the provided incoming stream of `AsyncRead + AsyncWrite`. Similar to
1088    /// `serve_with_shutdown` this method will also take a signal future to
1089    /// gracefully shutdown the server.
1090    ///
1091    /// This method discards any provided [`Server`] TCP configuration.
1092    ///
1093    /// [`Server`]: struct.Server.html
1094    pub async fn serve_with_incoming_shutdown<I, IO, IE, F, ResBody>(
1095        self,
1096        incoming: I,
1097        signal: F,
1098    ) -> Result<(), super::Error>
1099    where
1100        I: Stream<Item = Result<IO, IE>>,
1101        IO: AsyncRead + AsyncWrite + Connected + Unpin + Send + 'static,
1102        IE: Into<crate::BoxError>,
1103        F: Future<Output = ()>,
1104        L: Layer<Routes>,
1105        L::Service: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
1106        <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Future: Send,
1107        <<L as Layer<Routes>>::Service as Service<Request<Body>>>::Error:
1108            Into<crate::BoxError> + Send,
1109        ResBody: http_body::Body<Data = Bytes> + Send + 'static,
1110        ResBody::Error: Into<crate::BoxError>,
1111    {
1112        self.server
1113            .serve_with_incoming_shutdown(self.routes.prepare(), incoming, signal)
1114            .await
1115    }
1116}
1117
1118impl<L> fmt::Debug for Server<L> {
1119    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1120        f.debug_struct("Builder").finish()
1121    }
1122}
1123
1124#[derive(Clone)]
1125struct Svc<S> {
1126    inner: S,
1127    trace_interceptor: Option<TraceInterceptor>,
1128}
1129
1130impl<S, ResBody> Service<Request<Body>> for Svc<S>
1131where
1132    S: Service<Request<Body>, Response = Response<ResBody>>,
1133    S::Error: Into<crate::BoxError>,
1134    ResBody: http_body::Body<Data = Bytes> + Send + 'static,
1135    ResBody::Error: Into<crate::BoxError>,
1136{
1137    type Response = Response<Body>;
1138    type Error = crate::BoxError;
1139    type Future = SvcFuture<S::Future>;
1140
1141    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1142        self.inner.poll_ready(cx).map_err(Into::into)
1143    }
1144
1145    fn call(&mut self, mut req: Request<Body>) -> Self::Future {
1146        let span = if let Some(trace_interceptor) = &self.trace_interceptor {
1147            let (parts, body) = req.into_parts();
1148            let bodyless_request = Request::from_parts(parts, ());
1149
1150            let span = trace_interceptor(&bodyless_request);
1151
1152            let (parts, _) = bodyless_request.into_parts();
1153            req = Request::from_parts(parts, body);
1154
1155            span
1156        } else {
1157            tracing::Span::none()
1158        };
1159
1160        SvcFuture {
1161            inner: self.inner.call(req),
1162            span,
1163        }
1164    }
1165}
1166
1167#[pin_project]
1168struct SvcFuture<F> {
1169    #[pin]
1170    inner: F,
1171    span: tracing::Span,
1172}
1173
1174impl<F, E, ResBody> Future for SvcFuture<F>
1175where
1176    F: Future<Output = Result<Response<ResBody>, E>>,
1177    E: Into<crate::BoxError>,
1178    ResBody: http_body::Body<Data = Bytes> + Send + 'static,
1179    ResBody::Error: Into<crate::BoxError>,
1180{
1181    type Output = Result<Response<Body>, crate::BoxError>;
1182
1183    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1184        let this = self.project();
1185        let _guard = this.span.enter();
1186
1187        let response: Response<ResBody> = ready!(this.inner.poll(cx)).map_err(Into::into)?;
1188        let response = response.map(|body| Body::new(body.map_err(Into::into)));
1189        Poll::Ready(Ok(response))
1190    }
1191}
1192
1193impl<S> fmt::Debug for Svc<S> {
1194    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1195        f.debug_struct("Svc").finish()
1196    }
1197}
1198
1199#[derive(Clone)]
1200struct MakeSvc<S, IO> {
1201    concurrency_limit: Option<usize>,
1202    load_shed: bool,
1203    timeout: Option<Duration>,
1204    inner: S,
1205    trace_interceptor: Option<TraceInterceptor>,
1206    _io: PhantomData<fn() -> IO>,
1207}
1208
1209impl<S, ResBody, IO> Service<&ServerIo<IO>> for MakeSvc<S, IO>
1210where
1211    IO: Connected + 'static,
1212    S: Service<Request<Body>, Response = Response<ResBody>> + Clone + Send + 'static,
1213    S::Future: Send,
1214    S::Error: Into<crate::BoxError> + Send,
1215    ResBody: http_body::Body<Data = Bytes> + Send + 'static,
1216    ResBody::Error: Into<crate::BoxError>,
1217{
1218    type Response = BoxService;
1219    type Error = crate::BoxError;
1220    type Future = future::Ready<Result<Self::Response, Self::Error>>;
1221
1222    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
1223        Ok(()).into()
1224    }
1225
1226    fn call(&mut self, io: &ServerIo<IO>) -> Self::Future {
1227        let conn_info = io.connect_info();
1228
1229        let svc = self.inner.clone();
1230        let concurrency_limit = self.concurrency_limit;
1231        let timeout = self.timeout;
1232        let trace_interceptor = self.trace_interceptor.clone();
1233
1234        let svc = ServiceBuilder::new()
1235            .layer(RecoverErrorLayer::new())
1236            .option_layer(self.load_shed.then_some(LoadShedLayer::new()))
1237            .option_layer(concurrency_limit.map(ConcurrencyLimitLayer::new))
1238            .layer_fn(|s| GrpcTimeout::new(s, timeout))
1239            .service(svc);
1240
1241        let svc = ServiceBuilder::new()
1242            .layer(BoxCloneService::layer())
1243            .layer(ConnectInfoLayer::new(conn_info.clone()))
1244            .service(Svc {
1245                inner: svc,
1246                trace_interceptor,
1247            });
1248
1249        future::ready(Ok(svc))
1250    }
1251}
1252
1253// From `futures-util` crate, borrowed since this is the only dependency tonic requires.
1254// LICENSE: MIT or Apache-2.0
1255// A future which only yields `Poll::Ready` once, and thereafter yields `Poll::Pending`.
1256#[pin_project]
1257struct Fuse<F> {
1258    #[pin]
1259    inner: Option<F>,
1260}
1261
1262impl<F> Future for Fuse<F>
1263where
1264    F: Future,
1265{
1266    type Output = F::Output;
1267
1268    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1269        match self.as_mut().project().inner.as_pin_mut() {
1270            Some(fut) => fut.poll(cx).map(|output| {
1271                self.project().inner.set(None);
1272                output
1273            }),
1274            None => Poll::Pending,
1275        }
1276    }
1277}
1278
1279#[cfg(test)]
1280mod tests {
1281    use super::*;
1282    use crate::transport::Server;
1283    use std::time::Duration;
1284
1285    #[tokio::test(start_paused = true)]
1286    async fn test_connection_timeout_no_max_age() {
1287        let future = connection_timeout_future(None, None);
1288
1289        tokio::select! {
1290            _ = future => {
1291                panic!("timeout future should never complete when max_connection_age is None");
1292            }
1293            _ = tokio::time::sleep(Duration::from_secs(1000)) => {
1294            }
1295        }
1296    }
1297
1298    #[tokio::test(start_paused = true)]
1299    async fn test_connection_timeout_with_max_connection_age() {
1300        let future = connection_timeout_future(Some(Duration::from_secs(10)), None);
1301
1302        let action = future.await;
1303        assert!(matches!(action, TimeoutAction::GracefulShutdown));
1304    }
1305
1306    #[tokio::test(start_paused = true)]
1307    async fn test_connection_timeout_with_max_connection_age_grace() {
1308        let mut future = pin!(connection_timeout_future(
1309            Some(Duration::from_secs(10)),
1310            Some(Duration::from_secs(5)),
1311        ));
1312
1313        tokio::select! {
1314            _ = &mut future => {
1315                panic!("should not complete before max_connection_age");
1316            }
1317            _ = tokio::time::sleep(Duration::from_secs(9)) => {}
1318        }
1319
1320        tokio::select! {
1321            _ = &mut future => {
1322                panic!("should not complete before max_connection_age_grace");
1323            }
1324            _ = tokio::time::sleep(Duration::from_secs(4)) => {}
1325        }
1326
1327        let action = future.await;
1328        assert!(matches!(action, TimeoutAction::ForcefulShutdown));
1329    }
1330
1331    #[test]
1332    fn server_tcp_defaults() {
1333        const EXAMPLE_TCP_KEEPALIVE: Duration = Duration::from_secs(10);
1334        const EXAMPLE_TCP_KEEPALIVE_INTERVAL: Duration = Duration::from_secs(5);
1335        const EXAMPLE_TCP_KEEPALIVE_RETRIES: u32 = 3;
1336
1337        // Using ::builder() or ::default() should do the same thing
1338        let server_via_builder = Server::builder();
1339        assert!(server_via_builder.tcp_nodelay);
1340        assert_eq!(server_via_builder.tcp_keepalive, None);
1341        assert_eq!(server_via_builder.tcp_keepalive_interval, None);
1342        assert_eq!(server_via_builder.tcp_keepalive_retries, None);
1343        let server_via_default = Server::default();
1344        assert!(server_via_default.tcp_nodelay);
1345        assert_eq!(server_via_default.tcp_keepalive, None);
1346        assert_eq!(server_via_default.tcp_keepalive_interval, None);
1347        assert_eq!(server_via_default.tcp_keepalive_retries, None);
1348
1349        // overriding should be possible
1350        let server_via_builder = Server::builder()
1351            .tcp_nodelay(false)
1352            .tcp_keepalive(Some(EXAMPLE_TCP_KEEPALIVE))
1353            .tcp_keepalive_interval(Some(EXAMPLE_TCP_KEEPALIVE_INTERVAL))
1354            .tcp_keepalive_retries(Some(EXAMPLE_TCP_KEEPALIVE_RETRIES));
1355        assert!(!server_via_builder.tcp_nodelay);
1356        assert_eq!(
1357            server_via_builder.tcp_keepalive,
1358            Some(EXAMPLE_TCP_KEEPALIVE)
1359        );
1360        assert_eq!(
1361            server_via_builder.tcp_keepalive_interval,
1362            Some(EXAMPLE_TCP_KEEPALIVE_INTERVAL)
1363        );
1364        assert_eq!(
1365            server_via_builder.tcp_keepalive_retries,
1366            Some(EXAMPLE_TCP_KEEPALIVE_RETRIES)
1367        );
1368    }
1369}