Skip to main content

kube_client/
lib.rs

1//! Crate for interacting with the Kubernetes API
2//!
3//! This crate includes the tools for manipulating Kubernetes resources as
4//! well as keeping track of those resources as they change over time
5//!
6//! # Example
7//!
8//! The following example will create a [`Pod`](k8s_openapi::api::core::v1::Pod)
9//! and then watch for it to become available using a manual [`Api::watch`] call.
10//!
11//! ```rust,no_run
12//! use futures::{StreamExt, TryStreamExt};
13//! use kube_client::api::{Api, ResourceExt, ListParams, PatchParams, Patch};
14//! use kube_client::Client;
15//! use k8s_openapi::api::core::v1::Pod;
16//!
17//! #[tokio::main]
18//! async fn main() -> Result<(), Box<dyn std::error::Error>> {
19//!     // Read the environment to find config for kube client.
//!     // Note that this tries a local kubeconfig file first,
//!     // then falls back on the in-cluster configuration.
22//!     let client = Client::try_default().await?;
23//!
24//!     // Interact with pods in the configured namespace with the typed interface from k8s-openapi
25//!     let pods: Api<Pod> = Api::default_namespaced(client);
26//!
27//!     // Create a Pod (cheating here with json, but it has to validate against the type):
28//!     let patch: Pod = serde_json::from_value(serde_json::json!({
29//!         "apiVersion": "v1",
30//!         "kind": "Pod",
31//!         "metadata": {
32//!             "name": "my-pod"
33//!         },
34//!         "spec": {
35//!             "containers": [
36//!                 {
37//!                     "name": "my-container",
38//!                     "image": "myregistry.azurecr.io/hello-world:v1",
39//!                 },
40//!             ],
41//!         }
42//!     }))?;
43//!
44//!     // Apply the Pod via server-side apply
45//!     let params = PatchParams::apply("myapp");
46//!     let result = pods.patch("my-pod", &params, &Patch::Apply(&patch)).await?;
47//!
48//!     // List pods in the configured namespace
49//!     for p in pods.list(&ListParams::default()).await? {
50//!         println!("found pod {}", p.name_any());
51//!     }
52//!
53//!     Ok(())
54//! }
55//! ```
56//!
57//! For more details, see:
58//!
59//! - [`Client`](crate::client) for the extensible Kubernetes client
60//! - [`Config`](crate::config) for the Kubernetes config abstraction
61//! - [`Api`](crate::Api) for the generic api methods available on Kubernetes resources
62//! - [k8s-openapi](https://docs.rs/k8s-openapi) for how to create typed kubernetes objects directly
63#![cfg_attr(docsrs, feature(doc_cfg))]
64// Nightly clippy (0.1.64) considers Drop a side effect, see https://github.com/rust-lang/rust-clippy/issues/9608
65#![allow(clippy::unnecessary_lazy_evaluations)]
66
// Wraps any number of items so that each one is only compiled when the
// `client` feature is enabled. Under `--cfg docsrs` every item additionally
// receives a `doc(cfg(feature = "client"))` annotation.
macro_rules! cfg_client {
    ($($gated:item)*) => {
        $(
            #[cfg(feature = "client")]
            #[cfg_attr(docsrs, doc(cfg(feature = "client")))]
            $gated
        )*
    }
}
// Wraps any number of items so that each one is only compiled when the
// `config` feature is enabled. Under `--cfg docsrs` every item additionally
// receives a `doc(cfg(feature = "config"))` annotation.
macro_rules! cfg_config {
    ($($gated:item)*) => {
        $(
            #[cfg(feature = "config")]
            #[cfg_attr(docsrs, doc(cfg(feature = "config")))]
            $gated
        )*
    }
}
85
// Wraps any number of items so that each one is compiled when at least one of
// the `config` or `client` features is enabled. Under `--cfg docsrs` every
// item additionally receives the matching `doc(cfg(any(...)))` annotation.
macro_rules! cfg_error {
    ($($gated:item)*) => {
        $(
            #[cfg(any(feature = "config", feature = "client"))]
            #[cfg_attr(docsrs, doc(cfg(any(feature = "config", feature = "client"))))]
            $gated
        )*
    }
}
95
96cfg_client! {
97    pub mod api;
98    pub mod discovery;
99    pub mod client;
100
101    #[doc(inline)]
102    pub use api::Api;
103    #[doc(inline)]
104    pub use client::Client;
105    #[doc(inline)]
106    pub use discovery::Discovery;
107}
108
109cfg_config! {
110    pub mod config;
111    #[doc(inline)]
112    pub use config::Config;
113}
114
115cfg_error! {
116    pub mod error;
117    #[doc(inline)] pub use error::Error;
    /// Convenient alias for `Result<T, Error>`
119    pub type Result<T, E = Error> = std::result::Result<T, E>;
120}
121
122pub use crate::core::{CustomResourceExt, Resource, ResourceExt};
123/// Re-exports from kube_core
124pub use kube_core as core;
125
126// Tests that require a cluster and the complete feature set
// Can be run with `cargo test -p kube-client --lib --features=rustls-tls,ws -- --ignored`
128#[cfg(all(feature = "client", feature = "config"))]
129#[cfg(test)]
130#[allow(unused_imports)] // varying test imports depending on feature
131mod test {
132    use crate::{
133        Api, Client, Config, ResourceExt,
134        api::{AttachParams, AttachedProcess},
135        client::ConfigExt,
136    };
137    use futures::{AsyncBufRead, AsyncBufReadExt, StreamExt, TryStreamExt};
138    use hyper::Uri;
139    use k8s_openapi::api::core::v1::{EphemeralContainer, Pod, PodSpec};
140    use kube_core::{
141        params::{DeleteParams, Patch, PatchParams, PostParams, WatchParams},
142        response::StatusSummary,
143    };
144    use serde_json::json;
145    use tower::ServiceBuilder;
146
147    // hard disabled test atm due to k3d rustls issues: https://github.com/kube-rs/kube/issues?q=is%3Aopen+is%3Aissue+label%3Arustls
148    #[allow(dead_code)]
149    // #[tokio::test]
150    #[ignore = "needs cluster (lists pods)"]
151    #[cfg(feature = "rustls-tls")]
152    async fn custom_client_rustls_configuration() -> Result<(), Box<dyn std::error::Error>> {
153        use hyper_util::rt::TokioExecutor;
154
155        let config = Config::infer().await?;
156        let https = config.rustls_https_connector()?;
157        let service = ServiceBuilder::new()
158            .layer(config.base_uri_layer())
159            .service(hyper_util::client::legacy::Client::builder(TokioExecutor::new()).build(https));
160        let client = Client::new(service, config.default_namespace);
161        let pods: Api<Pod> = Api::default_namespaced(client);
162        pods.list(&Default::default()).await?;
163        Ok(())
164    }
165
166    #[tokio::test]
167    #[ignore = "needs cluster (lists pods)"]
168    #[cfg(feature = "openssl-tls")]
169    async fn custom_client_openssl_tls_configuration() -> Result<(), Box<dyn std::error::Error>> {
170        use hyper_util::rt::TokioExecutor;
171
172        let config = Config::infer().await?;
173        let https = config.openssl_https_connector()?;
174        let service = ServiceBuilder::new()
175            .layer(config.base_uri_layer())
176            .service(hyper_util::client::legacy::Client::builder(TokioExecutor::new()).build(https));
177        let client = Client::new(service, config.default_namespace);
178        let pods: Api<Pod> = Api::default_namespaced(client);
179        pods.list(&Default::default()).await?;
180        Ok(())
181    }
182
183    #[tokio::test]
184    #[ignore = "needs cluster (lists api resources)"]
185    #[cfg(feature = "client")]
186    async fn group_discovery_oneshot() -> Result<(), Box<dyn std::error::Error>> {
187        use crate::{core::DynamicObject, discovery};
188        let client = Client::try_default().await?;
189        let apigroup = discovery::group(&client, "apiregistration.k8s.io").await?;
190        let (ar, _caps) = apigroup.recommended_kind("APIService").unwrap();
191        let api: Api<DynamicObject> = Api::all_with(client.clone(), &ar);
192        api.list(&Default::default()).await?;
193
194        Ok(())
195    }
196
197    #[tokio::test]
198    #[ignore = "needs cluster (uses aggregated discovery, requires k8s 1.26+)"]
199    #[cfg(feature = "client")]
200    async fn aggregated_discovery_apis() -> Result<(), Box<dyn std::error::Error>> {
201        let client = Client::try_default().await?;
202
203        // Test /apis aggregated discovery
204        let apis_discovery = client.list_api_groups_aggregated().await?;
205        assert!(!apis_discovery.items.is_empty(), "should have API groups");
206
207        // Find the apps group
208        let apps_group = apis_discovery
209            .items
210            .iter()
211            .find(|g| g.metadata.as_ref().and_then(|m| m.name.as_ref()) == Some(&"apps".to_string()));
212        assert!(apps_group.is_some(), "should have apps group");
213
214        let apps = apps_group.unwrap();
215        assert!(!apps.versions.is_empty(), "apps should have versions");
216
217        // Check that deployments resource exists in apps/v1
218        let v1 = apps.versions.iter().find(|v| v.version == Some("v1".to_string()));
219        assert!(v1.is_some(), "apps should have v1");
220
221        let deployments = v1
222            .unwrap()
223            .resources
224            .iter()
225            .find(|r| r.resource == Some("deployments".to_string()));
226        assert!(deployments.is_some(), "apps/v1 should have deployments");
227
228        Ok(())
229    }
230
231    #[tokio::test]
232    #[ignore = "needs cluster (uses aggregated discovery, requires k8s 1.26+)"]
233    #[cfg(feature = "client")]
234    async fn aggregated_discovery_core() -> Result<(), Box<dyn std::error::Error>> {
235        let client = Client::try_default().await?;
236
237        // Test /api aggregated discovery (core group)
238        let core_discovery = client.list_core_api_versions_aggregated().await?;
239        assert!(!core_discovery.items.is_empty(), "should have core group");
240
241        let core = &core_discovery.items[0];
242        let v1 = core.versions.iter().find(|v| v.version == Some("v1".to_string()));
243        assert!(v1.is_some(), "core should have v1");
244
245        // Check that pods resource exists
246        let pods = v1
247            .unwrap()
248            .resources
249            .iter()
250            .find(|r| r.resource == Some("pods".to_string()));
251        assert!(pods.is_some(), "core/v1 should have pods");
252
253        let pods_resource = pods.unwrap();
254        assert_eq!(pods_resource.scope, Some("Namespaced".to_string()));
255        assert!(pods_resource.verbs.contains(&"list".to_string()));
256
257        Ok(())
258    }
259
260    #[tokio::test]
261    #[ignore = "needs cluster (uses aggregated discovery, requires k8s 1.26+)"]
262    #[cfg(feature = "client")]
263    async fn discovery_run_aggregated() -> Result<(), Box<dyn std::error::Error>> {
264        use crate::discovery::{Discovery, verbs};
265
266        let client = Client::try_default().await?;
267
268        // Test Discovery::run_aggregated()
269        let discovery = Discovery::new(client.clone()).run_aggregated().await?;
270
271        // Should have discovered groups
272        assert!(discovery.groups().count() > 0, "should have discovered groups");
273
274        // Should have core group
275        assert!(discovery.has_group(""), "should have core group");
276
277        // Should have apps group
278        assert!(discovery.has_group("apps"), "should have apps group");
279
280        // Check that we can find deployments in apps group
281        let apps = discovery.get("apps").expect("apps group");
282        let (ar, caps) = apps.recommended_kind("Deployment").expect("Deployment kind");
283        assert_eq!(ar.kind, "Deployment");
284        assert!(caps.supports_operation(verbs::LIST));
285
286        // Check that we can find pods in core group
287        let core = discovery.get("").expect("core group");
288        let (ar, caps) = core.recommended_kind("Pod").expect("Pod kind");
289        assert_eq!(ar.kind, "Pod");
290        assert!(caps.supports_operation(verbs::GET));
291
292        Ok(())
293    }
294
295    #[tokio::test]
296    #[ignore = "needs cluster (will create and edit a pod)"]
297    async fn pod_can_use_core_apis() -> Result<(), Box<dyn std::error::Error>> {
298        use kube::api::{DeleteParams, ListParams, Patch, PatchParams, PostParams, WatchEvent};
299
300        let client = Client::try_default().await?;
301        let pods: Api<Pod> = Api::default_namespaced(client);
302
303        // create busybox pod that's alive for at most 30s
304        let p: Pod = serde_json::from_value(json!({
305            "apiVersion": "v1",
306            "kind": "Pod",
307            "metadata": {
308                "name": "busybox-kube1",
309                "labels": { "app": "kube-rs-test" },
310            },
311            "spec": {
312                "terminationGracePeriodSeconds": 1,
313                "restartPolicy": "Never",
314                "containers": [{
315                  "name": "busybox",
316                  "image": "busybox:1.34.1",
317                  "command": ["sh", "-c", "sleep 30"],
318                }],
319            }
320        }))?;
321
322        let pp = PostParams::default();
323        match pods.create(&pp, &p).await {
324            Ok(o) => assert_eq!(p.name_unchecked(), o.name_unchecked()),
325            Err(crate::Error::Api(ae)) => assert_eq!(ae.code, 409), // if we failed to clean-up
326            Err(e) => return Err(e.into()),                         // any other case if a failure
327        }
328
329        // Manual watch-api for it to become ready
330        // NB: don't do this; using conditions (see pod_api example) is easier and less error prone
331        let wp = WatchParams::default()
332            .fields(&format!("metadata.name={}", "busybox-kube1"))
333            .timeout(15);
334        let mut stream = pods.watch(&wp, "0").await?.boxed();
335        while let Some(ev) = stream.try_next().await? {
336            // can debug format watch event
337            let _ = format!("we: {ev:?}");
338            match ev {
339                WatchEvent::Modified(o) => {
340                    let s = o.status.as_ref().expect("status exists on pod");
341                    let phase = s.phase.clone().unwrap_or_default();
342                    if phase == "Running" {
343                        break;
344                    }
345                }
346                WatchEvent::Error(e) => panic!("watch error: {e}"),
347                _ => {}
348            }
349        }
350
351        // Verify we can get it
352        let mut pod = pods.get("busybox-kube1").await?;
353        assert_eq!(p.spec.as_ref().unwrap().containers[0].name, "busybox");
354
355        // verify replace with explicit resource version
356        // NB: don't do this; use server side apply
357        {
358            assert!(pod.resource_version().is_some());
359            pod.spec.as_mut().unwrap().active_deadline_seconds = Some(5);
360
361            let pp = PostParams::default();
362            let patched_pod = pods.replace("busybox-kube1", &pp, &pod).await?;
363            assert_eq!(patched_pod.spec.unwrap().active_deadline_seconds, Some(5));
364        }
365
366        // Delete it
367        let dp = DeleteParams::default();
368        pods.delete("busybox-kube1", &dp).await?.map_left(|pdel| {
369            assert_eq!(pdel.name_unchecked(), "busybox-kube1");
370        });
371
372        Ok(())
373    }
374
375    #[tokio::test]
376    #[ignore = "needs cluster (will create and attach to a pod)"]
377    #[cfg(feature = "ws")]
378    async fn pod_can_exec_and_write_to_stdin() -> Result<(), Box<dyn std::error::Error>> {
379        use crate::api::{DeleteParams, ListParams, Patch, PatchParams, WatchEvent};
380        use tokio::io::AsyncWriteExt;
381
382        let client = Client::try_default().await?;
383        let pods: Api<Pod> = Api::default_namespaced(client);
384
385        // create busybox pod that's alive for at most 30s
386        let p: Pod = serde_json::from_value(json!({
387            "apiVersion": "v1",
388            "kind": "Pod",
389            "metadata": {
390                "name": "busybox-kube2",
391                "labels": { "app": "kube-rs-test" },
392            },
393            "spec": {
394                "terminationGracePeriodSeconds": 1,
395                "restartPolicy": "Never",
396                "containers": [{
397                  "name": "busybox",
398                  "image": "busybox:1.34.1",
399                  "command": ["sh", "-c", "sleep 30"],
400                }],
401            }
402        }))?;
403
404        match pods.create(&Default::default(), &p).await {
405            Ok(o) => assert_eq!(p.name_unchecked(), o.name_unchecked()),
406            Err(crate::Error::Api(ae)) => assert_eq!(ae.code, 409), // if we failed to clean-up
407            Err(e) => return Err(e.into()),                         // any other case if a failure
408        }
409
410        // Manual watch-api for it to become ready
411        // NB: don't do this; using conditions (see pod_api example) is easier and less error prone
412        let wp = WatchParams::default()
413            .fields(&format!("metadata.name={}", "busybox-kube2"))
414            .timeout(15);
415        let mut stream = pods.watch(&wp, "0").await?.boxed();
416        while let Some(ev) = stream.try_next().await? {
417            match ev {
418                WatchEvent::Modified(o) => {
419                    let s = o.status.as_ref().expect("status exists on pod");
420                    let phase = s.phase.clone().unwrap_or_default();
421                    if phase == "Running" {
422                        break;
423                    }
424                }
425                WatchEvent::Error(e) => panic!("watch error: {e}"),
426                _ => {}
427            }
428        }
429
430        // Verify exec works and we can get the output
431        {
432            let mut attached = pods
433                .exec(
434                    "busybox-kube2",
435                    vec!["sh", "-c", "for i in $(seq 1 3); do echo $i; done"],
436                    &AttachParams::default().stderr(false),
437                )
438                .await?;
439            let stdout = tokio_util::io::ReaderStream::new(attached.stdout().unwrap());
440            let out = stdout
441                .filter_map(|r| async { r.ok().and_then(|v| String::from_utf8(v.to_vec()).ok()) })
442                .collect::<Vec<_>>()
443                .await
444                .join("");
445            attached.join().await.unwrap();
446            assert_eq!(out.lines().count(), 3);
447            assert_eq!(out, "1\n2\n3\n");
448        }
449
450        // Verify we read from stdout after stdin is closed.
451        {
452            let name = "busybox-kube2";
453            let command = vec!["sh", "-c", "sleep 2; echo test string 2"];
454            let ap = AttachParams::default().stdin(true).stderr(false);
455
456            // Make a connection so we can determine if the K8s cluster supports stream closing.
457            let mut req = pods.request.exec(name, command.clone(), &ap)?;
458            req.extensions_mut().insert("exec");
459            let stream = pods.client.connect(req).await?;
460
461            // This only works is the cluster supports protocol version v5.channel.k8s.io
462            // Skip for older protocols.
463            if stream.supports_stream_close() {
464                let mut attached = pods.exec(name, command, &ap).await?;
465                let mut stdin_writer = attached.stdin().unwrap();
466                let mut stdout_stream = tokio_util::io::ReaderStream::new(attached.stdout().unwrap());
467
468                stdin_writer.write_all(b"this will be ignored\n").await?;
469                _ = stdin_writer.shutdown().await;
470
471                let next_stdout = stdout_stream.next();
472                let stdout = String::from_utf8(next_stdout.await.unwrap().unwrap().to_vec()).unwrap();
473                assert_eq!(stdout, "test string 2\n");
474
475                // AttachedProcess resolves with status object.
476                let status = attached.take_status().unwrap();
477                if let Some(status) = status.await {
478                    assert_eq!(status.status, Some("Success".to_owned()));
479                    assert_eq!(status.reason, None);
480                }
481            }
482        }
483
484        // Verify we can write to Stdin
485        {
486            let mut attached = pods
487                .exec(
488                    "busybox-kube2",
489                    vec!["sh"],
490                    &AttachParams::default().stdin(true).stderr(false),
491                )
492                .await?;
493            let mut stdin_writer = attached.stdin().unwrap();
494            let mut stdout_stream = tokio_util::io::ReaderStream::new(attached.stdout().unwrap());
495            let next_stdout = stdout_stream.next();
496            stdin_writer.write_all(b"echo test string 1\n").await?;
497            let stdout = String::from_utf8(next_stdout.await.unwrap().unwrap().to_vec()).unwrap();
498            println!("{stdout}");
499            assert_eq!(stdout, "test string 1\n");
500
501            // AttachedProcess resolves with status object.
502            // Send `exit 1` to get a failure status.
503            stdin_writer.write_all(b"exit 1\n").await?;
504            let status = attached.take_status().unwrap();
505            if let Some(status) = status.await {
506                println!("{status:?}");
507                assert_eq!(status.status, Some("Failure".to_owned()));
508                assert_eq!(status.reason, Some("NonZeroExitCode".to_owned()));
509            }
510        }
511
512        // Delete it
513        let dp = DeleteParams::default();
514        pods.delete("busybox-kube2", &dp).await?.map_left(|pdel| {
515            assert_eq!(pdel.name_unchecked(), "busybox-kube2");
516        });
517
518        Ok(())
519    }
520
521    #[tokio::test]
522    #[ignore = "needs cluster (will create and tail logs from a pod)"]
523    async fn can_get_pod_logs_and_evict() -> Result<(), Box<dyn std::error::Error>> {
524        use crate::{
525            api::{DeleteParams, EvictParams, ListParams, Patch, PatchParams, WatchEvent},
526            core::subresource::LogParams,
527        };
528
529        let client = Client::try_default().await?;
530        let pods: Api<Pod> = Api::default_namespaced(client);
531
532        // create busybox pod that's alive for at most 30s
533        let p: Pod = serde_json::from_value(json!({
534            "apiVersion": "v1",
535            "kind": "Pod",
536            "metadata": {
537                "name": "busybox-kube3",
538                "labels": { "app": "kube-rs-test" },
539            },
540            "spec": {
541                "terminationGracePeriodSeconds": 1,
542                "restartPolicy": "Never",
543                "containers": [{
544                  "name": "busybox",
545                  "image": "busybox:1.34.1",
546                  "command": ["sh", "-c", "for i in $(seq 1 5); do echo kube $i; sleep 0.1; done"],
547                }],
548            }
549        }))?;
550
551        match pods.create(&Default::default(), &p).await {
552            Ok(o) => assert_eq!(p.name_unchecked(), o.name_unchecked()),
553            Err(crate::Error::Api(ae)) => assert_eq!(ae.code, 409), // if we failed to clean-up
554            Err(e) => return Err(e.into()),                         // any other case if a failure
555        }
556
557        // Manual watch-api for it to become ready
558        // NB: don't do this; using conditions (see pod_api example) is easier and less error prone
559        let wp = WatchParams::default()
560            .fields(&format!("metadata.name={}", "busybox-kube3"))
561            .timeout(15);
562        let mut stream = pods.watch(&wp, "0").await?.boxed();
563        while let Some(ev) = stream.try_next().await? {
564            match ev {
565                WatchEvent::Modified(o) => {
566                    let s = o.status.as_ref().expect("status exists on pod");
567                    let phase = s.phase.clone().unwrap_or_default();
568                    if phase == "Running" {
569                        break;
570                    }
571                }
572                WatchEvent::Error(e) => panic!("watch error: {e}"),
573                _ => {}
574            }
575        }
576
577        // Get current list of logs
578        let lp = LogParams {
579            follow: true,
580            ..LogParams::default()
581        };
582        let mut logs_stream = pods.log_stream("busybox-kube3", &lp).await?.lines();
583
584        // wait for container to finish
585        tokio::time::sleep(std::time::Duration::from_secs(2)).await;
586
587        let all_logs = pods.logs("busybox-kube3", &Default::default()).await?;
588        assert_eq!(all_logs, "kube 1\nkube 2\nkube 3\nkube 4\nkube 5\n");
589
590        // individual logs may or may not buffer
591        let mut output = vec![];
592        while let Some(line) = logs_stream.try_next().await? {
593            output.push(line);
594        }
595        assert_eq!(output, vec!["kube 1", "kube 2", "kube 3", "kube 4", "kube 5"]);
596
597        // evict the pod
598        let ep = EvictParams::default();
599        let eres = pods.evict("busybox-kube3", &ep).await?;
600        assert_eq!(eres.code, 201); // created
601        assert!(eres.is_success());
602
603        Ok(())
604    }
605
606    #[tokio::test]
607    #[ignore = "requires a cluster"]
608    async fn can_operate_on_pod_metadata() -> Result<(), Box<dyn std::error::Error>> {
609        use crate::{
610            api::{DeleteParams, EvictParams, ListParams, Patch, PatchParams, WatchEvent},
611            core::subresource::LogParams,
612        };
613        use kube_core::{ObjectList, ObjectMeta, PartialObjectMeta, PartialObjectMetaExt};
614
615        let client = Client::try_default().await?;
616        let pods: Api<Pod> = Api::default_namespaced(client);
617
618        // create busybox pod that's alive for at most 30s
619        let p: Pod = serde_json::from_value(json!({
620            "apiVersion": "v1",
621            "kind": "Pod",
622            "metadata": {
623                "name": "busybox-kube-meta",
624                "labels": { "app": "kube-rs-test" },
625            },
626            "spec": {
627                "terminationGracePeriodSeconds": 1,
628                "restartPolicy": "Never",
629                "containers": [{
630                  "name": "busybox",
631                  "image": "busybox:1.34.1",
632                  "command": ["sh", "-c", "sleep 30s"],
633                }],
634            }
635        }))?;
636
637        match pods.create(&Default::default(), &p).await {
638            Ok(o) => assert_eq!(p.name_unchecked(), o.name_unchecked()),
639            Err(crate::Error::Api(ae)) => assert_eq!(ae.code, 409), // if we failed to clean-up
640            Err(e) => return Err(e.into()),                         // any other case if a failure
641        }
642
643        // Test we can get a pod as a PartialObjectMeta and convert to
644        // ObjectMeta
645        let pod_metadata = pods.get_metadata("busybox-kube-meta").await?;
646        assert_eq!("busybox-kube-meta", pod_metadata.name_any());
647        assert_eq!(
648            Some((&"app".to_string(), &"kube-rs-test".to_string())),
649            pod_metadata.labels().get_key_value("app")
650        );
651
652        // Test we can get a list of PartialObjectMeta for pods
653        let p_list = pods.list_metadata(&ListParams::default()).await?;
654
655        // Find only pod we are concerned with in this test and fail eagerly if
656        // name doesn't exist
657        let pod_metadata = p_list
658            .items
659            .into_iter()
660            .find(|p| p.name_any() == "busybox-kube-meta")
661            .unwrap();
662        assert_eq!(
663            pod_metadata.labels().get("app"),
664            Some(&"kube-rs-test".to_string())
665        );
666
667        // Attempt to patch pod metadata
668        let patch = ObjectMeta {
669            annotations: Some([("test".to_string(), "123".to_string())].into()),
670            ..Default::default()
671        }
672        .into_request_partial::<Pod>();
673
674        let patchparams = PatchParams::default();
675        let p_patched = pods
676            .patch_metadata("busybox-kube-meta", &patchparams, &Patch::Merge(&patch))
677            .await?;
678        assert_eq!(p_patched.annotations().get("test"), Some(&"123".to_string()));
679        assert_eq!(p_patched.types.as_ref().unwrap().kind, "PartialObjectMetadata");
680        assert_eq!(p_patched.types.as_ref().unwrap().api_version, "meta.k8s.io/v1");
681
682        // Clean-up
683        let dp = DeleteParams::default();
684        pods.delete("busybox-kube-meta", &dp).await?.map_left(|pdel| {
685            assert_eq!(pdel.name_any(), "busybox-kube-meta");
686        });
687
688        Ok(())
689    }
690    #[tokio::test]
691    #[ignore = "needs cluster (will create a CertificateSigningRequest)"]
692    async fn csr_can_be_approved() -> Result<(), Box<dyn std::error::Error>> {
693        use crate::api::PostParams;
694        use k8s_openapi::api::certificates::v1::{
695            CertificateSigningRequest, CertificateSigningRequestCondition, CertificateSigningRequestStatus,
696        };
697
698        let csr_name = "fake";
699        let dummy_csr: CertificateSigningRequest = serde_json::from_value(json!({
700            "apiVersion": "certificates.k8s.io/v1",
701            "kind": "CertificateSigningRequest",
702            "metadata": { "name": csr_name },
703            "spec": {
704                "request": "LS0tLS1CRUdJTiBDRVJUSUZJQ0FURSBSRVFVRVNULS0tLS0KTUlJQ1ZqQ0NBVDRDQVFBd0VURVBNQTBHQTFVRUF3d0dZVzVuWld4aE1JSUJJakFOQmdrcWhraUc5dzBCQVFFRgpBQU9DQVE4QU1JSUJDZ0tDQVFFQTByczhJTHRHdTYxakx2dHhWTTJSVlRWMDNHWlJTWWw0dWluVWo4RElaWjBOCnR2MUZtRVFSd3VoaUZsOFEzcWl0Qm0wMUFSMkNJVXBGd2ZzSjZ4MXF3ckJzVkhZbGlBNVhwRVpZM3ExcGswSDQKM3Z3aGJlK1o2MVNrVHF5SVBYUUwrTWM5T1Nsbm0xb0R2N0NtSkZNMUlMRVI3QTVGZnZKOEdFRjJ6dHBoaUlFMwpub1dtdHNZb3JuT2wzc2lHQ2ZGZzR4Zmd4eW8ybmlneFNVekl1bXNnVm9PM2ttT0x1RVF6cXpkakJ3TFJXbWlECklmMXBMWnoyalVnald4UkhCM1gyWnVVV1d1T09PZnpXM01LaE8ybHEvZi9DdS8wYk83c0x0MCt3U2ZMSU91TFcKcW90blZtRmxMMytqTy82WDNDKzBERHk5aUtwbXJjVDBnWGZLemE1dHJRSURBUUFCb0FBd0RRWUpLb1pJaHZjTgpBUUVMQlFBRGdnRUJBR05WdmVIOGR4ZzNvK21VeVRkbmFjVmQ1N24zSkExdnZEU1JWREkyQTZ1eXN3ZFp1L1BVCkkwZXpZWFV0RVNnSk1IRmQycVVNMjNuNVJsSXJ3R0xuUXFISUh5VStWWHhsdnZsRnpNOVpEWllSTmU3QlJvYXgKQVlEdUI5STZXT3FYbkFvczFqRmxNUG5NbFpqdU5kSGxpT1BjTU1oNndLaTZzZFhpVStHYTJ2RUVLY01jSVUyRgpvU2djUWdMYTk0aEpacGk3ZnNMdm1OQUxoT045UHdNMGM1dVJVejV4T0dGMUtCbWRSeEgvbUNOS2JKYjFRQm1HCkkwYitEUEdaTktXTU0xMzhIQXdoV0tkNjVoVHdYOWl4V3ZHMkh4TG1WQzg0L1BHT0tWQW9FNkpsYWFHdTlQVmkKdjlOSjVaZlZrcXdCd0hKbzZXdk9xVlA3SVFjZmg3d0drWm89Ci0tLS0tRU5EIENFUlRJRklDQVRFIFJFUVVFU1QtLS0tLQo=",
705                "signerName": "kubernetes.io/kube-apiserver-client",
706                "expirationSeconds": 86400,
707                "usages": ["client auth"]
708            }
709        }))?;
710
711        let client = Client::try_default().await?;
712        let csr: Api<CertificateSigningRequest> = Api::all(client.clone());
713        assert!(csr.create(&PostParams::default(), &dummy_csr).await.is_ok());
714
715        // Patch the approval and approve the CSR
716        let approval_type = "ApprovedFake";
717        let csr_status: CertificateSigningRequestStatus = CertificateSigningRequestStatus {
718            certificate: None,
719            conditions: Some(vec![CertificateSigningRequestCondition {
720                type_: approval_type.to_string(),
721                last_update_time: None,
722                last_transition_time: None,
723                message: Some(format!("{} {}", approval_type, "by kube-rs client")),
724                reason: Some("kube-rsClient".to_string()),
725                status: "True".to_string(),
726            }]),
727        };
728        let csr_status_patch = Patch::Merge(serde_json::json!({ "status": csr_status }));
729        let _ = csr
730            .patch_approval(csr_name, &Default::default(), &csr_status_patch)
731            .await?;
732        let csr_after_approval = csr.get_approval(csr_name).await?;
733
734        assert_eq!(
735            csr_after_approval
736                .status
737                .as_ref()
738                .unwrap()
739                .conditions
740                .as_ref()
741                .unwrap()[0]
742                .type_,
743            approval_type.to_string()
744        );
745        csr.delete(csr_name, &DeleteParams::default()).await?;
746        Ok(())
747    }
748
749    #[tokio::test]
750    #[ignore = "needs cluster for ephemeral containers operations"]
751    async fn can_operate_on_ephemeral_containers() -> Result<(), Box<dyn std::error::Error>> {
752        let client = Client::try_default().await?;
753
754        // Ephemeral containers were stabilized in Kubernetes v1.25.
755        // This test therefore exits early if the current cluster version is older than v1.25.
756        let api_version = client.apiserver_version().await?;
757        if api_version.major.parse::<i32>()? < 1 || api_version.minor.parse::<i32>()? < 25 {
758            return Ok(());
759        }
760
761        let pod: Pod = serde_json::from_value(serde_json::json!({
762            "apiVersion": "v1",
763            "kind": "Pod",
764            "metadata": {
765                "name": "ephemeral-container-test",
766                "labels": { "app": "kube-rs-test" },
767            },
768            "spec": {
769                "restartPolicy": "Never",
770                "containers": [{
771                  "name": "busybox",
772                  "image": "busybox:1.34.1",
773                  "command": ["sh", "-c", "sleep 2"],
774                }],
775            }
776        }))?;
777
778        let pod_name = pod.name_any();
779        let pods = Api::<Pod>::default_namespaced(client);
780
781        // If cleanup failed and a pod already exists, we attempt to remove it
782        // before proceeding. This is important as ephemeral containers can't
783        // be removed from a Pod's spec. Therefore this test must start with a fresh
784        // Pod every time.
785        let _ = pods
786            .delete(&pod.name_any(), &DeleteParams::default())
787            .await
788            .map(|v| v.map_left(|pdel| assert_eq!(pdel.name_any(), pod.name_any())));
789
790        // Ephemeral containes can only be applied to a running pod, so one must
791        // be created before any operations are tested.
792        match pods.create(&Default::default(), &pod).await {
793            Ok(o) => assert_eq!(pod.name_unchecked(), o.name_unchecked()),
794            Err(e) => return Err(e.into()), // any other case if a failure
795        }
796
797        let current_ephemeral_containers = pods
798            .get_ephemeral_containers(&pod.name_any())
799            .await?
800            .spec
801            .unwrap()
802            .ephemeral_containers;
803
804        // We expect no ephemeral containers initially, get_ephemeral_containers should
805        // reflect that.
806        assert_eq!(current_ephemeral_containers, None);
807
808        let mut busybox_eph: EphemeralContainer = serde_json::from_value(json!(
809            {
810                "name": "myephemeralcontainer1",
811                "image": "busybox:1.34.1",
812                "command": ["sh", "-c", "sleep 2"],
813            }
814        ))?;
815
816        // Attempt to replace ephemeral containers.
817
818        let patch: Pod = serde_json::from_value(json!({
819            "metadata": { "name": pod_name },
820            "spec":{ "ephemeralContainers": [ busybox_eph ] }
821        }))?;
822
823        let current_containers = pods
824            .replace_ephemeral_containers(&pod_name, &PostParams::default(), &patch)
825            .await?
826            .spec
827            .unwrap()
828            .ephemeral_containers
829            .expect("could find ephemeral container");
830
831        // Note that we can't compare the whole ephemeral containers object, as some fields
832        // are set by the cluster. We therefore compare the fields specified in the patch.
833        assert_eq!(current_containers.len(), 1);
834        assert_eq!(current_containers[0].name, busybox_eph.name);
835        assert_eq!(current_containers[0].image, busybox_eph.image);
836        assert_eq!(current_containers[0].command, busybox_eph.command);
837
838        // Attempt to patch ephemeral containers.
839
840        // The new ephemeral container will have different values from the
841        // first to ensure we can test for its presence.
842        busybox_eph = serde_json::from_value(json!(
843            {
844                "name": "myephemeralcontainer2",
845                "image": "busybox:1.35.0",
846                "command": ["sh", "-c", "sleep 1"],
847            }
848        ))?;
849
850        let patch: Pod =
851            serde_json::from_value(json!({ "spec": { "ephemeralContainers": [ busybox_eph ] }}))?;
852
853        let current_containers = pods
854            .patch_ephemeral_containers(&pod_name, &PatchParams::default(), &Patch::Strategic(patch))
855            .await?
856            .spec
857            .unwrap()
858            .ephemeral_containers
859            .expect("could find ephemeral container");
860
861        // There should only be 2 ephemeral containers at this point,
862        // one from each patch
863        assert_eq!(current_containers.len(), 2);
864
865        let new_container = current_containers
866            .iter()
867            .find(|c| c.name == busybox_eph.name)
868            .expect("could find myephemeralcontainer2");
869
870        // Note that we can't compare the whole ephemeral container object, as some fields
871        // get set in the cluster. We therefore compare the fields specified in the patch.
872        assert_eq!(new_container.image, busybox_eph.image);
873        assert_eq!(new_container.command, busybox_eph.command);
874
875        // Attempt to get ephemeral containers.
876
877        let expected_containers = current_containers;
878
879        let current_containers = pods
880            .get_ephemeral_containers(&pod.name_any())
881            .await?
882            .spec
883            .unwrap()
884            .ephemeral_containers
885            .unwrap();
886
887        assert_eq!(current_containers, expected_containers);
888
889        pods.delete(&pod.name_any(), &DeleteParams::default())
890            .await?
891            .map_left(|pdel| {
892                assert_eq!(pdel.name_any(), pod.name_any());
893            });
894
895        Ok(())
896    }
897
898    #[tokio::test]
899    #[ignore = "needs kubelet debug methods"]
900    #[cfg(feature = "kubelet-debug")]
901    async fn pod_can_exec_and_write_to_stdin_from_node_proxy() -> Result<(), Box<dyn std::error::Error>> {
902        use crate::{
903            api::{DeleteParams, ListParams, Patch, PatchParams, WatchEvent},
904            core::kubelet_debug::KubeletDebugParams,
905        };
906
907        let client = Client::try_default().await?;
908        let pods: Api<Pod> = Api::default_namespaced(client);
909
910        // create busybox pod that's alive for at most 30s
911        let p: Pod = serde_json::from_value(json!({
912            "apiVersion": "v1",
913            "kind": "Pod",
914            "metadata": {
915                "name": "busybox-kube2",
916                "labels": { "app": "kube-rs-test" },
917            },
918            "spec": {
919                "terminationGracePeriodSeconds": 1,
920                "restartPolicy": "Never",
921                "containers": [{
922                  "name": "busybox",
923                  "image": "busybox:1.34.1",
924                  "command": ["sh", "-c", "sleep 30"],
925                }],
926            }
927        }))?;
928
929        match pods.create(&Default::default(), &p).await {
930            Ok(o) => assert_eq!(p.name_unchecked(), o.name_unchecked()),
931            Err(crate::Error::Api(ae)) => assert_eq!(ae.code, 409), // if we failed to clean-up
932            Err(e) => return Err(e.into()),                         // any other case if a failure
933        }
934
935        // Manual watch-api for it to become ready
936        // NB: don't do this; using conditions (see pod_api example) is easier and less error prone
937        let wp = WatchParams::default()
938            .fields(&format!("metadata.name={}", "busybox-kube2"))
939            .timeout(15);
940        let mut stream = pods.watch(&wp, "0").await?.boxed();
941        while let Some(ev) = stream.try_next().await? {
942            match ev {
943                WatchEvent::Modified(o) => {
944                    let s = o.status.as_ref().expect("status exists on pod");
945                    let phase = s.phase.clone().unwrap_or_default();
946                    if phase == "Running" {
947                        break;
948                    }
949                }
950                WatchEvent::Error(e) => panic!("watch error: {e}"),
951                _ => {}
952            }
953        }
954
955        let mut config = Config::infer().await?;
956        config.accept_invalid_certs = true;
957        config.cluster_url = "https://localhost:10250".to_string().parse::<Uri>().unwrap();
958        let kubelet_client: Client = config.try_into()?;
959
960        // Verify exec works and we can get the output
961        {
962            let mut attached = kubelet_client
963                .kubelet_node_exec(
964                    &KubeletDebugParams {
965                        name: "busybox-kube2",
966                        namespace: "default",
967                        ..Default::default()
968                    },
969                    "busybox",
970                    vec!["sh", "-c", "for i in $(seq 1 3); do echo $i; done"],
971                    &AttachParams::default().stderr(false),
972                )
973                .await?;
974            let stdout = tokio_util::io::ReaderStream::new(attached.stdout().unwrap());
975            let out = stdout
976                .filter_map(|r| async { r.ok().and_then(|v| String::from_utf8(v.to_vec()).ok()) })
977                .collect::<Vec<_>>()
978                .await
979                .join("");
980            attached.join().await.unwrap();
981            assert_eq!(out.lines().count(), 3);
982            assert_eq!(out, "1\n2\n3\n");
983        }
984
985        // Verify we can write to Stdin
986        {
987            use tokio::io::AsyncWriteExt;
988            let mut attached = kubelet_client
989                .kubelet_node_exec(
990                    &KubeletDebugParams {
991                        name: "busybox-kube2",
992                        namespace: "default",
993                        ..Default::default()
994                    },
995                    "busybox",
996                    vec!["sh"],
997                    &AttachParams::default().stdin(true).stderr(false),
998                )
999                .await?;
1000            let mut stdin_writer = attached.stdin().unwrap();
1001            let mut stdout_stream = tokio_util::io::ReaderStream::new(attached.stdout().unwrap());
1002            let next_stdout = stdout_stream.next();
1003            stdin_writer.write_all(b"echo test string 1\n").await?;
1004            let stdout = String::from_utf8(next_stdout.await.unwrap().unwrap().to_vec()).unwrap();
1005            println!("{stdout}");
1006            assert_eq!(stdout, "test string 1\n");
1007
1008            // AttachedProcess resolves with status object.
1009            // Send `exit 1` to get a failure status.
1010            stdin_writer.write_all(b"exit 1\n").await?;
1011            let status = attached.take_status().unwrap();
1012            if let Some(status) = status.await {
1013                println!("{status:?}");
1014                assert_eq!(status.status, Some("Failure".to_owned()));
1015                assert_eq!(status.reason, Some("NonZeroExitCode".to_owned()));
1016            }
1017        }
1018
1019        // Delete it
1020        let dp = DeleteParams::default();
1021        pods.delete("busybox-kube2", &dp).await?.map_left(|pdel| {
1022            assert_eq!(pdel.name_unchecked(), "busybox-kube2");
1023        });
1024
1025        Ok(())
1026    }
1027}