kube_runtime/
watcher.rs

1//! Watches a Kubernetes Resource for changes, with error recovery
2//!
3//! See [`watcher`] for the primary entry point.
4
5use crate::utils::ResetTimerBackoff;
6use async_trait::async_trait;
7use backoff::{backoff::Backoff, ExponentialBackoff};
8use educe::Educe;
9use futures::{stream::BoxStream, Stream, StreamExt};
10use kube_client::{
11    api::{ListParams, Resource, ResourceExt, VersionMatch, WatchEvent, WatchParams},
12    core::{metadata::PartialObjectMeta, ObjectList, Selector},
13    error::ErrorResponse,
14    Api, Error as ClientErr,
15};
16use serde::de::DeserializeOwned;
17use std::{clone::Clone, collections::VecDeque, fmt::Debug, future, time::Duration};
18use thiserror::Error;
19use tracing::{debug, error, warn};
20
/// Errors that can be yielded by the [`watcher`] stream
#[derive(Debug, Error)]
pub enum Error {
    /// The LIST call used to fetch a page of the initial object list failed
    #[error("failed to perform initial object list: {0}")]
    InitialListFailed(#[source] kube_client::Error),
    /// The WATCH call could not be started
    #[error("failed to start watching object: {0}")]
    WatchStartFailed(#[source] kube_client::Error),
    /// The apiserver sent an error event on an established watch stream
    #[error("error returned by apiserver during watch: {0}")]
    WatchError(#[source] ErrorResponse),
    /// Reading the next event from an established watch stream failed
    #[error("watch stream failed: {0}")]
    WatchFailed(#[source] kube_client::Error),
    /// A list result or a watched object did not carry a usable `metadata.resourceVersion`
    #[error("no metadata.resourceVersion in watch result (does resource support watch?)")]
    NoResourceVersion,
}
/// Convenience alias for results whose error type defaults to the watcher [`Error`]
pub type Result<T, E = Error> = std::result::Result<T, E>;
35
#[derive(Debug, Clone)]
/// Watch events returned from the [`watcher`]
pub enum Event<K> {
    /// An object was added or modified
    Apply(K),
    /// An object was deleted
    ///
    /// NOTE: This should not be used for managing persistent state elsewhere, since
    /// events may be lost if the watcher is unavailable. Use Finalizers instead.
    Delete(K),
    /// The watch stream was restarted.
    ///
    /// A series of `InitApply` events are expected to follow until all matching objects
    /// have been listed. This event can be used to prepare a buffer for `InitApply` events.
    Init,
    /// Received an object during `Init`.
    ///
    /// Objects returned here are either from the initial stream using the `StreamingList` strategy,
    /// or from pages using the `ListWatch` strategy.
    ///
    /// These events can be passed up if having a complete set of objects is not a concern.
    /// If you need to wait for a complete set, please buffer these events until an `InitDone`.
    InitApply(K),
    /// The initialisation is complete.
    ///
    /// This can be used as a signal to replace buffered store contents atomically.
    /// No more `InitApply` events will happen until the next `Init` event.
    ///
    /// Any objects that were previously [`Applied`](Event::Apply) but are not listed in any of
    /// the `InitApply` events should be assumed to have been [`Deleted`](Event::Delete).
    InitDone,
}
68
69impl<K> Event<K> {
70    /// Flattens out all objects that were added or modified in the event.
71    ///
72    /// `Deleted` objects are ignored, all objects mentioned by `Restarted` events are
73    /// emitted individually.
74    #[deprecated(
75        since = "0.92.0",
76        note = "unnecessary to flatten a single object. This fn will be removed in 0.96.0."
77    )]
78    pub fn into_iter_applied(self) -> impl Iterator<Item = K> {
79        match self {
80            Self::Apply(obj) | Self::InitApply(obj) => Some(obj),
81            Self::Delete(_) | Self::Init | Self::InitDone => None,
82        }
83        .into_iter()
84    }
85
86    /// Flattens out all objects that were added, modified, or deleted in the event.
87    ///
88    /// Note that `Deleted` events may be missed when restarting the stream. Use finalizers
89    /// or owner references instead if you care about cleaning up external resources after
90    /// deleted objects.
91    #[deprecated(
92        since = "0.92.0",
93        note = "unnecessary to flatten a single object. This fn will be removed in 0.96.0."
94    )]
95    pub fn into_iter_touched(self) -> impl Iterator<Item = K> {
96        match self {
97            Self::Apply(obj) | Self::Delete(obj) | Self::InitApply(obj) => Some(obj),
98            Self::Init | Self::InitDone => None,
99        }
100        .into_iter()
101    }
102
103    /// Map each object in an event through a mutator fn
104    ///
105    /// This allows for memory optimizations in watch streams.
106    /// If you are chaining a watch stream into a reflector as an in memory state store,
107    /// you can control the space used by each object by dropping fields.
108    ///
109    /// ```no_run
110    /// use k8s_openapi::api::core::v1::Pod;
111    /// use kube::ResourceExt;
112    /// # use kube::runtime::watcher::Event;
113    /// # let event: Event<Pod> = todo!();
114    /// event.modify(|pod| {
115    ///     pod.managed_fields_mut().clear();
116    ///     pod.annotations_mut().clear();
117    ///     pod.status = None;
118    /// });
119    /// ```
120    #[must_use]
121    pub fn modify(mut self, mut f: impl FnMut(&mut K)) -> Self {
122        match &mut self {
123            Self::Apply(obj) | Self::Delete(obj) | Self::InitApply(obj) => (f)(obj),
124            Self::Init | Self::InitDone => {} // markers, nothing to modify
125        }
126        self
127    }
128}
129
#[derive(Educe, Default)]
#[educe(Debug)]
/// The internal finite state machine driving the [`watcher`]
enum State<K> {
    /// The Watcher is empty, and the next [`poll`](Stream::poll_next) will start the initial LIST
    /// (or the initial WATCH when the streaming list strategy is configured) to get all existing objects
    #[default]
    Empty,
    /// The Watcher is in the process of paginating through the initial LIST
    InitPage {
        /// Continuation token for the next page to request; `None` before the first request
        /// and once the last page has been fetched
        continue_token: Option<String>,
        /// Objects of the current page that have not been emitted yet
        objects: VecDeque<K>,
        /// The `resourceVersion` reported by the most recently fetched page, if any
        last_bookmark: Option<String>,
    },
    /// Kubernetes 1.27 Streaming Lists
    /// The initial watch is in progress
    InitialWatch {
        /// The watch stream delivering the initial set of objects (excluded from `Debug` output)
        #[educe(Debug(ignore))]
        stream: BoxStream<'static, kube_client::Result<WatchEvent<K>>>,
    },
    /// The initial LIST was successful, so we should move on to starting the actual watch.
    InitListed {
        /// The resource version that the watch will be started from
        resource_version: String,
    },
    /// The watch is in progress, from this point we just return events from the server.
    ///
    /// If the connection is disrupted then we propagate the error but try to restart the watch stream by
    /// returning to the `InitListed` state.
    /// If we fall out of the K8s watch window then we propagate the error and fall back doing a re-list
    /// with `Empty`.
    Watching {
        /// The last resource version seen on the stream; used to resume the watch when the stream ends
        resource_version: String,
        /// The active watch stream (excluded from `Debug` output)
        #[educe(Debug(ignore))]
        stream: BoxStream<'static, kube_client::Result<WatchEvent<K>>>,
    },
}
163
/// Used to control whether the watcher receives the full object, or only the
/// metadata
#[async_trait]
trait ApiMode {
    /// The type of object yielded by list and watch calls in this mode
    type Value: Clone;

