use core::{
pin::Pin,
task::{Context, Poll},
};
use std::{fmt::Debug, sync::Arc};
use derivative::Derivative;
use futures::Stream;
use pin_project::pin_project;
use std::task::ready;
use crate::reflector::{ObjectRef, Store};
use async_broadcast::{InactiveReceiver, Receiver, Sender};
use super::Lookup;
#[derive(Derivative)]
#[derivative(Debug(bound = "K: Debug, K::DynamicType: Debug"), Clone)]
pub(crate) struct Dispatcher<K>
where
K: Lookup + Clone + 'static,
K::DynamicType: Eq + std::hash::Hash + Clone,
{
dispatch_tx: Sender<ObjectRef<K>>,
_dispatch_rx: InactiveReceiver<ObjectRef<K>>,
}
impl<K> Dispatcher<K>
where
K: Lookup + Clone + 'static,
K::DynamicType: Eq + std::hash::Hash + Clone,
{
pub(crate) fn new(buf_size: usize) -> Dispatcher<K> {
let (mut dispatch_tx, dispatch_rx) = async_broadcast::broadcast(buf_size);
dispatch_tx.set_await_active(false);
Self {
dispatch_tx,
_dispatch_rx: dispatch_rx.deactivate(),
}
}
pub(crate) async fn broadcast(&mut self, obj_ref: ObjectRef<K>) {
let _ = self.dispatch_tx.broadcast_direct(obj_ref).await;
}
pub(crate) fn subscribe(&self, reader: Store<K>) -> ReflectHandle<K> {
ReflectHandle::new(reader, self.dispatch_tx.new_receiver())
}
}
#[pin_project]
pub struct ReflectHandle<K>
where
K: Lookup + Clone + 'static,
K::DynamicType: Eq + std::hash::Hash + Clone,
{
#[pin]
rx: Receiver<ObjectRef<K>>,
reader: Store<K>,
}
impl<K> Clone for ReflectHandle<K>
where
K: Lookup + Clone + 'static,
K::DynamicType: Eq + std::hash::Hash + Clone,
{
fn clone(&self) -> Self {
ReflectHandle::new(self.reader.clone(), self.rx.clone())
}
}
impl<K> ReflectHandle<K>
where
K: Lookup + Clone,
K::DynamicType: Eq + std::hash::Hash + Clone,
{
pub(super) fn new(reader: Store<K>, rx: Receiver<ObjectRef<K>>) -> ReflectHandle<K> {
Self { rx, reader }
}
#[must_use]
pub fn reader(&self) -> Store<K> {
self.reader.clone()
}
}
impl<K> Stream for ReflectHandle<K>
where
K: Lookup + Clone,
K::DynamicType: Eq + std::hash::Hash + Clone + Default,
{
type Item = Arc<K>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
match ready!(this.rx.as_mut().poll_next(cx)) {
Some(obj_ref) => this
.reader
.get(&obj_ref)
.map_or(Poll::Pending, |obj| Poll::Ready(Some(obj))),
None => Poll::Ready(None),
}
}
}
#[cfg(feature = "unstable-runtime-subscribe")]
#[cfg(test)]
pub(crate) mod test {
use crate::{
watcher::{Error, Event},
WatchStreamExt,
};
use std::{sync::Arc, task::Poll};
use crate::reflector;
use futures::{pin_mut, poll, stream, StreamExt};
use k8s_openapi::api::core::v1::Pod;
fn testpod(name: &str) -> Pod {
let mut pod = Pod::default();
pod.metadata.name = Some(name.to_string());
pod
}
#[tokio::test]
async fn events_are_passed_through() {
let foo = testpod("foo");
let bar = testpod("bar");
let st = stream::iter([
Ok(Event::Apply(foo.clone())),
Err(Error::NoResourceVersion),
Ok(Event::Init),
Ok(Event::InitApply(foo)),
Ok(Event::InitApply(bar)),
Ok(Event::InitDone),
]);
let (reader, writer) = reflector::store_shared(10);
let reflect = st.reflect_shared(writer);
pin_mut!(reflect);
assert_eq!(reader.len(), 0);
assert!(matches!(
poll!(reflect.next()),
Poll::Ready(Some(Ok(Event::Apply(_))))
));
assert_eq!(reader.len(), 1);
assert!(matches!(
poll!(reflect.next()),
Poll::Ready(Some(Err(Error::NoResourceVersion)))
));
assert_eq!(reader.len(), 1);
let restarted = poll!(reflect.next());
assert!(matches!(restarted, Poll::Ready(Some(Ok(Event::Init)))));
assert_eq!(reader.len(), 1);
let restarted = poll!(reflect.next());
assert!(matches!(restarted, Poll::Ready(Some(Ok(Event::InitApply(_))))));
assert_eq!(reader.len(), 1);
let restarted = poll!(reflect.next());
assert!(matches!(restarted, Poll::Ready(Some(Ok(Event::InitApply(_))))));
assert_eq!(reader.len(), 1);
let restarted = poll!(reflect.next());
assert!(matches!(restarted, Poll::Ready(Some(Ok(Event::InitDone)))));
assert_eq!(reader.len(), 2);
assert!(matches!(poll!(reflect.next()), Poll::Ready(None)));
assert_eq!(reader.len(), 2);
}
#[tokio::test]
async fn readers_yield_touched_objects() {
let foo = testpod("foo");
let bar = testpod("bar");
let st = stream::iter([
Ok(Event::Delete(foo.clone())),
Ok(Event::Apply(foo.clone())),
Err(Error::NoResourceVersion),
Ok(Event::Init),
Ok(Event::InitApply(foo.clone())),
Ok(Event::InitApply(bar.clone())),
Ok(Event::InitDone),
]);
let foo = Arc::new(foo);
let _bar = Arc::new(bar);
let (_, writer) = reflector::store_shared(10);
let subscriber = writer.subscribe().unwrap();
let reflect = st.reflect_shared(writer);
pin_mut!(reflect);
pin_mut!(subscriber);
assert!(matches!(
poll!(reflect.next()),
Poll::Ready(Some(Ok(Event::Delete(_))))
));
assert_eq!(poll!(subscriber.next()), Poll::Pending);
assert!(matches!(
poll!(reflect.next()),
Poll::Ready(Some(Ok(Event::Apply(_))))
));
assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(foo.clone())));
assert!(matches!(
poll!(reflect.next()),
Poll::Ready(Some(Err(Error::NoResourceVersion)))
));
assert!(matches!(poll!(subscriber.next()), Poll::Pending));
assert!(matches!(
poll!(reflect.next()),
Poll::Ready(Some(Ok(Event::Init)))
));
assert!(matches!(
poll!(reflect.next()),
Poll::Ready(Some(Ok(Event::InitApply(_))))
));
assert!(matches!(
poll!(reflect.next()),
Poll::Ready(Some(Ok(Event::InitApply(_))))
));
assert!(matches!(
poll!(reflect.next()),
Poll::Ready(Some(Ok(Event::InitDone)))
));
assert!(matches!(poll!(subscriber.next()), Poll::Ready(Some(_))));
assert!(matches!(poll!(subscriber.next()), Poll::Ready(Some(_))));
assert!(matches!(poll!(reflect.next()), Poll::Ready(None)));
assert_eq!(poll!(subscriber.next()), Poll::Ready(None));
}
#[tokio::test]
async fn readers_yield_when_tx_drops() {
let foo = testpod("foo");
let bar = testpod("bar");
let st = stream::iter([
Ok(Event::Apply(foo.clone())),
Ok(Event::Init),
Ok(Event::InitApply(foo.clone())),
Ok(Event::InitApply(bar.clone())),
Ok(Event::InitDone),
]);
let foo = Arc::new(foo);
let _bar = Arc::new(bar);
let (_, writer) = reflector::store_shared(10);
let subscriber = writer.subscribe().unwrap();
let mut reflect = Box::pin(st.reflect_shared(writer));
pin_mut!(subscriber);
assert!(matches!(
poll!(reflect.next()),
Poll::Ready(Some(Ok(Event::Apply(_))))
));
assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(foo.clone())));
assert_eq!(poll!(subscriber.next()), Poll::Pending);
assert!(matches!(
poll!(reflect.next()),
Poll::Ready(Some(Ok(Event::Init)))
));
assert_eq!(poll!(subscriber.next()), Poll::Pending);
assert!(matches!(
poll!(reflect.next()),
Poll::Ready(Some(Ok(Event::InitApply(_))))
));
assert_eq!(poll!(subscriber.next()), Poll::Pending);
assert!(matches!(
poll!(reflect.next()),
Poll::Ready(Some(Ok(Event::InitApply(_))))
));
assert_eq!(poll!(subscriber.next()), Poll::Pending);
assert!(matches!(
poll!(reflect.next()),
Poll::Ready(Some(Ok(Event::InitDone)))
));
drop(reflect);
assert!(matches!(poll!(subscriber.next()), Poll::Ready(Some(_))));
assert!(matches!(poll!(subscriber.next()), Poll::Ready(Some(_))));
assert_eq!(poll!(subscriber.next()), Poll::Ready(None));
}
#[tokio::test]
async fn reflect_applies_backpressure() {
let foo = testpod("foo");
let bar = testpod("bar");
let st = stream::iter([
Ok(Event::Apply(foo.clone())),
Ok(Event::Apply(bar.clone())),
Ok(Event::Apply(foo.clone())),
]);
let foo = Arc::new(foo);
let bar = Arc::new(bar);
let (_, writer) = reflector::store_shared(1);
let subscriber = writer.subscribe().unwrap();
let subscriber_slow = writer.subscribe().unwrap();
let reflect = st.reflect_shared(writer);
pin_mut!(reflect);
pin_mut!(subscriber);
pin_mut!(subscriber_slow);
assert_eq!(poll!(subscriber.next()), Poll::Pending);
assert_eq!(poll!(subscriber_slow.next()), Poll::Pending);
assert!(matches!(
poll!(reflect.next()),
Poll::Ready(Some(Ok(Event::Apply(_))))
));
assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(foo.clone())));
assert!(matches!(poll!(reflect.next()), Poll::Pending));
assert_eq!(poll!(subscriber.next()), Poll::Pending);
assert_eq!(poll!(subscriber_slow.next()), Poll::Ready(Some(foo.clone())));
assert!(matches!(
poll!(reflect.next()),
Poll::Ready(Some(Ok(Event::Apply(_))))
));
assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(bar.clone())));
assert!(matches!(poll!(reflect.next()), Poll::Pending));
assert_eq!(poll!(subscriber_slow.next()), Poll::Ready(Some(bar.clone())));
assert!(matches!(
poll!(reflect.next()),
Poll::Ready(Some(Ok(Event::Apply(_))))
));
assert!(matches!(poll!(reflect.next()), Poll::Ready(None)));
assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(foo.clone())));
assert_eq!(poll!(subscriber_slow.next()), Poll::Ready(Some(foo.clone())));
assert_eq!(poll!(subscriber.next()), Poll::Ready(None));
assert_eq!(poll!(subscriber_slow.next()), Poll::Ready(None));
}
}