1#[cfg(feature = "tls")]
2use super::service::TlsConnector;
3use super::service::{self, Executor, SharedExec};
4use super::Channel;
5#[cfg(feature = "tls")]
6use super::ClientTlsConfig;
7use crate::transport::Error;
8use bytes::Bytes;
9use http::{uri::Uri, HeaderValue};
10use hyper::rt;
11use hyper_util::client::legacy::connect::HttpConnector;
12use std::{fmt, future::Future, pin::Pin, str::FromStr, time::Duration};
13use tower_service::Service;
14
15#[derive(Clone)]
19pub struct Endpoint {
20 pub(crate) uri: Uri,
21 pub(crate) origin: Option<Uri>,
22 pub(crate) user_agent: Option<HeaderValue>,
23 pub(crate) timeout: Option<Duration>,
24 pub(crate) concurrency_limit: Option<usize>,
25 pub(crate) rate_limit: Option<(u64, Duration)>,
26 #[cfg(feature = "tls")]
27 pub(crate) tls: Option<TlsConnector>,
28 pub(crate) buffer_size: Option<usize>,
29 pub(crate) init_stream_window_size: Option<u32>,
30 pub(crate) init_connection_window_size: Option<u32>,
31 pub(crate) tcp_keepalive: Option<Duration>,
32 pub(crate) tcp_nodelay: bool,
33 pub(crate) http2_keep_alive_interval: Option<Duration>,
34 pub(crate) http2_keep_alive_timeout: Option<Duration>,
35 pub(crate) http2_keep_alive_while_idle: Option<bool>,
36 pub(crate) http2_max_header_list_size: Option<u32>,
37 pub(crate) connect_timeout: Option<Duration>,
38 pub(crate) http2_adaptive_window: Option<bool>,
39 pub(crate) executor: SharedExec,
40}
41
42impl Endpoint {
43 #[doc(hidden)]
46 pub fn new<D>(dst: D) -> Result<Self, Error>
47 where
48 D: TryInto<Self>,
49 D::Error: Into<crate::Error>,
50 {
51 let me = dst.try_into().map_err(|e| Error::from_source(e.into()))?;
52 #[cfg(feature = "tls")]
53 if me.uri.scheme() == Some(&http::uri::Scheme::HTTPS) {
54 return me.tls_config(ClientTlsConfig::new().with_enabled_roots());
55 }
56
57 Ok(me)
58 }
59
60 pub fn from_static(s: &'static str) -> Self {
71 let uri = Uri::from_static(s);
72 Self::from(uri)
73 }
74
75 pub fn from_shared(s: impl Into<Bytes>) -> Result<Self, Error> {
82 let uri = Uri::from_maybe_shared(s.into()).map_err(|e| Error::new_invalid_uri().with(e))?;
83 Ok(Self::from(uri))
84 }
85
86 pub fn user_agent<T>(self, user_agent: T) -> Result<Self, Error>
98 where
99 T: TryInto<HeaderValue>,
100 {
101 user_agent
102 .try_into()
103 .map(|ua| Endpoint {
104 user_agent: Some(ua),
105 ..self
106 })
107 .map_err(|_| Error::new_invalid_user_agent())
108 }
109
110 pub fn origin(self, origin: Uri) -> Self {
123 Endpoint {
124 origin: Some(origin),
125 ..self
126 }
127 }
128
129 pub fn timeout(self, dur: Duration) -> Self {
146 Endpoint {
147 timeout: Some(dur),
148 ..self
149 }
150 }
151
152 pub fn connect_timeout(self, dur: Duration) -> Self {
163 Endpoint {
164 connect_timeout: Some(dur),
165 ..self
166 }
167 }
168
169 pub fn tcp_keepalive(self, tcp_keepalive: Option<Duration>) -> Self {
178 Endpoint {
179 tcp_keepalive,
180 ..self
181 }
182 }
183
184 pub fn concurrency_limit(self, limit: usize) -> Self {
192 Endpoint {
193 concurrency_limit: Some(limit),
194 ..self
195 }
196 }
197
198 pub fn rate_limit(self, limit: u64, duration: Duration) -> Self {
207 Endpoint {
208 rate_limit: Some((limit, duration)),
209 ..self
210 }
211 }
212
213 pub fn initial_stream_window_size(self, sz: impl Into<Option<u32>>) -> Self {
220 Endpoint {
221 init_stream_window_size: sz.into(),
222 ..self
223 }
224 }
225
226 pub fn initial_connection_window_size(self, sz: impl Into<Option<u32>>) -> Self {
230 Endpoint {
231 init_connection_window_size: sz.into(),
232 ..self
233 }
234 }
235
236 pub fn buffer_size(self, sz: impl Into<Option<usize>>) -> Self {
240 Endpoint {
241 buffer_size: sz.into(),
242 ..self
243 }
244 }
245
246 #[cfg(feature = "tls")]
248 pub fn tls_config(self, tls_config: ClientTlsConfig) -> Result<Self, Error> {
249 Ok(Endpoint {
250 tls: Some(
251 tls_config
252 .into_tls_connector(&self.uri)
253 .map_err(Error::from_source)?,
254 ),
255 ..self
256 })
257 }
258
259 pub fn tcp_nodelay(self, enabled: bool) -> Self {
261 Endpoint {
262 tcp_nodelay: enabled,
263 ..self
264 }
265 }
266
267 pub fn http2_keep_alive_interval(self, interval: Duration) -> Self {
269 Endpoint {
270 http2_keep_alive_interval: Some(interval),
271 ..self
272 }
273 }
274
275 pub fn keep_alive_timeout(self, duration: Duration) -> Self {
277 Endpoint {
278 http2_keep_alive_timeout: Some(duration),
279 ..self
280 }
281 }
282
283 pub fn keep_alive_while_idle(self, enabled: bool) -> Self {
285 Endpoint {
286 http2_keep_alive_while_idle: Some(enabled),
287 ..self
288 }
289 }
290
291 pub fn http2_adaptive_window(self, enabled: bool) -> Self {
293 Endpoint {
294 http2_adaptive_window: Some(enabled),
295 ..self
296 }
297 }
298
299 pub fn http2_max_header_list_size(self, size: u32) -> Self {
303 Endpoint {
304 http2_max_header_list_size: Some(size),
305 ..self
306 }
307 }
308
309 pub fn executor<E>(mut self, executor: E) -> Self
313 where
314 E: Executor<Pin<Box<dyn Future<Output = ()> + Send>>> + Send + Sync + 'static,
315 {
316 self.executor = SharedExec::new(executor);
317 self
318 }
319
320 pub(crate) fn connector<C>(&self, c: C) -> service::Connector<C> {
321 service::Connector::new(
322 c,
323 #[cfg(feature = "tls")]
324 self.tls.clone(),
325 )
326 }
327
328 pub async fn connect(&self) -> Result<Channel, Error> {
330 let mut http = HttpConnector::new();
331 http.enforce_http(false);
332 http.set_nodelay(self.tcp_nodelay);
333 http.set_keepalive(self.tcp_keepalive);
334 http.set_connect_timeout(self.connect_timeout);
335
336 let connector = self.connector(http);
337
338 Channel::connect(connector, self.clone()).await
339 }
340
341 pub fn connect_lazy(&self) -> Channel {
346 let mut http = HttpConnector::new();
347 http.enforce_http(false);
348 http.set_nodelay(self.tcp_nodelay);
349 http.set_keepalive(self.tcp_keepalive);
350 http.set_connect_timeout(self.connect_timeout);
351
352 let connector = self.connector(http);
353
354 Channel::new(connector, self.clone())
355 }
356
357 pub async fn connect_with_connector<C>(&self, connector: C) -> Result<Channel, Error>
365 where
366 C: Service<Uri> + Send + 'static,
367 C::Response: rt::Read + rt::Write + Send + Unpin,
368 C::Future: Send,
369 crate::Error: From<C::Error> + Send,
370 {
371 let connector = self.connector(connector);
372
373 if let Some(connect_timeout) = self.connect_timeout {
374 let mut connector = hyper_timeout::TimeoutConnector::new(connector);
375 connector.set_connect_timeout(Some(connect_timeout));
376 Channel::connect(connector, self.clone()).await
377 } else {
378 Channel::connect(connector, self.clone()).await
379 }
380 }
381
382 pub fn connect_with_connector_lazy<C>(&self, connector: C) -> Channel
390 where
391 C: Service<Uri> + Send + 'static,
392 C::Response: rt::Read + rt::Write + Send + Unpin,
393 C::Future: Send,
394 crate::Error: From<C::Error> + Send,
395 {
396 let connector = self.connector(connector);
397 if let Some(connect_timeout) = self.connect_timeout {
398 let mut connector = hyper_timeout::TimeoutConnector::new(connector);
399 connector.set_connect_timeout(Some(connect_timeout));
400 Channel::new(connector, self.clone())
401 } else {
402 Channel::new(connector, self.clone())
403 }
404 }
405
406 pub fn uri(&self) -> &Uri {
416 &self.uri
417 }
418}
419
420impl From<Uri> for Endpoint {
421 fn from(uri: Uri) -> Self {
422 Self {
423 uri,
424 origin: None,
425 user_agent: None,
426 concurrency_limit: None,
427 rate_limit: None,
428 timeout: None,
429 #[cfg(feature = "tls")]
430 tls: None,
431 buffer_size: None,
432 init_stream_window_size: None,
433 init_connection_window_size: None,
434 tcp_keepalive: None,
435 tcp_nodelay: true,
436 http2_keep_alive_interval: None,
437 http2_keep_alive_timeout: None,
438 http2_keep_alive_while_idle: None,
439 http2_max_header_list_size: None,
440 connect_timeout: None,
441 http2_adaptive_window: None,
442 executor: SharedExec::tokio(),
443 }
444 }
445}
446
447impl TryFrom<Bytes> for Endpoint {
448 type Error = Error;
449
450 fn try_from(t: Bytes) -> Result<Self, Self::Error> {
451 Self::from_shared(t)
452 }
453}
454
455impl TryFrom<String> for Endpoint {
456 type Error = Error;
457
458 fn try_from(t: String) -> Result<Self, Self::Error> {
459 Self::from_shared(t.into_bytes())
460 }
461}
462
463impl TryFrom<&'static str> for Endpoint {
464 type Error = Error;
465
466 fn try_from(t: &'static str) -> Result<Self, Self::Error> {
467 Self::from_shared(t.as_bytes())
468 }
469}
470
471impl fmt::Debug for Endpoint {
472 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
473 f.debug_struct("Endpoint").finish()
474 }
475}
476
477impl FromStr for Endpoint {
478 type Err = Error;
479
480 fn from_str(s: &str) -> Result<Self, Self::Err> {
481 Self::try_from(s.to_string())
482 }
483}