1use either::{Either, Left, Right};
11use futures::{AsyncBufRead, StreamExt, TryStream, TryStreamExt, future::BoxFuture};
12use http::{self, Request, Response};
13use http_body_util::BodyExt;
14#[cfg(feature = "ws")] use hyper_util::rt::TokioIo;
15use jiff::Timestamp;
16use k8s_openapi::apimachinery::pkg::apis::meta::v1 as k8s_meta_v1;
17use kube_core::{discovery::v2::ACCEPT_AGGREGATED_DISCOVERY_V2, response::Status};
18use serde::de::DeserializeOwned;
19use serde_json::{self, Value};
20#[cfg(feature = "ws")]
21use tokio_tungstenite::{WebSocketStream, tungstenite as ws};
22use tokio_util::{
23 codec::{FramedRead, LinesCodec, LinesCodecError},
24 io::StreamReader,
25};
26use tower::{BoxError, Layer, Service, ServiceExt, buffer::Buffer, util::BoxService};
27use tower_http::map_response_body::MapResponseBodyLayer;
28
29pub use self::body::Body;
30use crate::{Config, Error, Result, api::WatchEvent, config::Kubeconfig};
31
32mod auth;
33mod body;
34mod builder;
35pub use kube_core::discovery::v2::{
36 APIGroupDiscovery, APIGroupDiscoveryList, APIResourceDiscovery, APISubresourceDiscovery,
37 APIVersionDiscovery, GroupVersionKind as DiscoveryGroupVersionKind,
38};
39#[cfg_attr(docsrs, doc(cfg(feature = "unstable-client")))]
40#[cfg(feature = "unstable-client")]
41mod client_ext;
42#[cfg_attr(docsrs, doc(cfg(feature = "unstable-client")))]
43#[cfg(feature = "unstable-client")]
44pub use client_ext::scope;
45mod config_ext;
46pub use auth::Error as AuthError;
47pub use config_ext::ConfigExt;
48pub mod middleware;
49pub mod retry;
50
51#[cfg(any(feature = "rustls-tls", feature = "openssl-tls"))] mod tls;
52
53#[cfg(feature = "openssl-tls")]
54pub use tls::openssl_tls::Error as OpensslTlsError;
55#[cfg(feature = "rustls-tls")] pub use tls::rustls_tls::Error as RustlsTlsError;
56#[cfg(feature = "ws")] mod upgrade;
57
58#[cfg(feature = "oauth")]
59#[cfg_attr(docsrs, doc(cfg(feature = "oauth")))]
60pub use auth::OAuthError;
61
62#[cfg(feature = "oidc")]
63#[cfg_attr(docsrs, doc(cfg(feature = "oidc")))]
64pub use auth::oidc_errors;
65
66#[cfg(feature = "ws")] pub use upgrade::UpgradeConnectionError;
67
68#[cfg(feature = "kubelet-debug")]
69#[cfg_attr(docsrs, doc(cfg(feature = "kubelet-debug")))]
70mod kubelet_debug;
71
72pub use builder::{ClientBuilder, DynBody};
73
74#[cfg_attr(docsrs, doc(cfg(feature = "client")))]
81#[derive(Clone)]
82pub struct Client {
83 inner: Buffer<Request<Body>, BoxFuture<'static, Result<Response<Body>, BoxError>>>,
86 default_ns: String,
87 valid_until: Option<Timestamp>,
88}
89
/// Upgraded WebSocket connection returned by `Client::connect`.
///
/// Bundles the raw WebSocket stream with the stream protocol that was
/// determined while verifying the upgrade response.
#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
#[cfg(feature = "ws")]
pub struct Connection {
    /// WebSocket stream layered over the upgraded HTTP connection.
    stream: WebSocketStream<TokioIo<hyper::upgrade::Upgraded>>,
    /// Protocol reported by the upgrade verification step.
    protocol: upgrade::StreamProtocol,
}

#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
#[cfg(feature = "ws")]
impl Connection {
    /// Forwards to the negotiated protocol's `supports_stream_close` check.
    pub fn supports_stream_close(&self) -> bool {
        let Self { protocol, .. } = self;
        protocol.supports_stream_close()
    }

