kube_client/client/
mod.rs

//! API client for interacting with the Kubernetes API
//!
//! The [`Client`] uses standard kube error handling.
//!
//! This client can be used on its own or in conjunction with the [`Api`][crate::api::Api]
//! type for more structured interaction with the Kubernetes API.
//!
//! The [`Client`] can also be used with [`Discovery`](crate::Discovery) to dynamically
//! retrieve the resources served by the Kubernetes API.
10use either::{Either, Left, Right};
11use futures::{future::BoxFuture, AsyncBufRead, StreamExt, TryStream, TryStreamExt};
12use http::{self, Request, Response};
13use http_body_util::BodyExt;
14#[cfg(feature = "ws")] use hyper_util::rt::TokioIo;
15use k8s_openapi::apimachinery::pkg::apis::meta::v1 as k8s_meta_v1;
16pub use kube_core::response::Status;
17use serde::de::DeserializeOwned;
18use serde_json::{self, Value};
19#[cfg(feature = "ws")]
20use tokio_tungstenite::{tungstenite as ws, WebSocketStream};
21use tokio_util::{
22    codec::{FramedRead, LinesCodec, LinesCodecError},
23    io::StreamReader,
24};
25use tower::{buffer::Buffer, util::BoxService, BoxError, Layer, Service, ServiceExt};
26use tower_http::map_response_body::MapResponseBodyLayer;
27
28pub use self::body::Body;
29use crate::{api::WatchEvent, error::ErrorResponse, Config, Error, Result};
30
31mod auth;
32mod body;
33mod builder;
34#[cfg_attr(docsrs, doc(cfg(feature = "unstable-client")))]
35#[cfg(feature = "unstable-client")]
36mod client_ext;
37#[cfg_attr(docsrs, doc(cfg(feature = "unstable-client")))]
38#[cfg(feature = "unstable-client")]
39pub use client_ext::scope;
40mod config_ext;
41pub use auth::Error as AuthError;
42pub use config_ext::ConfigExt;
43pub mod middleware;
44
45#[cfg(any(feature = "rustls-tls", feature = "openssl-tls"))] mod tls;
46
47#[cfg(feature = "openssl-tls")]
48pub use tls::openssl_tls::Error as OpensslTlsError;
49#[cfg(feature = "rustls-tls")] pub use tls::rustls_tls::Error as RustlsTlsError;
50#[cfg(feature = "ws")] mod upgrade;
51
52#[cfg(feature = "oauth")]
53#[cfg_attr(docsrs, doc(cfg(feature = "oauth")))]
54pub use auth::OAuthError;
55
56#[cfg(feature = "oidc")]
57#[cfg_attr(docsrs, doc(cfg(feature = "oidc")))]
58pub use auth::oidc_errors;
59
60#[cfg(feature = "ws")] pub use upgrade::UpgradeConnectionError;
61
62#[cfg(feature = "kubelet-debug")]
63#[cfg_attr(docsrs, doc(cfg(feature = "kubelet-debug")))]
64mod kubelet_debug;
65
66pub use builder::{ClientBuilder, DynBody};
67
/// Client for connecting with a Kubernetes cluster.
///
/// The easiest way to instantiate the client is either by
/// inferring the configuration from the environment using
/// [`Client::try_default`] or with an existing [`Config`]
/// using [`Client::try_from`].
///
/// A custom `Service` stack can be supplied through [`Client::new`].
#[cfg_attr(docsrs, doc(cfg(feature = "client")))]
#[derive(Clone)]
pub struct Client {
    // The type-erased service every request is sent through:
    // - `Buffer` for cheap clone
    // - `BoxFuture` for dynamic response future type
    inner: Buffer<Request<Body>, BoxFuture<'static, Result<Response<Body>, BoxError>>>,
    // Namespace handed out by `default_namespace()`.
    default_ns: String,
}
82
83/// Constructors and low-level api interfaces.
84///
85/// Most users only need [`Client::try_default`] or [`Client::new`] from this block.
86///
87/// The many various lower level interfaces here are for more advanced use-cases with specific requirements.
88impl Client {
89    /// Create a [`Client`] using a custom `Service` stack.
90    ///
91    /// [`ConfigExt`](crate::client::ConfigExt) provides extensions for
92    /// building a custom stack.
93    ///
94    /// To create with the default stack with a [`Config`], use
95    /// [`Client::try_from`].
96    ///
97    /// To create with the default stack with an inferred [`Config`], use
98    /// [`Client::try_default`].
99    ///
100    /// # Example
101    ///
102    /// ```rust
103    /// # async fn doc() -> Result<(), Box<dyn std::error::Error>> {
104    /// use kube::{client::ConfigExt, Client, Config};
105    /// use tower::{BoxError, ServiceBuilder};
106    /// use hyper_util::rt::TokioExecutor;
107    ///
108    /// let config = Config::infer().await?;
109    /// let service = ServiceBuilder::new()
110    ///     .layer(config.base_uri_layer())
111    ///     .option_layer(config.auth_layer()?)
112    ///     .map_err(BoxError::from)
113    ///     .service(hyper_util::client::legacy::Client::builder(TokioExecutor::new()).build_http());
114    /// let client = Client::new(service, config.default_namespace);
115    /// # Ok(())
116    /// # }
117    /// ```
118    pub fn new<S, B, T>(service: S, default_namespace: T) -> Self
119    where
120        S: Service<Request<Body>, Response = Response<B>> + Send + 'static,
121        S::Future: Send + 'static,
122        S::Error: Into<BoxError>,
123        B: http_body::Body<Data = bytes::Bytes> + Send + 'static,
124        B::Error: Into<BoxError>,
125        T: Into<String>,
126    {
127        // Transform response body to `crate::client::Body` and use type erased error to avoid type parameters.
128        let service = MapResponseBodyLayer::new(Body::wrap_body)
129            .layer(service)
130            .map_err(|e| e.into());
131        Self {
132            inner: Buffer::new(BoxService::new(service), 1024),
133            default_ns: default_namespace.into(),
134        }
135    }
136
137    /// Create and initialize a [`Client`] using the inferred configuration.
138    ///
139    /// Will use [`Config::infer`] which attempts to load the local kubeconfig first,
140    /// and then if that fails, trying the in-cluster environment variables.
141    ///
142    /// Will fail if neither configuration could be loaded.
143    ///
144    /// ```rust
145    /// # async fn doc() -> Result<(), Box<dyn std::error::Error>> {
146    /// # use kube::Client;
147    /// let client = Client::try_default().await?;
148    /// # Ok(())
149    /// # }
150    /// ```
151    ///
152    /// If you already have a [`Config`] then use [`Client::try_from`](Self::try_from)
153    /// instead.
154    pub async fn try_default() -> Result<Self> {
155        Self::try_from(Config::infer().await.map_err(Error::InferConfig)?)
156    }
157
158    /// Get the default namespace for the client
159    ///
160    /// The namespace is either configured on `context` in the kubeconfig,
161    /// falls back to `default` when running locally,
162    /// or uses the service account's namespace when deployed in-cluster.
163    pub fn default_namespace(&self) -> &str {
164        &self.default_ns
165    }
166
167    /// Perform a raw HTTP request against the API and return the raw response back.
168    /// This method can be used to get raw access to the API which may be used to, for example,
169    /// create a proxy server or application-level gateway between localhost and the API server.
170    pub async fn send(&self, request: Request<Body>) -> Result<Response<Body>> {
171        let mut svc = self.inner.clone();
172        let res = svc
173            .ready()
174            .await
175            .map_err(Error::Service)?
176            .call(request)
177            .await
178            .map_err(|err| {
179                // Error decorating request
180                err.downcast::<Error>()
181                    .map(|e| *e)
182                    // Error requesting
183                    .or_else(|err| err.downcast::<hyper::Error>().map(|err| Error::HyperError(*err)))
184                    // Error from another middleware
185                    .unwrap_or_else(Error::Service)
186            })?;
187        Ok(res)
188    }
189
190    /// Make WebSocket connection.
191    #[cfg(feature = "ws")]
192    #[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
193    pub async fn connect(
194        &self,
195        request: Request<Vec<u8>>,
196    ) -> Result<WebSocketStream<TokioIo<hyper::upgrade::Upgraded>>> {
197        use http::header::HeaderValue;
198        let (mut parts, body) = request.into_parts();
199        parts
200            .headers
201            .insert(http::header::CONNECTION, HeaderValue::from_static("Upgrade"));
202        parts
203            .headers
204            .insert(http::header::UPGRADE, HeaderValue::from_static("websocket"));
205        parts.headers.insert(
206            http::header::SEC_WEBSOCKET_VERSION,
207            HeaderValue::from_static("13"),
208        );
209        let key = upgrade::sec_websocket_key();
210        parts.headers.insert(
211            http::header::SEC_WEBSOCKET_KEY,
212            key.parse().expect("valid header value"),
213        );
214        // Use the binary subprotocol v4, to get JSON `Status` object in `error` channel (3).
215        // There's no official documentation about this protocol, but it's described in
216        // [`k8s.io/apiserver/pkg/util/wsstream/conn.go`](https://git.io/JLQED).
217        // There's a comment about v4 and `Status` object in
218        // [`kublet/cri/streaming/remotecommand/httpstream.go`](https://git.io/JLQEh).
219        parts.headers.insert(
220            http::header::SEC_WEBSOCKET_PROTOCOL,
221            HeaderValue::from_static(upgrade::WS_PROTOCOL),
222        );
223
224        let res = self.send(Request::from_parts(parts, Body::from(body))).await?;
225        upgrade::verify_response(&res, &key).map_err(Error::UpgradeConnection)?;
226        match hyper::upgrade::on(res).await {
227            Ok(upgraded) => Ok(WebSocketStream::from_raw_socket(
228                TokioIo::new(upgraded),
229                ws::protocol::Role::Client,
230                None,
231            )
232            .await),
233
234            Err(e) => Err(Error::UpgradeConnection(
235                UpgradeConnectionError::GetPendingUpgrade(e),
236            )),
237        }
238    }
239
240    /// Perform a raw HTTP request against the API and deserialize the response
241    /// as JSON to some known type.
242    pub async fn request<T>(&self, request: Request<Vec<u8>>) -> Result<T>
243    where
244        T: DeserializeOwned,
245    {
246        let text = self.request_text(request).await?;
247
248        serde_json::from_str(&text).map_err(|e| {
249            tracing::warn!("{}, {:?}", text, e);
250            Error::SerdeError(e)
251        })
252    }
253
254    /// Perform a raw HTTP request against the API and get back the response
255    /// as a string
256    pub async fn request_text(&self, request: Request<Vec<u8>>) -> Result<String> {
257        let res = self.send(request.map(Body::from)).await?;
258        let res = handle_api_errors(res).await?;
259        let body_bytes = res.into_body().collect().await?.to_bytes();
260        let text = String::from_utf8(body_bytes.to_vec()).map_err(Error::FromUtf8)?;
261        Ok(text)
262    }
263
264    /// Perform a raw HTTP request against the API and stream the response body.
265    ///
266    /// The response can be processed using [`AsyncReadExt`](futures::AsyncReadExt)
267    /// and [`AsyncBufReadExt`](futures::AsyncBufReadExt).
268    pub async fn request_stream(&self, request: Request<Vec<u8>>) -> Result<impl AsyncBufRead> {
269        let res = self.send(request.map(Body::from)).await?;
270        let res = handle_api_errors(res).await?;
271        // Map the error, since we want to convert this into an `AsyncBufReader` using
272        // `into_async_read` which specifies `std::io::Error` as the stream's error type.
273        let body = res.into_body().into_data_stream().map_err(std::io::Error::other);
274        Ok(body.into_async_read())
275    }
276
277    /// Perform a raw HTTP request against the API and get back either an object
278    /// deserialized as JSON or a [`Status`] Object.
279    pub async fn request_status<T>(&self, request: Request<Vec<u8>>) -> Result<Either<T, Status>>
280    where
281        T: DeserializeOwned,
282    {
283        let text = self.request_text(request).await?;
284        // It needs to be JSON:
285        let v: Value = serde_json::from_str(&text).map_err(Error::SerdeError)?;
286        if v["kind"] == "Status" {
287            tracing::trace!("Status from {}", text);
288            Ok(Right(serde_json::from_str::<Status>(&text).map_err(|e| {
289                tracing::warn!("{}, {:?}", text, e);
290                Error::SerdeError(e)
291            })?))
292        } else {
293            Ok(Left(serde_json::from_str::<T>(&text).map_err(|e| {
294                tracing::warn!("{}, {:?}", text, e);
295                Error::SerdeError(e)
296            })?))
297        }
298    }
299
300    /// Perform a raw request and get back a stream of [`WatchEvent`] objects
301    pub async fn request_events<T>(
302        &self,
303        request: Request<Vec<u8>>,
304    ) -> Result<impl TryStream<Item = Result<WatchEvent<T>>>>
305    where
306        T: Clone + DeserializeOwned,
307    {
308        let res = self.send(request.map(Body::from)).await?;
309        // trace!("Streaming from {} -> {}", res.url(), res.status().as_str());
310        tracing::trace!("headers: {:?}", res.headers());
311
312        let frames = FramedRead::new(
313            StreamReader::new(res.into_body().into_data_stream().map_err(|e| {
314                // Unexpected EOF from chunked decoder.
315                // Tends to happen when watching for 300+s. This will be ignored.
316                if e.to_string().contains("unexpected EOF during chunk") {
317                    return std::io::Error::new(std::io::ErrorKind::UnexpectedEof, e);
318                }
319                std::io::Error::other(e)
320            })),
321            LinesCodec::new(),
322        );
323
324        Ok(frames.filter_map(|res| async {
325            match res {
326                Ok(line) => match serde_json::from_str::<WatchEvent<T>>(&line) {
327                    Ok(event) => Some(Ok(event)),
328                    Err(e) => {
329                        // Ignore EOF error that can happen for incomplete line from `decode_eof`.
330                        if e.is_eof() {
331                            return None;
332                        }
333
334                        // Got general error response
335                        if let Ok(e_resp) = serde_json::from_str::<ErrorResponse>(&line) {
336                            return Some(Err(Error::Api(e_resp)));
337                        }
338                        // Parsing error
339                        Some(Err(Error::SerdeError(e)))
340                    }
341                },
342
343                Err(LinesCodecError::Io(e)) => match e.kind() {
344                    // Client timeout
345                    std::io::ErrorKind::TimedOut => {
346                        tracing::warn!("timeout in poll: {}", e); // our client timeout
347                        None
348                    }
349                    // Unexpected EOF from chunked decoder.
350                    // Tends to happen after 300+s of watching.
351                    std::io::ErrorKind::UnexpectedEof => {
352                        tracing::warn!("eof in poll: {}", e);
353                        None
354                    }
355                    _ => Some(Err(Error::ReadEvents(e))),
356                },
357
358                // Reached the maximum line length without finding a newline.
359                // This should never happen because we're using the default `usize::MAX`.
360                Err(LinesCodecError::MaxLineLengthExceeded) => {
361                    Some(Err(Error::LinesCodecMaxLineLengthExceeded))
362                }
363            }
364        }))
365    }
366}
367
368/// Low level discovery methods using `k8s_openapi` types.
369///
370/// Consider using the [`discovery`](crate::discovery) module for
371/// easier-to-use variants of this functionality.
372/// The following methods might be deprecated to avoid confusion between similarly named types within `discovery`.
373impl Client {
374    /// Returns apiserver version.
375    pub async fn apiserver_version(&self) -> Result<k8s_openapi::apimachinery::pkg::version::Info> {
376        self.request(
377            Request::builder()
378                .uri("/version")
379                .body(vec![])
380                .map_err(Error::HttpError)?,
381        )
382        .await
383    }
384
385    /// Lists api groups that apiserver serves.
386    pub async fn list_api_groups(&self) -> Result<k8s_meta_v1::APIGroupList> {
387        self.request(
388            Request::builder()
389                .uri("/apis")
390                .body(vec![])
391                .map_err(Error::HttpError)?,
392        )
393        .await
394    }
395
396    /// Lists resources served in given API group.
397    ///
398    /// ### Example usage:
399    /// ```rust
400    /// # async fn scope(client: kube::Client) -> Result<(), Box<dyn std::error::Error>> {
401    /// let apigroups = client.list_api_groups().await?;
402    /// for g in apigroups.groups {
403    ///     let ver = g
404    ///         .preferred_version
405    ///         .as_ref()
406    ///         .or_else(|| g.versions.first())
407    ///         .expect("preferred or versions exists");
408    ///     let apis = client.list_api_group_resources(&ver.group_version).await?;
409    ///     dbg!(apis);
410    /// }
411    /// # Ok(())
412    /// # }
413    /// ```
414    pub async fn list_api_group_resources(&self, apiversion: &str) -> Result<k8s_meta_v1::APIResourceList> {
415        let url = format!("/apis/{apiversion}");
416        self.request(
417            Request::builder()
418                .uri(url)
419                .body(vec![])
420                .map_err(Error::HttpError)?,
421        )
422        .await
423    }
424
425    /// Lists versions of `core` a.k.a. `""` legacy API group.
426    pub async fn list_core_api_versions(&self) -> Result<k8s_meta_v1::APIVersions> {
427        self.request(
428            Request::builder()
429                .uri("/api")
430                .body(vec![])
431                .map_err(Error::HttpError)?,
432        )
433        .await
434    }
435
436    /// Lists resources served in particular `core` group version.
437    pub async fn list_core_api_resources(&self, version: &str) -> Result<k8s_meta_v1::APIResourceList> {
438        let url = format!("/api/{version}");
439        self.request(
440            Request::builder()
441                .uri(url)
442                .body(vec![])
443                .map_err(Error::HttpError)?,
444        )
445        .await
446    }
447}
448
449/// Kubernetes returned error handling
450///
451/// Either kube returned an explicit ApiError struct,
452/// or it someohow returned something we couldn't parse as one.
453///
454/// In either case, present an ApiError upstream.
455/// The latter is probably a bug if encountered.
456async fn handle_api_errors(res: Response<Body>) -> Result<Response<Body>> {
457    let status = res.status();
458    if status.is_client_error() || status.is_server_error() {
459        // trace!("Status = {:?} for {}", status, res.url());
460        let body_bytes = res.into_body().collect().await?.to_bytes();
461        let text = String::from_utf8(body_bytes.to_vec()).map_err(Error::FromUtf8)?;
462        // Print better debug when things do fail
463        // trace!("Parsing error: {}", text);
464        if let Ok(errdata) = serde_json::from_str::<ErrorResponse>(&text) {
465            tracing::debug!("Unsuccessful: {errdata:?}");
466            Err(Error::Api(errdata))
467        } else {
468            tracing::warn!("Unsuccessful data error parse: {}", text);
469            let error_response = ErrorResponse {
470                status: status.to_string(),
471                code: status.as_u16(),
472                message: format!("{text:?}"),
473                reason: "Failed to parse error data".into(),
474            };
475            tracing::debug!("Unsuccessful: {error_response:?} (reconstruct)");
476            Err(Error::Api(error_response))
477        }
478    } else {
479        Ok(res)
480    }
481}
482
483impl TryFrom<Config> for Client {
484    type Error = Error;
485
486    /// Builds a default [`Client`] from a [`Config`].
487    ///
488    /// See [`ClientBuilder`] or [`Client::new`] if more customization is required
489    fn try_from(config: Config) -> Result<Self> {
490        Ok(ClientBuilder::try_from(config)?.build())
491    }
492}
493
#[cfg(test)]
mod tests {
    use std::pin::pin;

