1use std::collections::BTreeMap;
2use std::error::Error;
3use std::sync::{Arc, Mutex};
4use std::time::Duration;
5
6use futures::future::FutureExt;
7use futures::stream::StreamExt;
8use kube::api::Api;
9use kube::core::{ClusterResourceScope, NamespaceResourceScope};
10use kube::{Client, Resource, ResourceExt};
11use kube_runtime::controller::Action;
12use kube_runtime::finalizer::{finalizer, Event};
13use kube_runtime::watcher;
14use rand::{thread_rng, Rng};
15use tracing::{event, Level};
16
17pub struct Controller<Ctx: Context>
20where
21 Ctx: Send + Sync + 'static,
22 Ctx::Error: Send + Sync + 'static,
23 Ctx::Resource: Send + Sync + 'static,
24 Ctx::Resource: Clone + std::fmt::Debug + serde::Serialize,
25 for<'de> Ctx::Resource: serde::Deserialize<'de>,
26 <Ctx::Resource as Resource>::DynamicType:
27 Eq + Clone + std::hash::Hash + std::default::Default + std::fmt::Debug + std::marker::Unpin,
28{
29 client: kube::Client,
30 make_api: Box<dyn Fn(&Ctx::Resource) -> Api<Ctx::Resource> + Sync + Send + 'static>,
31 controller: kube_runtime::controller::Controller<Ctx::Resource>,
32 context: Ctx,
33}
34
35impl<Ctx: Context> Controller<Ctx>
36where
37 Ctx: Send + Sync + 'static,
38 Ctx::Error: Send + Sync + 'static,
39 Ctx::Resource: Send + Sync + 'static,
40 Ctx::Resource: Clone + std::fmt::Debug + serde::Serialize,
41 for<'de> Ctx::Resource: serde::Deserialize<'de>,
42 <Ctx::Resource as Resource>::DynamicType:
43 Eq + Clone + std::hash::Hash + std::default::Default + std::fmt::Debug + std::marker::Unpin,
44{
45 pub fn namespaced(client: Client, context: Ctx, namespace: &str, wc: watcher::Config) -> Self
53 where
54 Ctx::Resource: Resource<Scope = NamespaceResourceScope>,
55 {
56 let make_api = {
57 let client = client.clone();
58 Box::new(move |resource: &Ctx::Resource| {
59 Api::<Ctx::Resource>::namespaced(client.clone(), &resource.namespace().unwrap())
60 })
61 };
62 let controller = kube_runtime::controller::Controller::new(
63 Api::<Ctx::Resource>::namespaced(client.clone(), namespace),
64 wc,
65 );
66 Self {
67 client,
68 make_api,
69 controller,
70 context,
71 }
72 }
73
74 pub fn namespaced_all(client: Client, context: Ctx, wc: watcher::Config) -> Self
82 where
83 Ctx::Resource: Resource<Scope = NamespaceResourceScope>,
84 {
85 let make_api = {
86 let client = client.clone();
87 Box::new(move |resource: &Ctx::Resource| {
88 Api::<Ctx::Resource>::namespaced(client.clone(), &resource.namespace().unwrap())
89 })
90 };
91 let controller = kube_runtime::controller::Controller::new(
92 Api::<Ctx::Resource>::all(client.clone()),
93 wc,
94 );
95 Self {
96 client,
97 make_api,
98 controller,
99 context,
100 }
101 }
102
103 pub fn cluster(client: Client, context: Ctx, wc: watcher::Config) -> Self
110 where
111 Ctx::Resource: Resource<Scope = ClusterResourceScope>,
112 {
113 let make_api = {
114 let client = client.clone();
115 Box::new(move |_: &Ctx::Resource| Api::<Ctx::Resource>::all(client.clone()))
116 };
117 let controller = kube_runtime::controller::Controller::new(
118 Api::<Ctx::Resource>::all(client.clone()),
119 wc,
120 );
121 Self {
122 client,
123 make_api,
124 controller,
125 context,
126 }
127 }
128
129 pub async fn run(self) {
135 let Self {
136 client,
137 make_api,
138 controller,
139 context,
140 } = self;
141 let backoffs = Arc::new(Mutex::new(BTreeMap::new()));
142 let backoffs = &backoffs;
143 controller
144 .run(
145 |resource, context| {
146 let uid = resource.uid().unwrap();
147 let backoffs = Arc::clone(backoffs);
148 context
149 ._reconcile(client.clone(), make_api(&resource), resource)
150 .inspect(move |result| {
151 if result.is_ok() {
152 backoffs.lock().unwrap().remove(&uid);
153 }
154 })
155 },
156 |resource, err, context| {
157 let consecutive_errors = {
158 let uid = resource.uid().unwrap();
159 let mut backoffs = backoffs.lock().unwrap();
160 let consecutive_errors: u32 =
161 backoffs.get(&uid).copied().unwrap_or_default();
162 backoffs.insert(uid, consecutive_errors.saturating_add(1));
163 consecutive_errors
164 };
165 context.error_action(resource, err, consecutive_errors)
166 },
167 Arc::new(context),
168 )
169 .for_each(|reconciliation_result| async move {
170 let dynamic_type = Default::default();
171 let kind = Ctx::Resource::kind(&dynamic_type).into_owned();
172 match reconciliation_result {
173 Ok(resource) => {
174 event!(
175 Level::INFO,
176 resource_name = %resource.0.name,
177 controller = Ctx::FINALIZER_NAME,
178 "{} reconciliation successful.",
179 kind
180 );
181 }
182 Err(err) => event!(
183 Level::ERROR,
184 err = %err,
185 source = err.source(),
186 controller = Ctx::FINALIZER_NAME,
187 "{} reconciliation error.",
188 kind
189 ),
190 }
191 })
192 .await
193 }
194
195 pub fn with_concurrency(mut self, concurrency: u16) -> Self {
196 self.controller = self
197 .controller
198 .with_config(kube_runtime::Config::default().concurrency(concurrency));
199 self
200 }
201}
202
203#[cfg_attr(not(docsrs), async_trait::async_trait)]
206pub trait Context {
207 type Resource: Resource;
210 type Error: std::error::Error;
213
214 const FINALIZER_NAME: &'static str;
218
219 async fn apply(
227 &self,
228 client: Client,
229 resource: &Self::Resource,
230 ) -> Result<Option<Action>, Self::Error>;
231
232 async fn cleanup(
240 &self,
241 client: Client,
242 resource: &Self::Resource,
243 ) -> Result<Option<Action>, Self::Error>;
244
245 fn success_action(&self, resource: &Self::Resource) -> Action {
251 let _resource = resource;
253
254 Action::requeue(Duration::from_secs(thread_rng().gen_range(2400..3600)))
255 }
256
257 fn error_action(
265 self: Arc<Self>,
266 resource: Arc<Self::Resource>,
267 err: &kube_runtime::finalizer::Error<Self::Error>,
268 consecutive_errors: u32,
269 ) -> Action {
270 let _resource = resource;
272 let _err = err;
273
274 let seconds = 2u64.pow(consecutive_errors.min(7) + 1);
275 Action::requeue(Duration::from_millis(
276 thread_rng().gen_range((seconds * 500)..(seconds * 1000)),
277 ))
278 }
279
280 #[doc(hidden)]
281 async fn _reconcile(
282 self: Arc<Self>,
283 client: Client,
284 api: Api<Self::Resource>,
285 resource: Arc<Self::Resource>,
286 ) -> Result<Action, kube_runtime::finalizer::Error<Self::Error>>
287 where
288 Self: Send + Sync + 'static,
289 Self::Error: Send + Sync + 'static,
290 Self::Resource: Send + Sync + 'static,
291 Self::Resource: Clone + std::fmt::Debug + serde::Serialize,
292 for<'de> Self::Resource: serde::Deserialize<'de>,
293 <Self::Resource as Resource>::DynamicType: Eq
294 + Clone
295 + std::hash::Hash
296 + std::default::Default
297 + std::fmt::Debug
298 + std::marker::Unpin,
299 {
300 let dynamic_type = Default::default();
301 let kind = Self::Resource::kind(&dynamic_type).into_owned();
302 let mut ran = false;
303 let res = finalizer(
304 &api,
305 Self::FINALIZER_NAME,
306 Arc::clone(&resource),
307 |event| async {
308 ran = true;
309 event!(
310 Level::INFO,
311 resource_name = %resource.name_unchecked().as_str(),
312 controller = Self::FINALIZER_NAME,
313 "Reconciling {} ({}).",
314 kind,
315 match event {
316 Event::Apply(_) => "apply",
317 Event::Cleanup(_) => "cleanup",
318 }
319 );
320 let action = match event {
321 Event::Apply(resource) => {
322 let action = self.apply(client, &resource).await?;
323 if let Some(action) = action {
324 action
325 } else {
326 self.success_action(&resource)
327 }
328 }
329 Event::Cleanup(resource) => self
330 .cleanup(client, &resource)
331 .await?
332 .unwrap_or_else(Action::await_change),
333 };
334 Ok(action)
335 },
336 )
337 .await;
338 if !ran {
339 event!(
340 Level::INFO,
341 resource_name = %resource.name_unchecked().as_str(),
342 controller = Self::FINALIZER_NAME,
343 "Reconciling {} ({}).",
344 kind,
345 if resource.meta().deletion_timestamp.is_some() {
346 "delete"
347 } else {
348 "init"
349 }
350 );
351 }
352 res
353 }
354}