    /// Consumes the connection and hands out the underlying WebSocket stream.
    pub fn into_stream(self) -> WebSocketStream<TokioIo<hyper::upgrade::Upgraded>> {
        let Self { stream, .. } = self;
        stream
    }
}
112
113impl Client {
119 pub fn new<S, B, T>(service: S, default_namespace: T) -> Self
149 where
150 S: Service<Request<Body>, Response = Response<B>> + Send + 'static,
151 S::Future: Send + 'static,
152 S::Error: Into<BoxError>,
153 B: http_body::Body<Data = bytes::Bytes> + Send + 'static,
154 B::Error: Into<BoxError>,
155 T: Into<String>,
156 {
157 let service = MapResponseBodyLayer::new(Body::wrap_body)
159 .layer(service)
160 .map_err(|e| e.into());
161 Self {
162 inner: Buffer::new(BoxService::new(service), 1024),
163 default_ns: default_namespace.into(),
164 valid_until: None,
165 }
166 }
167
168 pub fn with_valid_until(self, valid_until: Option<Timestamp>) -> Self {
170 Client { valid_until, ..self }
171 }
172
173 pub fn valid_until(&self) -> &Option<Timestamp> {
175 &self.valid_until
176 }
177
178 pub async fn try_default() -> Result<Self> {
196 Self::try_from(Config::infer().await.map_err(Error::InferConfig)?)
197 }
198
199 pub fn default_namespace(&self) -> &str {
205 &self.default_ns
206 }
207
208 pub async fn send(&self, request: Request<Body>) -> Result<Response<Body>> {
212 let mut svc = self.inner.clone();
213 let res = svc
214 .ready()
215 .await
216 .map_err(Error::Service)?
217 .call(request)
218 .await
219 .map_err(|err| {
220 err.downcast::<Error>()
222 .map(|e| *e)
223 .or_else(|err| err.downcast::<hyper::Error>().map(|err| Error::HyperError(*err)))
225 .unwrap_or_else(Error::Service)
227 })?;
228 Ok(res)
229 }
230
231 #[cfg(feature = "ws")]
233 #[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
234 pub async fn connect(&self, request: Request<Vec<u8>>) -> Result<Connection> {
235 use http::header::HeaderValue;
236 let (mut parts, body) = request.into_parts();
237 parts
238 .headers
239 .insert(http::header::CONNECTION, HeaderValue::from_static("Upgrade"));
240 parts
241 .headers
242 .insert(http::header::UPGRADE, HeaderValue::from_static("websocket"));
243 parts.headers.insert(
244 http::header::SEC_WEBSOCKET_VERSION,
245 HeaderValue::from_static("13"),
246 );
247 let key = tokio_tungstenite::tungstenite::handshake::client::generate_key();
248 parts.headers.insert(
249 http::header::SEC_WEBSOCKET_KEY,
250 key.parse().expect("valid header value"),
251 );
252 upgrade::StreamProtocol::add_to_headers(&mut parts.headers)?;
253
254 let res = self.send(Request::from_parts(parts, Body::from(body))).await?;
255 let protocol = upgrade::verify_response(&res, &key).map_err(Error::UpgradeConnection)?;
256 match hyper::upgrade::on(res).await {
257 Ok(upgraded) => Ok(Connection {
258 stream: WebSocketStream::from_raw_socket(
259 TokioIo::new(upgraded),
260 ws::protocol::Role::Client,
261 None,
262 )
263 .await,
264 protocol,
265 }),
266
267 Err(e) => Err(Error::UpgradeConnection(
268 UpgradeConnectionError::GetPendingUpgrade(e),
269 )),
270 }
271 }
272
273 pub async fn request<T>(&self, request: Request<Vec<u8>>) -> Result<T>
276 where
277 T: DeserializeOwned,
278 {
279 let text = self.request_text(request).await?;
280
281 serde_json::from_str(&text).map_err(|e| {
282 tracing::warn!("{}, {:?}", text, e);
283 Error::SerdeError(e)
284 })
285 }
286
287 pub async fn request_text(&self, request: Request<Vec<u8>>) -> Result<String> {
290 let res = self.send(request.map(Body::from)).await?;
291 let res = handle_api_errors(res).await?;
292 let body_bytes = res.into_body().collect().await?.to_bytes();
293 let text = String::from_utf8(body_bytes.to_vec()).map_err(Error::FromUtf8)?;
294 Ok(text)
295 }
296
297 pub async fn request_stream(&self, request: Request<Vec<u8>>) -> Result<impl AsyncBufRead + use<>> {
302 let res = self.send(request.map(Body::from)).await?;
303 let res = handle_api_errors(res).await?;
304 let body = res.into_body().into_data_stream().map_err(std::io::Error::other);
307 Ok(body.into_async_read())
308 }
309
310 pub async fn request_status<T>(&self, request: Request<Vec<u8>>) -> Result<Either<T, Status>>
313 where
314 T: DeserializeOwned,
315 {
316 let text = self.request_text(request).await?;
317 let v: Value = serde_json::from_str(&text).map_err(Error::SerdeError)?;
319 if v["kind"] == "Status" {
320 tracing::trace!("Status from {}", text);
321 Ok(Right(serde_json::from_str::<Status>(&text).map_err(|e| {
322 tracing::warn!("{}, {:?}", text, e);
323 Error::SerdeError(e)
324 })?))
325 } else {
326 Ok(Left(serde_json::from_str::<T>(&text).map_err(|e| {
327 tracing::warn!("{}, {:?}", text, e);
328 Error::SerdeError(e)
329 })?))
330 }
331 }
332
333 pub async fn request_events<T>(
335 &self,
336 request: Request<Vec<u8>>,
337 ) -> Result<impl TryStream<Item = Result<WatchEvent<T>>> + use<T>>
338 where
339 T: Clone + DeserializeOwned,
340 {
341 let res = self.send(request.map(Body::from)).await?;
342 tracing::trace!("headers: {:?}", res.headers());
344
345 let frames = FramedRead::new(
346 StreamReader::new(res.into_body().into_data_stream().map_err(|e| {
347 if e.to_string().contains("unexpected EOF during chunk") {
350 return std::io::Error::new(std::io::ErrorKind::UnexpectedEof, e);
351 }
352 std::io::Error::other(e)
353 })),
354 LinesCodec::new(),
355 );
356
357 Ok(frames.filter_map(|res| async {
358 match res {
359 Ok(line) => match serde_json::from_str::<WatchEvent<T>>(&line) {
360 Ok(event) => Some(Ok(event)),
361 Err(e) => {
362 if e.is_eof() {
364 return None;
365 }
366
367 if let Ok(status) = serde_json::from_str::<Status>(&line) {
369 return Some(Err(Error::Api(status.boxed())));
370 }
371 Some(Err(Error::SerdeError(e)))
373 }
374 },
375
376 Err(LinesCodecError::Io(e)) => match e.kind() {
377 std::io::ErrorKind::TimedOut => {
379 tracing::warn!("timeout in poll: {}", e); None
381 }
382 std::io::ErrorKind::UnexpectedEof => {
385 tracing::warn!("eof in poll: {}", e);
386 None
387 }
388 _ => Some(Err(Error::ReadEvents(e))),
389 },
390
391 Err(LinesCodecError::MaxLineLengthExceeded) => {
394 Some(Err(Error::LinesCodecMaxLineLengthExceeded))
395 }
396 }
397 }))
398 }
399}
400
401impl Client {
407 pub async fn apiserver_version(&self) -> Result<k8s_openapi::apimachinery::pkg::version::Info> {
409 self.request(
410 Request::builder()
411 .uri("/version")
412 .body(vec![])
413 .map_err(Error::HttpError)?,
414 )
415 .await
416 }
417
418 pub async fn list_api_groups(&self) -> Result<k8s_meta_v1::APIGroupList> {
420 self.request(
421 Request::builder()
422 .uri("/apis")
423 .body(vec![])
424 .map_err(Error::HttpError)?,
425 )
426 .await
427 }
428
429 pub async fn list_api_group_resources(&self, apiversion: &str) -> Result<k8s_meta_v1::APIResourceList> {
448 let url = format!("/apis/{apiversion}");
449 self.request(
450 Request::builder()
451 .uri(url)
452 .body(vec![])
453 .map_err(Error::HttpError)?,
454 )
455 .await
456 }
457
458 pub async fn list_core_api_versions(&self) -> Result<k8s_meta_v1::APIVersions> {
460 self.request(
461 Request::builder()
462 .uri("/api")
463 .body(vec![])
464 .map_err(Error::HttpError)?,
465 )
466 .await
467 }
468
469 pub async fn list_core_api_resources(&self, version: &str) -> Result<k8s_meta_v1::APIResourceList> {
471 let url = format!("/api/{version}");
472 self.request(
473 Request::builder()
474 .uri(url)
475 .body(vec![])
476 .map_err(Error::HttpError)?,
477 )
478 .await
479 }
480}
481
482impl Client {
488 pub async fn list_api_groups_aggregated(&self) -> Result<APIGroupDiscoveryList> {
513 self.request(
514 Request::builder()
515 .uri("/apis")
516 .header(http::header::ACCEPT, ACCEPT_AGGREGATED_DISCOVERY_V2)
517 .body(vec![])
518 .map_err(Error::HttpError)?,
519 )
520 .await
521 }
522
523 pub async fn list_core_api_versions_aggregated(&self) -> Result<APIGroupDiscoveryList> {
546 self.request(
547 Request::builder()
548 .uri("/api")
549 .header(http::header::ACCEPT, ACCEPT_AGGREGATED_DISCOVERY_V2)
550 .body(vec![])
551 .map_err(Error::HttpError)?,
552 )
553 .await
554 }
555}
556
557async fn handle_api_errors(res: Response<Body>) -> Result<Response<Body>> {
565 let status = res.status();
566 if status.is_client_error() || status.is_server_error() {
567 let body_bytes = res.into_body().collect().await?.to_bytes();
569 let text = String::from_utf8(body_bytes.to_vec()).map_err(Error::FromUtf8)?;
570 if let Ok(status) = serde_json::from_str::<Status>(&text) {
573 tracing::debug!("Unsuccessful: {status:?}");
574 Err(Error::Api(status.boxed()))
575 } else {
576 tracing::warn!("Unsuccessful data error parse: {text}");
577 let status = Status::failure(&text, "Failed to parse error data").with_code(status.as_u16());
578 tracing::debug!("Unsuccessful: {status:?} (reconstruct)");
579 Err(Error::Api(status.boxed()))
580 }
581 } else {
582 Ok(res)
583 }
584}
585
586impl TryFrom<Config> for Client {
587 type Error = Error;
588
589 fn try_from(config: Config) -> Result<Self> {
593 Ok(ClientBuilder::try_from(config)?.build())
594 }
595}
596
597impl TryFrom<Kubeconfig> for Client {
598 type Error = Error;
599
600 fn try_from(kubeconfig: Kubeconfig) -> Result<Self> {
601 let config = Config::try_from(kubeconfig)?;
602 Ok(ClientBuilder::try_from(config)?.build())
603 }
604}
605
#[cfg(test)]
mod tests {
    use std::pin::pin;

