kube_runtime/controller/mod.rs

//! Runs a user-supplied reconciler function on objects when they (or related objects) are updated

use self::runner::Runner;
use crate::{
    reflector::{
        self, reflector,
        store::{Store, Writer},
        ObjectRef,
    },
    scheduler::{debounced_scheduler, ScheduleRequest},
    utils::{trystream_try_via, CancelableJoinHandle, KubeRuntimeStreamExt, StreamBackoff, WatchStreamExt},
    watcher::{self, metadata_watcher, watcher, DefaultBackoff},
};
use backoff::backoff::Backoff;
use educe::Educe;
use futures::{
    channel,
    future::{self, BoxFuture},
    stream, FutureExt, Stream, StreamExt, TryFuture, TryFutureExt, TryStream, TryStreamExt,
};
use kube_client::api::{Api, DynamicObject, Resource};
use pin_project::pin_project;
use serde::de::DeserializeOwned;
use std::{
    fmt::{Debug, Display},
    future::Future,
    hash::Hash,
    sync::Arc,
    task::{ready, Poll},
    time::Duration,
};
use stream::BoxStream;
use thiserror::Error;
use tokio::{runtime::Handle, time::Instant};
use tracing::{info_span, Instrument};

mod future_hash_map;
mod runner;

pub type RunnerError = runner::Error<reflector::store::WriterDropped>;

#[derive(Debug, Error)]
pub enum Error<ReconcilerErr: 'static, QueueErr: 'static> {
    #[error("tried to reconcile object {0} that was not found in local store")]
    ObjectNotFound(ObjectRef<DynamicObject>),
    #[error("reconciler for object {1} failed")]
    ReconcilerFailed(#[source] ReconcilerErr, ObjectRef<DynamicObject>),
    #[error("event queue error")]
    QueueError(#[source] QueueErr),
    #[error("runner error")]
    RunnerError(#[source] RunnerError),
}

/// Results of the reconciliation attempt
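///
/// A minimal illustration of the two variants (the durations used here are arbitrary):
///
/// ```
/// use kube::runtime::controller::Action;
/// use std::time::Duration;
///
/// // Fall back to a periodic reconcile every five minutes.
/// let periodic = Action::requeue(Duration::from_secs(300));
/// // Only reconcile again once a relevant watch event arrives.
/// let on_change = Action::await_change();
/// # let _ = (periodic, on_change);
/// ```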
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct Action {
    /// Whether (and when) to next trigger the reconciliation if no external watch triggers hit
    ///
    /// For example, use this to query external systems for updates, expire time-limited resources, or
    /// (in your `error_policy`) retry after errors.
    requeue_after: Option<Duration>,
}

impl Action {
    /// Requeue the reconciliation after the given duration, even if no external watch triggers hit
    ///
    /// This is the best-practice action that ensures eventual consistency of your controller
    /// even in the case of missed changes (which can happen).
    ///
    /// Watch events are not normally missed, so running this once per hour (`Default`) as a fallback is reasonable.
    #[must_use]
    pub fn requeue(duration: Duration) -> Self {
        Self {
            requeue_after: Some(duration),
        }
    }

    /// Do nothing until a change is detected
    ///
    /// This stops the controller from periodically reconciling this object until a relevant watch event
    /// is **detected**.
    ///
    /// **Warning**: If you have watch desyncs, it is possible to miss changes entirely.
    /// It is therefore not recommended to disable requeuing this way, unless you have
    /// frequent changes to the underlying object, or some other hook to retain eventual consistency.
    #[must_use]
    pub fn await_change() -> Self {
        Self { requeue_after: None }
    }
}

/// Helper for building custom trigger filters; see the implementations of [`trigger_self`] and [`trigger_owners`] for some examples.
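///
/// A hypothetical sketch (watch stream construction elided, `ConfigMap` chosen arbitrarily) of a
/// filter that only triggers reconciliation for objects carrying a particular label:
///
/// ```ignore
/// let requests = trigger_with(watch_stream, |cm: ConfigMap| {
///     cm.metadata
///         .labels
///         .as_ref()
///         .filter(|labels| labels.contains_key("managed"))
///         .map(|_| ObjectRef::from_obj(&cm))
/// });
/// ```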
pub fn trigger_with<T, K, I, S>(
    stream: S,
    mapper: impl Fn(T) -> I,
) -> impl Stream<Item = Result<ReconcileRequest<K>, S::Error>>
where
    S: TryStream<Ok = T>,
    I: IntoIterator,
    I::Item: Into<ReconcileRequest<K>>,
    K: Resource,
{
    stream
        .map_ok(move |obj| stream::iter(mapper(obj).into_iter().map(Into::into).map(Ok)))
        .try_flatten()
}

/// Enqueues the object itself for reconciliation
pub fn trigger_self<K, S>(
    stream: S,
    dyntype: K::DynamicType,
) -> impl Stream<Item = Result<ReconcileRequest<K>, S::Error>>
where
    S: TryStream<Ok = K>,
    K: Resource,
    K::DynamicType: Clone,
{
    trigger_with(stream, move |obj| {
        Some(ReconcileRequest {
            obj_ref: ObjectRef::from_obj_with(&obj, dyntype.clone()),
            reason: ReconcileReason::ObjectUpdated,
        })
    })
}

/// Enqueues the object itself for reconciliation when the object is behind a
/// shared pointer
#[cfg(feature = "unstable-runtime-subscribe")]
fn trigger_self_shared<K, S>(
    stream: S,
    dyntype: K::DynamicType,
) -> impl Stream<Item = Result<ReconcileRequest<K>, S::Error>>
where
    // Input stream has item as some Arc'd Resource (via
    // Controller::for_shared_stream)
    S: TryStream<Ok = Arc<K>>,
    K: Resource,
    K::DynamicType: Clone,
{
    trigger_with(stream, move |obj| {
        Some(ReconcileRequest {
            obj_ref: ObjectRef::from_obj_with(obj.as_ref(), dyntype.clone()),
            reason: ReconcileReason::ObjectUpdated,
        })
    })
}

/// Enqueues any `K` types returned by the mapper for reconciliation
fn trigger_others<S, K, I>(
    stream: S,
    mapper: impl Fn(S::Ok) -> I + Sync + Send + 'static,
    dyntype: <S::Ok as Resource>::DynamicType,
) -> impl Stream<Item = Result<ReconcileRequest<K>, S::Error>>
where
    // Input stream has items as some Resource (via Controller::watches)
    S: TryStream,
    S::Ok: Resource,
    <S::Ok as Resource>::DynamicType: Clone,
    // Output stream is requests for the root type K
    K: Resource,
    K::DynamicType: Clone,
    // but the mapper can produce many of them
    I: 'static + IntoIterator<Item = ObjectRef<K>>,
    I::IntoIter: Send,
{
    trigger_with(stream, move |obj| {
        let watch_ref = ObjectRef::from_obj_with(&obj, dyntype.clone()).erase();
        mapper(obj)
            .into_iter()
            .map(move |mapped_obj_ref| ReconcileRequest {
                obj_ref: mapped_obj_ref,
                reason: ReconcileReason::RelatedObjectUpdated {
                    obj_ref: Box::new(watch_ref.clone()),
                },
            })
    })
}

/// Enqueues any `K` types returned by the mapper for reconciliation, for shared (`Arc`) input streams
#[cfg(feature = "unstable-runtime-subscribe")]
fn trigger_others_shared<S, O, K, I>(
    stream: S,
    mapper: impl Fn(S::Ok) -> I + Sync + Send + 'static,
    dyntype: O::DynamicType,
) -> impl Stream<Item = Result<ReconcileRequest<K>, S::Error>>
where
    // Input is some shared resource (`Arc<O>`) obtained via `reflect`
    S: TryStream<Ok = Arc<O>>,
    O: Resource,
    O::DynamicType: Clone,
    // Output stream is requests for the root type K
    K: Resource,
    K::DynamicType: Clone,
    // but the mapper can produce many of them
    I: 'static + IntoIterator<Item = ObjectRef<K>>,
    I::IntoIter: Send,
{
    trigger_with(stream, move |obj| {
        let watch_ref = ObjectRef::from_obj_with(obj.as_ref(), dyntype.clone()).erase();
        mapper(obj)
            .into_iter()
            .map(move |mapped_obj_ref| ReconcileRequest {
                obj_ref: mapped_obj_ref,
                reason: ReconcileReason::RelatedObjectUpdated {
                    obj_ref: Box::new(watch_ref.clone()),
                },
            })
    })
}

/// Enqueues any owners of type `KOwner` for reconciliation
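///
/// A hypothetical sketch (assuming `Pod`s owned by a `StatefulSet`, with a `pods` `Api<Pod>` in scope):
///
/// ```ignore
/// let owner_triggers = trigger_owners::<StatefulSet, _>(
///     metadata_watcher(pods, watcher::Config::default()).touched_objects(),
///     (), // owner (`StatefulSet`) DynamicType
///     (), // child (`Pod`) DynamicType
/// );
/// ```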
pub fn trigger_owners<KOwner, S>(
    stream: S,
    owner_type: KOwner::DynamicType,
    child_type: <S::Ok as Resource>::DynamicType,
) -> impl Stream<Item = Result<ReconcileRequest<KOwner>, S::Error>>
where
    S: TryStream,
    S::Ok: Resource,
    <S::Ok as Resource>::DynamicType: Clone,
    KOwner: Resource,
    KOwner::DynamicType: Clone,
{
    let mapper = move |obj: S::Ok| {
        let meta = obj.meta().clone();
        let ns = meta.namespace;
        let owner_type = owner_type.clone();
        meta.owner_references
            .into_iter()
            .flatten()
            .filter_map(move |owner| ObjectRef::from_owner_ref(ns.as_deref(), &owner, owner_type.clone()))
    };
    trigger_others(stream, mapper, child_type)
}

// TODO: do we really need to deal with a trystream? can we simplify this at
// all?
/// Enqueues any owners of type `KOwner` for reconciliation based on a stream of
/// owned `K` objects
#[cfg(feature = "unstable-runtime-subscribe")]
fn trigger_owners_shared<KOwner, S, K>(
    stream: S,
    owner_type: KOwner::DynamicType,
    child_type: K::DynamicType,
) -> impl Stream<Item = Result<ReconcileRequest<KOwner>, S::Error>>
where
    S: TryStream<Ok = Arc<K>>,
    K: Resource,
    K::DynamicType: Clone,
    KOwner: Resource,
    KOwner::DynamicType: Clone,
{
    let mapper = move |obj: S::Ok| {
        let meta = obj.meta().clone();
        let ns = meta.namespace;
        let owner_type = owner_type.clone();
        meta.owner_references
            .into_iter()
            .flatten()
            .filter_map(move |owner| ObjectRef::from_owner_ref(ns.as_deref(), &owner, owner_type.clone()))
    };
    trigger_others_shared(stream, mapper, child_type)
}

/// A request to reconcile an object, annotated with why that request was made.
///
/// NOTE: The reason is ignored for comparison purposes. This means that, for example,
/// an object can only occupy one scheduler slot, even if it has been scheduled for multiple reasons.
/// In this case, only *the first* reason is stored.
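///
/// A request can also be constructed manually, e.g. to feed a custom [`applier`] queue
/// (the name and namespace below are arbitrary):
///
/// ```
/// # use kube::runtime::{controller::{ReconcileReason, ReconcileRequest}, reflector::ObjectRef};
/// # use k8s_openapi::api::core::v1::ConfigMap;
/// let request = ReconcileRequest::<ConfigMap> {
///     obj_ref: ObjectRef::new("my-cm").within("default"),
///     reason: ReconcileReason::Custom { reason: "external trigger".to_string() },
/// };
/// ```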
#[derive(Educe)]
#[educe(
    Debug(bound("K::DynamicType: Debug")),
    Clone(bound("K::DynamicType: Clone")),
    PartialEq(bound("K::DynamicType: PartialEq")),
    Hash(bound("K::DynamicType: Hash"))
)]
pub struct ReconcileRequest<K: Resource> {
    pub obj_ref: ObjectRef<K>,
    #[educe(PartialEq(ignore), Hash(ignore))]
    pub reason: ReconcileReason,
}

impl<K: Resource> Eq for ReconcileRequest<K> where K::DynamicType: Eq {}

impl<K: Resource> From<ObjectRef<K>> for ReconcileRequest<K> {
    fn from(obj_ref: ObjectRef<K>) -> Self {
        ReconcileRequest {
            obj_ref,
            reason: ReconcileReason::Unknown,
        }
    }
}

#[derive(Debug, Clone)]
pub enum ReconcileReason {
    Unknown,
    ObjectUpdated,
    RelatedObjectUpdated { obj_ref: Box<ObjectRef<DynamicObject>> },
    ReconcilerRequestedRetry,
    ErrorPolicyRequestedRetry,
    BulkReconcile,
    Custom { reason: String },
}

impl Display for ReconcileReason {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            ReconcileReason::Unknown => f.write_str("unknown"),
            ReconcileReason::ObjectUpdated => f.write_str("object updated"),
            ReconcileReason::RelatedObjectUpdated { obj_ref: object } => {
                f.write_fmt(format_args!("related object updated: {object}"))
            }
            ReconcileReason::BulkReconcile => f.write_str("bulk reconcile requested"),
            ReconcileReason::ReconcilerRequestedRetry => f.write_str("reconciler requested retry"),
            ReconcileReason::ErrorPolicyRequestedRetry => f.write_str("error policy requested retry"),
            ReconcileReason::Custom { reason } => f.write_str(reason),
        }
    }
}

const APPLIER_REQUEUE_BUF_SIZE: usize = 100;

/// Apply a reconciler to an input stream, with a given retry policy
///
/// Takes a `store` parameter for the core objects, which should usually be updated by a [`reflector()`].
///
/// The `queue` indicates which objects should be reconciled. For the core objects this will usually be
/// the [`reflector()`] (piped through [`trigger_self`]). If your core objects own any subobjects then you
/// can also make them trigger reconciliations by [merging](`futures::stream::select`) the [`reflector()`]
/// with a [`watcher()`] or [`reflector()`] for the subobject.
///
/// This is the "hard-mode" version of [`Controller`], which allows you some more customization
/// (such as triggering from arbitrary [`Stream`]s), at the cost of being a bit more verbose.
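///
/// A rough sketch (the `reconcile`/`error_policy` functions and the `api`/`context` values are
/// assumed to exist; error handling is elided) of wiring an [`applier`] by hand for a `ConfigMap`:
///
/// ```ignore
/// let (reader, writer) = reflector::store::<ConfigMap>();
/// let queue = trigger_self(
///     reflector(writer, watcher(api, watcher::Config::default())).applied_objects(),
///     (),
/// );
/// let applied = applier(reconcile, error_policy, context, reader, queue, Config::default());
/// applied.for_each(|res| async move { tracing::info!(?res, "reconciled") }).await;
/// ```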
#[allow(clippy::needless_pass_by_value)]
#[allow(clippy::type_complexity)]
pub fn applier<K, QueueStream, ReconcilerFut, Ctx>(
    mut reconciler: impl FnMut(Arc<K>, Arc<Ctx>) -> ReconcilerFut,
    error_policy: impl Fn(Arc<K>, &ReconcilerFut::Error, Arc<Ctx>) -> Action,
    context: Arc<Ctx>,
    store: Store<K>,
    queue: QueueStream,
    config: Config,
) -> impl Stream<Item = Result<(ObjectRef<K>, Action), Error<ReconcilerFut::Error, QueueStream::Error>>>
where
    K: Clone + Resource + 'static,
    K::DynamicType: Debug + Eq + Hash + Clone + Unpin,
    ReconcilerFut: TryFuture<Ok = Action> + Unpin,
    ReconcilerFut::Error: std::error::Error + 'static,
    QueueStream: TryStream,
    QueueStream::Ok: Into<ReconcileRequest<K>>,
    QueueStream::Error: std::error::Error + 'static,
{
    let (scheduler_shutdown_tx, scheduler_shutdown_rx) = channel::oneshot::channel();
    let (scheduler_tx, scheduler_rx) =
        channel::mpsc::channel::<ScheduleRequest<ReconcileRequest<K>>>(APPLIER_REQUEUE_BUF_SIZE);
    let error_policy = Arc::new(error_policy);
    let delay_store = store.clone();
    // Create a stream of ObjectRefs that need to be reconciled
    trystream_try_via(
        // input: stream combining scheduled tasks and user-specified input events
        Box::pin(stream::select(
            // 1. inputs from the user's queue stream
            queue
                .map_err(Error::QueueError)
                .map_ok(|request| ScheduleRequest {
                    message: request.into(),
                    run_at: Instant::now(),
                })
                .on_complete(async move {
                    // On error: scheduler has already been shut down and there is nothing for us to do
                    let _ = scheduler_shutdown_tx.send(());
                    tracing::debug!("applier queue terminated, starting graceful shutdown")
                }),
            // 2. requests sent to scheduler_tx
            scheduler_rx
                .map(Ok)
                .take_until(scheduler_shutdown_rx)
                .on_complete(async { tracing::debug!("applier scheduler consumer terminated") }),
        )),
        // all the Oks from the select get passed through the scheduler stream, and are then executed
        move |s| {
            Runner::new(
                debounced_scheduler(s, config.debounce),
                config.concurrency,
                move |request| {
                    let request = request.clone();
                    match store.get(&request.obj_ref) {
                        Some(obj) => {
                            let scheduler_tx = scheduler_tx.clone();
                            let error_policy_ctx = context.clone();
                            let error_policy = error_policy.clone();
                            let reconciler_span = info_span!(
                                "reconciling object",
                                "object.ref" = %request.obj_ref,
                                object.reason = %request.reason
                            );
                            reconciler_span
                                .in_scope(|| reconciler(Arc::clone(&obj), context.clone()))
                                .into_future()
                                .then(move |res| {
                                    let error_policy = error_policy;
                                    RescheduleReconciliation::new(
                                        res,
                                        |err| error_policy(obj, err, error_policy_ctx),
                                        request.obj_ref.clone(),
                                        scheduler_tx,
                                    )
                                    // Reconciler errors are OK from the applier's PoV; we need to apply the error policy
                                    // to them separately
                                    .map(|res| Ok((request.obj_ref, res)))
                                })
                                .instrument(reconciler_span)
                                .left_future()
                        }
                        None => std::future::ready(Err(Error::ObjectNotFound(request.obj_ref.erase())))
                            .right_future(),
                    }
                },
            )
            .delay_tasks_until(async move {
                tracing::debug!("applier runner held until store is ready");
                let res = delay_store.wait_until_ready().await;
                tracing::debug!("store is ready, starting runner");
                res
            })
            .map(|runner_res| runner_res.unwrap_or_else(|err| Err(Error::RunnerError(err))))
            .on_complete(async { tracing::debug!("applier runner terminated") })
        },
    )
    .on_complete(async { tracing::debug!("applier runner-merge terminated") })
    // finally, for each completed reconcile call:
    .and_then(move |(obj_ref, reconciler_result)| async move {
        match reconciler_result {
            Ok(action) => Ok((obj_ref, action)),
            Err(err) => Err(Error::ReconcilerFailed(err, obj_ref.erase())),
        }
    })
    .on_complete(async { tracing::debug!("applier terminated") })
}

/// Internal helper [`Future`] that reschedules reconciliation of objects (if required), in the scheduled context of the reconciler
///
/// This could be an `async fn`, but isn't because we want it to be [`Unpin`]
#[pin_project]
#[must_use]
struct RescheduleReconciliation<K: Resource, ReconcilerErr> {
    reschedule_tx: channel::mpsc::Sender<ScheduleRequest<ReconcileRequest<K>>>,

    reschedule_request: Option<ScheduleRequest<ReconcileRequest<K>>>,
    result: Option<Result<Action, ReconcilerErr>>,
}

impl<K, ReconcilerErr> RescheduleReconciliation<K, ReconcilerErr>
where
    K: Resource,
{
    fn new(
        result: Result<Action, ReconcilerErr>,
        error_policy: impl FnOnce(&ReconcilerErr) -> Action,
        obj_ref: ObjectRef<K>,
        reschedule_tx: channel::mpsc::Sender<ScheduleRequest<ReconcileRequest<K>>>,
    ) -> Self {
        let reconciler_finished_at = Instant::now();

        let (action, reschedule_reason) = result.as_ref().map_or_else(
            |err| (error_policy(err), ReconcileReason::ErrorPolicyRequestedRetry),
            |action| (action.clone(), ReconcileReason::ReconcilerRequestedRetry),
        );

        Self {
            reschedule_tx,
            reschedule_request: action.requeue_after.map(|requeue_after| ScheduleRequest {
                message: ReconcileRequest {
                    obj_ref,
                    reason: reschedule_reason,
                },
                run_at: reconciler_finished_at
                    .checked_add(requeue_after)
                    .unwrap_or_else(crate::scheduler::far_future),
            }),
            result: Some(result),
        }
    }
}

impl<K, ReconcilerErr> Future for RescheduleReconciliation<K, ReconcilerErr>
where
    K: Resource,
{
    type Output = Result<Action, ReconcilerErr>;

    fn poll(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> Poll<Self::Output> {
        let this = self.get_mut();

        if this.reschedule_request.is_some() {
            let rescheduler_ready = ready!(this.reschedule_tx.poll_ready(cx));
            let reschedule_request = this
                .reschedule_request
                .take()
                .expect("PostReconciler::reschedule_request was taken during processing");
            // Failure to schedule item = in graceful shutdown mode, ignore
            if let Ok(()) = rescheduler_ready {
                let _ = this.reschedule_tx.start_send(reschedule_request);
            }
        }

        Poll::Ready(
            this.result
                .take()
                .expect("PostReconciler::result was already taken"),
        )
    }
}

/// Accumulates all options that can be used on a [`Controller`] invocation.
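///
/// A small example of a tuned configuration (the exact values are arbitrary):
///
/// ```
/// use kube::runtime::controller::Config;
/// use std::time::Duration;
///
/// let config = Config::default()
///     .debounce(Duration::from_secs(1))
///     .concurrency(2);
/// # let _ = config;
/// ```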
#[derive(Clone, Debug, Default)]
pub struct Config {
    debounce: Duration,
    concurrency: u16,
}

impl Config {
    /// The debounce duration used to deduplicate reconciliation requests.
    ///
    /// When set to a non-zero duration, debouncing is enabled in the [`scheduler`](crate::scheduler())
    /// resulting in __trailing edge debouncing__ of reconciler requests.
    /// This option can help to reduce the number of unnecessary reconciler calls
    /// when using multiple controller relations, or during rapid phase transitions.
    ///
    /// ## Warning
    /// This option delays (and keeps delaying) reconcile requests for objects while
    /// the object is updated. It can **permanently hide** updates from your reconciler
    /// if set too high on objects that are updated frequently (like nodes).
    #[must_use]
    pub fn debounce(mut self, debounce: Duration) -> Self {
        self.debounce = debounce;
        self
    }

    /// The number of concurrent reconciliations that are allowed to run at any given moment.
    ///
    /// This can be adjusted to the controller's needs to increase
    /// performance and/or make performance predictable. By default, it is 0, meaning
    /// the controller runs with unbounded concurrency.
    ///
    /// Note that regardless of the concurrency setting, a controller never schedules concurrent reconciles
    /// on the same object.
    #[must_use]
    pub fn concurrency(mut self, concurrency: u16) -> Self {
        self.concurrency = concurrency;
        self
    }
}

/// Controller for a Resource `K`
///
/// A controller is an infinite stream of objects to be reconciled.
///
/// Once `run` is called and continuously awaited, it calls out to the user provided
/// `reconcile` and `error_policy` callbacks whenever relevant changes are detected
/// or if errors are seen from `reconcile`.
///
/// Reconciles are generally requested for all changes on your root objects.
/// Changes to managed child resources will also trigger the reconciler for the
/// managing object by traversing owner references (for `Controller::owns`),
/// or by traversing a custom mapping (for `Controller::watches`).
///
/// This mapping mechanism ultimately hides the reason for the reconciliation request,
/// and forces you to write an idempotent reconciler.
///
/// General setup:
/// ```no_run
/// use kube::{Api, Client, CustomResource};
/// use kube::runtime::{controller::{Controller, Action}, watcher};
/// # use serde::{Deserialize, Serialize};
/// # use tokio::time::Duration;
/// use futures::StreamExt;
/// use k8s_openapi::api::core::v1::ConfigMap;
/// use schemars::JsonSchema;
/// # use std::sync::Arc;
/// use thiserror::Error;
///
/// #[derive(Debug, Error)]
/// enum Error {}
///
/// /// A custom resource
/// #[derive(CustomResource, Debug, Clone, Deserialize, Serialize, JsonSchema)]
/// #[kube(group = "nullable.se", version = "v1", kind = "ConfigMapGenerator", namespaced)]
/// struct ConfigMapGeneratorSpec {
///     content: String,
/// }
///
/// /// The reconciler that will be called when either object changes
/// async fn reconcile(g: Arc<ConfigMapGenerator>, _ctx: Arc<()>) -> Result<Action, Error> {
///     // .. use api here to reconcile a child ConfigMap with ownerreferences
///     // see configmapgen_controller example for full info
///     Ok(Action::requeue(Duration::from_secs(300)))
/// }
/// /// an error handler that will be called when the reconciler fails with access to both the
/// /// object that caused the failure and the actual error
/// fn error_policy(obj: Arc<ConfigMapGenerator>, _error: &Error, _ctx: Arc<()>) -> Action {
///     Action::requeue(Duration::from_secs(60))
/// }
///
/// /// something to drive the controller
/// #[tokio::main]
/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
///     let client = Client::try_default().await?;
///     let context = Arc::new(()); // bad empty context - put client in here
///     let cmgs = Api::<ConfigMapGenerator>::all(client.clone());
///     let cms = Api::<ConfigMap>::all(client.clone());
///     Controller::new(cmgs, watcher::Config::default())
///         .owns(cms, watcher::Config::default())
///         .run(reconcile, error_policy, context)
///         .for_each(|res| async move {
///             match res {
///                 Ok(o) => println!("reconciled {:?}", o),
///                 Err(e) => println!("reconcile failed: {:?}", e),
///             }
///         })
///         .await; // controller does nothing unless polled
///     Ok(())
/// }
/// ```
pub struct Controller<K>
where
    K: Clone + Resource + Debug + 'static,
    K::DynamicType: Eq + Hash,
{
    // NB: Need to Unpin for stream::select_all
    trigger_selector: stream::SelectAll<BoxStream<'static, Result<ReconcileRequest<K>, watcher::Error>>>,
    trigger_backoff: Box<dyn Backoff + Send>,
    /// [`run`](crate::Controller::run) starts a graceful shutdown when any of these [`Future`]s complete,
    /// refusing to start any new reconciliations but letting any existing ones finish.
    graceful_shutdown_selector: Vec<BoxFuture<'static, ()>>,
    /// [`run`](crate::Controller::run) terminates immediately when any of these [`Future`]s complete,
    /// requesting that all running reconciliations be aborted.
    /// However, note that they *will* keep running until their next yield point (`.await`),
    /// blocking [`tokio::runtime::Runtime`] destruction (unless you follow up by calling [`std::process::exit`] after `run`).
    forceful_shutdown_selector: Vec<BoxFuture<'static, ()>>,
    dyntype: K::DynamicType,
    reader: Store<K>,
    config: Config,
}

impl<K> Controller<K>
where
    K: Clone + Resource + DeserializeOwned + Debug + Send + Sync + 'static,
    K::DynamicType: Eq + Hash + Clone,
{
    /// Create a Controller for a resource `K`
    ///
    /// Takes an [`Api`] object that determines how the `Controller` listens for changes to the `K`.
    ///
    /// The [`watcher::Config`] controls the possible subset of objects of `K` that you want to manage
    /// and receive reconcile events for.
    /// For the full set of objects `K` in the given `Api` scope, you can use [`watcher::Config::default`].
    #[must_use]
    pub fn new(main_api: Api<K>, wc: watcher::Config) -> Self
    where
        K::DynamicType: Default,
    {
        Self::new_with(main_api, wc, Default::default())
    }

    /// Create a Controller for a resource `K`
    ///
    /// Takes an [`Api`] object that determines how the `Controller` listens for changes to the `K`.
    ///
    /// The [`watcher::Config`] lets you define a possible subset of objects of `K` that you want the [`Api`]
    /// to watch - in the Api's configured scope - and receive reconcile events for.
    /// For the full set of objects `K` in the given `Api` scope, you can use [`Config::default`].
    ///
    /// This variant constructor is for [`dynamic`] types found through discovery. Prefer [`Controller::new`] for static types.
    ///
    /// [`watcher::Config`]: crate::watcher::Config
    /// [`Api`]: kube_client::Api
    /// [`dynamic`]: kube_client::core::dynamic
    /// [`Config::default`]: crate::watcher::Config::default
    pub fn new_with(main_api: Api<K>, wc: watcher::Config, dyntype: K::DynamicType) -> Self {
        let writer = Writer::<K>::new(dyntype.clone());
        let reader = writer.as_reader();
        let mut trigger_selector = stream::SelectAll::new();
        let self_watcher = trigger_self(
            reflector(writer, watcher(main_api, wc)).applied_objects(),
            dyntype.clone(),
        )
        .boxed();
        trigger_selector.push(self_watcher);
        Self {
            trigger_selector,
            trigger_backoff: Box::<DefaultBackoff>::default(),
            graceful_shutdown_selector: vec![
                // Fallback future, ensuring that we never terminate if no additional futures are added to the selector
                future::pending().boxed(),
            ],
            forceful_shutdown_selector: vec![
                // Fallback future, ensuring that we never terminate if no additional futures are added to the selector
                future::pending().boxed(),
            ],
            dyntype,
            reader,
            config: Default::default(),
        }
    }

    /// Create a Controller for a resource `K` from a stream of `K` objects
    ///
    /// Same as [`Controller::new`], but instead of an `Api`, a stream of resources is used.
    /// This allows for customized and pre-filtered watch streams to be used as a trigger,
    /// as well as sharing input streams between multiple controllers.
    ///
    /// **NB**: This constructor requires an [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21) feature.
    ///
    /// # Example:
    ///
    /// ```no_run
    /// # use futures::StreamExt;
    /// # use k8s_openapi::api::apps::v1::Deployment;
    /// # use kube::runtime::controller::{Action, Controller};
    /// # use kube::runtime::{predicates, watcher, reflector, WatchStreamExt};
    /// # use kube::{Api, Client, Error, ResourceExt};
    /// # use std::sync::Arc;
    /// # async fn reconcile(_: Arc<Deployment>, _: Arc<()>) -> Result<Action, Error> { Ok(Action::await_change()) }
    /// # fn error_policy(_: Arc<Deployment>, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() }
    /// # async fn doc(client: kube::Client) {
    /// let api: Api<Deployment> = Api::default_namespaced(client);
    /// let (reader, writer) = reflector::store();
    /// let deploys = watcher(api, watcher::Config::default())
    ///     .default_backoff()
    ///     .reflect(writer)
    ///     .applied_objects()
    ///     .predicate_filter(predicates::generation);
    ///
    /// Controller::for_stream(deploys, reader)
    ///     .run(reconcile, error_policy, Arc::new(()))
    ///     .for_each(|_| std::future::ready(()))
    ///     .await;
    /// # }
    /// ```
    ///
    /// Prefer [`Controller::new`] if you do not need to share the stream, or do not need pre-filtering.
    #[cfg(feature = "unstable-runtime-stream-control")]
    pub fn for_stream(
        trigger: impl Stream<Item = Result<K, watcher::Error>> + Send + 'static,
        reader: Store<K>,
    ) -> Self
    where
        K::DynamicType: Default,
    {
        Self::for_stream_with(trigger, reader, Default::default())
    }

    /// Create a Controller for a resource `K` from a stream of `K` objects
    ///
    /// Same as [`Controller::new`], but instead of an `Api`, a stream of resources is used.
    /// This allows for customized and pre-filtered watch streams to be used as a trigger,
    /// as well as sharing input streams between multiple controllers.
    ///
    /// **NB**: This constructor requires an [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21) feature.
    ///
    /// Prefer [`Controller::new`] if you do not need to share the stream, or do not need pre-filtering.
    ///
    /// This variant constructor is for [`dynamic`] types found through discovery. Prefer [`Controller::for_stream`] for static types.
    ///
    /// [`dynamic`]: kube_client::core::dynamic
    #[cfg(feature = "unstable-runtime-stream-control")]
    pub fn for_stream_with(
        trigger: impl Stream<Item = Result<K, watcher::Error>> + Send + 'static,
        reader: Store<K>,
        dyntype: K::DynamicType,
    ) -> Self {
        let mut trigger_selector = stream::SelectAll::new();
        let self_watcher = trigger_self(trigger, dyntype.clone()).boxed();
        trigger_selector.push(self_watcher);
        Self {
            trigger_selector,
            trigger_backoff: Box::<DefaultBackoff>::default(),
            graceful_shutdown_selector: vec![
                // Fallback future, ensuring that we never terminate if no additional futures are added to the selector
                future::pending().boxed(),
            ],
            forceful_shutdown_selector: vec![
                // Fallback future, ensuring that we never terminate if no additional futures are added to the selector
                future::pending().boxed(),
            ],
            dyntype,
            reader,
            config: Default::default(),
        }
    }

    /// This is the same as [`Controller::for_stream`]. Instead of taking an
    /// `Api` (e.g. [`Controller::new`]), a stream of resources is used. Shared
    /// streams can be created out-of-band by subscribing on a store `Writer`.
    /// Through this interface, multiple controllers can use the same root
    /// (shared) input stream of resources to keep memory overheads smaller.
    ///
    /// **N.B**: This constructor requires an
    /// [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21)
    /// feature.
    ///
    /// Prefer [`Controller::new`] or [`Controller::for_stream`] if you do not
    /// need to share the stream.
    ///
    /// ## Warning:
    ///
    /// You **must** ensure the root stream (i.e. stream created through a `reflector()`)
    /// is driven to readiness independently of this controller to ensure the
    /// watcher never deadlocks.
    ///
    /// # Example:
    ///
    /// ```no_run
    /// # use futures::StreamExt;
    /// # use k8s_openapi::api::apps::v1::Deployment;
    /// # use kube::runtime::controller::{Action, Controller};
    /// # use kube::runtime::{predicates, watcher, reflector, WatchStreamExt};
    /// # use kube::{Api, Client, Error, ResourceExt};
    /// # use std::sync::Arc;
    /// # async fn reconcile(_: Arc<Deployment>, _: Arc<()>) -> Result<Action, Error> { Ok(Action::await_change()) }
    /// # fn error_policy(_: Arc<Deployment>, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() }
    /// # async fn doc(client: kube::Client) {
    /// let api: Api<Deployment> = Api::default_namespaced(client);
    /// let (reader, writer) = reflector::store_shared(128);
    /// let subscriber = writer
    ///     .subscribe()
    ///     .expect("subscribers can only be created from shared stores");
    /// let deploys = watcher(api, watcher::Config::default())
    ///     .default_backoff()
    ///     .reflect(writer)
    ///     .applied_objects()
    ///     .for_each(|ev| async move {
    ///         match ev {
    ///             Ok(obj) => tracing::info!("got obj {obj:?}"),
    ///             Err(error) => tracing::error!(%error, "received error")
    ///         }
    ///     });
    ///
    /// let controller = Controller::for_shared_stream(subscriber, reader)
    ///     .run(reconcile, error_policy, Arc::new(()))
    ///     .for_each(|ev| async move {
    ///         tracing::info!("reconciled {ev:?}")
    ///     });
    ///
    /// // Drive streams using a select statement
    /// tokio::select! {
    ///   _ = deploys => {},
    ///   _ = controller => {},
    /// }
    /// # }
    /// ```
    #[cfg(feature = "unstable-runtime-subscribe")]
    pub fn for_shared_stream(trigger: impl Stream<Item = Arc<K>> + Send + 'static, reader: Store<K>) -> Self
    where
        K::DynamicType: Default,
    {
        Self::for_shared_stream_with(trigger, reader, Default::default())
    }

    /// This is the same as [`Controller::for_stream`]. Instead of taking an
    /// `Api` (e.g. [`Controller::new`]), a stream of resources is used. Shared
    /// streams can be created out-of-band by subscribing on a store `Writer`.
    /// Through this interface, multiple controllers can use the same root
    /// (shared) input stream of resources to keep memory overheads smaller.
    ///
    /// **N.B**: This constructor requires an
    /// [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21)
    /// feature.
    ///
    /// Prefer [`Controller::new`] or [`Controller::for_stream`] if you do not
    /// need to share the stream.
    ///
    /// This variant constructor is used for [`dynamic`] types found through
    /// discovery. Prefer [`Controller::for_shared_stream`] for static types (i.e.
    /// known at compile time).
    ///
    /// [`dynamic`]: kube_client::core::dynamic
    #[cfg(feature = "unstable-runtime-subscribe")]
    pub fn for_shared_stream_with(
        trigger: impl Stream<Item = Arc<K>> + Send + 'static,
        reader: Store<K>,
        dyntype: K::DynamicType,
    ) -> Self {
        let mut trigger_selector = stream::SelectAll::new();
        let self_watcher = trigger_self_shared(trigger.map(Ok), dyntype.clone()).boxed();
        trigger_selector.push(self_watcher);
        Self {
            trigger_selector,
            trigger_backoff: Box::<DefaultBackoff>::default(),
            graceful_shutdown_selector: vec![
                // Fallback future, ensuring that we never terminate if no additional futures are added to the selector
                future::pending().boxed(),
            ],
            forceful_shutdown_selector: vec![
                // Fallback future, ensuring that we never terminate if no additional futures are added to the selector
                future::pending().boxed(),
            ],
            dyntype,
            reader,
            config: Default::default(),
        }
    }

    /// Specify the configuration for the controller's behavior.
    #[must_use]
    pub fn with_config(mut self, config: Config) -> Self {
        self.config = config;
        self
    }

    /// Specify the backoff policy for "trigger" watches
    ///
    /// This includes the core watch, as well as auxiliary watches introduced by [`Self::owns`] and [`Self::watches`].
    ///
    /// The [`default_backoff`](crate::watcher::default_backoff) follows client-go conventions,
    /// but can be overridden by calling this method.
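    ///
    /// A sketch (assuming the `backoff` crate's `ExponentialBackoff` and an `api` in scope) of
    /// overriding the default:
    ///
    /// ```ignore
    /// use backoff::ExponentialBackoff;
    ///
    /// let controller = Controller::new(api, watcher::Config::default())
    ///     .trigger_backoff(ExponentialBackoff::default());
    /// ```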
    #[must_use]
    pub fn trigger_backoff(mut self, backoff: impl Backoff + Send + 'static) -> Self {
        self.trigger_backoff = Box::new(backoff);
        self
    }

    /// Retrieve a copy of the reader before starting the controller
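    ///
    /// A sketch (assuming an `api` in scope) of exposing the cached objects, e.g. to a web server:
    ///
    /// ```ignore
    /// let controller = Controller::new(api, watcher::Config::default());
    /// let store = controller.store();
    /// // `store.state()` returns the currently cached objects once the store is ready
    /// ```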
    pub fn store(&self) -> Store<K> {
        self.reader.clone()
    }

    /// Specify `Child` objects which `K` owns and should be watched
    ///
    /// Takes an [`Api`] object that determines how the `Controller` listens for changes to the `Child`.
    /// All owned `Child` objects **must** contain an [`OwnerReference`] pointing back to a `K`.
    ///
    /// The [`watcher::Config`] controls the subset of `Child` objects that you want the [`Api`]
    /// to watch - in the Api's configured scope - and receive reconcile events for.
    /// To watch the full set of `Child` objects in the given `Api` scope, you can use [`watcher::Config::default`].
    ///
    /// [`OwnerReference`]: k8s_openapi::apimachinery::pkg::apis::meta::v1::OwnerReference
    #[must_use]
    pub fn owns<Child: Clone + Resource<DynamicType = ()> + DeserializeOwned + Debug + Send + 'static>(
        self,
        api: Api<Child>,
        wc: watcher::Config,
    ) -> Self {
        self.owns_with(api, (), wc)
    }

    /// Specify `Child` objects which `K` owns and should be watched
    ///
    /// Same as [`Controller::owns`], but accepts a `DynamicType` so it can be used with dynamic resources.
    #[must_use]
    pub fn owns_with<Child: Clone + Resource + DeserializeOwned + Debug + Send + 'static>(
        mut self,
        api: Api<Child>,
        dyntype: Child::DynamicType,
        wc: watcher::Config,
    ) -> Self
    where
        Child::DynamicType: Debug + Eq + Hash + Clone,
    {
        // TODO: call owns_stream_with when it's stable
        let child_watcher = trigger_owners(
            metadata_watcher(api, wc).touched_objects(),
            self.dyntype.clone(),
            dyntype,
        );
        self.trigger_selector.push(child_watcher.boxed());
        self
    }

    /// Trigger the reconciliation process for a stream of `Child` objects of the owner `K`
    ///
    /// Same as [`Controller::owns`], but instead of an `Api`, a stream of resources is used.
    /// This allows for customized and pre-filtered watch streams to be used as a trigger,
    /// as well as sharing input streams between multiple controllers.
    ///
    /// **NB**: This constructor requires an [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21) feature.
    ///
    /// Watcher streams passed in here should be filtered first through `touched_objects`.
    ///
    /// # Example:
    ///
    /// ```no_run
    /// # use futures::StreamExt;
    /// # use k8s_openapi::api::core::v1::ConfigMap;
    /// # use k8s_openapi::api::apps::v1::StatefulSet;
    /// # use kube::runtime::controller::Action;
    /// # use kube::runtime::{predicates, metadata_watcher, watcher, Controller, WatchStreamExt};
    /// # use kube::{Api, Client, Error, ResourceExt};
    /// # use std::sync::Arc;
    /// # type CustomResource = ConfigMap;
    /// # async fn reconcile(_: Arc<CustomResource>, _: Arc<()>) -> Result<Action, Error> { Ok(Action::await_change()) }
    /// # fn error_policy(_: Arc<CustomResource>, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() }
    /// # async fn doc(client: kube::Client) {
    /// let sts_stream = metadata_watcher(Api::<StatefulSet>::all(client.clone()), watcher::Config::default())
    ///     .touched_objects()
    ///     .predicate_filter(predicates::generation);
    ///
    /// Controller::new(Api::<CustomResource>::all(client), watcher::Config::default())
    ///     .owns_stream(sts_stream)
    ///     .run(reconcile, error_policy, Arc::new(()))
    ///     .for_each(|_| std::future::ready(()))
    ///     .await;
    /// # }
    /// ```
    #[cfg(feature = "unstable-runtime-stream-control")]
    #[must_use]
    pub fn owns_stream<Child: Resource<DynamicType = ()> + Send + 'static>(
        self,
        trigger: impl Stream<Item = Result<Child, watcher::Error>> + Send + 'static,
    ) -> Self {
        self.owns_stream_with(trigger, ())
    }

    /// Trigger the reconciliation process for a stream of `Child` objects of the owner `K`
    ///
    /// Same as [`Controller::owns`], but instead of an `Api`, a stream of resources is used.
    /// This allows for customized and pre-filtered watch streams to be used as a trigger,
    /// as well as sharing input streams between multiple controllers.
    ///
    /// **NB**: This constructor requires an [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21) feature.
    ///
    /// Same as [`Controller::owns_stream`], but accepts a `DynamicType` so it can be used with dynamic resources.
    #[cfg(feature = "unstable-runtime-stream-control")]
    #[must_use]
    pub fn owns_stream_with<Child: Resource + Send + 'static>(
        mut self,
        trigger: impl Stream<Item = Result<Child, watcher::Error>> + Send + 'static,
        dyntype: Child::DynamicType,
    ) -> Self
    where
        Child::DynamicType: Debug + Eq + Hash + Clone,
    {
        let child_watcher = trigger_owners(trigger, self.dyntype.clone(), dyntype);
        self.trigger_selector.push(child_watcher.boxed());
        self
    }

    /// Trigger the reconciliation process for a shared stream of `Child`
    /// objects of the owner `K`
    ///
    /// Conceptually the same as [`Controller::owns`], but a stream is used
    /// instead of an `Api`. This interface behaves similarly to its non-shared
    /// counterpart [`Controller::owns_stream`]. Shared streams can be created
    /// out-of-band by subscribing on a store `Writer`. Through this interface,
    /// multiple controllers can use the same root (shared) input stream of
    /// resources to keep memory overheads smaller.
    ///
    /// **N.B**: This constructor requires an
    /// [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21)
    /// feature.
    ///
    /// Prefer [`Controller::owns`] or [`Controller::owns_stream`] if you do not
    /// need to share the stream.
    ///
    /// ## Warning:
    ///
    /// You **must** ensure the root stream (i.e. stream created through a `reflector()`)
    /// is driven to readiness independently of this controller to ensure the
    /// watcher never deadlocks.
    ///
    /// # Example:
    ///
    /// ```no_run
    /// # use futures::StreamExt;
    /// # use k8s_openapi::api::{apps::v1::Deployment, core::v1::Pod};
    /// # use kube::runtime::controller::{Action, Controller};
    /// # use kube::runtime::{predicates, watcher, reflector, WatchStreamExt};
    /// # use kube::{Api, Client, Error, ResourceExt};
    /// # use std::sync::Arc;
    /// # async fn reconcile(_: Arc<Deployment>, _: Arc<()>) -> Result<Action, Error> { Ok(Action::await_change()) }
    /// # fn error_policy(_: Arc<Deployment>, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() }
    /// # async fn doc(client: kube::Client) {
    /// let deploys: Api<Deployment> = Api::default_namespaced(client.clone());
    /// let pod_api: Api<Pod> = Api::default_namespaced(client);
    ///
    /// let (reader, writer) = reflector::store_shared(128);
    /// let subscriber = writer
    ///     .subscribe()
    ///     .expect("subscribers can only be created from shared stores");
    /// let pods = watcher(pod_api, watcher::Config::default())
    ///     .default_backoff()
    ///     .reflect(writer)
    ///     .applied_objects()
    ///     .for_each(|ev| async move {
    ///         match ev {
    ///             Ok(obj) => tracing::info!("got obj {obj:?}"),
    ///             Err(error) => tracing::error!(%error, "received error")
    ///         }
    ///     });
    ///
    /// let controller = Controller::new(deploys, Default::default())
    ///     .owns_shared_stream(subscriber)
    ///     .run(reconcile, error_policy, Arc::new(()))
    ///     .for_each(|ev| async move {
    ///         tracing::info!("reconciled {ev:?}")
    ///     });
    ///
    /// // Drive streams using a select statement
    /// tokio::select! {
    ///   _ = pods => {},
    ///   _ = controller => {},
    /// }
    /// # }
    /// ```
    #[cfg(feature = "unstable-runtime-subscribe")]
    #[must_use]
    pub fn owns_shared_stream<Child: Resource<DynamicType = ()> + Send + 'static>(
        self,
        trigger: impl Stream<Item = Arc<Child>> + Send + 'static,
    ) -> Self {
        self.owns_shared_stream_with(trigger, ())
    }

    /// Trigger the reconciliation process for a shared stream of `Child` objects of the owner `K`
    ///
    /// Same as [`Controller::owns`], but instead of an `Api`, a shared stream of resources is used.
    /// The source stream can be shared between multiple controllers, optimising
    /// resource usage.
    ///
    /// **NB**: This constructor requires an [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21) feature.
    ///
    /// Same as [`Controller::owns_shared_stream`], but accepts a `DynamicType` so it can be used with dynamic resources.
    #[cfg(feature = "unstable-runtime-subscribe")]
    #[must_use]
    pub fn owns_shared_stream_with<Child: Resource + Send + 'static>(
        mut self,
        trigger: impl Stream<Item = Arc<Child>> + Send + 'static,
        dyntype: Child::DynamicType,
    ) -> Self
    where
        Child::DynamicType: Debug + Eq + Hash + Clone,
    {
        let child_watcher = trigger_owners_shared(trigger.map(Ok), self.dyntype.clone(), dyntype);
        self.trigger_selector.push(child_watcher.boxed());
        self
    }

    /// Specify `Watched` object which `K` has a custom relation to and should be watched
    ///
    /// To define the `Watched` relation with `K`, you **must** define a custom relation mapper, which,
    /// when given a `Watched` object, returns an option or iterator of relevant `ObjectRef<K>` to reconcile.
    ///
    /// If the relation `K` has to `Watched` is that `K` owns `Watched`, consider using [`Controller::owns`].
    ///
    /// Takes an [`Api`] object that determines how the `Controller` listens for changes to the `Watched`.
    ///
    /// The [`watcher::Config`] controls the subset of `Watched` objects that you want the [`Api`]
    /// to watch - in the Api's configured scope - and run through the custom mapper.
    /// To watch the full set of `Watched` objects in the given `Api` scope, you can use [`watcher::Config::default`].
    ///
    /// # Example
    ///
    /// Tracking cross-cluster references using the [Operator-SDK] annotations.
    ///
    /// ```
    /// # use kube::runtime::{Controller, controller::Action, reflector::ObjectRef, watcher};
    /// # use kube::{Api, ResourceExt};
    /// # use k8s_openapi::api::core::v1::{ConfigMap, Namespace};
    /// # use futures::StreamExt;
    /// # use std::sync::Arc;
    /// # type WatchedResource = Namespace;
    /// # struct Context;
    /// # async fn reconcile(_: Arc<ConfigMap>, _: Arc<Context>) -> Result<Action, kube::Error> {
    /// #     Ok(Action::await_change())
    /// # };
    /// # fn error_policy(_: Arc<ConfigMap>, _: &kube::Error, _: Arc<Context>) -> Action {
    /// #     Action::await_change()
    /// # }
    /// # async fn doc(client: kube::Client) -> Result<(), Box<dyn std::error::Error>> {
    /// # let memcached = Api::<ConfigMap>::all(client.clone());
    /// # let context = Arc::new(Context);
    /// Controller::new(memcached, watcher::Config::default())
    ///     .watches(
    ///         Api::<WatchedResource>::all(client.clone()),
    ///         watcher::Config::default(),
    ///         |ar| {
    ///             let prt = ar
    ///                 .annotations()
    ///                 .get("operator-sdk/primary-resource-type")
    ///                 .map(String::as_str);
    ///
    ///             if prt != Some("Memcached.cache.example.com") {
    ///                 return None;
    ///             }
    ///
    ///             let (namespace, name) = ar
    ///                 .annotations()
    ///                 .get("operator-sdk/primary-resource")?
    ///                 .split_once('/')?;
    ///
    ///             Some(ObjectRef::new(name).within(namespace))
    ///         }
    ///     )
    ///     .run(reconcile, error_policy, context)
    ///     .for_each(|_| futures::future::ready(()))
    ///     .await;
    /// # Ok(())
    /// # }
    /// ```
    ///
    /// [Operator-SDK]: https://sdk.operatorframework.io/docs/building-operators/ansible/reference/retroactively-owned-resources/
    #[must_use]
    pub fn watches<Other, I>(
        self,
        api: Api<Other>,
        wc: watcher::Config,
        mapper: impl Fn(Other) -> I + Sync + Send + 'static,
    ) -> Self
    where
        Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
        Other::DynamicType: Default + Debug + Clone + Eq + Hash,
        I: 'static + IntoIterator<Item = ObjectRef<K>>,
        I::IntoIter: Send,
    {
        self.watches_with(api, Default::default(), wc, mapper)
    }

    /// Specify `Watched` object which `K` has a custom relation to and should be watched
    ///
    /// Same as [`Controller::watches`], but accepts a `DynamicType` so it can be used with dynamic resources.
    #[must_use]
    pub fn watches_with<Other, I>(
        mut self,
        api: Api<Other>,
        dyntype: Other::DynamicType,
        wc: watcher::Config,
        mapper: impl Fn(Other) -> I + Sync + Send + 'static,
    ) -> Self
    where
        Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
        I: 'static + IntoIterator<Item = ObjectRef<K>>,
        I::IntoIter: Send,
        Other::DynamicType: Debug + Clone + Eq + Hash,
    {
        let other_watcher = trigger_others(watcher(api, wc).touched_objects(), mapper, dyntype);
        self.trigger_selector.push(other_watcher.boxed());
        self
    }

1245    /// Trigger the reconciliation process for a stream of `Other` objects related to a `K`
1246    ///
1247    /// Same as [`Controller::watches`], but instead of an `Api`, a stream of resources is used.
1248    /// This allows for customized and pre-filtered watch streams to be used as a trigger,
1249    /// as well as sharing input streams between multiple controllers.
1250    ///
1251    /// **NB**: This is constructor requires an [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21) feature.
1252    ///
1253    /// Watcher streams passed in here should be filtered first through `touched_objects`.
1254    ///
1255    /// # Example:
1256    ///
1257    /// ```no_run
1258    /// # use futures::StreamExt;
1259    /// # use k8s_openapi::api::core::v1::ConfigMap;
1260    /// # use k8s_openapi::api::apps::v1::DaemonSet;
1261    /// # use kube::runtime::controller::Action;
1262    /// # use kube::runtime::{predicates, reflector::ObjectRef, watcher, Controller, WatchStreamExt};
1263    /// # use kube::{Api, Client, Error, ResourceExt};
1264    /// # use std::sync::Arc;
1265    /// # type CustomResource = ConfigMap;
1266    /// # async fn reconcile(_: Arc<CustomResource>, _: Arc<()>) -> Result<Action, Error> { Ok(Action::await_change()) }
1267    /// # fn error_policy(_: Arc<CustomResource>, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() }
1268    /// fn mapper(_: DaemonSet) -> Option<ObjectRef<CustomResource>> { todo!() }
1269    /// # async fn doc(client: kube::Client) {
1270    /// let api: Api<DaemonSet> = Api::all(client.clone());
1271    /// let cr: Api<CustomResource> = Api::all(client.clone());
1272    /// let daemons = watcher(api, watcher::Config::default())
1273    ///     .touched_objects()
1274    ///     .predicate_filter(predicates::generation);
1275    ///
1276    /// Controller::new(cr, watcher::Config::default())
1277    ///     .watches_stream(daemons, mapper)
1278    ///     .run(reconcile, error_policy, Arc::new(()))
1279    ///     .for_each(|_| std::future::ready(()))
1280    ///     .await;
1281    /// # }
1282    /// ```
    #[cfg(feature = "unstable-runtime-stream-control")]
    #[must_use]
    pub fn watches_stream<Other, I>(
        self,
        trigger: impl Stream<Item = Result<Other, watcher::Error>> + Send + 'static,
        mapper: impl Fn(Other) -> I + Sync + Send + 'static,
    ) -> Self
    where
        Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
        Other::DynamicType: Default + Debug + Clone,
        I: 'static + IntoIterator<Item = ObjectRef<K>>,
        I::IntoIter: Send,
    {
        self.watches_stream_with(trigger, mapper, Default::default())
    }

    /// Trigger the reconciliation process for a stream of `Other` objects related to a `K`
    ///
    /// Same as [`Controller::watches`], but instead of an `Api`, a stream of resources is used.
    /// This allows for customized and pre-filtered watch streams to be used as a trigger,
    /// as well as sharing input streams between multiple controllers.
    ///
    /// **NB**: This constructor requires an [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21) feature.
    ///
    /// Same as [`Controller::watches_stream`], but accepts a `DynamicType` so it can be used with dynamic resources.
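    ///
    /// # Example:
    ///
    /// A minimal sketch mirroring the [`Controller::watches_stream`] example, but passing the
    /// `DynamicType` explicitly so a dynamically-typed stream can act as the trigger
    /// (the group/version/kind below are placeholders):
    ///
    /// ```no_run
    /// # use futures::StreamExt;
    /// # use k8s_openapi::api::core::v1::ConfigMap;
    /// # use kube::core::{ApiResource, DynamicObject, GroupVersionKind};
    /// # use kube::runtime::controller::Action;
    /// # use kube::runtime::{reflector::ObjectRef, watcher, Controller, WatchStreamExt};
    /// # use kube::{Api, Error};
    /// # use std::sync::Arc;
    /// # type CustomResource = ConfigMap;
    /// # async fn reconcile(_: Arc<CustomResource>, _: Arc<()>) -> Result<Action, Error> { Ok(Action::await_change()) }
    /// # fn error_policy(_: Arc<CustomResource>, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() }
    /// fn mapper(_: DynamicObject) -> Option<ObjectRef<CustomResource>> { todo!() }
    /// # async fn doc(client: kube::Client) {
    /// let gvk = GroupVersionKind::gvk("example.com", "v1", "Foo");
    /// let ar = ApiResource::from_gvk(&gvk);
    /// let api: Api<DynamicObject> = Api::all_with(client.clone(), &ar);
    /// let cr: Api<CustomResource> = Api::all(client);
    /// let dynamics = watcher(api, watcher::Config::default()).touched_objects();
    ///
    /// Controller::new(cr, watcher::Config::default())
    ///     .watches_stream_with(dynamics, mapper, ar)
    ///     .run(reconcile, error_policy, Arc::new(()))
    ///     .for_each(|_| std::future::ready(()))
    ///     .await;
    /// # }
    /// ```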
    #[cfg(feature = "unstable-runtime-stream-control")]
    #[must_use]
    pub fn watches_stream_with<Other, I>(
        mut self,
        trigger: impl Stream<Item = Result<Other, watcher::Error>> + Send + 'static,
        mapper: impl Fn(Other) -> I + Sync + Send + 'static,
        dyntype: Other::DynamicType,
    ) -> Self
    where
        Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
        Other::DynamicType: Debug + Clone,
        I: 'static + IntoIterator<Item = ObjectRef<K>>,
        I::IntoIter: Send,
    {
        let other_watcher = trigger_others(trigger, mapper, dyntype);
        self.trigger_selector.push(other_watcher.boxed());
        self
    }

    /// Trigger the reconciliation process for a shared stream of `Other`
    /// objects related to a `K`
    ///
    /// Same as [`Controller::watches`], but instead of an `Api`, a shared
    /// stream of resources is used. This allows for sharing input streams
    /// between multiple controllers.
    ///
    /// **NB**: This constructor requires an [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21) feature.
    ///
    /// Watcher streams passed in here should be filtered first through `touched_objects`.
    ///
    /// # Example:
    ///
    /// ```no_run
    /// # use futures::StreamExt;
    /// # use k8s_openapi::api::core::v1::ConfigMap;
    /// # use k8s_openapi::api::apps::v1::DaemonSet;
    /// # use kube::runtime::controller::Action;
    /// # use kube::runtime::{predicates, reflector::ObjectRef, watcher, Controller, WatchStreamExt};
    /// # use kube::{Api, Client, Error, ResourceExt};
    /// # use std::sync::Arc;
    /// # type CustomResource = ConfigMap;
    /// # async fn reconcile(_: Arc<CustomResource>, _: Arc<()>) -> Result<Action, Error> { Ok(Action::await_change()) }
    /// # fn error_policy(_: Arc<CustomResource>, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() }
    /// fn mapper(_: Arc<DaemonSet>) -> Option<ObjectRef<CustomResource>> { todo!() }
    /// # async fn doc(client: kube::Client) {
    /// let api: Api<DaemonSet> = Api::all(client.clone());
    /// let cr: Api<CustomResource> = Api::all(client.clone());
    /// let (reader, writer) = kube_runtime::reflector::store_shared(128);
    /// let subscriber = writer
    ///     .subscribe()
    ///     .expect("subscribers can only be created from shared stores");
    /// let daemons = watcher(api, watcher::Config::default())
    ///     .reflect(writer)
    ///     .touched_objects()
    ///     .for_each(|ev| async move {
    ///         match ev {
    ///             Ok(obj) => {},
    ///             Err(error) => tracing::error!(%error, "received err")
    ///         }
    ///     });
    ///
    /// let controller = Controller::new(cr, watcher::Config::default())
    ///     .watches_shared_stream(subscriber, mapper)
    ///     .run(reconcile, error_policy, Arc::new(()))
    ///     .for_each(|_| std::future::ready(()));
    ///
    /// // Drive streams using a select statement
    /// tokio::select! {
    ///   _ = daemons => {},
    ///   _ = controller => {},
    /// }
    /// # }
    /// ```
    #[cfg(feature = "unstable-runtime-subscribe")]
    #[must_use]
    pub fn watches_shared_stream<Other, I>(
        self,
        trigger: impl Stream<Item = Arc<Other>> + Send + 'static,
        mapper: impl Fn(Arc<Other>) -> I + Sync + Send + 'static,
    ) -> Self
    where
        Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
        Other::DynamicType: Default + Debug + Clone,
        I: 'static + IntoIterator<Item = ObjectRef<K>>,
        I::IntoIter: Send,
    {
        self.watches_shared_stream_with(trigger, mapper, Default::default())
    }

    /// Trigger the reconciliation process for a shared stream of `Other` objects related to a `K`
    ///
    /// Same as [`Controller::watches`], but instead of an `Api`, a shared
    /// stream of resources is used. This allows for sharing of streams between
    /// multiple controllers.
    ///
    /// **NB**: This constructor requires an [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21) feature.
    ///
    /// Same as [`Controller::watches_shared_stream`], but accepts a `DynamicType` so it can be used with dynamic resources.
    #[cfg(feature = "unstable-runtime-subscribe")]
    #[must_use]
    pub fn watches_shared_stream_with<Other, I>(
        mut self,
        trigger: impl Stream<Item = Arc<Other>> + Send + 'static,
        mapper: impl Fn(Arc<Other>) -> I + Sync + Send + 'static,
        dyntype: Other::DynamicType,
    ) -> Self
    where
        Other: Clone + Resource + DeserializeOwned + Debug + Send + 'static,
        Other::DynamicType: Debug + Clone,
        I: 'static + IntoIterator<Item = ObjectRef<K>>,
        I::IntoIter: Send,
    {
        let other_watcher = trigger_others_shared(trigger.map(Ok), mapper, dyntype);
        self.trigger_selector.push(other_watcher.boxed());
        self
    }

    /// Trigger a reconciliation for all managed objects whenever `trigger` emits a value
    ///
    /// For example, this can be used to reconcile all objects whenever the controller's configuration changes.
    ///
    /// To reconcile all objects when a new line is entered:
    ///
    /// ```
    /// # async {
    /// use futures::stream::StreamExt;
    /// use k8s_openapi::api::core::v1::ConfigMap;
    /// use kube::{
    ///     Client,
    ///     api::{Api, ResourceExt},
    ///     runtime::{
    ///         controller::{Controller, Action},
    ///         watcher,
    ///     },
    /// };
    /// use std::{convert::Infallible, io::BufRead, sync::Arc};
    /// let (mut reload_tx, reload_rx) = futures::channel::mpsc::channel(0);
    /// // Using a regular background thread since tokio::io::stdin() doesn't allow aborting reads,
    /// // and its worker prevents the Tokio runtime from shutting down.
    /// std::thread::spawn(move || {
    ///     for _ in std::io::BufReader::new(std::io::stdin()).lines() {
    ///         let _ = reload_tx.try_send(());
    ///     }
    /// });
    /// Controller::new(
    ///     Api::<ConfigMap>::all(Client::try_default().await.unwrap()),
    ///     watcher::Config::default(),
    /// )
    /// .reconcile_all_on(reload_rx.map(|_| ()))
    /// .run(
    ///     |o, _| async move {
    ///         println!("Reconciling {}", o.name_any());
    ///         Ok(Action::await_change())
    ///     },
    ///     |_object: Arc<ConfigMap>, err: &Infallible, _| Err(err).unwrap(),
    ///     Arc::new(()),
    /// );
    /// # };
    /// ```
    ///
    /// This can be called multiple times, in which case they are additive; reconciles are scheduled whenever *any* [`Stream`] emits a new item.
    ///
    /// If a [`Stream`] is terminated (by emitting [`None`]) then the [`Controller`] keeps running, but the [`Stream`] stops being polled.
    #[must_use]
    pub fn reconcile_all_on(mut self, trigger: impl Stream<Item = ()> + Send + Sync + 'static) -> Self {
        let store = self.store();
        let dyntype = self.dyntype.clone();
        self.trigger_selector.push(
            trigger
                .flat_map(move |()| {
                    let dyntype = dyntype.clone();
                    stream::iter(store.state().into_iter().map(move |obj| {
                        Ok(ReconcileRequest {
                            obj_ref: ObjectRef::from_obj_with(&*obj, dyntype.clone()),
                            reason: ReconcileReason::BulkReconcile,
                        })
                    }))
                })
                .boxed(),
        );
        self
    }

    /// Trigger the reconciliation process for a managed object `ObjectRef<K>` whenever `trigger` emits a value
    ///
    /// This can be used to inject reconciliations for specific objects from an external resource.
    ///
    /// **NB**: This constructor requires an [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21) feature.
    ///
    /// # Example:
    ///
    /// ```no_run
    /// # async {
    /// # use futures::{StreamExt, Stream, stream, TryStreamExt};
    /// # use k8s_openapi::api::core::v1::{ConfigMap};
    /// # use kube::api::Api;
    /// # use kube::runtime::controller::Action;
    /// # use kube::runtime::reflector::{ObjectRef, Store};
    /// # use kube::runtime::{reflector, watcher, Controller, WatchStreamExt};
    /// # use kube::runtime::watcher::Config;
    /// # use kube::{Client, Error, ResourceExt};
    /// # use std::future;
    /// # use std::sync::Arc;
    /// #
    /// # let client: Client = todo!();
    /// # async fn reconcile(_: Arc<ConfigMap>, _: Arc<()>) -> Result<Action, Error> { Ok(Action::await_change()) }
    /// # fn error_policy(_: Arc<ConfigMap>, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() }
    /// # fn watch_external_objects() -> impl Stream<Item = ExternalObject> { stream::iter(vec![]) }
    /// # let ns = "controller-ns".to_string();
    /// struct ExternalObject {
    ///     name: String,
    /// }
    /// let external_stream = watch_external_objects().map(|ext| {
    ///     ObjectRef::new(&format!("{}-cm", ext.name)).within(&ns)
    /// });
    ///
    /// Controller::new(Api::<ConfigMap>::namespaced(client, &ns), Config::default())
    ///     .reconcile_on(external_stream)
    ///     .run(reconcile, error_policy, Arc::new(()))
    ///     .for_each(|_| future::ready(()))
    ///     .await;
    /// # };
    /// ```
    #[cfg(feature = "unstable-runtime-reconcile-on")]
    #[must_use]
    pub fn reconcile_on(mut self, trigger: impl Stream<Item = ObjectRef<K>> + Send + 'static) -> Self {
        self.trigger_selector.push(
            trigger
                .map(move |obj| {
                    Ok(ReconcileRequest {
                        obj_ref: obj,
                        reason: ReconcileReason::Unknown,
                    })
                })
                .boxed(),
        );
        self
    }

    /// Start a graceful shutdown when `trigger` resolves. Once a graceful shutdown has been initiated:
    ///
    /// - No new reconciliations are started from the scheduler
    /// - The underlying Kubernetes watch is terminated
    /// - All running reconciliations are allowed to finish
    /// - [`Controller::run`]'s [`Stream`] terminates once all running reconciliations are done.
    ///
    /// For example, to stop the reconciler whenever the user presses Ctrl+C:
    ///
    /// ```rust
    /// # async {
    /// use futures::future::FutureExt;
    /// use k8s_openapi::api::core::v1::ConfigMap;
    /// use kube::{Api, Client, ResourceExt};
    /// use kube_runtime::{
    ///     controller::{Controller, Action},
    ///     watcher,
    /// };
    /// use std::{convert::Infallible, sync::Arc};
    /// Controller::new(
    ///     Api::<ConfigMap>::all(Client::try_default().await.unwrap()),
    ///     watcher::Config::default(),
    /// )
    /// .graceful_shutdown_on(tokio::signal::ctrl_c().map(|_| ()))
    /// .run(
    ///     |o, _| async move {
    ///         println!("Reconciling {}", o.name_any());
    ///         Ok(Action::await_change())
    ///     },
    ///     |_, err: &Infallible, _| Err(err).unwrap(),
    ///     Arc::new(()),
    /// );
    /// # };
    /// ```
    ///
    /// This can be called multiple times, in which case they are additive; the [`Controller`] starts to terminate
    /// as soon as *any* [`Future`] resolves.
    #[must_use]
    pub fn graceful_shutdown_on(mut self, trigger: impl Future<Output = ()> + Send + Sync + 'static) -> Self {
        self.graceful_shutdown_selector.push(trigger.boxed());
        self
    }

    /// Initiate graceful shutdown on Ctrl+C or SIGTERM (on Unix), waiting for all reconcilers to finish.
    ///
    /// Once a graceful shutdown has been initiated, Ctrl+C (or SIGTERM) can be sent again
    /// to request a forceful shutdown (requesting that all reconcilers abort on the next yield point).
    ///
    /// NOTE: On Unix this leaves the default handlers for SIGINT and SIGTERM disabled after the [`Controller`] has
    /// terminated. If you run this in a process containing more tasks than just the [`Controller`], ensure that
    /// all other tasks either terminate when the [`Controller`] does, have their own signal handlers,
    /// or use [`Controller::graceful_shutdown_on`] to manage your own shutdown strategy.
    ///
    /// NOTE: If developing a Windows service then you need to listen to its lifecycle events instead, and hook that into
    /// [`Controller::graceful_shutdown_on`].
    ///
    /// NOTE: [`Controller::run`] terminates as soon as a forceful shutdown is requested, but leaves the reconcilers running
    /// in the background while they terminate. This will block [`tokio::runtime::Runtime`] termination until they actually terminate,
    /// unless you run [`std::process::exit`] afterwards.
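    ///
    /// # Example:
    ///
    /// A minimal sketch, mirroring the [`Controller::graceful_shutdown_on`] example:
    ///
    /// ```no_run
    /// # use futures::StreamExt;
    /// # use k8s_openapi::api::core::v1::ConfigMap;
    /// # use kube::{Api, Client, ResourceExt};
    /// # use kube_runtime::{controller::{Controller, Action}, watcher};
    /// # use std::{convert::Infallible, sync::Arc};
    /// # async fn doc(client: Client) {
    /// Controller::new(Api::<ConfigMap>::all(client), watcher::Config::default())
    ///     .shutdown_on_signal()
    ///     .run(
    ///         |o, _| async move {
    ///             println!("Reconciling {}", o.name_any());
    ///             Ok(Action::await_change())
    ///         },
    ///         |_, err: &Infallible, _| Err(err).unwrap(),
    ///         Arc::new(()),
    ///     )
    ///     .for_each(|_| futures::future::ready(()))
    ///     .await;
    /// # }
    /// ```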
    #[must_use]
    pub fn shutdown_on_signal(mut self) -> Self {
        async fn shutdown_signal() {
            futures::future::select(
                tokio::signal::ctrl_c().map(|_| ()).boxed(),
                #[cfg(unix)]
                tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
                    .unwrap()
                    .recv()
                    .map(|_| ())
                    .boxed(),
                // Assume that ctrl_c is enough on non-Unix platforms (such as Windows)
                #[cfg(not(unix))]
                futures::future::pending::<()>(),
            )
            .await;
        }

        let (graceful_tx, graceful_rx) = channel::oneshot::channel();
        self.graceful_shutdown_selector
            .push(graceful_rx.map(|_| ()).boxed());
        self.forceful_shutdown_selector.push(
            async {
                tracing::info!("press ctrl+c to shut down gracefully");
                shutdown_signal().await;
                if let Ok(()) = graceful_tx.send(()) {
                    tracing::info!("graceful shutdown requested, press ctrl+c again to force shutdown");
                } else {
                    tracing::info!(
                        "graceful shutdown already requested, press ctrl+c again to force shutdown"
                    );
                }
                shutdown_signal().await;
                tracing::info!("forced shutdown requested");
            }
            .boxed(),
        );
        self
    }

    /// Consume all the parameters of the Controller and start the applier stream
    ///
    /// This creates a stream from all builder calls and starts an applier with
    /// the specified `reconciler` and `error_policy` callbacks. Each of these will be called
    /// with a configurable `context`.
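    ///
    /// The returned [`Stream`] is lazy and needs to be consumed to drive reconciliation.
    /// A minimal sketch of consuming it while logging per-object outcomes (the reconciler
    /// and error policy here are placeholders):
    ///
    /// ```no_run
    /// # use futures::StreamExt;
    /// # use k8s_openapi::api::core::v1::ConfigMap;
    /// # use kube::runtime::{controller::Action, watcher, Controller};
    /// # use kube::{Api, Client, Error};
    /// # use std::sync::Arc;
    /// # async fn reconcile(_: Arc<ConfigMap>, _: Arc<()>) -> Result<Action, Error> { Ok(Action::await_change()) }
    /// # fn error_policy(_: Arc<ConfigMap>, _: &Error, _: Arc<()>) -> Action { Action::await_change() }
    /// # async fn doc(client: Client) {
    /// Controller::new(Api::<ConfigMap>::all(client), watcher::Config::default())
    ///     .run(reconcile, error_policy, Arc::new(()))
    ///     .for_each(|res| async move {
    ///         match res {
    ///             Ok((obj_ref, _action)) => tracing::info!(object = %obj_ref, "reconciled"),
    ///             Err(error) => tracing::warn!(%error, "reconciliation failed"),
    ///         }
    ///     })
    ///     .await;
    /// # }
    /// ```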
    pub fn run<ReconcilerFut, Ctx>(
        self,
        mut reconciler: impl FnMut(Arc<K>, Arc<Ctx>) -> ReconcilerFut,
        error_policy: impl Fn(Arc<K>, &ReconcilerFut::Error, Arc<Ctx>) -> Action,
        context: Arc<Ctx>,
    ) -> impl Stream<Item = Result<(ObjectRef<K>, Action), Error<ReconcilerFut::Error, watcher::Error>>>
    where
        K::DynamicType: Debug + Unpin,
        ReconcilerFut: TryFuture<Ok = Action> + Send + 'static,
        ReconcilerFut::Error: std::error::Error + Send + 'static,
    {
        applier(
            // Spawn each reconciliation onto the current Tokio runtime so that it can be
            // cancelled on forceful shutdown, and keep it in the caller's tracing span
            move |obj, ctx| {
                CancelableJoinHandle::spawn(
                    reconciler(obj, ctx).into_future().in_current_span(),
                    &Handle::current(),
                )
            },
            error_policy,
            context,
            self.reader,
            // Apply the trigger backoff, and stop feeding new triggers once a graceful
            // shutdown has been requested
            StreamBackoff::new(self.trigger_selector, self.trigger_backoff)
                .take_until(future::select_all(self.graceful_shutdown_selector)),
            self.config,
        )
        // A forceful shutdown terminates the whole applier stream
        .take_until(futures::future::select_all(self.forceful_shutdown_selector))
    }
}

#[cfg(test)]
mod tests {
    use std::{convert::Infallible, pin::pin, sync::Arc, time::Duration};

    use super::{Action, APPLIER_REQUEUE_BUF_SIZE};
    use crate::{
        applier,
        reflector::{self, ObjectRef},
        watcher::{self, metadata_watcher, watcher, Event},
        Config, Controller,
    };
    use futures::{Stream, StreamExt, TryStreamExt};
    use k8s_openapi::api::core::v1::ConfigMap;
    use kube_client::{core::ObjectMeta, Api, Resource};
    use serde::de::DeserializeOwned;
    use tokio::time::timeout;

    fn assert_send<T: Send>(x: T) -> T {
        x
    }

    // Used to typecheck that a type T is a generic type that implements Stream
    // and returns a WatchEvent generic over a resource `K`
    fn assert_stream<T, K>(x: T) -> T
    where
        T: Stream<Item = watcher::Result<Event<K>>> + Send,
        K: Resource + Clone + DeserializeOwned + std::fmt::Debug + Send + 'static,
    {
        x
    }

    fn mock_type<T>() -> T {
        unimplemented!(
            "mock_type is not supposed to be called, only used for filling holes in type assertions"
        )
    }

    // not #[test] because we don't want to actually run it, we just want to assert that it typechecks
    #[allow(dead_code, unused_must_use)]
    fn test_controller_should_be_send() {
        assert_send(
            Controller::new(mock_type::<Api<ConfigMap>>(), Default::default()).run(
                |_, _| async { Ok(mock_type::<Action>()) },
                |_: Arc<ConfigMap>, _: &std::io::Error, _| mock_type::<Action>(),
                Arc::new(()),
            ),
        );
    }

    // not #[test] because we don't want to actually run it, we just want to
    // assert that it typechecks
    //
    // will check that the return types for `watcher` and `metadata_watcher` do not drift
    // given an arbitrary K that implements `Resource` (e.g. ConfigMap)
    #[allow(dead_code, unused_must_use)]
    fn test_watcher_stream_type_drift() {
        assert_stream(watcher(mock_type::<Api<ConfigMap>>(), Default::default()));
        assert_stream(metadata_watcher(
            mock_type::<Api<ConfigMap>>(),
            Default::default(),
        ));
    }

    #[tokio::test]
    async fn applier_must_not_deadlock_if_reschedule_buffer_fills() {
        // This tests that `applier` handles reschedule queue backpressure correctly, by trying to flood it with no-op reconciles
        // This is intended to avoid regressing on https://github.com/kube-rs/kube/issues/926

        // Assume that we can keep APPLIER_REQUEUE_BUF_SIZE flooded if we have 50x the number of objects "in rotation"
        // On @nightkr's 3900X, 10x reliably triggers this, but let's have some safety margin to avoid false negatives
        let items = APPLIER_REQUEUE_BUF_SIZE * 50;
        // Assume that everything's OK if we can reconcile every object 3 times on average
        let reconciles = items * 3;

        let (queue_tx, queue_rx) = futures::channel::mpsc::unbounded::<ObjectRef<ConfigMap>>();
        let (store_rx, mut store_tx) = reflector::store();
        let mut applier = pin!(applier(
            |_obj, _| {
                Box::pin(async move {
                    // Try to flood the rescheduling buffer by just putting it back in the queue immediately
                    //println!("reconciling {:?}", obj.metadata.name);
                    Ok(Action::requeue(Duration::ZERO))
                })
            },
            |_: Arc<ConfigMap>, _: &Infallible, _| todo!(),
            Arc::new(()),
            store_rx,
            queue_rx.map(Result::<_, Infallible>::Ok),
            Config::default(),
        ));
        store_tx.apply_watcher_event(&watcher::Event::InitDone);
        for i in 0..items {
            let obj = ConfigMap {
                metadata: ObjectMeta {
                    name: Some(format!("cm-{i}")),
                    namespace: Some("default".to_string()),
                    ..Default::default()
                },
                ..Default::default()
            };
            store_tx.apply_watcher_event(&watcher::Event::Apply(obj.clone()));
            queue_tx.unbounded_send(ObjectRef::from_obj(&obj)).unwrap();
        }

        timeout(
            Duration::from_secs(10),
            applier
                .as_mut()
                .take(reconciles)
                .try_for_each(|_| async { Ok(()) }),
        )
        .await
        .expect("test timeout expired, applier likely deadlocked")
        .unwrap();

        // Do an orderly shutdown to ensure that no individual reconcilers are stuck
        drop(queue_tx);
        timeout(
            Duration::from_secs(10),
            applier.try_for_each(|_| async { Ok(()) }),
        )
        .await
        .expect("applier cleanup timeout expired, individual reconciler likely deadlocked?")
        .unwrap();
    }
}