kube_runtime/
wait.rs

1//! Waits for objects to reach desired states
2use std::{future, pin::pin};
3
4use futures::TryStreamExt;
5use kube_client::{Api, Resource};
6use serde::de::DeserializeOwned;
7use std::fmt::Debug;
8use thiserror::Error;
9
10use crate::watcher::{self, watch_object};
11
/// Errors returned by [`await_condition`].
#[derive(Debug, Error)]
pub enum Error {
    /// The watch on the object reported an error while waiting for the condition to hold.
    ///
    /// The underlying [`watcher::Error`] is kept as the error source.
    #[error("failed to probe for whether the condition is fulfilled yet: {0}")]
    ProbeFailed(#[source] watcher::Error),
}
17
18/// Watch an object, and wait for some condition `cond` to return `true`.
19///
20/// `cond` is passed `Some` if the object is found, otherwise `None`.
21///
22/// The object is returned when the condition is fulfilled.
23///
24/// # Caveats
25///
26/// Keep in mind that the condition is typically fulfilled by an external service, which might not even be available. `await_condition`
27/// does *not* automatically add a timeout. If this is desired, wrap it in [`tokio::time::timeout`].
28///
29/// # Errors
30///
31/// Fails if the type is not known to the Kubernetes API, or if the [`Api`] does not have
32/// permission to `watch` and `list` it.
33///
34/// Does *not* fail if the object is not found.
35///
36/// # Usage
37///
38/// ```
39/// use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition;
40/// use kube::{Api, runtime::wait::{await_condition, conditions}};
41/// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
42/// # let client: kube::Client = todo!();
43///
44/// let crds: Api<CustomResourceDefinition> = Api::all(client);
45/// // .. create or apply a crd here ..
46/// let establish = await_condition(crds, "foos.clux.dev", conditions::is_crd_established());
47/// let _ = tokio::time::timeout(std::time::Duration::from_secs(10), establish).await?;
48/// # Ok(())
49/// # }
50/// ```
51#[allow(clippy::missing_panics_doc)] // watch never actually terminates, expect cannot fail
52pub async fn await_condition<K>(api: Api<K>, name: &str, cond: impl Condition<K>) -> Result<Option<K>, Error>
53where
54    K: Clone + Debug + Send + DeserializeOwned + Resource + 'static,
55{
56    // Skip updates until the condition is satisfied.
57    let mut stream = pin!(watch_object(api, name).try_skip_while(|obj| {
58        let matches = cond.matches_object(obj.as_ref());
59        future::ready(Ok(!matches))
60    }));
61
62    // Then take the first update that satisfies the condition.
63    let obj = stream
64        .try_next()
65        .await
66        .map_err(Error::ProbeFailed)?
67        .expect("stream must not terminate");
68    Ok(obj)
69}
70
71/// A trait for condition functions to be used by [`await_condition`]
72///
73/// Note that this is auto-implemented for functions of type `fn(Option<&K>) -> bool`.
74///
75/// # Usage
76///
77/// ```
78/// use kube::runtime::wait::Condition;
79/// use k8s_openapi::api::core::v1::Pod;
80/// fn my_custom_condition(my_cond: &str) -> impl Condition<Pod> + '_ {
81///     move |obj: Option<&Pod>| {
82///         if let Some(pod) = &obj {
83///             if let Some(status) = &pod.status {
84///                 if let Some(conds) = &status.conditions {
85///                     if let Some(pcond) = conds.iter().find(|c| c.type_ == my_cond) {
86///                         return pcond.status == "True";
87///                     }
88///                 }
89///             }
90///         }
91///         false
92///     }
93/// }
94/// ```
95pub trait Condition<K> {
96    fn matches_object(&self, obj: Option<&K>) -> bool;
97
98    /// Returns a `Condition` that holds if `self` does not
99    ///
100    /// # Usage
101    ///
102    /// ```
103    /// # use kube_runtime::wait::Condition;
104    /// let condition: fn(Option<&()>) -> bool = |_| true;
105    /// assert!(condition.matches_object(None));
106    /// assert!(!condition.not().matches_object(None));
107    /// ```
108    fn not(self) -> conditions::Not<Self>
109    where
110        Self: Sized,
111    {
112        conditions::Not(self)
113    }
114
115    /// Returns a `Condition` that holds if `self` and `other` both do
116    ///
117    /// # Usage
118    ///
119    /// ```
120    /// # use kube_runtime::wait::Condition;
121    /// let cond_false: fn(Option<&()>) -> bool = |_| false;
122    /// let cond_true: fn(Option<&()>) -> bool = |_| true;
123    /// assert!(!cond_false.and(cond_false).matches_object(None));
124    /// assert!(!cond_false.and(cond_true).matches_object(None));
125    /// assert!(!cond_true.and(cond_false).matches_object(None));
126    /// assert!(cond_true.and(cond_true).matches_object(None));
127    /// ```
128    fn and<Other: Condition<K>>(self, other: Other) -> conditions::And<Self, Other>
129    where
130        Self: Sized,
131    {
132        conditions::And(self, other)
133    }
134
135    /// Returns a `Condition` that holds if either `self` or `other` does
136    ///
137    /// # Usage
138    ///
139    /// ```
140    /// # use kube_runtime::wait::Condition;
141    /// let cond_false: fn(Option<&()>) -> bool = |_| false;
142    /// let cond_true: fn(Option<&()>) -> bool = |_| true;
143    /// assert!(!cond_false.or(cond_false).matches_object(None));
144    /// assert!(cond_false.or(cond_true).matches_object(None));
145    /// assert!(cond_true.or(cond_false).matches_object(None));
146    /// assert!(cond_true.or(cond_true).matches_object(None));
147    /// ```
148    fn or<Other: Condition<K>>(self, other: Other) -> conditions::Or<Self, Other>
149    where
150        Self: Sized,
151    {
152        conditions::Or(self, other)
153    }
154}
155
156impl<K, F: Fn(Option<&K>) -> bool> Condition<K> for F {
157    fn matches_object(&self, obj: Option<&K>) -> bool {
158        (self)(obj)
159    }
160}
161
162/// Common conditions to wait for
163pub mod conditions {
164    pub use super::Condition;
165    use k8s_openapi::{
166        api::{
167            apps::v1::Deployment,
168            batch::v1::Job,
169            core::v1::{Pod, Service},
170            networking::v1::Ingress,
171        },
172        apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition,
173    };
174    use kube_client::Resource;
175
176    /// An await condition that returns `true` once the object has been deleted.
177    ///
178    /// An object is considered to be deleted if the object can no longer be found, or if its
179    /// [`uid`](kube_client::api::ObjectMeta#structfield.uid) changes. This means that an object is considered to be deleted even if we miss
180    /// the deletion event and the object is recreated in the meantime.
181    #[must_use]
182    pub fn is_deleted<K: Resource>(uid: &str) -> impl Condition<K> + '_ {
183        move |obj: Option<&K>| {
184            // NB: Object not found implies success.
185            obj.is_none_or(
186                // Object is found, but a changed uid would mean that it was deleted and recreated
187                |obj| obj.meta().uid.as_deref() != Some(uid),
188            )
189        }
190    }
191
192    /// An await condition for `CustomResourceDefinition` that returns `true` once it has been accepted and established
193    ///
194    /// Note that this condition only guarantees you that you can use `Api<CustomResourceDefinition>` when it is ready.
195    /// It usually takes extra time for Discovery to notice the custom resource, and there is no condition for this.
196    #[must_use]
197    pub fn is_crd_established() -> impl Condition<CustomResourceDefinition> {
198        |obj: Option<&CustomResourceDefinition>| {
199            if let Some(o) = obj {
200                if let Some(s) = &o.status {
201                    if let Some(conds) = &s.conditions {
202                        if let Some(pcond) = conds.iter().find(|c| c.type_ == "Established") {
203                            return pcond.status == "True";
204                        }
205                    }
206                }
207            }
208            false
209        }
210    }
211
212    /// An await condition for `Pod` that returns `true` once it is running
213    #[must_use]
214    pub fn is_pod_running() -> impl Condition<Pod> {
215        |obj: Option<&Pod>| {
216            if let Some(pod) = &obj {
217                if let Some(status) = &pod.status {
218                    if let Some(phase) = &status.phase {
219                        return phase == "Running";
220                    }
221                }
222            }
223            false
224        }
225    }
226
227    /// An await condition for `Job` that returns `true` once it is completed
228    #[must_use]
229    pub fn is_job_completed() -> impl Condition<Job> {
230        |obj: Option<&Job>| {
231            if let Some(job) = &obj {
232                if let Some(s) = &job.status {
233                    if let Some(conds) = &s.conditions {
234                        if let Some(pcond) = conds.iter().find(|c| c.type_ == "Complete") {
235                            return pcond.status == "True";
236                        }
237                    }
238                }
239            }
240            false
241        }
242    }
243
244    /// An await condition for `Deployment` that returns `true` once the latest deployment has completed
245    ///
246    /// This looks for the condition that Kubernetes sets for completed deployments:
247    /// <https://kubernetes.io/docs/concepts/workloads/controllers/deployment/#complete-deployment>
248    #[must_use]
249    pub fn is_deployment_completed() -> impl Condition<Deployment> {
250        |obj: Option<&Deployment>| {
251            if let Some(depl) = &obj {
252                if let Some(s) = &depl.status {
253                    if let Some(conds) = &s.conditions {
254                        if let Some(dcond) = conds.iter().find(|c| {
255                            c.type_ == "Progressing" && c.reason == Some("NewReplicaSetAvailable".to_string())
256                        }) {
257                            return dcond.status == "True";
258                        }
259                    }
260                }
261            }
262            false
263        }
264    }
265
266    /// An await condition for `Service`s of type `LoadBalancer` that returns `true` once the backing load balancer has an external IP or hostname
267    #[must_use]
268    pub fn is_service_loadbalancer_provisioned() -> impl Condition<Service> {
269        |obj: Option<&Service>| {
270            if let Some(svc) = &obj {
271                // ignore services that are not type LoadBalancer (return true immediately)
272                if let Some(spec) = &svc.spec {
273                    if spec.type_ != Some("LoadBalancer".to_string()) {
274                        return true;
275                    }
276                    // carry on if this is a LoadBalancer service
277                    if let Some(s) = &svc.status {
278                        if let Some(lbs) = &s.load_balancer {
279                            if let Some(ings) = &lbs.ingress {
280                                return ings.iter().all(|ip| ip.ip.is_some() || ip.hostname.is_some());
281                            }
282                        }
283                    }
284                }
285            }
286            false
287        }
288    }
289
290    /// An await condition for `Ingress` that returns `true` once the backing load balancer has an external IP or hostname
291    #[must_use]
292    pub fn is_ingress_provisioned() -> impl Condition<Ingress> {
293        |obj: Option<&Ingress>| {
294            if let Some(ing) = &obj {
295                if let Some(s) = &ing.status {
296                    if let Some(lbs) = &s.load_balancer {
297                        if let Some(ings) = &lbs.ingress {
298                            return ings.iter().all(|ip| ip.ip.is_some() || ip.hostname.is_some());
299                        }
300                    }
301                }
302            }
303            false
304        }
305    }
306
307    /// See [`Condition::not`]
308    #[derive(Copy, Clone, Debug, PartialEq, Eq)]
309    pub struct Not<A>(pub(super) A);
310    impl<A: Condition<K>, K> Condition<K> for Not<A> {
311        fn matches_object(&self, obj: Option<&K>) -> bool {
312            !self.0.matches_object(obj)
313        }
314    }
315
316    /// See [`Condition::and`]
317    #[derive(Copy, Clone, Debug, PartialEq, Eq)]
318    pub struct And<A, B>(pub(super) A, pub(super) B);
319    impl<A, B, K> Condition<K> for And<A, B>
320    where
321        A: Condition<K>,
322        B: Condition<K>,
323    {
324        fn matches_object(&self, obj: Option<&K>) -> bool {
325            self.0.matches_object(obj) && self.1.matches_object(obj)
326        }
327    }
328
329    /// See [`Condition::or`]
330    #[derive(Copy, Clone, Debug, PartialEq, Eq)]
331    pub struct Or<A, B>(pub(super) A, pub(super) B);
332    impl<A, B, K> Condition<K> for Or<A, B>
333    where
334        A: Condition<K>,
335        B: Condition<K>,
336    {
337        fn matches_object(&self, obj: Option<&K>) -> bool {
338            self.0.matches_object(obj) || self.1.matches_object(obj)
339        }
340    }
341
342    mod tests {
343        #[test]
344        /// pass when CRD is established
345        fn crd_established_ok() {
346            use super::{is_crd_established, Condition};
347
348            let crd = r#"
349                apiVersion: apiextensions.k8s.io/v1
350                kind: CustomResourceDefinition
351                metadata:
352                  name: testthings.kube.rs
353                spec:
354                  group: kube.rs
355                  names:
356                    categories: []
357                    kind: TestThing
358                    plural: testthings
359                    shortNames: []
360                    singular: testthing
361                  scope: Namespaced
362                  versions:
363                    - additionalPrinterColumns: []
364                      name: v1
365                      schema:
366                        openAPIV3Schema:
367                          type: object
368                          x-kubernetes-preserve-unknown-fields: true
369                      served: true
370                      storage: true
371                status:
372                  acceptedNames:
373                    kind: TestThing
374                    listKind: TestThingList
375                    plural: testthings
376                    singular: testthing
377                  conditions:
378                    - lastTransitionTime: "2025-03-06T03:10:03Z"
379                      message: no conflicts found
380                      reason: NoConflicts
381                      status: "True"
382                      type: NamesAccepted
383                    - lastTransitionTime: "2025-03-06T03:10:03Z"
384                      message: the initial names have been accepted
385                      reason: InitialNamesAccepted
386                      status: "True"
387                      type: Established
388                storedVersions:
389                  - v1
390            "#;
391
392            let c = serde_yaml::from_str(crd).unwrap();
393            assert!(is_crd_established().matches_object(Some(&c)))
394        }
395
396        #[test]
397        /// fail when CRD is not yet ready
398        fn crd_established_fail() {
399            use super::{is_crd_established, Condition};
400
401            let crd = r#"
402                apiVersion: apiextensions.k8s.io/v1
403                kind: CustomResourceDefinition
404                metadata:
405                  name: testthings.kube.rs
406                spec:
407                  group: kube.rs
408                  names:
409                    categories: []
410                    kind: TestThing
411                    plural: testthings
412                    shortNames: []
413                    singular: testthing
414                  scope: Namespaced
415                  versions:
416                    - additionalPrinterColumns: []
417                      name: v1
418                      schema:
419                        openAPIV3Schema:
420                          type: object
421                          x-kubernetes-preserve-unknown-fields: true
422                      served: true
423                      storage: true
424                status:
425                  acceptedNames:
426                    kind: TestThing
427                    listKind: TestThingList
428                    plural: testthings
429                    singular: testthing
430                  conditions:
431                    - lastTransitionTime: "2025-03-06T03:10:03Z"
432                      message: no conflicts found
433                      reason: NoConflicts
434                      status: "True"
435                      type: NamesAccepted
436                    - lastTransitionTime: "2025-03-06T03:10:03Z"
437                      message: the initial names have been accepted
438                      reason: InitialNamesAccepted
439                      status: "False"
440                      type: Established
441                storedVersions:
442                  - v1
443            "#;
444
445            let c = serde_yaml::from_str(crd).unwrap();
446            assert!(!is_crd_established().matches_object(Some(&c)))
447        }
448
449        #[test]
450        /// fail when CRD does not exist
451        fn crd_established_missing() {
452            use super::{is_crd_established, Condition};
453
454            assert!(!is_crd_established().matches_object(None))
455        }
456
457        #[test]
458        /// pass when pod is running
459        fn pod_running_ok() {
460            use super::{is_pod_running, Condition};
461
462            let pod = r#"
463                apiVersion: v1
464                kind: Pod
465                metadata:
466                  namespace: default
467                  name: testpod
468                spec:
469                  containers:
470                    - name: testcontainer
471                      image: alpine
472                      command: [ sleep ]
473                      args: [ "100000" ]
474                status:
475                  conditions:
476                    - lastProbeTime: null
477                      lastTransitionTime: "2025-03-06T03:53:07Z"
478                      status: "True"
479                      type: PodReadyToStartContainers
480                    - lastProbeTime: null
481                      lastTransitionTime: "2025-03-06T03:52:58Z"
482                      status: "True"
483                      type: Initialized
484                    - lastProbeTime: null
485                      lastTransitionTime: "2025-03-06T03:53:24Z"
486                      status: "True"
487                      type: Ready
488                    - lastProbeTime: null
489                      lastTransitionTime: "2025-03-06T03:53:24Z"
490                      status: "True"
491                      type: ContainersReady
492                    - lastProbeTime: null
493                      lastTransitionTime: "2025-03-06T03:52:58Z"
494                      status: "True"
495                      type: PodScheduled
496                  containerStatuses:
497                    - containerID: containerd://598323380ae59d60c1ab98f9091c94659137a976d52136a8083775d47fea5875
498                      image: docker.io/library/alpine:latest
499                      imageID: docker.io/library/alpine@sha256:a8560b36e8b8210634f77d9f7f9efd7ffa463e380b75e2e74aff4511df3ef88c
500                      lastState: {}
501                      name: testcontainer
502                      ready: true
503                      restartCount: 0
504                      started: true
505                      state:
506                        running:
507                          startedAt: "2025-03-06T03:59:20Z"
508                  phase: Running
509                  qosClass: Burstable
510            "#;
511
512            let p = serde_yaml::from_str(pod).unwrap();
513            assert!(is_pod_running().matches_object(Some(&p)))
514        }
515
516        #[test]
517        /// fail if pod is unschedulable
518        fn pod_running_unschedulable() {
519            use super::{is_pod_running, Condition};
520
521            let pod = r#"
522                apiVersion: v1
523                kind: Pod
524                metadata:
525                  namespace: default
526                  name: testpod
527                spec:
528                  containers:
529                    - name: testcontainer
530                      image: alpine
531                      command: [ sleep ]
532                      args: [ "100000" ]
533                status:
534                  conditions:
535                    - lastProbeTime: null
536                      lastTransitionTime: "2025-03-06T03:52:25Z"
537                      message: '0/1 nodes are available: 1 node(s) were unschedulable. preemption: 0/1
538                      nodes are available: 1 Preemption is not helpful for scheduling.'
539                      reason: Unschedulable
540                      status: "False"
541                      type: PodScheduled
542                  phase: Pending
543                  qosClass: Burstable
544            "#;
545
546            let p = serde_yaml::from_str(pod).unwrap();
547            assert!(!is_pod_running().matches_object(Some(&p)))
548        }
549
550        #[test]
551        /// fail if pod does not exist
552        fn pod_running_missing() {
553            use super::{is_pod_running, Condition};
554
555            assert!(!is_pod_running().matches_object(None))
556        }
557
558        #[test]
559        /// pass if job completed
560        fn job_completed_ok() {
561            use super::{is_job_completed, Condition};
562
563            let job = r#"
564                apiVersion: batch/v1
565                kind: Job
566                metadata:
567                  name: pi
568                  namespace: default
569                spec:
570                  template:
571                    spec:
572                      containers:
573                      - name: pi
574                        command:
575                        - perl
576                        - -Mbignum=bpi
577                        - -wle
578                        - print bpi(2000)
579                        image: perl:5.34.0
580                        imagePullPolicy: IfNotPresent
581                status:
582                  completionTime: "2025-03-06T05:27:56Z"
583                  conditions:
584                  - lastProbeTime: "2025-03-06T05:27:56Z"
585                    lastTransitionTime: "2025-03-06T05:27:56Z"
586                    message: Reached expected number of succeeded pods
587                    reason: CompletionsReached
588                    status: "True"
589                    type: SuccessCriteriaMet
590                  - lastProbeTime: "2025-03-06T05:27:56Z"
591                    lastTransitionTime: "2025-03-06T05:27:56Z"
592                    message: Reached expected number of succeeded pods
593                    reason: CompletionsReached
594                    status: "True"
595                    type: Complete
596                  ready: 0
597                  startTime: "2025-03-06T05:27:27Z"
598                  succeeded: 1
599                  terminating: 0
600                  uncountedTerminatedPods: {}
601            "#;
602
603            let j = serde_yaml::from_str(job).unwrap();
604            assert!(is_job_completed().matches_object(Some(&j)))
605        }
606
607        #[test]
608        /// fail if job is still in progress
609        fn job_completed_running() {
610            use super::{is_job_completed, Condition};
611
612            let job = r#"
613                apiVersion: batch/v1
614                kind: Job
615                metadata:
616                  name: pi
617                  namespace: default
618                spec:
619                  backoffLimit: 4
620                  completionMode: NonIndexed
621                  completions: 1
622                  manualSelector: false
623                  parallelism: 1
624                  template:
625                    spec:
626                      containers:
627                      - name: pi
628                        command:
629                        - perl
630                        - -Mbignum=bpi
631                        - -wle
632                        - print bpi(2000)
633                        image: perl:5.34.0
634                        imagePullPolicy: IfNotPresent
635                status:
636                  active: 1
637                  ready: 0
638                  startTime: "2025-03-06T05:27:27Z"
639                  terminating: 0
640                  uncountedTerminatedPods: {}
641            "#;
642
643            let j = serde_yaml::from_str(job).unwrap();
644            assert!(!is_job_completed().matches_object(Some(&j)))
645        }
646
647        #[test]
648        /// fail if job does not exist
649        fn job_completed_missing() {
650            use super::{is_job_completed, Condition};
651
652            assert!(!is_job_completed().matches_object(None))
653        }
654
655        #[test]
656        /// pass when deployment has been fully rolled out
657        fn deployment_completed_ok() {
658            use super::{is_deployment_completed, Condition};
659
660            let depl = r#"
661                apiVersion: apps/v1
662                kind: Deployment
663                metadata:
664                  name: testapp
665                  namespace: default
666                spec:
667                  progressDeadlineSeconds: 600
668                  replicas: 3
669                  revisionHistoryLimit: 10
670                  selector:
671                    matchLabels:
672                      app: test
673                  strategy:
674                    rollingUpdate:
675                      maxSurge: 25%
676                      maxUnavailable: 25%
677                    type: RollingUpdate
678                  template:
679                    metadata:
680                      creationTimestamp: null
681                      labels:
682                        app: test
683                    spec:
684                      containers:
685                      - image: postgres
686                        imagePullPolicy: Always
687                        name: postgres
688                        ports:
689                        - containerPort: 5432
690                          protocol: TCP
691                        env:
692                        - name: POSTGRES_PASSWORD
693                          value: foobar
694                status:
695                  availableReplicas: 3
696                  conditions:
697                  - lastTransitionTime: "2025-03-06T06:06:57Z"
698                    lastUpdateTime: "2025-03-06T06:06:57Z"
699                    message: Deployment has minimum availability.
700                    reason: MinimumReplicasAvailable
701                    status: "True"
702                    type: Available
703                  - lastTransitionTime: "2025-03-06T06:03:20Z"
704                    lastUpdateTime: "2025-03-06T06:06:57Z"
705                    message: ReplicaSet "testapp-7fcd4b58c9" has successfully progressed.
706                    reason: NewReplicaSetAvailable
707                    status: "True"
708                    type: Progressing
709                  observedGeneration: 2
710                  readyReplicas: 3
711                  replicas: 3
712                  updatedReplicas: 3
713            "#;
714
715            let d = serde_yaml::from_str(depl).unwrap();
716            assert!(is_deployment_completed().matches_object(Some(&d)))
717        }
718
719        #[test]
720        /// fail if deployment update is still rolling out
721        fn deployment_completed_pending() {
722            use super::{is_deployment_completed, Condition};
723
724            let depl = r#"
725                apiVersion: apps/v1
726                kind: Deployment
727                metadata:
728                  name: testapp
729                  namespace: default
730                spec:
731                  progressDeadlineSeconds: 600
732                  replicas: 3
733                  revisionHistoryLimit: 10
734                  selector:
735                    matchLabels:
736                      app: test
737                  strategy:
738                    rollingUpdate:
739                      maxSurge: 25%
740                      maxUnavailable: 25%
741                    type: RollingUpdate
742                  template:
743                    metadata:
744                      creationTimestamp: null
745                      labels:
746                        app: test
747                    spec:
748                      containers:
749                      - image: postgres
750                        imagePullPolicy: Always
751                        name: postgres
752                        ports:
753                        - containerPort: 5432
754                          protocol: TCP
755                        env:
756                        - name: POSTGRES_PASSWORD
757                          value: foobar
758                status:
759                  conditions:
760                  - lastTransitionTime: "2025-03-06T06:03:20Z"
761                    lastUpdateTime: "2025-03-06T06:03:20Z"
762                    message: Deployment does not have minimum availability.
763                    reason: MinimumReplicasUnavailable
764                    status: "False"
765                    type: Available
766                  - lastTransitionTime: "2025-03-06T06:03:20Z"
767                    lastUpdateTime: "2025-03-06T06:03:20Z"
768                    message: ReplicaSet "testapp-77789cd7d4" is progressing.
769                    reason: ReplicaSetUpdated
770                    status: "True"
771                    type: Progressing
772                  observedGeneration: 1
773                  replicas: 3
774                  unavailableReplicas: 3
775                  updatedReplicas: 3
776            "#;
777
778            let d = serde_yaml::from_str(depl).unwrap();
779            assert!(!is_deployment_completed().matches_object(Some(&d)))
780        }
781
782        #[test]
783        /// fail if deployment does not exist
784        fn deployment_completed_missing() {
785            use super::{is_deployment_completed, Condition};
786
787            assert!(!is_deployment_completed().matches_object(None))
788        }
789
790        #[test]
791        /// pass if loadbalancer service has recieved a loadbalancer IP
792        fn service_lb_provisioned_ok_ip() {
793            use super::{is_service_loadbalancer_provisioned, Condition};
794
795            let service = r"
796                apiVersion: v1
797                kind: Service
798                metadata:
799                  name: test
800                spec:
801                  selector:
802                    app.kubernetes.io/name: test
803                  type: LoadBalancer
804                  ports:
805                    - protocol: TCP
806                      port: 80
807                      targetPort: 9376
808                  clusterIP: 10.0.171.239
809                status:
810                  loadBalancer:
811                    ingress:
812                      - ip: 192.0.2.127
813            ";
814
815            let s = serde_yaml::from_str(service).unwrap();
816            assert!(is_service_loadbalancer_provisioned().matches_object(Some(&s)))
817        }
818
819        #[test]
820        /// pass if loadbalancer service has recieved a loadbalancer hostname
821        fn service_lb_provisioned_ok_hostname() {
822            use super::{is_service_loadbalancer_provisioned, Condition};
823
824            let service = r"
825                apiVersion: v1
826                kind: Service
827                metadata:
828                  name: test
829                spec:
830                  selector:
831                    app.kubernetes.io/name: test
832                  type: LoadBalancer
833                  ports:
834                    - protocol: TCP
835                      port: 80
836                      targetPort: 9376
837                  clusterIP: 10.0.171.239
838                status:
839                  loadBalancer:
840                    ingress:
841                      - hostname: example.exposed.service
842            ";
843
844            let s = serde_yaml::from_str(service).unwrap();
845            assert!(is_service_loadbalancer_provisioned().matches_object(Some(&s)))
846        }
847
848        #[test]
849        /// fail if loadbalancer service is still waiting for a LB
850        fn service_lb_provisioned_pending() {
851            use super::{is_service_loadbalancer_provisioned, Condition};
852
853            let service = r"
854                apiVersion: v1
855                kind: Service
856                metadata:
857                  name: test
858                spec:
859                  selector:
860                    app.kubernetes.io/name: test
861                  type: LoadBalancer
862                  ports:
863                    - protocol: TCP
864                      port: 80
865                      targetPort: 9376
866                  clusterIP: 10.0.171.239
867                status:
868                  loadBalancer: {}
869            ";
870
871            let s = serde_yaml::from_str(service).unwrap();
872            assert!(!is_service_loadbalancer_provisioned().matches_object(Some(&s)))
873        }
874
875        #[test]
876        /// pass if service is not a loadbalancer
877        fn service_lb_provisioned_not_loadbalancer() {
878            use super::{is_service_loadbalancer_provisioned, Condition};
879
880            let service = r"
881                apiVersion: v1
882                kind: Service
883                metadata:
884                  name: test
885                spec:
886                  selector:
887                    app.kubernetes.io/name: test
888                  type: ClusterIP
889                  ports:
890                    - protocol: TCP
891                      port: 80
892                      targetPort: 9376
893                status:
894                  loadBalancer: {}
895            ";
896
897            let s = serde_yaml::from_str(service).unwrap();
898            assert!(is_service_loadbalancer_provisioned().matches_object(Some(&s)))
899        }
900
901        #[test]
902        /// fail if service does not exist
903        fn service_lb_provisioned_missing() {
904            use super::{is_service_loadbalancer_provisioned, Condition};
905
906            assert!(!is_service_loadbalancer_provisioned().matches_object(None))
907        }
908
909        #[test]
910        /// pass when ingress has recieved a loadbalancer IP
911        fn ingress_provisioned_ok_ip() {
912            use super::{is_ingress_provisioned, Condition};
913
914            let ingress = r#"
915                apiVersion: networking.k8s.io/v1
916                kind: Ingress
917                metadata:
918                  name: test
919                  namespace: default
920                  resourceVersion: "1401"
921                  uid: d653ee4d-0adb-40d9-b03c-7f84f35d4a67
922                spec:
923                  ingressClassName: nginx
924                  rules:
925                  - host: httpbin.local
926                    http:
927                      paths:
928                      - path: /
929                        backend:
930                          service:
931                            name: httpbin
932                            port:
933                              number: 80
934                status:
935                  loadBalancer:
936                    ingress:
937                      - ip: 10.89.7.3
938            "#;
939
940            let i = serde_yaml::from_str(ingress).unwrap();
941            assert!(is_ingress_provisioned().matches_object(Some(&i)))
942        }
943
944        #[test]
945        /// pass when ingress has recieved a loadbalancer hostname
946        fn ingress_provisioned_ok_hostname() {
947            use super::{is_ingress_provisioned, Condition};
948
949            let ingress = r#"
950                apiVersion: networking.k8s.io/v1
951                kind: Ingress
952                metadata:
953                  name: test
954                  namespace: default
955                  resourceVersion: "1401"
956                  uid: d653ee4d-0adb-40d9-b03c-7f84f35d4a67
957                spec:
958                  ingressClassName: nginx
959                  rules:
960                  - host: httpbin.local
961                    http:
962                      paths:
963                      - path: /
964                        backend:
965                          service:
966                            name: httpbin
967                            port:
968                              number: 80
969                status:
970                  loadBalancer:
971                    ingress:
972                      - hostname: example.exposed.service
973            "#;
974
975            let i = serde_yaml::from_str(ingress).unwrap();
976            assert!(is_ingress_provisioned().matches_object(Some(&i)))
977        }
978
979        #[test]
980        /// fail if ingress is still waiting for a LB
981        fn ingress_provisioned_pending() {
982            use super::{is_ingress_provisioned, Condition};
983
984            let ingress = r#"
985                apiVersion: networking.k8s.io/v1
986                kind: Ingress
987                metadata:
988                  name: test
989                  namespace: default
990                  resourceVersion: "1401"
991                  uid: d653ee4d-0adb-40d9-b03c-7f84f35d4a67
992                spec:
993                  ingressClassName: nginx
994                  rules:
995                  - host: httpbin.local
996                    http:
997                      paths:
998                      - path: /
999                        backend:
1000                          service:
1001                            name: httpbin
1002                            port:
1003                              number: 80
1004                status:
1005                  loadBalancer: {}
1006            "#;
1007
1008            let i = serde_yaml::from_str(ingress).unwrap();
1009            assert!(!is_ingress_provisioned().matches_object(Some(&i)))
1010        }
1011
1012        #[test]
1013        /// fail if ingress does not exist
1014        fn ingress_provisioned_missing() {
1015            use super::{is_ingress_provisioned, Condition};
1016
1017            assert!(!is_ingress_provisioned().matches_object(None))
1018        }
1019    }
1020}
1021
1022/// Utilities for deleting objects
1023pub mod delete {
1024    use super::{await_condition, conditions};
1025    use kube_client::{api::DeleteParams, Api, Resource};
1026    use serde::de::DeserializeOwned;
1027    use std::fmt::Debug;
1028    use thiserror::Error;
1029
1030    #[derive(Debug, Error)]
1031    pub enum Error {
1032        #[error("deleted object has no UID to wait for")]
1033        NoUid,
1034        #[error("failed to delete object: {0}")]
1035        Delete(#[source] kube_client::Error),
1036        #[error("failed to wait for object to be deleted: {0}")]
1037        Await(#[source] super::Error),
1038    }
1039
1040    /// Delete an object, and wait for it to be removed from the Kubernetes API (including waiting for all finalizers to unregister themselves).
1041    ///
1042    /// # Errors
1043    ///
1044    /// Returns an [`Error`](enum@super::Error) if the object was unable to be deleted, or if the wait was interrupted.
1045    #[allow(clippy::module_name_repetitions)]
1046    pub async fn delete_and_finalize<K: Clone + Debug + Send + DeserializeOwned + Resource + 'static>(
1047        api: Api<K>,
1048        name: &str,
1049        delete_params: &DeleteParams,
1050    ) -> Result<(), Error> {
1051        let deleted_obj_uid = api
1052            .delete(name, delete_params)
1053            .await
1054            .map_err(Error::Delete)?
1055            .either(
1056                |mut obj| obj.meta_mut().uid.take(),
1057                |status| status.details.map(|details| details.uid),
1058            )
1059            .ok_or(Error::NoUid)?;
1060        await_condition(api, name, conditions::is_deleted(&deleted_obj_uid))
1061            .await
1062            .map_err(Error::Await)?;
1063        Ok(())
1064    }
1065}