    use crate::{
        Api, Client,
        client::Body,
        config::{AuthInfo, Cluster, Context, Kubeconfig, NamedAuthInfo, NamedCluster, NamedContext},
    };

    use http::{Request, Response};
    use k8s_openapi::api::core::v1::Pod;
    use tower_test::mock;

    /// The namespace given to `Client::new` is reported back unchanged.
    #[tokio::test]
    async fn test_default_ns() {
        let (service, _) = mock::pair::<Request<Body>, Response<Body>>();
        let namespace = Client::new(service, "test-namespace")
            .default_namespace()
            .to_owned();
        assert_eq!(namespace, "test-namespace");
    }

    /// A minimal in-memory kubeconfig converts into a client that uses the
    /// namespace of the current context.
    #[tokio::test]
    async fn test_try_from_kubeconfig() {
        let user = NamedAuthInfo {
            name: String::from("test-user"),
            auth_info: Some(AuthInfo::default()),
        };
        let context = NamedContext {
            name: String::from("test-context"),
            context: Some(Context {
                cluster: String::from("test-cluster"),
                user: Some(String::from("test-user")),
                namespace: Some(String::from("test-namespace")),
                ..Default::default()
            }),
        };
        let cluster = NamedCluster {
            name: String::from("test-cluster"),
            cluster: Some(Cluster {
                server: Some(String::from("http://localhost:8080")),
                ..Default::default()
            }),
        };
        let config = Kubeconfig {
            current_context: Some(String::from("test-context")),
            auth_infos: vec![user],
            contexts: vec![context],
            clusters: vec![cluster],
            ..Default::default()
        };

        let client = Client::try_from(config).expect("Failed to create client from kubeconfig");
        assert_eq!(client.default_namespace(), "test-namespace");
    }

