tonic/transport/channel/
endpoint.rs

1#[cfg(feature = "tls")]
2use super::service::TlsConnector;
3use super::service::{self, Executor, SharedExec};
4use super::Channel;
5#[cfg(feature = "tls")]
6use super::ClientTlsConfig;
7use crate::transport::Error;
8use bytes::Bytes;
9use http::{uri::Uri, HeaderValue};
10use hyper::rt;
11use hyper_util::client::legacy::connect::HttpConnector;
12use std::{fmt, future::Future, pin::Pin, str::FromStr, time::Duration};
13use tower_service::Service;
14
15/// Channel builder.
16///
17/// This struct is used to build and configure HTTP/2 channels.
18#[derive(Clone)]
19pub struct Endpoint {
20    pub(crate) uri: Uri,
21    pub(crate) origin: Option<Uri>,
22    pub(crate) user_agent: Option<HeaderValue>,
23    pub(crate) timeout: Option<Duration>,
24    pub(crate) concurrency_limit: Option<usize>,
25    pub(crate) rate_limit: Option<(u64, Duration)>,
26    #[cfg(feature = "tls")]
27    pub(crate) tls: Option<TlsConnector>,
28    pub(crate) buffer_size: Option<usize>,
29    pub(crate) init_stream_window_size: Option<u32>,
30    pub(crate) init_connection_window_size: Option<u32>,
31    pub(crate) tcp_keepalive: Option<Duration>,
32    pub(crate) tcp_nodelay: bool,
33    pub(crate) http2_keep_alive_interval: Option<Duration>,
34    pub(crate) http2_keep_alive_timeout: Option<Duration>,
35    pub(crate) http2_keep_alive_while_idle: Option<bool>,
36    pub(crate) http2_max_header_list_size: Option<u32>,
37    pub(crate) connect_timeout: Option<Duration>,
38    pub(crate) http2_adaptive_window: Option<bool>,
39    pub(crate) executor: SharedExec,
40}
41
42impl Endpoint {
43    // FIXME: determine if we want to expose this or not. This is really
44    // just used in codegen for a shortcut.
45    #[doc(hidden)]
46    pub fn new<D>(dst: D) -> Result<Self, Error>
47    where
48        D: TryInto<Self>,
49        D::Error: Into<crate::Error>,
50    {
51        let me = dst.try_into().map_err(|e| Error::from_source(e.into()))?;
52        #[cfg(feature = "tls")]
53        if me.uri.scheme() == Some(&http::uri::Scheme::HTTPS) {
54            return me.tls_config(ClientTlsConfig::new().with_enabled_roots());
55        }
56
57        Ok(me)
58    }
59
60    /// Convert an `Endpoint` from a static string.
61    ///
62    /// # Panics
63    ///
64    /// This function panics if the argument is an invalid URI.
65    ///
66    /// ```
67    /// # use tonic::transport::Endpoint;
68    /// Endpoint::from_static("https://example.com");
69    /// ```
70    pub fn from_static(s: &'static str) -> Self {
71        let uri = Uri::from_static(s);
72        Self::from(uri)
73    }
74
75    /// Convert an `Endpoint` from shared bytes.
76    ///
77    /// ```
78    /// # use tonic::transport::Endpoint;
79    /// Endpoint::from_shared("https://example.com".to_string());
80    /// ```
81    pub fn from_shared(s: impl Into<Bytes>) -> Result<Self, Error> {
82        let uri = Uri::from_maybe_shared(s.into()).map_err(|e| Error::new_invalid_uri().with(e))?;
83        Ok(Self::from(uri))
84    }
85
86    /// Set a custom user-agent header.
87    ///
88    /// `user_agent` will be prepended to Tonic's default user-agent string (`tonic/x.x.x`).
89    /// It must be a value that can be converted into a valid  `http::HeaderValue` or building
90    /// the endpoint will fail.
91    /// ```
92    /// # use tonic::transport::Endpoint;
93    /// # let mut builder = Endpoint::from_static("https://example.com");
94    /// builder.user_agent("Greeter").expect("Greeter should be a valid header value");
95    /// // user-agent: "Greeter tonic/x.x.x"
96    /// ```
97    pub fn user_agent<T>(self, user_agent: T) -> Result<Self, Error>
98    where
99        T: TryInto<HeaderValue>,
100    {
101        user_agent
102            .try_into()
103            .map(|ua| Endpoint {
104                user_agent: Some(ua),
105                ..self
106            })
107            .map_err(|_| Error::new_invalid_user_agent())
108    }
109
110    /// Set a custom origin.
111    ///
112    /// Override the `origin`, mainly useful when you are reaching a Server/LoadBalancer
113    /// which serves multiple services at the same time.
114    /// It will play the role of SNI (Server Name Indication).
115    ///
116    /// ```
117    /// # use tonic::transport::Endpoint;
118    /// # let mut builder = Endpoint::from_static("https://proxy.com");
119    /// builder.origin("https://example.com".parse().expect("http://example.com must be a valid URI"));
120    /// // origin: "https://example.com"
121    /// ```
122    pub fn origin(self, origin: Uri) -> Self {
123        Endpoint {
124            origin: Some(origin),
125            ..self
126        }
127    }
128
129    /// Apply a timeout to each request.
130    ///
131    /// ```
132    /// # use tonic::transport::Endpoint;
133    /// # use std::time::Duration;
134    /// # let mut builder = Endpoint::from_static("https://example.com");
135    /// builder.timeout(Duration::from_secs(5));
136    /// ```
137    ///
138    /// # Notes
139    ///
140    /// This does **not** set the timeout metadata (`grpc-timeout` header) on
141    /// the request, meaning the server will not be informed of this timeout,
142    /// for that use [`Request::set_timeout`].
143    ///
144    /// [`Request::set_timeout`]: crate::Request::set_timeout
145    pub fn timeout(self, dur: Duration) -> Self {
146        Endpoint {
147            timeout: Some(dur),
148            ..self
149        }
150    }
151
152    /// Apply a timeout to connecting to the uri.
153    ///
154    /// Defaults to no timeout.
155    ///
156    /// ```
157    /// # use tonic::transport::Endpoint;
158    /// # use std::time::Duration;
159    /// # let mut builder = Endpoint::from_static("https://example.com");
160    /// builder.connect_timeout(Duration::from_secs(5));
161    /// ```
162    pub fn connect_timeout(self, dur: Duration) -> Self {
163        Endpoint {
164            connect_timeout: Some(dur),
165            ..self
166        }
167    }
168
169    /// Set whether TCP keepalive messages are enabled on accepted connections.
170    ///
171    /// If `None` is specified, keepalive is disabled, otherwise the duration
172    /// specified will be the time to remain idle before sending TCP keepalive
173    /// probes.
174    ///
175    /// Default is no keepalive (`None`)
176    ///
177    pub fn tcp_keepalive(self, tcp_keepalive: Option<Duration>) -> Self {
178        Endpoint {
179            tcp_keepalive,
180            ..self
181        }
182    }
183
184    /// Apply a concurrency limit to each request.
185    ///
186    /// ```
187    /// # use tonic::transport::Endpoint;
188    /// # let mut builder = Endpoint::from_static("https://example.com");
189    /// builder.concurrency_limit(256);
190    /// ```
191    pub fn concurrency_limit(self, limit: usize) -> Self {
192        Endpoint {
193            concurrency_limit: Some(limit),
194            ..self
195        }
196    }
197
198    /// Apply a rate limit to each request.
199    ///
200    /// ```
201    /// # use tonic::transport::Endpoint;
202    /// # use std::time::Duration;
203    /// # let mut builder = Endpoint::from_static("https://example.com");
204    /// builder.rate_limit(32, Duration::from_secs(1));
205    /// ```
206    pub fn rate_limit(self, limit: u64, duration: Duration) -> Self {
207        Endpoint {
208            rate_limit: Some((limit, duration)),
209            ..self
210        }
211    }
212
213    /// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2
214    /// stream-level flow control.
215    ///
216    /// Default is 65,535
217    ///
218    /// [spec]: https://httpwg.org/specs/rfc9113.html#InitialWindowSize
219    pub fn initial_stream_window_size(self, sz: impl Into<Option<u32>>) -> Self {
220        Endpoint {
221            init_stream_window_size: sz.into(),
222            ..self
223        }
224    }
225
226    /// Sets the max connection-level flow control for HTTP2
227    ///
228    /// Default is 65,535
229    pub fn initial_connection_window_size(self, sz: impl Into<Option<u32>>) -> Self {
230        Endpoint {
231            init_connection_window_size: sz.into(),
232            ..self
233        }
234    }
235
236    /// Sets the tower service default internal buffer size
237    ///
238    /// Default is 1024
239    pub fn buffer_size(self, sz: impl Into<Option<usize>>) -> Self {
240        Endpoint {
241            buffer_size: sz.into(),
242            ..self
243        }
244    }
245
246    /// Configures TLS for the endpoint.
247    #[cfg(feature = "tls")]
248    pub fn tls_config(self, tls_config: ClientTlsConfig) -> Result<Self, Error> {
249        Ok(Endpoint {
250            tls: Some(
251                tls_config
252                    .into_tls_connector(&self.uri)
253                    .map_err(Error::from_source)?,
254            ),
255            ..self
256        })
257    }
258
259    /// Set the value of `TCP_NODELAY` option for accepted connections. Enabled by default.
260    pub fn tcp_nodelay(self, enabled: bool) -> Self {
261        Endpoint {
262            tcp_nodelay: enabled,
263            ..self
264        }
265    }
266
267    /// Set http2 KEEP_ALIVE_INTERVAL. Uses `hyper`'s default otherwise.
268    pub fn http2_keep_alive_interval(self, interval: Duration) -> Self {
269        Endpoint {
270            http2_keep_alive_interval: Some(interval),
271            ..self
272        }
273    }
274
275    /// Set http2 KEEP_ALIVE_TIMEOUT. Uses `hyper`'s default otherwise.
276    pub fn keep_alive_timeout(self, duration: Duration) -> Self {
277        Endpoint {
278            http2_keep_alive_timeout: Some(duration),
279            ..self
280        }
281    }
282
283    /// Set http2 KEEP_ALIVE_WHILE_IDLE. Uses `hyper`'s default otherwise.
284    pub fn keep_alive_while_idle(self, enabled: bool) -> Self {
285        Endpoint {
286            http2_keep_alive_while_idle: Some(enabled),
287            ..self
288        }
289    }
290
291    /// Sets whether to use an adaptive flow control. Uses `hyper`'s default otherwise.
292    pub fn http2_adaptive_window(self, enabled: bool) -> Self {
293        Endpoint {
294            http2_adaptive_window: Some(enabled),
295            ..self
296        }
297    }
298
299    /// Sets the max size of received header frames.
300    ///
301    /// This will default to whatever the default in hyper is. As of v1.4.1, it is 16 KiB.
302    pub fn http2_max_header_list_size(self, size: u32) -> Self {
303        Endpoint {
304            http2_max_header_list_size: Some(size),
305            ..self
306        }
307    }
308
309    /// Sets the executor used to spawn async tasks.
310    ///
311    /// Uses `tokio::spawn` by default.
312    pub fn executor<E>(mut self, executor: E) -> Self
313    where
314        E: Executor<Pin<Box<dyn Future<Output = ()> + Send>>> + Send + Sync + 'static,
315    {
316        self.executor = SharedExec::new(executor);
317        self
318    }
319
320    pub(crate) fn connector<C>(&self, c: C) -> service::Connector<C> {
321        service::Connector::new(
322            c,
323            #[cfg(feature = "tls")]
324            self.tls.clone(),
325        )
326    }
327
328    /// Create a channel from this config.
329    pub async fn connect(&self) -> Result<Channel, Error> {
330        let mut http = HttpConnector::new();
331        http.enforce_http(false);
332        http.set_nodelay(self.tcp_nodelay);
333        http.set_keepalive(self.tcp_keepalive);
334        http.set_connect_timeout(self.connect_timeout);
335
336        let connector = self.connector(http);
337
338        Channel::connect(connector, self.clone()).await
339    }
340
341    /// Create a channel from this config.
342    ///
343    /// The channel returned by this method does not attempt to connect to the endpoint until first
344    /// use.
345    pub fn connect_lazy(&self) -> Channel {
346        let mut http = HttpConnector::new();
347        http.enforce_http(false);
348        http.set_nodelay(self.tcp_nodelay);
349        http.set_keepalive(self.tcp_keepalive);
350        http.set_connect_timeout(self.connect_timeout);
351
352        let connector = self.connector(http);
353
354        Channel::new(connector, self.clone())
355    }
356
357    /// Connect with a custom connector.
358    ///
359    /// This allows you to build a [Channel](struct.Channel.html) that uses a non-HTTP transport.
360    /// See the `uds` example for an example on how to use this function to build channel that
361    /// uses a Unix socket transport.
362    ///
363    /// The [`connect_timeout`](Endpoint::connect_timeout) will still be applied.
364    pub async fn connect_with_connector<C>(&self, connector: C) -> Result<Channel, Error>
365    where
366        C: Service<Uri> + Send + 'static,
367        C::Response: rt::Read + rt::Write + Send + Unpin,
368        C::Future: Send,
369        crate::Error: From<C::Error> + Send,
370    {
371        let connector = self.connector(connector);
372
373        if let Some(connect_timeout) = self.connect_timeout {
374            let mut connector = hyper_timeout::TimeoutConnector::new(connector);
375            connector.set_connect_timeout(Some(connect_timeout));
376            Channel::connect(connector, self.clone()).await
377        } else {
378            Channel::connect(connector, self.clone()).await
379        }
380    }
381
382    /// Connect with a custom connector lazily.
383    ///
384    /// This allows you to build a [Channel](struct.Channel.html) that uses a non-HTTP transport
385    /// connect to it lazily.
386    ///
387    /// See the `uds` example for an example on how to use this function to build channel that
388    /// uses a Unix socket transport.
389    pub fn connect_with_connector_lazy<C>(&self, connector: C) -> Channel
390    where
391        C: Service<Uri> + Send + 'static,
392        C::Response: rt::Read + rt::Write + Send + Unpin,
393        C::Future: Send,
394        crate::Error: From<C::Error> + Send,
395    {
396        let connector = self.connector(connector);
397        if let Some(connect_timeout) = self.connect_timeout {
398            let mut connector = hyper_timeout::TimeoutConnector::new(connector);
399            connector.set_connect_timeout(Some(connect_timeout));
400            Channel::new(connector, self.clone())
401        } else {
402            Channel::new(connector, self.clone())
403        }
404    }
405
406    /// Get the endpoint uri.
407    ///
408    /// ```
409    /// # use tonic::transport::Endpoint;
410    /// # use http::Uri;
411    /// let endpoint = Endpoint::from_static("https://example.com");
412    ///
413    /// assert_eq!(endpoint.uri(), &Uri::from_static("https://example.com"));
414    /// ```
415    pub fn uri(&self) -> &Uri {
416        &self.uri
417    }
418}
419
420impl From<Uri> for Endpoint {
421    fn from(uri: Uri) -> Self {
422        Self {
423            uri,
424            origin: None,
425            user_agent: None,
426            concurrency_limit: None,
427            rate_limit: None,
428            timeout: None,
429            #[cfg(feature = "tls")]
430            tls: None,
431            buffer_size: None,
432            init_stream_window_size: None,
433            init_connection_window_size: None,
434            tcp_keepalive: None,
435            tcp_nodelay: true,
436            http2_keep_alive_interval: None,
437            http2_keep_alive_timeout: None,
438            http2_keep_alive_while_idle: None,
439            http2_max_header_list_size: None,
440            connect_timeout: None,
441            http2_adaptive_window: None,
442            executor: SharedExec::tokio(),
443        }
444    }
445}
446
447impl TryFrom<Bytes> for Endpoint {
448    type Error = Error;
449
450    fn try_from(t: Bytes) -> Result<Self, Self::Error> {
451        Self::from_shared(t)
452    }
453}
454
455impl TryFrom<String> for Endpoint {
456    type Error = Error;
457
458    fn try_from(t: String) -> Result<Self, Self::Error> {
459        Self::from_shared(t.into_bytes())
460    }
461}
462
463impl TryFrom<&'static str> for Endpoint {
464    type Error = Error;
465
466    fn try_from(t: &'static str) -> Result<Self, Self::Error> {
467        Self::from_shared(t.as_bytes())
468    }
469}
470
471impl fmt::Debug for Endpoint {
472    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
473        f.debug_struct("Endpoint").finish()
474    }
475}
476
477impl FromStr for Endpoint {
478    type Err = Error;
479
480    fn from_str(s: &str) -> Result<Self, Self::Err> {
481        Self::try_from(s.to_string())
482    }
483}