kube_runtime/reflector/mod.rs
1//! Caches objects in memory
2
3mod dispatcher;
4mod object_ref;
5pub mod store;
6
7pub use self::{
8 dispatcher::ReflectHandle,
9 object_ref::{Extra as ObjectRefExtra, Lookup, ObjectRef},
10};
11use crate::watcher;
12use async_stream::stream;
13use futures::{Stream, StreamExt};
14use std::hash::Hash;
15#[cfg(feature = "unstable-runtime-subscribe")] pub use store::store_shared;
16pub use store::{Store, store};
17
18/// Cache objects from a [`watcher()`] stream into a local [`Store`]
19///
20/// Observes the raw `Stream` of [`watcher::Event`] objects, and modifies the cache.
21/// It passes the raw [`watcher()`] stream through unmodified.
22///
23/// ## Usage
/// Create a [`Store`] through e.g. [`store::store()`]. The `writer` part is not cloneable,
25/// and must be moved into the reflector. The `reader` part is the [`Store`] interface
26/// that you can send to other parts of your program as state.
27///
28/// The cache contains the last-seen state of objects,
29/// which may lag slightly behind the actual state.
30///
31/// ## Example
32///
33/// Infinite watch of [`Node`](k8s_openapi::api::core::v1::Node) resources with a certain label.
34///
35/// The `reader` part being passed around to a webserver is omitted.
36/// For examples see [version-rs](https://github.com/kube-rs/version-rs) for integration with [axum](https://github.com/tokio-rs/axum),
37/// or [controller-rs](https://github.com/kube-rs/controller-rs) for the similar controller integration with [actix-web](https://actix.rs/).
38///
39/// ```no_run
40/// use std::future::ready;
41/// use k8s_openapi::api::core::v1::Node;
42/// use kube::runtime::{reflector, watcher, WatchStreamExt, watcher::Config};
43/// use futures::StreamExt;
44/// # use kube::api::Api;
45/// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
46/// # let client: kube::Client = todo!();
47///
48/// let nodes: Api<Node> = Api::all(client);
49/// let node_filter = Config::default().labels("kubernetes.io/arch=amd64");
50/// let (reader, writer) = reflector::store();
51///
52/// // Create the infinite reflector stream
53/// let rf = reflector(writer, watcher(nodes, node_filter));
54///
55/// // !!! pass reader to your webserver/manager as state !!!
56///
57/// // Poll the stream (needed to keep the store up-to-date)
58/// let infinite_watch = rf.applied_objects().for_each(|o| { ready(()) });
59/// infinite_watch.await;
60/// # Ok(())
61/// # }
62/// ```
63///
64///
65/// ## Memory Usage
66///
67/// A reflector often constitutes one of the biggest components of a controller's memory use.
68/// Given a ~2000 pods cluster, a reflector saving everything (including injected sidecars, managed fields)
69/// can quickly consume a couple of hundred megabytes or more, depending on how much of this you are storing.
70///
71/// While generally acceptable, there are techniques you can leverage to reduce the memory usage
72/// depending on your use case.
73///
74/// 1. Reflect a [`PartialObjectMeta<K>`](kube_client::core::PartialObjectMeta) stream rather than a stream of `K`
75///
76/// You can send in a [`metadata_watcher()`](crate::watcher::metadata_watcher()) for a type rather than a [`watcher()`],
77/// and this can drop your memory usage by more than a factor of two,
78/// depending on the size of `K`. 60% reduction seen for `Pod`. Usage is otherwise identical.
79///
/// 2. Use `modify` on the raw [`watcher::Event`] object stream to clear unneeded properties
81///
/// For instance, managed fields typically constitute around half the size of `ObjectMeta` and can often be dropped:
83///
84/// ```no_run
85/// # use futures::TryStreamExt;
86/// # use kube::{ResourceExt, Api, runtime::watcher};
87/// # let api: Api<k8s_openapi::api::core::v1::Node> = todo!();
88/// let stream = watcher(api, Default::default()).map_ok(|ev| {
89/// ev.modify(|pod| {
90/// pod.managed_fields_mut().clear();
91/// pod.annotations_mut().clear();
92/// pod.status = None;
93/// })
94/// });
95/// ```
96/// The `stream` can then be passed to `reflector` causing smaller objects to be written to its store.
97/// Note that you **cannot drop everything**; you minimally need the spec properties your app relies on.
98/// Additionally, only `labels`, `annotations` and `managed_fields` are safe to drop from `ObjectMeta`.
99///
100/// For more information check out: <https://kube.rs/controllers/optimization/> for graphs and techniques.
101///
102/// ## Stream sharing
103///
104/// `reflector()` as an interface may optionally create a stream that can be
105/// shared with other components to help with resource usage.
106///
107/// To share a stream, the `Writer<K>` consumed by `reflector()` must be
108/// created through an interface that allows a store to be subscribed on, such
109/// as [`store_shared()`]. When the store supports being subscribed on, it will
110/// broadcast an event to all active listeners after caching any object
111/// contained in the event.
112pub fn reflector<K, W>(mut writer: store::Writer<K>, stream: W) -> impl Stream<Item = W::Item>
113where
114 K: Lookup + Clone,
115 K::DynamicType: Eq + Hash + Clone,
116 W: Stream<Item = watcher::Result<watcher::Event<K>>>,
117{
118 let mut stream = Box::pin(stream);
119 stream! {
120 while let Some(event) = stream.next().await {
121 match event {
122 Ok(ev) => {
123 writer.apply_watcher_event(&ev);
124 writer.dispatch_event(&ev).await;
125 yield Ok(ev);
126 },
127 Err(ev) => yield Err(ev)
128 }
129 }
130 }
131}
132
#[cfg(test)]
mod tests {
    use super::{ObjectRef, reflector, store};
    use crate::watcher;
    use futures::{StreamExt, TryStreamExt, stream};
    use k8s_openapi::{api::core::v1::ConfigMap, apimachinery::pkg::apis::meta::v1::ObjectMeta};
    use rand::{
        RngExt,
        distr::{Bernoulli, Uniform},
    };
    use std::collections::{BTreeMap, HashMap};