    /// Performs a single LIST call using the given list parameters
    async fn list(&self, lp: &ListParams) -> kube_client::Result<ObjectList<Self::Value>>;
    /// Starts a WATCH call from the given resource `version` and returns the boxed event stream
    async fn watch(
        &self,
        wp: &WatchParams,
        version: &str,
    ) -> kube_client::Result<BoxStream<'static, kube_client::Result<WatchEvent<Self::Value>>>>;
}
177
/// A wrapper around the `Api` of a `Resource` type that when used by the
/// watcher will return the entire (full) object
struct FullObject<'a, K> {
    /// The borrowed `Api` that list and watch calls are delegated to
    api: &'a Api<K>,
}
183
/// Configurable list semantics for `watcher` relists
#[derive(Clone, Default, Debug, PartialEq)]
pub enum ListSemantic {
    /// List calls perform a full quorum read for most recent results
    ///
    /// Prefer this if you have strong consistency requirements. Note that this
    /// is more taxing for the apiserver and can be less scalable for the cluster.
    ///
    /// If you are observing large resource sets (such as congested `Controller` cases),
    /// you typically have a delay between the list call completing, and all the events
    /// getting processed. In such cases, it is probably worth picking `Any` over `MostRecent`,
    /// as your events are not guaranteed to be up-to-date by the time you get to them anyway.
    #[default]
    MostRecent,

