kube_runtime/controller/mod.rs
1//! Runs a user-supplied reconciler function on objects when they (or related objects) are updated
2
3use self::runner::Runner;
4use crate::{
5 reflector::{
6 self, reflector,
7 store::{Store, Writer},
8 ObjectRef,
9 },
10 scheduler::{debounced_scheduler, ScheduleRequest},
11 utils::{trystream_try_via, CancelableJoinHandle, KubeRuntimeStreamExt, StreamBackoff, WatchStreamExt},
12 watcher::{self, metadata_watcher, watcher, DefaultBackoff},
13};
14use backoff::backoff::Backoff;
15use educe::Educe;
16use futures::{
17 channel,
18 future::{self, BoxFuture},
19 stream, FutureExt, Stream, StreamExt, TryFuture, TryFutureExt, TryStream, TryStreamExt,
20};
21use kube_client::api::{Api, DynamicObject, Resource};
22use pin_project::pin_project;
23use serde::de::DeserializeOwned;
24use std::{
25 fmt::{Debug, Display},
26 future::Future,
27 hash::Hash,
28 sync::Arc,
29 task::{ready, Poll},
30 time::Duration,
31};
32use stream::BoxStream;
33use thiserror::Error;
34use tokio::{runtime::Handle, time::Instant};
35use tracing::{info_span, Instrument};
36
37mod future_hash_map;
38mod runner;
39
/// Alias for the [`runner`] module's error type, instantiated with the reflector store's
/// [`WriterDropped`](reflector::store::WriterDropped) error.
///
/// Values of this type are surfaced to users wrapped in [`Error::RunnerError`].
pub type RunnerError = runner::Error<reflector::store::WriterDropped>;
41
/// Errors that can be yielded by the stream returned from [`applier`].
///
/// `ReconcilerErr` is the error type of the user-supplied reconciler, and `QueueErr` is the
/// error type of the input queue stream.
#[derive(Debug, Error)]
pub enum Error<ReconcilerErr: 'static, QueueErr: 'static> {
    /// A reconciliation was requested for an object that is not present in the local store.
    #[error("tried to reconcile object {0} that was not found in local store")]
    ObjectNotFound(ObjectRef<DynamicObject>),
    /// The reconciler returned an error for the referenced object.
    #[error("reconciler for object {1} failed")]
    ReconcilerFailed(#[source] ReconcilerErr, ObjectRef<DynamicObject>),
    /// The input queue stream yielded an error.
    #[error("event queue error")]
    QueueError(#[source] QueueErr),
    /// The runner that drives the reconciliations reported an error.
    #[error("runner error")]
    RunnerError(#[source] RunnerError),
}
53
/// Outcome of a reconciliation attempt, telling the controller what to do next
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct Action {
    /// If set, how long to wait before reconciling the object again when no external
    /// watch event triggers a reconciliation first.
    ///
    /// Typical uses are polling external systems for updates, expiring time-limited
    /// resources, or (from your `error_policy`) retrying after errors.
    requeue_after: Option<Duration>,
}

impl Action {
    /// Reconcile the object again after `duration`, even if no external watch triggers hit
    ///
    /// This is the best-practice action: it ensures eventual consistency of your controller
    /// even when changes are missed (which can happen).
    ///
    /// Watch events are not normally missed, so a fallback interval in the order of
    /// once per hour is reasonable.
    #[must_use]
    pub fn requeue(duration: Duration) -> Self {
        let requeue_after = Some(duration);
        Self { requeue_after }
    }

    /// Do nothing until a change is detected
    ///
    /// The controller stops periodically reconciling this object until a relevant watch
    /// event is **detected**.
    ///
    /// **Warning**: If you have watch desyncs, it is possible to miss changes entirely.
    /// Disabling requeuing this way is therefore not recommended unless the underlying
    /// object changes frequently, or you have some other hook that retains eventual
    /// consistency.
    #[must_use]
    pub fn await_change() -> Self {
        let requeue_after = None;
        Self { requeue_after }
    }
}
91
92/// Helper for building custom trigger filters, see the implementations of [`trigger_self`] and [`trigger_owners`] for some examples.
93pub fn trigger_with<T, K, I, S>(
94 stream: S,
95 mapper: impl Fn(T) -> I,
96) -> impl Stream<Item = Result<ReconcileRequest<K>, S::Error>>
97where
98 S: TryStream<Ok = T>,
99 I: IntoIterator,
100 I::Item: Into<ReconcileRequest<K>>,
101 K: Resource,
102{
103 stream
104 .map_ok(move |obj| stream::iter(mapper(obj).into_iter().map(Into::into).map(Ok)))
105 .try_flatten()
106}
107
108/// Enqueues the object itself for reconciliation
109pub fn trigger_self<K, S>(
110 stream: S,
111 dyntype: K::DynamicType,
112) -> impl Stream<Item = Result<ReconcileRequest<K>, S::Error>>
113where
114 S: TryStream<Ok = K>,
115 K: Resource,
116 K::DynamicType: Clone,
117{
118 trigger_with(stream, move |obj| {
119 Some(ReconcileRequest {
120 obj_ref: ObjectRef::from_obj_with(&obj, dyntype.clone()),
121 reason: ReconcileReason::ObjectUpdated,
122 })
123 })
124}
125
/// Requests reconciliation of each object seen on `stream`, where the objects are handed
/// out behind a shared pointer (`Arc<K>`).
///
/// Behaves like [`trigger_self`] otherwise: one [`ReconcileRequest`] with the reason
/// [`ReconcileReason::ObjectUpdated`] per successful item.
#[cfg(feature = "unstable-runtime-subscribe")]
fn trigger_self_shared<K, S>(
    stream: S,
    dyntype: K::DynamicType,
) -> impl Stream<Item = Result<ReconcileRequest<K>, S::Error>>
where
    // Items are shared (`Arc`'d) resources, as supplied via `Controller::for_shared_stream`
    S: TryStream<Ok = Arc<K>>,
    K: Resource,
    K::DynamicType: Clone,
{
    trigger_with(stream, move |obj| {
        let obj_ref = ObjectRef::from_obj_with(obj.as_ref(), dyntype.clone());
        let request = ReconcileRequest {
            obj_ref,
            reason: ReconcileReason::ObjectUpdated,
        };
        // Exactly one request per input object
        Some(request)
    })
}
147
148/// Enqueues any mapper returned `K` types for reconciliation
149fn trigger_others<S, K, I>(
150 stream: S,
151 mapper: impl Fn(S::Ok) -> I + Sync + Send + 'static,
152 dyntype: <S::Ok as Resource>::DynamicType,
153) -> impl Stream<Item = Result<ReconcileRequest<K>, S::Error>>
154where
155 // Input stream has items as some Resource (via Controller::watches)
156 S: TryStream,
157 S::Ok: Resource,
158 <S::Ok as Resource>::DynamicType: Clone,
159 // Output stream is requests for the root type K
160 K: Resource,
161 K::DynamicType: Clone,
162 // but the mapper can produce many of them
163 I: 'static + IntoIterator<Item = ObjectRef<K>>,
164 I::IntoIter: Send,
165{
166 trigger_with(stream, move |obj| {
167 let watch_ref = ObjectRef::from_obj_with(&obj, dyntype.clone()).erase();
168 mapper(obj)
169 .into_iter()
170 .map(move |mapped_obj_ref| ReconcileRequest {
171 obj_ref: mapped_obj_ref,
172 reason: ReconcileReason::RelatedObjectUpdated {
173 obj_ref: Box::new(watch_ref.clone()),
174 },
175 })
176 })
177}
178
/// Requests reconciliation of every `K` that `mapper` returns for a shared (`Arc<O>`) object
/// seen on `stream`.
///
/// Each emitted [`ReconcileRequest`] carries the reason
/// [`ReconcileReason::RelatedObjectUpdated`], pointing at the (type-erased) watched object
/// that caused it.
#[cfg(feature = "unstable-runtime-subscribe")]
fn trigger_others_shared<S, O, K, I>(
    stream: S,
    mapper: impl Fn(S::Ok) -> I + Sync + Send + 'static,
    dyntype: O::DynamicType,
) -> impl Stream<Item = Result<ReconcileRequest<K>, S::Error>>
where
    // Items of the input stream are shared resources (`Arc<O>`) obtained via `reflect`
    S: TryStream<Ok = Arc<O>>,
    O: Resource,
    O::DynamicType: Clone,
    // The output requests target the root type K
    K: Resource,
    K::DynamicType: Clone,
    // and a single watched object may map to many of them
    I: 'static + IntoIterator<Item = ObjectRef<K>>,
    I::IntoIter: Send,
{
    trigger_with(stream, move |watched| {
        // Record where the trigger came from before `mapper` consumes the object
        let source = ObjectRef::from_obj_with(watched.as_ref(), dyntype.clone()).erase();
        let targets = mapper(watched).into_iter();
        targets.map(move |target| ReconcileRequest {
            obj_ref: target,
            reason: ReconcileReason::RelatedObjectUpdated {
                obj_ref: Box::new(source.clone()),
            },
        })
    })
}
210
211/// Enqueues any owners of type `KOwner` for reconciliation
212pub fn trigger_owners<KOwner, S>(
213 stream: S,
214 owner_type: KOwner::DynamicType,
215 child_type: <S::Ok as Resource>::DynamicType,
216) -> impl Stream<Item = Result<ReconcileRequest<KOwner>, S::Error>>
217where
218 S: TryStream,
219 S::Ok: Resource,
220 <S::Ok as Resource>::DynamicType: Clone,
221 KOwner: Resource,
222 KOwner::DynamicType: Clone,
223{
224 let mapper = move |obj: S::Ok| {
225 let meta = obj.meta().clone();
226 let ns = meta.namespace;
227 let owner_type = owner_type.clone();
228 meta.owner_references
229 .into_iter()
230 .flatten()
231 .filter_map(move |owner| ObjectRef::from_owner_ref(ns.as_deref(), &owner, owner_type.clone()))
232 };
233 trigger_others(stream, mapper, child_type)
234}
235
// TODO: do we really need to deal with a trystream? can we simplify this at
// all?
/// Enqueues any owners of type `KOwner` for reconciliation based on a stream of
/// owned `K` objects
///
/// Shared-pointer counterpart of [`trigger_owners`]: the items of `stream` are `Arc<K>`.
/// Each owner reference of an item is passed to [`ObjectRef::from_owner_ref`] together with
/// the item's namespace and `owner_type`; references for which that returns `None` are
/// skipped.
#[cfg(feature = "unstable-runtime-subscribe")]
fn trigger_owners_shared<KOwner, S, K>(
    stream: S,
    owner_type: KOwner::DynamicType,
    child_type: K::DynamicType,
) -> impl Stream<Item = Result<ReconcileRequest<KOwner>, S::Error>>
where
    S: TryStream<Ok = Arc<K>>,
    K: Resource,
    K::DynamicType: Clone,
    KOwner: Resource,
    KOwner::DynamicType: Clone,
{
    let mapper = move |obj: S::Ok| {
        // Only the namespace and the owner references are needed, so clone just those
        // instead of the whole metadata (labels, annotations, managed fields, ...).
        let meta = obj.meta();
        let ns = meta.namespace.clone();
        let owners = meta.owner_references.clone();
        let owner_type = owner_type.clone();
        owners
            .into_iter()
            .flatten()
            .filter_map(move |owner| ObjectRef::from_owner_ref(ns.as_deref(), &owner, owner_type.clone()))
    };
    trigger_others_shared(stream, mapper, child_type)
}
264
265/// A request to reconcile an object, annotated with why that request was made.
266///
267/// NOTE: The reason is ignored for comparison purposes. This means that, for example,
268/// an object can only occupy one scheduler slot, even if it has been scheduled for multiple reasons.
269/// In this case, only *the first* reason is stored.
270#[derive(Educe)]
271#[educe(
272 Debug(bound("K::DynamicType: Debug")),
273 Clone(bound("K::DynamicType: Clone")),
274 PartialEq(bound("K::DynamicType: PartialEq")),
275 Hash(bound("K::DynamicType: Hash"))
276)]
277pub struct ReconcileRequest<K: Resource> {
278 pub obj_ref: ObjectRef<K>,
279 #[educe(PartialEq(ignore), Hash(ignore))]
280 pub reason: ReconcileReason,
281}
282
283impl<K: Resource> Eq for ReconcileRequest<K> where K::DynamicType: Eq {}
284
285impl<K: Resource> From<ObjectRef<K>> for ReconcileRequest<K> {
286 fn from(obj_ref: ObjectRef<K>) -> Self {
287 ReconcileRequest {
288 obj_ref,
289 reason: ReconcileReason::Unknown,
290 }
291 }
292}
293
294#[derive(Debug, Clone)]
295pub enum ReconcileReason {
296 Unknown,
297 ObjectUpdated,
298 RelatedObjectUpdated { obj_ref: Box<ObjectRef<DynamicObject>> },
299 ReconcilerRequestedRetry,
300 ErrorPolicyRequestedRetry,
301 BulkReconcile,
302 Custom { reason: String },
303}
304
305impl Display for ReconcileReason {
306 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
307 match self {
308 ReconcileReason::Unknown => f.write_str("unknown"),
309 ReconcileReason::ObjectUpdated => f.write_str("object updated"),
310 ReconcileReason::RelatedObjectUpdated { obj_ref: object } => {
311 f.write_fmt(format_args!("related object updated: {object}"))
312 }
313 ReconcileReason::BulkReconcile => f.write_str("bulk reconcile requested"),
314 ReconcileReason::ReconcilerRequestedRetry => f.write_str("reconciler requested retry"),
315 ReconcileReason::ErrorPolicyRequestedRetry => f.write_str("error policy requested retry"),
316 ReconcileReason::Custom { reason } => f.write_str(reason),
317 }
318 }
319}
320
/// Buffer size passed to the bounded channel over which [`applier`] receives the requeue
/// requests submitted after each reconciliation.
const APPLIER_REQUEUE_BUF_SIZE: usize = 100;
322
/// Apply a reconciler to an input stream, with a given retry policy
///
/// Takes a `store` parameter for the core objects, which should usually be updated by a [`reflector()`].
///
/// The `queue` indicates which objects should be reconciled. For the core objects this will usually be
/// the [`reflector()`] (piped through [`trigger_self`]). If your core objects own any subobjects then you
/// can also make them trigger reconciliations by [merging](`futures::stream::select`) the [`reflector()`]
/// with a [`watcher()`] or [`reflector()`] for the subobject.
///
/// This is the "hard-mode" version of [`Controller`], which allows you some more customization
/// (such as triggering from arbitrary [`Stream`]s), at the cost of being a bit more verbose.
///
/// # Parameters
///
/// - `reconciler`: called with the object (looked up in `store`) and the shared `context`.
/// - `error_policy`: called with the object, the reconciler's error and the `context` whenever the
///   reconciler fails; the [`Action`] it returns decides whether the object is requeued.
/// - `context`: shared state handed to both callbacks.
/// - `store`: where requested objects are looked up; tasks are held back until it reports ready.
/// - `queue`: the stream of reconcile requests; once it completes, a graceful shutdown is started.
/// - `config`: supplies the debounce duration and the concurrency limit.
///
/// # Output
///
/// The returned stream yields `Ok((object_ref, action))` for every successful reconciliation, and an
/// [`Error`] for queue errors, runner errors, objects missing from `store`, and failed reconciliations.
#[allow(clippy::needless_pass_by_value)]
#[allow(clippy::type_complexity)]
pub fn applier<K, QueueStream, ReconcilerFut, Ctx>(
    mut reconciler: impl FnMut(Arc<K>, Arc<Ctx>) -> ReconcilerFut,
    error_policy: impl Fn(Arc<K>, &ReconcilerFut::Error, Arc<Ctx>) -> Action,
    context: Arc<Ctx>,
    store: Store<K>,
    queue: QueueStream,
    config: Config,
) -> impl Stream<Item = Result<(ObjectRef<K>, Action), Error<ReconcilerFut::Error, QueueStream::Error>>>
where
    K: Clone + Resource + 'static,
    K::DynamicType: Debug + Eq + Hash + Clone + Unpin,
    ReconcilerFut: TryFuture<Ok = Action> + Unpin,
    ReconcilerFut::Error: std::error::Error + 'static,
    QueueStream: TryStream,
    QueueStream::Ok: Into<ReconcileRequest<K>>,
    QueueStream::Error: std::error::Error + 'static,
{
    // Fired when `queue` completes, to stop consuming requeue requests (see `take_until` below)
    let (scheduler_shutdown_tx, scheduler_shutdown_rx) = channel::oneshot::channel();
    // Bounded channel through which finished reconciliations submit their requeue requests
    let (scheduler_tx, scheduler_rx) =
        channel::mpsc::channel::<ScheduleRequest<ReconcileRequest<K>>>(APPLIER_REQUEUE_BUF_SIZE);
    // Shared so that every reconciliation future can own a handle to the policy
    let error_policy = Arc::new(error_policy);
    // `store` itself is moved into the runner closure below; keep a second handle for the readiness wait
    let delay_store = store.clone();
    // Create a stream of ObjectRefs that need to be reconciled
    trystream_try_via(
        // input: stream combining scheduled tasks and user specified inputs event
        Box::pin(stream::select(
            // 1. inputs from users queue stream
            queue
                .map_err(Error::QueueError)
                // user-supplied requests are due immediately
                .map_ok(|request| ScheduleRequest {
                    message: request.into(),
                    run_at: Instant::now(),
                })
                .on_complete(async move {
                    // On error: scheduler has already been shut down and there is nothing for us to do
                    let _ = scheduler_shutdown_tx.send(());
                    tracing::debug!("applier queue terminated, starting graceful shutdown")
                }),
            // 2. requests sent to scheduler_tx
            scheduler_rx
                .map(Ok)
                .take_until(scheduler_shutdown_rx)
                .on_complete(async { tracing::debug!("applier scheduler consumer terminated") }),
        )),
        // all the Oks from the select gets passed through the scheduler stream, and are then executed
        move |s| {
            Runner::new(
                debounced_scheduler(s, config.debounce),
                config.concurrency,
                move |request| {
                    // Work on an owned copy, so that it can be moved into the returned future
                    let request = request.clone();
                    match store.get(&request.obj_ref) {
                        Some(obj) => {
                            // Per-reconciliation handles, moved into the future built below
                            let scheduler_tx = scheduler_tx.clone();
                            let error_policy_ctx = context.clone();
                            let error_policy = error_policy.clone();
                            let reconciler_span = info_span!(
                                "reconciling object",
                                "object.ref" = %request.obj_ref,
                                object.reason = %request.reason
                            );
                            reconciler_span
                                // the reconciler is *called* inside the span ...
                                .in_scope(|| reconciler(Arc::clone(&obj), context.clone()))
                                .into_future()
                                .then(move |res| {
                                    let error_policy = error_policy;
                                    // Applies the error policy (on failure) and submits the requeue
                                    // request, if any, before handing the result back
                                    RescheduleReconciliation::new(
                                        res,
                                        |err| error_policy(obj, err, error_policy_ctx),
                                        request.obj_ref.clone(),
                                        scheduler_tx,
                                    )
                                    // Reconciler errors are OK from the applier's PoV, we need to apply the error policy
                                    // to them separately
                                    .map(|res| Ok((request.obj_ref, res)))
                                })
                                // ... and the future it returned is also instrumented with it
                                .instrument(reconciler_span)
                                .left_future()
                        }
                        // The object is not in the store: report it instead of calling the reconciler
                        None => std::future::ready(Err(Error::ObjectNotFound(request.obj_ref.erase())))
                            .right_future(),
                    }
                },
            )
            .delay_tasks_until(async move {
                tracing::debug!("applier runner held until store is ready");
                let res = delay_store.wait_until_ready().await;
                tracing::debug!("store is ready, starting runner");
                res
            })
            // an error from the runner itself replaces the task's result
            .map(|runner_res| runner_res.unwrap_or_else(|err| Err(Error::RunnerError(err))))
            .on_complete(async { tracing::debug!("applier runner terminated") })
        },
    )
    .on_complete(async { tracing::debug!("applier runner-merge terminated") })
    // finally, for each completed reconcile call:
    .and_then(move |(obj_ref, reconciler_result)| async move {
        match reconciler_result {
            Ok(action) => Ok((obj_ref, action)),
            Err(err) => Err(Error::ReconcilerFailed(err, obj_ref.erase())),
        }
    })
    .on_complete(async { tracing::debug!("applier terminated") })
}
440
441/// Internal helper [`Future`] that reschedules reconciliation of objects (if required), in the scheduled context of the reconciler
442///
443/// This could be an `async fn`, but isn't because we want it to be [`Unpin`]
444#[pin_project]
445#[must_use]
446struct RescheduleReconciliation<K: Resource, ReconcilerErr> {
447 reschedule_tx: channel::mpsc::Sender<ScheduleRequest<ReconcileRequest<K>>>,
448
449 reschedule_request: Option<ScheduleRequest<ReconcileRequest<K>>>,
450 result: Option<Result<Action, ReconcilerErr>>,
451}
452
453impl<K, ReconcilerErr> RescheduleReconciliation<K, ReconcilerErr>
454where
455 K: Resource,
456{
457 fn new(
458 result: Result<Action, ReconcilerErr>,
459 error_policy: impl FnOnce(&ReconcilerErr) -> Action,
460 obj_ref: ObjectRef<K>,
461 reschedule_tx: channel::mpsc::Sender<ScheduleRequest<ReconcileRequest<K>>>,
462 ) -> Self {
463 let reconciler_finished_at = Instant::now();
464
465 let (action, reschedule_reason) = result.as_ref().map_or_else(
466 |err| (error_policy(err), ReconcileReason::ErrorPolicyRequestedRetry),
467 |action| (action.clone(), ReconcileReason::ReconcilerRequestedRetry),
468 );
469
470 Self {
471 reschedule_tx,
472 reschedule_request: action.requeue_after.map(|requeue_after| ScheduleRequest {
473 message: ReconcileRequest {
474 obj_ref,
475 reason: reschedule_reason,
476 },
477 run_at: reconciler_finished_at
478 .checked_add(requeue_after)
479 .unwrap_or_else(crate::scheduler::far_future),
480 }),
481 result: Some(result),
482 }
483 }
484}
485
486impl<K, ReconcilerErr> Future for RescheduleReconciliation<K, ReconcilerErr>
487where
488 K: Resource,
489{
490 type Output = Result<Action, ReconcilerErr>;
491
492 fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
493 let this = self.get_mut();
494
495 if this.reschedule_request.is_some() {
496 let rescheduler_ready = ready!(this.reschedule_tx.poll_ready(cx));
497 let reschedule_request = this
498 .reschedule_request
499 .take()
500 .expect("PostReconciler::reschedule_request was taken during processing");
501 // Failure to schedule item = in graceful shutdown mode, ignore
502 if let Ok(()) = rescheduler_ready {
503 let _ = this.reschedule_tx.start_send(reschedule_request);
504 }
505 }
506
507 Poll::Ready(
508 this.result
509 .take()
510 .expect("PostReconciler::result was already taken"),
511 )
512 }
513}
514
/// Collects all the options that can be set on a [`Controller`] invocation.
#[derive(Clone, Debug, Default)]
pub struct Config {
    debounce: Duration,
    concurrency: u16,
}

