1use super::Channel;
2#[cfg(feature = "_tls-any")]
3use super::ClientTlsConfig;
4#[cfg(feature = "_tls-any")]
5use super::service::TlsConnector;
6use super::service::{self, Executor, SharedExec};
7use super::uds_connector::UdsConnector;
8use crate::transport::Error;
9#[cfg(feature = "_tls-any")]
10use crate::transport::error;
11use bytes::Bytes;
12use http::{HeaderValue, uri::Uri};
13use hyper::rt;
14use hyper_util::client::legacy::connect::HttpConnector;
15#[cfg(feature = "_tls-any")]
16use std::sync::Arc;
17use std::{fmt, future::Future, net::IpAddr, pin::Pin, str, str::FromStr, time::Duration};
18#[cfg(feature = "_tls-any")]
19use tokio_rustls::rustls::client::danger::ServerCertVerifier;
20use tower_service::Service;
21
22#[derive(Clone, PartialEq, Eq, Hash)]
23pub(crate) enum EndpointType {
24 Uri(Uri),
25 Uds(String),
26}
27
28#[derive(Clone)]
32pub struct Endpoint {
33 pub(crate) uri: EndpointType,
34 fallback_uri: Uri,
35 pub(crate) origin: Option<Uri>,
36 pub(crate) user_agent: Option<HeaderValue>,
37 pub(crate) timeout: Option<Duration>,
38 pub(crate) concurrency_limit: Option<usize>,
39 pub(crate) rate_limit: Option<(u64, Duration)>,
40 #[cfg(feature = "_tls-any")]
41 pub(crate) tls: Option<TlsConnector>,
42 pub(crate) buffer_size: Option<usize>,
43 pub(crate) init_stream_window_size: Option<u32>,
44 pub(crate) init_connection_window_size: Option<u32>,
45 pub(crate) max_frame_size: Option<u32>,
46 pub(crate) tcp_keepalive: Option<Duration>,
47 pub(crate) tcp_keepalive_interval: Option<Duration>,
48 pub(crate) tcp_keepalive_retries: Option<u32>,
49 pub(crate) tcp_nodelay: bool,
50 pub(crate) http2_keep_alive_interval: Option<Duration>,
51 pub(crate) http2_keep_alive_timeout: Option<Duration>,
52 pub(crate) http2_keep_alive_while_idle: Option<bool>,
53 pub(crate) http2_header_table_size: Option<u32>,
54 pub(crate) http2_max_header_list_size: Option<u32>,
55 pub(crate) connect_timeout: Option<Duration>,
56 pub(crate) http2_adaptive_window: Option<bool>,
57 pub(crate) local_address: Option<IpAddr>,
58 pub(crate) executor: SharedExec,
59}
60
61impl Endpoint {
62 #[doc(hidden)]
65 pub fn new<D>(dst: D) -> Result<Self, Error>
66 where
67 D: TryInto<Self>,
68 D::Error: Into<crate::BoxError>,
69 {
70 let me = dst.try_into().map_err(|e| Error::from_source(e.into()))?;
71 #[cfg(feature = "_tls-any")]
72 if let EndpointType::Uri(uri) = &me.uri {
73 if me.tls.is_none() && uri.scheme() == Some(&http::uri::Scheme::HTTPS) {
74 return me.tls_config(ClientTlsConfig::new().with_enabled_roots());
75 }
76 }
77 Ok(me)
78 }
79
80 fn new_uri(uri: Uri) -> Self {
81 Self {
82 uri: EndpointType::Uri(uri.clone()),
83 fallback_uri: uri,
84 origin: None,
85 user_agent: None,
86 concurrency_limit: None,
87 rate_limit: None,
88 timeout: None,
89 #[cfg(feature = "_tls-any")]
90 tls: None,
91 buffer_size: None,
92 init_stream_window_size: None,
93 init_connection_window_size: None,
94 max_frame_size: None,
95 tcp_keepalive: None,
96 tcp_keepalive_interval: None,
97 tcp_keepalive_retries: None,
98 tcp_nodelay: true,
99 http2_keep_alive_interval: None,
100 http2_keep_alive_timeout: None,
101 http2_keep_alive_while_idle: None,
102 http2_header_table_size: None,
103 http2_max_header_list_size: None,
104 connect_timeout: None,
105 http2_adaptive_window: None,
106 executor: SharedExec::tokio(),
107 local_address: None,
108 }
109 }
110
111 fn new_uds(uds_filepath: &str) -> Self {
112 Self {
113 uri: EndpointType::Uds(uds_filepath.to_string()),
114 fallback_uri: Uri::from_static("http://tonic"),
115 origin: None,
116 user_agent: None,
117 concurrency_limit: None,
118 rate_limit: None,
119 timeout: None,
120 #[cfg(feature = "_tls-any")]
121 tls: None,
122 buffer_size: None,
123 init_stream_window_size: None,
124 init_connection_window_size: None,
125 max_frame_size: None,
126 tcp_keepalive: None,
127 tcp_keepalive_interval: None,
128 tcp_keepalive_retries: None,
129 tcp_nodelay: true,
130 http2_keep_alive_interval: None,
131 http2_keep_alive_timeout: None,
132 http2_keep_alive_while_idle: None,
133 http2_header_table_size: None,
134 http2_max_header_list_size: None,
135 connect_timeout: None,
136 http2_adaptive_window: None,
137 executor: SharedExec::tokio(),
138 local_address: None,
139 }
140 }
141
142 pub fn from_static(s: &'static str) -> Self {
153 if s.starts_with("unix:") {
154 let uds_filepath = s
155 .strip_prefix("unix://")
156 .or_else(|| s.strip_prefix("unix:"))
157 .expect("Invalid unix domain socket URI");
158 Self::new_uds(uds_filepath)
159 } else {
160 let uri = Uri::from_static(s);
161 Self::new_uri(uri)
162 }
163 }
164
165 pub fn from_shared(s: impl Into<Bytes>) -> Result<Self, Error> {
172 let s = str::from_utf8(&s.into())
173 .map_err(|e| Error::new_invalid_uri().with(e))?
174 .to_string();
175 if s.starts_with("unix:") {
176 let uds_filepath = s
177 .strip_prefix("unix://")
178 .or_else(|| s.strip_prefix("unix:"))
179 .ok_or(Error::new_invalid_uri())?;
180 Ok(Self::new_uds(uds_filepath))
181 } else {
182 let uri = Uri::from_maybe_shared(s).map_err(|e| Error::new_invalid_uri().with(e))?;
183 Ok(Self::from(uri))
184 }
185 }
186
187 pub fn user_agent<T>(self, user_agent: T) -> Result<Self, Error>
199 where
200 T: TryInto<HeaderValue>,
201 {
202 user_agent
203 .try_into()
204 .map(|ua| Endpoint {
205 user_agent: Some(ua),
206 ..self
207 })
208 .map_err(|_| Error::new_invalid_user_agent())
209 }
210
211 pub fn origin(self, origin: Uri) -> Self {
224 Endpoint {
225 origin: Some(origin),
226 ..self
227 }
228 }
229
230 pub fn timeout(self, dur: Duration) -> Self {
247 Endpoint {
248 timeout: Some(dur),
249 ..self
250 }
251 }
252
253 pub fn connect_timeout(self, dur: Duration) -> Self {
264 Endpoint {
265 connect_timeout: Some(dur),
266 ..self
267 }
268 }
269
270 pub fn tcp_keepalive(self, tcp_keepalive: Option<Duration>) -> Self {
278 Endpoint {
279 tcp_keepalive,
280 ..self
281 }
282 }
283
284 pub fn tcp_keepalive_interval(self, tcp_keepalive_interval: Option<Duration>) -> Self {
291 Endpoint {
292 tcp_keepalive_interval,
293 ..self
294 }
295 }
296
297 pub fn tcp_keepalive_retries(self, tcp_keepalive_retries: Option<u32>) -> Self {
303 Endpoint {
304 tcp_keepalive_retries,
305 ..self
306 }
307 }
308
309 pub fn concurrency_limit(self, limit: usize) -> Self {
317 Endpoint {
318 concurrency_limit: Some(limit),
319 ..self
320 }
321 }
322
323 pub fn rate_limit(self, limit: u64, duration: Duration) -> Self {
332 Endpoint {
333 rate_limit: Some((limit, duration)),
334 ..self
335 }
336 }
337
338 pub fn initial_stream_window_size(self, sz: impl Into<Option<u32>>) -> Self {
345 Endpoint {
346 init_stream_window_size: sz.into(),
347 ..self
348 }
349 }
350
351 pub fn initial_connection_window_size(self, sz: impl Into<Option<u32>>) -> Self {
355 Endpoint {
356 init_connection_window_size: sz.into(),
357 ..self
358 }
359 }
360
361 pub fn buffer_size(self, sz: impl Into<Option<usize>>) -> Self {
365 Endpoint {
366 buffer_size: sz.into(),
367 ..self
368 }
369 }
370
371 #[cfg(feature = "_tls-any")]
373 pub fn tls_config(self, tls_config: ClientTlsConfig) -> Result<Self, Error> {
374 match &self.uri {
375 EndpointType::Uri(uri) => Ok(Endpoint {
376 tls: Some(
377 tls_config
378 .into_tls_connector(uri)
379 .map_err(Error::from_source)?,
380 ),
381 ..self
382 }),
383 EndpointType::Uds(_) => Err(Error::new(error::Kind::InvalidTlsConfigForUds)),
384 }
385 }
386
387 #[cfg(feature = "_tls-any")]
409 pub fn tls_config_with_verifier(
410 self,
411 tls_config: ClientTlsConfig,
412 verifier: Arc<dyn ServerCertVerifier>,
413 ) -> Result<Self, Error> {
414 match &self.uri {
415 EndpointType::Uri(uri) => Ok(Endpoint {
416 tls: Some(
417 tls_config
418 .into_tls_connector_with_verifier(uri, verifier)
419 .map_err(Error::from_source)?,
420 ),
421 ..self
422 }),
423 EndpointType::Uds(_) => Err(Error::new(error::Kind::InvalidTlsConfigForUds)),
424 }
425 }
426
427 pub fn tcp_nodelay(self, enabled: bool) -> Self {
429 Endpoint {
430 tcp_nodelay: enabled,
431 ..self
432 }
433 }
434
435 pub fn http2_keep_alive_interval(self, interval: Duration) -> Self {
437 Endpoint {
438 http2_keep_alive_interval: Some(interval),
439 ..self
440 }
441 }
442
443 pub fn keep_alive_timeout(self, duration: Duration) -> Self {
445 Endpoint {
446 http2_keep_alive_timeout: Some(duration),
447 ..self
448 }
449 }
450
451 pub fn keep_alive_while_idle(self, enabled: bool) -> Self {
453 Endpoint {
454 http2_keep_alive_while_idle: Some(enabled),
455 ..self
456 }
457 }
458
459 pub fn http2_adaptive_window(self, enabled: bool) -> Self {
461 Endpoint {
462 http2_adaptive_window: Some(enabled),
463 ..self
464 }
465 }
466
467 pub fn http2_header_table_size(self, size: u32) -> Self {
474 Endpoint {
475 http2_header_table_size: Some(size),
476 ..self
477 }
478 }
479
480 pub fn http2_max_header_list_size(self, size: u32) -> Self {
484 Endpoint {
485 http2_max_header_list_size: Some(size),
486 ..self
487 }
488 }
489
490 #[must_use]
504 pub fn max_frame_size(self, frame_size: impl Into<Option<u32>>) -> Self {
505 Endpoint {
506 max_frame_size: frame_size.into(),
507 ..self
508 }
509 }
510
511 pub fn executor<E>(mut self, executor: E) -> Self
515 where
516 E: Executor<Pin<Box<dyn Future<Output = ()> + Send>>> + Send + Sync + 'static,
517 {
518 self.executor = SharedExec::new(executor);
519 self
520 }
521
522 pub(crate) fn connector<C>(&self, c: C) -> service::Connector<C> {
523 service::Connector::new(
524 c,
525 #[cfg(feature = "_tls-any")]
526 self.tls.clone(),
527 )
528 }
529
530 pub fn local_address(self, addr: Option<IpAddr>) -> Self {
534 Endpoint {
535 local_address: addr,
536 ..self
537 }
538 }
539
540 pub(crate) fn http_connector(&self) -> service::Connector<HttpConnector> {
541 let mut http = HttpConnector::new();
542 http.enforce_http(false);
543 http.set_nodelay(self.tcp_nodelay);
544 http.set_keepalive(self.tcp_keepalive);
545 http.set_keepalive_interval(self.tcp_keepalive_interval);
546 http.set_keepalive_retries(self.tcp_keepalive_retries);
547 http.set_connect_timeout(self.connect_timeout);
548 http.set_local_address(self.local_address);
549 self.connector(http)
550 }
551
552 pub(crate) fn uds_connector(&self, uds_filepath: &str) -> service::Connector<UdsConnector> {
553 self.connector(UdsConnector::new(uds_filepath))
554 }
555
556 pub async fn connect(&self) -> Result<Channel, Error> {
558 match &self.uri {
559 EndpointType::Uri(_) => Channel::connect(self.http_connector(), self.clone()).await,
560 EndpointType::Uds(uds_filepath) => {
561 Channel::connect(self.uds_connector(uds_filepath.as_str()), self.clone()).await
562 }
563 }
564 }
565
566 pub fn connect_lazy(&self) -> Channel {
571 match &self.uri {
572 EndpointType::Uri(_) => Channel::new(self.http_connector(), self.clone()),
573 EndpointType::Uds(uds_filepath) => {
574 Channel::new(self.uds_connector(uds_filepath.as_str()), self.clone())
575 }
576 }
577 }
578
579 pub async fn connect_with_connector<C>(&self, connector: C) -> Result<Channel, Error>
587 where
588 C: Service<Uri> + Send + 'static,
589 C::Response: rt::Read + rt::Write + Send + Unpin,
590 C::Future: Send,
591 crate::BoxError: From<C::Error> + Send,
592 {
593 let connector = self.connector(connector);
594
595 if let Some(connect_timeout) = self.connect_timeout {
596 let mut connector = hyper_timeout::TimeoutConnector::new(connector);
597 connector.set_connect_timeout(Some(connect_timeout));
598 Channel::connect(connector, self.clone()).await
599 } else {
600 Channel::connect(connector, self.clone()).await
601 }
602 }
603
604 pub fn connect_with_connector_lazy<C>(&self, connector: C) -> Channel
612 where
613 C: Service<Uri> + Send + 'static,
614 C::Response: rt::Read + rt::Write + Send + Unpin,
615 C::Future: Send,
616 crate::BoxError: From<C::Error> + Send,
617 {
618 let connector = self.connector(connector);
619 if let Some(connect_timeout) = self.connect_timeout {
620 let mut connector = hyper_timeout::TimeoutConnector::new(connector);
621 connector.set_connect_timeout(Some(connect_timeout));
622 Channel::new(connector, self.clone())
623 } else {
624 Channel::new(connector, self.clone())
625 }
626 }
627
628 pub fn uri(&self) -> &Uri {
638 match &self.uri {
639 EndpointType::Uri(uri) => uri,
640 EndpointType::Uds(_) => &self.fallback_uri,
641 }
642 }
643
644 pub fn get_tcp_nodelay(&self) -> bool {
646 self.tcp_nodelay
647 }
648
649 pub fn get_connect_timeout(&self) -> Option<Duration> {
651 self.connect_timeout
652 }
653
654 pub fn get_tcp_keepalive(&self) -> Option<Duration> {
660 self.tcp_keepalive
661 }
662
663 pub fn get_tcp_keepalive_interval(&self) -> Option<Duration> {
665 self.tcp_keepalive_interval
666 }
667
668 pub fn get_tcp_keepalive_retries(&self) -> Option<u32> {
670 self.tcp_keepalive_retries
671 }
672}
673
674impl From<Uri> for Endpoint {
675 fn from(uri: Uri) -> Self {
676 Self::new_uri(uri)
677 }
678}
679
680impl TryFrom<Bytes> for Endpoint {
681 type Error = Error;
682
683 fn try_from(t: Bytes) -> Result<Self, Self::Error> {
684 Self::from_shared(t)
685 }
686}
687
688impl TryFrom<String> for Endpoint {
689 type Error = Error;
690
691 fn try_from(t: String) -> Result<Self, Self::Error> {
692 Self::from_shared(t.into_bytes())
693 }
694}
695
696impl TryFrom<&'static str> for Endpoint {
697 type Error = Error;
698
699 fn try_from(t: &'static str) -> Result<Self, Self::Error> {
700 Self::from_shared(t.as_bytes())
701 }
702}
703
704impl fmt::Debug for Endpoint {
705 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
706 f.debug_struct("Endpoint").finish()
707 }
708}
709
710impl FromStr for Endpoint {
711 type Err = Error;
712
713 fn from_str(s: &str) -> Result<Self, Self::Err> {
714 Self::try_from(s.to_string())
715 }
716}