    use crate::{client::Body, Api, Client};

    use http::{Request, Response};
    use k8s_openapi::api::core::v1::Pod;
    use tower_test::mock;

    /// The namespace handed to `Client::new` is what `default_namespace` reports.
    #[tokio::test]
    async fn test_default_ns() {
        let (mock_service, _) = mock::pair::<Request<Body>, Response<Body>>();
        let client = Client::new(mock_service, "test-namespace");
        assert_eq!(client.default_namespace(), "test-namespace");
    }

    /// A `get` through `Api<Pod>` hits the expected path on a mocked service and
    /// decodes the pod that the mock replies with.
    #[tokio::test]
    async fn test_mock() {
        let (mock_service, handle) = mock::pair::<Request<Body>, Response<Body>>();

        // Server side of the mock: expect one pod request and answer it.
        let server = tokio::spawn(async move {
            let mut handle = pin!(handle);
            let (req, responder) = handle.next_request().await.expect("service not called");
            assert_eq!(req.method(), http::Method::GET);
            assert_eq!(req.uri().to_string(), "/api/v1/namespaces/default/pods/test");

            let pod: Pod = serde_json::from_value(serde_json::json!({
                "apiVersion": "v1",
                "kind": "Pod",
                "metadata": {
                    "name": "test",
                    "annotations": { "kube-rs": "test" },
                },
                "spec": {
                    "containers": [{ "name": "test", "image": "test-image" }],
                }
            }))
            .unwrap();
            let payload = serde_json::to_vec(&pod).unwrap();
            let response = Response::builder().body(Body::from(payload)).unwrap();
            responder.send_response(response);
        });

        // Client side: fetch the pod and check what came back.
        let client = Client::new(mock_service, "default");
        let pods: Api<Pod> = Api::default_namespaced(client);
        let fetched = pods.get("test").await.unwrap();
        let annotations = fetched.metadata.annotations.unwrap();
        assert_eq!(annotations.get("kube-rs").unwrap(), "test");

        server.await.unwrap();
    }
}