1#![cfg_attr(docsrs, feature(doc_cfg))]
64#![allow(clippy::unnecessary_lazy_evaluations)]
66
/// Gates every item passed to it behind the `client` cargo feature.
///
/// Each item is emitted with `#[cfg(feature = "client")]`. When the `docsrs` cfg is set the
/// item additionally receives the matching `doc(cfg(...))` attribute.
macro_rules! cfg_client {
    ($($gated:item)*) => {
        $(
            #[cfg(feature = "client")]
            #[cfg_attr(docsrs, doc(cfg(feature = "client")))]
            $gated
        )*
    };
}
/// Gates every item passed to it behind the `config` cargo feature.
///
/// Each item is emitted with `#[cfg(feature = "config")]`. When the `docsrs` cfg is set the
/// item additionally receives the matching `doc(cfg(...))` attribute.
macro_rules! cfg_config {
    ($($gated:item)*) => {
        $(
            #[cfg(feature = "config")]
            #[cfg_attr(docsrs, doc(cfg(feature = "config")))]
            $gated
        )*
    };
}
85
/// Gates every item passed to it behind *either* the `config` or the `client` cargo feature.
///
/// Each item is emitted with `#[cfg(any(feature = "config", feature = "client"))]`. When the
/// `docsrs` cfg is set the item additionally receives the matching `doc(cfg(...))` attribute.
macro_rules! cfg_error {
    ($($gated:item)*) => {
        $(
            #[cfg(any(feature = "config", feature = "client"))]
            #[cfg_attr(docsrs, doc(cfg(any(feature = "config", feature = "client"))))]
            $gated
        )*
    };
}
95
// Everything in this invocation is compiled only when the `client` feature is enabled
// (the gating attributes are added by `cfg_client!`).
cfg_client! {
    pub mod api;
    pub mod discovery;
    pub mod client;

    // Re-export the main type of each module at the crate root; `doc(inline)` asks rustdoc
    // to render the re-exported item here rather than as a plain `pub use` line.
    #[doc(inline)]
    pub use api::Api;
    #[doc(inline)]
    pub use client::Client;
    #[doc(inline)]
    pub use discovery::Discovery;
}
108
// Compiled only when the `config` feature is enabled (gating added by `cfg_config!`).
cfg_config! {
    pub mod config;
    // Root-level re-export of `config::Config`, inlined in the generated docs.
    #[doc(inline)]
    pub use config::Config;
}
114
// Compiled when either the `config` or the `client` feature is enabled (gating added by
// `cfg_error!`).
cfg_error! {
    pub mod error;
    // Root-level re-export of `error::Error`, inlined in the generated docs.
    #[doc(inline)] pub use error::Error;
    // Crate-wide `Result` alias whose error type defaults to this crate's `Error`; callers
    // can still name a different `E` explicitly.
    pub type Result<T, E = Error> = std::result::Result<T, E>;
}
121
// Re-export three traits from `crate::core` at the crate root. `crate::core` is itself the
// alias for the `kube_core` crate declared on the following line.
pub use crate::core::{CustomResourceExt, Resource, ResourceExt};
// Expose the whole `kube_core` crate under the name `core`.
pub use kube_core as core;
125
126#[cfg(all(feature = "client", feature = "config"))]
129#[cfg(test)]
130#[allow(unused_imports)] mod test {
132 use crate::{
133 Api, Client, Config, ResourceExt,
134 api::{AttachParams, AttachedProcess},
135 client::ConfigExt,
136 };
137 use futures::{AsyncBufRead, AsyncBufReadExt, StreamExt, TryStreamExt};
138 use hyper::Uri;
139 use k8s_openapi::api::core::v1::{EphemeralContainer, Pod, PodSpec};
140 use kube_core::{
141 params::{DeleteParams, Patch, PatchParams, PostParams, WatchParams},
142 response::StatusSummary,
143 };
144 use serde_json::json;
145 use tower::ServiceBuilder;
146
147 #[allow(dead_code)]
149 #[ignore = "needs cluster (lists pods)"]
151 #[cfg(feature = "rustls-tls")]
152 async fn custom_client_rustls_configuration() -> Result<(), Box<dyn std::error::Error>> {
153 use hyper_util::rt::TokioExecutor;
154
155 let config = Config::infer().await?;
156 let https = config.rustls_https_connector()?;
157 let service = ServiceBuilder::new()
158 .layer(config.base_uri_layer())
159 .service(hyper_util::client::legacy::Client::builder(TokioExecutor::new()).build(https));
160 let client = Client::new(service, config.default_namespace);
161 let pods: Api<Pod> = Api::default_namespaced(client);
162 pods.list(&Default::default()).await?;
163 Ok(())
164 }
165
166 #[tokio::test]
167 #[ignore = "needs cluster (lists pods)"]
168 #[cfg(feature = "openssl-tls")]
169 async fn custom_client_openssl_tls_configuration() -> Result<(), Box<dyn std::error::Error>> {
170 use hyper_util::rt::TokioExecutor;
171
172 let config = Config::infer().await?;
173 let https = config.openssl_https_connector()?;
174 let service = ServiceBuilder::new()
175 .layer(config.base_uri_layer())
176 .service(hyper_util::client::legacy::Client::builder(TokioExecutor::new()).build(https));
177 let client = Client::new(service, config.default_namespace);
178 let pods: Api<Pod> = Api::default_namespaced(client);
179 pods.list(&Default::default()).await?;
180 Ok(())
181 }
182
183 #[tokio::test]
184 #[ignore = "needs cluster (lists api resources)"]
185 #[cfg(feature = "client")]
186 async fn group_discovery_oneshot() -> Result<(), Box<dyn std::error::Error>> {
187 use crate::{core::DynamicObject, discovery};
188 let client = Client::try_default().await?;
189 let apigroup = discovery::group(&client, "apiregistration.k8s.io").await?;
190 let (ar, _caps) = apigroup.recommended_kind("APIService").unwrap();
191 let api: Api<DynamicObject> = Api::all_with(client.clone(), &ar);
192 api.list(&Default::default()).await?;
193
194 Ok(())
195 }
196
197 #[tokio::test]
198 #[ignore = "needs cluster (uses aggregated discovery, requires k8s 1.26+)"]
199 #[cfg(feature = "client")]
200 async fn aggregated_discovery_apis() -> Result<(), Box<dyn std::error::Error>> {
201 let client = Client::try_default().await?;
202
203 let apis_discovery = client.list_api_groups_aggregated().await?;
205 assert!(!apis_discovery.items.is_empty(), "should have API groups");
206
207 let apps_group = apis_discovery
209 .items
210 .iter()
211 .find(|g| g.metadata.as_ref().and_then(|m| m.name.as_ref()) == Some(&"apps".to_string()));
212 assert!(apps_group.is_some(), "should have apps group");
213
214 let apps = apps_group.unwrap();
215 assert!(!apps.versions.is_empty(), "apps should have versions");
216
217 let v1 = apps.versions.iter().find(|v| v.version == Some("v1".to_string()));
219 assert!(v1.is_some(), "apps should have v1");
220
221 let deployments = v1
222 .unwrap()
223 .resources
224 .iter()
225 .find(|r| r.resource == Some("deployments".to_string()));
226 assert!(deployments.is_some(), "apps/v1 should have deployments");
227
228 Ok(())
229 }
230
231 #[tokio::test]
232 #[ignore = "needs cluster (uses aggregated discovery, requires k8s 1.26+)"]
233 #[cfg(feature = "client")]
234 async fn aggregated_discovery_core() -> Result<(), Box<dyn std::error::Error>> {
235 let client = Client::try_default().await?;
236
237 let core_discovery = client.list_core_api_versions_aggregated().await?;
239 assert!(!core_discovery.items.is_empty(), "should have core group");
240
241 let core = &core_discovery.items[0];
242 let v1 = core.versions.iter().find(|v| v.version == Some("v1".to_string()));
243 assert!(v1.is_some(), "core should have v1");
244
245 let pods = v1
247 .unwrap()
248 .resources
249 .iter()
250 .find(|r| r.resource == Some("pods".to_string()));
251 assert!(pods.is_some(), "core/v1 should have pods");
252
253 let pods_resource = pods.unwrap();
254 assert_eq!(pods_resource.scope, Some("Namespaced".to_string()));
255 assert!(pods_resource.verbs.contains(&"list".to_string()));
256
257 Ok(())
258 }
259
260 #[tokio::test]
261 #[ignore = "needs cluster (uses aggregated discovery, requires k8s 1.26+)"]
262 #[cfg(feature = "client")]
263 async fn discovery_run_aggregated() -> Result<(), Box<dyn std::error::Error>> {
264 use crate::discovery::{Discovery, verbs};
265
266 let client = Client::try_default().await?;
267
268 let discovery = Discovery::new(client.clone()).run_aggregated().await?;
270
271 assert!(discovery.groups().count() > 0, "should have discovered groups");
273
274 assert!(discovery.has_group(""), "should have core group");
276
277 assert!(discovery.has_group("apps"), "should have apps group");
279
280 let apps = discovery.get("apps").expect("apps group");
282 let (ar, caps) = apps.recommended_kind("Deployment").expect("Deployment kind");
283 assert_eq!(ar.kind, "Deployment");
284 assert!(caps.supports_operation(verbs::LIST));
285
286 let core = discovery.get("").expect("core group");
288 let (ar, caps) = core.recommended_kind("Pod").expect("Pod kind");
289 assert_eq!(ar.kind, "Pod");
290 assert!(caps.supports_operation(verbs::GET));
291
292 Ok(())
293 }
294
295 #[tokio::test]
296 #[ignore = "needs cluster (will create and edit a pod)"]
297 async fn pod_can_use_core_apis() -> Result<(), Box<dyn std::error::Error>> {
298 use kube::api::{DeleteParams, ListParams, Patch, PatchParams, PostParams, WatchEvent};
299
300 let client = Client::try_default().await?;
301 let pods: Api<Pod> = Api::default_namespaced(client);
302
303 let p: Pod = serde_json::from_value(json!({
305 "apiVersion": "v1",
306 "kind": "Pod",
307 "metadata": {
308 "name": "busybox-kube1",
309 "labels": { "app": "kube-rs-test" },
310 },
311 "spec": {
312 "terminationGracePeriodSeconds": 1,
313 "restartPolicy": "Never",
314 "containers": [{
315 "name": "busybox",
316 "image": "busybox:1.34.1",
317 "command": ["sh", "-c", "sleep 30"],
318 }],
319 }
320 }))?;
321
322 let pp = PostParams::default();
323 match pods.create(&pp, &p).await {
324 Ok(o) => assert_eq!(p.name_unchecked(), o.name_unchecked()),
325 Err(crate::Error::Api(ae)) => assert_eq!(ae.code, 409), Err(e) => return Err(e.into()), }
328
329 let wp = WatchParams::default()
332 .fields(&format!("metadata.name={}", "busybox-kube1"))
333 .timeout(15);
334 let mut stream = pods.watch(&wp, "0").await?.boxed();
335 while let Some(ev) = stream.try_next().await? {
336 let _ = format!("we: {ev:?}");
338 match ev {
339 WatchEvent::Modified(o) => {
340 let s = o.status.as_ref().expect("status exists on pod");
341 let phase = s.phase.clone().unwrap_or_default();
342 if phase == "Running" {
343 break;
344 }
345 }
346 WatchEvent::Error(e) => panic!("watch error: {e}"),
347 _ => {}
348 }
349 }
350
351 let mut pod = pods.get("busybox-kube1").await?;
353 assert_eq!(p.spec.as_ref().unwrap().containers[0].name, "busybox");
354
355 {
358 assert!(pod.resource_version().is_some());
359 pod.spec.as_mut().unwrap().active_deadline_seconds = Some(5);
360
361 let pp = PostParams::default();
362 let patched_pod = pods.replace("busybox-kube1", &pp, &pod).await?;
363 assert_eq!(patched_pod.spec.unwrap().active_deadline_seconds, Some(5));
364 }
365
366 let dp = DeleteParams::default();
368 pods.delete("busybox-kube1", &dp).await?.map_left(|pdel| {
369 assert_eq!(pdel.name_unchecked(), "busybox-kube1");
370 });
371
372 Ok(())
373 }
374
375 #[tokio::test]
376 #[ignore = "needs cluster (will create and attach to a pod)"]
377 #[cfg(feature = "ws")]
378 async fn pod_can_exec_and_write_to_stdin() -> Result<(), Box<dyn std::error::Error>> {
379 use crate::api::{DeleteParams, ListParams, Patch, PatchParams, WatchEvent};
380 use tokio::io::AsyncWriteExt;
381
382 let client = Client::try_default().await?;
383 let pods: Api<Pod> = Api::default_namespaced(client);
384
385 let p: Pod = serde_json::from_value(json!({
387 "apiVersion": "v1",
388 "kind": "Pod",
389 "metadata": {
390 "name": "busybox-kube2",
391 "labels": { "app": "kube-rs-test" },
392 },
393 "spec": {
394 "terminationGracePeriodSeconds": 1,
395 "restartPolicy": "Never",
396 "containers": [{
397 "name": "busybox",
398 "image": "busybox:1.34.1",
399 "command": ["sh", "-c", "sleep 30"],
400 }],
401 }
402 }))?;
403
404 match pods.create(&Default::default(), &p).await {
405 Ok(o) => assert_eq!(p.name_unchecked(), o.name_unchecked()),
406 Err(crate::Error::Api(ae)) => assert_eq!(ae.code, 409), Err(e) => return Err(e.into()), }
409
410 let wp = WatchParams::default()
413 .fields(&format!("metadata.name={}", "busybox-kube2"))
414 .timeout(15);
415 let mut stream = pods.watch(&wp, "0").await?.boxed();
416 while let Some(ev) = stream.try_next().await? {
417 match ev {
418 WatchEvent::Modified(o) => {
419 let s = o.status.as_ref().expect("status exists on pod");
420 let phase = s.phase.clone().unwrap_or_default();
421 if phase == "Running" {
422 break;
423 }
424 }
425 WatchEvent::Error(e) => panic!("watch error: {e}"),
426 _ => {}
427 }
428 }
429
430 {
432 let mut attached = pods
433 .exec(
434 "busybox-kube2",
435 vec!["sh", "-c", "for i in $(seq 1 3); do echo $i; done"],
436 &AttachParams::default().stderr(false),
437 )
438 .await?;
439 let stdout = tokio_util::io::ReaderStream::new(attached.stdout().unwrap());
440 let out = stdout
441 .filter_map(|r| async { r.ok().and_then(|v| String::from_utf8(v.to_vec()).ok()) })
442 .collect::<Vec<_>>()
443 .await
444 .join("");
445 attached.join().await.unwrap();
446 assert_eq!(out.lines().count(), 3);
447 assert_eq!(out, "1\n2\n3\n");
448 }
449
450 {
452 let name = "busybox-kube2";
453 let command = vec!["sh", "-c", "sleep 2; echo test string 2"];
454 let ap = AttachParams::default().stdin(true).stderr(false);
455
456 let mut req = pods.request.exec(name, command.clone(), &ap)?;
458 req.extensions_mut().insert("exec");
459 let stream = pods.client.connect(req).await?;
460
461 if stream.supports_stream_close() {
464 let mut attached = pods.exec(name, command, &ap).await?;
465 let mut stdin_writer = attached.stdin().unwrap();
466 let mut stdout_stream = tokio_util::io::ReaderStream::new(attached.stdout().unwrap());
467
468 stdin_writer.write_all(b"this will be ignored\n").await?;
469 _ = stdin_writer.shutdown().await;
470
471 let next_stdout = stdout_stream.next();
472 let stdout = String::from_utf8(next_stdout.await.unwrap().unwrap().to_vec()).unwrap();
473 assert_eq!(stdout, "test string 2\n");
474
475 let status = attached.take_status().unwrap();
477 if let Some(status) = status.await {
478 assert_eq!(status.status, Some("Success".to_owned()));
479 assert_eq!(status.reason, None);
480 }
481 }
482 }
483
484 {
486 let mut attached = pods
487 .exec(
488 "busybox-kube2",
489 vec!["sh"],
490 &AttachParams::default().stdin(true).stderr(false),
491 )
492 .await?;
493 let mut stdin_writer = attached.stdin().unwrap();
494 let mut stdout_stream = tokio_util::io::ReaderStream::new(attached.stdout().unwrap());
495 let next_stdout = stdout_stream.next();
496 stdin_writer.write_all(b"echo test string 1\n").await?;
497 let stdout = String::from_utf8(next_stdout.await.unwrap().unwrap().to_vec()).unwrap();
498 println!("{stdout}");
499 assert_eq!(stdout, "test string 1\n");
500
501 stdin_writer.write_all(b"exit 1\n").await?;
504 let status = attached.take_status().unwrap();
505 if let Some(status) = status.await {
506 println!("{status:?}");
507 assert_eq!(status.status, Some("Failure".to_owned()));
508 assert_eq!(status.reason, Some("NonZeroExitCode".to_owned()));
509 }
510 }
511
512 let dp = DeleteParams::default();
514 pods.delete("busybox-kube2", &dp).await?.map_left(|pdel| {
515 assert_eq!(pdel.name_unchecked(), "busybox-kube2");
516 });
517
518 Ok(())
519 }
520
521 #[tokio::test]
522 #[ignore = "needs cluster (will create and tail logs from a pod)"]
523 async fn can_get_pod_logs_and_evict() -> Result<(), Box<dyn std::error::Error>> {
524 use crate::{
525 api::{DeleteParams, EvictParams, ListParams, Patch, PatchParams, WatchEvent},
526 core::subresource::LogParams,
527 };
528
529 let client = Client::try_default().await?;
530 let pods: Api<Pod> = Api::default_namespaced(client);
531
532 let p: Pod = serde_json::from_value(json!({
534 "apiVersion": "v1",
535 "kind": "Pod",
536 "metadata": {
537 "name": "busybox-kube3",
538 "labels": { "app": "kube-rs-test" },
539 },
540 "spec": {
541 "terminationGracePeriodSeconds": 1,
542 "restartPolicy": "Never",
543 "containers": [{
544 "name": "busybox",
545 "image": "busybox:1.34.1",
546 "command": ["sh", "-c", "for i in $(seq 1 5); do echo kube $i; sleep 0.1; done"],
547 }],
548 }
549 }))?;
550
551 match pods.create(&Default::default(), &p).await {
552 Ok(o) => assert_eq!(p.name_unchecked(), o.name_unchecked()),
553 Err(crate::Error::Api(ae)) => assert_eq!(ae.code, 409), Err(e) => return Err(e.into()), }
556
557 let wp = WatchParams::default()
560 .fields(&format!("metadata.name={}", "busybox-kube3"))
561 .timeout(15);
562 let mut stream = pods.watch(&wp, "0").await?.boxed();
563 while let Some(ev) = stream.try_next().await? {
564 match ev {
565 WatchEvent::Modified(o) => {
566 let s = o.status.as_ref().expect("status exists on pod");
567 let phase = s.phase.clone().unwrap_or_default();
568 if phase == "Running" {
569 break;
570 }
571 }
572 WatchEvent::Error(e) => panic!("watch error: {e}"),
573 _ => {}
574 }
575 }
576
577 let lp = LogParams {
579 follow: true,
580 ..LogParams::default()
581 };
582 let mut logs_stream = pods.log_stream("busybox-kube3", &lp).await?.lines();
583
584 tokio::time::sleep(std::time::Duration::from_secs(2)).await;
586
587 let all_logs = pods.logs("busybox-kube3", &Default::default()).await?;
588 assert_eq!(all_logs, "kube 1\nkube 2\nkube 3\nkube 4\nkube 5\n");
589
590 let mut output = vec![];
592 while let Some(line) = logs_stream.try_next().await? {
593 output.push(line);
594 }
595 assert_eq!(output, vec!["kube 1", "kube 2", "kube 3", "kube 4", "kube 5"]);
596
597 let ep = EvictParams::default();
599 let eres = pods.evict("busybox-kube3", &ep).await?;
600 assert_eq!(eres.code, 201); assert!(eres.is_success());
602
603 Ok(())
604 }
605
606 #[tokio::test]
607 #[ignore = "requires a cluster"]
608 async fn can_operate_on_pod_metadata() -> Result<(), Box<dyn std::error::Error>> {
609 use crate::{
610 api::{DeleteParams, EvictParams, ListParams, Patch, PatchParams, WatchEvent},
611 core::subresource::LogParams,
612 };
613 use kube_core::{ObjectList, ObjectMeta, PartialObjectMeta, PartialObjectMetaExt};
614
615 let client = Client::try_default().await?;
616 let pods: Api<Pod> = Api::default_namespaced(client);
617
618 let p: Pod = serde_json::from_value(json!({
620 "apiVersion": "v1",
621 "kind": "Pod",
622 "metadata": {
623 "name": "busybox-kube-meta",
624 "labels": { "app": "kube-rs-test" },
625 },
626 "spec": {
627 "terminationGracePeriodSeconds": 1,
628 "restartPolicy": "Never",
629 "containers": [{
630 "name": "busybox",
631 "image": "busybox:1.34.1",
632 "command": ["sh", "-c", "sleep 30s"],
633 }],
634 }
635 }))?;
636
637 match pods.create(&Default::default(), &p).await {
638 Ok(o) => assert_eq!(p.name_unchecked(), o.name_unchecked()),
639 Err(crate::Error::Api(ae)) => assert_eq!(ae.code, 409), Err(e) => return Err(e.into()), }
642
643 let pod_metadata = pods.get_metadata("busybox-kube-meta").await?;
646 assert_eq!("busybox-kube-meta", pod_metadata.name_any());
647 assert_eq!(
648 Some((&"app".to_string(), &"kube-rs-test".to_string())),
649 pod_metadata.labels().get_key_value("app")
650 );
651
652 let p_list = pods.list_metadata(&ListParams::default()).await?;
654
655 let pod_metadata = p_list
658 .items
659 .into_iter()
660 .find(|p| p.name_any() == "busybox-kube-meta")
661 .unwrap();
662 assert_eq!(
663 pod_metadata.labels().get("app"),
664 Some(&"kube-rs-test".to_string())
665 );
666
667 let patch = ObjectMeta {
669 annotations: Some([("test".to_string(), "123".to_string())].into()),
670 ..Default::default()
671 }
672 .into_request_partial::<Pod>();
673
674 let patchparams = PatchParams::default();
675 let p_patched = pods
676 .patch_metadata("busybox-kube-meta", &patchparams, &Patch::Merge(&patch))
677 .await?;
678 assert_eq!(p_patched.annotations().get("test"), Some(&"123".to_string()));
679 assert_eq!(p_patched.types.as_ref().unwrap().kind, "PartialObjectMetadata");
680 assert_eq!(p_patched.types.as_ref().unwrap().api_version, "meta.k8s.io/v1");
681
682 let dp = DeleteParams::default();
684 pods.delete("busybox-kube-meta", &dp).await?.map_left(|pdel| {
685 assert_eq!(pdel.name_any(), "busybox-kube-meta");
686 });
687
688 Ok(())
689 }
690 #[tokio::test]
691 #[ignore = "needs cluster (will create a CertificateSigningRequest)"]
692 async fn csr_can_be_approved() -> Result<(), Box<dyn std::error::Error>> {
693 use crate::api::PostParams;
694 use k8s_openapi::api::certificates::v1::{
695 CertificateSigningRequest, CertificateSigningRequestCondition, CertificateSigningRequestStatus,
696 };
697
698 let csr_name = "fake";
699 let dummy_csr: CertificateSigningRequest = serde_json::from_value(json!({
700 "apiVersion": "certificates.k8s.io/v1",
701 "kind": "CertificateSigningRequest",
702 "metadata": { "name": csr_name },
703 "spec": {
704 "request": "LS0tLS1CRUdJTiBDRVJUSUZJQ0FURSBSRVFVRVNULS0tLS0KTUlJQ1ZqQ0NBVDRDQVFBd0VURVBNQTBHQTFVRUF3d0dZVzVuWld4aE1JSUJJakFOQmdrcWhraUc5dzBCQVFFRgpBQU9DQVE4QU1JSUJDZ0tDQVFFQTByczhJTHRHdTYxakx2dHhWTTJSVlRWMDNHWlJTWWw0dWluVWo4RElaWjBOCnR2MUZtRVFSd3VoaUZsOFEzcWl0Qm0wMUFSMkNJVXBGd2ZzSjZ4MXF3ckJzVkhZbGlBNVhwRVpZM3ExcGswSDQKM3Z3aGJlK1o2MVNrVHF5SVBYUUwrTWM5T1Nsbm0xb0R2N0NtSkZNMUlMRVI3QTVGZnZKOEdFRjJ6dHBoaUlFMwpub1dtdHNZb3JuT2wzc2lHQ2ZGZzR4Zmd4eW8ybmlneFNVekl1bXNnVm9PM2ttT0x1RVF6cXpkakJ3TFJXbWlECklmMXBMWnoyalVnald4UkhCM1gyWnVVV1d1T09PZnpXM01LaE8ybHEvZi9DdS8wYk83c0x0MCt3U2ZMSU91TFcKcW90blZtRmxMMytqTy82WDNDKzBERHk5aUtwbXJjVDBnWGZLemE1dHJRSURBUUFCb0FBd0RRWUpLb1pJaHZjTgpBUUVMQlFBRGdnRUJBR05WdmVIOGR4ZzNvK21VeVRkbmFjVmQ1N24zSkExdnZEU1JWREkyQTZ1eXN3ZFp1L1BVCkkwZXpZWFV0RVNnSk1IRmQycVVNMjNuNVJsSXJ3R0xuUXFISUh5VStWWHhsdnZsRnpNOVpEWllSTmU3QlJvYXgKQVlEdUI5STZXT3FYbkFvczFqRmxNUG5NbFpqdU5kSGxpT1BjTU1oNndLaTZzZFhpVStHYTJ2RUVLY01jSVUyRgpvU2djUWdMYTk0aEpacGk3ZnNMdm1OQUxoT045UHdNMGM1dVJVejV4T0dGMUtCbWRSeEgvbUNOS2JKYjFRQm1HCkkwYitEUEdaTktXTU0xMzhIQXdoV0tkNjVoVHdYOWl4V3ZHMkh4TG1WQzg0L1BHT0tWQW9FNkpsYWFHdTlQVmkKdjlOSjVaZlZrcXdCd0hKbzZXdk9xVlA3SVFjZmg3d0drWm89Ci0tLS0tRU5EIENFUlRJRklDQVRFIFJFUVVFU1QtLS0tLQo=",
705 "signerName": "kubernetes.io/kube-apiserver-client",
706 "expirationSeconds": 86400,
707 "usages": ["client auth"]
708 }
709 }))?;
710
711 let client = Client::try_default().await?;
712 let csr: Api<CertificateSigningRequest> = Api::all(client.clone());
713 assert!(csr.create(&PostParams::default(), &dummy_csr).await.is_ok());
714
715 let approval_type = "ApprovedFake";
717 let csr_status: CertificateSigningRequestStatus = CertificateSigningRequestStatus {
718 certificate: None,
719 conditions: Some(vec![CertificateSigningRequestCondition {
720 type_: approval_type.to_string(),
721 last_update_time: None,
722 last_transition_time: None,
723 message: Some(format!("{} {}", approval_type, "by kube-rs client")),
724 reason: Some("kube-rsClient".to_string()),
725 status: "True".to_string(),
726 }]),
727 };
728 let csr_status_patch = Patch::Merge(serde_json::json!({ "status": csr_status }));
729 let _ = csr
730 .patch_approval(csr_name, &Default::default(), &csr_status_patch)
731 .await?;
732 let csr_after_approval = csr.get_approval(csr_name).await?;
733
734 assert_eq!(
735 csr_after_approval
736 .status
737 .as_ref()
738 .unwrap()
739 .conditions
740 .as_ref()
741 .unwrap()[0]
742 .type_,
743 approval_type.to_string()
744 );
745 csr.delete(csr_name, &DeleteParams::default()).await?;
746 Ok(())
747 }
748
749 #[tokio::test]
750 #[ignore = "needs cluster for ephemeral containers operations"]
751 async fn can_operate_on_ephemeral_containers() -> Result<(), Box<dyn std::error::Error>> {
752 let client = Client::try_default().await?;
753
754 let api_version = client.apiserver_version().await?;
757 if api_version.major.parse::<i32>()? < 1 || api_version.minor.parse::<i32>()? < 25 {
758 return Ok(());
759 }
760
761 let pod: Pod = serde_json::from_value(serde_json::json!({
762 "apiVersion": "v1",
763 "kind": "Pod",
764 "metadata": {
765 "name": "ephemeral-container-test",
766 "labels": { "app": "kube-rs-test" },
767 },
768 "spec": {
769 "restartPolicy": "Never",
770 "containers": [{
771 "name": "busybox",
772 "image": "busybox:1.34.1",
773 "command": ["sh", "-c", "sleep 2"],
774 }],
775 }
776 }))?;
777
778 let pod_name = pod.name_any();
779 let pods = Api::<Pod>::default_namespaced(client);
780
781 let _ = pods
786 .delete(&pod.name_any(), &DeleteParams::default())
787 .await
788 .map(|v| v.map_left(|pdel| assert_eq!(pdel.name_any(), pod.name_any())));
789
790 match pods.create(&Default::default(), &pod).await {
793 Ok(o) => assert_eq!(pod.name_unchecked(), o.name_unchecked()),
794 Err(e) => return Err(e.into()), }
796
797 let current_ephemeral_containers = pods
798 .get_ephemeral_containers(&pod.name_any())
799 .await?
800 .spec
801 .unwrap()
802 .ephemeral_containers;
803
804 assert_eq!(current_ephemeral_containers, None);
807
808 let mut busybox_eph: EphemeralContainer = serde_json::from_value(json!(
809 {
810 "name": "myephemeralcontainer1",
811 "image": "busybox:1.34.1",
812 "command": ["sh", "-c", "sleep 2"],
813 }
814 ))?;
815
816 let patch: Pod = serde_json::from_value(json!({
819 "metadata": { "name": pod_name },
820 "spec":{ "ephemeralContainers": [ busybox_eph ] }
821 }))?;
822
823 let current_containers = pods
824 .replace_ephemeral_containers(&pod_name, &PostParams::default(), &patch)
825 .await?
826 .spec
827 .unwrap()
828 .ephemeral_containers
829 .expect("could find ephemeral container");
830
831 assert_eq!(current_containers.len(), 1);
834 assert_eq!(current_containers[0].name, busybox_eph.name);
835 assert_eq!(current_containers[0].image, busybox_eph.image);
836 assert_eq!(current_containers[0].command, busybox_eph.command);
837
838 busybox_eph = serde_json::from_value(json!(
843 {
844 "name": "myephemeralcontainer2",
845 "image": "busybox:1.35.0",
846 "command": ["sh", "-c", "sleep 1"],
847 }
848 ))?;
849
850 let patch: Pod =
851 serde_json::from_value(json!({ "spec": { "ephemeralContainers": [ busybox_eph ] }}))?;
852
853 let current_containers = pods
854 .patch_ephemeral_containers(&pod_name, &PatchParams::default(), &Patch::Strategic(patch))
855 .await?
856 .spec
857 .unwrap()
858 .ephemeral_containers
859 .expect("could find ephemeral container");
860
861 assert_eq!(current_containers.len(), 2);
864
865 let new_container = current_containers
866 .iter()
867 .find(|c| c.name == busybox_eph.name)
868 .expect("could find myephemeralcontainer2");
869
870 assert_eq!(new_container.image, busybox_eph.image);
873 assert_eq!(new_container.command, busybox_eph.command);
874
875 let expected_containers = current_containers;
878
879 let current_containers = pods
880 .get_ephemeral_containers(&pod.name_any())
881 .await?
882 .spec
883 .unwrap()
884 .ephemeral_containers
885 .unwrap();
886
887 assert_eq!(current_containers, expected_containers);
888
889 pods.delete(&pod.name_any(), &DeleteParams::default())
890 .await?
891 .map_left(|pdel| {
892 assert_eq!(pdel.name_any(), pod.name_any());
893 });
894
895 Ok(())
896 }
897
898 #[tokio::test]
899 #[ignore = "needs kubelet debug methods"]
900 #[cfg(feature = "kubelet-debug")]
901 async fn pod_can_exec_and_write_to_stdin_from_node_proxy() -> Result<(), Box<dyn std::error::Error>> {
902 use crate::{
903 api::{DeleteParams, ListParams, Patch, PatchParams, WatchEvent},
904 core::kubelet_debug::KubeletDebugParams,
905 };
906
907 let client = Client::try_default().await?;
908 let pods: Api<Pod> = Api::default_namespaced(client);
909
910 let p: Pod = serde_json::from_value(json!({
912 "apiVersion": "v1",
913 "kind": "Pod",
914 "metadata": {
915 "name": "busybox-kube2",
916 "labels": { "app": "kube-rs-test" },
917 },
918 "spec": {
919 "terminationGracePeriodSeconds": 1,
920 "restartPolicy": "Never",
921 "containers": [{
922 "name": "busybox",
923 "image": "busybox:1.34.1",
924 "command": ["sh", "-c", "sleep 30"],
925 }],
926 }
927 }))?;
928
929 match pods.create(&Default::default(), &p).await {
930 Ok(o) => assert_eq!(p.name_unchecked(), o.name_unchecked()),
931 Err(crate::Error::Api(ae)) => assert_eq!(ae.code, 409), Err(e) => return Err(e.into()), }
934
935 let wp = WatchParams::default()
938 .fields(&format!("metadata.name={}", "busybox-kube2"))
939 .timeout(15);
940 let mut stream = pods.watch(&wp, "0").await?.boxed();
941 while let Some(ev) = stream.try_next().await? {
942 match ev {
943 WatchEvent::Modified(o) => {
944 let s = o.status.as_ref().expect("status exists on pod");
945 let phase = s.phase.clone().unwrap_or_default();
946 if phase == "Running" {
947 break;
948 }
949 }
950 WatchEvent::Error(e) => panic!("watch error: {e}"),
951 _ => {}
952 }
953 }
954
955 let mut config = Config::infer().await?;
956 config.accept_invalid_certs = true;
957 config.cluster_url = "https://localhost:10250".to_string().parse::<Uri>().unwrap();
958 let kubelet_client: Client = config.try_into()?;
959
960 {
962 let mut attached = kubelet_client
963 .kubelet_node_exec(
964 &KubeletDebugParams {
965 name: "busybox-kube2",
966 namespace: "default",
967 ..Default::default()
968 },
969 "busybox",
970 vec!["sh", "-c", "for i in $(seq 1 3); do echo $i; done"],
971 &AttachParams::default().stderr(false),
972 )
973 .await?;
974 let stdout = tokio_util::io::ReaderStream::new(attached.stdout().unwrap());
975 let out = stdout
976 .filter_map(|r| async { r.ok().and_then(|v| String::from_utf8(v.to_vec()).ok()) })
977 .collect::<Vec<_>>()
978 .await
979 .join("");
980 attached.join().await.unwrap();
981 assert_eq!(out.lines().count(), 3);
982 assert_eq!(out, "1\n2\n3\n");
983 }
984
985 {
987 use tokio::io::AsyncWriteExt;
988 let mut attached = kubelet_client
989 .kubelet_node_exec(
990 &KubeletDebugParams {
991 name: "busybox-kube2",
992 namespace: "default",
993 ..Default::default()
994 },
995 "busybox",
996 vec!["sh"],
997 &AttachParams::default().stdin(true).stderr(false),
998 )
999 .await?;
1000 let mut stdin_writer = attached.stdin().unwrap();
1001 let mut stdout_stream = tokio_util::io::ReaderStream::new(attached.stdout().unwrap());
1002 let next_stdout = stdout_stream.next();
1003 stdin_writer.write_all(b"echo test string 1\n").await?;
1004 let stdout = String::from_utf8(next_stdout.await.unwrap().unwrap().to_vec()).unwrap();
1005 println!("{stdout}");
1006 assert_eq!(stdout, "test string 1\n");
1007
1008 stdin_writer.write_all(b"exit 1\n").await?;
1011 let status = attached.take_status().unwrap();
1012 if let Some(status) = status.await {
1013 println!("{status:?}");
1014 assert_eq!(status.status, Some("Failure".to_owned()));
1015 assert_eq!(status.reason, Some("NonZeroExitCode".to_owned()));
1016 }
1017 }
1018
1019 let dp = DeleteParams::default();
1021 pods.delete("busybox-kube2", &dp).await?.map_left(|pdel| {
1022 assert_eq!(pdel.name_unchecked(), "busybox-kube2");
1023 });
1024
1025 Ok(())
1026 }
1027}