kube_runtime/
wait.rs

1//! Waits for objects to reach desired states
2use std::{future, pin::pin};
3
4use futures::TryStreamExt;
5use kube_client::{Api, Resource};
6use serde::de::DeserializeOwned;
7use std::fmt::Debug;
8use thiserror::Error;
9
10use crate::watcher::{self, watch_object};
11
/// Errors returned by [`await_condition`].
#[derive(Debug, Error)]
pub enum Error {
    /// The watch on the object yielded an error before the condition was fulfilled.
    #[error("failed to probe for whether the condition is fulfilled yet: {0}")]
    ProbeFailed(#[source] watcher::Error),
}
17
18/// Watch an object, and wait for some condition `cond` to return `true`.
19///
20/// `cond` is passed `Some` if the object is found, otherwise `None`.
21///
22/// The object is returned when the condition is fulfilled.
23///
24/// # Caveats
25///
26/// Keep in mind that the condition is typically fulfilled by an external service, which might not even be available. `await_condition`
27/// does *not* automatically add a timeout. If this is desired, wrap it in [`tokio::time::timeout`].
28///
29/// # Errors
30///
31/// Fails if the type is not known to the Kubernetes API, or if the [`Api`] does not have
32/// permission to `watch` and `list` it.
33///
34/// Does *not* fail if the object is not found.
35///
36/// # Usage
37///
38/// ```
39/// use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition;
40/// use kube::{Api, runtime::wait::{await_condition, conditions}};
41/// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
42/// # let client: kube::Client = todo!();
43///
44/// let crds: Api<CustomResourceDefinition> = Api::all(client);
45/// // .. create or apply a crd here ..
46/// let establish = await_condition(crds, "foos.clux.dev", conditions::is_crd_established());
47/// let _ = tokio::time::timeout(std::time::Duration::from_secs(10), establish).await?;
48/// # Ok(())
49/// # }
50/// ```
51#[allow(clippy::missing_panics_doc)] // watch never actually terminates, expect cannot fail
52pub async fn await_condition<K>(api: Api<K>, name: &str, cond: impl Condition<K>) -> Result<Option<K>, Error>
53where
54    K: Clone + Debug + Send + DeserializeOwned + Resource + 'static,
55{
56    // Skip updates until the condition is satisfied.
57    let mut stream = pin!(watch_object(api, name).try_skip_while(|obj| {
58        let matches = cond.matches_object(obj.as_ref());
59        future::ready(Ok(!matches))
60    }));
61
62    // Then take the first update that satisfies the condition.
63    let obj = stream
64        .try_next()
65        .await
66        .map_err(Error::ProbeFailed)?
67        .expect("stream must not terminate");
68    Ok(obj)
69}
70
71/// A trait for condition functions to be used by [`await_condition`]
72///
73/// Note that this is auto-implemented for functions of type `fn(Option<&K>) -> bool`.
74///
75/// # Usage
76///
77/// ```
78/// use kube::runtime::wait::Condition;
79/// use k8s_openapi::api::core::v1::Pod;
80/// fn my_custom_condition(my_cond: &str) -> impl Condition<Pod> + '_ {
81///     move |obj: Option<&Pod>| {
82///         if let Some(pod) = &obj {
83///             if let Some(status) = &pod.status {
84///                 if let Some(conds) = &status.conditions {
85///                     if let Some(pcond) = conds.iter().find(|c| c.type_ == my_cond) {
86///                         return pcond.status == "True";
87///                     }
88///                 }
89///             }
90///         }
91///         false
92///     }
93/// }
94/// ```
95pub trait Condition<K> {
96    fn matches_object(&self, obj: Option<&K>) -> bool;
97
98    /// Returns a `Condition` that holds if `self` does not
99    ///
100    /// # Usage
101    ///
102    /// ```
103    /// # use kube_runtime::wait::Condition;
104    /// let condition: fn(Option<&()>) -> bool = |_| true;
105    /// assert!(condition.matches_object(None));
106    /// assert!(!condition.not().matches_object(None));
107    /// ```
108    fn not(self) -> conditions::Not<Self>
109    where
110        Self: Sized,
111    {
112        conditions::Not(self)
113    }
114
115    /// Returns a `Condition` that holds if `self` and `other` both do
116    ///
117    /// # Usage
118    ///
119    /// ```
120    /// # use kube_runtime::wait::Condition;
121    /// let cond_false: fn(Option<&()>) -> bool = |_| false;
122    /// let cond_true: fn(Option<&()>) -> bool = |_| true;
123    /// assert!(!cond_false.and(cond_false).matches_object(None));
124    /// assert!(!cond_false.and(cond_true).matches_object(None));
125    /// assert!(!cond_true.and(cond_false).matches_object(None));
126    /// assert!(cond_true.and(cond_true).matches_object(None));
127    /// ```
128    fn and<Other: Condition<K>>(self, other: Other) -> conditions::And<Self, Other>
129    where
130        Self: Sized,
131    {
132        conditions::And(self, other)
133    }
134
135    /// Returns a `Condition` that holds if either `self` or `other` does
136    ///
137    /// # Usage
138    ///
139    /// ```
140    /// # use kube_runtime::wait::Condition;
141    /// let cond_false: fn(Option<&()>) -> bool = |_| false;
142    /// let cond_true: fn(Option<&()>) -> bool = |_| true;
143    /// assert!(!cond_false.or(cond_false).matches_object(None));
144    /// assert!(cond_false.or(cond_true).matches_object(None));
145    /// assert!(cond_true.or(cond_false).matches_object(None));
146    /// assert!(cond_true.or(cond_true).matches_object(None));
147    /// ```
148    fn or<Other: Condition<K>>(self, other: Other) -> conditions::Or<Self, Other>
149    where
150        Self: Sized,
151    {
152        conditions::Or(self, other)
153    }
154}
155
156impl<K, F: Fn(Option<&K>) -> bool> Condition<K> for F {
157    fn matches_object(&self, obj: Option<&K>) -> bool {
158        (self)(obj)
159    }
160}
161
162/// Common conditions to wait for
163pub mod conditions {
164    pub use super::Condition;
165    use k8s_openapi::{
166        api::{batch::v1::Job, core::v1::Pod},
167        apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition,
168    };
169    use kube_client::Resource;
170
171    /// An await condition that returns `true` once the object has been deleted.
172    ///
173    /// An object is considered to be deleted if the object can no longer be found, or if its
174    /// [`uid`](kube_client::api::ObjectMeta#structfield.uid) changes. This means that an object is considered to be deleted even if we miss
175    /// the deletion event and the object is recreated in the meantime.
176    #[must_use]
177    pub fn is_deleted<K: Resource>(uid: &str) -> impl Condition<K> + '_ {
178        move |obj: Option<&K>| {
179            obj.map_or(
180                // Object is not found, success!
181                true,
182                // Object is found, but a changed uid would mean that it was deleted and recreated
183                |obj| obj.meta().uid.as_deref() != Some(uid),
184            )
185        }
186    }
187
188    /// An await condition for `CustomResourceDefinition` that returns `true` once it has been accepted and established
189    ///
190    /// Note that this condition only guarantees you that you can use `Api<CustomResourceDefinition>` when it is ready.
191    /// It usually takes extra time for Discovery to notice the custom resource, and there is no condition for this.
192    #[must_use]
193    pub fn is_crd_established() -> impl Condition<CustomResourceDefinition> {
194        |obj: Option<&CustomResourceDefinition>| {
195            if let Some(o) = obj {
196                if let Some(s) = &o.status {
197                    if let Some(conds) = &s.conditions {
198                        if let Some(pcond) = conds.iter().find(|c| c.type_ == "Established") {
199                            return pcond.status == "True";
200                        }
201                    }
202                }
203            }
204            false
205        }
206    }
207
208    /// An await condition for `Pod` that returns `true` once it is running
209    #[must_use]
210    pub fn is_pod_running() -> impl Condition<Pod> {
211        |obj: Option<&Pod>| {
212            if let Some(pod) = &obj {
213                if let Some(status) = &pod.status {
214                    if let Some(phase) = &status.phase {
215                        return phase == "Running";
216                    }
217                }
218            }
219            false
220        }
221    }
222
223    /// An await condition for `Job` that returns `true` once it is completed
224    #[must_use]
225    pub fn is_job_completed() -> impl Condition<Job> {
226        |obj: Option<&Job>| {
227            if let Some(job) = &obj {
228                if let Some(s) = &job.status {
229                    if let Some(conds) = &s.conditions {
230                        if let Some(pcond) = conds.iter().find(|c| c.type_ == "Complete") {
231                            return pcond.status == "True";
232                        }
233                    }
234                }
235            }
236            false
237        }
238    }
239
240    /// See [`Condition::not`]
241    #[derive(Copy, Clone, Debug, PartialEq, Eq)]
242    pub struct Not<A>(pub(super) A);
243    impl<A: Condition<K>, K> Condition<K> for Not<A> {
244        fn matches_object(&self, obj: Option<&K>) -> bool {
245            !self.0.matches_object(obj)
246        }
247    }
248
249    /// See [`Condition::and`]
250    #[derive(Copy, Clone, Debug, PartialEq, Eq)]
251    pub struct And<A, B>(pub(super) A, pub(super) B);
252    impl<A, B, K> Condition<K> for And<A, B>
253    where
254        A: Condition<K>,
255        B: Condition<K>,
256    {
257        fn matches_object(&self, obj: Option<&K>) -> bool {
258            self.0.matches_object(obj) && self.1.matches_object(obj)
259        }
260    }
261
262    /// See [`Condition::or`]
263    #[derive(Copy, Clone, Debug, PartialEq, Eq)]
264    pub struct Or<A, B>(pub(super) A, pub(super) B);
265    impl<A, B, K> Condition<K> for Or<A, B>
266    where
267        A: Condition<K>,
268        B: Condition<K>,
269    {
270        fn matches_object(&self, obj: Option<&K>) -> bool {
271            self.0.matches_object(obj) || self.1.matches_object(obj)
272        }
273    }
274}
275
276/// Utilities for deleting objects
277pub mod delete {
278    use super::{await_condition, conditions};
279    use kube_client::{api::DeleteParams, Api, Resource};
280    use serde::de::DeserializeOwned;
281    use std::fmt::Debug;
282    use thiserror::Error;
283
    /// Errors returned by [`delete_and_finalize`].
    #[derive(Debug, Error)]
    pub enum Error {
        /// The delete response carried no UID, so there is nothing to wait on.
        #[error("deleted object has no UID to wait for")]
        NoUid,
        /// The delete request itself failed.
        #[error("failed to delete object: {0}")]
        Delete(#[source] kube_client::Error),
        /// Waiting for the object to be deleted failed.
        #[error("failed to wait for object to be deleted: {0}")]
        Await(#[source] super::Error),
    }
293
294    /// Delete an object, and wait for it to be removed from the Kubernetes API (including waiting for all finalizers to unregister themselves).
295    ///
296    /// # Errors
297    ///
298    /// Returns an [`Error`](enum@super::Error) if the object was unable to be deleted, or if the wait was interrupted.
299    #[allow(clippy::module_name_repetitions)]
300    pub async fn delete_and_finalize<K: Clone + Debug + Send + DeserializeOwned + Resource + 'static>(
301        api: Api<K>,
302        name: &str,
303        delete_params: &DeleteParams,
304    ) -> Result<(), Error> {
305        let deleted_obj_uid = api
306            .delete(name, delete_params)
307            .await
308            .map_err(Error::Delete)?
309            .either(
310                |mut obj| obj.meta_mut().uid.take(),
311                |status| status.details.map(|details| details.uid),
312            )
313            .ok_or(Error::NoUid)?;
314        await_condition(api, name, conditions::is_deleted(&deleted_obj_uid))
315            .await
316            .map_err(Error::Await)?;
317        Ok(())
318    }
319}