1#![cfg_attr(docsrs, feature(doc_cfg))]
64#![allow(clippy::unnecessary_lazy_evaluations)]
66
// Wraps each item so that it is compiled only when the `client` feature is
// enabled, and annotates it with the matching `doc(cfg(..))` marker when
// building with `--cfg docsrs`.
macro_rules! cfg_client {
    ( $( $gated:item )* ) => {
        $(
            #[cfg(feature = "client")]
            #[cfg_attr(docsrs, doc(cfg(feature = "client")))]
            $gated
        )*
    };
}
// Wraps each item so that it is compiled only when the `config` feature is
// enabled, and annotates it with the matching `doc(cfg(..))` marker when
// building with `--cfg docsrs`.
macro_rules! cfg_config {
    ( $( $gated:item )* ) => {
        $(
            #[cfg(feature = "config")]
            #[cfg_attr(docsrs, doc(cfg(feature = "config")))]
            $gated
        )*
    };
}
85
// Wraps each item so that it is compiled when either the `config` or the
// `client` feature is enabled, and annotates it with the matching
// `doc(cfg(..))` marker when building with `--cfg docsrs`.
macro_rules! cfg_error {
    ( $( $gated:item )* ) => {
        $(
            #[cfg(any(feature = "config", feature = "client"))]
            #[cfg_attr(docsrs, doc(cfg(any(feature = "config", feature = "client"))))]
            $gated
        )*
    };
}
95
96cfg_client! {
97 pub mod api;
98 pub mod discovery;
99 pub mod client;
100
101 #[doc(inline)]
102 pub use api::Api;
103 #[doc(inline)]
104 pub use client::Client;
105 #[doc(inline)]
106 pub use discovery::Discovery;
107}
108
109cfg_config! {
110 pub mod config;
111 #[doc(inline)]
112 pub use config::Config;
113}
114
115cfg_error! {
116 pub mod error;
117 #[doc(inline)] pub use error::Error;
118 pub type Result<T, E = Error> = std::result::Result<T, E>;
120}
121
122pub use crate::core::{CustomResourceExt, Resource, ResourceExt};
123pub use kube_core as core;
125
126#[cfg(all(feature = "client", feature = "config"))]
129#[cfg(test)]
130#[allow(unused_imports)] mod test {
132 use crate::{
133 api::{AttachParams, AttachedProcess},
134 client::ConfigExt,
135 Api, Client, Config, ResourceExt,
136 };
137 use futures::{AsyncBufRead, AsyncBufReadExt, StreamExt, TryStreamExt};
138 use hyper::Uri;
139 use k8s_openapi::api::core::v1::{EphemeralContainer, Pod, PodSpec};
140 use kube_core::{
141 params::{DeleteParams, Patch, PatchParams, PostParams, WatchParams},
142 response::StatusSummary,
143 };
144 use serde_json::json;
145 use tower::ServiceBuilder;
146
/// Builds a `Client` on top of a hand-assembled rustls HTTPS connector and
/// lists pods in the default namespace with it.
///
/// NOTE(review): this function carries no `#[tokio::test]` attribute, so the
/// test harness never runs it (hence `allow(dead_code)`) — confirm that this
/// is still intended.
#[allow(dead_code)]
#[ignore = "needs cluster (lists pods)"]
#[cfg(feature = "rustls-tls")]
async fn custom_client_rustls_configuration() -> Result<(), Box<dyn std::error::Error>> {
    use hyper_util::{client::legacy::Client as HyperClient, rt::TokioExecutor};

    let config = Config::infer().await?;
    let connector = config.rustls_https_connector()?;

    // Stack the base-URI layer on top of a plain hyper client.
    let http = HyperClient::builder(TokioExecutor::new()).build(connector);
    let service = ServiceBuilder::new()
        .layer(config.base_uri_layer())
        .service(http);

    let client = Client::new(service, config.default_namespace);
    let pods = Api::<Pod>::default_namespaced(client);
    pods.list(&Default::default()).await?;
    Ok(())
}
165
/// Builds a `Client` on top of a hand-assembled OpenSSL HTTPS connector and
/// lists pods in the default namespace with it.
#[tokio::test]
#[ignore = "needs cluster (lists pods)"]
#[cfg(feature = "openssl-tls")]
async fn custom_client_openssl_tls_configuration() -> Result<(), Box<dyn std::error::Error>> {
    use hyper_util::{client::legacy::Client as HyperClient, rt::TokioExecutor};

    let config = Config::infer().await?;
    let connector = config.openssl_https_connector()?;

    // Stack the base-URI layer on top of a plain hyper client.
    let http = HyperClient::builder(TokioExecutor::new()).build(connector);
    let service = ServiceBuilder::new()
        .layer(config.base_uri_layer())
        .service(http);

    let client = Client::new(service, config.default_namespace);
    let pods = Api::<Pod>::default_namespaced(client);
    pods.list(&Default::default()).await?;
    Ok(())
}
182
/// Discovers the `apiregistration.k8s.io` group, picks its recommended
/// `APIService` resource and lists it cluster-wide as `DynamicObject`s.
#[tokio::test]
#[ignore = "needs cluster (lists api resources)"]
#[cfg(feature = "client")]
async fn group_discovery_oneshot() -> Result<(), Box<dyn std::error::Error>> {
    use crate::{core::DynamicObject, discovery};

    let client = Client::try_default().await?;
    let group = discovery::group(&client, "apiregistration.k8s.io").await?;
    let (resource, _capabilities) = group.recommended_kind("APIService").unwrap();

    // The client is not needed afterwards, so it is handed over directly.
    let api = Api::<DynamicObject>::all_with(client, &resource);
    api.list(&Default::default()).await?;

    Ok(())
}
196
197 #[tokio::test]
198 #[ignore = "needs cluster (will create and edit a pod)"]
199 async fn pod_can_use_core_apis() -> Result<(), Box<dyn std::error::Error>> {
200 use kube::api::{DeleteParams, ListParams, Patch, PatchParams, PostParams, WatchEvent};
201
202 let client = Client::try_default().await?;
203 let pods: Api<Pod> = Api::default_namespaced(client);
204
205 let p: Pod = serde_json::from_value(json!({
207 "apiVersion": "v1",
208 "kind": "Pod",
209 "metadata": {
210 "name": "busybox-kube1",
211 "labels": { "app": "kube-rs-test" },
212 },
213 "spec": {
214 "terminationGracePeriodSeconds": 1,
215 "restartPolicy": "Never",
216 "containers": [{
217 "name": "busybox",
218 "image": "busybox:1.34.1",
219 "command": ["sh", "-c", "sleep 30"],
220 }],
221 }
222 }))?;
223
224 let pp = PostParams::default();
225 match pods.create(&pp, &p).await {
226 Ok(o) => assert_eq!(p.name_unchecked(), o.name_unchecked()),
227 Err(crate::Error::Api(ae)) => assert_eq!(ae.code, 409), Err(e) => return Err(e.into()), }
230
231 let wp = WatchParams::default()
234 .fields(&format!("metadata.name={}", "busybox-kube1"))
235 .timeout(15);
236 let mut stream = pods.watch(&wp, "0").await?.boxed();
237 while let Some(ev) = stream.try_next().await? {
238 let _ = format!("we: {ev:?}");
240 match ev {
241 WatchEvent::Modified(o) => {
242 let s = o.status.as_ref().expect("status exists on pod");
243 let phase = s.phase.clone().unwrap_or_default();
244 if phase == "Running" {
245 break;
246 }
247 }
248 WatchEvent::Error(e) => panic!("watch error: {e}"),
249 _ => {}
250 }
251 }
252
253 let mut pod = pods.get("busybox-kube1").await?;
255 assert_eq!(p.spec.as_ref().unwrap().containers[0].name, "busybox");
256
257 {
260 assert!(pod.resource_version().is_some());
261 pod.spec.as_mut().unwrap().active_deadline_seconds = Some(5);
262
263 let pp = PostParams::default();
264 let patched_pod = pods.replace("busybox-kube1", &pp, &pod).await?;
265 assert_eq!(patched_pod.spec.unwrap().active_deadline_seconds, Some(5));
266 }
267
268 let dp = DeleteParams::default();
270 pods.delete("busybox-kube1", &dp).await?.map_left(|pdel| {
271 assert_eq!(pdel.name_unchecked(), "busybox-kube1");
272 });
273
274 Ok(())
275 }
276
/// Creates the pod `busybox-kube2`, waits for it to run, then checks that
/// `exec` output can be read and that an attached shell accepts stdin.
#[tokio::test]
#[ignore = "needs cluster (will create and attach to a pod)"]
#[cfg(feature = "ws")]
async fn pod_can_exec_and_write_to_stdin() -> Result<(), Box<dyn std::error::Error>> {
    use crate::api::{DeleteParams, ListParams, Patch, PatchParams, WatchEvent};
    use tokio_util::io::ReaderStream;

    let client = Client::try_default().await?;
    let pods = Api::<Pod>::default_namespaced(client);

    // Single busybox container that sleeps for 30 seconds; never restarted.
    let p: Pod = serde_json::from_value(json!({
        "apiVersion": "v1",
        "kind": "Pod",
        "metadata": {
            "name": "busybox-kube2",
            "labels": { "app": "kube-rs-test" },
        },
        "spec": {
            "terminationGracePeriodSeconds": 1,
            "restartPolicy": "Never",
            "containers": [{
                "name": "busybox",
                "image": "busybox:1.34.1",
                "command": ["sh", "-c", "sleep 30"],
            }],
        }
    }))?;

    match pods.create(&Default::default(), &p).await {
        Ok(created) => assert_eq!(p.name_unchecked(), created.name_unchecked()),
        // An API error is accepted only when it is a 409 response.
        Err(crate::Error::Api(api_err)) => assert_eq!(api_err.code, 409),
        Err(other) => return Err(other.into()),
    }

    // Watch the pod by name until a `Modified` event reports phase `Running`
    // or the watch stream ends.
    let selector = format!("metadata.name={}", "busybox-kube2");
    let wp = WatchParams::default().fields(&selector).timeout(15);
    let mut events = pods.watch(&wp, "0").await?.boxed();
    while let Some(event) = events.try_next().await? {
        match event {
            WatchEvent::Modified(obj) => {
                let status = obj.status.as_ref().expect("status exists on pod");
                if status.phase.as_deref() == Some("Running") {
                    break;
                }
            }
            WatchEvent::Error(e) => panic!("watch error: {e}"),
            _ => {}
        }
    }

    // Run a short command and collect everything it prints to stdout.
    {
        let params = AttachParams::default().stderr(false);
        let command = vec!["sh", "-c", "for i in $(seq 1 3); do echo $i; done"];
        let mut attached = pods.exec("busybox-kube2", command, &params).await?;

        let out = ReaderStream::new(attached.stdout().unwrap())
            .filter_map(|chunk| async {
                chunk
                    .ok()
                    .and_then(|bytes| String::from_utf8(bytes.to_vec()).ok())
            })
            .collect::<Vec<_>>()
            .await
            .concat();
        attached.join().await.unwrap();

        assert_eq!(out.lines().count(), 3);
        assert_eq!(out, "1\n2\n3\n");
    }

    // Attach an interactive shell and drive it through stdin.
    {
        use tokio::io::AsyncWriteExt;

        let params = AttachParams::default().stdin(true).stderr(false);
        let mut attached = pods.exec("busybox-kube2", vec!["sh"], &params).await?;
        let mut shell_in = attached.stdin().unwrap();
        let mut shell_out = ReaderStream::new(attached.stdout().unwrap());

        // The read future is created before the command is written.
        let first_chunk = shell_out.next();
        shell_in.write_all(b"echo test string 1\n").await?;
        let stdout = String::from_utf8(first_chunk.await.unwrap().unwrap().to_vec()).unwrap();
        println!("{stdout}");
        assert_eq!(stdout, "test string 1\n");

        // Make the shell exit with a non-zero code and inspect the status
        // object, if one is delivered.
        shell_in.write_all(b"exit 1\n").await?;
        let status_fut = attached.take_status().unwrap();
        if let Some(status) = status_fut.await {
            println!("{status:?}");
            assert_eq!(status.status, Some("Failure".to_owned()));
            assert_eq!(status.reason, Some("NonZeroExitCode".to_owned()));
        }
    }

    // Clean up; when the delete call returns the object, check its name.
    pods.delete("busybox-kube2", &DeleteParams::default())
        .await?
        .map_left(|deleted| {
            assert_eq!(deleted.name_unchecked(), "busybox-kube2");
        });

    Ok(())
}
388
389 #[tokio::test]
390 #[ignore = "needs cluster (will create and tail logs from a pod)"]
391 async fn can_get_pod_logs_and_evict() -> Result<(), Box<dyn std::error::Error>> {
392 use crate::{
393 api::{DeleteParams, EvictParams, ListParams, Patch, PatchParams, WatchEvent},
394 core::subresource::LogParams,
395 };
396
397 let client = Client::try_default().await?;
398 let pods: Api<Pod> = Api::default_namespaced(client);
399
400 let p: Pod = serde_json::from_value(json!({
402 "apiVersion": "v1",
403 "kind": "Pod",
404 "metadata": {
405 "name": "busybox-kube3",
406 "labels": { "app": "kube-rs-test" },
407 },
408 "spec": {
409 "terminationGracePeriodSeconds": 1,
410 "restartPolicy": "Never",
411 "containers": [{
412 "name": "busybox",
413 "image": "busybox:1.34.1",
414 "command": ["sh", "-c", "for i in $(seq 1 5); do echo kube $i; sleep 0.1; done"],
415 }],
416 }
417 }))?;
418
419 match pods.create(&Default::default(), &p).await {
420 Ok(o) => assert_eq!(p.name_unchecked(), o.name_unchecked()),
421 Err(crate::Error::Api(ae)) => assert_eq!(ae.code, 409), Err(e) => return Err(e.into()), }
424
425 let wp = WatchParams::default()
428 .fields(&format!("metadata.name={}", "busybox-kube3"))
429 .timeout(15);
430 let mut stream = pods.watch(&wp, "0").await?.boxed();
431 while let Some(ev) = stream.try_next().await? {
432 match ev {
433 WatchEvent::Modified(o) => {
434 let s = o.status.as_ref().expect("status exists on pod");
435 let phase = s.phase.clone().unwrap_or_default();
436 if phase == "Running" {
437 break;
438 }
439 }
440 WatchEvent::Error(e) => panic!("watch error: {e}"),
441 _ => {}
442 }
443 }
444
445 let lp = LogParams {
447 follow: true,
448 ..LogParams::default()
449 };
450 let mut logs_stream = pods.log_stream("busybox-kube3", &lp).await?.lines();
451
452 tokio::time::sleep(std::time::Duration::from_secs(2)).await;
454
455 let all_logs = pods.logs("busybox-kube3", &Default::default()).await?;
456 assert_eq!(all_logs, "kube 1\nkube 2\nkube 3\nkube 4\nkube 5\n");
457
458 let mut output = vec![];
460 while let Some(line) = logs_stream.try_next().await? {
461 output.push(line);
462 }
463 assert_eq!(output, vec!["kube 1", "kube 2", "kube 3", "kube 4", "kube 5"]);
464
465 let ep = EvictParams::default();
467 let eres = pods.evict("busybox-kube3", &ep).await?;
468 assert_eq!(eres.code, 201); assert!(eres.is_success());
470
471 Ok(())
472 }
473
474 #[tokio::test]
475 #[ignore = "requires a cluster"]
476 async fn can_operate_on_pod_metadata() -> Result<(), Box<dyn std::error::Error>> {
477 use crate::{
478 api::{DeleteParams, EvictParams, ListParams, Patch, PatchParams, WatchEvent},
479 core::subresource::LogParams,
480 };
481 use kube_core::{ObjectList, ObjectMeta, PartialObjectMeta, PartialObjectMetaExt};
482
483 let client = Client::try_default().await?;
484 let pods: Api<Pod> = Api::default_namespaced(client);
485
486 let p: Pod = serde_json::from_value(json!({
488 "apiVersion": "v1",
489 "kind": "Pod",
490 "metadata": {
491 "name": "busybox-kube-meta",
492 "labels": { "app": "kube-rs-test" },
493 },
494 "spec": {
495 "terminationGracePeriodSeconds": 1,
496 "restartPolicy": "Never",
497 "containers": [{
498 "name": "busybox",
499 "image": "busybox:1.34.1",
500 "command": ["sh", "-c", "sleep 30s"],
501 }],
502 }
503 }))?;
504
505 match pods.create(&Default::default(), &p).await {
506 Ok(o) => assert_eq!(p.name_unchecked(), o.name_unchecked()),
507 Err(crate::Error::Api(ae)) => assert_eq!(ae.code, 409), Err(e) => return Err(e.into()), }
510
511 let pod_metadata = pods.get_metadata("busybox-kube-meta").await?;
514 assert_eq!("busybox-kube-meta", pod_metadata.name_any());
515 assert_eq!(
516 Some((&"app".to_string(), &"kube-rs-test".to_string())),
517 pod_metadata.labels().get_key_value("app")
518 );
519
520 let p_list = pods.list_metadata(&ListParams::default()).await?;
522
523 let pod_metadata = p_list
526 .items
527 .into_iter()
528 .find(|p| p.name_any() == "busybox-kube-meta")
529 .unwrap();
530 assert_eq!(
531 pod_metadata.labels().get("app"),
532 Some(&"kube-rs-test".to_string())
533 );
534
535 let patch = ObjectMeta {
537 annotations: Some([("test".to_string(), "123".to_string())].into()),
538 ..Default::default()
539 }
540 .into_request_partial::<Pod>();
541
542 let patchparams = PatchParams::default();
543 let p_patched = pods
544 .patch_metadata("busybox-kube-meta", &patchparams, &Patch::Merge(&patch))
545 .await?;
546 assert_eq!(p_patched.annotations().get("test"), Some(&"123".to_string()));
547 assert_eq!(p_patched.types.as_ref().unwrap().kind, "PartialObjectMetadata");
548 assert_eq!(p_patched.types.as_ref().unwrap().api_version, "meta.k8s.io/v1");
549
550 let dp = DeleteParams::default();
552 pods.delete("busybox-kube-meta", &dp).await?.map_left(|pdel| {
553 assert_eq!(pdel.name_any(), "busybox-kube-meta");
554 });
555
556 Ok(())
557 }
558 #[tokio::test]
559 #[ignore = "needs cluster (will create a CertificateSigningRequest)"]
560 async fn csr_can_be_approved() -> Result<(), Box<dyn std::error::Error>> {
561 use crate::api::PostParams;
562 use k8s_openapi::api::certificates::v1::{
563 CertificateSigningRequest, CertificateSigningRequestCondition, CertificateSigningRequestStatus,
564 };
565
566 let csr_name = "fake";
567 let dummy_csr: CertificateSigningRequest = serde_json::from_value(json!({
568 "apiVersion": "certificates.k8s.io/v1",
569 "kind": "CertificateSigningRequest",
570 "metadata": { "name": csr_name },
571 "spec": {
572 "request": "LS0tLS1CRUdJTiBDRVJUSUZJQ0FURSBSRVFVRVNULS0tLS0KTUlJQ1ZqQ0NBVDRDQVFBd0VURVBNQTBHQTFVRUF3d0dZVzVuWld4aE1JSUJJakFOQmdrcWhraUc5dzBCQVFFRgpBQU9DQVE4QU1JSUJDZ0tDQVFFQTByczhJTHRHdTYxakx2dHhWTTJSVlRWMDNHWlJTWWw0dWluVWo4RElaWjBOCnR2MUZtRVFSd3VoaUZsOFEzcWl0Qm0wMUFSMkNJVXBGd2ZzSjZ4MXF3ckJzVkhZbGlBNVhwRVpZM3ExcGswSDQKM3Z3aGJlK1o2MVNrVHF5SVBYUUwrTWM5T1Nsbm0xb0R2N0NtSkZNMUlMRVI3QTVGZnZKOEdFRjJ6dHBoaUlFMwpub1dtdHNZb3JuT2wzc2lHQ2ZGZzR4Zmd4eW8ybmlneFNVekl1bXNnVm9PM2ttT0x1RVF6cXpkakJ3TFJXbWlECklmMXBMWnoyalVnald4UkhCM1gyWnVVV1d1T09PZnpXM01LaE8ybHEvZi9DdS8wYk83c0x0MCt3U2ZMSU91TFcKcW90blZtRmxMMytqTy82WDNDKzBERHk5aUtwbXJjVDBnWGZLemE1dHJRSURBUUFCb0FBd0RRWUpLb1pJaHZjTgpBUUVMQlFBRGdnRUJBR05WdmVIOGR4ZzNvK21VeVRkbmFjVmQ1N24zSkExdnZEU1JWREkyQTZ1eXN3ZFp1L1BVCkkwZXpZWFV0RVNnSk1IRmQycVVNMjNuNVJsSXJ3R0xuUXFISUh5VStWWHhsdnZsRnpNOVpEWllSTmU3QlJvYXgKQVlEdUI5STZXT3FYbkFvczFqRmxNUG5NbFpqdU5kSGxpT1BjTU1oNndLaTZzZFhpVStHYTJ2RUVLY01jSVUyRgpvU2djUWdMYTk0aEpacGk3ZnNMdm1OQUxoT045UHdNMGM1dVJVejV4T0dGMUtCbWRSeEgvbUNOS2JKYjFRQm1HCkkwYitEUEdaTktXTU0xMzhIQXdoV0tkNjVoVHdYOWl4V3ZHMkh4TG1WQzg0L1BHT0tWQW9FNkpsYWFHdTlQVmkKdjlOSjVaZlZrcXdCd0hKbzZXdk9xVlA3SVFjZmg3d0drWm89Ci0tLS0tRU5EIENFUlRJRklDQVRFIFJFUVVFU1QtLS0tLQo=",
573 "signerName": "kubernetes.io/kube-apiserver-client",
574 "expirationSeconds": 86400,
575 "usages": ["client auth"]
576 }
577 }))?;
578
579 let client = Client::try_default().await?;
580 let csr: Api<CertificateSigningRequest> = Api::all(client.clone());
581 assert!(csr.create(&PostParams::default(), &dummy_csr).await.is_ok());
582
583 let approval_type = "ApprovedFake";
585 let csr_status: CertificateSigningRequestStatus = CertificateSigningRequestStatus {
586 certificate: None,
587 conditions: Some(vec![CertificateSigningRequestCondition {
588 type_: approval_type.to_string(),
589 last_update_time: None,
590 last_transition_time: None,
591 message: Some(format!("{} {}", approval_type, "by kube-rs client")),
592 reason: Some("kube-rsClient".to_string()),
593 status: "True".to_string(),
594 }]),
595 };
596 let csr_status_patch = Patch::Merge(serde_json::json!({ "status": csr_status }));
597 let _ = csr
598 .patch_approval(csr_name, &Default::default(), &csr_status_patch)
599 .await?;
600 let csr_after_approval = csr.get_approval(csr_name).await?;
601
602 assert_eq!(
603 csr_after_approval
604 .status
605 .as_ref()
606 .unwrap()
607 .conditions
608 .as_ref()
609 .unwrap()[0]
610 .type_,
611 approval_type.to_string()
612 );
613 csr.delete(csr_name, &DeleteParams::default()).await?;
614 Ok(())
615 }
616
617 #[tokio::test]
618 #[ignore = "needs cluster for ephemeral containers operations"]
619 async fn can_operate_on_ephemeral_containers() -> Result<(), Box<dyn std::error::Error>> {
620 let client = Client::try_default().await?;
621
622 let api_version = client.apiserver_version().await?;
625 if api_version.major.parse::<i32>()? < 1 || api_version.minor.parse::<i32>()? < 25 {
626 return Ok(());
627 }
628
629 let pod: Pod = serde_json::from_value(serde_json::json!({
630 "apiVersion": "v1",
631 "kind": "Pod",
632 "metadata": {
633 "name": "ephemeral-container-test",
634 "labels": { "app": "kube-rs-test" },
635 },
636 "spec": {
637 "restartPolicy": "Never",
638 "containers": [{
639 "name": "busybox",
640 "image": "busybox:1.34.1",
641 "command": ["sh", "-c", "sleep 2"],
642 }],
643 }
644 }))?;
645
646 let pod_name = pod.name_any();
647 let pods = Api::<Pod>::default_namespaced(client);
648
649 let _ = pods
654 .delete(&pod.name_any(), &DeleteParams::default())
655 .await
656 .map(|v| v.map_left(|pdel| assert_eq!(pdel.name_any(), pod.name_any())));
657
658 match pods.create(&Default::default(), &pod).await {
661 Ok(o) => assert_eq!(pod.name_unchecked(), o.name_unchecked()),
662 Err(e) => return Err(e.into()), }
664
665 let current_ephemeral_containers = pods
666 .get_ephemeral_containers(&pod.name_any())
667 .await?
668 .spec
669 .unwrap()
670 .ephemeral_containers;
671
672 assert_eq!(current_ephemeral_containers, None);
675
676 let mut busybox_eph: EphemeralContainer = serde_json::from_value(json!(
677 {
678 "name": "myephemeralcontainer1",
679 "image": "busybox:1.34.1",
680 "command": ["sh", "-c", "sleep 2"],
681 }
682 ))?;
683
684 let patch: Pod = serde_json::from_value(json!({
687 "metadata": { "name": pod_name },
688 "spec":{ "ephemeralContainers": [ busybox_eph ] }
689 }))?;
690
691 let current_containers = pods
692 .replace_ephemeral_containers(&pod_name, &PostParams::default(), &patch)
693 .await?
694 .spec
695 .unwrap()
696 .ephemeral_containers
697 .expect("could find ephemeral container");
698
699 assert_eq!(current_containers.len(), 1);
702 assert_eq!(current_containers[0].name, busybox_eph.name);
703 assert_eq!(current_containers[0].image, busybox_eph.image);
704 assert_eq!(current_containers[0].command, busybox_eph.command);
705
706 busybox_eph = serde_json::from_value(json!(
711 {
712 "name": "myephemeralcontainer2",
713 "image": "busybox:1.35.0",
714 "command": ["sh", "-c", "sleep 1"],
715 }
716 ))?;
717
718 let patch: Pod =
719 serde_json::from_value(json!({ "spec": { "ephemeralContainers": [ busybox_eph ] }}))?;
720
721 let current_containers = pods
722 .patch_ephemeral_containers(&pod_name, &PatchParams::default(), &Patch::Strategic(patch))
723 .await?
724 .spec
725 .unwrap()
726 .ephemeral_containers
727 .expect("could find ephemeral container");
728
729 assert_eq!(current_containers.len(), 2);
732
733 let new_container = current_containers
734 .iter()
735 .find(|c| c.name == busybox_eph.name)
736 .expect("could find myephemeralcontainer2");
737
738 assert_eq!(new_container.image, busybox_eph.image);
741 assert_eq!(new_container.command, busybox_eph.command);
742
743 let expected_containers = current_containers;
746
747 let current_containers = pods
748 .get_ephemeral_containers(&pod.name_any())
749 .await?
750 .spec
751 .unwrap()
752 .ephemeral_containers
753 .unwrap();
754
755 assert_eq!(current_containers, expected_containers);
756
757 pods.delete(&pod.name_any(), &DeleteParams::default())
758 .await?
759 .map_left(|pdel| {
760 assert_eq!(pdel.name_any(), pod.name_any());
761 });
762
763 Ok(())
764 }
765
/// Creates a pod, waits for it to run, then checks exec output and stdin
/// handling through `kubelet_node_exec` on a client pointed at
/// `https://localhost:10250`.
///
/// The pod has its own name: sharing `busybox-kube2` with
/// `pod_can_exec_and_write_to_stdin` lets the two tests create and delete each
/// other's pod when they run concurrently.
#[tokio::test]
#[ignore = "needs kubelet debug methods"]
#[cfg(feature = "kubelet-debug")]
async fn pod_can_exec_and_write_to_stdin_from_node_proxy() -> Result<(), Box<dyn std::error::Error>> {
    use crate::{
        api::{DeleteParams, WatchEvent},
        core::kubelet_debug::KubeletDebugParams,
    };

    // Single source of truth for the pod name used throughout this test.
    let pod_name = "busybox-kube-node-proxy";

    let client = Client::try_default().await?;
    let pods: Api<Pod> = Api::default_namespaced(client);

    // Single busybox container that sleeps for 30 seconds; never restarted.
    let p: Pod = serde_json::from_value(json!({
        "apiVersion": "v1",
        "kind": "Pod",
        "metadata": {
            "name": pod_name,
            "labels": { "app": "kube-rs-test" },
        },
        "spec": {
            "terminationGracePeriodSeconds": 1,
            "restartPolicy": "Never",
            "containers": [{
                "name": "busybox",
                "image": "busybox:1.34.1",
                "command": ["sh", "-c", "sleep 30"],
            }],
        }
    }))?;

    match pods.create(&Default::default(), &p).await {
        Ok(o) => assert_eq!(p.name_unchecked(), o.name_unchecked()),
        // An API error is accepted only when it is a 409 response.
        Err(crate::Error::Api(ae)) => assert_eq!(ae.code, 409),
        Err(e) => return Err(e.into()),
    }

    // Watch the pod by name until a `Modified` event reports phase `Running`
    // or the watch stream ends.
    let wp = WatchParams::default()
        .fields(&format!("metadata.name={pod_name}"))
        .timeout(15);
    let mut stream = pods.watch(&wp, "0").await?.boxed();
    while let Some(ev) = stream.try_next().await? {
        match ev {
            WatchEvent::Modified(o) => {
                let s = o.status.as_ref().expect("status exists on pod");
                if s.phase.as_deref() == Some("Running") {
                    break;
                }
            }
            WatchEvent::Error(e) => panic!("watch error: {e}"),
            _ => {}
        }
    }

    // Second client aimed at local port 10250 with certificate validation
    // disabled.
    // NOTE(review): presumably something must forward that port to a kubelet
    // before this test can pass — confirm against the CI setup.
    let mut config = Config::infer().await?;
    config.accept_invalid_certs = true;
    config.cluster_url = "https://localhost:10250".parse::<Uri>().unwrap();
    let kubelet_client: Client = config.try_into()?;

    // NOTE(review): the namespace is hard-coded to "default" while the pod is
    // created in the client's default namespace — confirm these always match.
    let debug_params = KubeletDebugParams {
        name: pod_name,
        namespace: "default",
        ..Default::default()
    };

    // Run a short command and collect everything it prints to stdout.
    {
        let mut attached = kubelet_client
            .kubelet_node_exec(
                &debug_params,
                "busybox",
                vec!["sh", "-c", "for i in $(seq 1 3); do echo $i; done"],
                &AttachParams::default().stderr(false),
            )
            .await?;
        let stdout = tokio_util::io::ReaderStream::new(attached.stdout().unwrap());
        let out = stdout
            .filter_map(|r| async { r.ok().and_then(|v| String::from_utf8(v.to_vec()).ok()) })
            .collect::<Vec<_>>()
            .await
            .join("");
        attached.join().await.unwrap();
        assert_eq!(out.lines().count(), 3);
        assert_eq!(out, "1\n2\n3\n");
    }

    // Attach an interactive shell and drive it through stdin.
    {
        use tokio::io::AsyncWriteExt;
        let mut attached = kubelet_client
            .kubelet_node_exec(
                &debug_params,
                "busybox",
                vec!["sh"],
                &AttachParams::default().stdin(true).stderr(false),
            )
            .await?;
        let mut stdin_writer = attached.stdin().unwrap();
        let mut stdout_stream = tokio_util::io::ReaderStream::new(attached.stdout().unwrap());

        // The read future is created before the command is written.
        let next_stdout = stdout_stream.next();
        stdin_writer.write_all(b"echo test string 1\n").await?;
        let stdout = String::from_utf8(next_stdout.await.unwrap().unwrap().to_vec()).unwrap();
        println!("{stdout}");
        assert_eq!(stdout, "test string 1\n");

        // Make the shell exit with a non-zero code and inspect the status
        // object, if one is delivered.
        stdin_writer.write_all(b"exit 1\n").await?;
        let status = attached.take_status().unwrap();
        if let Some(status) = status.await {
            println!("{status:?}");
            assert_eq!(status.status, Some("Failure".to_owned()));
            assert_eq!(status.reason, Some("NonZeroExitCode".to_owned()));
        }
    }

    // Clean up; when the delete call returns the object, check its name.
    let dp = DeleteParams::default();
    pods.delete(pod_name, &dp).await?.map_left(|pdel| {
        assert_eq!(pdel.name_unchecked(), pod_name);
    });

    Ok(())
}
895}