kube_runtime/wait.rs
1//! Waits for objects to reach desired states
2use std::{future, pin::pin};
3
4use futures::TryStreamExt;
5use kube_client::{Api, Resource};
6use serde::de::DeserializeOwned;
7use std::fmt::Debug;
8use thiserror::Error;
9
10use crate::watcher::{self, watch_object};
11
12#[derive(Debug, Error)]
13pub enum Error {
14 #[error("failed to probe for whether the condition is fulfilled yet: {0}")]
15 ProbeFailed(#[source] watcher::Error),
16}
17
18/// Watch an object, and wait for some condition `cond` to return `true`.
19///
20/// `cond` is passed `Some` if the object is found, otherwise `None`.
21///
22/// The object is returned when the condition is fulfilled.
23///
24/// # Caveats
25///
26/// Keep in mind that the condition is typically fulfilled by an external service, which might not even be available. `await_condition`
27/// does *not* automatically add a timeout. If this is desired, wrap it in [`tokio::time::timeout`].
28///
29/// # Errors
30///
31/// Fails if the type is not known to the Kubernetes API, or if the [`Api`] does not have
32/// permission to `watch` and `list` it.
33///
34/// Does *not* fail if the object is not found.
35///
36/// # Usage
37///
38/// ```
39/// use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition;
40/// use kube::{Api, runtime::wait::{await_condition, conditions}};
41/// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
42/// # let client: kube::Client = todo!();
43///
44/// let crds: Api<CustomResourceDefinition> = Api::all(client);
45/// // .. create or apply a crd here ..
46/// let establish = await_condition(crds, "foos.clux.dev", conditions::is_crd_established());
47/// let _ = tokio::time::timeout(std::time::Duration::from_secs(10), establish).await?;
48/// # Ok(())
49/// # }
50/// ```
51#[allow(clippy::missing_panics_doc)] // watch never actually terminates, expect cannot fail
52pub async fn await_condition<K>(api: Api<K>, name: &str, cond: impl Condition<K>) -> Result<Option<K>, Error>
53where
54 K: Clone + Debug + Send + DeserializeOwned + Resource + 'static,
55{
56 // Skip updates until the condition is satisfied.
57 let mut stream = pin!(watch_object(api, name).try_skip_while(|obj| {
58 let matches = cond.matches_object(obj.as_ref());
59 future::ready(Ok(!matches))
60 }));
61
62 // Then take the first update that satisfies the condition.
63 let obj = stream
64 .try_next()
65 .await
66 .map_err(Error::ProbeFailed)?
67 .expect("stream must not terminate");
68 Ok(obj)
69}
70
71/// A trait for condition functions to be used by [`await_condition`]
72///
73/// Note that this is auto-implemented for functions of type `fn(Option<&K>) -> bool`.
74///
75/// # Usage
76///
77/// ```
78/// use kube::runtime::wait::Condition;
79/// use k8s_openapi::api::core::v1::Pod;
80/// fn my_custom_condition(my_cond: &str) -> impl Condition<Pod> + '_ {
81/// move |obj: Option<&Pod>| {
82/// if let Some(pod) = &obj {
83/// if let Some(status) = &pod.status {
84/// if let Some(conds) = &status.conditions {
85/// if let Some(pcond) = conds.iter().find(|c| c.type_ == my_cond) {
86/// return pcond.status == "True";
87/// }
88/// }
89/// }
90/// }
91/// false
92/// }
93/// }
94/// ```
95pub trait Condition<K> {
96 fn matches_object(&self, obj: Option<&K>) -> bool;
97
98 /// Returns a `Condition` that holds if `self` does not
99 ///
100 /// # Usage
101 ///
102 /// ```
103 /// # use kube_runtime::wait::Condition;
104 /// let condition: fn(Option<&()>) -> bool = |_| true;
105 /// assert!(condition.matches_object(None));
106 /// assert!(!condition.not().matches_object(None));
107 /// ```
108 fn not(self) -> conditions::Not<Self>
109 where
110 Self: Sized,
111 {
112 conditions::Not(self)
113 }
114
115 /// Returns a `Condition` that holds if `self` and `other` both do
116 ///
117 /// # Usage
118 ///
119 /// ```
120 /// # use kube_runtime::wait::Condition;
121 /// let cond_false: fn(Option<&()>) -> bool = |_| false;
122 /// let cond_true: fn(Option<&()>) -> bool = |_| true;
123 /// assert!(!cond_false.and(cond_false).matches_object(None));
124 /// assert!(!cond_false.and(cond_true).matches_object(None));
125 /// assert!(!cond_true.and(cond_false).matches_object(None));
126 /// assert!(cond_true.and(cond_true).matches_object(None));
127 /// ```
128 fn and<Other: Condition<K>>(self, other: Other) -> conditions::And<Self, Other>
129 where
130 Self: Sized,
131 {
132 conditions::And(self, other)
133 }
134
135 /// Returns a `Condition` that holds if either `self` or `other` does
136 ///
137 /// # Usage
138 ///
139 /// ```
140 /// # use kube_runtime::wait::Condition;
141 /// let cond_false: fn(Option<&()>) -> bool = |_| false;
142 /// let cond_true: fn(Option<&()>) -> bool = |_| true;
143 /// assert!(!cond_false.or(cond_false).matches_object(None));
144 /// assert!(cond_false.or(cond_true).matches_object(None));
145 /// assert!(cond_true.or(cond_false).matches_object(None));
146 /// assert!(cond_true.or(cond_true).matches_object(None));
147 /// ```
148 fn or<Other: Condition<K>>(self, other: Other) -> conditions::Or<Self, Other>
149 where
150 Self: Sized,
151 {
152 conditions::Or(self, other)
153 }
154}
155
156impl<K, F: Fn(Option<&K>) -> bool> Condition<K> for F {
157 fn matches_object(&self, obj: Option<&K>) -> bool {
158 (self)(obj)
159 }
160}
161
162/// Common conditions to wait for
163pub mod conditions {
164 pub use super::Condition;
165 use k8s_openapi::{
166 api::{batch::v1::Job, core::v1::Pod},
167 apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition,
168 };
169 use kube_client::Resource;
170
171 /// An await condition that returns `true` once the object has been deleted.
172 ///
173 /// An object is considered to be deleted if the object can no longer be found, or if its
174 /// [`uid`](kube_client::api::ObjectMeta#structfield.uid) changes. This means that an object is considered to be deleted even if we miss
175 /// the deletion event and the object is recreated in the meantime.
176 #[must_use]
177 pub fn is_deleted<K: Resource>(uid: &str) -> impl Condition<K> + '_ {
178 move |obj: Option<&K>| {
179 obj.map_or(
180 // Object is not found, success!
181 true,
182 // Object is found, but a changed uid would mean that it was deleted and recreated
183 |obj| obj.meta().uid.as_deref() != Some(uid),
184 )
185 }
186 }
187
188 /// An await condition for `CustomResourceDefinition` that returns `true` once it has been accepted and established
189 ///
190 /// Note that this condition only guarantees you that you can use `Api<CustomResourceDefinition>` when it is ready.
191 /// It usually takes extra time for Discovery to notice the custom resource, and there is no condition for this.
192 #[must_use]
193 pub fn is_crd_established() -> impl Condition<CustomResourceDefinition> {
194 |obj: Option<&CustomResourceDefinition>| {
195 if let Some(o) = obj {
196 if let Some(s) = &o.status {
197 if let Some(conds) = &s.conditions {
198 if let Some(pcond) = conds.iter().find(|c| c.type_ == "Established") {
199 return pcond.status == "True";
200 }
201 }
202 }
203 }
204 false
205 }
206 }
207
208 /// An await condition for `Pod` that returns `true` once it is running
209 #[must_use]
210 pub fn is_pod_running() -> impl Condition<Pod> {
211 |obj: Option<&Pod>| {
212 if let Some(pod) = &obj {
213 if let Some(status) = &pod.status {
214 if let Some(phase) = &status.phase {
215 return phase == "Running";
216 }
217 }
218 }
219 false
220 }
221 }
222
223 /// An await condition for `Job` that returns `true` once it is completed
224 #[must_use]
225 pub fn is_job_completed() -> impl Condition<Job> {
226 |obj: Option<&Job>| {
227 if let Some(job) = &obj {
228 if let Some(s) = &job.status {
229 if let Some(conds) = &s.conditions {
230 if let Some(pcond) = conds.iter().find(|c| c.type_ == "Complete") {
231 return pcond.status == "True";
232 }
233 }
234 }
235 }
236 false
237 }
238 }
239
240 /// See [`Condition::not`]
241 #[derive(Copy, Clone, Debug, PartialEq, Eq)]
242 pub struct Not<A>(pub(super) A);
243 impl<A: Condition<K>, K> Condition<K> for Not<A> {
244 fn matches_object(&self, obj: Option<&K>) -> bool {
245 !self.0.matches_object(obj)
246 }
247 }
248
249 /// See [`Condition::and`]
250 #[derive(Copy, Clone, Debug, PartialEq, Eq)]
251 pub struct And<A, B>(pub(super) A, pub(super) B);
252 impl<A, B, K> Condition<K> for And<A, B>
253 where
254 A: Condition<K>,
255 B: Condition<K>,
256 {
257 fn matches_object(&self, obj: Option<&K>) -> bool {
258 self.0.matches_object(obj) && self.1.matches_object(obj)
259 }
260 }
261
262 /// See [`Condition::or`]
263 #[derive(Copy, Clone, Debug, PartialEq, Eq)]
264 pub struct Or<A, B>(pub(super) A, pub(super) B);
265 impl<A, B, K> Condition<K> for Or<A, B>
266 where
267 A: Condition<K>,
268 B: Condition<K>,
269 {
270 fn matches_object(&self, obj: Option<&K>) -> bool {
271 self.0.matches_object(obj) || self.1.matches_object(obj)
272 }
273 }
274}
275
276/// Utilities for deleting objects
277pub mod delete {
278 use super::{await_condition, conditions};
279 use kube_client::{api::DeleteParams, Api, Resource};
280 use serde::de::DeserializeOwned;
281 use std::fmt::Debug;
282 use thiserror::Error;
283
284 #[derive(Debug, Error)]
285 pub enum Error {
286 #[error("deleted object has no UID to wait for")]
287 NoUid,
288 #[error("failed to delete object: {0}")]
289 Delete(#[source] kube_client::Error),
290 #[error("failed to wait for object to be deleted: {0}")]
291 Await(#[source] super::Error),
292 }
293
294 /// Delete an object, and wait for it to be removed from the Kubernetes API (including waiting for all finalizers to unregister themselves).
295 ///
296 /// # Errors
297 ///
298 /// Returns an [`Error`](enum@super::Error) if the object was unable to be deleted, or if the wait was interrupted.
299 #[allow(clippy::module_name_repetitions)]
300 pub async fn delete_and_finalize<K: Clone + Debug + Send + DeserializeOwned + Resource + 'static>(
301 api: Api<K>,
302 name: &str,
303 delete_params: &DeleteParams,
304 ) -> Result<(), Error> {
305 let deleted_obj_uid = api
306 .delete(name, delete_params)
307 .await
308 .map_err(Error::Delete)?
309 .either(
310 |mut obj| obj.meta_mut().uid.take(),
311 |status| status.details.map(|details| details.uid),
312 )
313 .ok_or(Error::NoUid)?;
314 await_condition(api, name, conditions::is_deleted(&deleted_obj_uid))
315 .await
316 .map_err(Error::Await)?;
317 Ok(())
318 }
319}