Trait kube_runtime::utils::WatchStreamExt
source · pub trait WatchStreamExt: Stream {
// Provided methods
fn default_backoff(self) -> StreamBackoff<Self, DefaultBackoff>
where Self: TryStream + Sized { ... }
fn backoff<B>(self, b: B) -> StreamBackoff<Self, B>
where B: Backoff,
Self: TryStream + Sized { ... }
fn applied_objects<K>(self) -> EventFlatten<Self>
where Self: Stream<Item = Result<Event<K>, Error>> + Sized { ... }
fn touched_objects<K>(self) -> EventFlatten<Self>
where Self: Stream<Item = Result<Event<K>, Error>> + Sized { ... }
fn modify<F, K>(self, f: F) -> EventModify<Self, F>
where Self: Stream<Item = Result<Event<K>, Error>> + Sized,
F: FnMut(&mut K) { ... }
fn reflect<K>(self, writer: Writer<K>) -> Reflect<Self, K>
where Self: Stream<Item = Result<Event<K>>> + Sized,
K: Resource + Clone + 'static,
K::DynamicType: Eq + Hash + Clone { ... }
}
Provided Methods§
sourcefn default_backoff(self) -> StreamBackoff<Self, DefaultBackoff>
fn default_backoff(self) -> StreamBackoff<Self, DefaultBackoff>
Apply the DefaultBackoff
watcher Backoff
policy
This is recommended for controllers that want to play nicely with the apiserver.
sourcefn backoff<B>(self, b: B) -> StreamBackoff<Self, B>
fn backoff<B>(self, b: B) -> StreamBackoff<Self, B>
Apply a specific Backoff
policy to a Stream
using StreamBackoff
sourcefn applied_objects<K>(self) -> EventFlatten<Self>
fn applied_objects<K>(self) -> EventFlatten<Self>
Flatten a watcher()
stream into a stream of applied objects
All Added/Modified events are passed through, and critical errors bubble up.
sourcefn touched_objects<K>(self) -> EventFlatten<Self>
fn touched_objects<K>(self) -> EventFlatten<Self>
Flatten a watcher()
stream into a stream of touched objects
All Added/Modified/Deleted events are passed through, and critical errors bubble up.
sourcefn modify<F, K>(self, f: F) -> EventModify<Self, F>
fn modify<F, K>(self, f: F) -> EventModify<Self, F>
Modify elements of a watcher()
stream.
Calls watcher::Event::modify()
on every element.
Stream shorthand for stream.map_ok(|event| { event.modify(f) })
.
let deploys: Api<Deployment> = Api::all(client);
let mut truncated_deploy_stream = pin!(watcher(deploys, watcher::Config::default())
.modify(|deploy| {
deploy.managed_fields_mut().clear();
deploy.status = None;
})
.applied_objects());
while let Some(d) = truncated_deploy_stream.try_next().await? {
println!("Truncated Deployment: '{:?}'", serde_json::to_string(&d)?);
}
sourcefn reflect<K>(self, writer: Writer<K>) -> Reflect<Self, K>
fn reflect<K>(self, writer: Writer<K>) -> Reflect<Self, K>
Reflect a watcher()
stream into a Store
through a Writer
Returns the stream unmodified, but passes every watcher::Event
through a Writer
.
This populates a Store
as the stream is polled.
§Usage
use kube::{Api, Client, ResourceExt};
use kube_runtime::{watcher, WatchStreamExt, reflector};
use k8s_openapi::api::apps::v1::Deployment;
let deploys: Api<Deployment> = Api::default_namespaced(client);
let (reader, writer) = reflector::store::<Deployment>();
tokio::spawn(async move {
// start polling the store once the reader is ready
reader.wait_until_ready().await.unwrap();
loop {
let names = reader.state().iter().map(|d| d.name_any()).collect::<Vec<_>>();
info!("Current {} deploys: {:?}", names.len(), names);
tokio::time::sleep(Duration::from_secs(10)).await;
}
});
// configure the watcher stream and populate the store while polling
watcher(deploys, watcher::Config::default())
.reflect(writer)
.applied_objects()
.for_each(|res| async move {
match res {
Ok(o) => info!("saw {}", o.name_any()),
Err(e) => warn!("watcher error: {}", e),
}
})
.await;