Skip to main content

kube_client/client/
mod.rs

1//! API client for interacting with the Kubernetes API
2//!
3//! The [`Client`] uses standard kube error handling.
4//!
5//! This client can be used on its own or in conjunction with the [`Api`][crate::api::Api]
6//! type for more structured interaction with the kubernetes API.
7//!
8//! The [`Client`] can also be used with [`Discovery`](crate::Discovery) to dynamically
9//! retrieve the resources served by the kubernetes API.
10use either::{Either, Left, Right};
11use futures::{AsyncBufRead, StreamExt, TryStream, TryStreamExt, future::BoxFuture};
12use http::{self, Request, Response};
13use http_body_util::BodyExt;
14#[cfg(feature = "ws")] use hyper_util::rt::TokioIo;
15use jiff::Timestamp;
16use k8s_openapi::apimachinery::pkg::apis::meta::v1 as k8s_meta_v1;
17use kube_core::{discovery::v2::ACCEPT_AGGREGATED_DISCOVERY_V2, response::Status};
18use serde::de::DeserializeOwned;
19use serde_json::{self, Value};
20#[cfg(feature = "ws")]
21use tokio_tungstenite::{WebSocketStream, tungstenite as ws};
22use tokio_util::{
23    codec::{FramedRead, LinesCodec, LinesCodecError},
24    io::StreamReader,
25};
26use tower::{BoxError, Layer, Service, ServiceExt, buffer::Buffer, util::BoxService};
27use tower_http::map_response_body::MapResponseBodyLayer;
28
29pub use self::body::Body;
30use crate::{Config, Error, Result, api::WatchEvent, config::Kubeconfig};
31
32mod auth;
33mod body;
34mod builder;
35pub use kube_core::discovery::v2::{
36    APIGroupDiscovery, APIGroupDiscoveryList, APIResourceDiscovery, APISubresourceDiscovery,
37    APIVersionDiscovery, GroupVersionKind as DiscoveryGroupVersionKind,
38};
39#[cfg_attr(docsrs, doc(cfg(feature = "unstable-client")))]
40#[cfg(feature = "unstable-client")]
41mod client_ext;
42#[cfg_attr(docsrs, doc(cfg(feature = "unstable-client")))]
43#[cfg(feature = "unstable-client")]
44pub use client_ext::scope;
45mod config_ext;
46pub use auth::Error as AuthError;
47pub use config_ext::ConfigExt;
48pub mod middleware;
49pub mod retry;
50
51#[cfg(any(feature = "rustls-tls", feature = "openssl-tls"))] mod tls;
52
53#[cfg(feature = "openssl-tls")]
54pub use tls::openssl_tls::Error as OpensslTlsError;
55#[cfg(feature = "rustls-tls")] pub use tls::rustls_tls::Error as RustlsTlsError;
56#[cfg(feature = "ws")] mod upgrade;
57
58#[cfg(feature = "oauth")]
59#[cfg_attr(docsrs, doc(cfg(feature = "oauth")))]
60pub use auth::OAuthError;
61
62#[cfg(feature = "oidc")]
63#[cfg_attr(docsrs, doc(cfg(feature = "oidc")))]
64pub use auth::oidc_errors;
65
66#[cfg(feature = "ws")] pub use upgrade::UpgradeConnectionError;
67
68#[cfg(feature = "kubelet-debug")]
69#[cfg_attr(docsrs, doc(cfg(feature = "kubelet-debug")))]
70mod kubelet_debug;
71
72pub use builder::{ClientBuilder, DynBody};
73
/// Client for connecting with a Kubernetes cluster.
///
/// The easiest way to instantiate the client is either by
/// inferring the configuration from the environment using
/// [`Client::try_default`] or with an existing [`Config`]
/// using [`Client::try_from`].
#[cfg_attr(docsrs, doc(cfg(feature = "client")))]
#[derive(Clone)]
pub struct Client {
    // Type-erased service stack that every request is sent through:
    // - `Buffer` for cheap clone
    // - `BoxFuture` for dynamic response future type
    inner: Buffer<Request<Body>, BoxFuture<'static, Result<Response<Body>, BoxError>>>,
    // Namespace handed back by `Client::default_namespace`.
    default_ns: String,
    // Optional expiration timestamp; `None` until set through `Client::with_valid_until`.
    // It is only stored here: callers are expected to check it via `Client::valid_until`.
    valid_until: Option<Timestamp>,
}
89
/// Represents a WebSocket connection.
/// Value returned by [`Client::connect`].
#[cfg(feature = "ws")]
#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
pub struct Connection {
    // WebSocket stream layered over the upgraded HTTP connection; released by `into_stream`.
    stream: WebSocketStream<TokioIo<hyper::upgrade::Upgraded>>,
    // Stream protocol obtained from `upgrade::verify_response` while connecting;
    // consulted by `supports_stream_close`.
    protocol: upgrade::StreamProtocol,
}
98
#[cfg(feature = "ws")]
#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
impl Connection {
    /// Report whether the connection's stream protocol supports graceful close signaling.
    pub fn supports_stream_close(&self) -> bool {
        let Self { protocol, .. } = self;
        protocol.supports_stream_close()
    }

