1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
use core::{
    pin::Pin,
    task::{Context, Poll},
};
use std::{fmt::Debug, sync::Arc};

use derivative::Derivative;
use futures::Stream;
use pin_project::pin_project;
use std::task::ready;

use crate::reflector::{ObjectRef, Store};
use async_broadcast::{InactiveReceiver, Receiver, Sender};

use super::Lookup;

#[derive(Derivative)]
#[derivative(Debug(bound = "K: Debug, K::DynamicType: Debug"), Clone)]
// A helper type that holds a broadcast transmitter and a broadcast receiver,
// used to fan-out events from a root stream to multiple listeners.
pub(crate) struct Dispatcher<K>
where
    K: Lookup + Clone + 'static,
    K::DynamicType: Eq + std::hash::Hash + Clone,
{
    dispatch_tx: Sender<ObjectRef<K>>,
    // An inactive reader that prevents the channel from closing until the
    // writer is dropped.
    _dispatch_rx: InactiveReceiver<ObjectRef<K>>,
}

impl<K> Dispatcher<K>
where
    K: Lookup + Clone + 'static,
    K::DynamicType: Eq + std::hash::Hash + Clone,
{
    /// Creates and returns a new self that wraps a broadcast sender and an
    /// inactive broadcast receiver
    ///
    /// A buffer size is required to create the underlying broadcast channel.
    /// Messages will be buffered until all active readers have received a copy
    /// of the message. When the channel is full, senders will apply
    /// backpressure by waiting for space to free up.
    //
    // N.B messages are eagerly broadcasted, meaning no active receivers are
    // required for a message to be broadcasted.
    pub(crate) fn new(buf_size: usize) -> Dispatcher<K> {
        // Create a broadcast (tx, rx) pair
        let (mut dispatch_tx, dispatch_rx) = async_broadcast::broadcast(buf_size);
        // The tx half will not wait for any receivers to be active before
        // broadcasting events. If no receivers are active, events will be
        // buffered.
        dispatch_tx.set_await_active(false);
        Self {
            dispatch_tx,
            _dispatch_rx: dispatch_rx.deactivate(),
        }
    }

    // Calls broadcast on the channel. Will return when the channel has enough
    // space to send an event.
    pub(crate) async fn broadcast(&mut self, obj_ref: ObjectRef<K>) {
        let _ = self.dispatch_tx.broadcast_direct(obj_ref).await;
    }

    // Creates a `ReflectHandle` by creating a receiver from the tx half.
    // N.B: the new receiver will be fast-forwarded to the _latest_ event.
    // The receiver won't have access to any events that are currently waiting
    // to be acked by listeners.
    pub(crate) fn subscribe(&self, reader: Store<K>) -> ReflectHandle<K> {
        ReflectHandle::new(reader, self.dispatch_tx.new_receiver())
    }
}

/// A handle to a shared stream reader
///
/// [`ReflectHandle`]s are created by calling [`subscribe()`] on a [`Writer`],
/// or by calling `clone()` on an already existing [`ReflectHandle`]. Each
/// shared stream reader should be polled independently and driven to readiness
/// to avoid deadlocks. When the [`Writer`]'s buffer is filled, backpressure
/// will be applied on the root stream side.
///
/// When the root stream is dropped, or it ends, all [`ReflectHandle`]s
/// subscribed to the stream will also terminate after all events yielded by
/// the root stream have been observed. This means [`ReflectHandle`] streams
/// can still be polled after the root stream has been dropped.
///
/// [`Writer`]: crate::reflector::Writer
#[pin_project]
pub struct ReflectHandle<K>
where
    K: Lookup + Clone + 'static,
    K::DynamicType: Eq + std::hash::Hash + Clone,
{
    #[pin]
    rx: Receiver<ObjectRef<K>>,
    reader: Store<K>,
}

