kube_runtime/controller/runner.rs

use super::future_hash_map::FutureHashMap;
use crate::scheduler::{ScheduleRequest, Scheduler};
use futures::{FutureExt, Stream, StreamExt};
use pin_project::pin_project;
use std::{
    convert::Infallible,
    future::{self, Future},
    hash::Hash,
    pin::Pin,
    task::{Context, Poll},
};
use thiserror::Error;

#[derive(Debug, Error)]
pub enum Error<ReadyErr> {
    #[error("readiness gate failed to become ready")]
    Readiness(#[source] ReadyErr),
}

/// Pulls items from a [`Scheduler`], and runs an action for each item in parallel,
/// while making sure to not process [equal](`Eq`) items multiple times at once.
///
/// If an item is to be emitted from the [`Scheduler`] while an equal item is
/// already being processed, then it will be held pending until the current item
/// is finished.
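///
/// # Example
///
/// A minimal usage sketch, mirroring the tests below (marked `ignore` since it
/// needs a Tokio runtime and this crate's `scheduler` constructor):
///
/// ```ignore
/// use futures::{channel::mpsc, SinkExt};
/// use tokio::time::Instant;
///
/// let (mut sched_tx, sched_rx) = mpsc::unbounded();
/// // 0 = no limit on concurrent executions.
/// let runner = Runner::new(scheduler(sched_rx), 0, |msg: &u8| std::future::ready(*msg));
/// sched_tx
///     .send(ScheduleRequest { message: 8, run_at: Instant::now() })
///     .await
///     .unwrap();
/// // Driving the runner yields Ok(output) for each completed action.
/// ```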
#[pin_project]
pub struct Runner<T, R, F, MkF, Ready = future::Ready<Result<(), Infallible>>> {
    #[pin]
    scheduler: Scheduler<T, R>,
    run_msg: MkF,
    slots: FutureHashMap<T, F>,
    #[pin]
    ready_to_execute_after: futures::future::Fuse<Ready>,
    is_ready_to_execute: bool,
    stopped: bool,
    max_concurrent_executions: u16,
}

impl<T, R, F, MkF> Runner<T, R, F, MkF>
where
    F: Future + Unpin,
    MkF: FnMut(&T) -> F,
{
    /// Creates a new [`Runner`]. `max_concurrent_executions` can be used to
    /// limit the number of items that are run concurrently. It can be set to 0 to
    /// allow for unbounded concurrency.
    pub fn new(scheduler: Scheduler<T, R>, max_concurrent_executions: u16, run_msg: MkF) -> Self {
        Self {
            scheduler,
            run_msg,
            slots: FutureHashMap::default(),
            ready_to_execute_after: future::ready(Ok(())).fuse(),
            is_ready_to_execute: false,
            stopped: false,
            max_concurrent_executions,
        }
    }

    /// Wait for `ready_to_execute_after` to complete before starting to run any scheduled tasks.
    ///
    /// `scheduler` will still be polled in the meantime.
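    ///
    /// A sketch of gating on a readiness signal (`store_ready` is a hypothetical
    /// `Future<Output = Result<(), _>>`, such as a cache-sync notification):
    ///
    /// ```ignore
    /// let runner = Runner::new(scheduler(sched_rx), 0, run_msg)
    ///     // Scheduled messages accumulate (and are deduplicated) until
    ///     // `store_ready` resolves with Ok(()).
    ///     .delay_tasks_until(store_ready);
    /// ```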
    pub fn delay_tasks_until<Ready, ReadyErr>(
        self,
        ready_to_execute_after: Ready,
    ) -> Runner<T, R, F, MkF, Ready>
    where
        Ready: Future<Output = Result<(), ReadyErr>>,
    {
        Runner {
            scheduler: self.scheduler,
            run_msg: self.run_msg,
            slots: self.slots,
            ready_to_execute_after: ready_to_execute_after.fuse(),
            is_ready_to_execute: false,
            stopped: false,
            max_concurrent_executions: self.max_concurrent_executions,
        }
    }
}

#[allow(clippy::match_wildcard_for_single_variants)]
impl<T, R, F, MkF, Ready, ReadyErr> Stream for Runner<T, R, F, MkF, Ready>
where
    T: Eq + Hash + Clone + Unpin,
    R: Stream<Item = ScheduleRequest<T>>,
    F: Future + Unpin,
    MkF: FnMut(&T) -> F,
    Ready: Future<Output = Result<(), ReadyErr>>,
{
    type Item = Result<F::Output, Error<ReadyErr>>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();
        if *this.stopped {
            return Poll::Ready(None);
        }
        let slots = this.slots;
        let scheduler = &mut this.scheduler;
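        // Poll the running tasks first: emit a finished result right away, and
        // remember whether any tasks are still in flight.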
        let has_active_slots = match slots.poll_next_unpin(cx) {
            Poll::Ready(Some(result)) => return Poll::Ready(Some(Ok(result))),
            Poll::Ready(None) => false,
            Poll::Pending => true,
        };
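        // Poll the readiness gate; tasks may only start once it resolves with Ok(()).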
        match this.ready_to_execute_after.poll(cx) {
            Poll::Ready(Ok(())) => *this.is_ready_to_execute = true,
            Poll::Ready(Err(err)) => {
                *this.stopped = true;
                return Poll::Ready(Some(Err(Error::Readiness(err))));
            }
            Poll::Pending => {}
        }
        loop {
            // If we are at our limit or not ready to start executing, then there's
            // no point in trying to get something from the scheduler, so just put
            // all expired messages emitted from the queue into pending.
            if (*this.max_concurrent_executions > 0
                && slots.len() >= *this.max_concurrent_executions as usize)
                || !*this.is_ready_to_execute
            {
                match scheduler.as_mut().hold().poll_next_unpin(cx) {
                    Poll::Pending | Poll::Ready(None) => break Poll::Pending,
                    // The above future never returns Poll::Ready(Some(_)).
                    _ => unreachable!(),
                };
            };

            // Try to take a new message that isn't already being processed;
            // leave the already-processing ones in the queue, so that we can take them once
            // we're free again.
            let next_msg_poll = scheduler
                .as_mut()
                .hold_unless(|msg| !slots.contains_key(msg))
                .poll_next_unpin(cx);
            match next_msg_poll {
                Poll::Ready(Some(msg)) => {
                    let msg_fut = (this.run_msg)(&msg);
                    assert!(
                        slots.insert(msg, msg_fut).is_none(),
                        "Runner tried to replace a running future.. please report this as a kube-rs bug!"
                    );
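                    // The new future was inserted without being polled, so request
                    // another wakeup to make sure it gets driven.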
                    cx.waker().wake_by_ref();
                }
                Poll::Ready(None) => {
                    break if has_active_slots {
                        // We're done listening for new messages, but still have some that
                        // haven't finished quite yet
                        Poll::Pending
                    } else {
                        Poll::Ready(None)
                    };
                }
                Poll::Pending => break Poll::Pending,
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::{Error, Runner};
    use crate::{
        scheduler::{scheduler, ScheduleRequest},
        utils::delayed_init::{self, DelayedInit},
    };
    use futures::{
        channel::{mpsc, oneshot},
        future, poll, stream, Future, SinkExt, StreamExt, TryStreamExt,
    };
    use std::{
        cell::RefCell,
        collections::HashMap,
        pin::Pin,
        sync::{Arc, Mutex},
        task::{Context, Poll},
        time::Duration,
    };
    use tokio::{
        runtime::Handle,
        task::yield_now,
        time::{advance, pause, sleep, timeout, Instant},
    };

    #[tokio::test]
    async fn runner_should_never_run_two_instances_at_once() {
        pause();
        let rc = RefCell::new(());
        let mut count = 0;
        let (mut sched_tx, sched_rx) = mpsc::unbounded();
        let mut runner = Box::pin(
            // The debounce period needs to be zero, because a debounce period > 0
            // would lead to the second request being discarded.
            Runner::new(scheduler(sched_rx), 0, |()| {
                count += 1;
                // Panic if this ref is already held, to simulate some unsafe action.
                let mutex_ref = rc.borrow_mut();
                Box::pin(async move {
                    sleep(Duration::from_secs(1)).await;
                    drop(mutex_ref);
                })
            })
            .for_each(|_| async {}),
        );
        sched_tx
            .send(ScheduleRequest {
                message: (),
                run_at: Instant::now(),
            })
            .await
            .unwrap();
        assert!(poll!(runner.as_mut()).is_pending());
        sched_tx
            .send(ScheduleRequest {
                message: (),
                run_at: Instant::now(),
            })
            .await
            .unwrap();
        future::join(
            async {
                tokio::time::sleep(Duration::from_secs(5)).await;
                drop(sched_tx);
            },
            runner,
        )
        .await;
        // Validate that we actually ran both requests
        assert_eq!(count, 2);
    }

    // Test MUST be single-threaded to be consistent, since it concerns a relatively messy
    // interplay between multiple tasks
    #[tokio::test(flavor = "current_thread")]
    async fn runner_should_wake_when_scheduling_messages() {
        // pause();
        let (mut sched_tx, sched_rx) = mpsc::unbounded();
        let (result_tx, result_rx) = oneshot::channel();
        let mut runner = Runner::new(scheduler(sched_rx), 0, |msg: &u8| std::future::ready(*msg));
        // Start a background task that starts listening /before/ we enqueue the message
        // We can't just use Stream::poll_next(), since that bypasses the waker system
        Handle::current().spawn(async move { result_tx.send(runner.next().await).unwrap() });
        // Ensure that the background task actually gets to initiate properly, and starts polling the runner
        yield_now().await;
        sched_tx
            .send(ScheduleRequest {
                message: 8,
                run_at: Instant::now(),
            })
            .await
            .unwrap();
        // Eventually the background task should finish up and report the message received;
        // a timeout here *should* mean that the background task isn't getting awoken properly
        // when the new message is ready.
        assert_eq!(
            timeout(Duration::from_secs(1), result_rx)
                .await
                .unwrap()
                .unwrap()
                .transpose()
                .unwrap(),
            Some(8)
        );
    }

    #[tokio::test]
    async fn runner_should_wait_for_readiness() {
        let is_ready = Mutex::new(false);
        let (delayed_init, ready) = DelayedInit::<()>::new();
        let mut runner = Box::pin(
            Runner::new(
                scheduler(
                    stream::iter([ScheduleRequest {
                        message: 1u8,
                        run_at: Instant::now(),
                    }])
                    .chain(stream::pending()),
                ),
                0,
                |msg| {
                    assert!(*is_ready.lock().unwrap());
                    std::future::ready(*msg)
                },
            )
            .delay_tasks_until(ready.get()),
        );
        assert!(poll!(runner.next()).is_pending());
        *is_ready.lock().unwrap() = true;
        delayed_init.init(());
        assert_eq!(runner.next().await.transpose().unwrap(), Some(1));
    }

    #[tokio::test]
    async fn runner_should_dedupe_while_waiting_for_readiness() {
        let is_ready = Mutex::new(false);
        let (delayed_init, ready) = DelayedInit::<()>::new();
        let mut runner = Box::pin(
            Runner::new(
                scheduler(
                    stream::iter([
                        ScheduleRequest {
                            message: 'a',
                            run_at: Instant::now(),
                        },
                        ScheduleRequest {
                            message: 'b',
                            run_at: Instant::now(),
                        },
                        ScheduleRequest {
                            message: 'a',
                            run_at: Instant::now(),
                        },
                    ])
                    .chain(stream::pending()),
                ),
                0,
                |msg| {
                    assert!(*is_ready.lock().unwrap());
                    std::future::ready(*msg)
                },
            )
            .delay_tasks_until(ready.get()),
        );
        assert!(poll!(runner.next()).is_pending());
        *is_ready.lock().unwrap() = true;
        delayed_init.init(());
        let mut message_counts = HashMap::new();
        assert!(timeout(
            Duration::from_secs(1),
            runner.try_for_each(|msg| {
                *message_counts.entry(msg).or_default() += 1;
                async { Ok(()) }
            })
        )
        .await
        .is_err());
        assert_eq!(message_counts, HashMap::from([('a', 1), ('b', 1)]));
    }

    #[tokio::test]
    async fn runner_should_report_readiness_errors() {
        let (delayed_init, ready) = DelayedInit::<()>::new();
        let mut runner = Box::pin(
            Runner::new(
                scheduler(
                    stream::iter([ScheduleRequest {
                        message: (),
                        run_at: Instant::now(),
                    }])
                    .chain(stream::pending()),
                ),
                0,
                |()| {
                    panic!("run_msg should never be invoked if readiness gate fails");
                    // It's "useless", but it helps to direct rustc to the correct types
                    #[allow(unreachable_code)]
                    std::future::ready(())
                },
            )
            .delay_tasks_until(ready.get()),
        );
        assert!(poll!(runner.next()).is_pending());
        drop(delayed_init);
        assert!(matches!(
            runner.try_collect::<Vec<_>>().await.unwrap_err(),
            Error::Readiness(delayed_init::InitDropped)
        ));
    }

    // A Future that is Ready after the specified duration from its initialization.
    struct DurationalFuture {
        start: Instant,
        ready_after: Duration,
    }

    impl DurationalFuture {
        fn new(expires_in: Duration) -> Self {
            let start = Instant::now();
            DurationalFuture {
                start,
                ready_after: expires_in,
            }
        }
    }

    impl Future for DurationalFuture {
        type Output = ();

        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            let now = Instant::now();
            if now.duration_since(self.start) > self.ready_after {
                Poll::Ready(())
            } else {
                cx.waker().wake_by_ref();
                Poll::Pending
            }
        }
    }

    #[tokio::test]
    async fn runner_should_respect_max_concurrent_executions() {
        pause();

        let count = Arc::new(Mutex::new(0));
        let (mut sched_tx, sched_rx) = mpsc::unbounded();
        let mut runner = Box::pin(
            Runner::new(scheduler(sched_rx), 2, |_| {
                let mut num = count.lock().unwrap();
                *num += 1;
                DurationalFuture::new(Duration::from_secs(2))
            })
            .for_each(|_| async {}),
        );

        sched_tx
            .send(ScheduleRequest {
                message: 1,
                run_at: Instant::now(),
            })
            .await
            .unwrap();
        assert!(poll!(runner.as_mut()).is_pending());
        sched_tx
            .send(ScheduleRequest {
                message: 2,
                run_at: Instant::now(),
            })
            .await
            .unwrap();
        assert!(poll!(runner.as_mut()).is_pending());
        sched_tx
            .send(ScheduleRequest {
                message: 3,
                run_at: Instant::now(),
            })
            .await
            .unwrap();
        assert!(poll!(runner.as_mut()).is_pending());
        // Assert that we only ran two out of the three requests
        assert_eq!(*count.lock().unwrap(), 2);

        advance(Duration::from_secs(3)).await;
        assert!(poll!(runner.as_mut()).is_pending());
        // Assert that we run the third request once we have the capacity to do so
        assert_eq!(*count.lock().unwrap(), 3);
        advance(Duration::from_secs(3)).await;
        assert!(poll!(runner.as_mut()).is_pending());

        // Send the third message again and check that it runs
        sched_tx
            .send(ScheduleRequest {
                message: 3,
                run_at: Instant::now(),
            })
            .await
            .unwrap();
        advance(Duration::from_secs(3)).await;
        assert!(poll!(runner.as_mut()).is_pending());
        assert_eq!(*count.lock().unwrap(), 4);

        let (mut sched_tx, sched_rx) = mpsc::unbounded();
        let mut runner = Box::pin(
            Runner::new(scheduler(sched_rx), 1, |_| {
                DurationalFuture::new(Duration::from_secs(2))
            })
            .for_each(|_| async {}),
        );

        sched_tx
            .send(ScheduleRequest {
                message: 1,
                run_at: Instant::now(),
            })
            .await
            .unwrap();
        assert!(poll!(runner.as_mut()).is_pending());

        // Drop the sender to test that we stop the runner when the requests
        // stream finishes.
        drop(sched_tx);
        assert_eq!(poll!(runner.as_mut()), Poll::Pending);
    }
}