    /// Consume the connection and hand back the raw WebSocketStream.
    pub fn into_stream(self) -> WebSocketStream<TokioIo<hyper::upgrade::Upgraded>> {
        let Self { stream, .. } = self;
        stream
    }
}
112
113/// Constructors and low-level api interfaces.
114///
115/// Most users only need [`Client::try_default`] or [`Client::new`] from this block.
116///
117/// The many various lower level interfaces here are for more advanced use-cases with specific requirements.
118impl Client {
119    /// Create a [`Client`] using a custom `Service` stack.
120    ///
121    /// [`ConfigExt`](crate::client::ConfigExt) provides extensions for
122    /// building a custom stack.
123    ///
124    /// To create with the default stack with a [`Config`], use
125    /// [`Client::try_from`].
126    ///
127    /// To create with the default stack with an inferred [`Config`], use
128    /// [`Client::try_default`].
129    ///
130    /// # Example
131    ///
132    /// ```rust
133    /// # async fn doc() -> Result<(), Box<dyn std::error::Error>> {
134    /// use kube::{client::ConfigExt, Client, Config};
135    /// use tower::{BoxError, ServiceBuilder};
136    /// use hyper_util::rt::TokioExecutor;
137    ///
138    /// let config = Config::infer().await?;
139    /// let service = ServiceBuilder::new()
140    ///     .layer(config.base_uri_layer())
141    ///     .option_layer(config.auth_layer()?)
142    ///     .map_err(BoxError::from)
143    ///     .service(hyper_util::client::legacy::Client::builder(TokioExecutor::new()).build_http());
144    /// let client = Client::new(service, config.default_namespace);
145    /// # Ok(())
146    /// # }
147    /// ```
148    pub fn new<S, B, T>(service: S, default_namespace: T) -> Self
149    where
150        S: Service<Request<Body>, Response = Response<B>> + Send + 'static,
151        S::Future: Send + 'static,
152        S::Error: Into<BoxError>,
153        B: http_body::Body<Data = bytes::Bytes> + Send + 'static,
154        B::Error: Into<BoxError>,
155        T: Into<String>,
156    {
157        // Transform response body to `crate::client::Body` and use type erased error to avoid type parameters.
158        let service = MapResponseBodyLayer::new(Body::wrap_body)
159            .layer(service)
160            .map_err(|e| e.into());
161        Self {
162            inner: Buffer::new(BoxService::new(service), 1024),
163            default_ns: default_namespace.into(),
164            valid_until: None,
165        }
166    }
167
168    /// Sets an expiration timestamp to the client, which has to be checked by the user using [`Client::valid_until`] function.
169    pub fn with_valid_until(self, valid_until: Option<Timestamp>) -> Self {
170        Client { valid_until, ..self }
171    }
172
173    /// Get the expiration timestamp of the client, if it has been set.
174    pub fn valid_until(&self) -> &Option<Timestamp> {
175        &self.valid_until
176    }
177
178    /// Create and initialize a [`Client`] using the inferred configuration.
179    ///
180    /// Will use [`Config::infer`] which attempts to load the local kubeconfig first,
181    /// and then if that fails, trying the in-cluster environment variables.
182    ///
183    /// Will fail if neither configuration could be loaded.
184    ///
185    /// ```rust
186    /// # async fn doc() -> Result<(), Box<dyn std::error::Error>> {
187    /// # use kube::Client;
188    /// let client = Client::try_default().await?;
189    /// # Ok(())
190    /// # }
191    /// ```
192    ///
193    /// If you already have a [`Config`] then use [`Client::try_from`](Self::try_from)
194    /// instead.
195    pub async fn try_default() -> Result<Self> {
196        Self::try_from(Config::infer().await.map_err(Error::InferConfig)?)
197    }
198
199    /// Get the default namespace for the client
200    ///
201    /// The namespace is either configured on `context` in the kubeconfig,
202    /// falls back to `default` when running locally,
203    /// or uses the service account's namespace when deployed in-cluster.
204    pub fn default_namespace(&self) -> &str {
205        &self.default_ns
206    }
207
208    /// Perform a raw HTTP request against the API and return the raw response back.
209    /// This method can be used to get raw access to the API which may be used to, for example,
210    /// create a proxy server or application-level gateway between localhost and the API server.
211    pub async fn send(&self, request: Request<Body>) -> Result<Response<Body>> {
212        let mut svc = self.inner.clone();
213        let res = svc
214            .ready()
215            .await
216            .map_err(Error::Service)?
217            .call(request)
218            .await
219            .map_err(|err| {
220                // Error decorating request
221                err.downcast::<Error>()
222                    .map(|e| *e)
223                    // Error requesting
224                    .or_else(|err| err.downcast::<hyper::Error>().map(|err| Error::HyperError(*err)))
225                    // Error from another middleware
226                    .unwrap_or_else(Error::Service)
227            })?;
228        Ok(res)
229    }
230
231    /// Make WebSocket connection.
232    #[cfg(feature = "ws")]
233    #[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
234    pub async fn connect(&self, request: Request<Vec<u8>>) -> Result<Connection> {
235        use http::header::HeaderValue;
236        let (mut parts, body) = request.into_parts();
237        parts
238            .headers
239            .insert(http::header::CONNECTION, HeaderValue::from_static("Upgrade"));
240        parts
241            .headers
242            .insert(http::header::UPGRADE, HeaderValue::from_static("websocket"));
243        parts.headers.insert(
244            http::header::SEC_WEBSOCKET_VERSION,
245            HeaderValue::from_static("13"),
246        );
247        let key = tokio_tungstenite::tungstenite::handshake::client::generate_key();
248        parts.headers.insert(
249            http::header::SEC_WEBSOCKET_KEY,
250            key.parse().expect("valid header value"),
251        );
252        upgrade::StreamProtocol::add_to_headers(&mut parts.headers)?;
253
254        let res = self.send(Request::from_parts(parts, Body::from(body))).await?;
255        let protocol = upgrade::verify_response(&res, &key).map_err(Error::UpgradeConnection)?;
256        match hyper::upgrade::on(res).await {
257            Ok(upgraded) => Ok(Connection {
258                stream: WebSocketStream::from_raw_socket(
259                    TokioIo::new(upgraded),
260                    ws::protocol::Role::Client,
261                    None,
262                )
263                .await,
264                protocol,
265            }),
266
267            Err(e) => Err(Error::UpgradeConnection(
268                UpgradeConnectionError::GetPendingUpgrade(e),
269            )),
270        }
271    }
272
273    /// Perform a raw HTTP request against the API and deserialize the response
274    /// as JSON to some known type.
275    pub async fn request<T>(&self, request: Request<Vec<u8>>) -> Result<T>
276    where
277        T: DeserializeOwned,
278    {
279        let text = self.request_text(request).await?;
280
281        serde_json::from_str(&text).map_err(|e| {
282            tracing::warn!("{}, {:?}", text, e);
283            Error::SerdeError(e)
284        })
285    }
286
287    /// Perform a raw HTTP request against the API and get back the response
288    /// as a string
289    pub async fn request_text(&self, request: Request<Vec<u8>>) -> Result<String> {
290        let res = self.send(request.map(Body::from)).await?;
291        let res = handle_api_errors(res).await?;
292        let body_bytes = res.into_body().collect().await?.to_bytes();
293        let text = String::from_utf8(body_bytes.to_vec()).map_err(Error::FromUtf8)?;
294        Ok(text)
295    }
296
297    /// Perform a raw HTTP request against the API and stream the response body.
298    ///
299    /// The response can be processed using [`AsyncReadExt`](futures::AsyncReadExt)
300    /// and [`AsyncBufReadExt`](futures::AsyncBufReadExt).
301    pub async fn request_stream(&self, request: Request<Vec<u8>>) -> Result<impl AsyncBufRead + use<>> {
302        let res = self.send(request.map(Body::from)).await?;
303        let res = handle_api_errors(res).await?;
304        // Map the error, since we want to convert this into an `AsyncBufReader` using
305        // `into_async_read` which specifies `std::io::Error` as the stream's error type.
306        let body = res.into_body().into_data_stream().map_err(std::io::Error::other);
307        Ok(body.into_async_read())
308    }
309
310    /// Perform a raw HTTP request against the API and get back either an object
311    /// deserialized as JSON or a [`Status`] Object.
312    pub async fn request_status<T>(&self, request: Request<Vec<u8>>) -> Result<Either<T, Status>>
313    where
314        T: DeserializeOwned,
315    {
316        let text = self.request_text(request).await?;
317        // It needs to be JSON:
318        let v: Value = serde_json::from_str(&text).map_err(Error::SerdeError)?;
319        if v["kind"] == "Status" {
320            tracing::trace!("Status from {}", text);
321            Ok(Right(serde_json::from_str::<Status>(&text).map_err(|e| {
322                tracing::warn!("{}, {:?}", text, e);
323                Error::SerdeError(e)
324            })?))
325        } else {
326            Ok(Left(serde_json::from_str::<T>(&text).map_err(|e| {
327                tracing::warn!("{}, {:?}", text, e);
328                Error::SerdeError(e)
329            })?))
330        }
331    }
332
333    /// Perform a raw request and get back a stream of [`WatchEvent`] objects
334    pub async fn request_events<T>(
335        &self,
336        request: Request<Vec<u8>>,
337    ) -> Result<impl TryStream<Item = Result<WatchEvent<T>>> + use<T>>
338    where
339        T: Clone + DeserializeOwned,
340    {
341        let res = self.send(request.map(Body::from)).await?;
342        // trace!("Streaming from {} -> {}", res.url(), res.status().as_str());
343        tracing::trace!("headers: {:?}", res.headers());
344
345        let frames = FramedRead::new(
346            StreamReader::new(res.into_body().into_data_stream().map_err(|e| {
347                // Unexpected EOF from chunked decoder.
348                // Tends to happen when watching for 300+s. This will be ignored.
349                if e.to_string().contains("unexpected EOF during chunk") {
350                    return std::io::Error::new(std::io::ErrorKind::UnexpectedEof, e);
351                }
352                std::io::Error::other(e)
353            })),
354            LinesCodec::new(),
355        );
356
357        Ok(frames.filter_map(|res| async {
358            match res {
359                Ok(line) => match serde_json::from_str::<WatchEvent<T>>(&line) {
360                    Ok(event) => Some(Ok(event)),
361                    Err(e) => {
362                        // Ignore EOF error that can happen for incomplete line from `decode_eof`.
363                        if e.is_eof() {
364                            return None;
365                        }
366
367                        // Got general error response
368                        if let Ok(status) = serde_json::from_str::<Status>(&line) {
369                            return Some(Err(Error::Api(status.boxed())));
370                        }
371                        // Parsing error
372                        Some(Err(Error::SerdeError(e)))
373                    }
374                },
375
376                Err(LinesCodecError::Io(e)) => match e.kind() {
377                    // Client timeout
378                    std::io::ErrorKind::TimedOut => {
379                        tracing::warn!("timeout in poll: {}", e); // our client timeout
380                        None
381                    }
382                    // Unexpected EOF from chunked decoder.
383                    // Tends to happen after 300+s of watching.
384                    std::io::ErrorKind::UnexpectedEof => {
385                        tracing::warn!("eof in poll: {}", e);
386                        None
387                    }
388                    _ => Some(Err(Error::ReadEvents(e))),
389                },
390
391                // Reached the maximum line length without finding a newline.
392                // This should never happen because we're using the default `usize::MAX`.
393                Err(LinesCodecError::MaxLineLengthExceeded) => {
394                    Some(Err(Error::LinesCodecMaxLineLengthExceeded))
395                }
396            }
397        }))
398    }
399}
400
401/// Low level discovery methods using `k8s_openapi` types.
402///
403/// Consider using the [`discovery`](crate::discovery) module for
404/// easier-to-use variants of this functionality.
405/// The following methods might be deprecated to avoid confusion between similarly named types within `discovery`.
406impl Client {
407    /// Returns apiserver version.
408    pub async fn apiserver_version(&self) -> Result<k8s_openapi::apimachinery::pkg::version::Info> {
409        self.request(
410            Request::builder()
411                .uri("/version")
412                .body(vec![])
413                .map_err(Error::HttpError)?,
414        )
415        .await
416    }
417
418    /// Lists api groups that apiserver serves.
419    pub async fn list_api_groups(&self) -> Result<k8s_meta_v1::APIGroupList> {
420        self.request(
421            Request::builder()
422                .uri("/apis")
423                .body(vec![])
424                .map_err(Error::HttpError)?,
425        )
426        .await
427    }
428
429    /// Lists resources served in given API group.
430    ///
431    /// ### Example usage:
432    /// ```rust
433    /// # async fn scope(client: kube::Client) -> Result<(), Box<dyn std::error::Error>> {
434    /// let apigroups = client.list_api_groups().await?;
435    /// for g in apigroups.groups {
436    ///     let ver = g
437    ///         .preferred_version
438    ///         .as_ref()
439    ///         .or_else(|| g.versions.first())
440    ///         .expect("preferred or versions exists");
441    ///     let apis = client.list_api_group_resources(&ver.group_version).await?;
442    ///     dbg!(apis);
443    /// }
444    /// # Ok(())
445    /// # }
446    /// ```
447    pub async fn list_api_group_resources(&self, apiversion: &str) -> Result<k8s_meta_v1::APIResourceList> {
448        let url = format!("/apis/{apiversion}");
449        self.request(
450            Request::builder()
451                .uri(url)
452                .body(vec![])
453                .map_err(Error::HttpError)?,
454        )
455        .await
456    }
457
458    /// Lists versions of `core` a.k.a. `""` legacy API group.
459    pub async fn list_core_api_versions(&self) -> Result<k8s_meta_v1::APIVersions> {
460        self.request(
461            Request::builder()
462                .uri("/api")
463                .body(vec![])
464                .map_err(Error::HttpError)?,
465        )
466        .await
467    }
468
469    /// Lists resources served in particular `core` group version.
470    pub async fn list_core_api_resources(&self, version: &str) -> Result<k8s_meta_v1::APIResourceList> {
471        let url = format!("/api/{version}");
472        self.request(
473            Request::builder()
474                .uri(url)
475                .body(vec![])
476                .map_err(Error::HttpError)?,
477        )
478        .await
479    }
480}
481
482/// Aggregated Discovery API methods
483///
484/// These methods use the Aggregated Discovery API (available since Kubernetes 1.26, stable in 1.30)
485/// to fetch all API resources in a single request, reducing the number of API calls compared to
486/// the traditional discovery methods.
487impl Client {
488    /// Returns aggregated discovery for all API groups served at /apis.
489    ///
490    /// This uses the Aggregated Discovery API to fetch all non-core API groups
491    /// and their resources in a single request.
492    ///
493    /// Requires Kubernetes 1.26+ (beta) or 1.30+ (stable).
494    ///
495    /// ### Example usage:
496    /// ```rust,no_run
497    /// # async fn scope(client: kube::Client) -> Result<(), Box<dyn std::error::Error>> {
498    /// let discovery = client.list_api_groups_aggregated().await?;
499    /// for group in discovery.items {
500    ///     let name = group.metadata.as_ref().and_then(|m| m.name.as_ref());
501    ///     println!("Group: {:?}", name);
502    ///     for version in group.versions {
503    ///         println!("  Version: {:?}", version.version);
504    ///         for resource in version.resources {
505    ///             println!("    Resource: {:?}", resource.resource);
506    ///         }
507    ///     }
508    /// }
509    /// # Ok(())
510    /// # }
511    /// ```
512    pub async fn list_api_groups_aggregated(&self) -> Result<APIGroupDiscoveryList> {
513        self.request(
514            Request::builder()
515                .uri("/apis")
516                .header(http::header::ACCEPT, ACCEPT_AGGREGATED_DISCOVERY_V2)
517                .body(vec![])
518                .map_err(Error::HttpError)?,
519        )
520        .await
521    }
522
523    /// Returns aggregated discovery for core API group served at /api.
524    ///
525    /// This uses the Aggregated Discovery API to fetch the core API group
526    /// and all its resources in a single request.
527    ///
528    /// Requires Kubernetes 1.26+ (beta) or 1.30+ (stable).
529    ///
530    /// ### Example usage:
531    /// ```rust,no_run
532    /// # async fn scope(client: kube::Client) -> Result<(), Box<dyn std::error::Error>> {
533    /// let discovery = client.list_core_api_versions_aggregated().await?;
534    /// for group in discovery.items {
535    ///     for version in group.versions {
536    ///         println!("Core version: {:?}", version.version);
537    ///         for resource in version.resources {
538    ///             println!("  Resource: {:?} (scope: {:?})", resource.resource, resource.scope);
539    ///         }
540    ///     }
541    /// }
542    /// # Ok(())
543    /// # }
544    /// ```
545    pub async fn list_core_api_versions_aggregated(&self) -> Result<APIGroupDiscoveryList> {
546        self.request(
547            Request::builder()
548                .uri("/api")
549                .header(http::header::ACCEPT, ACCEPT_AGGREGATED_DISCOVERY_V2)
550                .body(vec![])
551                .map_err(Error::HttpError)?,
552        )
553        .await
554    }
555}
556
557/// Kubernetes returned error handling
558///
559/// Either kube returned an explicit ApiError struct,
560/// or it someohow returned something we couldn't parse as one.
561///
562/// In either case, present an ApiError upstream.
563/// The latter is probably a bug if encountered.
564async fn handle_api_errors(res: Response<Body>) -> Result<Response<Body>> {
565    let status = res.status();
566    if status.is_client_error() || status.is_server_error() {
567        // trace!("Status = {:?} for {}", status, res.url());
568        let body_bytes = res.into_body().collect().await?.to_bytes();
569        let text = String::from_utf8(body_bytes.to_vec()).map_err(Error::FromUtf8)?;
570        // Print better debug when things do fail
571        // trace!("Parsing error: {}", text);
572        if let Ok(status) = serde_json::from_str::<Status>(&text) {
573            tracing::debug!("Unsuccessful: {status:?}");
574            Err(Error::Api(status.boxed()))
575        } else {
576            tracing::warn!("Unsuccessful data error parse: {text}");
577            let status = Status::failure(&text, "Failed to parse error data").with_code(status.as_u16());
578            tracing::debug!("Unsuccessful: {status:?} (reconstruct)");
579            Err(Error::Api(status.boxed()))
580        }
581    } else {
582        Ok(res)
583    }
584}
585
586impl TryFrom<Config> for Client {
587    type Error = Error;
588
589    /// Builds a default [`Client`] from a [`Config`].
590    ///
591    /// See [`ClientBuilder`] or [`Client::new`] if more customization is required
592    fn try_from(config: Config) -> Result<Self> {
593        Ok(ClientBuilder::try_from(config)?.build())
594    }
595}
596
597impl TryFrom<Kubeconfig> for Client {
598    type Error = Error;
599
600    fn try_from(kubeconfig: Kubeconfig) -> Result<Self> {
601        let config = Config::try_from(kubeconfig)?;
602        Ok(ClientBuilder::try_from(config)?.build())
603    }
604}
605
#[cfg(test)]
mod tests {
    use std::pin::pin;

