pub fn watcher<K>(
api: Api<K>,
watcher_config: Config,
) -> impl Stream<Item = Result<Event<K>, Error>> + Send
Expand description
Watches a Kubernetes Resource for changes continuously
Compared to Api::watch
, this automatically tries to recover the stream upon errors.
Errors from the underlying watch are propagated, after which the stream will go into recovery mode on the next poll.
You can apply your own backoff by not polling the stream for a duration after errors.
Keep in mind that some TryStream
combinators (such as
try_for_each
and try_concat
)
will terminate eagerly as soon as they receive an Err
.
This is intended to provide a safe and atomic input interface for a state store like a reflector
.
Direct users may want to flatten composite events via WatchStreamExt
:
use kube::{
api::{Api, ResourceExt}, Client,
runtime::{watcher, WatchStreamExt}
};
use k8s_openapi::api::core::v1::Pod;
use futures::TryStreamExt;
#[tokio::main]
async fn main() -> Result<(), watcher::Error> {
let client = Client::try_default().await.unwrap();
let pods: Api<Pod> = Api::namespaced(client, "apps");
watcher(pods, watcher::Config::default()).applied_objects()
.try_for_each(|p| async move {
println!("Applied: {}", p.name_any());
Ok(())
})
.await?;
Ok(())
}
ยงRecovery
The stream will attempt to be recovered on the next poll after an Err
is returned.
This will normally happen immediately, but you can use StreamBackoff
to introduce an artificial delay. default_backoff
returns a suitable default set of parameters.
If the watch connection is interrupted, then watcher
will attempt to restart the watch using the last
resource version
that we have seen on the stream. If this is successful then the stream is simply resumed from where it left off.
If this fails because the resource version is no longer valid then we start over with a new stream, starting with
an [Event::Restarted
]. The internals mechanics of recovery should be considered an implementation detail.