Skip to main content

k8s_controller/
controller.rs

1use std::collections::BTreeMap;
2use std::error::Error as _;
3use std::fmt::Display;
4use std::sync::{Arc, Mutex};
5use std::time::{Duration, Instant};
6
7use futures::future::FutureExt;
8use futures::stream::StreamExt;
9use kube::api::Api;
10use kube::core::{ClusterResourceScope, NamespaceResourceScope};
11use kube::{Client, Resource, ResourceExt};
12use kube_runtime::controller::Action;
13use kube_runtime::finalizer::{Event, finalizer};
14use kube_runtime::watcher;
15use rand::{Rng, rng};
16use tracing::field::Empty;
17use tracing::{Instrument, Span, error, info, info_span, trace, warn};
18
19#[derive(Debug, thiserror::Error)]
20pub enum Error<E: std::error::Error + 'static> {
21    #[error("{0}")]
22    ControllerError(#[source] E),
23    #[error("{0}")]
24    FinalizerError(#[source] kube_runtime::finalizer::Error<E>),
25}
26
/// A bag of string key/value annotations collected during a reconciliation.
///
/// Entries are kept in a [`BTreeMap`], so each key holds at most one value
/// and iteration is ordered by key.
#[derive(Debug, Default)]
pub struct TraceMetadata(BTreeMap<String, String>);

impl TraceMetadata {
    /// Records `val` under `key`, rendering both through their [`Display`]
    /// implementations. A value previously stored under the same rendered
    /// key is replaced.
    pub fn annotate<K: Display, V: Display>(&mut self, key: K, val: V) {
        let Self(entries) = self;
        let (name, rendered) = (key.to_string(), val.to_string());
        entries.insert(name, rendered);
    }
}
35
36/// The [`Controller`] watches a set of resources, calling methods on the
37/// provided [`Context`] when events occur.
38pub struct Controller<Ctx: Context>
39where
40    Ctx: Send + Sync + 'static,
41    Ctx::Error: Send + Sync + 'static,
42    Ctx::Resource: Send + Sync + 'static,
43    Ctx::Resource: Clone + std::fmt::Debug + serde::Serialize,
44    for<'de> Ctx::Resource: serde::Deserialize<'de>,
45    <Ctx::Resource as Resource>::DynamicType:
46        Eq + Clone + std::hash::Hash + std::default::Default + std::fmt::Debug + std::marker::Unpin,
47{
48    client: kube::Client,
49    make_api: Box<dyn Fn(&Ctx::Resource) -> Api<Ctx::Resource> + Sync + Send + 'static>,
50    controller: kube_runtime::controller::Controller<Ctx::Resource>,
51    context: Ctx,
52}
53
54impl<Ctx: Context> Controller<Ctx>
55where
56    Ctx: Send + Sync + 'static,
57    Ctx::Error: Send + Sync + 'static,
58    Ctx::Resource: Clone + std::fmt::Debug + serde::Serialize,
59    for<'de> Ctx::Resource: serde::Deserialize<'de>,
60    <Ctx::Resource as Resource>::DynamicType:
61        Eq + Clone + std::hash::Hash + std::default::Default + std::fmt::Debug + std::marker::Unpin,
62{
63    /// Creates a new controller for a namespaced resource using the given
64    /// `client`. The `context` given determines the type of resource
65    /// to watch (via the [`Context::Resource`] type provided as part of
66    /// the trait implementation). The resources to be watched will be
67    /// limited to resources in the given `namespace`. A [`watcher::Config`]
68    /// can be given to limit the resources watched (for instance,
69    /// `watcher::Config::default().labels("app=myapp")`).
70    pub fn namespaced(client: Client, context: Ctx, namespace: &str, wc: watcher::Config) -> Self
71    where
72        Ctx::Resource: Resource<Scope = NamespaceResourceScope>,
73    {
74        let make_api = {
75            let client = client.clone();
76            Box::new(move |resource: &Ctx::Resource| {
77                Api::<Ctx::Resource>::namespaced(client.clone(), &resource.namespace().unwrap())
78            })
79        };
80        let controller = kube_runtime::controller::Controller::new(
81            Api::<Ctx::Resource>::namespaced(client.clone(), namespace),
82            wc,
83        );
84        Self {
85            client,
86            make_api,
87            controller,
88            context,
89        }
90    }
91
92    /// Creates a new controller for a namespaced resource using the given
93    /// `client`. The `context` given determines the type of resource to
94    /// watch (via the [`Context::Resource`] type provided as part of the
95    /// trait implementation). The resources to be watched will not be
96    /// limited by namespace. A [`watcher::Config`] can be given to limit the
97    /// resources watched (for instance,
98    /// `watcher::Config::default().labels("app=myapp")`).
99    pub fn namespaced_all(client: Client, context: Ctx, wc: watcher::Config) -> Self
100    where
101        Ctx::Resource: Resource<Scope = NamespaceResourceScope>,
102    {
103        let make_api = {
104            let client = client.clone();
105            Box::new(move |resource: &Ctx::Resource| {
106                Api::<Ctx::Resource>::namespaced(client.clone(), &resource.namespace().unwrap())
107            })
108        };
109        let controller = kube_runtime::controller::Controller::new(
110            Api::<Ctx::Resource>::all(client.clone()),
111            wc,
112        );
113        Self {
114            client,
115            make_api,
116            controller,
117            context,
118        }
119    }
120
121    /// Creates a new controller for a cluster-scoped resource using the
122    /// given `client`. The `context` given determines the type of resource
123    /// to watch (via the [`Context::Resource`] type provided as part of the
124    /// trait implementation). A [`watcher::Config`] can be given to limit the
125    /// resources watched (for instance,
126    /// `watcher::Config::default().labels("app=myapp")`).
127    pub fn cluster(client: Client, context: Ctx, wc: watcher::Config) -> Self
128    where
129        Ctx::Resource: Resource<Scope = ClusterResourceScope>,
130    {
131        let make_api = {
132            let client = client.clone();
133            Box::new(move |_: &Ctx::Resource| Api::<Ctx::Resource>::all(client.clone()))
134        };
135        let controller = kube_runtime::controller::Controller::new(
136            Api::<Ctx::Resource>::all(client.clone()),
137            wc,
138        );
139        Self {
140            client,
141            make_api,
142            controller,
143            context,
144        }
145    }
146
147    /// Run the controller. This method will not return. The [`Context`]
148    /// given to the constructor will have its [`apply`](Context::apply)
149    /// method called when a resource is created or updated, and its
150    /// [`cleanup`](Context::cleanup) method called when a resource is about
151    /// to be deleted.
152    pub async fn run(self) {
153        let Self {
154            client,
155            make_api,
156            controller,
157            context,
158        } = self;
159        let backoffs = Arc::new(Mutex::new(BTreeMap::new()));
160        let backoffs = &backoffs;
161        controller
162            .run(
163                |resource, context| {
164                    let uid = resource.uid().unwrap();
165                    let backoffs = Arc::clone(backoffs);
166                    context
167                        ._reconcile(client.clone(), make_api(&resource), resource)
168                        .inspect(move |result| {
169                            if result.is_ok() {
170                                backoffs.lock().unwrap().remove(&uid);
171                            }
172                        })
173                },
174                |resource, err, context| {
175                    let consecutive_errors = {
176                        let uid = resource.uid().unwrap();
177                        let mut backoffs = backoffs.lock().unwrap();
178                        let consecutive_errors: u32 =
179                            backoffs.get(&uid).copied().unwrap_or_default();
180                        backoffs.insert(uid, consecutive_errors.saturating_add(1));
181                        consecutive_errors
182                    };
183                    context.error_action(resource, err, consecutive_errors)
184                },
185                Arc::new(context),
186            )
187            .for_each(|res| async {
188                // ReconcilerFailed errors will already have been reported by
189                // the _reconcile function
190                if let Err(e) = res
191                    && !matches!(e, kube_runtime::controller::Error::ReconcilerFailed(..))
192                {
193                    // warn instead of error because these kinds of errors
194                    // are almost always recoverable
195                    warn!(
196                        error = %e,
197                        source = e.source(),
198                        "internal kube controller error",
199                    );
200                }
201            })
202            .await
203    }
204
205    /// Allow configuring the underlying [`kube_runtime::Controller`]. For
206    /// example, you can use
207    /// `controller.with_controller(|controller| controller.with_config(Config::default().concurrency(10)))`
208    /// to limit the created controller to reconciling 10 resources at once.
209    pub fn with_controller<F>(mut self, f: F) -> Self
210    where
211        F: FnOnce(
212            kube_runtime::Controller<Ctx::Resource>,
213        ) -> kube_runtime::Controller<Ctx::Resource>,
214    {
215        self.controller = f(self.controller);
216        self
217    }
218}
219
220/// The [`Context`] trait should be implemented in order to provide callbacks
221/// for events that happen to resources watched by a [`Controller`].
222#[cfg_attr(not(docsrs), async_trait::async_trait)]
223pub trait Context {
224    /// The type of Kubernetes [resource](Resource) that will be watched by
225    /// the [`Controller`] this context is passed to
226    type Resource: Resource + Send + Sync + 'static;
227    /// The error type which will be returned by the [`apply`](Self::apply)
228    /// and [`cleanup`](Self::cleanup) methods
229    type Error: std::error::Error;
230
231    /// The name to use for the finalizer. This must be unique across
232    /// controllers - if multiple controllers with the same finalizer name
233    /// run against the same resource, unexpected behavior can occur.
234    ///
235    /// If this is None (the default), a finalizer will not be used, and
236    /// cleanup events will not be reported.
237    const FINALIZER_NAME: Option<&'static str> = None;
238
239    /// This method is called when a watched resource is created or updated.
240    /// The [`Client`] used by the controller is passed in to allow making
241    /// additional API requests, as is the resource which triggered this
242    /// event. If this method returns `Some(action)`, the given action will
243    /// be performed, otherwise if `None` is returned,
244    /// [`success_action`](Self::success_action) will be called to find the
245    /// action to perform.
246    async fn apply(
247        &self,
248        client: Client,
249        resource: &Self::Resource,
250        metadata: &mut TraceMetadata,
251    ) -> Result<Option<Action>, Self::Error>;
252
253    /// This method is called when a watched resource is marked for deletion.
254    /// The [`Client`] used by the controller is passed in to allow making
255    /// additional API requests, as is the resource which triggered this
256    /// event. If this method returns `Some(action)`, the given action will
257    /// be performed, otherwise if `None` is returned,
258    /// [`success_action`](Self::success_action) will be called to find the
259    /// action to perform.
260    ///
261    /// Note that this method will only be called if a finalizer is used.
262    async fn cleanup(
263        &self,
264        client: Client,
265        resource: &Self::Resource,
266        metadata: &mut TraceMetadata,
267    ) -> Result<Option<Action>, Self::Error> {
268        // use a better name for the parameter name in the docs
269        let _client = client;
270        let _resource = resource;
271        let _metadata = metadata;
272
273        Ok(Some(Action::await_change()))
274    }
275
276    /// This method is called when a call to [`apply`](Self::apply) or
277    /// [`cleanup`](Self::cleanup) returns `Ok(None)`. It should return the
278    /// default [`Action`] to perform. The default implementation will
279    /// requeue the event at a random time between 40 and 60 minutes in the
280    /// future.
281    fn success_action(&self, resource: &Self::Resource) -> Action {
282        // use a better name for the parameter name in the docs
283        let _resource = resource;
284
285        Action::requeue(Duration::from_secs(rng().random_range(2400..3600)))
286    }
287
288    /// This method is called when a call to [`apply`](Self::apply) or
289    /// [`cleanup`](Self::cleanup) returns `Err`. It should return the
290    /// default [`Action`] to perform. The error returned will be passed in
291    /// here, as well as a count of how many consecutive errors have happened
292    /// for this resource, to allow for an exponential backoff strategy. The
293    /// default implementation uses exponential backoff with a max of 256
294    /// seconds and some added randomization to avoid thundering herds.
295    fn error_action(
296        self: Arc<Self>,
297        resource: Arc<Self::Resource>,
298        err: &Error<Self::Error>,
299        consecutive_errors: u32,
300    ) -> Action {
301        // use a better name for the parameter name in the docs
302        let _resource = resource;
303        let _err = err;
304
305        let seconds = 2u64.pow(consecutive_errors.min(7) + 1);
306        Action::requeue(Duration::from_millis(
307            rng().random_range((seconds * 500)..(seconds * 1000)),
308        ))
309    }
310
311    #[doc(hidden)]
312    async fn _reconcile(
313        self: Arc<Self>,
314        client: Client,
315        api: Api<Self::Resource>,
316        resource: Arc<Self::Resource>,
317    ) -> Result<Action, Error<Self::Error>>
318    where
319        Self: Send + Sync + 'static,
320        Self::Error: Send + Sync + 'static,
321        Self::Resource: Send + Sync + 'static,
322        Self::Resource: Clone + std::fmt::Debug + serde::Serialize,
323        for<'de> Self::Resource: serde::Deserialize<'de>,
324        <Self::Resource as Resource>::DynamicType: Eq
325            + Clone
326            + std::hash::Hash
327            + std::default::Default
328            + std::fmt::Debug
329            + std::marker::Unpin,
330    {
331        let span = info_span!(
332            "reconcile",
333            resource_type = Self::Resource::kind(&Default::default()).as_ref(),
334            resource_name = resource.name_unchecked().as_str(),
335            controller = Self::FINALIZER_NAME,
336            event_type = Empty,
337            success = Empty,
338            duration_seconds = Empty,
339            metadata = Empty,
340        );
341        async {
342            trace!("beginning reconciliation");
343
344            let mut metadata = TraceMetadata::default();
345            let mut ran = false;
346            let start = Instant::now();
347
348            let res = if let Some(finalizer_name) = Self::FINALIZER_NAME {
349                finalizer(&api, finalizer_name, Arc::clone(&resource), |event| async {
350                    ran = true;
351                    Span::current().record(
352                        "event_type",
353                        match event {
354                            Event::Apply(_) => "apply",
355                            Event::Cleanup(_) => "cleanup",
356                        },
357                    );
358                    match event {
359                        Event::Apply(resource) => self
360                            .apply(client, &resource, &mut metadata)
361                            .await
362                            .map(|action| action.unwrap_or_else(|| self.success_action(&resource))),
363                        Event::Cleanup(resource) => self
364                            .cleanup(client, &resource, &mut metadata)
365                            .await
366                            .map(|action| action.unwrap_or_else(Action::await_change)),
367                    }
368                })
369                .await
370                .map_err(Error::FinalizerError)
371            } else if resource.meta().deletion_timestamp.is_none() {
372                ran = true;
373                Span::current().record("event_type", "apply");
374                self.apply(client, &resource, &mut metadata)
375                    .await
376                    .map(|action| action.unwrap_or_else(|| self.success_action(&resource)))
377                    .map_err(Error::ControllerError)
378            } else {
379                Ok(Action::await_change())
380            };
381
382            // eventually we should set up metrics here, but this should
383            // help for now
384            Span::current().record("duration_seconds", start.elapsed().as_secs_f64());
385
386            if !ran {
387                Span::current().record(
388                    "event_type",
389                    if resource.meta().deletion_timestamp.is_some() {
390                        "delete"
391                    } else {
392                        "init"
393                    },
394                );
395            }
396
397            if !metadata.0.is_empty()
398                && let Ok(s) = serde_json::to_string(&metadata.0)
399            {
400                Span::current().record("metadata", s);
401            }
402
403            if let Err(e) = &res {
404                Span::current().record("success", false);
405                error!(error = %e, source = e.source(), "reconcile");
406            } else {
407                Span::current().record("success", true);
408                info!("reconcile");
409            }
410
411            res
412        }
413        .instrument(span)
414        .await
415    }
416}