    use crate::{
        Api, Client,
        client::Body,
        config::{AuthInfo, Cluster, Context, Kubeconfig, NamedAuthInfo, NamedCluster, NamedContext},
    };

    use http::{Request, Response};
    use k8s_openapi::api::core::v1::Pod;
    use tower_test::mock;

    /// The namespace passed to `Client::new` is reported back unchanged.
    #[tokio::test]
    async fn test_default_ns() {
        let (mock_service, _) = mock::pair::<Request<Body>, Response<Body>>();
        let client = Client::new(mock_service, "test-namespace");
        assert_eq!(client.default_namespace(), "test-namespace");
    }

    /// A minimal kubeconfig converts into a client that picks up the context's namespace.
    #[tokio::test]
    async fn test_try_from_kubeconfig() {
        let user = NamedAuthInfo {
            name: "test-user".to_string(),
            // Empty but valid auth info.
            auth_info: Some(AuthInfo::default()),
        };
        let context = NamedContext {
            name: "test-context".to_string(),
            context: Some(Context {
                cluster: "test-cluster".to_string(),
                user: Some("test-user".to_string()),
                namespace: Some("test-namespace".to_string()),
                ..Default::default()
            }),
        };
        let cluster = NamedCluster {
            name: "test-cluster".to_string(),
            cluster: Some(Cluster {
                server: Some("http://localhost:8080".to_string()),
                ..Default::default()
            }),
        };
        let kubeconfig = Kubeconfig {
            current_context: Some("test-context".to_string()),
            auth_infos: vec![user],
            contexts: vec![context],
            clusters: vec![cluster],
            ..Default::default()
        };

        let client = Client::try_from(kubeconfig).expect("Failed to create client from kubeconfig");
        assert_eq!(client.default_namespace(), "test-namespace");
    }

