1use either::{Either, Left, Right};
11use futures::{future::BoxFuture, AsyncBufRead, StreamExt, TryStream, TryStreamExt};
12use http::{self, Request, Response};
13use http_body_util::BodyExt;
14#[cfg(feature = "ws")] use hyper_util::rt::TokioIo;
15use k8s_openapi::apimachinery::pkg::apis::meta::v1 as k8s_meta_v1;
16pub use kube_core::response::Status;
17use serde::de::DeserializeOwned;
18use serde_json::{self, Value};
19#[cfg(feature = "ws")]
20use tokio_tungstenite::{tungstenite as ws, WebSocketStream};
21use tokio_util::{
22 codec::{FramedRead, LinesCodec, LinesCodecError},
23 io::StreamReader,
24};
25use tower::{buffer::Buffer, util::BoxService, BoxError, Layer, Service, ServiceExt};
26use tower_http::map_response_body::MapResponseBodyLayer;
27
28pub use self::body::Body;
29use crate::{api::WatchEvent, error::ErrorResponse, Config, Error, Result};
30
31mod auth;
32mod body;
33mod builder;
34#[cfg_attr(docsrs, doc(cfg(feature = "unstable-client")))]
35#[cfg(feature = "unstable-client")]
36mod client_ext;
37#[cfg_attr(docsrs, doc(cfg(feature = "unstable-client")))]
38#[cfg(feature = "unstable-client")]
39pub use client_ext::scope;
40mod config_ext;
41pub use auth::Error as AuthError;
42pub use config_ext::ConfigExt;
43pub mod middleware;
44
45#[cfg(any(feature = "rustls-tls", feature = "openssl-tls"))] mod tls;
46
47#[cfg(feature = "openssl-tls")]
48pub use tls::openssl_tls::Error as OpensslTlsError;
49#[cfg(feature = "rustls-tls")] pub use tls::rustls_tls::Error as RustlsTlsError;
50#[cfg(feature = "ws")] mod upgrade;
51
52#[cfg(feature = "oauth")]
53#[cfg_attr(docsrs, doc(cfg(feature = "oauth")))]
54pub use auth::OAuthError;
55
56#[cfg(feature = "oidc")]
57#[cfg_attr(docsrs, doc(cfg(feature = "oidc")))]
58pub use auth::oidc_errors;
59
60#[cfg(feature = "ws")] pub use upgrade::UpgradeConnectionError;
61
62#[cfg(feature = "kubelet-debug")]
63#[cfg_attr(docsrs, doc(cfg(feature = "kubelet-debug")))]
64mod kubelet_debug;
65
66pub use builder::{ClientBuilder, DynBody};
67
68#[cfg_attr(docsrs, doc(cfg(feature = "client")))]
75#[derive(Clone)]
76pub struct Client {
77 inner: Buffer<Request<Body>, BoxFuture<'static, Result<Response<Body>, BoxError>>>,
80 default_ns: String,
81}
82
83impl Client {
89 pub fn new<S, B, T>(service: S, default_namespace: T) -> Self
119 where
120 S: Service<Request<Body>, Response = Response<B>> + Send + 'static,
121 S::Future: Send + 'static,
122 S::Error: Into<BoxError>,
123 B: http_body::Body<Data = bytes::Bytes> + Send + 'static,
124 B::Error: Into<BoxError>,
125 T: Into<String>,
126 {
127 let service = MapResponseBodyLayer::new(Body::wrap_body)
129 .layer(service)
130 .map_err(|e| e.into());
131 Self {
132 inner: Buffer::new(BoxService::new(service), 1024),
133 default_ns: default_namespace.into(),
134 }
135 }
136
137 pub async fn try_default() -> Result<Self> {
155 Self::try_from(Config::infer().await.map_err(Error::InferConfig)?)
156 }
157
158 pub fn default_namespace(&self) -> &str {
164 &self.default_ns
165 }
166
167 pub async fn send(&self, request: Request<Body>) -> Result<Response<Body>> {
171 let mut svc = self.inner.clone();
172 let res = svc
173 .ready()
174 .await
175 .map_err(Error::Service)?
176 .call(request)
177 .await
178 .map_err(|err| {
179 err.downcast::<Error>()
181 .map(|e| *e)
182 .or_else(|err| err.downcast::<hyper::Error>().map(|err| Error::HyperError(*err)))
184 .unwrap_or_else(Error::Service)
186 })?;
187 Ok(res)
188 }
189
/// Sends `request` as a WebSocket upgrade request and returns the resulting
/// stream.
///
/// The `Connection`, `Upgrade`, `Sec-WebSocket-Version`, `Sec-WebSocket-Key`
/// and `Sec-WebSocket-Protocol` headers are set (replacing existing values)
/// before the request is sent. The response is checked with
/// `upgrade::verify_response` against the generated key.
///
/// # Errors
/// * Any error from [`Client::send`].
/// * [`Error::UpgradeConnection`] when the response fails verification or
///   the pending upgrade cannot be obtained.
///
/// # Panics
/// If the generated key cannot be parsed into a header value; the
/// `expect` message documents that this is assumed never to happen.
#[cfg(feature = "ws")]
#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
pub async fn connect(
    &self,
    request: Request<Vec<u8>>,
) -> Result<WebSocketStream<TokioIo<hyper::upgrade::Upgraded>>> {
    use http::header::{self, HeaderValue};

    let (mut parts, body) = request.into_parts();
    let headers = &mut parts.headers;

    // Fixed handshake headers, inserted in this order.
    for (name, value) in [
        (header::CONNECTION, "Upgrade"),
        (header::UPGRADE, "websocket"),
        (header::SEC_WEBSOCKET_VERSION, "13"),
    ] {
        headers.insert(name, HeaderValue::from_static(value));
    }

    // Per-connection key; also needed later to verify the response.
    let key = upgrade::sec_websocket_key();
    headers.insert(
        header::SEC_WEBSOCKET_KEY,
        key.parse().expect("valid header value"),
    );
    headers.insert(
        header::SEC_WEBSOCKET_PROTOCOL,
        HeaderValue::from_static(upgrade::WS_PROTOCOL),
    );

    let res = self.send(Request::from_parts(parts, Body::from(body))).await?;
    upgrade::verify_response(&res, &key).map_err(Error::UpgradeConnection)?;

    let upgraded = hyper::upgrade::on(res)
        .await
        .map_err(|e| Error::UpgradeConnection(UpgradeConnectionError::GetPendingUpgrade(e)))?;

    let stream =
        WebSocketStream::from_raw_socket(TokioIo::new(upgraded), ws::protocol::Role::Client, None)
            .await;
    Ok(stream)
}
239
240 pub async fn request<T>(&self, request: Request<Vec<u8>>) -> Result<T>
243 where
244 T: DeserializeOwned,
245 {
246 let text = self.request_text(request).await?;
247
248 serde_json::from_str(&text).map_err(|e| {
249 tracing::warn!("{}, {:?}", text, e);
250 Error::SerdeError(e)
251 })
252 }
253
254 pub async fn request_text(&self, request: Request<Vec<u8>>) -> Result<String> {
257 let res = self.send(request.map(Body::from)).await?;
258 let res = handle_api_errors(res).await?;
259 let body_bytes = res.into_body().collect().await?.to_bytes();
260 let text = String::from_utf8(body_bytes.to_vec()).map_err(Error::FromUtf8)?;
261 Ok(text)
262 }
263
264 pub async fn request_stream(&self, request: Request<Vec<u8>>) -> Result<impl AsyncBufRead> {
269 let res = self.send(request.map(Body::from)).await?;
270 let res = handle_api_errors(res).await?;
271 let body = res.into_body().into_data_stream().map_err(std::io::Error::other);
274 Ok(body.into_async_read())
275 }
276
277 pub async fn request_status<T>(&self, request: Request<Vec<u8>>) -> Result<Either<T, Status>>
280 where
281 T: DeserializeOwned,
282 {
283 let text = self.request_text(request).await?;
284 let v: Value = serde_json::from_str(&text).map_err(Error::SerdeError)?;
286 if v["kind"] == "Status" {
287 tracing::trace!("Status from {}", text);
288 Ok(Right(serde_json::from_str::<Status>(&text).map_err(|e| {
289 tracing::warn!("{}, {:?}", text, e);
290 Error::SerdeError(e)
291 })?))
292 } else {
293 Ok(Left(serde_json::from_str::<T>(&text).map_err(|e| {
294 tracing::warn!("{}, {:?}", text, e);
295 Error::SerdeError(e)
296 })?))
297 }
298 }
299
300 pub async fn request_events<T>(
302 &self,
303 request: Request<Vec<u8>>,
304 ) -> Result<impl TryStream<Item = Result<WatchEvent<T>>>>
305 where
306 T: Clone + DeserializeOwned,
307 {
308 let res = self.send(request.map(Body::from)).await?;
309 tracing::trace!("headers: {:?}", res.headers());
311
312 let frames = FramedRead::new(
313 StreamReader::new(res.into_body().into_data_stream().map_err(|e| {
314 if e.to_string().contains("unexpected EOF during chunk") {
317 return std::io::Error::new(std::io::ErrorKind::UnexpectedEof, e);
318 }
319 std::io::Error::other(e)
320 })),
321 LinesCodec::new(),
322 );
323
324 Ok(frames.filter_map(|res| async {
325 match res {
326 Ok(line) => match serde_json::from_str::<WatchEvent<T>>(&line) {
327 Ok(event) => Some(Ok(event)),
328 Err(e) => {
329 if e.is_eof() {
331 return None;
332 }
333
334 if let Ok(e_resp) = serde_json::from_str::<ErrorResponse>(&line) {
336 return Some(Err(Error::Api(e_resp)));
337 }
338 Some(Err(Error::SerdeError(e)))
340 }
341 },
342
343 Err(LinesCodecError::Io(e)) => match e.kind() {
344 std::io::ErrorKind::TimedOut => {
346 tracing::warn!("timeout in poll: {}", e); None
348 }
349 std::io::ErrorKind::UnexpectedEof => {
352 tracing::warn!("eof in poll: {}", e);
353 None
354 }
355 _ => Some(Err(Error::ReadEvents(e))),
356 },
357
358 Err(LinesCodecError::MaxLineLengthExceeded) => {
361 Some(Err(Error::LinesCodecMaxLineLengthExceeded))
362 }
363 }
364 }))
365 }
366}
367
368impl Client {
374 pub async fn apiserver_version(&self) -> Result<k8s_openapi::apimachinery::pkg::version::Info> {
376 self.request(
377 Request::builder()
378 .uri("/version")
379 .body(vec![])
380 .map_err(Error::HttpError)?,
381 )
382 .await
383 }
384
385 pub async fn list_api_groups(&self) -> Result<k8s_meta_v1::APIGroupList> {
387 self.request(
388 Request::builder()
389 .uri("/apis")
390 .body(vec![])
391 .map_err(Error::HttpError)?,
392 )
393 .await
394 }
395
396 pub async fn list_api_group_resources(&self, apiversion: &str) -> Result<k8s_meta_v1::APIResourceList> {
415 let url = format!("/apis/{apiversion}");
416 self.request(
417 Request::builder()
418 .uri(url)
419 .body(vec![])
420 .map_err(Error::HttpError)?,
421 )
422 .await
423 }
424
425 pub async fn list_core_api_versions(&self) -> Result<k8s_meta_v1::APIVersions> {
427 self.request(
428 Request::builder()
429 .uri("/api")
430 .body(vec![])
431 .map_err(Error::HttpError)?,
432 )
433 .await
434 }
435
436 pub async fn list_core_api_resources(&self, version: &str) -> Result<k8s_meta_v1::APIResourceList> {
438 let url = format!("/api/{version}");
439 self.request(
440 Request::builder()
441 .uri(url)
442 .body(vec![])
443 .map_err(Error::HttpError)?,
444 )
445 .await
446 }
447}
448
449async fn handle_api_errors(res: Response<Body>) -> Result<Response<Body>> {
457 let status = res.status();
458 if status.is_client_error() || status.is_server_error() {
459 let body_bytes = res.into_body().collect().await?.to_bytes();
461 let text = String::from_utf8(body_bytes.to_vec()).map_err(Error::FromUtf8)?;
462 if let Ok(errdata) = serde_json::from_str::<ErrorResponse>(&text) {
465 tracing::debug!("Unsuccessful: {errdata:?}");
466 Err(Error::Api(errdata))
467 } else {
468 tracing::warn!("Unsuccessful data error parse: {}", text);
469 let error_response = ErrorResponse {
470 status: status.to_string(),
471 code: status.as_u16(),
472 message: format!("{text:?}"),
473 reason: "Failed to parse error data".into(),
474 };
475 tracing::debug!("Unsuccessful: {error_response:?} (reconstruct)");
476 Err(Error::Api(error_response))
477 }
478 } else {
479 Ok(res)
480 }
481}
482
483impl TryFrom<Config> for Client {
484 type Error = Error;
485
486 fn try_from(config: Config) -> Result<Self> {
490 Ok(ClientBuilder::try_from(config)?.build())
491 }
492}
493
#[cfg(test)]
mod tests {
    use std::pin::pin;