impl Config {
    /// Sets the debounce duration used to deduplicate reconciliation requests.
    ///
    /// A non-zero duration enables debouncing in the [`scheduler`](crate::scheduler()),
    /// resulting in __trailing edge debouncing__ of reconciler requests.
    /// This can help to reduce the amount of unnecessary reconciler calls
    /// when using multiple controller relations, or during rapid phase transitions.
    ///
    /// ## Warning
    /// This option delays (and keeps delaying) reconcile requests for objects while
    /// the object is updated. It can **permanently hide** updates from your reconciler
    /// if set too high on objects that are updated frequently (like nodes).
    #[must_use]
    pub fn debounce(self, debounce: Duration) -> Self {
        Self { debounce, ..self }
    }

    /// Sets the number of reconciliations that are allowed to run concurrently at any given moment.
    ///
    /// This can be adjusted to the controller's needs to increase
    /// performance and/or make performance predictable. The default is 0, meaning
    /// that the controller runs with unbounded concurrency.
    ///
    /// Note that despite concurrency, a controller never schedules concurrent reconciles
    /// on the same object.
    #[must_use]
    pub fn concurrency(self, concurrency: u16) -> Self {
        Self { concurrency, ..self }
    }
}
554
/// Controller for a Resource `K`
///
/// A controller is an infinite stream of objects to be reconciled.
///
/// Once `run` has been called and the resulting stream is continuously awaited, it continuously
/// calls out to the user provided `reconcile` and `error_policy` callbacks whenever relevant
/// changes are detected or if errors are seen from `reconcile`.
///
/// Reconciles are generally requested for all changes on your root objects.
/// Changes to managed child resources will also trigger the reconciler for the
/// managing object by traversing owner references (for `Controller::owns`),
/// or by traversing a custom mapping (for `Controller::watches`).
///
/// This mapping mechanism ultimately hides the reason for the reconciliation request,
/// and forces you to write an idempotent reconciler.
///
/// General setup:
/// ```no_run
/// use kube::{Api, Client, CustomResource};
/// use kube::runtime::{controller::{Controller, Action}, watcher};
/// # use serde::{Deserialize, Serialize};
/// # use tokio::time::Duration;
/// use futures::StreamExt;
/// use k8s_openapi::api::core::v1::ConfigMap;
/// use schemars::JsonSchema;
/// # use std::sync::Arc;
/// use thiserror::Error;
///
/// #[derive(Debug, Error)]
/// enum Error {}
///
/// /// A custom resource
/// #[derive(CustomResource, Debug, Clone, Deserialize, Serialize, JsonSchema)]
/// #[kube(group = "nullable.se", version = "v1", kind = "ConfigMapGenerator", namespaced)]
/// struct ConfigMapGeneratorSpec {
///     content: String,
/// }
///
/// /// The reconciler that will be called when either object changes
/// async fn reconcile(g: Arc<ConfigMapGenerator>, _ctx: Arc<()>) -> Result<Action, Error> {
///     // .. use api here to reconcile a child ConfigMap with ownerreferences
///     // see configmapgen_controller example for full info
///     Ok(Action::requeue(Duration::from_secs(300)))
/// }
/// /// an error handler that will be called when the reconciler fails with access to both the
/// /// object that caused the failure and the actual error
/// fn error_policy(obj: Arc<ConfigMapGenerator>, _error: &Error, _ctx: Arc<()>) -> Action {
///     Action::requeue(Duration::from_secs(60))
/// }
///
/// /// something to drive the controller
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
///     let client = Client::try_default().await?;
///     let context = Arc::new(()); // bad empty context - put client in here
///     let cmgs = Api::<ConfigMapGenerator>::all(client.clone());
///     let cms = Api::<ConfigMap>::all(client.clone());
///     Controller::new(cmgs, watcher::Config::default())
///         .owns(cms, watcher::Config::default())
///         .run(reconcile, error_policy, context)
///         .for_each(|res| async move {
///             match res {
///                 Ok(o) => println!("reconciled {:?}", o),
///                 Err(e) => println!("reconcile failed: {:?}", e),
///             }
///         })
///         .await; // controller does nothing unless polled
///     Ok(())
/// }
/// ```
pub struct Controller<K>
where
    K: Clone + Resource + Debug + 'static,
    K::DynamicType: Eq + Hash,
{
    /// All trigger streams (the root object stream plus any added later), merged into one.
    // NB: Need to Unpin for stream::select_all
    trigger_selector: stream::SelectAll<BoxStream<'static, Result<ReconcileRequest<K>, watcher::Error>>>,
    /// Backoff policy for the merged trigger stream
    /// (NOTE(review): presumably applied when a trigger yields an error; confirm in `run`).
    trigger_backoff: Box<dyn Backoff + Send>,
    /// [`run`](crate::Controller::run) starts a graceful shutdown when any of these [`Future`]s complete,
    /// refusing to start any new reconciliations but letting any existing ones finish.
    graceful_shutdown_selector: Vec<BoxFuture<'static, ()>>,
    /// [`run`](crate::Controller::run) terminates immediately when any of these [`Future`]s complete,
    /// requesting that all running reconciliations be aborted.
    /// However, note that they *will* keep running until their next yield point (`.await`),
    /// blocking [`tokio::runtime::Runtime`] destruction (unless you follow up by calling [`std::process::exit`] after `run`).
    forceful_shutdown_selector: Vec<BoxFuture<'static, ()>>,
    /// Dynamic type information for `K`, used to build [`ObjectRef`]s for the root objects.
    dyntype: K::DynamicType,
    /// Read handle to the store holding the root objects.
    reader: Store<K>,
    /// Options for this controller (debounce and concurrency).
    config: Config,
}
645
646impl<K> Controller<K>
647where
648 K: Clone + Resource + DeserializeOwned + Debug + Send + Sync + 'static,
649 K::DynamicType: Eq + Hash + Clone,
650{
651 /// Create a Controller for a resource `K`
652 ///
653 /// Takes an [`Api`] object that determines how the `Controller` listens for changes to the `K`.
654 ///
655 /// The [`watcher::Config`] controls to the possible subset of objects of `K` that you want to manage
656 /// and receive reconcile events for.
657 /// For the full set of objects `K` in the given `Api` scope, you can use [`watcher::Config::default`].
658 #[must_use]
659 pub fn new(main_api: Api<K>, wc: watcher::Config) -> Self
660 where
661 K::DynamicType: Default,
662 {
663 Self::new_with(main_api, wc, Default::default())
664 }
665
666 /// Create a Controller for a resource `K`
667 ///
668 /// Takes an [`Api`] object that determines how the `Controller` listens for changes to the `K`.
669 ///
670 /// The [`watcher::Config`] lets you define a possible subset of objects of `K` that you want the [`Api`]
671 /// to watch - in the Api's configured scope - and receive reconcile events for.
672 /// For the full set of objects `K` in the given `Api` scope, you can use [`Config::default`].
673 ///
674 /// This variant constructor is for [`dynamic`] types found through discovery. Prefer [`Controller::new`] for static types.
675 ///
676 /// [`watcher::Config`]: crate::watcher::Config
677 /// [`Api`]: kube_client::Api
678 /// [`dynamic`]: kube_client::core::dynamic
679 /// [`Config::default`]: crate::watcher::Config::default
680 pub fn new_with(main_api: Api<K>, wc: watcher::Config, dyntype: K::DynamicType) -> Self {
681 let writer = Writer::<K>::new(dyntype.clone());
682 let reader = writer.as_reader();
683 let mut trigger_selector = stream::SelectAll::new();
684 let self_watcher = trigger_self(
685 reflector(writer, watcher(main_api, wc)).applied_objects(),
686 dyntype.clone(),
687 )
688 .boxed();
689 trigger_selector.push(self_watcher);
690 Self {
691 trigger_selector,
692 trigger_backoff: Box::<DefaultBackoff>::default(),
693 graceful_shutdown_selector: vec![
694 // Fallback future, ensuring that we never terminate if no additional futures are added to the selector
695 future::pending().boxed(),
696 ],
697 forceful_shutdown_selector: vec![
698 // Fallback future, ensuring that we never terminate if no additional futures are added to the selector
699 future::pending().boxed(),
700 ],
701 dyntype,
702 reader,
703 config: Default::default(),
704 }
705 }
706
707 /// Create a Controller for a resource `K` from a stream of `K` objects
708 ///
709 /// Same as [`Controller::new`], but instead of an `Api`, a stream of resources is used.
710 /// This allows for customized and pre-filtered watch streams to be used as a trigger,
711 /// as well as sharing input streams between multiple controllers.
712 ///
713 /// **NB**: This is constructor requires an [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21) feature.
714 ///
715 /// # Example:
716 ///
717 /// ```no_run
718 /// # use futures::StreamExt;
719 /// # use k8s_openapi::api::apps::v1::Deployment;
720 /// # use kube::runtime::controller::{Action, Controller};
721 /// # use kube::runtime::{predicates, watcher, reflector, WatchStreamExt};
722 /// # use kube::{Api, Client, Error, ResourceExt};
723 /// # use std::sync::Arc;
724 /// # async fn reconcile(_: Arc<Deployment>, _: Arc<()>) -> Result<Action, Error> { Ok(Action::await_change()) }
725 /// # fn error_policy(_: Arc<Deployment>, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() }
726 /// # async fn doc(client: kube::Client) {
727 /// let api: Api<Deployment> = Api::default_namespaced(client);
728 /// let (reader, writer) = reflector::store();
729 /// let deploys = watcher(api, watcher::Config::default())
730 /// .default_backoff()
731 /// .reflect(writer)
732 /// .applied_objects()
733 /// .predicate_filter(predicates::generation);
734 ///
735 /// Controller::for_stream(deploys, reader)
736 /// .run(reconcile, error_policy, Arc::new(()))
737 /// .for_each(|_| std::future::ready(()))
738 /// .await;
739 /// # }
740 /// ```
741 ///
742 /// Prefer [`Controller::new`] if you do not need to share the stream, or do not need pre-filtering.
743 #[cfg(feature = "unstable-runtime-stream-control")]
744 pub fn for_stream(
745 trigger: impl Stream<Item = Result<K, watcher::Error>> + Send + 'static,
746 reader: Store<K>,
747 ) -> Self
748 where
749 K::DynamicType: Default,
750 {
751 Self::for_stream_with(trigger, reader, Default::default())
752 }
753
754 /// Create a Controller for a resource `K` from a stream of `K` objects
755 ///
756 /// Same as [`Controller::new`], but instead of an `Api`, a stream of resources is used.
757 /// This allows for customized and pre-filtered watch streams to be used as a trigger,
758 /// as well as sharing input streams between multiple controllers.
759 ///
760 /// **NB**: This is constructor requires an [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21) feature.
761 ///
762 /// Prefer [`Controller::new`] if you do not need to share the stream, or do not need pre-filtering.
763 ///
764 /// This variant constructor is for [`dynamic`] types found through discovery. Prefer [`Controller::for_stream`] for static types.
765 ///
766 /// [`dynamic`]: kube_client::core::dynamic
767 #[cfg(feature = "unstable-runtime-stream-control")]
768 pub fn for_stream_with(
769 trigger: impl Stream<Item = Result<K, watcher::Error>> + Send + 'static,
770 reader: Store<K>,
771 dyntype: K::DynamicType,
772 ) -> Self {
773 let mut trigger_selector = stream::SelectAll::new();
774 let self_watcher = trigger_self(trigger, dyntype.clone()).boxed();
775 trigger_selector.push(self_watcher);
776 Self {
777 trigger_selector,
778 trigger_backoff: Box::<DefaultBackoff>::default(),
779 graceful_shutdown_selector: vec![
780 // Fallback future, ensuring that we never terminate if no additional futures are added to the selector
781 future::pending().boxed(),
782 ],
783 forceful_shutdown_selector: vec![
784 // Fallback future, ensuring that we never terminate if no additional futures are added to the selector
785 future::pending().boxed(),
786 ],
787 dyntype,
788 reader,
789 config: Default::default(),
790 }
791 }
792
793 /// This is the same as [`Controller::for_stream`]. Instead of taking an
794 /// `Api` (e.g. [`Controller::new`]), a stream of resources is used. Shared
795 /// streams can be created out-of-band by subscribing on a store `Writer`.
796 /// Through this interface, multiple controllers can use the same root
797 /// (shared) input stream of resources to keep memory overheads smaller.
798 ///
799 /// **N.B**: This constructor requires an
800 /// [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21)
801 /// feature.
802 ///
803 /// Prefer [`Controller::new`] or [`Controller::for_stream`] if you do not
804 /// need to share the stream.
805 ///
806 /// ## Warning:
807 ///
808 /// You **must** ensure the root stream (i.e. stream created through a `reflector()`)
809 /// is driven to readiness independently of this controller to ensure the
810 /// watcher never deadlocks.
811 ///
812 /// # Example:
813 ///
814 /// ```no_run
815 /// # use futures::StreamExt;
816 /// # use k8s_openapi::api::apps::v1::Deployment;
817 /// # use kube::runtime::controller::{Action, Controller};
818 /// # use kube::runtime::{predicates, watcher, reflector, WatchStreamExt};
819 /// # use kube::{Api, Client, Error, ResourceExt};
820 /// # use std::sync::Arc;
821 /// # async fn reconcile(_: Arc<Deployment>, _: Arc<()>) -> Result<Action, Error> { Ok(Action::await_change()) }
822 /// # fn error_policy(_: Arc<Deployment>, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() }
823 /// # async fn doc(client: kube::Client) {
824 /// let api: Api<Deployment> = Api::default_namespaced(client);
825 /// let (reader, writer) = reflector::store_shared(128);
826 /// let subscriber = writer
827 /// .subscribe()
828 /// .expect("subscribers can only be created from shared stores");
829 /// let deploys = watcher(api, watcher::Config::default())
830 /// .default_backoff()
831 /// .reflect(writer)
832 /// .applied_objects()
833 /// .for_each(|ev| async move {
834 /// match ev {
835 /// Ok(obj) => tracing::info!("got obj {obj:?}"),
836 /// Err(error) => tracing::error!(%error, "received error")
837 /// }
838 /// });
839 ///
840 /// let controller = Controller::for_shared_stream(subscriber, reader)
841 /// .run(reconcile, error_policy, Arc::new(()))
842 /// .for_each(|ev| async move {
843 /// tracing::info!("reconciled {ev:?}")
844 /// });
845 ///
846 /// // Drive streams using a select statement
847 /// tokio::select! {
848 /// _ = deploys => {},
849 /// _ = controller => {},
850 /// }
/// # }
/// ```
#[cfg(feature = "unstable-runtime-subscribe")]
pub fn for_shared_stream(trigger: impl Stream<Item = Arc<K>> + Send + 'static, reader: Store<K>) -> Self
where
    K::DynamicType: Default,
{
    // Build the type information from its `Default` impl, then defer to the
    // general constructor that takes an explicit `DynamicType`.
    let dyntype = <K::DynamicType as Default>::default();
    Self::for_shared_stream_with(trigger, reader, dyntype)
}
859
860 /// This is the same as [`Controller::for_stream`]. Instead of taking an
861 /// `Api` (e.g. [`Controller::new`]), a stream of resources is used. Shared
862 /// streams can be created out-of-band by subscribing on a store `Writer`.
863 /// Through this interface, multiple controllers can use the same root
864 /// (shared) input stream of resources to keep memory overheads smaller.
865 ///
866 /// **N.B**: This constructor requires an
867 /// [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21)
868 /// feature.
869 ///
870 /// Prefer [`Controller::new`] or [`Controller::for_stream`] if you do not
871 /// need to share the stream.
872 ///
873 /// This variant constructor is used for [`dynamic`] types found through
874 /// discovery. Prefer [`Controller::for_shared_stream`] for static types (i.e.
875 /// known at compile time).
876 ///
877 /// [`dynamic`]: kube_client::core::dynamic
#[cfg(feature = "unstable-runtime-subscribe")]
pub fn for_shared_stream_with(
    trigger: impl Stream<Item = Arc<K>> + Send + 'static,
    reader: Store<K>,
    dyntype: K::DynamicType,
) -> Self {
    // The shared stream yields bare `Arc<K>` items, so each one is wrapped in `Ok`
    // before being mapped into the controller's own trigger stream.
    let self_watcher = trigger_self_shared(trigger.map(Ok), dyntype.clone()).boxed();
    let mut trigger_selector = stream::SelectAll::new();
    trigger_selector.push(self_watcher);

    Self {
        trigger_selector,
        trigger_backoff: Box::<DefaultBackoff>::default(),
        // Both shutdown selectors are seeded with a future that never resolves, ensuring
        // that we never terminate if no additional futures are added to the selector.
        graceful_shutdown_selector: vec![future::pending().boxed()],
        forceful_shutdown_selector: vec![future::pending().boxed()],
        dyntype,
        reader,
        config: Default::default(),
    }
}
903
904 /// Specify the configuration for the controller's behavior.
905 #[must_use]
906 pub fn with_config(mut self, config: Config) -> Self {
907 self.config = config;
908 self
909 }
910
911 /// Specify the backoff policy for "trigger" watches
912 ///
/// This includes the core watch, as well as auxiliary watches introduced by [`Self::owns`] and [`Self::watches`].
914 ///
915 /// The [`default_backoff`](crate::watcher::default_backoff) follows client-go conventions,
916 /// but can be overridden by calling this method.
917 #[must_use]
918 pub fn trigger_backoff(mut self, backoff: impl Backoff + Send + 'static) -> Self {
919 self.trigger_backoff = Box::new(backoff);
920 self
921 }
922
923 /// Retrieve a copy of the reader before starting the controller
924 pub fn store(&self) -> Store<K> {
925 self.reader.clone()
926 }
927
928 /// Specify `Child` objects which `K` owns and should be watched
929 ///
930 /// Takes an [`Api`] object that determines how the `Controller` listens for changes to the `Child`.
931 /// All owned `Child` objects **must** contain an [`OwnerReference`] pointing back to a `K`.
932 ///
933 /// The [`watcher::Config`] controls the subset of `Child` objects that you want the [`Api`]
934 /// to watch - in the Api's configured scope - and receive reconcile events for.
935 /// To watch the full set of `Child` objects in the given `Api` scope, you can use [`watcher::Config::default`].
936 ///
937 /// [`OwnerReference`]: k8s_openapi::apimachinery::pkg::apis::meta::v1::OwnerReference
938 #[must_use]
939 pub fn owns<Child: Clone + Resource<DynamicType = ()> + DeserializeOwned + Debug + Send + 'static>(
940 self,
941 api: Api<Child>,
942 wc: watcher::Config,
943 ) -> Self {
944 self.owns_with(api, (), wc)
945 }
946
947 /// Specify `Child` objects which `K` owns and should be watched
948 ///
949 /// Same as [`Controller::owns`], but accepts a `DynamicType` so it can be used with dynamic resources.
950 #[must_use]
951 pub fn owns_with<Child: Clone + Resource + DeserializeOwned + Debug + Send + 'static>(
952 mut self,
953 api: Api<Child>,
954 dyntype: Child::DynamicType,
955 wc: watcher::Config,
956 ) -> Self
957 where
958 Child::DynamicType: Debug + Eq + Hash + Clone,
959 {
960 // TODO: call owns_stream_with when it's stable
961 let child_watcher = trigger_owners(
962 metadata_watcher(api, wc).touched_objects(),
963 self.dyntype.clone(),
964 dyntype,
965 );
966 self.trigger_selector.push(child_watcher.boxed());
967 self
968 }
969
970 /// Trigger the reconciliation process for a stream of `Child` objects of the owner `K`
971 ///
972 /// Same as [`Controller::owns`], but instead of an `Api`, a stream of resources is used.
973 /// This allows for customized and pre-filtered watch streams to be used as a trigger,
974 /// as well as sharing input streams between multiple controllers.
975 ///
/// **NB**: This method requires an [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21) feature.
977 ///
978 /// Watcher streams passed in here should be filtered first through `touched_objects`.
979 ///
980 /// # Example:
981 ///
982 /// ```no_run
983 /// # use futures::StreamExt;
984 /// # use k8s_openapi::api::core::v1::ConfigMap;
985 /// # use k8s_openapi::api::apps::v1::StatefulSet;
986 /// # use kube::runtime::controller::Action;
987 /// # use kube::runtime::{predicates, metadata_watcher, watcher, Controller, WatchStreamExt};
988 /// # use kube::{Api, Client, Error, ResourceExt};
989 /// # use std::sync::Arc;
990 /// # type CustomResource = ConfigMap;
991 /// # async fn reconcile(_: Arc<CustomResource>, _: Arc<()>) -> Result<Action, Error> { Ok(Action::await_change()) }
992 /// # fn error_policy(_: Arc<CustomResource>, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() }
993 /// # async fn doc(client: kube::Client) {
994 /// let sts_stream = metadata_watcher(Api::<StatefulSet>::all(client.clone()), watcher::Config::default())
995 /// .touched_objects()
996 /// .predicate_filter(predicates::generation);
997 ///
998 /// Controller::new(Api::<CustomResource>::all(client), watcher::Config::default())
999 /// .owns_stream(sts_stream)
1000 /// .run(reconcile, error_policy, Arc::new(()))
1001 /// .for_each(|_| std::future::ready(()))
1002 /// .await;
1003 /// # }
1004 /// ```
#[cfg(feature = "unstable-runtime-stream-control")]
#[must_use]
pub fn owns_stream<Child>(
    self,
    trigger: impl Stream<Item = Result<Child, watcher::Error>> + Send + 'static,
) -> Self
where
    Child: Resource<DynamicType = ()> + Send + 'static,
{
    // `Child::DynamicType` is `()` here, so the unit value is passed as the child's type information.
    Self::owns_stream_with(self, trigger, ())
}
1013
1014 /// Trigger the reconciliation process for a stream of `Child` objects of the owner `K`
1015 ///
1016 /// Same as [`Controller::owns`], but instead of an `Api`, a stream of resources is used.
1017 /// This allows for customized and pre-filtered watch streams to be used as a trigger,
1018 /// as well as sharing input streams between multiple controllers.
1019 ///
/// **NB**: This method requires an [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21) feature.
1021 ///
1022 /// Same as [`Controller::owns_stream`], but accepts a `DynamicType` so it can be used with dynamic resources.
#[cfg(feature = "unstable-runtime-stream-control")]
#[must_use]
pub fn owns_stream_with<Child>(
    mut self,
    trigger: impl Stream<Item = Result<Child, watcher::Error>> + Send + 'static,
    dyntype: Child::DynamicType,
) -> Self
where
    Child: Resource + Send + 'static,
    Child::DynamicType: Debug + Eq + Hash + Clone,
{
    // Map the child stream into owner triggers and register it with the selector.
    let owner_type = self.dyntype.clone();
    self.trigger_selector
        .push(trigger_owners(trigger, owner_type, dyntype).boxed());
    self
}
1037
1038 /// This is the same as [`Controller::for_stream`]. Instead of taking an
1039 /// `Api` (e.g. [`Controller::new`]), a stream of resources is used. Shared
1040 /// streams can be created out-of-band by subscribing on a store `Writer`.
1041 /// Through this interface, multiple controllers can use the same root
1042 /// (shared) input stream of resources to keep memory overheads smaller.
1043 ///
1044 /// **N.B**: This constructor requires an
1045 /// [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21)
1046 /// feature.
1047 ///
1048 /// Prefer [`Controller::new`] or [`Controller::for_stream`] if you do not
1049 /// need to share the stream.
1050 ///
1051 /// ## Warning:
1052 ///
1053 /// You **must** ensure the root stream (i.e. stream created through a `reflector()`)
1054 /// is driven to readiness independently of this controller to ensure the
1055 /// watcher never deadlocks.
1056 ///
1057 ///
1058 /// Trigger the reconciliation process for a shared stream of `Child`
1059 /// objects of the owner `K`
1060 ///
1061 /// Conceptually the same as [`Controller::owns`], but a stream is used
1062 /// instead of an `Api`. This interface behaves similarly to its non-shared
1063 /// counterpart [`Controller::owns_stream`].
1064 ///
/// **NB**: This method requires an [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21) feature.
1066 ///
1067 /// # Example:
1068 ///
1069 /// ```no_run
1070 /// # use futures::StreamExt;
1071 /// # use k8s_openapi::api::{apps::v1::Deployment, core::v1::Pod};
1072 /// # use kube::runtime::controller::{Action, Controller};
1073 /// # use kube::runtime::{predicates, watcher, reflector, WatchStreamExt};
1074 /// # use kube::{Api, Client, Error, ResourceExt};
1075 /// # use std::sync::Arc;
1076 /// # async fn reconcile(_: Arc<Deployment>, _: Arc<()>) -> Result<Action, Error> { Ok(Action::await_change()) }
1077 /// # fn error_policy(_: Arc<Deployment>, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() }
1078 /// # async fn doc(client: kube::Client) {
1079 /// let deploys: Api<Deployment> = Api::default_namespaced(client.clone());
1080 /// let pod_api: Api<Pod> = Api::default_namespaced(client);
1081 ///
1082 /// let (reader, writer) = reflector::store_shared(128);
1083 /// let subscriber = writer
1084 /// .subscribe()
1085 /// .expect("subscribers can only be created from shared stores");
1086 /// let pods = watcher(pod_api, watcher::Config::default())
1087 /// .default_backoff()
1088 /// .reflect(writer)
1089 /// .applied_objects()
1090 /// .for_each(|ev| async move {
1091 /// match ev {
1092 /// Ok(obj) => tracing::info!("got obj {obj:?}"),
1093 /// Err(error) => tracing::error!(%error, "received error")
1094 /// }
1095 /// });
1096 ///
1097 /// let controller = Controller::new(deploys, Default::default())
1098 /// .owns_shared_stream(subscriber)
1099 /// .run(reconcile, error_policy, Arc::new(()))
1100 /// .for_each(|ev| async move {
1101 /// tracing::info!("reconciled {ev:?}")
1102 /// });
1103 ///
1104 /// // Drive streams using a select statement
1105 /// tokio::select! {
1106 /// _ = pods => {},
1107 /// _ = controller => {},
1108 /// }
/// # }
/// ```
#[cfg(feature = "unstable-runtime-subscribe")]
#[must_use]
pub fn owns_shared_stream<Child>(self, trigger: impl Stream<Item = Arc<Child>> + Send + 'static) -> Self
where
    Child: Resource<DynamicType = ()> + Send + 'static,
{
    // `Child::DynamicType` is `()` here, so the unit value is passed as the child's type information.
    Self::owns_shared_stream_with(self, trigger, ())
}
1118
/// Trigger the reconciliation process for a shared stream of `Child` objects of the owner `K`
///
/// Same as [`Controller::owns`], but instead of an `Api`, a shared stream of resources is used.
/// The source stream can be shared between multiple controllers, optimising
/// resource usage.
///
/// **NB**: This method requires an [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21) feature.
///
/// Same as [`Controller::owns_shared_stream`], but accepts a `DynamicType` so it can be used with dynamic resources.
#[cfg(feature = "unstable-runtime-subscribe")]
#[must_use]
pub fn owns_shared_stream_with<Child: Resource + Send + 'static>(
    mut self,
    trigger: impl Stream<Item = Arc<Child>> + Send + 'static,
    dyntype: Child::DynamicType,
) -> Self
where
    Child::DynamicType: Debug + Eq + Hash + Clone,
{
    // `Child` is intentionally not restricted to `DynamicType = ()`: the caller supplies
    // `dyntype` explicitly, mirroring `owns_stream_with`.
    // The shared stream yields bare `Arc<Child>` items, so each one is wrapped in `Ok`
    // before being mapped into owner triggers.
    let child_watcher = trigger_owners_shared(trigger.map(Ok), self.dyntype.clone(), dyntype);
    self.trigger_selector.push(child_watcher.boxed());
    self
}
1142
1143 /// Specify `Watched` object which `K` has a custom relation to and should be watched
1144 ///
1145 /// To define the `Watched` relation with `K`, you **must** define a custom relation mapper, which,
1146 /// when given a `Watched` object, returns an option or iterator of relevant `ObjectRef<K>` to reconcile.
1147 ///
1148 /// If the relation `K` has to `Watched` is that `K` owns `Watched`, consider using [`Controller::owns`].
1149 ///
1150 /// Takes an [`Api`] object that determines how the `Controller` listens for changes to the `Watched`.
1151 ///
1152 /// The [`watcher::Config`] controls the subset of `Watched` objects that you want the [`Api`]
1153 /// to watch - in the Api's configured scope - and run through the custom mapper.
/// To watch the full set of `Watched` objects in the given `Api` scope, you can use [`watcher::Config::default`].
1155 ///
1156 /// # Example
1157 ///
1158 /// Tracking cross cluster references using the [Operator-SDK] annotations.
1159 ///
1160 /// ```
1161 /// # use kube::runtime::{Controller, controller::Action, reflector::ObjectRef, watcher};
1162 /// # use kube::{Api, ResourceExt};
1163 /// # use k8s_openapi::api::core::v1::{ConfigMap, Namespace};
1164 /// # use futures::StreamExt;
1165 /// # use std::sync::Arc;
1166 /// # type WatchedResource = Namespace;
1167 /// # struct Context;
1168 /// # async fn reconcile(_: Arc<ConfigMap>, _: Arc<Context>) -> Result<Action, kube::Error> {
1169 /// # Ok(Action::await_change())
1170 /// # };
1171 /// # fn error_policy(_: Arc<ConfigMap>, _: &kube::Error, _: Arc<Context>) -> Action {
1172 /// # Action::await_change()
1173 /// # }
1174 /// # async fn doc(client: kube::Client) -> Result<(), Box<dyn std::error::Error>> {
1175 /// # let memcached = Api::<ConfigMap>::all(client.clone());
1176 /// # let context = Arc::new(Context);
1177 /// Controller::new(memcached, watcher::Config::default())
1178 /// .watches(
1179 /// Api::<WatchedResource>::all(client.clone()),
1180 /// watcher::Config::default(),
1181 /// |ar| {
1182 /// let prt = ar
1183 /// .annotations()
1184 /// .get("operator-sdk/primary-resource-type")
1185 /// .map(String::as_str);
1186 ///
1187 /// if prt != Some("Memcached.cache.example.com") {
1188 /// return None;
1189 /// }
1190 ///
1191 /// let (namespace, name) = ar
1192 /// .annotations()
1193 /// .get("operator-sdk/primary-resource")?
1194 /// .split_once('/')?;
1195 ///
1196 /// Some(ObjectRef::new(name).within(namespace))
1197 /// }
1198 /// )
1199 /// .run(reconcile, error_policy, context)
1200 /// .for_each(|_| futures::future::ready(()))
1201 /// .await;
1202 /// # Ok(())
1203 /// # }
1204 /// ```
1205 ///
1206 /// [Operator-SDK]: https://sdk.operatorframework.io/docs/building-operators/ansible/reference/retroactively-owned-resources/
1207 #[must_use]
1208 pub fn watches<Other, I>(
1209 self,
1210 api: Api<Other>,
1211 wc: watcher::Config,
1212 mapper: impl Fn(Other) -> I + Sync + Send + 'static,
1213 ) -> Self
1214 where
1215 Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
1216 Other::DynamicType: Default + Debug + Clone + Eq + Hash,
1217 I: 'static + IntoIterator<Item = ObjectRef<K>>,
1218 I::IntoIter: Send,
1219 {
1220 self.watches_with(api, Default::default(), wc, mapper)
1221 }
1222
1223 /// Specify `Watched` object which `K` has a custom relation to and should be watched
1224 ///
1225 /// Same as [`Controller::watches`], but accepts a `DynamicType` so it can be used with dynamic resources.
1226 #[must_use]
1227 pub fn watches_with<Other, I>(
1228 mut self,
1229 api: Api<Other>,
1230 dyntype: Other::DynamicType,
1231 wc: watcher::Config,
1232 mapper: impl Fn(Other) -> I + Sync + Send + 'static,
1233 ) -> Self
1234 where
1235 Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
1236 I: 'static + IntoIterator<Item = ObjectRef<K>>,
1237 I::IntoIter: Send,
1238 Other::DynamicType: Debug + Clone + Eq + Hash,
1239 {
1240 let other_watcher = trigger_others(watcher(api, wc).touched_objects(), mapper, dyntype);
1241 self.trigger_selector.push(other_watcher.boxed());
1242 self
1243 }
1244
1245 /// Trigger the reconciliation process for a stream of `Other` objects related to a `K`
1246 ///
1247 /// Same as [`Controller::watches`], but instead of an `Api`, a stream of resources is used.
1248 /// This allows for customized and pre-filtered watch streams to be used as a trigger,
1249 /// as well as sharing input streams between multiple controllers.
1250 ///
/// **NB**: This method requires an [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21) feature.
1252 ///
1253 /// Watcher streams passed in here should be filtered first through `touched_objects`.
1254 ///
1255 /// # Example:
1256 ///
1257 /// ```no_run
1258 /// # use futures::StreamExt;
1259 /// # use k8s_openapi::api::core::v1::ConfigMap;
1260 /// # use k8s_openapi::api::apps::v1::DaemonSet;
1261 /// # use kube::runtime::controller::Action;
1262 /// # use kube::runtime::{predicates, reflector::ObjectRef, watcher, Controller, WatchStreamExt};
1263 /// # use kube::{Api, Client, Error, ResourceExt};
1264 /// # use std::sync::Arc;
1265 /// # type CustomResource = ConfigMap;
1266 /// # async fn reconcile(_: Arc<CustomResource>, _: Arc<()>) -> Result<Action, Error> { Ok(Action::await_change()) }
1267 /// # fn error_policy(_: Arc<CustomResource>, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() }
1268 /// fn mapper(_: DaemonSet) -> Option<ObjectRef<CustomResource>> { todo!() }
1269 /// # async fn doc(client: kube::Client) {
1270 /// let api: Api<DaemonSet> = Api::all(client.clone());
1271 /// let cr: Api<CustomResource> = Api::all(client.clone());
1272 /// let daemons = watcher(api, watcher::Config::default())
1273 /// .touched_objects()
1274 /// .predicate_filter(predicates::generation);
1275 ///
1276 /// Controller::new(cr, watcher::Config::default())
1277 /// .watches_stream(daemons, mapper)
1278 /// .run(reconcile, error_policy, Arc::new(()))
1279 /// .for_each(|_| std::future::ready(()))
1280 /// .await;
1281 /// # }
1282 /// ```
#[cfg(feature = "unstable-runtime-stream-control")]
#[must_use]
pub fn watches_stream<Other, I>(
    self,
    trigger: impl Stream<Item = Result<Other, watcher::Error>> + Send + 'static,
    mapper: impl Fn(Other) -> I + Sync + Send + 'static,
) -> Self
where
    Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
    Other::DynamicType: Default + Debug + Clone,
    I: 'static + IntoIterator<Item = ObjectRef<K>>,
    I::IntoIter: Send,
{
    // Build the watched type's information from its `Default` impl and delegate.
    let dyntype = <Other::DynamicType as Default>::default();
    Self::watches_stream_with(self, trigger, mapper, dyntype)
}
1298
1299 /// Trigger the reconciliation process for a stream of `Other` objects related to a `K`
1300 ///
1301 /// Same as [`Controller::watches`], but instead of an `Api`, a stream of resources is used.
1302 /// This allows for customized and pre-filtered watch streams to be used as a trigger,
1303 /// as well as sharing input streams between multiple controllers.
1304 ///
/// **NB**: This method requires an [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21) feature.
1306 ///
1307 /// Same as [`Controller::watches_stream`], but accepts a `DynamicType` so it can be used with dynamic resources.
#[cfg(feature = "unstable-runtime-stream-control")]
#[must_use]
pub fn watches_stream_with<Other, I>(
    mut self,
    trigger: impl Stream<Item = Result<Other, watcher::Error>> + Send + 'static,
    mapper: impl Fn(Other) -> I + Sync + Send + 'static,
    dyntype: Other::DynamicType,
) -> Self
where
    Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
    Other::DynamicType: Debug + Clone,
    I: 'static + IntoIterator<Item = ObjectRef<K>>,
    I::IntoIter: Send,
{
    // Run the caller's stream through the mapper and register the result as a trigger.
    self.trigger_selector
        .push(trigger_others(trigger, mapper, dyntype).boxed());
    self
}
1326
1327 /// Trigger the reconciliation process for a shared stream of `Other`
1328 /// objects related to a `K`
1329 ///
1330 /// Same as [`Controller::watches`], but instead of an `Api`, a shared
1331 /// stream of resources is used. This allows for sharing input streams
1332 /// between multiple controllers.
1333 ///
/// **NB**: This method requires an [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21) feature.
1335 ///
1336 /// Watcher streams passed in here should be filtered first through `touched_objects`.
1337 ///
1338 /// # Example:
1339 ///
1340 /// ```no_run
1341 /// # use futures::StreamExt;
1342 /// # use k8s_openapi::api::core::v1::ConfigMap;
1343 /// # use k8s_openapi::api::apps::v1::DaemonSet;
1344 /// # use kube::runtime::controller::Action;
1345 /// # use kube::runtime::{predicates, reflector::ObjectRef, watcher, Controller, WatchStreamExt};
1346 /// # use kube::{Api, Client, Error, ResourceExt};
1347 /// # use std::sync::Arc;
1348 /// # type CustomResource = ConfigMap;
1349 /// # async fn reconcile(_: Arc<CustomResource>, _: Arc<()>) -> Result<Action, Error> { Ok(Action::await_change()) }
1350 /// # fn error_policy(_: Arc<CustomResource>, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() }
1351 /// fn mapper(_: Arc<DaemonSet>) -> Option<ObjectRef<CustomResource>> { todo!() }
1352 /// # async fn doc(client: kube::Client) {
1353 /// let api: Api<DaemonSet> = Api::all(client.clone());
1354 /// let cr: Api<CustomResource> = Api::all(client.clone());
1355 /// let (reader, writer) = kube_runtime::reflector::store_shared(128);
1356 /// let subscriber = writer
1357 /// .subscribe()
1358 /// .expect("subscribers can only be created from shared stores");
1359 /// let daemons = watcher(api, watcher::Config::default())
1360 /// .reflect(writer)
1361 /// .touched_objects()
1362 /// .for_each(|ev| async move {
1363 /// match ev {
1364 /// Ok(obj) => {},
1365 /// Err(error) => tracing::error!(%error, "received err")
1366 /// }
1367 /// });
1368 ///
1369 /// let controller = Controller::new(cr, watcher::Config::default())
1370 /// .watches_shared_stream(subscriber, mapper)
1371 /// .run(reconcile, error_policy, Arc::new(()))
1372 /// .for_each(|_| std::future::ready(()));
1373 ///
1374 /// // Drive streams using a select statement
1375 /// tokio::select! {
1376 /// _ = daemons => {},
1377 /// _ = controller => {},
1378 /// }
1379 /// # }
1380 /// ```
#[cfg(feature = "unstable-runtime-subscribe")]
#[must_use]
pub fn watches_shared_stream<Other, I>(
    self,
    trigger: impl Stream<Item = Arc<Other>> + Send + 'static,
    mapper: impl Fn(Arc<Other>) -> I + Sync + Send + 'static,
) -> Self
where
    Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
    Other::DynamicType: Default + Debug + Clone,
    I: 'static + IntoIterator<Item = ObjectRef<K>>,
    I::IntoIter: Send,
{
    // Build the watched type's information from its `Default` impl and delegate.
    let dyntype = <Other::DynamicType as Default>::default();
    Self::watches_shared_stream_with(self, trigger, mapper, dyntype)
}
1396
1397 /// Trigger the reconciliation process for a shared stream of `Other` objects related to a `K`
1398 ///
1399 /// Same as [`Controller::watches`], but instead of an `Api`, a shared
1400 /// stream of resources is used. This allows for sharing of streams between
1401 /// multiple controllers.
1402 ///
/// **NB**: This method requires an [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21) feature.
1404 ///
1405 /// Same as [`Controller::watches_shared_stream`], but accepts a `DynamicType` so it can be used with dynamic resources.
#[cfg(feature = "unstable-runtime-subscribe")]
#[must_use]
pub fn watches_shared_stream_with<Other, I>(
    mut self,
    trigger: impl Stream<Item = Arc<Other>> + Send + 'static,
    mapper: impl Fn(Arc<Other>) -> I + Sync + Send + 'static,
    dyntype: Other::DynamicType,
) -> Self
where
    Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
    Other::DynamicType: Debug + Clone,
    I: 'static + IntoIterator<Item = ObjectRef<K>>,
    I::IntoIter: Send,
{
    // The shared stream yields bare `Arc<Other>` items, so each one is wrapped in `Ok`
    // before being run through the mapper.
    self.trigger_selector
        .push(trigger_others_shared(trigger.map(Ok), mapper, dyntype).boxed());
    self
}
1424
1425 /// Trigger a reconciliation for all managed objects whenever `trigger` emits a value
1426 ///
1427 /// For example, this can be used to reconcile all objects whenever the controller's configuration changes.
1428 ///
1429 /// To reconcile all objects when a new line is entered:
1430 ///
1431 /// ```
1432 /// # async {
1433 /// use futures::stream::StreamExt;
1434 /// use k8s_openapi::api::core::v1::ConfigMap;
1435 /// use kube::{
1436 /// Client,
1437 /// api::{Api, ResourceExt},
1438 /// runtime::{
1439 /// controller::{Controller, Action},
1440 /// watcher,
1441 /// },
1442 /// };
1443 /// use std::{convert::Infallible, io::BufRead, sync::Arc};
1444 /// let (mut reload_tx, reload_rx) = futures::channel::mpsc::channel(0);
1445 /// // Using a regular background thread since tokio::io::stdin() doesn't allow aborting reads,
1446 /// // and its worker prevents the Tokio runtime from shutting down.
1447 /// std::thread::spawn(move || {
1448 /// for _ in std::io::BufReader::new(std::io::stdin()).lines() {
1449 /// let _ = reload_tx.try_send(());
1450 /// }
1451 /// });
1452 /// Controller::new(
1453 /// Api::<ConfigMap>::all(Client::try_default().await.unwrap()),
1454 /// watcher::Config::default(),
1455 /// )
1456 /// .reconcile_all_on(reload_rx.map(|_| ()))
1457 /// .run(
1458 /// |o, _| async move {
1459 /// println!("Reconciling {}", o.name_any());
1460 /// Ok(Action::await_change())
1461 /// },
1462 /// |_object: Arc<ConfigMap>, err: &Infallible, _| Err(err).unwrap(),
1463 /// Arc::new(()),
1464 /// );
1465 /// # };
1466 /// ```
1467 ///
1468 /// This can be called multiple times, in which case they are additive; reconciles are scheduled whenever *any* [`Stream`] emits a new item.
1469 ///
1470 /// If a [`Stream`] is terminated (by emitting [`None`]) then the [`Controller`] keeps running, but the [`Stream`] stops being polled.
1471 #[must_use]
1472 pub fn reconcile_all_on(mut self, trigger: impl Stream<Item = ()> + Send + Sync + 'static) -> Self {
1473 let store = self.store();
1474 let dyntype = self.dyntype.clone();
1475 self.trigger_selector.push(
1476 trigger
1477 .flat_map(move |()| {
1478 let dyntype = dyntype.clone();
1479 stream::iter(store.state().into_iter().map(move |obj| {
1480 Ok(ReconcileRequest {
1481 obj_ref: ObjectRef::from_obj_with(&*obj, dyntype.clone()),
1482 reason: ReconcileReason::BulkReconcile,
1483 })
1484 }))
1485 })
1486 .boxed(),
1487 );
1488 self
1489 }
1490
1491 /// Trigger the reconciliation process for a managed object `ObjectRef<K>` whenever `trigger` emits a value
1492 ///
1493 /// This can be used to inject reconciliations for specific objects from an external resource.
1494 ///
1495 /// # Example:
1496 ///
1497 /// ```no_run
1498 /// # async {
1499 /// # use futures::{StreamExt, Stream, stream, TryStreamExt};
1500 /// # use k8s_openapi::api::core::v1::{ConfigMap};
1501 /// # use kube::api::Api;
1502 /// # use kube::runtime::controller::Action;
1503 /// # use kube::runtime::reflector::{ObjectRef, Store};
1504 /// # use kube::runtime::{reflector, watcher, Controller, WatchStreamExt};
1505 /// # use kube::runtime::watcher::Config;
1506 /// # use kube::{Client, Error, ResourceExt};
1507 /// # use std::future;
1508 /// # use std::sync::Arc;
1509 /// #
1510 /// # let client: Client = todo!();
1511 /// # async fn reconcile(_: Arc<ConfigMap>, _: Arc<()>) -> Result<Action, Error> { Ok(Action::await_change()) }
1512 /// # fn error_policy(_: Arc<ConfigMap>, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() }
1513 /// # fn watch_external_objects() -> impl Stream<Item = ExternalObject> { stream::iter(vec![]) }
1514 /// # let ns = "controller-ns".to_string();
1515 /// struct ExternalObject {
1516 /// name: String,
1517 /// }
1518 /// let external_stream = watch_external_objects().map(|ext| {
1519 /// ObjectRef::new(&format!("{}-cm", ext.name)).within(&ns)
1520 /// });
1521 ///
1522 /// Controller::new(Api::<ConfigMap>::namespaced(client, &ns), Config::default())
1523 /// .reconcile_on(external_stream)
1524 /// .run(reconcile, error_policy, Arc::new(()))
1525 /// .for_each(|_| future::ready(()))
1526 /// .await;
1527 /// # };
1528 /// ```
#[cfg(feature = "unstable-runtime-reconcile-on")]
#[must_use]
pub fn reconcile_on(mut self, trigger: impl Stream<Item = ObjectRef<K>> + Send + 'static) -> Self {
    // Each incoming reference becomes a reconcile request with an `Unknown` reason.
    let requests = trigger.map(|obj_ref| {
        Ok(ReconcileRequest {
            obj_ref,
            reason: ReconcileReason::Unknown,
        })
    });
    self.trigger_selector.push(requests.boxed());
    self
}
1544
1545 /// Start a graceful shutdown when `trigger` resolves. Once a graceful shutdown has been initiated:
1546 ///
1547 /// - No new reconciliations are started from the scheduler
1548 /// - The underlying Kubernetes watch is terminated
1549 /// - All running reconciliations are allowed to finish
1550 /// - [`Controller::run`]'s [`Stream`] terminates once all running reconciliations are done.
1551 ///
1552 /// For example, to stop the reconciler whenever the user presses Ctrl+C:
1553 ///
1554 /// ```rust
1555 /// # async {
1556 /// use futures::future::FutureExt;
1557 /// use k8s_openapi::api::core::v1::ConfigMap;
1558 /// use kube::{Api, Client, ResourceExt};
1559 /// use kube_runtime::{
1560 /// controller::{Controller, Action},
1561 /// watcher,
1562 /// };
1563 /// use std::{convert::Infallible, sync::Arc};
1564 /// Controller::new(
1565 /// Api::<ConfigMap>::all(Client::try_default().await.unwrap()),
1566 /// watcher::Config::default(),
1567 /// )
1568 /// .graceful_shutdown_on(tokio::signal::ctrl_c().map(|_| ()))
1569 /// .run(
1570 /// |o, _| async move {
1571 /// println!("Reconciling {}", o.name_any());
1572 /// Ok(Action::await_change())
1573 /// },
1574 /// |_, err: &Infallible, _| Err(err).unwrap(),
1575 /// Arc::new(()),
1576 /// );
1577 /// # };
1578 /// ```
1579 ///
1580 /// This can be called multiple times, in which case they are additive; the [`Controller`] starts to terminate
1581 /// as soon as *any* [`Future`] resolves.
1582 #[must_use]
1583 pub fn graceful_shutdown_on(mut self, trigger: impl Future<Output = ()> + Send + Sync + 'static) -> Self {
1584 self.graceful_shutdown_selector.push(trigger.boxed());
1585 self
1586 }
1587
1588 /// Initiate graceful shutdown on Ctrl+C or SIGTERM (on Unix), waiting for all reconcilers to finish.
1589 ///
1590 /// Once a graceful shutdown has been initiated, Ctrl+C (or SIGTERM) can be sent again
1591 /// to request a forceful shutdown (requesting that all reconcilers abort on the next yield point).
1592 ///
1593 /// NOTE: On Unix this leaves the default handlers for SIGINT and SIGTERM disabled after the [`Controller`] has
1594 /// terminated. If you run this in a process containing more tasks than just the [`Controller`], ensure that
1595 /// all other tasks either terminate when the [`Controller`] does, that they have their own signal handlers,
1596 /// or use [`Controller::graceful_shutdown_on`] to manage your own shutdown strategy.
1597 ///
1598 /// NOTE: If developing a Windows service then you need to listen to its lifecycle events instead, and hook that into
1599 /// [`Controller::graceful_shutdown_on`].
1600 ///
1601 /// NOTE: [`Controller::run`] terminates as soon as a forceful shutdown is requested, but leaves the reconcilers running
1602 /// in the background while they terminate. This will block [`tokio::runtime::Runtime`] termination until they actually terminate,
1603 /// unless you run [`std::process::exit`] afterwards.
1604 #[must_use]
1605 pub fn shutdown_on_signal(mut self) -> Self {
1606 async fn shutdown_signal() {
1607 futures::future::select(
1608 tokio::signal::ctrl_c().map(|_| ()).boxed(),
1609 #[cfg(unix)]
1610 tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
1611 .unwrap()
1612 .recv()
1613 .map(|_| ())
1614 .boxed(),
1615 // Assume that ctrl_c is enough on non-Unix platforms (such as Windows)
1616 #[cfg(not(unix))]
1617 futures::future::pending::<()>(),
1618 )
1619 .await;
1620 }
1621
1622 let (graceful_tx, graceful_rx) = channel::oneshot::channel();
1623 self.graceful_shutdown_selector
1624 .push(graceful_rx.map(|_| ()).boxed());
1625 self.forceful_shutdown_selector.push(
1626 async {
1627 tracing::info!("press ctrl+c to shut down gracefully");
1628 shutdown_signal().await;
1629 if let Ok(()) = graceful_tx.send(()) {
1630 tracing::info!("graceful shutdown requested, press ctrl+c again to force shutdown");
1631 } else {
1632 tracing::info!(
1633 "graceful shutdown already requested, press ctrl+c again to force shutdown"
1634 );
1635 }
1636 shutdown_signal().await;
1637 tracing::info!("forced shutdown requested");
1638 }
1639 .boxed(),
1640 );
1641 self
1642 }
1643
1644 /// Consume all the parameters of the Controller and start the applier stream
1645 ///
1646 /// This creates a stream from all builder calls and starts an applier with
1647 /// a specified `reconciler` and `error_policy` callbacks. Each of these will be called
1648 /// with a configurable `context`.
1649 pub fn run<ReconcilerFut, Ctx>(
1650 self,
1651 mut reconciler: impl FnMut(Arc<K>, Arc<Ctx>) -> ReconcilerFut,
1652 error_policy: impl Fn(Arc<K>, &ReconcilerFut::Error, Arc<Ctx>) -> Action,
1653 context: Arc<Ctx>,
1654 ) -> impl Stream<Item = Result<(ObjectRef<K>, Action), Error<ReconcilerFut::Error, watcher::Error>>>
1655 where
1656 K::DynamicType: Debug + Unpin,
1657 ReconcilerFut: TryFuture<Ok = Action> + Send + 'static,
1658 ReconcilerFut::Error: std::error::Error + Send + 'static,
1659 {
1660 applier(
1661 move |obj, ctx| {
1662 CancelableJoinHandle::spawn(
1663 reconciler(obj, ctx).into_future().in_current_span(),
1664 &Handle::current(),
1665 )
1666 },
1667 error_policy,
1668 context,
1669 self.reader,
1670 StreamBackoff::new(self.trigger_selector, self.trigger_backoff)
1671 .take_until(future::select_all(self.graceful_shutdown_selector)),
1672 self.config,
1673 )
1674 .take_until(futures::future::select_all(self.forceful_shutdown_selector))
1675 }
1676}
1677
1678#[cfg(test)]
1679mod tests {
1680 use std::{convert::Infallible, pin::pin, sync::Arc, time::Duration};
1681
1682 use super::{Action, APPLIER_REQUEUE_BUF_SIZE};
1683 use crate::{
1684 applier,
1685 reflector::{self, ObjectRef},
1686 watcher::{self, metadata_watcher, watcher, Event},
1687 Config, Controller,
1688 };
1689 use futures::{Stream, StreamExt, TryStreamExt};
1690 use k8s_openapi::api::core::v1::ConfigMap;
1691 use kube_client::{core::ObjectMeta, Api, Resource};
1692 use serde::de::DeserializeOwned;
1693 use tokio::time::timeout;
1694
/// Compile-time check that `T` is [`Send`]; hands the value back unchanged.
fn assert_send<T>(value: T) -> T
where
    T: Send,
{
    value
}
1698
1699 // Used to typecheck that a type T is a generic type that implements Stream
1700 // and returns a WatchEvent generic over a resource `K`
1701 fn assert_stream<T, K>(x: T) -> T
1702 where
1703 T: Stream<Item = watcher::Result<Event<K>>> + Send,
1704 K: Resource + Clone + DeserializeOwned + std::fmt::Debug + Send + 'static,
1705 {
1706 x
1707 }
1708
/// Stand-in for a value of any type `T` in code that is only type-checked and never executed.
///
/// Panics if it is ever actually called.
fn mock_type<T>() -> T {
    unimplemented!("mock_type is not supposed to be called, only used for filling holes in type assertions")
}
1714
1715 // not #[test] because we don't want to actually run it, we just want to assert that it typechecks
1716 #[allow(dead_code, unused_must_use)]
1717 fn test_controller_should_be_send() {
1718 assert_send(
1719 Controller::new(mock_type::<Api<ConfigMap>>(), Default::default()).run(
1720 |_, _| async { Ok(mock_type::<Action>()) },
1721 |_: Arc<ConfigMap>, _: &std::io::Error, _| mock_type::<Action>(),
1722 Arc::new(()),
1723 ),
1724 );
1725 }
1726
1727 // not #[test] because we don't want to actually run it, we just want to
1728 // assert that it typechecks
1729 //
1730 // will check return types for `watcher` and `watch_metadata` do not drift
1731 // given an arbitrary K that implements `Resource` (e.g ConfigMap)
1732 #[allow(dead_code, unused_must_use)]
1733 fn test_watcher_stream_type_drift() {
1734 assert_stream(watcher(mock_type::<Api<ConfigMap>>(), Default::default()));
1735 assert_stream(metadata_watcher(
1736 mock_type::<Api<ConfigMap>>(),
1737 Default::default(),
1738 ));
1739 }
1740
1741 #[tokio::test]
1742 async fn applier_must_not_deadlock_if_reschedule_buffer_fills() {
1743 // This tests that `applier` handles reschedule queue backpressure correctly, by trying to flood it with no-op reconciles
1744 // This is intended to avoid regressing on https://github.com/kube-rs/kube/issues/926
1745
1746 // Assume that we can keep APPLIER_REQUEUE_BUF_SIZE flooded if we have 100x the number of objects "in rotation"
1747 // On my (@nightkr)'s 3900X I can reliably trigger this with 10x, but let's have some safety margin to avoid false negatives
1748 let items = APPLIER_REQUEUE_BUF_SIZE * 50;
1749 // Assume that everything's OK if we can reconcile every object 3 times on average
1750 let reconciles = items * 3;
1751
1752 let (queue_tx, queue_rx) = futures::channel::mpsc::unbounded::<ObjectRef<ConfigMap>>();
1753 let (store_rx, mut store_tx) = reflector::store();
1754 let mut applier = pin!(applier(
1755 |_obj, _| {
1756 Box::pin(async move {
1757 // Try to flood the rescheduling buffer buffer by just putting it back in the queue immediately
1758 //println!("reconciling {:?}", obj.metadata.name);
1759 Ok(Action::requeue(Duration::ZERO))
1760 })
1761 },
1762 |_: Arc<ConfigMap>, _: &Infallible, _| todo!(),
1763 Arc::new(()),
1764 store_rx,
1765 queue_rx.map(Result::<_, Infallible>::Ok),
1766 Config::default(),
1767 ));
1768 store_tx.apply_watcher_event(&watcher::Event::InitDone);
1769 for i in 0..items {
1770 let obj = ConfigMap {
1771 metadata: ObjectMeta {
1772 name: Some(format!("cm-{i}")),
1773 namespace: Some("default".to_string()),
1774 ..Default::default()
1775 },
1776 ..Default::default()
1777 };
1778 store_tx.apply_watcher_event(&watcher::Event::Apply(obj.clone()));
1779 queue_tx.unbounded_send(ObjectRef::from_obj(&obj)).unwrap();
1780 }
1781
1782 timeout(
1783 Duration::from_secs(10),
1784 applier
1785 .as_mut()
1786 .take(reconciles)
1787 .try_for_each(|_| async { Ok(()) }),
1788 )
1789 .await
1790 .expect("test timeout expired, applier likely deadlocked")
1791 .unwrap();
1792
1793 // Do an orderly shutdown to ensure that no individual reconcilers are stuck
1794 drop(queue_tx);
1795 timeout(
1796 Duration::from_secs(10),
1797 applier.try_for_each(|_| async { Ok(()) }),
1798 )
1799 .await
1800 .expect("applier cleanup timeout expired, individual reconciler likely deadlocked?")
1801 .unwrap();
1802 }
1803}