kube_runtime/utils/watch_ext.rs
1use crate::{
2 utils::{
3 event_decode::EventDecode,
4 event_modify::EventModify,
5 predicate::{Predicate, PredicateFilter},
6 stream_backoff::StreamBackoff,
7 },
8 watcher,
9};
10use kube_client::Resource;
11
12use crate::{reflector::store::Writer, utils::Reflect};
13
14use crate::watcher::DefaultBackoff;
15use backoff::backoff::Backoff;
16use futures::{Stream, TryStream};
17
18/// Extension trait for streams returned by [`watcher`](watcher()) or [`reflector`](crate::reflector::reflector)
19pub trait WatchStreamExt: Stream {
20 /// Apply the [`DefaultBackoff`] watcher [`Backoff`] policy
21 ///
22 /// This is recommended for controllers that want to play nicely with the apiserver.
23 fn default_backoff(self) -> StreamBackoff<Self, DefaultBackoff>
24 where
25 Self: TryStream + Sized,
26 {
27 StreamBackoff::new(self, DefaultBackoff::default())
28 }
29
30 /// Apply a specific [`Backoff`] policy to a [`Stream`] using [`StreamBackoff`]
31 fn backoff<B>(self, b: B) -> StreamBackoff<Self, B>
32 where
33 B: Backoff,
34 Self: TryStream + Sized,
35 {
36 StreamBackoff::new(self, b)
37 }
38
39 /// Decode a [`watcher()`] stream into a stream of applied objects
40 ///
41 /// All Added/Modified events are passed through, and critical errors bubble up.
42 fn applied_objects<K>(self) -> EventDecode<Self>
43 where
44 Self: Stream<Item = Result<watcher::Event<K>, watcher::Error>> + Sized,
45 {
46 EventDecode::new(self, false)
47 }
48
49 /// Decode a [`watcher()`] stream into a stream of touched objects
50 ///
51 /// All Added/Modified/Deleted events are passed through, and critical errors bubble up.
52 fn touched_objects<K>(self) -> EventDecode<Self>
53 where
54 Self: Stream<Item = Result<watcher::Event<K>, watcher::Error>> + Sized,
55 {
56 EventDecode::new(self, true)
57 }
58
59 /// Modify elements of a [`watcher()`] stream.
60 ///
61 /// Calls [`watcher::Event::modify()`] on every element.
62 /// Stream shorthand for `stream.map_ok(|event| { event.modify(f) })`.
63 ///
64 /// ```no_run
65 /// # use std::pin::pin;
66 /// # use futures::{Stream, StreamExt, TryStreamExt};
67 /// # use kube::{Api, Client, ResourceExt};
68 /// # use kube_runtime::{watcher, WatchStreamExt};
69 /// # use k8s_openapi::api::apps::v1::Deployment;
70 /// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
71 /// # let client: kube::Client = todo!();
72 /// let deploys: Api<Deployment> = Api::all(client);
73 /// let mut truncated_deploy_stream = pin!(watcher(deploys, watcher::Config::default())
74 /// .modify(|deploy| {
75 /// deploy.managed_fields_mut().clear();
76 /// deploy.status = None;
77 /// })
78 /// .applied_objects());
79 ///
80 /// while let Some(d) = truncated_deploy_stream.try_next().await? {
81 /// println!("Truncated Deployment: '{:?}'", serde_json::to_string(&d)?);
82 /// }
83 /// # Ok(())
84 /// # }
85 /// ```
86 fn modify<F, K>(self, f: F) -> EventModify<Self, F>
87 where
88 Self: Stream<Item = Result<watcher::Event<K>, watcher::Error>> + Sized,
89 F: FnMut(&mut K),
90 {
91 EventModify::new(self, f)
92 }
93
94 /// Filter a stream based on on [`predicates`](crate::predicates).
95 ///
96 /// This will filter out repeat calls where the predicate returns the same result.
97 /// Common use case for this is to avoid repeat events for status updates
98 /// by filtering on [`predicates::generation`](crate::predicates::generation).
99 ///
100 /// ## Usage
101 /// ```no_run
102 /// # use std::pin::pin;
103 /// # use futures::{Stream, StreamExt, TryStreamExt};
104 /// use kube::{Api, Client, ResourceExt};
105 /// use kube_runtime::{watcher, WatchStreamExt, predicates};
106 /// use k8s_openapi::api::apps::v1::Deployment;
107 /// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
108 /// # let client: kube::Client = todo!();
109 /// let deploys: Api<Deployment> = Api::default_namespaced(client);
110 /// let mut changed_deploys = pin!(watcher(deploys, watcher::Config::default())
111 /// .applied_objects()
112 /// .predicate_filter(predicates::generation));
113 ///
114 /// while let Some(d) = changed_deploys.try_next().await? {
115 /// println!("saw Deployment '{} with hitherto unseen generation", d.name_any());
116 /// }
117 /// # Ok(())
118 /// # }
119 /// ```
120 fn predicate_filter<K, P>(self, predicate: P) -> PredicateFilter<Self, K, P>
121 where
122 Self: Stream<Item = Result<K, watcher::Error>> + Sized,
123 K: Resource + 'static,
124 P: Predicate<K> + 'static,
125 {
126 PredicateFilter::new(self, predicate)
127 }
128
129 /// Reflect a [`watcher()`] stream into a [`Store`] through a [`Writer`]
130 ///
131 /// Returns the stream unmodified, but passes every [`watcher::Event`] through a [`Writer`].
132 /// This populates a [`Store`] as the stream is polled.
133 ///
134 /// ## Usage
135 /// ```no_run
136 /// # use futures::{Stream, StreamExt, TryStreamExt};
137 /// # use std::time::Duration;
138 /// # use tracing::{info, warn};
139 /// use kube::{Api, Client, ResourceExt};
140 /// use kube_runtime::{watcher, WatchStreamExt, reflector};
141 /// use k8s_openapi::api::apps::v1::Deployment;
142 /// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
143 /// # let client: kube::Client = todo!();
144 ///
145 /// let deploys: Api<Deployment> = Api::default_namespaced(client);
146 /// let (reader, writer) = reflector::store::<Deployment>();
147 ///
148 /// tokio::spawn(async move {
149 /// // start polling the store once the reader is ready
150 /// reader.wait_until_ready().await.unwrap();
151 /// loop {
152 /// let names = reader.state().iter().map(|d| d.name_any()).collect::<Vec<_>>();
153 /// info!("Current {} deploys: {:?}", names.len(), names);
154 /// tokio::time::sleep(Duration::from_secs(10)).await;
155 /// }
156 /// });
157 ///
158 /// // configure the watcher stream and populate the store while polling
159 /// watcher(deploys, watcher::Config::default())
160 /// .reflect(writer)
161 /// .applied_objects()
162 /// .for_each(|res| async move {
163 /// match res {
164 /// Ok(o) => info!("saw {}", o.name_any()),
165 /// Err(e) => warn!("watcher error: {}", e),
166 /// }
167 /// })
168 /// .await;
169 ///
170 /// # Ok(())
171 /// # }
172 /// ```
173 ///
174 /// [`Store`]: crate::reflector::Store
175 fn reflect<K>(self, writer: Writer<K>) -> Reflect<Self, K>
176 where
177 Self: Stream<Item = watcher::Result<watcher::Event<K>>> + Sized,
178 K: Resource + Clone + 'static,
179 K::DynamicType: Eq + std::hash::Hash + Clone,
180 {
181 Reflect::new(self, writer)
182 }
183
184 /// Reflect a shared [`watcher()`] stream into a [`Store`] through a [`Writer`]
185 ///
186 /// Returns the stream unmodified, but passes every [`watcher::Event`]
187 /// through a [`Writer`]. This populates a [`Store`] as the stream is
188 /// polled. When the [`watcher::Event`] is not an error or a
189 /// [`watcher::Event::Deleted`] then its inner object will also be
190 /// propagated to subscribers.
191 ///
192 /// Subscribers can be created by calling [`subscribe()`] on a [`Writer`].
193 /// This will return a [`ReflectHandle`] stream that should be polled
194 /// independently. When the root stream is dropped, or it ends, all [`ReflectHandle`]s
195 /// subscribed to the stream will also terminate after all events yielded by
196 /// the root stream have been observed. This means [`ReflectHandle`] streams
197 /// can still be polled after the root stream has been dropped.
198 ///
199 /// **NB**: This adapter requires an
200 /// [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21)
201 /// feature
202 ///
203 /// ## Warning
204 ///
205 /// If the root [`Stream`] is not polled, [`ReflectHandle`] streams will
206 /// never receive any events. This will cause the streams to deadlock since
207 /// the root stream will apply backpressure when downstream readers are not
208 /// consuming events.
209 ///
210 ///
211 /// [`Store`]: crate::reflector::Store
212 /// [`subscribe()`]: crate::reflector::store::Writer::subscribe()
213 /// [`Stream`]: futures::stream::Stream
214 /// [`ReflectHandle`]: crate::reflector::dispatcher::ReflectHandle
215 /// ## Usage
216 /// ```no_run
217 /// # use futures::StreamExt;
218 /// # use std::time::Duration;
219 /// # use tracing::{info, warn};
220 /// use kube::{Api, ResourceExt};
221 /// use kube_runtime::{watcher, WatchStreamExt, reflector};
222 /// use k8s_openapi::api::apps::v1::Deployment;
223 /// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
224 /// # let client: kube::Client = todo!();
225 ///
226 /// let deploys: Api<Deployment> = Api::default_namespaced(client);
227 /// let subscriber_buf_sz = 100;
228 /// let (reader, writer) = reflector::store_shared::<Deployment>(subscriber_buf_sz);
229 /// let subscriber = writer.subscribe().unwrap();
230 ///
231 /// tokio::spawn(async move {
232 /// // start polling the store once the reader is ready
233 /// reader.wait_until_ready().await.unwrap();
234 /// loop {
235 /// let names = reader.state().iter().map(|d| d.name_any()).collect::<Vec<_>>();
236 /// info!("Current {} deploys: {:?}", names.len(), names);
237 /// tokio::time::sleep(Duration::from_secs(10)).await;
238 /// }
239 /// });
240 ///
241 /// tokio::spawn(async move {
242 /// // subscriber can be used to receive applied_objects
243 /// subscriber.for_each(|obj| async move {
244 /// info!("saw in subscriber {}", &obj.name_any())
245 /// }).await;
246 /// });
247 ///
248 /// // configure the watcher stream and populate the store while polling
249 /// watcher(deploys, watcher::Config::default())
250 /// .reflect_shared(writer)
251 /// .applied_objects()
252 /// .for_each(|res| async move {
253 /// match res {
254 /// Ok(o) => info!("saw in root stream {}", o.name_any()),
255 /// Err(e) => warn!("watcher error in root stream: {}", e),
256 /// }
257 /// })
258 /// .await;
259 ///
260 /// # Ok(())
261 /// # }
262 /// ```
263 #[cfg(feature = "unstable-runtime-subscribe")]
264 fn reflect_shared<K>(self, writer: Writer<K>) -> impl Stream<Item = Self::Item>
265 where
266 Self: Stream<Item = watcher::Result<watcher::Event<K>>> + Sized,
267 K: Resource + Clone + 'static,
268 K::DynamicType: Eq + std::hash::Hash + Clone,
269 {
270 crate::reflector(writer, self)
271 }
272}
273
274impl<St: ?Sized> WatchStreamExt for St where St: Stream {}
275
// Compile-only checks: nothing in this module is registered as a `#[test]`.
#[cfg(test)]
pub(crate) mod tests {
    use super::watcher;
    use crate::{predicates, WatchStreamExt as _};
    use futures::prelude::*;
    use k8s_openapi::api::core::v1::Pod;
    use kube_client::{Api, Resource};

    /// Stand-in producer of a value of any type, for type checking only.
    ///
    /// Panics via `unimplemented!` if it is ever actually called.
    fn compile_type<T>() -> T {
        unimplemented!("not called - compile test only")
    }

    /// Identity function that only compiles when `x` is a `Send` stream of
    /// `watcher::Result<K>` items for a `Resource + Clone + Send + 'static` `K`.
    pub fn assert_stream<T, K>(x: T) -> T
    where
        K: Resource + Clone + Send + 'static,
        T: Stream<Item = watcher::Result<K>> + Send,
    {
        x
    }

    // not #[test] because this is only a compile check verification
    #[allow(dead_code, unused_must_use)]
    fn test_watcher_stream_type_drift() {
        // Build the adapter chain one step at a time; only the resulting types matter.
        let api = compile_type::<Api<Pod>>();
        let events = watcher(api, Default::default());
        let pred_watch = events
            .touched_objects()
            .predicate_filter(predicates::generation)
            .boxed();
        assert_stream(pred_watch);
    }
}
306}