    /// List calls return cached results from the apiserver
    ///
    /// This is faster and much less taxing on the apiserver, but a re-list can return
    /// much older results than have previously been observed,
    /// particularly in HA configurations, due to partitions or stale caches.
    ///
    /// This option makes the most sense for controller usage where events have
    /// some delay between being seen by the runtime, and it being sent to the reconciler.
    Any,
}
209
/// Configurable watcher listwatch semantics
#[derive(Clone, Default, Debug, PartialEq)]
pub enum InitialListStrategy {
    /// List first, then watch from the given resource version
    ///
    /// This is the old and default way of watching. The watcher will do a paginated list call first before watching.
    /// When using this mode, you can configure the `page_size` on the watcher.
    #[default]
    ListWatch,
    /// Kubernetes 1.27 Streaming Lists
    ///
    /// See [upstream documentation on streaming lists](https://kubernetes.io/docs/reference/using-api/api-concepts/#streaming-lists),
    /// and the [KEP](https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#design-details).
    StreamingList,
}
226
/// Accumulates all options that can be used on the watcher invocation.
#[derive(Clone, Debug, PartialEq)]
pub struct Config {
    /// A selector to restrict the list of returned objects by their labels.
    ///
    /// Defaults to everything if `None`.
    pub label_selector: Option<String>,

    /// A selector to restrict the list of returned objects by their fields.
    ///
    /// Defaults to everything if `None`.
    pub field_selector: Option<String>,

    /// Timeout for the list/watch call, in seconds.
    ///
    /// This limits the duration of the call, regardless of any activity or inactivity.
    /// If unset for a watch call, we will use 290s.
    /// We limit this to 295s due to [inherent watch limitations](https://github.com/kubernetes/kubernetes/issues/6513).
    pub timeout: Option<u32>,

    /// Semantics for list calls.
    ///
    /// Configures re-list for performance vs. consistency.
    ///
    /// NB: This option only has an effect for [`InitialListStrategy::ListWatch`].
    pub list_semantic: ListSemantic,

    /// Control how the watcher fetches the initial list of objects.
    ///
    /// - `ListWatch`: The watcher will fetch the initial list of objects using a list call.
    /// - `StreamingList`: The watcher will fetch the initial list of objects using a watch call.
    ///
    /// `StreamingList` is more efficient than `ListWatch`, but it requires the server to support
    /// streaming list bookmarks (opt-in feature gate in Kubernetes 1.27).
    ///
    /// See [upstream documentation on streaming lists](https://kubernetes.io/docs/reference/using-api/api-concepts/#streaming-lists),
    /// and the [KEP](https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#design-details).
    pub initial_list_strategy: InitialListStrategy,

    /// Maximum number of objects retrieved per list operation during resyncs.
    ///
    /// This can reduce the memory consumption during resyncs, at the cost of requiring more
    /// API roundtrips to complete.
    ///
    /// Defaults to 500. Note that `None` represents unbounded.
    ///
    /// NB: This option only has an effect for [`InitialListStrategy::ListWatch`].
    pub page_size: Option<u32>,

