1use std::collections::BTreeMap;
2use std::error::Error as _;
3use std::sync::{Arc, Mutex};
4use std::time::Duration;
5
6use futures::future::FutureExt;
7use futures::stream::StreamExt;
8use kube::api::Api;
9use kube::core::{ClusterResourceScope, NamespaceResourceScope};
10use kube::{Client, Resource, ResourceExt};
11use kube_runtime::controller::Action;
12use kube_runtime::finalizer::{Event, finalizer};
13use kube_runtime::watcher;
14use rand::{Rng, rng};
15use tracing::{Level, event};
16
17#[derive(Debug, thiserror::Error)]
18pub enum Error<E: std::error::Error + 'static> {
19 #[error("{0}")]
20 ControllerError(#[source] E),
21 #[error("{0}")]
22 FinalizerError(#[source] kube_runtime::finalizer::Error<E>),
23}
24
25pub struct Controller<Ctx: Context>
28where
29 Ctx: Send + Sync + 'static,
30 Ctx::Error: Send + Sync + 'static,
31 Ctx::Resource: Send + Sync + 'static,
32 Ctx::Resource: Clone + std::fmt::Debug + serde::Serialize,
33 for<'de> Ctx::Resource: serde::Deserialize<'de>,
34 <Ctx::Resource as Resource>::DynamicType:
35 Eq + Clone + std::hash::Hash + std::default::Default + std::fmt::Debug + std::marker::Unpin,
36{
37 client: kube::Client,
38 make_api: Box<dyn Fn(&Ctx::Resource) -> Api<Ctx::Resource> + Sync + Send + 'static>,
39 controller: kube_runtime::controller::Controller<Ctx::Resource>,
40 context: Ctx,
41}
42
43impl<Ctx: Context> Controller<Ctx>
44where
45 Ctx: Send + Sync + 'static,
46 Ctx::Error: Send + Sync + 'static,
47 Ctx::Resource: Clone + std::fmt::Debug + serde::Serialize,
48 for<'de> Ctx::Resource: serde::Deserialize<'de>,
49 <Ctx::Resource as Resource>::DynamicType:
50 Eq + Clone + std::hash::Hash + std::default::Default + std::fmt::Debug + std::marker::Unpin,
51{
52 pub fn namespaced(client: Client, context: Ctx, namespace: &str, wc: watcher::Config) -> Self
60 where
61 Ctx::Resource: Resource<Scope = NamespaceResourceScope>,
62 {
63 let make_api = {
64 let client = client.clone();
65 Box::new(move |resource: &Ctx::Resource| {
66 Api::<Ctx::Resource>::namespaced(client.clone(), &resource.namespace().unwrap())
67 })
68 };
69 let controller = kube_runtime::controller::Controller::new(
70 Api::<Ctx::Resource>::namespaced(client.clone(), namespace),
71 wc,
72 );
73 Self {
74 client,
75 make_api,
76 controller,
77 context,
78 }
79 }
80
81 pub fn namespaced_all(client: Client, context: Ctx, wc: watcher::Config) -> Self
89 where
90 Ctx::Resource: Resource<Scope = NamespaceResourceScope>,
91 {
92 let make_api = {
93 let client = client.clone();
94 Box::new(move |resource: &Ctx::Resource| {
95 Api::<Ctx::Resource>::namespaced(client.clone(), &resource.namespace().unwrap())
96 })
97 };
98 let controller = kube_runtime::controller::Controller::new(
99 Api::<Ctx::Resource>::all(client.clone()),
100 wc,
101 );
102 Self {
103 client,
104 make_api,
105 controller,
106 context,
107 }
108 }
109
110 pub fn cluster(client: Client, context: Ctx, wc: watcher::Config) -> Self
117 where
118 Ctx::Resource: Resource<Scope = ClusterResourceScope>,
119 {
120 let make_api = {
121 let client = client.clone();
122 Box::new(move |_: &Ctx::Resource| Api::<Ctx::Resource>::all(client.clone()))
123 };
124 let controller = kube_runtime::controller::Controller::new(
125 Api::<Ctx::Resource>::all(client.clone()),
126 wc,
127 );
128 Self {
129 client,
130 make_api,
131 controller,
132 context,
133 }
134 }
135
136 pub async fn run(self) {
142 let Self {
143 client,
144 make_api,
145 controller,
146 context,
147 } = self;
148 let backoffs = Arc::new(Mutex::new(BTreeMap::new()));
149 let backoffs = &backoffs;
150 controller
151 .run(
152 |resource, context| {
153 let uid = resource.uid().unwrap();
154 let backoffs = Arc::clone(backoffs);
155 context
156 ._reconcile(client.clone(), make_api(&resource), resource)
157 .inspect(move |result| {
158 if result.is_ok() {
159 backoffs.lock().unwrap().remove(&uid);
160 }
161 })
162 },
163 |resource, err, context| {
164 let consecutive_errors = {
165 let uid = resource.uid().unwrap();
166 let mut backoffs = backoffs.lock().unwrap();
167 let consecutive_errors: u32 =
168 backoffs.get(&uid).copied().unwrap_or_default();
169 backoffs.insert(uid, consecutive_errors.saturating_add(1));
170 consecutive_errors
171 };
172 context.error_action(resource, err, consecutive_errors)
173 },
174 Arc::new(context),
175 )
176 .for_each(|reconciliation_result| async move {
177 let dynamic_type = Default::default();
178 let kind = Ctx::Resource::kind(&dynamic_type).into_owned();
179 match reconciliation_result {
180 Ok(resource) => {
181 event!(
182 Level::INFO,
183 resource_name = %resource.0.name,
184 controller = Ctx::FINALIZER_NAME,
185 "{} reconciliation successful.",
186 kind
187 );
188 }
189 Err(err) => event!(
190 Level::ERROR,
191 err = %err,
192 source = err.source(),
193 controller = Ctx::FINALIZER_NAME,
194 "{} reconciliation error.",
195 kind
196 ),
197 }
198 })
199 .await
200 }
201
202 pub fn with_controller<F>(mut self, f: F) -> Self
207 where
208 F: FnOnce(
209 kube_runtime::Controller<Ctx::Resource>,
210 ) -> kube_runtime::Controller<Ctx::Resource>,
211 {
212 self.controller = f(self.controller);
213 self
214 }
215}
216
217#[cfg_attr(not(docsrs), async_trait::async_trait)]
220pub trait Context {
221 type Resource: Resource + Send + Sync + 'static;
224 type Error: std::error::Error;
227
228 const FINALIZER_NAME: Option<&'static str> = None;
235
236 async fn apply(
244 &self,
245 client: Client,
246 resource: &Self::Resource,
247 ) -> Result<Option<Action>, Self::Error>;
248
249 async fn cleanup(
259 &self,
260 client: Client,
261 resource: &Self::Resource,
262 ) -> Result<Option<Action>, Self::Error> {
263 let _client = client;
265 let _resource = resource;
266
267 Ok(Some(Action::await_change()))
268 }
269
270 fn success_action(&self, resource: &Self::Resource) -> Action {
276 let _resource = resource;
278
279 Action::requeue(Duration::from_secs(rng().random_range(2400..3600)))
280 }
281
282 fn error_action(
290 self: Arc<Self>,
291 resource: Arc<Self::Resource>,
292 err: &Error<Self::Error>,
293 consecutive_errors: u32,
294 ) -> Action {
295 let _resource = resource;
297 let _err = err;
298
299 let seconds = 2u64.pow(consecutive_errors.min(7) + 1);
300 Action::requeue(Duration::from_millis(
301 rng().random_range((seconds * 500)..(seconds * 1000)),
302 ))
303 }
304
305 #[doc(hidden)]
306 async fn _reconcile(
307 self: Arc<Self>,
308 client: Client,
309 api: Api<Self::Resource>,
310 resource: Arc<Self::Resource>,
311 ) -> Result<Action, Error<Self::Error>>
312 where
313 Self: Send + Sync + 'static,
314 Self::Error: Send + Sync + 'static,
315 Self::Resource: Send + Sync + 'static,
316 Self::Resource: Clone + std::fmt::Debug + serde::Serialize,
317 for<'de> Self::Resource: serde::Deserialize<'de>,
318 <Self::Resource as Resource>::DynamicType: Eq
319 + Clone
320 + std::hash::Hash
321 + std::default::Default
322 + std::fmt::Debug
323 + std::marker::Unpin,
324 {
325 let dynamic_type = Default::default();
326 let kind = Self::Resource::kind(&dynamic_type).into_owned();
327 let mut ran = false;
328 let res = if let Some(finalizer_name) = Self::FINALIZER_NAME {
329 finalizer(&api, finalizer_name, Arc::clone(&resource), |event| async {
330 ran = true;
331 event!(
332 Level::INFO,
333 resource_name = %resource.name_unchecked().as_str(),
334 controller = Self::FINALIZER_NAME,
335 "Reconciling {} ({}).",
336 kind,
337 match event {
338 Event::Apply(_) => "apply",
339 Event::Cleanup(_) => "cleanup",
340 }
341 );
342 let action = match event {
343 Event::Apply(resource) => {
344 let action = self.apply(client, &resource).await?;
345 if let Some(action) = action {
346 action
347 } else {
348 self.success_action(&resource)
349 }
350 }
351 Event::Cleanup(resource) => self
352 .cleanup(client, &resource)
353 .await?
354 .unwrap_or_else(Action::await_change),
355 };
356 Ok(action)
357 })
358 .await
359 .map_err(Error::FinalizerError)
360 } else {
361 ran = true;
362 event!(
363 Level::INFO,
364 resource_name = %resource.name_unchecked().as_str(),
365 "Reconciling {} (apply).",
366 kind,
367 );
368 let action = self
369 .apply(client, &resource)
370 .await
371 .map_err(Error::ControllerError)?;
372 Ok(if let Some(action) = action {
373 action
374 } else {
375 self.success_action(&resource)
376 })
377 };
378 if !ran {
379 event!(
380 Level::INFO,
381 resource_name = %resource.name_unchecked().as_str(),
382 controller = Self::FINALIZER_NAME,
383 "Reconciling {} ({}).",
384 kind,
385 if resource.meta().deletion_timestamp.is_some() {
386 "delete"
387 } else {
388 "init"
389 }
390 );
391 }
392 res
393 }
394}