impl<K> Clone for ReflectHandle<K>
where
    K: Lookup + Clone + 'static,
    K::DynamicType: Eq + std::hash::Hash + Clone,
{
    fn clone(&self) -> Self {
        ReflectHandle::new(self.reader.clone(), self.rx.clone())
    }
}

impl<K> ReflectHandle<K>
where
    K: Lookup + Clone,
    K::DynamicType: Eq + std::hash::Hash + Clone,
{
    pub(super) fn new(reader: Store<K>, rx: Receiver<ObjectRef<K>>) -> ReflectHandle<K> {
        Self { rx, reader }
    }

    #[must_use]
    pub fn reader(&self) -> Store<K> {
        self.reader.clone()
    }
}

impl<K> Stream for ReflectHandle<K>
where
    K: Lookup + Clone,
    K::DynamicType: Eq + std::hash::Hash + Clone + Default,
{
    type Item = Arc<K>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();
        match ready!(this.rx.as_mut().poll_next(cx)) {
            Some(obj_ref) => this
                .reader
                .get(&obj_ref)
                .map_or(Poll::Pending, |obj| Poll::Ready(Some(obj))),
            None => Poll::Ready(None),
        }
    }
}

#[cfg(feature = "unstable-runtime-subscribe")]
#[cfg(test)]
pub(crate) mod test {
    use crate::{
        watcher::{Error, Event},
        WatchStreamExt,
    };
    use std::{sync::Arc, task::Poll};

    use crate::reflector;
    use futures::{pin_mut, poll, stream, StreamExt};
    use k8s_openapi::api::core::v1::Pod;

    fn testpod(name: &str) -> Pod {
        let mut pod = Pod::default();
        pod.metadata.name = Some(name.to_string());
        pod
    }

    #[tokio::test]
    async fn events_are_passed_through() {
        let foo = testpod("foo");
        let bar = testpod("bar");
        let st = stream::iter([
            Ok(Event::Apply(foo.clone())),
            Err(Error::NoResourceVersion),
            Ok(Event::Init),
            Ok(Event::InitApply(foo)),
            Ok(Event::InitApply(bar)),
            Ok(Event::InitDone),
        ]);

        let (reader, writer) = reflector::store_shared(10);
        let reflect = st.reflect_shared(writer);
        pin_mut!(reflect);

        // Prior to any polls, we should have an empty store.
        assert_eq!(reader.len(), 0);
        assert!(matches!(
            poll!(reflect.next()),
            Poll::Ready(Some(Ok(Event::Apply(_))))
        ));

        // Make progress and assert all events are seen
        assert_eq!(reader.len(), 1);
        assert!(matches!(
            poll!(reflect.next()),
            Poll::Ready(Some(Err(Error::NoResourceVersion)))
        ));
        assert_eq!(reader.len(), 1);

        let restarted = poll!(reflect.next());
        assert!(matches!(restarted, Poll::Ready(Some(Ok(Event::Init)))));
        assert_eq!(reader.len(), 1);

        let restarted = poll!(reflect.next());
        assert!(matches!(restarted, Poll::Ready(Some(Ok(Event::InitApply(_))))));
        assert_eq!(reader.len(), 1);

        let restarted = poll!(reflect.next());
        assert!(matches!(restarted, Poll::Ready(Some(Ok(Event::InitApply(_))))));
        assert_eq!(reader.len(), 1);

        let restarted = poll!(reflect.next());
        assert!(matches!(restarted, Poll::Ready(Some(Ok(Event::InitDone)))));
        assert_eq!(reader.len(), 2);

        assert!(matches!(poll!(reflect.next()), Poll::Ready(None)));
        assert_eq!(reader.len(), 2);
    }

