1use bytes::Bytes;
2use chrono::{DateTime, Utc};
3use http::{header::HeaderMap, Request, Response};
4use hyper::{
5 body::Incoming,
6 rt::{Read, Write},
7};
8use hyper_timeout::TimeoutConnector;
9
10use hyper_util::{
11 client::legacy::connect::{Connection, HttpConnector},
12 rt::TokioExecutor,
13};
14
15use std::time::Duration;
16use tower::{util::BoxService, BoxError, Layer, Service, ServiceBuilder};
17use tower_http::{
18 classify::ServerErrorsFailureClass, map_response_body::MapResponseBodyLayer, trace::TraceLayer,
19};
20use tracing::Span;
21
22use super::body::Body;
23use crate::{client::ConfigExt, Client, Config, Error, Result};
24
25pub type DynBody = dyn http_body::Body<Data = Bytes, Error = BoxError> + Send + Unpin;
29
30pub struct ClientBuilder<Svc> {
32 service: Svc,
33 default_ns: String,
34 valid_until: Option<DateTime<Utc>>,
35}
36
37impl<Svc> ClientBuilder<Svc> {
38 pub fn new(service: Svc, default_namespace: impl Into<String>) -> Self
43 where
44 Svc: Service<Request<Body>>,
45 {
46 Self {
47 service,
48 default_ns: default_namespace.into(),
49 valid_until: None,
50 }
51 }
52
53 pub fn with_layer<L: Layer<Svc>>(self, layer: &L) -> ClientBuilder<L::Service> {
55 let Self {
56 service: stack,
57 default_ns,
58 valid_until,
59 } = self;
60 ClientBuilder {
61 service: layer.layer(stack),
62 default_ns,
63 valid_until,
64 }
65 }
66
67 pub fn with_valid_until(self, valid_until: Option<DateTime<Utc>>) -> Self {
69 ClientBuilder {
70 service: self.service,
71 default_ns: self.default_ns,
72 valid_until,
73 }
74 }
75
76 pub fn build<B>(self) -> Client
78 where
79 Svc: Service<Request<Body>, Response = Response<B>> + Send + 'static,
80 Svc::Future: Send + 'static,
81 Svc::Error: Into<BoxError>,
82 B: http_body::Body<Data = bytes::Bytes> + Send + 'static,
83 B::Error: Into<BoxError>,
84 {
85 Client::new(self.service, self.default_ns).with_valid_until(self.valid_until)
86 }
87}
88
89pub type GenericService = BoxService<Request<Body>, Response<Box<DynBody>>, BoxError>;
90
91impl TryFrom<Config> for ClientBuilder<GenericService> {
92 type Error = Error;
93
94 fn try_from(config: Config) -> Result<Self> {
96 let mut connector = HttpConnector::new();
97 connector.enforce_http(false);
98
99 #[cfg(all(feature = "aws-lc-rs", feature = "rustls-tls"))]
100 {
101 if rustls::crypto::CryptoProvider::get_default().is_none() {
102 let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
105 }
106 }
107
108 match config.proxy_url.as_ref() {
109 Some(proxy_url) if proxy_url.scheme_str() == Some("socks5") => {
110 #[cfg(feature = "socks5")]
111 {
112 let connector = hyper_util::client::legacy::connect::proxy::SocksV5::new(
113 proxy_url.clone(),
114 connector,
115 );
116 make_generic_builder(connector, config)
117 }
118
119 #[cfg(not(feature = "socks5"))]
120 Err(Error::ProxyProtocolDisabled {
121 proxy_url: proxy_url.clone(),
122 protocol_feature: "kube/socks5",
123 })
124 }
125
126 Some(proxy_url) if proxy_url.scheme_str() == Some("http") => {
127 #[cfg(feature = "http-proxy")]
128 {
129 let connector =
130 hyper_util::client::legacy::connect::proxy::Tunnel::new(proxy_url.clone(), connector);
131 make_generic_builder(connector, config)
132 }
133
134 #[cfg(not(feature = "http-proxy"))]
135 Err(Error::ProxyProtocolDisabled {
136 proxy_url: proxy_url.clone(),
137 protocol_feature: "kube/http-proxy",
138 })
139 }
140
141 Some(proxy_url) => Err(Error::ProxyProtocolUnsupported {
142 proxy_url: proxy_url.clone(),
143 }),
144
145 None => make_generic_builder(connector, config),
146 }
147 }
148}
149
150fn make_generic_builder<H>(connector: H, config: Config) -> Result<ClientBuilder<GenericService>, Error>
153where
154 H: 'static + Clone + Send + Sync + Service<http::Uri>,
155 H::Response: 'static + Connection + Read + Write + Send + Unpin,
156 H::Future: 'static + Send,
157 H::Error: 'static + Send + Sync + std::error::Error,
158{
159 let default_ns = config.default_namespace.clone();
160 let auth_layer = config.auth_layer()?;
161
162 let client: hyper_util::client::legacy::Client<_, Body> = {
163 #[cfg(feature = "rustls-tls")]
169 let connector = config.rustls_https_connector_with_connector(connector)?;
170 #[cfg(all(not(feature = "rustls-tls"), feature = "openssl-tls"))]
171 let connector = config.openssl_https_connector_with_connector(connector)?;
172 #[cfg(all(not(feature = "rustls-tls"), not(feature = "openssl-tls")))]
173 if config.cluster_url.scheme() == Some(&http::uri::Scheme::HTTPS) {
174 return Err(Error::TlsRequired);
176 }
177
178 let mut connector = TimeoutConnector::new(connector);
179
180 connector.set_connect_timeout(config.connect_timeout);
182 connector.set_read_timeout(config.read_timeout);
183 connector.set_write_timeout(config.write_timeout);
184
185 hyper_util::client::legacy::Builder::new(TokioExecutor::new()).build(connector)
186 };
187
188 let stack = ServiceBuilder::new().layer(config.base_uri_layer()).into_inner();
189 #[cfg(feature = "gzip")]
190 let stack = ServiceBuilder::new()
191 .layer(stack)
192 .layer(
193 tower_http::decompression::DecompressionLayer::new()
194 .no_br()
195 .no_deflate()
196 .no_zstd()
197 .gzip(!config.disable_compression),
198 )
199 .into_inner();
200
201 let service = ServiceBuilder::new()
202 .layer(stack)
203 .option_layer(auth_layer)
204 .layer(config.extra_headers_layer()?)
205 .layer(
206 TraceLayer::new_for_http()
209 .make_span_with(|req: &Request<Body>| {
210 tracing::debug_span!(
211 "HTTP",
212 http.method = %req.method(),
213 http.url = %req.uri(),
214 http.status_code = tracing::field::Empty,
215 otel.name = req.extensions().get::<&'static str>().unwrap_or(&"HTTP"),
216 otel.kind = "client",
217 otel.status_code = tracing::field::Empty,
218 )
219 })
220 .on_request(|_req: &Request<Body>, _span: &Span| {
221 tracing::debug!("requesting");
222 })
223 .on_response(|res: &Response<Incoming>, _latency: Duration, span: &Span| {
224 let status = res.status();
225 span.record("http.status_code", status.as_u16());
226 if status.is_client_error() || status.is_server_error() {
227 span.record("otel.status_code", "ERROR");
228 }
229 })
230 .on_body_chunk(())
232 .on_eos(|_: Option<&HeaderMap>, _duration: Duration, _span: &Span| {
233 tracing::debug!("stream closed");
234 })
235 .on_failure(|ec: ServerErrorsFailureClass, _latency: Duration, span: &Span| {
236 span.record("otel.status_code", "ERROR");
242 match ec {
243 ServerErrorsFailureClass::StatusCode(status) => {
244 span.record("http.status_code", status.as_u16());
245 tracing::error!("failed with status {}", status)
246 }
247 ServerErrorsFailureClass::Error(err) => {
248 tracing::error!("failed with error {}", err)
249 }
250 }
251 }),
252 )
253 .map_err(BoxError::from)
254 .service(client);
255
256 let (_, expiration) = config.exec_identity_pem();
257
258 let client = ClientBuilder::new(
259 BoxService::new(
260 MapResponseBodyLayer::new(|body| {
261 Box::new(http_body_util::BodyExt::map_err(body, BoxError::from)) as Box<DynBody>
262 })
263 .layer(service),
264 ),
265 default_ns,
266 )
267 .with_valid_until(expiration);
268
269 Ok(client)
270}
271
#[cfg(test)]
mod tests {
    #[cfg(feature = "gzip")] use super::*;

