kube_runtime/utils/
watch_ext.rs

1use crate::{
2    utils::{
3        event_decode::EventDecode,
4        event_modify::EventModify,
5        predicate::{Predicate, PredicateFilter},
6        stream_backoff::StreamBackoff,
7    },
8    watcher,
9};
10use kube_client::Resource;
11
12use crate::{reflector::store::Writer, utils::Reflect};
13
14use crate::watcher::DefaultBackoff;
15use backoff::backoff::Backoff;
16use futures::{Stream, TryStream};
17
18/// Extension trait for streams returned by [`watcher`](watcher()) or [`reflector`](crate::reflector::reflector)
19pub trait WatchStreamExt: Stream {
20    /// Apply the [`DefaultBackoff`] watcher [`Backoff`] policy
21    ///
22    /// This is recommended for controllers that want to play nicely with the apiserver.
23    fn default_backoff(self) -> StreamBackoff<Self, DefaultBackoff>
24    where
25        Self: TryStream + Sized,
26    {
27        StreamBackoff::new(self, DefaultBackoff::default())
28    }
29
30    /// Apply a specific [`Backoff`] policy to a [`Stream`] using [`StreamBackoff`]
31    fn backoff<B>(self, b: B) -> StreamBackoff<Self, B>
32    where
33        B: Backoff,
34        Self: TryStream + Sized,
35    {
36        StreamBackoff::new(self, b)
37    }
38
39    /// Decode a [`watcher()`] stream into a stream of applied objects
40    ///
41    /// All Added/Modified events are passed through, and critical errors bubble up.
42    fn applied_objects<K>(self) -> EventDecode<Self>
43    where
44        Self: Stream<Item = Result<watcher::Event<K>, watcher::Error>> + Sized,
45    {
46        EventDecode::new(self, false)
47    }
48
49    /// Decode a [`watcher()`] stream into a stream of touched objects
50    ///
51    /// All Added/Modified/Deleted events are passed through, and critical errors bubble up.
52    fn touched_objects<K>(self) -> EventDecode<Self>
53    where
54        Self: Stream<Item = Result<watcher::Event<K>, watcher::Error>> + Sized,
55    {
56        EventDecode::new(self, true)
57    }
58
59    /// Modify elements of a [`watcher()`] stream.
60    ///
61    /// Calls [`watcher::Event::modify()`] on every element.
62    /// Stream shorthand for `stream.map_ok(|event| { event.modify(f) })`.
63    ///
64    /// ```no_run
65    /// # use std::pin::pin;
66    /// # use futures::{Stream, StreamExt, TryStreamExt};
67    /// # use kube::{Api, Client, ResourceExt};
68    /// # use kube_runtime::{watcher, WatchStreamExt};
69    /// # use k8s_openapi::api::apps::v1::Deployment;
70    /// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
71    /// # let client: kube::Client = todo!();
72    /// let deploys: Api<Deployment> = Api::all(client);
73    /// let mut truncated_deploy_stream = pin!(watcher(deploys, watcher::Config::default())
74    ///     .modify(|deploy| {
75    ///         deploy.managed_fields_mut().clear();
76    ///         deploy.status = None;
77    ///     })
78    ///     .applied_objects());
79    ///
80    /// while let Some(d) = truncated_deploy_stream.try_next().await? {
81    ///    println!("Truncated Deployment: '{:?}'", serde_json::to_string(&d)?);
82    /// }
83    /// # Ok(())
84    /// # }
85    /// ```
86    fn modify<F, K>(self, f: F) -> EventModify<Self, F>
87    where
88        Self: Stream<Item = Result<watcher::Event<K>, watcher::Error>> + Sized,
89        F: FnMut(&mut K),
90    {
91        EventModify::new(self, f)
92    }
93
94    /// Filter a stream based on on [`predicates`](crate::predicates).
95    ///
96    /// This will filter out repeat calls where the predicate returns the same result.
97    /// Common use case for this is to avoid repeat events for status updates
98    /// by filtering on [`predicates::generation`](crate::predicates::generation).
99    ///
100    /// ## Usage
101    /// ```no_run
102    /// # use std::pin::pin;
103    /// # use futures::{Stream, StreamExt, TryStreamExt};
104    /// use kube::{Api, Client, ResourceExt};
105    /// use kube_runtime::{watcher, WatchStreamExt, predicates};
106    /// use k8s_openapi::api::apps::v1::Deployment;
107    /// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
108    /// # let client: kube::Client = todo!();
109    /// let deploys: Api<Deployment> = Api::default_namespaced(client);
110    /// let mut changed_deploys = pin!(watcher(deploys, watcher::Config::default())
111    ///     .applied_objects()
112    ///     .predicate_filter(predicates::generation));
113    ///
114    /// while let Some(d) = changed_deploys.try_next().await? {
115    ///    println!("saw Deployment '{} with hitherto unseen generation", d.name_any());
116    /// }
117    /// # Ok(())
118    /// # }
119    /// ```
120    fn predicate_filter<K, P>(self, predicate: P) -> PredicateFilter<Self, K, P>
121    where
122        Self: Stream<Item = Result<K, watcher::Error>> + Sized,
123        K: Resource + 'static,
124        P: Predicate<K> + 'static,
125    {
126        PredicateFilter::new(self, predicate)
127    }
128
129    /// Reflect a [`watcher()`] stream into a [`Store`] through a [`Writer`]
130    ///
131    /// Returns the stream unmodified, but passes every [`watcher::Event`] through a [`Writer`].
132    /// This populates a [`Store`] as the stream is polled.
133    ///
134    /// ## Usage
135    /// ```no_run
136    /// # use futures::{Stream, StreamExt, TryStreamExt};
137    /// # use std::time::Duration;
138    /// # use tracing::{info, warn};
139    /// use kube::{Api, Client, ResourceExt};
140    /// use kube_runtime::{watcher, WatchStreamExt, reflector};
141    /// use k8s_openapi::api::apps::v1::Deployment;
142    /// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
143    /// # let client: kube::Client = todo!();
144    ///
145    /// let deploys: Api<Deployment> = Api::default_namespaced(client);
146    /// let (reader, writer) = reflector::store::<Deployment>();
147    ///
148    /// tokio::spawn(async move {
149    ///     // start polling the store once the reader is ready
150    ///     reader.wait_until_ready().await.unwrap();
151    ///     loop {
152    ///         let names = reader.state().iter().map(|d| d.name_any()).collect::<Vec<_>>();
153    ///         info!("Current {} deploys: {:?}", names.len(), names);
154    ///         tokio::time::sleep(Duration::from_secs(10)).await;
155    ///     }
156    /// });
157    ///
158    /// // configure the watcher stream and populate the store while polling
159    /// watcher(deploys, watcher::Config::default())
160    ///     .reflect(writer)
161    ///     .applied_objects()
162    ///     .for_each(|res| async move {
163    ///         match res {
164    ///             Ok(o) => info!("saw {}", o.name_any()),
165    ///             Err(e) => warn!("watcher error: {}", e),
166    ///         }
167    ///     })
168    ///     .await;
169    ///
170    /// # Ok(())
171    /// # }
172    /// ```
173    ///
174    /// [`Store`]: crate::reflector::Store
175    fn reflect<K>(self, writer: Writer<K>) -> Reflect<Self, K>
176    where
177        Self: Stream<Item = watcher::Result<watcher::Event<K>>> + Sized,
178        K: Resource + Clone + 'static,
179        K::DynamicType: Eq + std::hash::Hash + Clone,
180    {
181        Reflect::new(self, writer)
182    }
183
184    /// Reflect a shared [`watcher()`] stream into a [`Store`] through a [`Writer`]
185    ///
186    /// Returns the stream unmodified, but passes every [`watcher::Event`]
187    /// through a [`Writer`]. This populates a [`Store`] as the stream is
188    /// polled. When the [`watcher::Event`] is not an error or a
189    /// [`watcher::Event::Deleted`] then its inner object will also be
190    /// propagated to subscribers.
191    ///
192    /// Subscribers can be created by calling [`subscribe()`] on a [`Writer`].
193    /// This will return a [`ReflectHandle`] stream that should be polled
194    /// independently. When the root stream is dropped, or it ends, all [`ReflectHandle`]s
195    /// subscribed to the stream will also terminate after all events yielded by
196    /// the root stream have been observed. This means [`ReflectHandle`] streams
197    /// can still be polled after the root stream has been dropped.
198    ///
199    /// **NB**: This adapter requires an
200    /// [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21)
201    /// feature
202    ///
203    /// ## Warning
204    ///
205    /// If the root [`Stream`] is not polled, [`ReflectHandle`] streams will
206    /// never receive any events. This will cause the streams to deadlock since
207    /// the root stream will apply backpressure when downstream readers are not
208    /// consuming events.
209    ///
210    ///
211    /// [`Store`]: crate::reflector::Store
212    /// [`subscribe()`]: crate::reflector::store::Writer::subscribe()
213    /// [`Stream`]: futures::stream::Stream
214    /// [`ReflectHandle`]: crate::reflector::dispatcher::ReflectHandle
215    /// ## Usage
216    /// ```no_run
217    /// # use futures::StreamExt;
218    /// # use std::time::Duration;
219    /// # use tracing::{info, warn};
220    /// use kube::{Api, ResourceExt};
221    /// use kube_runtime::{watcher, WatchStreamExt, reflector};
222    /// use k8s_openapi::api::apps::v1::Deployment;
223    /// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
224    /// # let client: kube::Client = todo!();
225    ///
226    /// let deploys: Api<Deployment> = Api::default_namespaced(client);
227    /// let subscriber_buf_sz = 100;
228    /// let (reader, writer) = reflector::store_shared::<Deployment>(subscriber_buf_sz);
229    /// let subscriber = writer.subscribe().unwrap();
230    ///
231    /// tokio::spawn(async move {
232    ///     // start polling the store once the reader is ready
233    ///     reader.wait_until_ready().await.unwrap();
234    ///     loop {
235    ///         let names = reader.state().iter().map(|d| d.name_any()).collect::<Vec<_>>();
236    ///         info!("Current {} deploys: {:?}", names.len(), names);
237    ///         tokio::time::sleep(Duration::from_secs(10)).await;
238    ///     }
239    /// });
240    ///
241    /// tokio::spawn(async move {
242    ///     // subscriber can be used to receive applied_objects
243    ///     subscriber.for_each(|obj| async move {
244    ///         info!("saw in subscriber {}", &obj.name_any())
245    ///     }).await;
246    /// });
247    ///
248    /// // configure the watcher stream and populate the store while polling
249    /// watcher(deploys, watcher::Config::default())
250    ///     .reflect_shared(writer)
251    ///     .applied_objects()
252    ///     .for_each(|res| async move {
253    ///         match res {
254    ///             Ok(o) => info!("saw in root stream {}", o.name_any()),
255    ///             Err(e) => warn!("watcher error in root stream: {}", e),
256    ///         }
257    ///     })
258    ///     .await;
259    ///
260    /// # Ok(())
261    /// # }
262    /// ```
263    #[cfg(feature = "unstable-runtime-subscribe")]
264    fn reflect_shared<K>(self, writer: Writer<K>) -> impl Stream<Item = Self::Item>
265    where
266        Self: Stream<Item = watcher::Result<watcher::Event<K>>> + Sized,
267        K: Resource + Clone + 'static,
268        K::DynamicType: Eq + std::hash::Hash + Clone,
269    {
270        crate::reflector(writer, self)
271    }
272}
273
274impl<St: ?Sized> WatchStreamExt for St where St: Stream {}
275
276// Compile tests
277#[cfg(test)]
278pub(crate) mod tests {
279    use super::watcher;
280    use crate::{predicates, WatchStreamExt as _};
281    use futures::prelude::*;
282    use k8s_openapi::api::core::v1::Pod;
283    use kube_client::{Api, Resource};
284
285    fn compile_type<T>() -> T {
286        unimplemented!("not called - compile test only")
287    }
288
289    pub fn assert_stream<T, K>(x: T) -> T
290    where
291        T: Stream<Item = watcher::Result<K>> + Send,
292        K: Resource + Clone + Send + 'static,
293    {
294        x
295    }
296
297    // not #[test] because this is only a compile check verification
298    #[allow(dead_code, unused_must_use)]
299    fn test_watcher_stream_type_drift() {
300        let pred_watch = watcher(compile_type::<Api<Pod>>(), Default::default())
301            .touched_objects()
302            .predicate_filter(predicates::generation)
303            .boxed();
304        assert_stream(pred_watch);
305    }
306}