Function kube_runtime::watcher::watcher

source ·
pub fn watcher<K: Resource + Clone + DeserializeOwned + Debug + Send + 'static>(
    api: Api<K>,
    watcher_config: Config,
) -> impl Stream<Item = Result<Event<K>>> + Send
Expand description

Watches a Kubernetes Resource for changes continuously

Compared to Api::watch, this automatically tries to recover the stream upon errors.

Errors from the underlying watch are propagated, after which the stream will go into recovery mode on the next poll. You can apply your own backoff by not polling the stream for a duration after errors. Keep in mind that some TryStream combinators (such as try_for_each and try_concat) will terminate eagerly as soon as they receive an Err.

This is intended to provide a safe and atomic input interface for a state store like a reflector. Direct users may want to flatten composite events via WatchStreamExt:

use kube::{
  api::{Api, ResourceExt}, Client,
  runtime::{watcher, WatchStreamExt}
};
use k8s_openapi::api::core::v1::Pod;
use futures::TryStreamExt;
#[tokio::main]
async fn main() -> Result<(), watcher::Error> {
    let client = Client::try_default().await.unwrap();
    let pods: Api<Pod> = Api::namespaced(client, "apps");

    watcher(pods, watcher::Config::default()).applied_objects()
        .try_for_each(|p| async move {
         println!("Applied: {}", p.name_any());
            Ok(())
        })
        .await?;
   Ok(())
}

§Recovery

The stream will attempt to be recovered on the next poll after an Err is returned. This will normally happen immediately, but you can use StreamBackoff to introduce an artificial delay. default_backoff returns a suitable default set of parameters.

If the watch connection is interrupted, then watcher will attempt to restart the watch using the last resource version that we have seen on the stream. If this is successful then the stream is simply resumed from where it left off. If this fails because the resource version is no longer valid then we start over with a new stream, starting with an [Event::Restarted]. The internals mechanics of recovery should be considered an implementation detail.