    #[tokio::test]
    async fn readers_yield_touched_objects() {
        // Readers should yield touched objects they receive from Stream events.
        //
        // NOTE: a Delete(_) event will be ignored if the item does not exist in
        // the cache. Same with a Restarted(vec![delete_item])
        let foo = testpod("foo");
        let bar = testpod("bar");
        let st = stream::iter([
            Ok(Event::Delete(foo.clone())),
            Ok(Event::Apply(foo.clone())),
            Err(Error::NoResourceVersion),
            Ok(Event::Init),
            Ok(Event::InitApply(foo.clone())),
            Ok(Event::InitApply(bar.clone())),
            Ok(Event::InitDone),
        ]);

        let foo = Arc::new(foo);
        let _bar = Arc::new(bar);

        let (_, writer) = reflector::store_shared(10);
        let subscriber = writer.subscribe().unwrap();
        let reflect = st.reflect_shared(writer);
        pin_mut!(reflect);
        pin_mut!(subscriber);

        // Deleted events should be skipped by subscriber.
        assert!(matches!(
            poll!(reflect.next()),
            Poll::Ready(Some(Ok(Event::Delete(_))))
        ));
        assert_eq!(poll!(subscriber.next()), Poll::Pending);

        assert!(matches!(
            poll!(reflect.next()),
            Poll::Ready(Some(Ok(Event::Apply(_))))
        ));
        assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(foo.clone())));

        // Errors are not propagated to subscribers.
        assert!(matches!(
            poll!(reflect.next()),
            Poll::Ready(Some(Err(Error::NoResourceVersion)))
        ));
        assert!(matches!(poll!(subscriber.next()), Poll::Pending));

        // Restart event will yield all objects in the list

        assert!(matches!(
            poll!(reflect.next()),
            Poll::Ready(Some(Ok(Event::Init)))
        ));

        assert!(matches!(
            poll!(reflect.next()),
            Poll::Ready(Some(Ok(Event::InitApply(_))))
        ));
        assert!(matches!(
            poll!(reflect.next()),
            Poll::Ready(Some(Ok(Event::InitApply(_))))
        ));

        assert!(matches!(
            poll!(reflect.next()),
            Poll::Ready(Some(Ok(Event::InitDone)))
        ));

        // these don't come back in order atm:
        assert!(matches!(poll!(subscriber.next()), Poll::Ready(Some(_))));
        assert!(matches!(poll!(subscriber.next()), Poll::Ready(Some(_))));

        // When main channel is closed, it is propagated to subscribers
        assert!(matches!(poll!(reflect.next()), Poll::Ready(None)));
        assert_eq!(poll!(subscriber.next()), Poll::Ready(None));
    }

    #[tokio::test]
    async fn readers_yield_when_tx_drops() {
        // Once the main stream is dropped, readers should continue to make
        // progress and read values that have been sent on the channel.
        let foo = testpod("foo");
        let bar = testpod("bar");
        let st = stream::iter([
            Ok(Event::Apply(foo.clone())),
            Ok(Event::Init),
            Ok(Event::InitApply(foo.clone())),
            Ok(Event::InitApply(bar.clone())),
            Ok(Event::InitDone),
        ]);

        let foo = Arc::new(foo);
        let _bar = Arc::new(bar);

        let (_, writer) = reflector::store_shared(10);
        let subscriber = writer.subscribe().unwrap();
        let mut reflect = Box::pin(st.reflect_shared(writer));
        pin_mut!(subscriber);

        assert!(matches!(
            poll!(reflect.next()),
            Poll::Ready(Some(Ok(Event::Apply(_))))
        ));
        assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(foo.clone())));

        // Restart event will yield all objects in the list. Broadcast values
        // without polling and then drop.
        //
        // First, subscribers should be pending.
        assert_eq!(poll!(subscriber.next()), Poll::Pending);

        assert!(matches!(
            poll!(reflect.next()),
            Poll::Ready(Some(Ok(Event::Init)))
        ));
        assert_eq!(poll!(subscriber.next()), Poll::Pending);

        assert!(matches!(
            poll!(reflect.next()),
            Poll::Ready(Some(Ok(Event::InitApply(_))))
        ));
        assert_eq!(poll!(subscriber.next()), Poll::Pending);

        assert!(matches!(
            poll!(reflect.next()),
            Poll::Ready(Some(Ok(Event::InitApply(_))))
        ));
        assert_eq!(poll!(subscriber.next()), Poll::Pending);

        assert!(matches!(
            poll!(reflect.next()),
            Poll::Ready(Some(Ok(Event::InitDone)))
        ));
        drop(reflect);

        // we will get foo and bar here, but we dont have a guaranteed ordering on page events
        assert!(matches!(poll!(subscriber.next()), Poll::Ready(Some(_))));
        assert!(matches!(poll!(subscriber.next()), Poll::Ready(Some(_))));
        assert_eq!(poll!(subscriber.next()), Poll::Ready(None));
    }

    #[tokio::test]
    async fn reflect_applies_backpressure() {
        // When the channel is full, we should observe backpressure applied.
        //
        // This will be manifested by receiving Poll::Pending on the reflector
        // stream while the reader stream is not polled. Once we unblock the
        // buffer, the reflector will make progress.
        let foo = testpod("foo");
        let bar = testpod("bar");
        let st = stream::iter([
            //TODO: include a ready event here to avoid dealing with Init?
            Ok(Event::Apply(foo.clone())),
            Ok(Event::Apply(bar.clone())),
            Ok(Event::Apply(foo.clone())),
        ]);

        let foo = Arc::new(foo);
        let bar = Arc::new(bar);

        let (_, writer) = reflector::store_shared(1);
        let subscriber = writer.subscribe().unwrap();
        let subscriber_slow = writer.subscribe().unwrap();
        let reflect = st.reflect_shared(writer);
        pin_mut!(reflect);
        pin_mut!(subscriber);
        pin_mut!(subscriber_slow);

        assert_eq!(poll!(subscriber.next()), Poll::Pending);
        assert_eq!(poll!(subscriber_slow.next()), Poll::Pending);

        // Poll first subscriber, but not the second.
        //
        // The buffer can hold one object value, so even if we have a slow subscriber,
        // we will still get an event from the root.
        assert!(matches!(
            poll!(reflect.next()),
            Poll::Ready(Some(Ok(Event::Apply(_))))
        ));
        assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(foo.clone())));

        // One subscriber is not reading, so we need to apply backpressure until
        // channel has capacity.
        //
        // At this point, the buffer is full. Polling again will trigger the
        // backpressure logic.
        assert!(matches!(poll!(reflect.next()), Poll::Pending));

        // Our "fast" subscriber will also have nothing else to poll until the
        // slower subscriber advances its pointer in the buffer.
        assert_eq!(poll!(subscriber.next()), Poll::Pending);

        // Advance slow reader
        assert_eq!(poll!(subscriber_slow.next()), Poll::Ready(Some(foo.clone())));

        // We now have room for only one more item. In total, the previous event
        // had two. We repeat the same pattern.
        assert!(matches!(
            poll!(reflect.next()),
            Poll::Ready(Some(Ok(Event::Apply(_))))
        ));
        assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(bar.clone())));
        assert!(matches!(poll!(reflect.next()), Poll::Pending));
        assert_eq!(poll!(subscriber_slow.next()), Poll::Ready(Some(bar.clone())));
        assert!(matches!(
            poll!(reflect.next()),
            Poll::Ready(Some(Ok(Event::Apply(_))))
        ));
        // Poll again to drain the queue.
        assert!(matches!(poll!(reflect.next()), Poll::Ready(None)));
        assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(foo.clone())));
        assert_eq!(poll!(subscriber_slow.next()), Poll::Ready(Some(foo.clone())));

        assert_eq!(poll!(subscriber.next()), Poll::Ready(None));
        assert_eq!(poll!(subscriber_slow.next()), Poll::Ready(None));
    }

    // TODO (matei): tests around cloning subscribers once a watch stream has already
    // been established. This will depend on the interfaces & impl so are left
    // out for now.
}