    /// Builds an otherwise-default `ConfigMap` whose only populated field is its name.
    fn named_config_map(name: &str) -> ConfigMap {
        ConfigMap {
            metadata: ObjectMeta {
                name: Some(name.to_string()),
                ..ObjectMeta::default()
            },
            ..ConfigMap::default()
        }
    }

    /// Feeds `events` through a reflector backed by `writer` and polls it to
    /// completion, discarding everything the reflector yields.
    async fn drive(
        writer: store::Writer<ConfigMap>,
        events: Vec<watcher::Result<watcher::Event<ConfigMap>>>,
    ) {
        reflector(writer, stream::iter(events))
            .map(|_| ())
            .collect::<()>()
            .await;
    }

    /// A single `Apply` event must make the object retrievable from the store.
    #[tokio::test]
    async fn reflector_applied_should_add_object() {
        let writer = store::Writer::default();
        let reader = writer.as_reader();
        let cm = named_config_map("a");

        drive(writer, vec![Ok(watcher::Event::Apply(cm.clone()))]).await;

        assert_eq!(reader.get(&ObjectRef::from_obj(&cm)).as_deref(), Some(&cm));
    }

    /// A second `Apply` for the same object must replace the cached copy.
    #[tokio::test]
    async fn reflector_applied_should_update_object() {
        let writer = store::Writer::default();
        let reader = writer.as_reader();
        let cm = named_config_map("a");
        let updated_cm = ConfigMap {
            data: Some(BTreeMap::from([("data".to_string(), "present!".to_string())])),
            ..cm.clone()
        };

        let events = vec![
            Ok(watcher::Event::Apply(cm.clone())),
            Ok(watcher::Event::Apply(updated_cm.clone())),
        ];
        drive(writer, events).await;

        assert_eq!(reader.get(&ObjectRef::from_obj(&cm)).as_deref(), Some(&updated_cm));
    }

    /// A `Delete` following an `Apply` must leave no entry behind.
    #[tokio::test]
    async fn reflector_deleted_should_remove_object() {
        let writer = store::Writer::default();
        let reader = writer.as_reader();
        let cm = named_config_map("a");

        let events = vec![
            Ok(watcher::Event::Apply(cm.clone())),
            Ok(watcher::Event::Delete(cm.clone())),
        ];
        drive(writer, events).await;

        assert_eq!(reader.get(&ObjectRef::from_obj(&cm)), None);
    }

    /// An `Init` .. `InitDone` sequence must replace the previous store contents
    /// with the objects listed via `InitApply`.
    #[tokio::test]
    async fn reflector_restarted_should_clear_objects() {
        let writer = store::Writer::default();
        let reader = writer.as_reader();
        let cm_a = named_config_map("a");
        let cm_b = named_config_map("b");

        let events = vec![
            Ok(watcher::Event::Apply(cm_a.clone())),
            Ok(watcher::Event::Init),
            Ok(watcher::Event::InitApply(cm_b.clone())),
            Ok(watcher::Event::InitDone),
        ];
        drive(writer, events).await;

        assert_eq!(reader.get(&ObjectRef::from_obj(&cm_a)), None);
        assert_eq!(reader.get(&ObjectRef::from_obj(&cm_b)).as_deref(), Some(&cm_b));
    }

    /// Replays a long random mix of `Apply`/`Delete` events over a small set of
    /// names and checks that the resulting store lists each name at most once.
    #[tokio::test]
    async fn reflector_store_should_not_contain_duplicates() {
        let mut rng = rand::rng();
        let item_dist = Uniform::new(0_u8, 100).unwrap();
        let deleted_dist = Bernoulli::new(0.40).unwrap();
        let writer = store::Writer::default();
        let reader = writer.as_reader();

        // Each event gets a unique resource version; the name is drawn from a
        // small range so the same object is applied and deleted repeatedly.
        let events = (0_u32..100_000).map(|version| {
            let item = rng.sample(item_dist);
            let deleted = rng.sample(deleted_dist);
            let obj = ConfigMap {
                metadata: ObjectMeta {
                    name: Some(item.to_string()),
                    resource_version: Some(version.to_string()),
                    ..ObjectMeta::default()
                },
                ..ConfigMap::default()
            };
            let event = if deleted {
                watcher::Event::Delete(obj)
            } else {
                watcher::Event::Apply(obj)
            };
            Ok(event)
        });

        reflector(writer, stream::iter(events))
            .map_ok(|_| ())
            .try_collect::<()>()
            .await
            .unwrap();

        let mut seen_objects = HashMap::new();
        for obj in reader.state() {
            let name = obj.metadata.name.clone().unwrap();
            assert_eq!(seen_objects.get(&name), None);
            seen_objects.insert(name, obj);
        }
    }
}