k8s_controller/
controller.rs

1use std::collections::BTreeMap;
2use std::error::Error as _;
3use std::sync::{Arc, Mutex};
4use std::time::Duration;
5
6use futures::future::FutureExt;
7use futures::stream::StreamExt;
8use kube::api::Api;
9use kube::core::{ClusterResourceScope, NamespaceResourceScope};
10use kube::{Client, Resource, ResourceExt};
11use kube_runtime::controller::Action;
12use kube_runtime::finalizer::{Event, finalizer};
13use kube_runtime::watcher;
14use rand::{Rng, rng};
15use tracing::{Level, event};
16
17#[derive(Debug, thiserror::Error)]
18pub enum Error<E: std::error::Error + 'static> {
19    #[error("{0}")]
20    ControllerError(#[source] E),
21    #[error("{0}")]
22    FinalizerError(#[source] kube_runtime::finalizer::Error<E>),
23}
24
25/// The [`Controller`] watches a set of resources, calling methods on the
26/// provided [`Context`] when events occur.
27pub struct Controller<Ctx: Context>
28where
29    Ctx: Send + Sync + 'static,
30    Ctx::Error: Send + Sync + 'static,
31    Ctx::Resource: Send + Sync + 'static,
32    Ctx::Resource: Clone + std::fmt::Debug + serde::Serialize,
33    for<'de> Ctx::Resource: serde::Deserialize<'de>,
34    <Ctx::Resource as Resource>::DynamicType:
35        Eq + Clone + std::hash::Hash + std::default::Default + std::fmt::Debug + std::marker::Unpin,
36{
37    client: kube::Client,
38    make_api: Box<dyn Fn(&Ctx::Resource) -> Api<Ctx::Resource> + Sync + Send + 'static>,
39    controller: kube_runtime::controller::Controller<Ctx::Resource>,
40    context: Ctx,
41}
42
43impl<Ctx: Context> Controller<Ctx>
44where
45    Ctx: Send + Sync + 'static,
46    Ctx::Error: Send + Sync + 'static,
47    Ctx::Resource: Clone + std::fmt::Debug + serde::Serialize,
48    for<'de> Ctx::Resource: serde::Deserialize<'de>,
49    <Ctx::Resource as Resource>::DynamicType:
50        Eq + Clone + std::hash::Hash + std::default::Default + std::fmt::Debug + std::marker::Unpin,
51{
52    /// Creates a new controller for a namespaced resource using the given
53    /// `client`. The `context` given determines the type of resource
54    /// to watch (via the [`Context::Resource`] type provided as part of
55    /// the trait implementation). The resources to be watched will be
56    /// limited to resources in the given `namespace`. A [`watcher::Config`]
57    /// can be given to limit the resources watched (for instance,
58    /// `watcher::Config::default().labels("app=myapp")`).
59    pub fn namespaced(client: Client, context: Ctx, namespace: &str, wc: watcher::Config) -> Self
60    where
61        Ctx::Resource: Resource<Scope = NamespaceResourceScope>,
62    {
63        let make_api = {
64            let client = client.clone();
65            Box::new(move |resource: &Ctx::Resource| {
66                Api::<Ctx::Resource>::namespaced(client.clone(), &resource.namespace().unwrap())
67            })
68        };
69        let controller = kube_runtime::controller::Controller::new(
70            Api::<Ctx::Resource>::namespaced(client.clone(), namespace),
71            wc,
72        );
73        Self {
74            client,
75            make_api,
76            controller,
77            context,
78        }
79    }
80
81    /// Creates a new controller for a namespaced resource using the given
82    /// `client`. The `context` given determines the type of resource to
83    /// watch (via the [`Context::Resource`] type provided as part of the
84    /// trait implementation). The resources to be watched will not be
85    /// limited by namespace. A [`watcher::Config`] can be given to limit the
86    /// resources watched (for instance,
87    /// `watcher::Config::default().labels("app=myapp")`).
88    pub fn namespaced_all(client: Client, context: Ctx, wc: watcher::Config) -> Self
89    where
90        Ctx::Resource: Resource<Scope = NamespaceResourceScope>,
91    {
92        let make_api = {
93            let client = client.clone();
94            Box::new(move |resource: &Ctx::Resource| {
95                Api::<Ctx::Resource>::namespaced(client.clone(), &resource.namespace().unwrap())
96            })
97        };
98        let controller = kube_runtime::controller::Controller::new(
99            Api::<Ctx::Resource>::all(client.clone()),
100            wc,
101        );
102        Self {
103            client,
104            make_api,
105            controller,
106            context,
107        }
108    }
109
110    /// Creates a new controller for a cluster-scoped resource using the
111    /// given `client`. The `context` given determines the type of resource
112    /// to watch (via the [`Context::Resource`] type provided as part of the
113    /// trait implementation). A [`watcher::Config`] can be given to limit the
114    /// resources watched (for instance,
115    /// `watcher::Config::default().labels("app=myapp")`).
116    pub fn cluster(client: Client, context: Ctx, wc: watcher::Config) -> Self
117    where
118        Ctx::Resource: Resource<Scope = ClusterResourceScope>,
119    {
120        let make_api = {
121            let client = client.clone();
122            Box::new(move |_: &Ctx::Resource| Api::<Ctx::Resource>::all(client.clone()))
123        };
124        let controller = kube_runtime::controller::Controller::new(
125            Api::<Ctx::Resource>::all(client.clone()),
126            wc,
127        );
128        Self {
129            client,
130            make_api,
131            controller,
132            context,
133        }
134    }
135
136    /// Run the controller. This method will not return. The [`Context`]
137    /// given to the constructor will have its [`apply`](Context::apply)
138    /// method called when a resource is created or updated, and its
139    /// [`cleanup`](Context::cleanup) method called when a resource is about
140    /// to be deleted.
141    pub async fn run(self) {
142        let Self {
143            client,
144            make_api,
145            controller,
146            context,
147        } = self;
148        let backoffs = Arc::new(Mutex::new(BTreeMap::new()));
149        let backoffs = &backoffs;
150        controller
151            .run(
152                |resource, context| {
153                    let uid = resource.uid().unwrap();
154                    let backoffs = Arc::clone(backoffs);
155                    context
156                        ._reconcile(client.clone(), make_api(&resource), resource)
157                        .inspect(move |result| {
158                            if result.is_ok() {
159                                backoffs.lock().unwrap().remove(&uid);
160                            }
161                        })
162                },
163                |resource, err, context| {
164                    let consecutive_errors = {
165                        let uid = resource.uid().unwrap();
166                        let mut backoffs = backoffs.lock().unwrap();
167                        let consecutive_errors: u32 =
168                            backoffs.get(&uid).copied().unwrap_or_default();
169                        backoffs.insert(uid, consecutive_errors.saturating_add(1));
170                        consecutive_errors
171                    };
172                    context.error_action(resource, err, consecutive_errors)
173                },
174                Arc::new(context),
175            )
176            .for_each(|reconciliation_result| async move {
177                let dynamic_type = Default::default();
178                let kind = Ctx::Resource::kind(&dynamic_type).into_owned();
179                match reconciliation_result {
180                    Ok(resource) => {
181                        event!(
182                            Level::INFO,
183                            resource_name = %resource.0.name,
184                            controller = Ctx::FINALIZER_NAME,
185                            "{} reconciliation successful.",
186                            kind
187                        );
188                    }
189                    Err(err) => event!(
190                        Level::ERROR,
191                        err = %err,
192                        source = err.source(),
193                        controller = Ctx::FINALIZER_NAME,
194                        "{} reconciliation error.",
195                        kind
196                    ),
197                }
198            })
199            .await
200    }
201
202    /// Allow configuring the underlying [`kube_runtime::Controller`]. For
203    /// example, you can use
204    /// `controller.with_controller(|controller| controller.with_config(Config::default().concurrency(10)))`
205    /// to limit the created controller to reconciling 10 resources at once.
206    pub fn with_controller<F>(mut self, f: F) -> Self
207    where
208        F: FnOnce(
209            kube_runtime::Controller<Ctx::Resource>,
210        ) -> kube_runtime::Controller<Ctx::Resource>,
211    {
212        self.controller = f(self.controller);
213        self
214    }
215}
216
217/// The [`Context`] trait should be implemented in order to provide callbacks
218/// for events that happen to resources watched by a [`Controller`].
219#[cfg_attr(not(docsrs), async_trait::async_trait)]
220pub trait Context {
221    /// The type of Kubernetes [resource](Resource) that will be watched by
222    /// the [`Controller`] this context is passed to
223    type Resource: Resource + Send + Sync + 'static;
224    /// The error type which will be returned by the [`apply`](Self::apply)
225    /// and [`cleanup`](Self::cleanup) methods
226    type Error: std::error::Error;
227
228    /// The name to use for the finalizer. This must be unique across
229    /// controllers - if multiple controllers with the same finalizer name
230    /// run against the same resource, unexpected behavior can occur.
231    ///
232    /// If this is None (the default), a finalizer will not be used, and
233    /// cleanup events will not be reported.
234    const FINALIZER_NAME: Option<&'static str> = None;
235
236    /// This method is called when a watched resource is created or updated.
237    /// The [`Client`] used by the controller is passed in to allow making
238    /// additional API requests, as is the resource which triggered this
239    /// event. If this method returns `Some(action)`, the given action will
240    /// be performed, otherwise if `None` is returned,
241    /// [`success_action`](Self::success_action) will be called to find the
242    /// action to perform.
243    async fn apply(
244        &self,
245        client: Client,
246        resource: &Self::Resource,
247    ) -> Result<Option<Action>, Self::Error>;
248
249    /// This method is called when a watched resource is marked for deletion.
250    /// The [`Client`] used by the controller is passed in to allow making
251    /// additional API requests, as is the resource which triggered this
252    /// event. If this method returns `Some(action)`, the given action will
253    /// be performed, otherwise if `None` is returned,
254    /// [`success_action`](Self::success_action) will be called to find the
255    /// action to perform.
256    ///
257    /// Note that this method will only be called if a finalizer is used.
258    async fn cleanup(
259        &self,
260        client: Client,
261        resource: &Self::Resource,
262    ) -> Result<Option<Action>, Self::Error> {
263        // use a better name for the parameter name in the docs
264        let _client = client;
265        let _resource = resource;
266
267        Ok(Some(Action::await_change()))
268    }
269
270    /// This method is called when a call to [`apply`](Self::apply) or
271    /// [`cleanup`](Self::cleanup) returns `Ok(None)`. It should return the
272    /// default [`Action`] to perform. The default implementation will
273    /// requeue the event at a random time between 40 and 60 minutes in the
274    /// future.
275    fn success_action(&self, resource: &Self::Resource) -> Action {
276        // use a better name for the parameter name in the docs
277        let _resource = resource;
278
279        Action::requeue(Duration::from_secs(rng().random_range(2400..3600)))
280    }
281
282    /// This method is called when a call to [`apply`](Self::apply) or
283    /// [`cleanup`](Self::cleanup) returns `Err`. It should return the
284    /// default [`Action`] to perform. The error returned will be passed in
285    /// here, as well as a count of how many consecutive errors have happened
286    /// for this resource, to allow for an exponential backoff strategy. The
287    /// default implementation uses exponential backoff with a max of 256
288    /// seconds and some added randomization to avoid thundering herds.
289    fn error_action(
290        self: Arc<Self>,
291        resource: Arc<Self::Resource>,
292        err: &Error<Self::Error>,
293        consecutive_errors: u32,
294    ) -> Action {
295        // use a better name for the parameter name in the docs
296        let _resource = resource;
297        let _err = err;
298
299        let seconds = 2u64.pow(consecutive_errors.min(7) + 1);
300        Action::requeue(Duration::from_millis(
301            rng().random_range((seconds * 500)..(seconds * 1000)),
302        ))
303    }
304
305    #[doc(hidden)]
306    async fn _reconcile(
307        self: Arc<Self>,
308        client: Client,
309        api: Api<Self::Resource>,
310        resource: Arc<Self::Resource>,
311    ) -> Result<Action, Error<Self::Error>>
312    where
313        Self: Send + Sync + 'static,
314        Self::Error: Send + Sync + 'static,
315        Self::Resource: Send + Sync + 'static,
316        Self::Resource: Clone + std::fmt::Debug + serde::Serialize,
317        for<'de> Self::Resource: serde::Deserialize<'de>,
318        <Self::Resource as Resource>::DynamicType: Eq
319            + Clone
320            + std::hash::Hash
321            + std::default::Default
322            + std::fmt::Debug
323            + std::marker::Unpin,
324    {
325        let dynamic_type = Default::default();
326        let kind = Self::Resource::kind(&dynamic_type).into_owned();
327        let mut ran = false;
328        let res = if let Some(finalizer_name) = Self::FINALIZER_NAME {
329            finalizer(&api, finalizer_name, Arc::clone(&resource), |event| async {
330                ran = true;
331                event!(
332                    Level::INFO,
333                    resource_name = %resource.name_unchecked().as_str(),
334                    controller = Self::FINALIZER_NAME,
335                    "Reconciling {} ({}).",
336                    kind,
337                    match event {
338                        Event::Apply(_) => "apply",
339                        Event::Cleanup(_) => "cleanup",
340                    }
341                );
342                let action = match event {
343                    Event::Apply(resource) => {
344                        let action = self.apply(client, &resource).await?;
345                        if let Some(action) = action {
346                            action
347                        } else {
348                            self.success_action(&resource)
349                        }
350                    }
351                    Event::Cleanup(resource) => self
352                        .cleanup(client, &resource)
353                        .await?
354                        .unwrap_or_else(Action::await_change),
355                };
356                Ok(action)
357            })
358            .await
359            .map_err(Error::FinalizerError)
360        } else {
361            ran = true;
362            event!(
363                Level::INFO,
364                resource_name = %resource.name_unchecked().as_str(),
365                "Reconciling {} (apply).",
366                kind,
367            );
368            let action = self
369                .apply(client, &resource)
370                .await
371                .map_err(Error::ControllerError)?;
372            Ok(if let Some(action) = action {
373                action
374            } else {
375                self.success_action(&resource)
376            })
377        };
378        if !ran {
379            event!(
380                Level::INFO,
381                resource_name = %resource.name_unchecked().as_str(),
382                controller = Self::FINALIZER_NAME,
383                "Reconciling {} ({}).",
384                kind,
385                if resource.meta().deletion_timestamp.is_some() {
386                    "delete"
387                } else {
388                    "init"
389                }
390            );
391        }
392        res
393    }
394}