    /// A GET for a pod goes to the expected path and the mocked response is
    /// decoded back into a `Pod`.
    #[tokio::test]
    async fn test_mock() {
        let (service, handle) = mock::pair::<Request<Body>, Response<Body>>();

        // Fake API server: check the single incoming request and answer with a pod.
        let server = tokio::spawn(async move {
            let mut handle = pin!(handle);
            let (request, responder) = handle.next_request().await.expect("service not called");
            assert_eq!(request.method(), http::Method::GET);
            assert_eq!(request.uri().to_string(), "/api/v1/namespaces/default/pods/test");

            let pod: Pod = serde_json::from_value(serde_json::json!({
                "apiVersion": "v1",
                "kind": "Pod",
                "metadata": {
                    "name": "test",
                    "annotations": { "kube-rs": "test" },
                },
                "spec": {
                    "containers": [{ "name": "test", "image": "test-image" }],
                }
            }))
            .unwrap();
            let payload = serde_json::to_vec(&pod).unwrap();
            responder.send_response(Response::builder().body(Body::from(payload)).unwrap());
        });

        let client = Client::new(service, "default");
        let pods: Api<Pod> = Api::default_namespaced(client);
        let fetched = pods.get("test").await.unwrap();
        let annotations = fetched.metadata.annotations.unwrap();
        assert_eq!(annotations.get("kube-rs").unwrap(), "test");
        server.await.unwrap();
    }
}