kube::runtime::utils

Trait WatchStreamExt

Source
pub trait WatchStreamExt: Stream {
    // Provided methods
    fn default_backoff(self) -> StreamBackoff<Self, DefaultBackoff>
       where Self: Sized + TryStream { ... }
    fn backoff<B>(self, b: B) -> StreamBackoff<Self, B>
       where B: Backoff,
             Self: Sized + TryStream { ... }
    fn applied_objects<K>(self) -> EventFlatten<Self>
       where Self: Sized + Stream<Item = Result<Event<K>, Error>> { ... }
    fn touched_objects<K>(self) -> EventFlatten<Self>
       where Self: Sized + Stream<Item = Result<Event<K>, Error>> { ... }
    fn modify<F, K>(self, f: F) -> EventModify<Self, F>
       where Self: Sized + Stream<Item = Result<Event<K>, Error>>,
             F: FnMut(&mut K) { ... }
    fn reflect<K>(self, writer: Writer<K>) -> Reflect<Self, K>
       where Self: Sized + Stream<Item = Result<Event<K>, Error>>,
             K: Resource + Clone + 'static,
             <K as Resource>::DynamicType: Eq + Hash + Clone { ... }
}
Expand description

Extension trait for streams returned by watcher or reflector

Provided Methods§

Source

fn default_backoff(self) -> StreamBackoff<Self, DefaultBackoff>
where Self: Sized + TryStream,

Apply the DefaultBackoff watcher Backoff policy

This is recommended for controllers that want to play nicely with the apiserver.

Source

fn backoff<B>(self, b: B) -> StreamBackoff<Self, B>
where B: Backoff, Self: Sized + TryStream,

Apply a specific Backoff policy to a Stream using StreamBackoff

Source

fn applied_objects<K>(self) -> EventFlatten<Self>
where Self: Sized + Stream<Item = Result<Event<K>, Error>>,

Flatten a watcher() stream into a stream of applied objects

All Added/Modified events are passed through, and critical errors bubble up.

Source

fn touched_objects<K>(self) -> EventFlatten<Self>
where Self: Sized + Stream<Item = Result<Event<K>, Error>>,

Flatten a watcher() stream into a stream of touched objects

All Added/Modified/Deleted events are passed through, and critical errors bubble up.

Source

fn modify<F, K>(self, f: F) -> EventModify<Self, F>
where Self: Sized + Stream<Item = Result<Event<K>, Error>>, F: FnMut(&mut K),

Modify elements of a watcher() stream.

Calls watcher::Event::modify() on every element. This is the stream-level shorthand for stream.map_ok(|event| { event.modify(f) }).

let deploys: Api<Deployment> = Api::all(client);
let mut truncated_deploy_stream = pin!(watcher(deploys, watcher::Config::default())
    .modify(|deploy| {
        deploy.managed_fields_mut().clear();
        deploy.status = None;
    })
    .applied_objects());

while let Some(d) = truncated_deploy_stream.try_next().await? {
   println!("Truncated Deployment: '{:?}'", serde_json::to_string(&d)?);
}
Source

fn reflect<K>(self, writer: Writer<K>) -> Reflect<Self, K>
where Self: Sized + Stream<Item = Result<Event<K>, Error>>, K: Resource + Clone + 'static, <K as Resource>::DynamicType: Eq + Hash + Clone,

Reflect a watcher() stream into a Store through a Writer

Returns the stream unmodified, but passes every watcher::Event through a Writer. This populates a Store as the stream is polled.

§Usage
use kube::{Api, Client, ResourceExt};
use kube_runtime::{watcher, WatchStreamExt, reflector};
use k8s_openapi::api::apps::v1::Deployment;

let deploys: Api<Deployment> = Api::default_namespaced(client);
let (reader, writer) = reflector::store::<Deployment>();

tokio::spawn(async move {
    // start polling the store once the reader is ready
    reader.wait_until_ready().await.unwrap();
    loop {
        let names = reader.state().iter().map(|d| d.name_any()).collect::<Vec<_>>();
        info!("Current {} deploys: {:?}", names.len(), names);
        tokio::time::sleep(Duration::from_secs(10)).await;
    }
});

// configure the watcher stream and populate the store while polling
watcher(deploys, watcher::Config::default())
    .reflect(writer)
    .applied_objects()
    .for_each(|res| async move {
        match res {
            Ok(o) => info!("saw {}", o.name_any()),
            Err(e) => warn!("watcher error: {}", e),
        }
    })
    .await;

Implementors§

Source§

impl<St> WatchStreamExt for St
where St: Stream + ?Sized,