    use crate::{client::Body, Api, Client};

    use http::{Request, Response};
    use k8s_openapi::api::core::v1::Pod;
    use tower_test::mock;

    /// The namespace given to `Client::new` is returned by `default_namespace`.
    #[tokio::test]
    async fn test_default_ns() {
        let (mock_service, _) = mock::pair::<Request<Body>, Response<Body>>();
        let client = Client::new(mock_service, "test-namespace");
        assert_eq!(client.default_namespace(), "test-namespace");
    }

    /// A `Pod` fetched through `Api::get` hits the expected URL on the mock
    /// service and is decoded from the canned response.
    #[tokio::test]
    async fn test_mock() {
        let (mock_service, handle) = mock::pair::<Request<Body>, Response<Body>>();

        // Fake API server: answers exactly one request with a fixed pod.
        let server = tokio::spawn(async move {
            let mut handle = pin!(handle);
            let (request, responder) = handle.next_request().await.expect("service not called");
            assert_eq!(request.method(), http::Method::GET);
            assert_eq!(request.uri().to_string(), "/api/v1/namespaces/default/pods/test");

            let fixture = serde_json::json!({
                "apiVersion": "v1",
                "kind": "Pod",
                "metadata": {
                    "name": "test",
                    "annotations": { "kube-rs": "test" },
                },
                "spec": {
                    "containers": [{ "name": "test", "image": "test-image" }],
                }
            });
            let pod: Pod = serde_json::from_value(fixture).unwrap();
            let payload = serde_json::to_vec(&pod).unwrap();
            responder.send_response(Response::builder().body(Body::from(payload)).unwrap());
        });

        let client = Client::new(mock_service, "default");
        let pods: Api<Pod> = Api::default_namespaced(client);
        let pod = pods.get("test").await.unwrap();

        let annotations = pod.metadata.annotations.unwrap();
        assert_eq!(annotations.get("kube-rs").unwrap(), "test");
        server.await.unwrap();
    }
}