1use std::collections::BTreeMap;
2use std::error::Error as _;
3use std::fmt::Display;
4use std::sync::{Arc, Mutex};
5use std::time::{Duration, Instant};
6
7use futures::future::FutureExt;
8use futures::stream::StreamExt;
9use kube::api::Api;
10use kube::core::{ClusterResourceScope, NamespaceResourceScope};
11use kube::{Client, Resource, ResourceExt};
12use kube_runtime::controller::Action;
13use kube_runtime::finalizer::{Event, finalizer};
14use kube_runtime::watcher;
15use rand::{Rng, rng};
16use tracing::field::Empty;
17use tracing::{Instrument, Span, error, info, info_span, trace, warn};
18
19#[derive(Debug, thiserror::Error)]
20pub enum Error<E: std::error::Error + 'static> {
21 #[error("{0}")]
22 ControllerError(#[source] E),
23 #[error("{0}")]
24 FinalizerError(#[source] kube_runtime::finalizer::Error<E>),
25}
26
/// String key/value annotations collected during a single reconciliation.
///
/// Entries are kept in a [`BTreeMap`], so they are ordered by key.
#[derive(Debug, Default)]
pub struct TraceMetadata(BTreeMap<String, String>);

impl TraceMetadata {
    /// Stores `val` under `key`, rendering both through their [`Display`]
    /// implementations.
    ///
    /// A value previously stored under the same rendered key is replaced.
    pub fn annotate<K, V>(&mut self, key: K, val: V)
    where
        K: Display,
        V: Display,
    {
        let Self(entries) = self;
        let rendered_key = key.to_string();
        let rendered_val = val.to_string();
        entries.insert(rendered_key, rendered_val);
    }
}
35
36pub struct Controller<Ctx: Context>
39where
40 Ctx: Send + Sync + 'static,
41 Ctx::Error: Send + Sync + 'static,
42 Ctx::Resource: Send + Sync + 'static,
43 Ctx::Resource: Clone + std::fmt::Debug + serde::Serialize,
44 for<'de> Ctx::Resource: serde::Deserialize<'de>,
45 <Ctx::Resource as Resource>::DynamicType:
46 Eq + Clone + std::hash::Hash + std::default::Default + std::fmt::Debug + std::marker::Unpin,
47{
48 client: kube::Client,
49 make_api: Box<dyn Fn(&Ctx::Resource) -> Api<Ctx::Resource> + Sync + Send + 'static>,
50 controller: kube_runtime::controller::Controller<Ctx::Resource>,
51 context: Ctx,
52}
53
54impl<Ctx: Context> Controller<Ctx>
55where
56 Ctx: Send + Sync + 'static,
57 Ctx::Error: Send + Sync + 'static,
58 Ctx::Resource: Clone + std::fmt::Debug + serde::Serialize,
59 for<'de> Ctx::Resource: serde::Deserialize<'de>,
60 <Ctx::Resource as Resource>::DynamicType:
61 Eq + Clone + std::hash::Hash + std::default::Default + std::fmt::Debug + std::marker::Unpin,
62{
63 pub fn namespaced(client: Client, context: Ctx, namespace: &str, wc: watcher::Config) -> Self
71 where
72 Ctx::Resource: Resource<Scope = NamespaceResourceScope>,
73 {
74 let make_api = {
75 let client = client.clone();
76 Box::new(move |resource: &Ctx::Resource| {
77 Api::<Ctx::Resource>::namespaced(client.clone(), &resource.namespace().unwrap())
78 })
79 };
80 let controller = kube_runtime::controller::Controller::new(
81 Api::<Ctx::Resource>::namespaced(client.clone(), namespace),
82 wc,
83 );
84 Self {
85 client,
86 make_api,
87 controller,
88 context,
89 }
90 }
91
92 pub fn namespaced_all(client: Client, context: Ctx, wc: watcher::Config) -> Self
100 where
101 Ctx::Resource: Resource<Scope = NamespaceResourceScope>,
102 {
103 let make_api = {
104 let client = client.clone();
105 Box::new(move |resource: &Ctx::Resource| {
106 Api::<Ctx::Resource>::namespaced(client.clone(), &resource.namespace().unwrap())
107 })
108 };
109 let controller = kube_runtime::controller::Controller::new(
110 Api::<Ctx::Resource>::all(client.clone()),
111 wc,
112 );
113 Self {
114 client,
115 make_api,
116 controller,
117 context,
118 }
119 }
120
121 pub fn cluster(client: Client, context: Ctx, wc: watcher::Config) -> Self
128 where
129 Ctx::Resource: Resource<Scope = ClusterResourceScope>,
130 {
131 let make_api = {
132 let client = client.clone();
133 Box::new(move |_: &Ctx::Resource| Api::<Ctx::Resource>::all(client.clone()))
134 };
135 let controller = kube_runtime::controller::Controller::new(
136 Api::<Ctx::Resource>::all(client.clone()),
137 wc,
138 );
139 Self {
140 client,
141 make_api,
142 controller,
143 context,
144 }
145 }
146
147 pub async fn run(self) {
153 let Self {
154 client,
155 make_api,
156 controller,
157 context,
158 } = self;
159 let backoffs = Arc::new(Mutex::new(BTreeMap::new()));
160 let backoffs = &backoffs;
161 controller
162 .run(
163 |resource, context| {
164 let uid = resource.uid().unwrap();
165 let backoffs = Arc::clone(backoffs);
166 context
167 ._reconcile(client.clone(), make_api(&resource), resource)
168 .inspect(move |result| {
169 if result.is_ok() {
170 backoffs.lock().unwrap().remove(&uid);
171 }
172 })
173 },
174 |resource, err, context| {
175 let consecutive_errors = {
176 let uid = resource.uid().unwrap();
177 let mut backoffs = backoffs.lock().unwrap();
178 let consecutive_errors: u32 =
179 backoffs.get(&uid).copied().unwrap_or_default();
180 backoffs.insert(uid, consecutive_errors.saturating_add(1));
181 consecutive_errors
182 };
183 context.error_action(resource, err, consecutive_errors)
184 },
185 Arc::new(context),
186 )
187 .for_each(|res| async {
188 if let Err(e) = res
191 && !matches!(e, kube_runtime::controller::Error::ReconcilerFailed(..))
192 {
193 warn!(
196 error = %e,
197 source = e.source(),
198 "internal kube controller error",
199 );
200 }
201 })
202 .await
203 }
204
205 pub fn with_controller<F>(mut self, f: F) -> Self
210 where
211 F: FnOnce(
212 kube_runtime::Controller<Ctx::Resource>,
213 ) -> kube_runtime::Controller<Ctx::Resource>,
214 {
215 self.controller = f(self.controller);
216 self
217 }
218}
219
220#[cfg_attr(not(docsrs), async_trait::async_trait)]
223pub trait Context {
224 type Resource: Resource + Send + Sync + 'static;
227 type Error: std::error::Error;
230
231 const FINALIZER_NAME: Option<&'static str> = None;
238
239 async fn apply(
247 &self,
248 client: Client,
249 resource: &Self::Resource,
250 metadata: &mut TraceMetadata,
251 ) -> Result<Option<Action>, Self::Error>;
252
253 async fn cleanup(
263 &self,
264 client: Client,
265 resource: &Self::Resource,
266 metadata: &mut TraceMetadata,
267 ) -> Result<Option<Action>, Self::Error> {
268 let _client = client;
270 let _resource = resource;
271 let _metadata = metadata;
272
273 Ok(Some(Action::await_change()))
274 }
275
276 fn success_action(&self, resource: &Self::Resource) -> Action {
282 let _resource = resource;
284
285 Action::requeue(Duration::from_secs(rng().random_range(2400..3600)))
286 }
287
288 fn error_action(
296 self: Arc<Self>,
297 resource: Arc<Self::Resource>,
298 err: &Error<Self::Error>,
299 consecutive_errors: u32,
300 ) -> Action {
301 let _resource = resource;
303 let _err = err;
304
305 let seconds = 2u64.pow(consecutive_errors.min(7) + 1);
306 Action::requeue(Duration::from_millis(
307 rng().random_range((seconds * 500)..(seconds * 1000)),
308 ))
309 }
310
311 #[doc(hidden)]
312 async fn _reconcile(
313 self: Arc<Self>,
314 client: Client,
315 api: Api<Self::Resource>,
316 resource: Arc<Self::Resource>,
317 ) -> Result<Action, Error<Self::Error>>
318 where
319 Self: Send + Sync + 'static,
320 Self::Error: Send + Sync + 'static,
321 Self::Resource: Send + Sync + 'static,
322 Self::Resource: Clone + std::fmt::Debug + serde::Serialize,
323 for<'de> Self::Resource: serde::Deserialize<'de>,
324 <Self::Resource as Resource>::DynamicType: Eq
325 + Clone
326 + std::hash::Hash
327 + std::default::Default
328 + std::fmt::Debug
329 + std::marker::Unpin,
330 {
331 let span = info_span!(
332 "reconcile",
333 resource_type = Self::Resource::kind(&Default::default()).as_ref(),
334 resource_name = resource.name_unchecked().as_str(),
335 controller = Self::FINALIZER_NAME,
336 event_type = Empty,
337 success = Empty,
338 duration_seconds = Empty,
339 metadata = Empty,
340 );
341 async {
342 trace!("beginning reconciliation");
343
344 let mut metadata = TraceMetadata::default();
345 let mut ran = false;
346 let start = Instant::now();
347
348 let res = if let Some(finalizer_name) = Self::FINALIZER_NAME {
349 finalizer(&api, finalizer_name, Arc::clone(&resource), |event| async {
350 ran = true;
351 Span::current().record(
352 "event_type",
353 match event {
354 Event::Apply(_) => "apply",
355 Event::Cleanup(_) => "cleanup",
356 },
357 );
358 match event {
359 Event::Apply(resource) => self
360 .apply(client, &resource, &mut metadata)
361 .await
362 .map(|action| action.unwrap_or_else(|| self.success_action(&resource))),
363 Event::Cleanup(resource) => self
364 .cleanup(client, &resource, &mut metadata)
365 .await
366 .map(|action| action.unwrap_or_else(Action::await_change)),
367 }
368 })
369 .await
370 .map_err(Error::FinalizerError)
371 } else if resource.meta().deletion_timestamp.is_none() {
372 ran = true;
373 Span::current().record("event_type", "apply");
374 self.apply(client, &resource, &mut metadata)
375 .await
376 .map(|action| action.unwrap_or_else(|| self.success_action(&resource)))
377 .map_err(Error::ControllerError)
378 } else {
379 Ok(Action::await_change())
380 };
381
382 Span::current().record("duration_seconds", start.elapsed().as_secs_f64());
385
386 if !ran {
387 Span::current().record(
388 "event_type",
389 if resource.meta().deletion_timestamp.is_some() {
390 "delete"
391 } else {
392 "init"
393 },
394 );
395 }
396
397 if !metadata.0.is_empty()
398 && let Ok(s) = serde_json::to_string(&metadata.0)
399 {
400 Span::current().record("metadata", s);
401 }
402
403 if let Err(e) = &res {
404 Span::current().record("success", false);
405 error!(error = %e, source = e.source(), "reconcile");
406 } else {
407 Span::current().record("success", true);
408 info!("reconcile");
409 }
410
411 res
412 }
413 .instrument(span)
414 .await
415 }
416}