    /// A pod fetched through `Api` hits the expected URI on the mocked service
    /// and deserializes the mocked response.
    #[tokio::test]
    async fn test_mock() {
        let (mock_service, handle) = mock::pair::<Request<Body>, Response<Body>>();
        let server = tokio::spawn(async move {
            // Receive a request for pod and respond with some data
            let mut handle = pin!(handle);
            let (request, responder) = handle.next_request().await.expect("service not called");
            assert_eq!(request.method(), http::Method::GET);
            assert_eq!(request.uri().to_string(), "/api/v1/namespaces/default/pods/test");

            let pod: Pod = serde_json::from_value(serde_json::json!({
                "apiVersion": "v1",
                "kind": "Pod",
                "metadata": {
                    "name": "test",
                    "annotations": { "kube-rs": "test" },
                },
                "spec": {
                    "containers": [{ "name": "test", "image": "test-image" }],
                }
            }))
            .unwrap();
            let payload = serde_json::to_vec(&pod).unwrap();
            responder.send_response(Response::builder().body(Body::from(payload)).unwrap());
        });

        let client = Client::new(mock_service, "default");
        let pods: Api<Pod> = Api::default_namespaced(client);
        let pod = pods.get("test").await.unwrap();
        let annotations = pod.metadata.annotations.unwrap();
        assert_eq!(annotations.get("kube-rs").unwrap(), "test");
        server.await.unwrap();
    }
}