    /// Spins up a local HTTP/1 server that echoes the request's
    /// `Accept-Encoding` header back as the response body, then checks that
    /// the generic client sends `gzip` by default and sends no such header
    /// once `disable_compression` is set.
    #[cfg(feature = "gzip")]
    #[tokio::test]
    async fn test_no_accept_encoding_header_sent_when_compression_disabled(
    ) -> Result<(), Box<dyn std::error::Error>> {
        use http::Uri;
        use std::net::SocketAddr;
        use tokio::net::{TcpListener, TcpStream};

        // Port 0 lets the OS pick a free port.
        let bind_addr: SocketAddr = ([127, 0, 0, 1], 0).into();
        let listener = TcpListener::bind(bind_addr).await?;
        let server_addr = listener.local_addr()?;
        let uri: Uri = format!("http://{server_addr}").parse()?;

        // Echo server: one task per accepted connection.
        tokio::spawn(async move {
            use http_body_util::Full;
            use hyper::{server::conn::http1, service::service_fn};
            use hyper_util::rt::{TokioIo, TokioTimer};
            use std::convert::Infallible;

            loop {
                let (stream, _) = listener.accept().await.unwrap();
                let io: TokioIo<TcpStream> = TokioIo::new(stream);

                tokio::spawn(async move {
                    let echo = service_fn(|req| async move {
                        // Body is the raw header value, or empty when absent.
                        let accept_encoding = req.headers().get(http::header::ACCEPT_ENCODING);
                        let payload = accept_encoding
                            .map(|value| Bytes::copy_from_slice(value.as_bytes()))
                            .unwrap_or_default();
                        Ok::<_, Infallible>(Response::new(Full::new(payload)))
                    });

                    let _ = http1::Builder::new()
                        .timer(TokioTimer::new())
                        .serve_connection(io, echo)
                        .await
                        .unwrap();
                });
            }
        });

        // Default configuration: compression enabled.
        let gzip_config = Config::new(uri);
        let client = make_generic_builder(HttpConnector::new(), gzip_config.clone())?.build();
        let body = client.request_text(http::Request::default()).await?;
        assert_eq!(&body, "gzip");

        // Same configuration with compression turned off.
        let plain_config = Config {
            disable_compression: true,
            ..gzip_config
        };
        let client = make_generic_builder(HttpConnector::new(), plain_config)?.build();
        let body = client.request_text(http::Request::default()).await?;
        assert_eq!(&body, "");

        Ok(())
    }
}