kube_runtime/watcher.rs
1//! Watches a Kubernetes Resource for changes, with error recovery
2//!
3//! See [`watcher`] for the primary entry point.
4
5use crate::utils::{Backoff, ResetTimerBackoff};
6
7use backon::BackoffBuilder;
8use educe::Educe;
9use futures::{stream::BoxStream, Stream, StreamExt};
10use kube_client::{
11 api::{ListParams, Resource, ResourceExt, VersionMatch, WatchEvent, WatchParams},
12 core::{metadata::PartialObjectMeta, ObjectList, Selector},
13 error::ErrorResponse,
14 Api, Error as ClientErr,
15};
16use serde::de::DeserializeOwned;
17use std::{clone::Clone, collections::VecDeque, fmt::Debug, future, time::Duration};
18use thiserror::Error;
19use tracing::{debug, error, warn};
20
#[derive(Debug, Error)]
/// Errors that the [`watcher`] stream can yield
pub enum Error {
    /// The LIST call used to fetch a page of the initial object set failed
    #[error("failed to perform initial object list: {0}")]
    InitialListFailed(#[source] kube_client::Error),
    /// The WATCH call could not be started
    #[error("failed to start watching object: {0}")]
    WatchStartFailed(#[source] kube_client::Error),
    /// The apiserver delivered an error event on an established watch stream
    #[error("error returned by apiserver during watch: {0}")]
    WatchError(#[source] ErrorResponse),
    /// The watch stream itself yielded a client error
    #[error("watch stream failed: {0}")]
    WatchFailed(#[source] kube_client::Error),
    /// A list result or watched object carried no (or an empty) `metadata.resourceVersion`
    #[error("no metadata.resourceVersion in watch result (does resource support watch?)")]
    NoResourceVersion,
}
/// Result alias whose error type defaults to the watcher [`Error`]
pub type Result<T, E = Error> = std::result::Result<T, E>;
35
/// Watch events returned from the [`watcher`]
#[derive(Debug, Clone)]
pub enum Event<K> {
    /// An object was added or modified
    Apply(K),
    /// An object was deleted
    ///
    /// NOTE: This should not be used for managing persistent state elsewhere, since
    /// events may be lost if the watcher is unavailable. Use Finalizers instead.
    Delete(K),
    /// The watch stream was restarted.
    ///
    /// A series of `InitApply` events are expected to follow until all matching objects
    /// have been listed. This event can be used to prepare a buffer for `InitApply` events.
    Init,
    /// Received an object during `Init`.
    ///
    /// Objects returned here are either from the initial stream using the `StreamingList` strategy,
    /// or from pages using the `ListWatch` strategy.
    ///
    /// These events can be passed up if having a complete set of objects is not a concern.
    /// If you need to wait for a complete set, please buffer these events until an `InitDone`.
    InitApply(K),
    /// The initialisation is complete.
    ///
    /// This can be used as a signal to replace buffered store contents atomically.
    /// No more `InitApply` events will happen until the next `Init` event.
    ///
    /// Any objects that were previously applied (see [`Event::Apply`]) but are not listed in any of
    /// the `InitApply` events should be assumed to have been deleted (see [`Event::Delete`]).
    InitDone,
}

impl<K> Event<K> {
    /// Run a mutator fn over the object carried by this event, if it carries one
    ///
    /// `Apply`, `Delete` and `InitApply` hand their object to `f`; the `Init` and `InitDone`
    /// markers carry nothing, so `f` is not called for them. The (possibly modified) event
    /// is returned.
    ///
    /// This allows for memory optimizations in watch streams.
    /// If you are chaining a watch stream into a reflector as an in memory state store,
    /// you can control the space used by each object by dropping fields.
    ///
    /// ```no_run
    /// use k8s_openapi::api::core::v1::Pod;
    /// use kube::ResourceExt;
    /// # use kube::runtime::watcher::Event;
    /// # let event: Event<Pod> = todo!();
    /// event.modify(|pod| {
    ///     pod.managed_fields_mut().clear();
    ///     pod.annotations_mut().clear();
    ///     pod.status = None;
    /// });
    /// ```
    #[must_use]
    pub fn modify(mut self, mut f: impl FnMut(&mut K)) -> Self {
        // Exhaustive on purpose: a new variant must decide whether it carries an object.
        let payload = match &mut self {
            Self::Apply(obj) | Self::Delete(obj) | Self::InitApply(obj) => Some(obj),
            Self::Init | Self::InitDone => None,
        };
        if let Some(obj) = payload {
            f(obj);
        }
        self
    }
}
96
#[derive(Educe, Default)]
#[educe(Debug)]
/// The internal finite state machine driving the [`watcher`]
enum State<K> {
    /// The Watcher is empty, and the next [`poll`](Stream::poll_next) will start the initial LIST to get all existing objects
    #[default]
    Empty,
    /// The Watcher is in the process of paginating through the initial LIST
    InitPage {
        /// Token for requesting the next page; `None` before the first request and once the
        /// server reports no further pages
        continue_token: Option<String>,
        /// Objects of the current page that have not been emitted yet
        objects: VecDeque<K>,
        /// `resourceVersion` reported by the most recently fetched page, used to start the
        /// watch once the last page has been drained
        last_bookmark: Option<String>,
    },
    /// Kubernetes 1.27 Streaming Lists
    /// The initial watch is in progress
    InitialWatch {
        /// The watch stream delivering the initial events (skipped in `Debug` output)
        #[educe(Debug(ignore))]
        stream: BoxStream<'static, kube_client::Result<WatchEvent<K>>>,
    },
    /// The initial LIST was successful, so we should move on to starting the actual watch.
    InitListed {
        /// Resource version the next watch call starts from
        resource_version: String,
    },
    /// The watch is in progress, from this point we just return events from the server.
    ///
    /// If the connection is disrupted then we propagate the error but try to restart the watch stream by
    /// returning to the `InitListed` state.
    /// If we fall out of the K8s watch window then we propagate the error and fall back doing a re-list
    /// with `Empty`.
    Watching {
        /// Resource version of the last event or bookmark seen, used to resume the watch
        /// when the stream ends
        resource_version: String,
        /// The active watch stream (skipped in `Debug` output)
        #[educe(Debug(ignore))]
        stream: BoxStream<'static, kube_client::Result<WatchEvent<K>>>,
    },
}
130
/// Used to control whether the watcher receives the full object, or only the
/// metadata
trait ApiMode {
    /// The object representation yielded by list and watch calls
    type Value: Clone;

    /// Performs a LIST call with the given parameters
    async fn list(&self, lp: &ListParams) -> kube_client::Result<ObjectList<Self::Value>>;
    /// Starts a WATCH call with the given parameters from resource version `version`,
    /// returning the resulting event stream boxed
    async fn watch(
        &self,
        wp: &WatchParams,
        version: &str,
    ) -> kube_client::Result<BoxStream<'static, kube_client::Result<WatchEvent<Self::Value>>>>;
}
143
/// A wrapper around the `Api` of a `Resource` type that when used by the
/// watcher will return the entire (full) object
struct FullObject<'a, K> {
    /// The API handle that list and watch calls are delegated to
    api: &'a Api<K>,
}
149
/// Configurable list semantics for `watcher` relists
#[derive(Clone, Default, Debug, PartialEq)]
pub enum ListSemantic {
    /// List calls perform a full quorum read for most recent results
    ///
    /// Prefer this if you have strong consistency requirements. Note that this
    /// is more taxing for the apiserver and can be less scalable for the cluster.
    ///
    /// If you are observing large resource sets (such as congested `Controller` cases),
    /// you typically have a delay between the list call completing, and all the events
    /// getting processed. In such cases, it is probably worth picking `Any` over `MostRecent`,
    /// as your events are not guaranteed to be up-to-date by the time you get to them anyway.
    #[default]
    MostRecent,

    /// List calls return cached results from the apiserver
    ///
    /// This is faster and much less taxing on the apiserver, but a relist (see [`Event::Init`])
    /// can return much older results than have previously been observed,
    /// particularly in HA configurations, due to partitions or stale caches.
    ///
    /// This option makes the most sense for controller usage where events have
    /// some delay between being seen by the runtime, and being sent to the reconciler.
    Any,
}
175
/// Configurable watcher listwatch semantics
#[derive(Clone, Default, Debug, PartialEq)]
pub enum InitialListStrategy {
    /// List first, then watch from given resource version
    ///
    /// This is the old and default way of watching. The watcher will do a paginated list call first before watching.
    /// When using this mode, you can configure the `page_size` on the watcher.
    #[default]
    ListWatch,
    /// Kubernetes 1.27 Streaming Lists
    ///
    /// See [upstream documentation on streaming lists](https://kubernetes.io/docs/reference/using-api/api-concepts/#streaming-lists),
    /// and the [KEP](https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#design-details).
    StreamingList,
}
192
/// Accumulates all options that can be used on the watcher invocation.
#[derive(Clone, Debug, PartialEq)]
pub struct Config {
    /// A selector to restrict the list of returned objects by their labels.
    ///
    /// Defaults to everything if `None`.
    pub label_selector: Option<String>,

    /// A selector to restrict the list of returned objects by their fields.
    ///
    /// Defaults to everything if `None`.
    pub field_selector: Option<String>,

    /// Timeout for the list/watch call, in seconds.
    ///
    /// This limits the duration of the call, regardless of any activity or inactivity.
    /// If unset for a watch call, we will use 290s.
    /// We limit this to 295s due to [inherent watch limitations](https://github.com/kubernetes/kubernetes/issues/6513).
    pub timeout: Option<u32>,

    /// Semantics for list calls.
    ///
    /// Configures re-list for performance vs. consistency.
    ///
    /// NB: This option only has an effect for [`InitialListStrategy::ListWatch`].
    pub list_semantic: ListSemantic,

    /// Control how the watcher fetches the initial list of objects.
    ///
    /// - `ListWatch`: The watcher will fetch the initial list of objects using a list call.
    /// - `StreamingList`: The watcher will fetch the initial list of objects using a watch call.
    ///
    /// `StreamingList` is more efficient than `ListWatch`, but it requires the server to support
    /// streaming list bookmarks (opt-in feature gate in Kubernetes 1.27).
    ///
    /// See [upstream documentation on streaming lists](https://kubernetes.io/docs/reference/using-api/api-concepts/#streaming-lists),
    /// and the [KEP](https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#design-details).
    pub initial_list_strategy: InitialListStrategy,

    /// Maximum number of objects retrieved per list operation during resyncs.
    ///
    /// This can reduce the memory consumption during resyncs, at the cost of requiring more
    /// API roundtrips to complete.
    ///
    /// Defaults to 500. Note that `None` represents unbounded.
    ///
    /// NB: This option only has an effect for [`InitialListStrategy::ListWatch`].
    pub page_size: Option<u32>,

    /// Enables watch events with type "BOOKMARK".
    ///
    /// Requests watch bookmarks from the apiserver when enabled for improved watch precision and reduced list calls.
    /// This is default enabled and should generally not be turned off.
    pub bookmarks: bool,
}
248
249impl Default for Config {
250 fn default() -> Self {
251 Self {
252 bookmarks: true,
253 label_selector: None,
254 field_selector: None,
255 timeout: None,
256 list_semantic: ListSemantic::default(),
257 // same default page size limit as client-go
258 // https://github.com/kubernetes/client-go/blob/aed71fa5cf054e1c196d67b2e21f66fd967b8ab1/tools/pager/pager.go#L31
259 page_size: Some(500),
260 initial_list_strategy: InitialListStrategy::ListWatch,
261 }
262 }
263}
264
265/// Builder interface to Config
266///
267/// Usage:
268/// ```
269/// use kube::runtime::watcher::Config;
270/// let wc = Config::default()
271/// .timeout(60)
272/// .labels("kubernetes.io/lifecycle=spot");
273/// ```
274impl Config {
275 /// Configure the timeout for list/watch calls
276 ///
277 /// This limits the duration of the call, regardless of any activity or inactivity.
278 /// Defaults to 290s
279 #[must_use]
280 pub fn timeout(mut self, timeout_secs: u32) -> Self {
281 self.timeout = Some(timeout_secs);
282 self
283 }
284
285 /// Configure the selector to restrict the list of returned objects by their fields.
286 ///
287 /// Defaults to everything.
288 /// Supports `=`, `==`, `!=`, and can be comma separated: `key1=value1,key2=value2`.
289 /// The server only supports a limited number of field queries per type.
290 #[must_use]
291 pub fn fields(mut self, field_selector: &str) -> Self {
292 self.field_selector = Some(field_selector.to_string());
293 self
294 }
295
296 /// Configure the selector to restrict the list of returned objects by their labels.
297 ///
298 /// Defaults to everything.
299 /// Supports `=`, `==`, `!=`, and can be comma separated: `key1=value1,key2=value2`.
300 #[must_use]
301 pub fn labels(mut self, label_selector: &str) -> Self {
302 self.label_selector = Some(label_selector.to_string());
303 self
304 }
305
306 /// Configure typed label selectors
307 ///
308 /// Configure typed selectors from [`Selector`](kube_client::core::Selector) and [`Expression`](kube_client::core::Expression) lists.
309 ///
310 /// ```
311 /// use kube_runtime::watcher::Config;
312 /// use kube_client::core::{Expression, Selector, ParseExpressionError};
313 /// use k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelector;
314 /// let selector: Selector = Expression::In("env".into(), ["development".into(), "sandbox".into()].into()).into();
315 /// let cfg = Config::default().labels_from(&selector);
316 /// let cfg = Config::default().labels_from(&Expression::Exists("foo".into()).into());
317 /// let selector: Selector = LabelSelector::default().try_into()?;
318 /// let cfg = Config::default().labels_from(&selector);
319 /// # Ok::<(), ParseExpressionError>(())
320 ///```
321 #[must_use]
322 pub fn labels_from(mut self, selector: &Selector) -> Self {
323 self.label_selector = Some(selector.to_string());
324 self
325 }
326
327 /// Sets list semantic to configure re-list performance and consistency
328 ///
329 /// NB: This option only has an effect for [`InitialListStrategy::ListWatch`].
330 #[must_use]
331 pub fn list_semantic(mut self, semantic: ListSemantic) -> Self {
332 self.list_semantic = semantic;
333 self
334 }
335
336 /// Sets list semantic to `Any` to improve list performance
337 ///
338 /// NB: This option only has an effect for [`InitialListStrategy::ListWatch`].
339 #[must_use]
340 pub fn any_semantic(self) -> Self {
341 self.list_semantic(ListSemantic::Any)
342 }
343
344 /// Disables watch bookmarks to simplify watch handling
345 ///
346 /// This is not recommended to use with production watchers as it can cause desyncs.
347 /// See [#219](https://github.com/kube-rs/kube/issues/219) for details.
348 #[must_use]
349 pub fn disable_bookmarks(mut self) -> Self {
350 self.bookmarks = false;
351 self
352 }
353
354 /// Limits the number of objects retrieved in each list operation during resync.
355 ///
356 /// This can reduce the memory consumption during resyncs, at the cost of requiring more
357 /// API roundtrips to complete.
358 ///
359 /// NB: This option only has an effect for [`InitialListStrategy::ListWatch`].
360 #[must_use]
361 pub fn page_size(mut self, page_size: u32) -> Self {
362 self.page_size = Some(page_size);
363 self
364 }
365
366 /// Kubernetes 1.27 Streaming Lists
367 /// Sets list semantic to `Stream` to make use of watch bookmarks
368 #[must_use]
369 pub fn streaming_lists(mut self) -> Self {
370 self.initial_list_strategy = InitialListStrategy::StreamingList;
371 self
372 }
373
374 /// Converts generic `watcher::Config` structure to the instance of `ListParams` used for list requests.
375 fn to_list_params(&self) -> ListParams {
376 let (resource_version, version_match) = match self.list_semantic {
377 ListSemantic::Any => (Some("0".into()), Some(VersionMatch::NotOlderThan)),
378 ListSemantic::MostRecent => (None, None),
379 };
380 ListParams {
381 label_selector: self.label_selector.clone(),
382 field_selector: self.field_selector.clone(),
383 timeout: self.timeout,
384 version_match,
385 resource_version,
386 // The watcher handles pagination internally.
387 limit: self.page_size,
388 continue_token: None,
389 }
390 }
391
392 /// Converts generic `watcher::Config` structure to the instance of `WatchParams` used for watch requests.
393 fn to_watch_params(&self) -> WatchParams {
394 WatchParams {
395 label_selector: self.label_selector.clone(),
396 field_selector: self.field_selector.clone(),
397 timeout: self.timeout,
398 bookmarks: self.bookmarks,
399 send_initial_events: self.initial_list_strategy == InitialListStrategy::StreamingList,
400 }
401 }
402}
403
404impl<K> ApiMode for FullObject<'_, K>
405where
406 K: Clone + Debug + DeserializeOwned + Send + 'static,
407{
408 type Value = K;
409
410 async fn list(&self, lp: &ListParams) -> kube_client::Result<ObjectList<Self::Value>> {
411 self.api.list(lp).await
412 }
413
414 async fn watch(
415 &self,
416 wp: &WatchParams,
417 version: &str,
418 ) -> kube_client::Result<BoxStream<'static, kube_client::Result<WatchEvent<Self::Value>>>> {
419 self.api.watch(wp, version).await.map(StreamExt::boxed)
420 }
421}
422
/// A wrapper around the `Api` of a `Resource` type that when used by the
/// watcher will return only the metadata associated with an object
struct MetaOnly<'a, K> {
    /// The API handle that metadata list and watch calls are delegated to
    api: &'a Api<K>,
}
428
429impl<K> ApiMode for MetaOnly<'_, K>
430where
431 K: Clone + Debug + DeserializeOwned + Send + 'static,
432{
433 type Value = PartialObjectMeta<K>;
434
435 async fn list(&self, lp: &ListParams) -> kube_client::Result<ObjectList<Self::Value>> {
436 self.api.list_metadata(lp).await
437 }
438
439 async fn watch(
440 &self,
441 wp: &WatchParams,
442 version: &str,
443 ) -> kube_client::Result<BoxStream<'static, kube_client::Result<WatchEvent<Self::Value>>>> {
444 self.api.watch_metadata(wp, version).await.map(StreamExt::boxed)
445 }
446}
447
448/// Progresses the watcher a single step, returning (event, state)
449///
450/// This function should be trampolined: if event == `None`
451/// then the function should be called again until it returns a Some.
452#[allow(clippy::too_many_lines)] // for now
453async fn step_trampolined<A>(
454 api: &A,
455 wc: &Config,
456 state: State<A::Value>,
457) -> (Option<Result<Event<A::Value>>>, State<A::Value>)
458where
459 A: ApiMode,
460 A::Value: Resource + 'static,
461{
462 match state {
463 State::Empty => match wc.initial_list_strategy {
464 InitialListStrategy::ListWatch => (Some(Ok(Event::Init)), State::InitPage {
465 continue_token: None,
466 objects: VecDeque::default(),
467 last_bookmark: None,
468 }),
469 InitialListStrategy::StreamingList => match api.watch(&wc.to_watch_params(), "0").await {
470 Ok(stream) => (None, State::InitialWatch { stream }),
471 Err(err) => {
472 if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) {
473 warn!("watch initlist error with 403: {err:?}");
474 } else {
475 debug!("watch initlist error: {err:?}");
476 }
477 (Some(Err(Error::WatchStartFailed(err))), State::default())
478 }
479 },
480 },
481 State::InitPage {
482 continue_token,
483 mut objects,
484 last_bookmark,
485 } => {
486 if let Some(next) = objects.pop_front() {
487 return (Some(Ok(Event::InitApply(next))), State::InitPage {
488 continue_token,
489 objects,
490 last_bookmark,
491 });
492 }
493 // check if we need to perform more pages
494 if continue_token.is_none() {
495 if let Some(resource_version) = last_bookmark {
496 // we have drained the last page - move on to next stage
497 return (Some(Ok(Event::InitDone)), State::InitListed { resource_version });
498 }
499 }
500 let mut lp = wc.to_list_params();
501 lp.continue_token = continue_token;
502 match api.list(&lp).await {
503 Ok(list) => {
504 let last_bookmark = list.metadata.resource_version.filter(|s| !s.is_empty());
505 let continue_token = list.metadata.continue_.filter(|s| !s.is_empty());
506 if last_bookmark.is_none() && continue_token.is_none() {
507 return (Some(Err(Error::NoResourceVersion)), State::Empty);
508 }
509 // Buffer page here, causing us to return to this enum branch (State::InitPage)
510 // until the objects buffer has drained
511 (None, State::InitPage {
512 continue_token,
513 objects: list.items.into_iter().collect(),
514 last_bookmark,
515 })
516 }
517 Err(err) => {
518 if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) {
519 warn!("watch list error with 403: {err:?}");
520 } else {
521 debug!("watch list error: {err:?}");
522 }
523 (Some(Err(Error::InitialListFailed(err))), State::Empty)
524 }
525 }
526 }
527 State::InitialWatch { mut stream } => {
528 match stream.next().await {
529 Some(Ok(WatchEvent::Added(obj) | WatchEvent::Modified(obj))) => {
530 (Some(Ok(Event::InitApply(obj))), State::InitialWatch { stream })
531 }
532 Some(Ok(WatchEvent::Deleted(_obj))) => {
533 // Kubernetes claims these events are impossible
534 // https://kubernetes.io/docs/reference/using-api/api-concepts/#streaming-lists
535 error!("got deleted event during initial watch. this is a bug");
536 (None, State::InitialWatch { stream })
537 }
538 Some(Ok(WatchEvent::Bookmark(bm))) => {
539 let marks_initial_end = bm.metadata.annotations.contains_key("k8s.io/initial-events-end");
540 if marks_initial_end {
541 (Some(Ok(Event::InitDone)), State::Watching {
542 resource_version: bm.metadata.resource_version,
543 stream,
544 })
545 } else {
546 (None, State::InitialWatch { stream })
547 }
548 }
549 Some(Ok(WatchEvent::Error(err))) => {
550 // HTTP GONE, means we have desynced and need to start over and re-list :(
551 let new_state = if err.code == 410 {
552 State::default()
553 } else {
554 State::InitialWatch { stream }
555 };
556 if err.code == 403 {
557 warn!("watcher watchevent error 403: {err:?}");
558 } else {
559 debug!("error watchevent error: {err:?}");
560 }
561 (Some(Err(Error::WatchError(err))), new_state)
562 }
563 Some(Err(err)) => {
564 if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) {
565 warn!("watcher error 403: {err:?}");
566 } else {
567 debug!("watcher error: {err:?}");
568 }
569 (Some(Err(Error::WatchFailed(err))), State::InitialWatch { stream })
570 }
571 None => (None, State::default()),
572 }
573 }
574 State::InitListed { resource_version } => {
575 match api.watch(&wc.to_watch_params(), &resource_version).await {
576 Ok(stream) => (None, State::Watching {
577 resource_version,
578 stream,
579 }),
580 Err(err) => {
581 if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) {
582 warn!("watch initlist error with 403: {err:?}");
583 } else {
584 debug!("watch initlist error: {err:?}");
585 }
586 (Some(Err(Error::WatchStartFailed(err))), State::InitListed {
587 resource_version,
588 })
589 }
590 }
591 }
592 State::Watching {
593 resource_version,
594 mut stream,
595 } => match stream.next().await {
596 Some(Ok(WatchEvent::Added(obj) | WatchEvent::Modified(obj))) => {
597 let resource_version = obj.resource_version().unwrap_or_default();
598 if resource_version.is_empty() {
599 (Some(Err(Error::NoResourceVersion)), State::default())
600 } else {
601 (Some(Ok(Event::Apply(obj))), State::Watching {
602 resource_version,
603 stream,
604 })
605 }
606 }
607 Some(Ok(WatchEvent::Deleted(obj))) => {
608 let resource_version = obj.resource_version().unwrap_or_default();
609 if resource_version.is_empty() {
610 (Some(Err(Error::NoResourceVersion)), State::default())
611 } else {
612 (Some(Ok(Event::Delete(obj))), State::Watching {
613 resource_version,
614 stream,
615 })
616 }
617 }
618 Some(Ok(WatchEvent::Bookmark(bm))) => (None, State::Watching {
619 resource_version: bm.metadata.resource_version,
620 stream,
621 }),
622 Some(Ok(WatchEvent::Error(err))) => {
623 // HTTP GONE, means we have desynced and need to start over and re-list :(
624 let new_state = if err.code == 410 {
625 State::default()
626 } else {
627 State::Watching {
628 resource_version,
629 stream,
630 }
631 };
632 if err.code == 403 {
633 warn!("watcher watchevent error 403: {err:?}");
634 } else {
635 debug!("error watchevent error: {err:?}");
636 }
637 (Some(Err(Error::WatchError(err))), new_state)
638 }
639 Some(Err(err)) => {
640 if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) {
641 warn!("watcher error 403: {err:?}");
642 } else {
643 debug!("watcher error: {err:?}");
644 }
645 (Some(Err(Error::WatchFailed(err))), State::Watching {
646 resource_version,
647 stream,
648 })
649 }
650 None => (None, State::InitListed { resource_version }),
651 },
652 }
653}
654
655/// Trampoline helper for `step_trampolined`
656async fn step<A>(
657 api: &A,
658 config: &Config,
659 mut state: State<A::Value>,
660) -> (Result<Event<A::Value>>, State<A::Value>)
661where
662 A: ApiMode,
663 A::Value: Resource + 'static,
664{
665 loop {
666 match step_trampolined(api, config, state).await {
667 (Some(result), new_state) => return (result, new_state),
668 (None, new_state) => state = new_state,
669 }
670 }
671}
672
673/// Watches a Kubernetes Resource for changes continuously
674///
675/// Compared to [`Api::watch`], this automatically tries to recover the stream upon errors.
676///
677/// Errors from the underlying watch are propagated, after which the stream will go into recovery mode on the next poll.
678/// You can apply your own backoff by not polling the stream for a duration after errors.
679/// Keep in mind that some [`TryStream`](futures::TryStream) combinators (such as
680/// [`try_for_each`](futures::TryStreamExt::try_for_each) and [`try_concat`](futures::TryStreamExt::try_concat))
681/// will terminate eagerly as soon as they receive an [`Err`].
682///
683/// The events are intended to provide a safe input interface for a state store like a [`reflector`].
684/// Direct users may want to use [`WatchStreamExt`] for higher-level constructs.
685///
686/// ```no_run
687/// use kube::{
688/// api::{Api, ResourceExt}, Client,
689/// runtime::{watcher, WatchStreamExt}
690/// };
691/// use k8s_openapi::api::core::v1::Pod;
692/// use futures::TryStreamExt;
693/// #[tokio::main]
694/// async fn main() -> Result<(), watcher::Error> {
695/// let client = Client::try_default().await.unwrap();
696/// let pods: Api<Pod> = Api::namespaced(client, "apps");
697///
698/// watcher(pods, watcher::Config::default()).applied_objects()
699/// .try_for_each(|p| async move {
700/// println!("Applied: {}", p.name_any());
701/// Ok(())
702/// })
703/// .await?;
704/// Ok(())
705/// }
706/// ```
707/// [`WatchStreamExt`]: super::WatchStreamExt
708/// [`reflector`]: super::reflector::reflector
709/// [`Api::watch`]: kube_client::Api::watch
710///
711/// # Recovery
712///
713/// The stream will attempt to be recovered on the next poll after an [`Err`] is returned.
714/// This will normally happen immediately, but you can use [`StreamBackoff`](crate::utils::StreamBackoff)
715/// to introduce an artificial delay. [`default_backoff`] returns a suitable default set of parameters.
716///
717/// If the watch connection is interrupted, then `watcher` will attempt to restart the watch using the last
718/// [resource version](https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes)
719/// that we have seen on the stream. If this is successful then the stream is simply resumed from where it left off.
720/// If this fails because the resource version is no longer valid then we start over with a new stream, starting with
721/// an [`Event::Init`]. The internals mechanics of recovery should be considered an implementation detail.
722pub fn watcher<K: Resource + Clone + DeserializeOwned + Debug + Send + 'static>(
723 api: Api<K>,
724 watcher_config: Config,
725) -> impl Stream<Item = Result<Event<K>>> + Send {
726 futures::stream::unfold(
727 (api, watcher_config, State::default()),
728 |(api, watcher_config, state)| async {
729 let (event, state) = step(&FullObject { api: &api }, &watcher_config, state).await;
730 Some((event, (api, watcher_config, state)))
731 },
732 )
733}
734
735/// Watches a Kubernetes Resource for changes continuously and receives only the
736/// metadata
737///
738/// Compared to [`Api::watch_metadata`], this automatically tries to recover the stream upon errors.
739///
740/// Errors from the underlying watch are propagated, after which the stream will go into recovery mode on the next poll.
741/// You can apply your own backoff by not polling the stream for a duration after errors.
742/// Keep in mind that some [`TryStream`](futures::TryStream) combinators (such as
743/// [`try_for_each`](futures::TryStreamExt::try_for_each) and [`try_concat`](futures::TryStreamExt::try_concat))
744/// will terminate eagerly as soon as they receive an [`Err`].
745///
746/// The events are intended to provide a safe input interface for a state store like a [`reflector`].
747/// Direct users may want to use [`WatchStreamExt`] for higher-level constructs.
748///
749/// ```no_run
750/// use kube::{
751/// api::{Api, ResourceExt}, Client,
752/// runtime::{watcher, metadata_watcher, WatchStreamExt}
753/// };
754/// use k8s_openapi::api::core::v1::Pod;
755/// use futures::TryStreamExt;
756/// #[tokio::main]
757/// async fn main() -> Result<(), watcher::Error> {
758/// let client = Client::try_default().await.unwrap();
759/// let pods: Api<Pod> = Api::namespaced(client, "apps");
760///
761/// metadata_watcher(pods, watcher::Config::default()).applied_objects()
762/// .try_for_each(|p| async move {
763/// println!("Applied: {}", p.name_any());
764/// Ok(())
765/// })
766/// .await?;
767/// Ok(())
768/// }
769/// ```
770/// [`WatchStreamExt`]: super::WatchStreamExt
771/// [`reflector`]: super::reflector::reflector
772/// [`Api::watch`]: kube_client::Api::watch
773///
774/// # Recovery
775///
776/// The stream will attempt to be recovered on the next poll after an [`Err`] is returned.
777/// This will normally happen immediately, but you can use [`StreamBackoff`](crate::utils::StreamBackoff)
778/// to introduce an artificial delay. [`default_backoff`] returns a suitable default set of parameters.
779///
780/// If the watch connection is interrupted, then `watcher` will attempt to restart the watch using the last
781/// [resource version](https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes)
782/// that we have seen on the stream. If this is successful then the stream is simply resumed from where it left off.
783/// If this fails because the resource version is no longer valid then we start over with a new stream, starting with
784/// an [`Event::Init`]. The internals mechanics of recovery should be considered an implementation detail.
785#[allow(clippy::module_name_repetitions)]
786pub fn metadata_watcher<K: Resource + Clone + DeserializeOwned + Debug + Send + 'static>(
787 api: Api<K>,
788 watcher_config: Config,
789) -> impl Stream<Item = Result<Event<PartialObjectMeta<K>>>> + Send {
790 futures::stream::unfold(
791 (api, watcher_config, State::default()),
792 |(api, watcher_config, state)| async {
793 let (event, state) = step(&MetaOnly { api: &api }, &watcher_config, state).await;
794 Some((event, (api, watcher_config, state)))
795 },
796 )
797}
798
799/// Watch a single named object for updates
800///
801/// Emits `None` if the object is deleted (or not found), and `Some` if an object is updated (or created/found).
802///
803/// Often invoked indirectly via [`await_condition`](crate::wait::await_condition()).
804///
805/// ## Scope Warning
806///
807/// When using this with an `Api::all` on namespaced resources there is a chance of duplicated names.
808/// To avoid getting confusing / wrong answers for this, use `Api::namespaced` bound to a specific namespace
809/// when watching for transitions to namespaced objects.
810pub fn watch_object<K: Resource + Clone + DeserializeOwned + Debug + Send + 'static>(
811 api: Api<K>,
812 name: &str,
813) -> impl Stream<Item = Result<Option<K>>> + Send {
814 // filtering by object name in given scope, so there's at most one matching object
815 // footgun: Api::all may generate events from namespaced objects with the same name in different namespaces
816 let fields = format!("metadata.name={name}");
817 watcher(api, Config::default().fields(&fields))
818 // The `obj_seen` state is used to track whether the object exists in each Init / InitApply / InitDone
819 // sequence of events. If the object wasn't seen in any particular sequence it is treated as deleted and
820 // `None` is emitted when the InitDone event is received.
821 //
822 // The first check ensures `None` is emitted if the object was already gone (or not found), subsequent
823 // checks ensure `None` is emitted even if for some reason the Delete event wasn't received, which
824 // could happen given K8S events aren't guaranteed delivery.
825 .scan(false, |obj_seen, event| {
826 if matches!(event, Ok(Event::Init)) {
827 *obj_seen = false;
828 } else if matches!(event, Ok(Event::InitApply(_))) {
829 *obj_seen = true;
830 }
831 future::ready(Some((*obj_seen, event)))
832 })
833 .filter_map(|(obj_seen, event)| async move {
834 match event {
835 // Pass up `Some` for Found / Updated
836 Ok(Event::Apply(obj) | Event::InitApply(obj)) => Some(Ok(Some(obj))),
837 // Pass up `None` for Deleted
838 Ok(Event::Delete(_)) => Some(Ok(None)),
839 // Pass up `None` if the object wasn't seen in the initial list
840 Ok(Event::InitDone) if !obj_seen => Some(Ok(None)),
841 // Ignore marker events
842 Ok(Event::Init | Event::InitDone) => None,
843 // Bubble up errors
844 Err(err) => Some(Err(err)),
845 }
846 })
847}
848
/// An exponential backoff that can be reset to its initial state
///
/// Wraps a `backon` exponential backoff together with the builder that produced it.
pub struct ExponentialBackoff {
    /// The delay iterator currently in use
    inner: backon::ExponentialBackoff,
    /// The configuration `inner` was built from, kept so that a reset can rebuild it
    builder: backon::ExponentialBuilder,
}
853
854impl ExponentialBackoff {
855 fn new(min_delay: Duration, max_delay: Duration, factor: f32, enable_jitter: bool) -> Self {
856 let builder = backon::ExponentialBuilder::default()
857 .with_min_delay(min_delay)
858 .with_max_delay(max_delay)
859 .with_factor(factor)
860 .without_max_times();
861
862 if enable_jitter {
863 builder.with_jitter();
864 }
865
866 Self {
867 inner: builder.build(),
868 builder,
869 }
870 }
871}
872
impl Backoff for ExponentialBackoff {
    /// Restarts the backoff by rebuilding the delay iterator from the stored builder
    fn reset(&mut self) {
        self.inner = self.builder.build();
    }
}
878
impl Iterator for ExponentialBackoff {
    type Item = Duration;