    /// Enables watch events with type "BOOKMARK".
    ///
    /// Requests watch bookmarks from the apiserver when enabled for improved watch precision and reduced list calls.
    /// This is enabled by default and should generally not be turned off.
    pub bookmarks: bool,
}
282
283impl Default for Config {
284    fn default() -> Self {
285        Self {
286            bookmarks: true,
287            label_selector: None,
288            field_selector: None,
289            timeout: None,
290            list_semantic: ListSemantic::default(),
291            // same default page size limit as client-go
292            // https://github.com/kubernetes/client-go/blob/aed71fa5cf054e1c196d67b2e21f66fd967b8ab1/tools/pager/pager.go#L31
293            page_size: Some(500),
294            initial_list_strategy: InitialListStrategy::ListWatch,
295        }
296    }
297}
298
299/// Builder interface to Config
300///
301/// Usage:
302/// ```
303/// use kube::runtime::watcher::Config;
304/// let wc = Config::default()
305///     .timeout(60)
306///     .labels("kubernetes.io/lifecycle=spot");
307/// ```
308impl Config {
309    /// Configure the timeout for list/watch calls
310    ///
311    /// This limits the duration of the call, regardless of any activity or inactivity.
312    /// Defaults to 290s
313    #[must_use]
314    pub fn timeout(mut self, timeout_secs: u32) -> Self {
315        self.timeout = Some(timeout_secs);
316        self
317    }
318
319    /// Configure the selector to restrict the list of returned objects by their fields.
320    ///
321    /// Defaults to everything.
322    /// Supports `=`, `==`, `!=`, and can be comma separated: `key1=value1,key2=value2`.
323    /// The server only supports a limited number of field queries per type.
324    #[must_use]
325    pub fn fields(mut self, field_selector: &str) -> Self {
326        self.field_selector = Some(field_selector.to_string());
327        self
328    }
329
330    /// Configure the selector to restrict the list of returned objects by their labels.
331    ///
332    /// Defaults to everything.
333    /// Supports `=`, `==`, `!=`, and can be comma separated: `key1=value1,key2=value2`.
334    #[must_use]
335    pub fn labels(mut self, label_selector: &str) -> Self {
336        self.label_selector = Some(label_selector.to_string());
337        self
338    }
339
340    /// Configure typed label selectors
341    ///
342    /// Configure typed selectors from [`Selector`](kube_client::core::Selector) and [`Expression`](kube_client::core::Expression) lists.
343    ///
344    /// ```
345    /// use kube_runtime::watcher::Config;
346    /// use kube_client::core::{Expression, Selector, ParseExpressionError};
347    /// use k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelector;
348    /// let selector: Selector = Expression::In("env".into(), ["development".into(), "sandbox".into()].into()).into();
349    /// let cfg = Config::default().labels_from(&selector);
350    /// let cfg = Config::default().labels_from(&Expression::Exists("foo".into()).into());
351    /// let selector: Selector = LabelSelector::default().try_into()?;
352    /// let cfg = Config::default().labels_from(&selector);
353    /// # Ok::<(), ParseExpressionError>(())
354    ///```
355    #[must_use]
356    pub fn labels_from(mut self, selector: &Selector) -> Self {
357        self.label_selector = Some(selector.to_string());
358        self
359    }
360
361    /// Sets list semantic to configure re-list performance and consistency
362    ///
363    /// NB: This option only has an effect for [`InitialListStrategy::ListWatch`].
364    #[must_use]
365    pub fn list_semantic(mut self, semantic: ListSemantic) -> Self {
366        self.list_semantic = semantic;
367        self
368    }
369
370    /// Sets list semantic to `Any` to improve list performance
371    ///
372    /// NB: This option only has an effect for [`InitialListStrategy::ListWatch`].
373    #[must_use]
374    pub fn any_semantic(self) -> Self {
375        self.list_semantic(ListSemantic::Any)
376    }
377
378    /// Disables watch bookmarks to simplify watch handling
379    ///
380    /// This is not recommended to use with production watchers as it can cause desyncs.
381    /// See [#219](https://github.com/kube-rs/kube/issues/219) for details.
382    #[must_use]
383    pub fn disable_bookmarks(mut self) -> Self {
384        self.bookmarks = false;
385        self
386    }
387
388    /// Limits the number of objects retrieved in each list operation during resync.
389    ///
390    /// This can reduce the memory consumption during resyncs, at the cost of requiring more
391    /// API roundtrips to complete.
392    ///
393    /// NB: This option only has an effect for [`InitialListStrategy::ListWatch`].
394    #[must_use]
395    pub fn page_size(mut self, page_size: u32) -> Self {
396        self.page_size = Some(page_size);
397        self
398    }
399
400    /// Kubernetes 1.27 Streaming Lists
401    /// Sets list semantic to `Stream` to make use of watch bookmarks
402    #[must_use]
403    pub fn streaming_lists(mut self) -> Self {
404        self.initial_list_strategy = InitialListStrategy::StreamingList;
405        self
406    }
407
408    /// Converts generic `watcher::Config` structure to the instance of `ListParams` used for list requests.
409    fn to_list_params(&self) -> ListParams {
410        let (resource_version, version_match) = match self.list_semantic {
411            ListSemantic::Any => (Some("0".into()), Some(VersionMatch::NotOlderThan)),
412            ListSemantic::MostRecent => (None, None),
413        };
414        ListParams {
415            label_selector: self.label_selector.clone(),
416            field_selector: self.field_selector.clone(),
417            timeout: self.timeout,
418            version_match,
419            resource_version,
420            // The watcher handles pagination internally.
421            limit: self.page_size,
422            continue_token: None,
423        }
424    }
425
426    /// Converts generic `watcher::Config` structure to the instance of `WatchParams` used for watch requests.
427    fn to_watch_params(&self) -> WatchParams {
428        WatchParams {
429            label_selector: self.label_selector.clone(),
430            field_selector: self.field_selector.clone(),
431            timeout: self.timeout,
432            bookmarks: self.bookmarks,
433            send_initial_events: self.initial_list_strategy == InitialListStrategy::StreamingList,
434        }
435    }
436}
437
438#[async_trait]
439impl<K> ApiMode for FullObject<'_, K>
440where
441    K: Clone + Debug + DeserializeOwned + Send + 'static,
442{
443    type Value = K;
444
445    async fn list(&self, lp: &ListParams) -> kube_client::Result<ObjectList<Self::Value>> {
446        self.api.list(lp).await
447    }
448
449    async fn watch(
450        &self,
451        wp: &WatchParams,
452        version: &str,
453    ) -> kube_client::Result<BoxStream<'static, kube_client::Result<WatchEvent<Self::Value>>>> {
454        self.api.watch(wp, version).await.map(StreamExt::boxed)
455    }
456}
457
/// A wrapper around the `Api` of a `Resource` type that when used by the
/// watcher will return only the metadata associated with an object
struct MetaOnly<'a, K> {
    /// The borrowed `Api` that metadata list and watch calls are delegated to
    api: &'a Api<K>,
}
463
464#[async_trait]
465impl<K> ApiMode for MetaOnly<'_, K>
466where
467    K: Clone + Debug + DeserializeOwned + Send + 'static,
468{
469    type Value = PartialObjectMeta<K>;
470
471    async fn list(&self, lp: &ListParams) -> kube_client::Result<ObjectList<Self::Value>> {
472        self.api.list_metadata(lp).await
473    }
474
475    async fn watch(
476        &self,
477        wp: &WatchParams,
478        version: &str,
479    ) -> kube_client::Result<BoxStream<'static, kube_client::Result<WatchEvent<Self::Value>>>> {
480        self.api.watch_metadata(wp, version).await.map(StreamExt::boxed)
481    }
482}
483
484/// Progresses the watcher a single step, returning (event, state)
485///
486/// This function should be trampolined: if event == `None`
487/// then the function should be called again until it returns a Some.
488#[allow(clippy::too_many_lines)] // for now
489async fn step_trampolined<A>(
490    api: &A,
491    wc: &Config,
492    state: State<A::Value>,
493) -> (Option<Result<Event<A::Value>>>, State<A::Value>)
494where
495    A: ApiMode,
496    A::Value: Resource + 'static,
497{
498    match state {
499        State::Empty => match wc.initial_list_strategy {
500            InitialListStrategy::ListWatch => (Some(Ok(Event::Init)), State::InitPage {
501                continue_token: None,
502                objects: VecDeque::default(),
503                last_bookmark: None,
504            }),
505            InitialListStrategy::StreamingList => match api.watch(&wc.to_watch_params(), "0").await {
506                Ok(stream) => (None, State::InitialWatch { stream }),
507                Err(err) => {
508                    if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) {
509                        warn!("watch initlist error with 403: {err:?}");
510                    } else {
511                        debug!("watch initlist error: {err:?}");
512                    }
513                    (Some(Err(Error::WatchStartFailed(err))), State::default())
514                }
515            },
516        },
517        State::InitPage {
518            continue_token,
519            mut objects,
520            last_bookmark,
521        } => {
522            if let Some(next) = objects.pop_front() {
523                return (Some(Ok(Event::InitApply(next))), State::InitPage {
524                    continue_token,
525                    objects,
526                    last_bookmark,
527                });
528            }
529            // check if we need to perform more pages
530            if continue_token.is_none() {
531                if let Some(resource_version) = last_bookmark {
532                    // we have drained the last page - move on to next stage
533                    return (Some(Ok(Event::InitDone)), State::InitListed { resource_version });
534                }
535            }
536            let mut lp = wc.to_list_params();
537            lp.continue_token = continue_token;
538            match api.list(&lp).await {
539                Ok(list) => {
540                    let last_bookmark = list.metadata.resource_version.filter(|s| !s.is_empty());
541                    let continue_token = list.metadata.continue_.filter(|s| !s.is_empty());
542                    if last_bookmark.is_none() && continue_token.is_none() {
543                        return (Some(Err(Error::NoResourceVersion)), State::Empty);
544                    }
545                    // Buffer page here, causing us to return to this enum branch (State::InitPage)
546                    // until the objects buffer has drained
547                    (None, State::InitPage {
548                        continue_token,
549                        objects: list.items.into_iter().collect(),
550                        last_bookmark,
551                    })
552                }
553                Err(err) => {
554                    if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) {
555                        warn!("watch list error with 403: {err:?}");
556                    } else {
557                        debug!("watch list error: {err:?}");
558                    }
559                    (Some(Err(Error::InitialListFailed(err))), State::Empty)
560                }
561            }
562        }
563        State::InitialWatch { mut stream } => {
564            match stream.next().await {
565                Some(Ok(WatchEvent::Added(obj) | WatchEvent::Modified(obj))) => {
566                    (Some(Ok(Event::InitApply(obj))), State::InitialWatch { stream })
567                }
568                Some(Ok(WatchEvent::Deleted(_obj))) => {
569                    // Kubernetes claims these events are impossible
570                    // https://kubernetes.io/docs/reference/using-api/api-concepts/#streaming-lists
571                    error!("got deleted event during initial watch. this is a bug");
572                    (None, State::InitialWatch { stream })
573                }
574                Some(Ok(WatchEvent::Bookmark(bm))) => {
575                    let marks_initial_end = bm.metadata.annotations.contains_key("k8s.io/initial-events-end");
576                    if marks_initial_end {
577                        (Some(Ok(Event::InitDone)), State::Watching {
578                            resource_version: bm.metadata.resource_version,
579                            stream,
580                        })
581                    } else {
582                        (None, State::InitialWatch { stream })
583                    }
584                }
585                Some(Ok(WatchEvent::Error(err))) => {
586                    // HTTP GONE, means we have desynced and need to start over and re-list :(
587                    let new_state = if err.code == 410 {
588                        State::default()
589                    } else {
590                        State::InitialWatch { stream }
591                    };
592                    if err.code == 403 {
593                        warn!("watcher watchevent error 403: {err:?}");
594                    } else {
595                        debug!("error watchevent error: {err:?}");
596                    }
597                    (Some(Err(Error::WatchError(err))), new_state)
598                }
599                Some(Err(err)) => {
600                    if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) {
601                        warn!("watcher error 403: {err:?}");
602                    } else {
603                        debug!("watcher error: {err:?}");
604                    }
605                    (Some(Err(Error::WatchFailed(err))), State::InitialWatch { stream })
606                }
607                None => (None, State::default()),
608            }
609        }
610        State::InitListed { resource_version } => {
611            match api.watch(&wc.to_watch_params(), &resource_version).await {
612                Ok(stream) => (None, State::Watching {
613                    resource_version,
614                    stream,
615                }),
616                Err(err) => {
617                    if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) {
618                        warn!("watch initlist error with 403: {err:?}");
619                    } else {
620                        debug!("watch initlist error: {err:?}");
621                    }
622                    (Some(Err(Error::WatchStartFailed(err))), State::InitListed {
623                        resource_version,
624                    })
625                }
626            }
627        }
628        State::Watching {
629            resource_version,
630            mut stream,
631        } => match stream.next().await {
632            Some(Ok(WatchEvent::Added(obj) | WatchEvent::Modified(obj))) => {
633                let resource_version = obj.resource_version().unwrap_or_default();
634                if resource_version.is_empty() {
635                    (Some(Err(Error::NoResourceVersion)), State::default())
636                } else {
637                    (Some(Ok(Event::Apply(obj))), State::Watching {
638                        resource_version,
639                        stream,
640                    })
641                }
642            }
643            Some(Ok(WatchEvent::Deleted(obj))) => {
644                let resource_version = obj.resource_version().unwrap_or_default();
645                if resource_version.is_empty() {
646                    (Some(Err(Error::NoResourceVersion)), State::default())
647                } else {
648                    (Some(Ok(Event::Delete(obj))), State::Watching {
649                        resource_version,
650                        stream,
651                    })
652                }
653            }
654            Some(Ok(WatchEvent::Bookmark(bm))) => (None, State::Watching {
655                resource_version: bm.metadata.resource_version,
656                stream,
657            }),
658            Some(Ok(WatchEvent::Error(err))) => {
659                // HTTP GONE, means we have desynced and need to start over and re-list :(
660                let new_state = if err.code == 410 {
661                    State::default()
662                } else {
663                    State::Watching {
664                        resource_version,
665                        stream,
666                    }
667                };
668                if err.code == 403 {
669                    warn!("watcher watchevent error 403: {err:?}");
670                } else {
671                    debug!("error watchevent error: {err:?}");
672                }
673                (Some(Err(Error::WatchError(err))), new_state)
674            }
675            Some(Err(err)) => {
676                if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) {
677                    warn!("watcher error 403: {err:?}");
678                } else {
679                    debug!("watcher error: {err:?}");
680                }
681                (Some(Err(Error::WatchFailed(err))), State::Watching {
682                    resource_version,
683                    stream,
684                })
685            }
686            None => (None, State::InitListed { resource_version }),
687        },
688    }
689}
690
691/// Trampoline helper for `step_trampolined`
692async fn step<A>(
693    api: &A,
694    config: &Config,
695    mut state: State<A::Value>,
696) -> (Result<Event<A::Value>>, State<A::Value>)
697where
698    A: ApiMode,
699    A::Value: Resource + 'static,
700{
701    loop {
702        match step_trampolined(api, config, state).await {
703            (Some(result), new_state) => return (result, new_state),
704            (None, new_state) => state = new_state,
705        }
706    }
707}
708
709/// Watches a Kubernetes Resource for changes continuously
710///
711/// Compared to [`Api::watch`], this automatically tries to recover the stream upon errors.
712///
713/// Errors from the underlying watch are propagated, after which the stream will go into recovery mode on the next poll.
714/// You can apply your own backoff by not polling the stream for a duration after errors.
715/// Keep in mind that some [`TryStream`](futures::TryStream) combinators (such as
716/// [`try_for_each`](futures::TryStreamExt::try_for_each) and [`try_concat`](futures::TryStreamExt::try_concat))
717/// will terminate eagerly as soon as they receive an [`Err`].
718///
719/// The events are intended to provide a safe input interface for a state store like a [`reflector`].
720/// Direct users may want to use [`WatchStreamExt`] for higher-level constructs.
721///
722/// ```no_run
723/// use kube::{
724///   api::{Api, ResourceExt}, Client,
725///   runtime::{watcher, WatchStreamExt}
726/// };
727/// use k8s_openapi::api::core::v1::Pod;
728/// use futures::TryStreamExt;
729/// #[tokio::main]
730/// async fn main() -> Result<(), watcher::Error> {
731///     let client = Client::try_default().await.unwrap();
732///     let pods: Api<Pod> = Api::namespaced(client, "apps");
733///
734///     watcher(pods, watcher::Config::default()).applied_objects()
735///         .try_for_each(|p| async move {
736///          println!("Applied: {}", p.name_any());
737///             Ok(())
738///         })
739///         .await?;
740///     Ok(())
741/// }
742/// ```
743/// [`WatchStreamExt`]: super::WatchStreamExt
744/// [`reflector`]: super::reflector::reflector
745/// [`Api::watch`]: kube_client::Api::watch
746///
747/// # Recovery
748///
749/// The stream will attempt to be recovered on the next poll after an [`Err`] is returned.
750/// This will normally happen immediately, but you can use [`StreamBackoff`](crate::utils::StreamBackoff)
751/// to introduce an artificial delay. [`default_backoff`] returns a suitable default set of parameters.
752///
753/// If the watch connection is interrupted, then `watcher` will attempt to restart the watch using the last
754/// [resource version](https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes)
755/// that we have seen on the stream. If this is successful then the stream is simply resumed from where it left off.
756/// If this fails because the resource version is no longer valid then we start over with a new stream, starting with
757/// an [`Event::Init`]. The internals mechanics of recovery should be considered an implementation detail.
758pub fn watcher<K: Resource + Clone + DeserializeOwned + Debug + Send + 'static>(
759    api: Api<K>,
760    watcher_config: Config,
761) -> impl Stream<Item = Result<Event<K>>> + Send {
762    futures::stream::unfold(
763        (api, watcher_config, State::default()),
764        |(api, watcher_config, state)| async {
765            let (event, state) = step(&FullObject { api: &api }, &watcher_config, state).await;
766            Some((event, (api, watcher_config, state)))
767        },
768    )
769}
770
771/// Watches a Kubernetes Resource for changes continuously and receives only the
772/// metadata
773///
774/// Compared to [`Api::watch_metadata`], this automatically tries to recover the stream upon errors.
775///
776/// Errors from the underlying watch are propagated, after which the stream will go into recovery mode on the next poll.
777/// You can apply your own backoff by not polling the stream for a duration after errors.
778/// Keep in mind that some [`TryStream`](futures::TryStream) combinators (such as
779/// [`try_for_each`](futures::TryStreamExt::try_for_each) and [`try_concat`](futures::TryStreamExt::try_concat))
780/// will terminate eagerly as soon as they receive an [`Err`].
781///
782/// The events are intended to provide a safe input interface for a state store like a [`reflector`].
783/// Direct users may want to use [`WatchStreamExt`] for higher-level constructs.
784///
785/// ```no_run
786/// use kube::{
787///   api::{Api, ResourceExt}, Client,
788///   runtime::{watcher, metadata_watcher, WatchStreamExt}
789/// };
790/// use k8s_openapi::api::core::v1::Pod;
791/// use futures::TryStreamExt;
792/// #[tokio::main]
793/// async fn main() -> Result<(), watcher::Error> {
794///     let client = Client::try_default().await.unwrap();
795///     let pods: Api<Pod> = Api::namespaced(client, "apps");
796///
797///     metadata_watcher(pods, watcher::Config::default()).applied_objects()
798///         .try_for_each(|p| async move {
799///          println!("Applied: {}", p.name_any());
800///             Ok(())
801///         })
802///         .await?;
803///     Ok(())
804/// }
805/// ```
806/// [`WatchStreamExt`]: super::WatchStreamExt
807/// [`reflector`]: super::reflector::reflector
808/// [`Api::watch`]: kube_client::Api::watch
809///
810/// # Recovery
811///
812/// The stream will attempt to be recovered on the next poll after an [`Err`] is returned.
813/// This will normally happen immediately, but you can use [`StreamBackoff`](crate::utils::StreamBackoff)
814/// to introduce an artificial delay. [`default_backoff`] returns a suitable default set of parameters.
815///
816/// If the watch connection is interrupted, then `watcher` will attempt to restart the watch using the last
817/// [resource version](https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes)
818/// that we have seen on the stream. If this is successful then the stream is simply resumed from where it left off.
819/// If this fails because the resource version is no longer valid then we start over with a new stream, starting with
820/// an [`Event::Init`]. The internals mechanics of recovery should be considered an implementation detail.
821#[allow(clippy::module_name_repetitions)]
822pub fn metadata_watcher<K: Resource + Clone + DeserializeOwned + Debug + Send + 'static>(
823    api: Api<K>,
824    watcher_config: Config,
825) -> impl Stream<Item = Result<Event<PartialObjectMeta<K>>>> + Send {
826    futures::stream::unfold(
827        (api, watcher_config, State::default()),
828        |(api, watcher_config, state)| async {
829            let (event, state) = step(&MetaOnly { api: &api }, &watcher_config, state).await;
830            Some((event, (api, watcher_config, state)))
831        },
832    )
833}
834
835/// Watch a single named object for updates
836///
837/// Emits `None` if the object is deleted (or not found), and `Some` if an object is updated (or created/found).
838///
839/// Often invoked indirectly via [`await_condition`](crate::wait::await_condition()).
840///
841/// ## Scope Warning
842///
843/// When using this with an `Api::all` on namespaced resources there is a chance of duplicated names.
844/// To avoid getting confusing / wrong answers for this, use `Api::namespaced` bound to a specific namespace
845/// when watching for transitions to namespaced objects.
846pub fn watch_object<K: Resource + Clone + DeserializeOwned + Debug + Send + 'static>(
847    api: Api<K>,
848    name: &str,
849) -> impl Stream<Item = Result<Option<K>>> + Send {
850    // filtering by object name in given scope, so there's at most one matching object
851    // footgun: Api::all may generate events from namespaced objects with the same name in different namespaces
852    let fields = format!("metadata.name={name}");
853    watcher(api, Config::default().fields(&fields))
854        // The `obj_seen` state is used to track whether the object exists in each Init / InitApply / InitDone
855        // sequence of events. If the object wasn't seen in any particular sequence it is treated as deleted and
856        // `None` is emitted when the InitDone event is received.
857        //
858        // The first check ensures `None` is emitted if the object was already gone (or not found), subsequent
859        // checks ensure `None` is emitted even if for some reason the Delete event wasn't received, which
860        // could happen given K8S events aren't guaranteed delivery.
861        .scan(false, |obj_seen, event| {
862            if matches!(event, Ok(Event::Init)) {
863                *obj_seen = false;
864            } else if matches!(event, Ok(Event::InitApply(_))) {
865                *obj_seen = true;
866            }
867            future::ready(Some((*obj_seen, event)))
868        })
869        .filter_map(|(obj_seen, event)| async move {
870            match event {
871                // Pass up `Some` for Found / Updated
872                Ok(Event::Apply(obj) | Event::InitApply(obj)) => Some(Ok(Some(obj))),
873                // Pass up `None` for Deleted
874                Ok(Event::Delete(_)) => Some(Ok(None)),
875                // Pass up `None` if the object wasn't seen in the initial list
876                Ok(Event::InitDone) if !obj_seen => Some(Ok(None)),
877                // Ignore marker events
878                Ok(Event::Init | Event::InitDone) => None,
879                // Bubble up errors
880                Err(err) => Some(Err(err)),
881            }
882        })
883}
884
885/// Default watcher backoff inspired by Kubernetes' client-go.
886///
887/// The parameters currently optimize for being kind to struggling apiservers.
888/// The exact parameters are taken from
889/// [client-go's reflector source](https://github.com/kubernetes/client-go/blob/980663e185ab6fc79163b1c2565034f6d58368db/tools/cache/reflector.go#L177-L181)
890/// and should not be considered stable.
891///
892/// This struct implements [`Backoff`] and is the default strategy used
893/// when calling `WatchStreamExt::default_backoff`. If you need to create
894/// this manually then [`DefaultBackoff::default`] can be used.
895pub struct DefaultBackoff(Strategy);
896type Strategy = ResetTimerBackoff<ExponentialBackoff>;
897
898impl Default for DefaultBackoff {
899    fn default() -> Self {
900        Self(ResetTimerBackoff::new(
901            backoff::ExponentialBackoffBuilder::new()
902                .with_initial_interval(Duration::from_millis(800))
903                .with_max_interval(Duration::from_secs(30))
904                .with_randomization_factor(1.0)
905                .with_multiplier(2.0)
906                .with_max_elapsed_time(None)
907                .build(),
908            Duration::from_secs(120),
909        ))
910    }
911}
912
913impl Backoff for DefaultBackoff {
914    fn next_backoff(&mut self) -> Option<Duration> {
915        self.0.next_backoff()
916    }
917
918    fn reset(&mut self) {
919        self.0.reset()
920    }
921}