kube_client/
lib.rs

1//! Crate for interacting with the Kubernetes API
2//!
3//! This crate includes the tools for manipulating Kubernetes resources as
4//! well as keeping track of those resources as they change over time
5//!
6//! # Example
7//!
8//! The following example will create a [`Pod`](k8s_openapi::api::core::v1::Pod)
9//! and then watch for it to become available using a manual [`Api::watch`] call.
10//!
11//! ```rust,no_run
12//! use futures::{StreamExt, TryStreamExt};
13//! use kube_client::api::{Api, ResourceExt, ListParams, PatchParams, Patch};
14//! use kube_client::Client;
15//! use k8s_openapi::api::core::v1::Pod;
16//!
17//! #[tokio::main]
18//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
19//!     // Read the environment to find config for kube client.
20//!     // Note that this tries an in-cluster configuration first,
21//!     // then falls back on a kubeconfig file.
22//!     let client = Client::try_default().await?;
23//!
24//!     // Interact with pods in the configured namespace with the typed interface from k8s-openapi
25//!     let pods: Api<Pod> = Api::default_namespaced(client);
26//!
27//!     // Create a Pod (cheating here with json, but it has to validate against the type):
28//!     let patch: Pod = serde_json::from_value(serde_json::json!({
29//!         "apiVersion": "v1",
30//!         "kind": "Pod",
31//!         "metadata": {
32//!             "name": "my-pod"
33//!         },
34//!         "spec": {
35//!             "containers": [
36//!                 {
37//!                     "name": "my-container",
38//!                     "image": "myregistry.azurecr.io/hello-world:v1",
39//!                 },
40//!             ],
41//!         }
42//!     }))?;
43//!
44//!     // Apply the Pod via server-side apply
45//!     let params = PatchParams::apply("myapp");
46//!     let result = pods.patch("my-pod", &params, &Patch::Apply(&patch)).await?;
47//!
48//!     // List pods in the configured namespace
49//!     for p in pods.list(&ListParams::default()).await? {
50//!         println!("found pod {}", p.name_any());
51//!     }
52//!
53//!     Ok(())
54//! }
55//! ```
56//!
57//! For more details, see:
58//!
59//! - [`Client`](crate::client) for the extensible Kubernetes client
60//! - [`Config`](crate::config) for the Kubernetes config abstraction
61//! - [`Api`](crate::Api) for the generic api methods available on Kubernetes resources
62//! - [k8s-openapi](https://docs.rs/k8s-openapi) for how to create typed kubernetes objects directly
63#![cfg_attr(docsrs, feature(doc_cfg))]
64// Nightly clippy (0.1.64) considers Drop a side effect, see https://github.com/rust-lang/rust-clippy/issues/9608
65#![allow(clippy::unnecessary_lazy_evaluations)]
66
67macro_rules! cfg_client {
68    ($($item:item)*) => {
69        $(
70            #[cfg_attr(docsrs, doc(cfg(feature = "client")))]
71            #[cfg(feature = "client")]
72            $item
73        )*
74    }
75}
76macro_rules! cfg_config {
77    ($($item:item)*) => {
78        $(
79            #[cfg_attr(docsrs, doc(cfg(feature = "config")))]
80            #[cfg(feature = "config")]
81            $item
82        )*
83    }
84}
85
86macro_rules! cfg_error {
87    ($($item:item)*) => {
88        $(
89            #[cfg_attr(docsrs, doc(cfg(any(feature = "config", feature = "client"))))]
90            #[cfg(any(feature = "config", feature = "client"))]
91            $item
92        )*
93    }
94}
95
96cfg_client! {
97    pub mod api;
98    pub mod discovery;
99    pub mod client;
100
101    #[doc(inline)]
102    pub use api::Api;
103    #[doc(inline)]
104    pub use client::Client;
105    #[doc(inline)]
106    pub use discovery::Discovery;
107}
108
109cfg_config! {
110    pub mod config;
111    #[doc(inline)]
112    pub use config::Config;
113}
114
115cfg_error! {
116    pub mod error;
117    #[doc(inline)] pub use error::Error;
118    /// Convient alias for `Result<T, Error>`
119    pub type Result<T, E = Error> = std::result::Result<T, E>;
120}
121
122pub use crate::core::{CustomResourceExt, Resource, ResourceExt};
123/// Re-exports from kube_core
124pub use kube_core as core;
125
126// Tests that require a cluster and the complete feature set
127// Can be run with `cargo test -p kube-client --lib features=rustls-tls,ws -- --ignored`
128#[cfg(all(feature = "client", feature = "config"))]
129#[cfg(test)]
130#[allow(unused_imports)] // varying test imports depending on feature
131mod test {
132    use crate::{
133        api::{AttachParams, AttachedProcess},
134        client::ConfigExt,
135        Api, Client, Config, ResourceExt,
136    };
137    use futures::{AsyncBufRead, AsyncBufReadExt, StreamExt, TryStreamExt};
138    use hyper::Uri;
139    use k8s_openapi::api::core::v1::{EphemeralContainer, Pod, PodSpec};
140    use kube_core::{
141        params::{DeleteParams, Patch, PatchParams, PostParams, WatchParams},
142        response::StatusSummary,
143    };
144    use serde_json::json;
145    use tower::ServiceBuilder;
146
147    // hard disabled test atm due to k3d rustls issues: https://github.com/kube-rs/kube/issues?q=is%3Aopen+is%3Aissue+label%3Arustls
148    #[allow(dead_code)]
149    // #[tokio::test]
150    #[ignore = "needs cluster (lists pods)"]
151    #[cfg(feature = "rustls-tls")]
152    async fn custom_client_rustls_configuration() -> Result<(), Box<dyn std::error::Error>> {
153        use hyper_util::rt::TokioExecutor;
154
155        let config = Config::infer().await?;
156        let https = config.rustls_https_connector()?;
157        let service = ServiceBuilder::new()
158            .layer(config.base_uri_layer())
159            .service(hyper_util::client::legacy::Client::builder(TokioExecutor::new()).build(https));
160        let client = Client::new(service, config.default_namespace);
161        let pods: Api<Pod> = Api::default_namespaced(client);
162        pods.list(&Default::default()).await?;
163        Ok(())
164    }
165
166    #[tokio::test]
167    #[ignore = "needs cluster (lists pods)"]
168    #[cfg(feature = "openssl-tls")]
169    async fn custom_client_openssl_tls_configuration() -> Result<(), Box<dyn std::error::Error>> {
170        use hyper_util::rt::TokioExecutor;
171
172        let config = Config::infer().await?;
173        let https = config.openssl_https_connector()?;
174        let service = ServiceBuilder::new()
175            .layer(config.base_uri_layer())
176            .service(hyper_util::client::legacy::Client::builder(TokioExecutor::new()).build(https));
177        let client = Client::new(service, config.default_namespace);
178        let pods: Api<Pod> = Api::default_namespaced(client);
179        pods.list(&Default::default()).await?;
180        Ok(())
181    }
182
183    #[tokio::test]
184    #[ignore = "needs cluster (lists api resources)"]
185    #[cfg(feature = "client")]
186    async fn group_discovery_oneshot() -> Result<(), Box<dyn std::error::Error>> {
187        use crate::{core::DynamicObject, discovery};
188        let client = Client::try_default().await?;
189        let apigroup = discovery::group(&client, "apiregistration.k8s.io").await?;
190        let (ar, _caps) = apigroup.recommended_kind("APIService").unwrap();
191        let api: Api<DynamicObject> = Api::all_with(client.clone(), &ar);
192        api.list(&Default::default()).await?;
193
194        Ok(())
195    }
196
197    #[tokio::test]
198    #[ignore = "needs cluster (will create and edit a pod)"]
199    async fn pod_can_use_core_apis() -> Result<(), Box<dyn std::error::Error>> {
200        use kube::api::{DeleteParams, ListParams, Patch, PatchParams, PostParams, WatchEvent};
201
202        let client = Client::try_default().await?;
203        let pods: Api<Pod> = Api::default_namespaced(client);
204
205        // create busybox pod that's alive for at most 30s
206        let p: Pod = serde_json::from_value(json!({
207            "apiVersion": "v1",
208            "kind": "Pod",
209            "metadata": {
210                "name": "busybox-kube1",
211                "labels": { "app": "kube-rs-test" },
212            },
213            "spec": {
214                "terminationGracePeriodSeconds": 1,
215                "restartPolicy": "Never",
216                "containers": [{
217                  "name": "busybox",
218                  "image": "busybox:1.34.1",
219                  "command": ["sh", "-c", "sleep 30"],
220                }],
221            }
222        }))?;
223
224        let pp = PostParams::default();
225        match pods.create(&pp, &p).await {
226            Ok(o) => assert_eq!(p.name_unchecked(), o.name_unchecked()),
227            Err(crate::Error::Api(ae)) => assert_eq!(ae.code, 409), // if we failed to clean-up
228            Err(e) => return Err(e.into()),                         // any other case if a failure
229        }
230
231        // Manual watch-api for it to become ready
232        // NB: don't do this; using conditions (see pod_api example) is easier and less error prone
233        let wp = WatchParams::default()
234            .fields(&format!("metadata.name={}", "busybox-kube1"))
235            .timeout(15);
236        let mut stream = pods.watch(&wp, "0").await?.boxed();
237        while let Some(ev) = stream.try_next().await? {
238            // can debug format watch event
239            let _ = format!("we: {ev:?}");
240            match ev {
241                WatchEvent::Modified(o) => {
242                    let s = o.status.as_ref().expect("status exists on pod");
243                    let phase = s.phase.clone().unwrap_or_default();
244                    if phase == "Running" {
245                        break;
246                    }
247                }
248                WatchEvent::Error(e) => panic!("watch error: {e}"),
249                _ => {}
250            }
251        }
252
253        // Verify we can get it
254        let mut pod = pods.get("busybox-kube1").await?;
255        assert_eq!(p.spec.as_ref().unwrap().containers[0].name, "busybox");
256
257        // verify replace with explicit resource version
258        // NB: don't do this; use server side apply
259        {
260            assert!(pod.resource_version().is_some());
261            pod.spec.as_mut().unwrap().active_deadline_seconds = Some(5);
262
263            let pp = PostParams::default();
264            let patched_pod = pods.replace("busybox-kube1", &pp, &pod).await?;
265            assert_eq!(patched_pod.spec.unwrap().active_deadline_seconds, Some(5));
266        }
267
268        // Delete it
269        let dp = DeleteParams::default();
270        pods.delete("busybox-kube1", &dp).await?.map_left(|pdel| {
271            assert_eq!(pdel.name_unchecked(), "busybox-kube1");
272        });
273
274        Ok(())
275    }
276
277    #[tokio::test]
278    #[ignore = "needs cluster (will create and attach to a pod)"]
279    #[cfg(feature = "ws")]
280    async fn pod_can_exec_and_write_to_stdin() -> Result<(), Box<dyn std::error::Error>> {
281        use crate::api::{DeleteParams, ListParams, Patch, PatchParams, WatchEvent};
282
283        let client = Client::try_default().await?;
284        let pods: Api<Pod> = Api::default_namespaced(client);
285
286        // create busybox pod that's alive for at most 30s
287        let p: Pod = serde_json::from_value(json!({
288            "apiVersion": "v1",
289            "kind": "Pod",
290            "metadata": {
291                "name": "busybox-kube2",
292                "labels": { "app": "kube-rs-test" },
293            },
294            "spec": {
295                "terminationGracePeriodSeconds": 1,
296                "restartPolicy": "Never",
297                "containers": [{
298                  "name": "busybox",
299                  "image": "busybox:1.34.1",
300                  "command": ["sh", "-c", "sleep 30"],
301                }],
302            }
303        }))?;
304
305        match pods.create(&Default::default(), &p).await {
306            Ok(o) => assert_eq!(p.name_unchecked(), o.name_unchecked()),
307            Err(crate::Error::Api(ae)) => assert_eq!(ae.code, 409), // if we failed to clean-up
308            Err(e) => return Err(e.into()),                         // any other case if a failure
309        }
310
311        // Manual watch-api for it to become ready
312        // NB: don't do this; using conditions (see pod_api example) is easier and less error prone
313        let wp = WatchParams::default()
314            .fields(&format!("metadata.name={}", "busybox-kube2"))
315            .timeout(15);
316        let mut stream = pods.watch(&wp, "0").await?.boxed();
317        while let Some(ev) = stream.try_next().await? {
318            match ev {
319                WatchEvent::Modified(o) => {
320                    let s = o.status.as_ref().expect("status exists on pod");
321                    let phase = s.phase.clone().unwrap_or_default();
322                    if phase == "Running" {
323                        break;
324                    }
325                }
326                WatchEvent::Error(e) => panic!("watch error: {e}"),
327                _ => {}
328            }
329        }
330
331        // Verify exec works and we can get the output
332        {
333            let mut attached = pods
334                .exec(
335                    "busybox-kube2",
336                    vec!["sh", "-c", "for i in $(seq 1 3); do echo $i; done"],
337                    &AttachParams::default().stderr(false),
338                )
339                .await?;
340            let stdout = tokio_util::io::ReaderStream::new(attached.stdout().unwrap());
341            let out = stdout
342                .filter_map(|r| async { r.ok().and_then(|v| String::from_utf8(v.to_vec()).ok()) })
343                .collect::<Vec<_>>()
344                .await
345                .join("");
346            attached.join().await.unwrap();
347            assert_eq!(out.lines().count(), 3);
348            assert_eq!(out, "1\n2\n3\n");
349        }
350
351        // Verify we can write to Stdin
352        {
353            use tokio::io::AsyncWriteExt;
354            let mut attached = pods
355                .exec(
356                    "busybox-kube2",
357                    vec!["sh"],
358                    &AttachParams::default().stdin(true).stderr(false),
359                )
360                .await?;
361            let mut stdin_writer = attached.stdin().unwrap();
362            let mut stdout_stream = tokio_util::io::ReaderStream::new(attached.stdout().unwrap());
363            let next_stdout = stdout_stream.next();
364            stdin_writer.write_all(b"echo test string 1\n").await?;
365            let stdout = String::from_utf8(next_stdout.await.unwrap().unwrap().to_vec()).unwrap();
366            println!("{stdout}");
367            assert_eq!(stdout, "test string 1\n");
368
369            // AttachedProcess resolves with status object.
370            // Send `exit 1` to get a failure status.
371            stdin_writer.write_all(b"exit 1\n").await?;
372            let status = attached.take_status().unwrap();
373            if let Some(status) = status.await {
374                println!("{status:?}");
375                assert_eq!(status.status, Some("Failure".to_owned()));
376                assert_eq!(status.reason, Some("NonZeroExitCode".to_owned()));
377            }
378        }
379
380        // Delete it
381        let dp = DeleteParams::default();
382        pods.delete("busybox-kube2", &dp).await?.map_left(|pdel| {
383            assert_eq!(pdel.name_unchecked(), "busybox-kube2");
384        });
385
386        Ok(())
387    }
388
389    #[tokio::test]
390    #[ignore = "needs cluster (will create and tail logs from a pod)"]
391    async fn can_get_pod_logs_and_evict() -> Result<(), Box<dyn std::error::Error>> {
392        use crate::{
393            api::{DeleteParams, EvictParams, ListParams, Patch, PatchParams, WatchEvent},
394            core::subresource::LogParams,
395        };
396
397        let client = Client::try_default().await?;
398        let pods: Api<Pod> = Api::default_namespaced(client);
399
400        // create busybox pod that's alive for at most 30s
401        let p: Pod = serde_json::from_value(json!({
402            "apiVersion": "v1",
403            "kind": "Pod",
404            "metadata": {
405                "name": "busybox-kube3",
406                "labels": { "app": "kube-rs-test" },
407            },
408            "spec": {
409                "terminationGracePeriodSeconds": 1,
410                "restartPolicy": "Never",
411                "containers": [{
412                  "name": "busybox",
413                  "image": "busybox:1.34.1",
414                  "command": ["sh", "-c", "for i in $(seq 1 5); do echo kube $i; sleep 0.1; done"],
415                }],
416            }
417        }))?;
418
419        match pods.create(&Default::default(), &p).await {
420            Ok(o) => assert_eq!(p.name_unchecked(), o.name_unchecked()),
421            Err(crate::Error::Api(ae)) => assert_eq!(ae.code, 409), // if we failed to clean-up
422            Err(e) => return Err(e.into()),                         // any other case if a failure
423        }
424
425        // Manual watch-api for it to become ready
426        // NB: don't do this; using conditions (see pod_api example) is easier and less error prone
427        let wp = WatchParams::default()
428            .fields(&format!("metadata.name={}", "busybox-kube3"))
429            .timeout(15);
430        let mut stream = pods.watch(&wp, "0").await?.boxed();
431        while let Some(ev) = stream.try_next().await? {
432            match ev {
433                WatchEvent::Modified(o) => {
434                    let s = o.status.as_ref().expect("status exists on pod");
435                    let phase = s.phase.clone().unwrap_or_default();
436                    if phase == "Running" {
437                        break;
438                    }
439                }
440                WatchEvent::Error(e) => panic!("watch error: {e}"),
441                _ => {}
442            }
443        }
444
445        // Get current list of logs
446        let lp = LogParams {
447            follow: true,
448            ..LogParams::default()
449        };
450        let mut logs_stream = pods.log_stream("busybox-kube3", &lp).await?.lines();
451
452        // wait for container to finish
453        tokio::time::sleep(std::time::Duration::from_secs(2)).await;
454
455        let all_logs = pods.logs("busybox-kube3", &Default::default()).await?;
456        assert_eq!(all_logs, "kube 1\nkube 2\nkube 3\nkube 4\nkube 5\n");
457
458        // individual logs may or may not buffer
459        let mut output = vec![];
460        while let Some(line) = logs_stream.try_next().await? {
461            output.push(line);
462        }
463        assert_eq!(output, vec!["kube 1", "kube 2", "kube 3", "kube 4", "kube 5"]);
464
465        // evict the pod
466        let ep = EvictParams::default();
467        let eres = pods.evict("busybox-kube3", &ep).await?;
468        assert_eq!(eres.code, 201); // created
469        assert!(eres.is_success());
470
471        Ok(())
472    }
473
474    #[tokio::test]
475    #[ignore = "requires a cluster"]
476    async fn can_operate_on_pod_metadata() -> Result<(), Box<dyn std::error::Error>> {
477        use crate::{
478            api::{DeleteParams, EvictParams, ListParams, Patch, PatchParams, WatchEvent},
479            core::subresource::LogParams,
480        };
481        use kube_core::{ObjectList, ObjectMeta, PartialObjectMeta, PartialObjectMetaExt};
482
483        let client = Client::try_default().await?;
484        let pods: Api<Pod> = Api::default_namespaced(client);
485
486        // create busybox pod that's alive for at most 30s
487        let p: Pod = serde_json::from_value(json!({
488            "apiVersion": "v1",
489            "kind": "Pod",
490            "metadata": {
491                "name": "busybox-kube-meta",
492                "labels": { "app": "kube-rs-test" },
493            },
494            "spec": {
495                "terminationGracePeriodSeconds": 1,
496                "restartPolicy": "Never",
497                "containers": [{
498                  "name": "busybox",
499                  "image": "busybox:1.34.1",
500                  "command": ["sh", "-c", "sleep 30s"],
501                }],
502            }
503        }))?;
504
505        match pods.create(&Default::default(), &p).await {
506            Ok(o) => assert_eq!(p.name_unchecked(), o.name_unchecked()),
507            Err(crate::Error::Api(ae)) => assert_eq!(ae.code, 409), // if we failed to clean-up
508            Err(e) => return Err(e.into()),                         // any other case if a failure
509        }
510
511        // Test we can get a pod as a PartialObjectMeta and convert to
512        // ObjectMeta
513        let pod_metadata = pods.get_metadata("busybox-kube-meta").await?;
514        assert_eq!("busybox-kube-meta", pod_metadata.name_any());
515        assert_eq!(
516            Some((&"app".to_string(), &"kube-rs-test".to_string())),
517            pod_metadata.labels().get_key_value("app")
518        );
519
520        // Test we can get a list of PartialObjectMeta for pods
521        let p_list = pods.list_metadata(&ListParams::default()).await?;
522
523        // Find only pod we are concerned with in this test and fail eagerly if
524        // name doesn't exist
525        let pod_metadata = p_list
526            .items
527            .into_iter()
528            .find(|p| p.name_any() == "busybox-kube-meta")
529            .unwrap();
530        assert_eq!(
531            pod_metadata.labels().get("app"),
532            Some(&"kube-rs-test".to_string())
533        );
534
535        // Attempt to patch pod metadata
536        let patch = ObjectMeta {
537            annotations: Some([("test".to_string(), "123".to_string())].into()),
538            ..Default::default()
539        }
540        .into_request_partial::<Pod>();
541
542        let patchparams = PatchParams::default();
543        let p_patched = pods
544            .patch_metadata("busybox-kube-meta", &patchparams, &Patch::Merge(&patch))
545            .await?;
546        assert_eq!(p_patched.annotations().get("test"), Some(&"123".to_string()));
547        assert_eq!(p_patched.types.as_ref().unwrap().kind, "PartialObjectMetadata");
548        assert_eq!(p_patched.types.as_ref().unwrap().api_version, "meta.k8s.io/v1");
549
550        // Clean-up
551        let dp = DeleteParams::default();
552        pods.delete("busybox-kube-meta", &dp).await?.map_left(|pdel| {
553            assert_eq!(pdel.name_any(), "busybox-kube-meta");
554        });
555
556        Ok(())
557    }
558    #[tokio::test]
559    #[ignore = "needs cluster (will create a CertificateSigningRequest)"]
560    async fn csr_can_be_approved() -> Result<(), Box<dyn std::error::Error>> {
561        use crate::api::PostParams;
562        use k8s_openapi::api::certificates::v1::{
563            CertificateSigningRequest, CertificateSigningRequestCondition, CertificateSigningRequestStatus,
564        };
565
566        let csr_name = "fake";
567        let dummy_csr: CertificateSigningRequest = serde_json::from_value(json!({
568            "apiVersion": "certificates.k8s.io/v1",
569            "kind": "CertificateSigningRequest",
570            "metadata": { "name": csr_name },
571            "spec": {
572                "request": "LS0tLS1CRUdJTiBDRVJUSUZJQ0FURSBSRVFVRVNULS0tLS0KTUlJQ1ZqQ0NBVDRDQVFBd0VURVBNQTBHQTFVRUF3d0dZVzVuWld4aE1JSUJJakFOQmdrcWhraUc5dzBCQVFFRgpBQU9DQVE4QU1JSUJDZ0tDQVFFQTByczhJTHRHdTYxakx2dHhWTTJSVlRWMDNHWlJTWWw0dWluVWo4RElaWjBOCnR2MUZtRVFSd3VoaUZsOFEzcWl0Qm0wMUFSMkNJVXBGd2ZzSjZ4MXF3ckJzVkhZbGlBNVhwRVpZM3ExcGswSDQKM3Z3aGJlK1o2MVNrVHF5SVBYUUwrTWM5T1Nsbm0xb0R2N0NtSkZNMUlMRVI3QTVGZnZKOEdFRjJ6dHBoaUlFMwpub1dtdHNZb3JuT2wzc2lHQ2ZGZzR4Zmd4eW8ybmlneFNVekl1bXNnVm9PM2ttT0x1RVF6cXpkakJ3TFJXbWlECklmMXBMWnoyalVnald4UkhCM1gyWnVVV1d1T09PZnpXM01LaE8ybHEvZi9DdS8wYk83c0x0MCt3U2ZMSU91TFcKcW90blZtRmxMMytqTy82WDNDKzBERHk5aUtwbXJjVDBnWGZLemE1dHJRSURBUUFCb0FBd0RRWUpLb1pJaHZjTgpBUUVMQlFBRGdnRUJBR05WdmVIOGR4ZzNvK21VeVRkbmFjVmQ1N24zSkExdnZEU1JWREkyQTZ1eXN3ZFp1L1BVCkkwZXpZWFV0RVNnSk1IRmQycVVNMjNuNVJsSXJ3R0xuUXFISUh5VStWWHhsdnZsRnpNOVpEWllSTmU3QlJvYXgKQVlEdUI5STZXT3FYbkFvczFqRmxNUG5NbFpqdU5kSGxpT1BjTU1oNndLaTZzZFhpVStHYTJ2RUVLY01jSVUyRgpvU2djUWdMYTk0aEpacGk3ZnNMdm1OQUxoT045UHdNMGM1dVJVejV4T0dGMUtCbWRSeEgvbUNOS2JKYjFRQm1HCkkwYitEUEdaTktXTU0xMzhIQXdoV0tkNjVoVHdYOWl4V3ZHMkh4TG1WQzg0L1BHT0tWQW9FNkpsYWFHdTlQVmkKdjlOSjVaZlZrcXdCd0hKbzZXdk9xVlA3SVFjZmg3d0drWm89Ci0tLS0tRU5EIENFUlRJRklDQVRFIFJFUVVFU1QtLS0tLQo=",
573                "signerName": "kubernetes.io/kube-apiserver-client",
574                "expirationSeconds": 86400,
575                "usages": ["client auth"]
576            }
577        }))?;
578
579        let client = Client::try_default().await?;
580        let csr: Api<CertificateSigningRequest> = Api::all(client.clone());
581        assert!(csr.create(&PostParams::default(), &dummy_csr).await.is_ok());
582
583        // Patch the approval and approve the CSR
584        let approval_type = "ApprovedFake";
585        let csr_status: CertificateSigningRequestStatus = CertificateSigningRequestStatus {
586            certificate: None,
587            conditions: Some(vec![CertificateSigningRequestCondition {
588                type_: approval_type.to_string(),
589                last_update_time: None,
590                last_transition_time: None,
591                message: Some(format!("{} {}", approval_type, "by kube-rs client")),
592                reason: Some("kube-rsClient".to_string()),
593                status: "True".to_string(),
594            }]),
595        };
596        let csr_status_patch = Patch::Merge(serde_json::json!({ "status": csr_status }));
597        let _ = csr
598            .patch_approval(csr_name, &Default::default(), &csr_status_patch)
599            .await?;
600        let csr_after_approval = csr.get_approval(csr_name).await?;
601
602        assert_eq!(
603            csr_after_approval
604                .status
605                .as_ref()
606                .unwrap()
607                .conditions
608                .as_ref()
609                .unwrap()[0]
610                .type_,
611            approval_type.to_string()
612        );
613        csr.delete(csr_name, &DeleteParams::default()).await?;
614        Ok(())
615    }
616
617    #[tokio::test]
618    #[ignore = "needs cluster for ephemeral containers operations"]
619    async fn can_operate_on_ephemeral_containers() -> Result<(), Box<dyn std::error::Error>> {
620        let client = Client::try_default().await?;
621
622        // Ephemeral containers were stabilized in Kubernetes v1.25.
623        // This test therefore exits early if the current cluster version is older than v1.25.
624        let api_version = client.apiserver_version().await?;
625        if api_version.major.parse::<i32>()? < 1 || api_version.minor.parse::<i32>()? < 25 {
626            return Ok(());
627        }
628
629        let pod: Pod = serde_json::from_value(serde_json::json!({
630            "apiVersion": "v1",
631            "kind": "Pod",
632            "metadata": {
633                "name": "ephemeral-container-test",
634                "labels": { "app": "kube-rs-test" },
635            },
636            "spec": {
637                "restartPolicy": "Never",
638                "containers": [{
639                  "name": "busybox",
640                  "image": "busybox:1.34.1",
641                  "command": ["sh", "-c", "sleep 2"],
642                }],
643            }
644        }))?;
645
646        let pod_name = pod.name_any();
647        let pods = Api::<Pod>::default_namespaced(client);
648
649        // If cleanup failed and a pod already exists, we attempt to remove it
650        // before proceeding. This is important as ephemeral containers can't
651        // be removed from a Pod's spec. Therefore this test must start with a fresh
652        // Pod every time.
653        let _ = pods
654            .delete(&pod.name_any(), &DeleteParams::default())
655            .await
656            .map(|v| v.map_left(|pdel| assert_eq!(pdel.name_any(), pod.name_any())));
657
658        // Ephemeral containes can only be applied to a running pod, so one must
659        // be created before any operations are tested.
660        match pods.create(&Default::default(), &pod).await {
661            Ok(o) => assert_eq!(pod.name_unchecked(), o.name_unchecked()),
662            Err(e) => return Err(e.into()), // any other case if a failure
663        }
664
665        let current_ephemeral_containers = pods
666            .get_ephemeral_containers(&pod.name_any())
667            .await?
668            .spec
669            .unwrap()
670            .ephemeral_containers;
671
672        // We expect no ephemeral containers initially, get_ephemeral_containers should
673        // reflect that.
674        assert_eq!(current_ephemeral_containers, None);
675
676        let mut busybox_eph: EphemeralContainer = serde_json::from_value(json!(
677            {
678                "name": "myephemeralcontainer1",
679                "image": "busybox:1.34.1",
680                "command": ["sh", "-c", "sleep 2"],
681            }
682        ))?;
683
684        // Attempt to replace ephemeral containers.
685
686        let patch: Pod = serde_json::from_value(json!({
687            "metadata": { "name": pod_name },
688            "spec":{ "ephemeralContainers": [ busybox_eph ] }
689        }))?;
690
691        let current_containers = pods
692            .replace_ephemeral_containers(&pod_name, &PostParams::default(), &patch)
693            .await?
694            .spec
695            .unwrap()
696            .ephemeral_containers
697            .expect("could find ephemeral container");
698
699        // Note that we can't compare the whole ephemeral containers object, as some fields
700        // are set by the cluster. We therefore compare the fields specified in the patch.
701        assert_eq!(current_containers.len(), 1);
702        assert_eq!(current_containers[0].name, busybox_eph.name);
703        assert_eq!(current_containers[0].image, busybox_eph.image);
704        assert_eq!(current_containers[0].command, busybox_eph.command);
705
706        // Attempt to patch ephemeral containers.
707
708        // The new ephemeral container will have different values from the
709        // first to ensure we can test for its presence.
710        busybox_eph = serde_json::from_value(json!(
711            {
712                "name": "myephemeralcontainer2",
713                "image": "busybox:1.35.0",
714                "command": ["sh", "-c", "sleep 1"],
715            }
716        ))?;
717
718        let patch: Pod =
719            serde_json::from_value(json!({ "spec": { "ephemeralContainers": [ busybox_eph ] }}))?;
720
721        let current_containers = pods
722            .patch_ephemeral_containers(&pod_name, &PatchParams::default(), &Patch::Strategic(patch))
723            .await?
724            .spec
725            .unwrap()
726            .ephemeral_containers
727            .expect("could find ephemeral container");
728
729        // There should only be 2 ephemeral containers at this point,
730        // one from each patch
731        assert_eq!(current_containers.len(), 2);
732
733        let new_container = current_containers
734            .iter()
735            .find(|c| c.name == busybox_eph.name)
736            .expect("could find myephemeralcontainer2");
737
738        // Note that we can't compare the whole ephemeral container object, as some fields
739        // get set in the cluster. We therefore compare the fields specified in the patch.
740        assert_eq!(new_container.image, busybox_eph.image);
741        assert_eq!(new_container.command, busybox_eph.command);
742
743        // Attempt to get ephemeral containers.
744
745        let expected_containers = current_containers;
746
747        let current_containers = pods
748            .get_ephemeral_containers(&pod.name_any())
749            .await?
750            .spec
751            .unwrap()
752            .ephemeral_containers
753            .unwrap();
754
755        assert_eq!(current_containers, expected_containers);
756
757        pods.delete(&pod.name_any(), &DeleteParams::default())
758            .await?
759            .map_left(|pdel| {
760                assert_eq!(pdel.name_any(), pod.name_any());
761            });
762
763        Ok(())
764    }
765
766    #[tokio::test]
767    #[ignore = "needs kubelet debug methods"]
768    #[cfg(feature = "kubelet-debug")]
769    async fn pod_can_exec_and_write_to_stdin_from_node_proxy() -> Result<(), Box<dyn std::error::Error>> {
770        use crate::{
771            api::{DeleteParams, ListParams, Patch, PatchParams, WatchEvent},
772            core::kubelet_debug::KubeletDebugParams,
773        };
774
775        let client = Client::try_default().await?;
776        let pods: Api<Pod> = Api::default_namespaced(client);
777
778        // create busybox pod that's alive for at most 30s
779        let p: Pod = serde_json::from_value(json!({
780            "apiVersion": "v1",
781            "kind": "Pod",
782            "metadata": {
783                "name": "busybox-kube2",
784                "labels": { "app": "kube-rs-test" },
785            },
786            "spec": {
787                "terminationGracePeriodSeconds": 1,
788                "restartPolicy": "Never",
789                "containers": [{
790                  "name": "busybox",
791                  "image": "busybox:1.34.1",
792                  "command": ["sh", "-c", "sleep 30"],
793                }],
794            }
795        }))?;
796
797        match pods.create(&Default::default(), &p).await {
798            Ok(o) => assert_eq!(p.name_unchecked(), o.name_unchecked()),
799            Err(crate::Error::Api(ae)) => assert_eq!(ae.code, 409), // if we failed to clean-up
800            Err(e) => return Err(e.into()),                         // any other case if a failure
801        }
802
803        // Manual watch-api for it to become ready
804        // NB: don't do this; using conditions (see pod_api example) is easier and less error prone
805        let wp = WatchParams::default()
806            .fields(&format!("metadata.name={}", "busybox-kube2"))
807            .timeout(15);
808        let mut stream = pods.watch(&wp, "0").await?.boxed();
809        while let Some(ev) = stream.try_next().await? {
810            match ev {
811                WatchEvent::Modified(o) => {
812                    let s = o.status.as_ref().expect("status exists on pod");
813                    let phase = s.phase.clone().unwrap_or_default();
814                    if phase == "Running" {
815                        break;
816                    }
817                }
818                WatchEvent::Error(e) => panic!("watch error: {e}"),
819                _ => {}
820            }
821        }
822
823        let mut config = Config::infer().await?;
824        config.accept_invalid_certs = true;
825        config.cluster_url = "https://localhost:10250".to_string().parse::<Uri>().unwrap();
826        let kubelet_client: Client = config.try_into()?;
827
828        // Verify exec works and we can get the output
829        {
830            let mut attached = kubelet_client
831                .kubelet_node_exec(
832                    &KubeletDebugParams {
833                        name: "busybox-kube2",
834                        namespace: "default",
835                        ..Default::default()
836                    },
837                    "busybox",
838                    vec!["sh", "-c", "for i in $(seq 1 3); do echo $i; done"],
839                    &AttachParams::default().stderr(false),
840                )
841                .await?;
842            let stdout = tokio_util::io::ReaderStream::new(attached.stdout().unwrap());
843            let out = stdout
844                .filter_map(|r| async { r.ok().and_then(|v| String::from_utf8(v.to_vec()).ok()) })
845                .collect::<Vec<_>>()
846                .await
847                .join("");
848            attached.join().await.unwrap();
849            assert_eq!(out.lines().count(), 3);
850            assert_eq!(out, "1\n2\n3\n");
851        }
852
853        // Verify we can write to Stdin
854        {
855            use tokio::io::AsyncWriteExt;
856            let mut attached = kubelet_client
857                .kubelet_node_exec(
858                    &KubeletDebugParams {
859                        name: "busybox-kube2",
860                        namespace: "default",
861                        ..Default::default()
862                    },
863                    "busybox",
864                    vec!["sh"],
865                    &AttachParams::default().stdin(true).stderr(false),
866                )
867                .await?;
868            let mut stdin_writer = attached.stdin().unwrap();
869            let mut stdout_stream = tokio_util::io::ReaderStream::new(attached.stdout().unwrap());
870            let next_stdout = stdout_stream.next();
871            stdin_writer.write_all(b"echo test string 1\n").await?;
872            let stdout = String::from_utf8(next_stdout.await.unwrap().unwrap().to_vec()).unwrap();
873            println!("{stdout}");
874            assert_eq!(stdout, "test string 1\n");
875
876            // AttachedProcess resolves with status object.
877            // Send `exit 1` to get a failure status.
878            stdin_writer.write_all(b"exit 1\n").await?;
879            let status = attached.take_status().unwrap();
880            if let Some(status) = status.await {
881                println!("{status:?}");
882                assert_eq!(status.status, Some("Failure".to_owned()));
883                assert_eq!(status.reason, Some("NonZeroExitCode".to_owned()));
884            }
885        }
886
887        // Delete it
888        let dp = DeleteParams::default();
889        pods.delete("busybox-kube2", &dp).await?.map_left(|pdel| {
890            assert_eq!(pdel.name_unchecked(), "busybox-kube2");
891        });
892
893        Ok(())
894    }
895}