k8s_controller/
controller.rs

1use std::collections::BTreeMap;
2use std::error::Error;
3use std::sync::{Arc, Mutex};
4use std::time::Duration;
5
6use futures::future::FutureExt;
7use futures::stream::StreamExt;
8use kube::api::Api;
9use kube::core::{ClusterResourceScope, NamespaceResourceScope};
10use kube::{Client, Resource, ResourceExt};
11use kube_runtime::controller::Action;
12use kube_runtime::finalizer::{finalizer, Event};
13use kube_runtime::watcher;
14use rand::{thread_rng, Rng};
15use tracing::{event, Level};
16
17/// The [`Controller`] watches a set of resources, calling methods on the
18/// provided [`Context`] when events occur.
19pub struct Controller<Ctx: Context>
20where
21    Ctx: Send + Sync + 'static,
22    Ctx::Error: Send + Sync + 'static,
23    Ctx::Resource: Send + Sync + 'static,
24    Ctx::Resource: Clone + std::fmt::Debug + serde::Serialize,
25    for<'de> Ctx::Resource: serde::Deserialize<'de>,
26    <Ctx::Resource as Resource>::DynamicType:
27        Eq + Clone + std::hash::Hash + std::default::Default + std::fmt::Debug + std::marker::Unpin,
28{
29    client: kube::Client,
30    make_api: Box<dyn Fn(&Ctx::Resource) -> Api<Ctx::Resource> + Sync + Send + 'static>,
31    controller: kube_runtime::controller::Controller<Ctx::Resource>,
32    context: Ctx,
33}
34
35impl<Ctx: Context> Controller<Ctx>
36where
37    Ctx: Send + Sync + 'static,
38    Ctx::Error: Send + Sync + 'static,
39    Ctx::Resource: Send + Sync + 'static,
40    Ctx::Resource: Clone + std::fmt::Debug + serde::Serialize,
41    for<'de> Ctx::Resource: serde::Deserialize<'de>,
42    <Ctx::Resource as Resource>::DynamicType:
43        Eq + Clone + std::hash::Hash + std::default::Default + std::fmt::Debug + std::marker::Unpin,
44{
45    /// Creates a new controller for a namespaced resource using the given
46    /// `client`. The `context` given determines the type of resource
47    /// to watch (via the [`Context::Resource`] type provided as part of
48    /// the trait implementation). The resources to be watched will be
49    /// limited to resources in the given `namespace`. A [`watcher::Config`]
50    /// can be given to limit the resources watched (for instance,
51    /// `watcher::Config::default().labels("app=myapp")`).
52    pub fn namespaced(client: Client, context: Ctx, namespace: &str, wc: watcher::Config) -> Self
53    where
54        Ctx::Resource: Resource<Scope = NamespaceResourceScope>,
55    {
56        let make_api = {
57            let client = client.clone();
58            Box::new(move |resource: &Ctx::Resource| {
59                Api::<Ctx::Resource>::namespaced(client.clone(), &resource.namespace().unwrap())
60            })
61        };
62        let controller = kube_runtime::controller::Controller::new(
63            Api::<Ctx::Resource>::namespaced(client.clone(), namespace),
64            wc,
65        );
66        Self {
67            client,
68            make_api,
69            controller,
70            context,
71        }
72    }
73
74    /// Creates a new controller for a namespaced resource using the given
75    /// `client`. The `context` given determines the type of resource to
76    /// watch (via the [`Context::Resource`] type provided as part of the
77    /// trait implementation). The resources to be watched will not be
78    /// limited by namespace. A [`watcher::Config`] can be given to limit the
79    /// resources watched (for instance,
80    /// `watcher::Config::default().labels("app=myapp")`).
81    pub fn namespaced_all(client: Client, context: Ctx, wc: watcher::Config) -> Self
82    where
83        Ctx::Resource: Resource<Scope = NamespaceResourceScope>,
84    {
85        let make_api = {
86            let client = client.clone();
87            Box::new(move |resource: &Ctx::Resource| {
88                Api::<Ctx::Resource>::namespaced(client.clone(), &resource.namespace().unwrap())
89            })
90        };
91        let controller = kube_runtime::controller::Controller::new(
92            Api::<Ctx::Resource>::all(client.clone()),
93            wc,
94        );
95        Self {
96            client,
97            make_api,
98            controller,
99            context,
100        }
101    }
102
103    /// Creates a new controller for a cluster-scoped resource using the
104    /// given `client`. The `context` given determines the type of resource
105    /// to watch (via the [`Context::Resource`] type provided as part of the
106    /// trait implementation). A [`watcher::Config`] can be given to limit the
107    /// resources watched (for instance,
108    /// `watcher::Config::default().labels("app=myapp")`).
109    pub fn cluster(client: Client, context: Ctx, wc: watcher::Config) -> Self
110    where
111        Ctx::Resource: Resource<Scope = ClusterResourceScope>,
112    {
113        let make_api = {
114            let client = client.clone();
115            Box::new(move |_: &Ctx::Resource| Api::<Ctx::Resource>::all(client.clone()))
116        };
117        let controller = kube_runtime::controller::Controller::new(
118            Api::<Ctx::Resource>::all(client.clone()),
119            wc,
120        );
121        Self {
122            client,
123            make_api,
124            controller,
125            context,
126        }
127    }
128
129    /// Run the controller. This method will not return. The [`Context`]
130    /// given to the constructor will have its [`apply`](Context::apply)
131    /// method called when a resource is created or updated, and its
132    /// [`cleanup`](Context::cleanup) method called when a resource is about
133    /// to be deleted.
134    pub async fn run(self) {
135        let Self {
136            client,
137            make_api,
138            controller,
139            context,
140        } = self;
141        let backoffs = Arc::new(Mutex::new(BTreeMap::new()));
142        let backoffs = &backoffs;
143        controller
144            .run(
145                |resource, context| {
146                    let uid = resource.uid().unwrap();
147                    let backoffs = Arc::clone(backoffs);
148                    context
149                        ._reconcile(client.clone(), make_api(&resource), resource)
150                        .inspect(move |result| {
151                            if result.is_ok() {
152                                backoffs.lock().unwrap().remove(&uid);
153                            }
154                        })
155                },
156                |resource, err, context| {
157                    let consecutive_errors = {
158                        let uid = resource.uid().unwrap();
159                        let mut backoffs = backoffs.lock().unwrap();
160                        let consecutive_errors: u32 =
161                            backoffs.get(&uid).copied().unwrap_or_default();
162                        backoffs.insert(uid, consecutive_errors.saturating_add(1));
163                        consecutive_errors
164                    };
165                    context.error_action(resource, err, consecutive_errors)
166                },
167                Arc::new(context),
168            )
169            .for_each(|reconciliation_result| async move {
170                let dynamic_type = Default::default();
171                let kind = Ctx::Resource::kind(&dynamic_type).into_owned();
172                match reconciliation_result {
173                    Ok(resource) => {
174                        event!(
175                            Level::INFO,
176                            resource_name = %resource.0.name,
177                            controller = Ctx::FINALIZER_NAME,
178                            "{} reconciliation successful.",
179                            kind
180                        );
181                    }
182                    Err(err) => event!(
183                        Level::ERROR,
184                        err = %err,
185                        source = err.source(),
186                        controller = Ctx::FINALIZER_NAME,
187                        "{} reconciliation error.",
188                        kind
189                    ),
190                }
191            })
192            .await
193    }
194
195    pub fn with_concurrency(mut self, concurrency: u16) -> Self {
196        self.controller = self
197            .controller
198            .with_config(kube_runtime::Config::default().concurrency(concurrency));
199        self
200    }
201}
202
203/// The [`Context`] trait should be implemented in order to provide callbacks
204/// for events that happen to resources watched by a [`Controller`].
205#[cfg_attr(not(docsrs), async_trait::async_trait)]
206pub trait Context {
207    /// The type of Kubernetes [resource](Resource) that will be watched by
208    /// the [`Controller`] this context is passed to
209    type Resource: Resource;
210    /// The error type which will be returned by the [`apply`](Self::apply)
211    /// and [`cleanup`](Self::apply) methods
212    type Error: std::error::Error;
213
214    /// The name to use for the finalizer. This must be unique across
215    /// controllers - if multiple controllers with the same finalizer name
216    /// run against the same resource, unexpected behavior can occur.
217    const FINALIZER_NAME: &'static str;
218
219    /// This method is called when a watched resource is created or updated.
220    /// The [`Client`] used by the controller is passed in to allow making
221    /// additional API requests, as is the resource which triggered this
222    /// event. If this method returns `Some(action)`, the given action will
223    /// be performed, otherwise if `None` is returned,
224    /// [`success_action`](Self::success_action) will be called to find the
225    /// action to perform.
226    async fn apply(
227        &self,
228        client: Client,
229        resource: &Self::Resource,
230    ) -> Result<Option<Action>, Self::Error>;
231
232    /// This method is called when a watched resource is marked for deletion.
233    /// The [`Client`] used by the controller is passed in to allow making
234    /// additional API requests, as is the resource which triggered this
235    /// event. If this method returns `Some(action)`, the given action will
236    /// be performed, otherwise if `None` is returned,
237    /// [`success_action`](Self::success_action) will be called to find the
238    /// action to perform.
239    async fn cleanup(
240        &self,
241        client: Client,
242        resource: &Self::Resource,
243    ) -> Result<Option<Action>, Self::Error>;
244
245    /// This method is called when a call to [`apply`](Self::apply) or
246    /// [`cleanup`](Self::cleanup) returns `Ok(None)`. It should return the
247    /// default [`Action`] to perform. The default implementation will
248    /// requeue the event at a random time between 40 and 60 minutes in the
249    /// future.
250    fn success_action(&self, resource: &Self::Resource) -> Action {
251        // use a better name for the parameter name in the docs
252        let _resource = resource;
253
254        Action::requeue(Duration::from_secs(thread_rng().gen_range(2400..3600)))
255    }
256
257    /// This method is called when a call to [`apply`](Self::apply) or
258    /// [`cleanup`](Self::cleanup) returns `Err`. It should return the
259    /// default [`Action`] to perform. The error returned will be passed in
260    /// here, as well as a count of how many consecutive errors have happened
261    /// for this resource, to allow for an exponential backoff strategy. The
262    /// default implementation uses exponential backoff with a max of 256
263    /// seconds and some added randomization to avoid thundering herds.
264    fn error_action(
265        self: Arc<Self>,
266        resource: Arc<Self::Resource>,
267        err: &kube_runtime::finalizer::Error<Self::Error>,
268        consecutive_errors: u32,
269    ) -> Action {
270        // use a better name for the parameter name in the docs
271        let _resource = resource;
272        let _err = err;
273
274        let seconds = 2u64.pow(consecutive_errors.min(7) + 1);
275        Action::requeue(Duration::from_millis(
276            thread_rng().gen_range((seconds * 500)..(seconds * 1000)),
277        ))
278    }
279
280    #[doc(hidden)]
281    async fn _reconcile(
282        self: Arc<Self>,
283        client: Client,
284        api: Api<Self::Resource>,
285        resource: Arc<Self::Resource>,
286    ) -> Result<Action, kube_runtime::finalizer::Error<Self::Error>>
287    where
288        Self: Send + Sync + 'static,
289        Self::Error: Send + Sync + 'static,
290        Self::Resource: Send + Sync + 'static,
291        Self::Resource: Clone + std::fmt::Debug + serde::Serialize,
292        for<'de> Self::Resource: serde::Deserialize<'de>,
293        <Self::Resource as Resource>::DynamicType: Eq
294            + Clone
295            + std::hash::Hash
296            + std::default::Default
297            + std::fmt::Debug
298            + std::marker::Unpin,
299    {
300        let dynamic_type = Default::default();
301        let kind = Self::Resource::kind(&dynamic_type).into_owned();
302        let mut ran = false;
303        let res = finalizer(
304            &api,
305            Self::FINALIZER_NAME,
306            Arc::clone(&resource),
307            |event| async {
308                ran = true;
309                event!(
310                    Level::INFO,
311                    resource_name = %resource.name_unchecked().as_str(),
312                    controller = Self::FINALIZER_NAME,
313                    "Reconciling {} ({}).",
314                    kind,
315                    match event {
316                        Event::Apply(_) => "apply",
317                        Event::Cleanup(_) => "cleanup",
318                    }
319                );
320                let action = match event {
321                    Event::Apply(resource) => {
322                        let action = self.apply(client, &resource).await?;
323                        if let Some(action) = action {
324                            action
325                        } else {
326                            self.success_action(&resource)
327                        }
328                    }
329                    Event::Cleanup(resource) => self
330                        .cleanup(client, &resource)
331                        .await?
332                        .unwrap_or_else(Action::await_change),
333                };
334                Ok(action)
335            },
336        )
337        .await;
338        if !ran {
339            event!(
340                Level::INFO,
341                resource_name = %resource.name_unchecked().as_str(),
342                controller = Self::FINALIZER_NAME,
343                "Reconciling {} ({}).",
344                kind,
345                if resource.meta().deletion_timestamp.is_some() {
346                    "delete"
347                } else {
348                    "init"
349                }
350            );
351        }
352        res
353    }
354}