Skip to main content

kube_runtime/reflector/
mod.rs

1//! Caches objects in memory
2
3mod dispatcher;
4mod object_ref;
5pub mod store;
6
7pub use self::{
8    dispatcher::ReflectHandle,
9    object_ref::{Extra as ObjectRefExtra, Lookup, ObjectRef},
10};
11use crate::watcher;
12use async_stream::stream;
13use futures::{Stream, StreamExt};
14use std::hash::Hash;
15#[cfg(feature = "unstable-runtime-subscribe")] pub use store::store_shared;
16pub use store::{Store, store};
17
18/// Cache objects from a [`watcher()`] stream into a local [`Store`]
19///
20/// Observes the raw `Stream` of [`watcher::Event`] objects, and modifies the cache.
21/// It passes the raw [`watcher()`] stream through unmodified.
22///
23/// ## Usage
24/// Create a [`Store`] through e.g. [`store::store()`]. The `writer` part is not cloneable,
25/// and must be moved into the reflector. The `reader` part is the [`Store`] interface
26/// that you can send to other parts of your program as state.
27///
28/// The cache contains the last-seen state of objects,
29/// which may lag slightly behind the actual state.
30///
31/// ## Example
32///
33/// Infinite watch of [`Node`](k8s_openapi::api::core::v1::Node) resources with a certain label.
34///
35/// The `reader` part being passed around to a webserver is omitted.
36/// For examples see [version-rs](https://github.com/kube-rs/version-rs) for integration with [axum](https://github.com/tokio-rs/axum),
37/// or [controller-rs](https://github.com/kube-rs/controller-rs) for the similar controller integration with [actix-web](https://actix.rs/).
38///
39/// ```no_run
40/// use std::future::ready;
41/// use k8s_openapi::api::core::v1::Node;
42/// use kube::runtime::{reflector, watcher, WatchStreamExt, watcher::Config};
43/// use futures::StreamExt;
44/// # use kube::api::Api;
45/// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
46/// # let client: kube::Client = todo!();
47///
48/// let nodes: Api<Node> = Api::all(client);
49/// let node_filter = Config::default().labels("kubernetes.io/arch=amd64");
50/// let (reader, writer) = reflector::store();
51///
52/// // Create the infinite reflector stream
53/// let rf = reflector(writer, watcher(nodes, node_filter));
54///
55/// // !!! pass reader to your webserver/manager as state !!!
56///
57/// // Poll the stream (needed to keep the store up-to-date)
58/// let infinite_watch = rf.applied_objects().for_each(|o| { ready(()) });
59/// infinite_watch.await;
60/// # Ok(())
61/// # }
62/// ```
63///
64///
65/// ## Memory Usage
66///
67/// A reflector often constitutes one of the biggest components of a controller's memory use.
68/// Given a ~2000 pods cluster, a reflector saving everything (including injected sidecars, managed fields)
69/// can quickly consume a couple of hundred megabytes or more, depending on how much of this you are storing.
70///
71/// While generally acceptable, there are techniques you can leverage to reduce the memory usage
72/// depending on your use case.
73///
74/// 1. Reflect a [`PartialObjectMeta<K>`](kube_client::core::PartialObjectMeta) stream rather than a stream of `K`
75///
76/// You can send in a [`metadata_watcher()`](crate::watcher::metadata_watcher()) for a type rather than a [`watcher()`],
77/// and this can drop your memory usage by more than a factor of two,
78/// depending on the size of `K`. 60% reduction seen for `Pod`. Usage is otherwise identical.
79///
80/// 2. Use `modify` on the raw [`watcher::Event`] object stream to clear unneeded properties
81///
82/// For instance, managed fields typically constitute around half the size of `ObjectMeta` and can often be dropped:
83///
84/// ```no_run
85/// # use futures::TryStreamExt;
86/// # use kube::{ResourceExt, Api, runtime::watcher};
87/// # let api: Api<k8s_openapi::api::core::v1::Node> = todo!();
88/// let stream = watcher(api, Default::default()).map_ok(|ev| {
89///     ev.modify(|pod| {
90///         pod.managed_fields_mut().clear();
91///         pod.annotations_mut().clear();
92///         pod.status = None;
93///     })
94/// });
95/// ```
96/// The `stream` can then be passed to `reflector` causing smaller objects to be written to its store.
97/// Note that you **cannot drop everything**; you minimally need the spec properties your app relies on.
98/// Additionally, only `labels`, `annotations` and `managed_fields` are safe to drop from `ObjectMeta`.
99///
100/// For more information check out: <https://kube.rs/controllers/optimization/> for graphs and techniques.
101///
102/// ## Stream sharing
103///
104/// `reflector()` as an interface may optionally create a stream that can be
105/// shared with other components to help with resource usage.
106///
107/// To share a stream, the `Writer<K>` consumed by `reflector()` must be
108/// created through an interface that allows a store to be subscribed on, such
109/// as [`store_shared()`]. When the store supports being subscribed on, it will
110/// broadcast an event to all active listeners after caching any object
111/// contained in the event.
112pub fn reflector<K, W>(mut writer: store::Writer<K>, stream: W) -> impl Stream<Item = W::Item>
113where
114    K: Lookup + Clone,
115    K::DynamicType: Eq + Hash + Clone,
116    W: Stream<Item = watcher::Result<watcher::Event<K>>>,
117{
118    let mut stream = Box::pin(stream);
119    stream! {
120        while let Some(event) = stream.next().await {
121            match event {
122                Ok(ev) => {
123                    writer.apply_watcher_event(&ev);
124                    writer.dispatch_event(&ev).await;
125                    yield Ok(ev);
126                },
127                Err(ev) => yield Err(ev)
128            }
129        }
130    }
131}
132
#[cfg(test)]
mod tests {
    use super::{ObjectRef, reflector, store};
    use crate::watcher;
    use futures::{StreamExt, TryStreamExt, stream};
    use k8s_openapi::{api::core::v1::ConfigMap, apimachinery::pkg::apis::meta::v1::ObjectMeta};
    use rand::{
        RngExt,
        distr::{Bernoulli, Uniform},
    };
    use std::collections::{BTreeMap, HashMap};