    /// Returns the next delay produced by the wrapped `backon` backoff
    fn next(&mut self) -> Option<Self::Item> {
        self.inner.next()
    }
}
886
impl From<backon::ExponentialBuilder> for ExponentialBackoff {
    /// Wraps a preconfigured builder, building the initial delay iterator from it and
    /// retaining the builder for later resets
    fn from(builder: backon::ExponentialBuilder) -> Self {
        Self {
            inner: builder.build(),
            builder,
        }
    }
}
895
/// Default watcher backoff inspired by Kubernetes' client-go.
///
/// The parameters currently optimize for being kind to struggling apiservers.
/// The exact parameters are taken from
/// [client-go's reflector source](https://github.com/kubernetes/client-go/blob/980663e185ab6fc79163b1c2565034f6d58368db/tools/cache/reflector.go#L177-L181)
/// and should not be considered stable.
///
/// This struct implements [`Backoff`] and is the default strategy used
/// when calling `WatchStreamExt::default_backoff`. If you need to create
/// this manually then [`DefaultBackoff::default`] can be used.
pub struct DefaultBackoff(Strategy);
/// The concrete backoff wrapped by [`DefaultBackoff`]
type Strategy = ResetTimerBackoff<ExponentialBackoff>;
908
909impl Default for DefaultBackoff {
910 fn default() -> Self {
911 Self(ResetTimerBackoff::new(
912 ExponentialBackoff::new(Duration::from_millis(800), Duration::from_secs(30), 2.0, true),
913 Duration::from_secs(120),
914 ))
915 }
916}
917
impl Iterator for DefaultBackoff {
    type Item = Duration;

    /// Returns the next delay from the wrapped strategy
    fn next(&mut self) -> Option<Self::Item> {
        self.0.next()
    }
}
925
impl Backoff for DefaultBackoff {
    /// Resets the wrapped strategy
    fn reset(&mut self) {
        self.0.reset();
    }
}