    /// Builds a `ConfigMap` that is default in every field except `metadata.name`.
    fn named_config_map(name: &str) -> ConfigMap {
        let metadata = ObjectMeta {
            name: Some(name.to_string()),
            ..ObjectMeta::default()
        };
        ConfigMap {
            metadata,
            ..ConfigMap::default()
        }
    }

    /// Runs `events` through a reflector that owns `writer`, polling the resulting
    /// stream until it ends and discarding every item it yields.
    async fn drain_reflector(
        writer: store::Writer<ConfigMap>,
        events: Vec<watcher::Result<watcher::Event<ConfigMap>>>,
    ) {
        reflector(writer, stream::iter(events))
            .for_each(|_| async {})
            .await;
    }

    #[tokio::test]
    async fn reflector_applied_should_add_object() {
        let writer = store::Writer::default();
        let reader = writer.as_reader();
        let cm = named_config_map("a");

        drain_reflector(writer, vec![Ok(watcher::Event::Apply(cm.clone()))]).await;

        let cached = reader.get(&ObjectRef::from_obj(&cm));
        assert_eq!(cached.as_deref(), Some(&cm));
    }

    #[tokio::test]
    async fn reflector_applied_should_update_object() {
        let writer = store::Writer::default();
        let reader = writer.as_reader();
        let cm = named_config_map("a");
        // Same identity as `cm`, but with a payload so the two versions compare unequal.
        let updated_cm = ConfigMap {
            data: Some(BTreeMap::from([("data".to_string(), "present!".to_string())])),
            ..cm.clone()
        };

        let events = vec![
            Ok(watcher::Event::Apply(cm.clone())),
            Ok(watcher::Event::Apply(updated_cm.clone())),
        ];
        drain_reflector(writer, events).await;

        let cached = reader.get(&ObjectRef::from_obj(&cm));
        assert_eq!(cached.as_deref(), Some(&updated_cm));
    }

    #[tokio::test]
    async fn reflector_deleted_should_remove_object() {
        let writer = store::Writer::default();
        let reader = writer.as_reader();
        let cm = named_config_map("a");

        let events = vec![
            Ok(watcher::Event::Apply(cm.clone())),
            Ok(watcher::Event::Delete(cm.clone())),
        ];
        drain_reflector(writer, events).await;

        assert_eq!(reader.get(&ObjectRef::from_obj(&cm)), None);
    }

    #[tokio::test]
    async fn reflector_restarted_should_clear_objects() {
        let writer = store::Writer::default();
        let reader = writer.as_reader();
        let cm_a = named_config_map("a");
        let cm_b = named_config_map("b");

        // `a` is applied before the `Init` .. `InitDone` sequence, which lists only `b`.
        let events = vec![
            Ok(watcher::Event::Apply(cm_a.clone())),
            Ok(watcher::Event::Init),
            Ok(watcher::Event::InitApply(cm_b.clone())),
            Ok(watcher::Event::InitDone),
        ];
        drain_reflector(writer, events).await;

        assert_eq!(reader.get(&ObjectRef::from_obj(&cm_a)), None);
        let cached_b = reader.get(&ObjectRef::from_obj(&cm_b));
        assert_eq!(cached_b.as_deref(), Some(&cm_b));
    }

    #[tokio::test]
    async fn reflector_store_should_not_contain_duplicates() {
        let mut rng = rand::rng();
        let name_dist = Uniform::new(0_u8, 100).unwrap();
        let delete_dist = Bernoulli::new(0.40).unwrap();
        let writer = store::Writer::default();
        let reader = writer.as_reader();

        // 100_000 random apply/delete events spread over at most 100 distinct names;
        // the loop counter doubles as the resource version.
        let events = (0_u32..100_000).map(|revision| {
            let name = rng.sample(name_dist).to_string();
            let is_delete = rng.sample(delete_dist);
            let mut obj = named_config_map(&name);
            obj.metadata.resource_version = Some(revision.to_string());
            let event = if is_delete {
                watcher::Event::Delete(obj)
            } else {
                watcher::Event::Apply(obj)
            };
            Ok(event)
        });

        reflector(writer, stream::iter(events))
            .map_ok(|_| ())
            .try_collect::<()>()
            .await
            .unwrap();

        // Every name may appear at most once in the final store contents.
        let mut seen_objects = HashMap::new();
        for obj in reader.state() {
            let name = obj.metadata.name.clone().unwrap();
            assert_eq!(seen_objects.insert(name, obj), None);
        }
    }
}