1use super::future_hash_map::FutureHashMap;
2use crate::scheduler::{ScheduleRequest, Scheduler};
3use futures::{FutureExt, Stream, StreamExt};
4use pin_project::pin_project;
5use std::{
6 convert::Infallible,
7 future::{self, Future},
8 hash::Hash,
9 pin::Pin,
10 task::{Context, Poll},
11};
12use thiserror::Error;
13
14#[derive(Debug, Error)]
15pub enum Error<ReadyErr> {
16 #[error("readiness gate failed to become ready")]
17 Readiness(#[source] ReadyErr),
18}
19
20#[pin_project]
27pub struct Runner<T, R, F, MkF, Ready = future::Ready<Result<(), Infallible>>> {
28 #[pin]
29 scheduler: Scheduler<T, R>,
30 run_msg: MkF,
31 slots: FutureHashMap<T, F>,
32 #[pin]
33 ready_to_execute_after: futures::future::Fuse<Ready>,
34 is_ready_to_execute: bool,
35 stopped: bool,
36 max_concurrent_executions: u16,
37}
38
39impl<T, R, F, MkF> Runner<T, R, F, MkF>
40where
41 F: Future + Unpin,
42 MkF: FnMut(&T) -> F,
43{
44 pub fn new(scheduler: Scheduler<T, R>, max_concurrent_executions: u16, run_msg: MkF) -> Self {
48 Self {
49 scheduler,
50 run_msg,
51 slots: FutureHashMap::default(),
52 ready_to_execute_after: future::ready(Ok(())).fuse(),
53 is_ready_to_execute: false,
54 stopped: false,
55 max_concurrent_executions,
56 }
57 }
58
59 pub fn delay_tasks_until<Ready, ReadyErr>(
63 self,
64 ready_to_execute_after: Ready,
65 ) -> Runner<T, R, F, MkF, Ready>
66 where
67 Ready: Future<Output = Result<(), ReadyErr>>,
68 {
69 Runner {
70 scheduler: self.scheduler,
71 run_msg: self.run_msg,
72 slots: self.slots,
73 ready_to_execute_after: ready_to_execute_after.fuse(),
74 is_ready_to_execute: false,
75 stopped: false,
76 max_concurrent_executions: self.max_concurrent_executions,
77 }
78 }
79}
80
81#[allow(clippy::match_wildcard_for_single_variants)]
82impl<T, R, F, MkF, Ready, ReadyErr> Stream for Runner<T, R, F, MkF, Ready>
83where
84 T: Eq + Hash + Clone + Unpin,
85 R: Stream<Item = ScheduleRequest<T>>,
86 F: Future + Unpin,
87 MkF: FnMut(&T) -> F,
88 Ready: Future<Output = Result<(), ReadyErr>>,
89{
90 type Item = Result<F::Output, Error<ReadyErr>>;
91
92 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
93 let mut this = self.project();
94 if *this.stopped {
95 return Poll::Ready(None);
96 }
97 let slots = this.slots;
98 let scheduler = &mut this.scheduler;
99 let has_active_slots = match slots.poll_next_unpin(cx) {
100 Poll::Ready(Some(result)) => return Poll::Ready(Some(Ok(result))),
101 Poll::Ready(None) => false,
102 Poll::Pending => true,
103 };
104 match this.ready_to_execute_after.poll(cx) {
105 Poll::Ready(Ok(())) => *this.is_ready_to_execute = true,
106 Poll::Ready(Err(err)) => {
107 *this.stopped = true;
108 return Poll::Ready(Some(Err(Error::Readiness(err))));
109 }
110 Poll::Pending => {}
111 }
112 loop {
113 if (*this.max_concurrent_executions > 0
117 && slots.len() >= *this.max_concurrent_executions as usize)
118 || !*this.is_ready_to_execute
119 {
120 match scheduler.as_mut().hold().poll_next_unpin(cx) {
121 Poll::Pending | Poll::Ready(None) => break Poll::Pending,
122 _ => unreachable!(),
124 };
125 };
126
127 let next_msg_poll = scheduler
131 .as_mut()
132 .hold_unless(|msg| !slots.contains_key(msg))
133 .poll_next_unpin(cx);
134 match next_msg_poll {
135 Poll::Ready(Some(msg)) => {
136 let msg_fut = (this.run_msg)(&msg);
137 assert!(
138 slots.insert(msg, msg_fut).is_none(),
139 "Runner tried to replace a running future.. please report this as a kube-rs bug!"
140 );
141 cx.waker().wake_by_ref();
142 }
143 Poll::Ready(None) => {
144 break if has_active_slots {
145 Poll::Pending
148 } else {
149 Poll::Ready(None)
150 };
151 }
152 Poll::Pending => break Poll::Pending,
153 }
154 }
155 }
156}
157
158#[cfg(test)]
159mod tests {
160 use super::{Error, Runner};
161 use crate::{
162 scheduler::{scheduler, ScheduleRequest},
163 utils::delayed_init::{self, DelayedInit},
164 };
165 use futures::{
166 channel::{mpsc, oneshot},
167 future, poll, stream, Future, SinkExt, StreamExt, TryStreamExt,
168 };
169 use std::{
170 cell::RefCell,
171 collections::HashMap,
172 pin::Pin,
173 sync::{Arc, Mutex},
174 task::{Context, Poll},
175 time::Duration,
176 };
177 use tokio::{
178 runtime::Handle,
179 task::yield_now,
180 time::{advance, pause, sleep, timeout, Instant},
181 };
182
183 #[tokio::test]
184 async fn runner_should_never_run_two_instances_at_once() {
185 pause();
186 let rc = RefCell::new(());
187 let mut count = 0;
188 let (mut sched_tx, sched_rx) = mpsc::unbounded();
189 let mut runner = Box::pin(
190 Runner::new(scheduler(sched_rx), 0, |()| {
193 count += 1;
194 let mutex_ref = rc.borrow_mut();
196 Box::pin(async move {
197 sleep(Duration::from_secs(1)).await;
198 drop(mutex_ref);
199 })
200 })
201 .for_each(|_| async {}),
202 );
203 sched_tx
204 .send(ScheduleRequest {
205 message: (),
206 run_at: Instant::now(),
207 })
208 .await
209 .unwrap();
210 assert!(poll!(runner.as_mut()).is_pending());
211 sched_tx
212 .send(ScheduleRequest {
213 message: (),
214 run_at: Instant::now(),
215 })
216 .await
217 .unwrap();
218 future::join(
219 async {
220 tokio::time::sleep(Duration::from_secs(5)).await;
221 drop(sched_tx);
222 },
223 runner,
224 )
225 .await;
226 assert_eq!(count, 2);
228 }
229
230 #[tokio::test(flavor = "current_thread")]
233 async fn runner_should_wake_when_scheduling_messages() {
234 let (mut sched_tx, sched_rx) = mpsc::unbounded();
236 let (result_tx, result_rx) = oneshot::channel();
237 let mut runner = Runner::new(scheduler(sched_rx), 0, |msg: &u8| std::future::ready(*msg));
238 Handle::current().spawn(async move { result_tx.send(runner.next().await).unwrap() });
241 yield_now().await;
243 sched_tx
244 .send(ScheduleRequest {
245 message: 8,
246 run_at: Instant::now(),
247 })
248 .await
249 .unwrap();
250 assert_eq!(
254 timeout(Duration::from_secs(1), result_rx)
255 .await
256 .unwrap()
257 .unwrap()
258 .transpose()
259 .unwrap(),
260 Some(8)
261 );
262 }
263
264 #[tokio::test]
265 async fn runner_should_wait_for_readiness() {
266 let is_ready = Mutex::new(false);
267 let (delayed_init, ready) = DelayedInit::<()>::new();
268 let mut runner = Box::pin(
269 Runner::new(
270 scheduler(
271 stream::iter([ScheduleRequest {
272 message: 1u8,
273 run_at: Instant::now(),
274 }])
275 .chain(stream::pending()),
276 ),
277 0,
278 |msg| {
279 assert!(*is_ready.lock().unwrap());
280 std::future::ready(*msg)
281 },
282 )
283 .delay_tasks_until(ready.get()),
284 );
285 assert!(poll!(runner.next()).is_pending());
286 *is_ready.lock().unwrap() = true;
287 delayed_init.init(());
288 assert_eq!(runner.next().await.transpose().unwrap(), Some(1));
289 }
290
291 #[tokio::test]
292 async fn runner_should_dedupe_while_waiting_for_readiness() {
293 let is_ready = Mutex::new(false);
294 let (delayed_init, ready) = DelayedInit::<()>::new();
295 let mut runner = Box::pin(
296 Runner::new(
297 scheduler(
298 stream::iter([
299 ScheduleRequest {
300 message: 'a',
301 run_at: Instant::now(),
302 },
303 ScheduleRequest {
304 message: 'b',
305 run_at: Instant::now(),
306 },
307 ScheduleRequest {
308 message: 'a',
309 run_at: Instant::now(),
310 },
311 ])
312 .chain(stream::pending()),
313 ),
314 0,
315 |msg| {
316 assert!(*is_ready.lock().unwrap());
317 std::future::ready(*msg)
318 },
319 )
320 .delay_tasks_until(ready.get()),
321 );
322 assert!(poll!(runner.next()).is_pending());
323 *is_ready.lock().unwrap() = true;
324 delayed_init.init(());
325 let mut message_counts = HashMap::new();
326 assert!(timeout(
327 Duration::from_secs(1),
328 runner.try_for_each(|msg| {
329 *message_counts.entry(msg).or_default() += 1;
330 async { Ok(()) }
331 })
332 )
333 .await
334 .is_err());
335 assert_eq!(message_counts, HashMap::from([('a', 1), ('b', 1)]));
336 }
337
338 #[tokio::test]
339 async fn runner_should_report_readiness_errors() {
340 let (delayed_init, ready) = DelayedInit::<()>::new();
341 let mut runner = Box::pin(
342 Runner::new(
343 scheduler(
344 stream::iter([ScheduleRequest {
345 message: (),
346 run_at: Instant::now(),
347 }])
348 .chain(stream::pending()),
349 ),
350 0,
351 |()| {
352 panic!("run_msg should never be invoked if readiness gate fails");
353 #[allow(unreachable_code)]
355 std::future::ready(())
356 },
357 )
358 .delay_tasks_until(ready.get()),
359 );
360 assert!(poll!(runner.next()).is_pending());
361 drop(delayed_init);
362 assert!(matches!(
363 runner.try_collect::<Vec<_>>().await.unwrap_err(),
364 Error::Readiness(delayed_init::InitDropped)
365 ));
366 }
367
368 struct DurationalFuture {
370 start: Instant,
371 ready_after: Duration,
372 }
373
374 impl DurationalFuture {
375 fn new(expires_in: Duration) -> Self {
376 let start = Instant::now();
377 DurationalFuture {
378 start,
379 ready_after: expires_in,
380 }
381 }
382 }
383
384 impl Future for DurationalFuture {
385 type Output = ();
386
387 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
388 let now = Instant::now();
389 if now.duration_since(self.start) > self.ready_after {
390 Poll::Ready(())
391 } else {
392 cx.waker().wake_by_ref();
393 Poll::Pending
394 }
395 }
396 }
397
398 #[tokio::test]
399 async fn runner_should_respect_max_concurrent_executions() {
400 pause();
401
402 let count = Arc::new(Mutex::new(0));
403 let (mut sched_tx, sched_rx) = mpsc::unbounded();
404 let mut runner = Box::pin(
405 Runner::new(scheduler(sched_rx), 2, |_| {
406 let mut num = count.lock().unwrap();
407 *num += 1;
408 DurationalFuture::new(Duration::from_secs(2))
409 })
410 .for_each(|_| async {}),
411 );
412
413 sched_tx
414 .send(ScheduleRequest {
415 message: 1,
416 run_at: Instant::now(),
417 })
418 .await
419 .unwrap();
420 assert!(poll!(runner.as_mut()).is_pending());
421 sched_tx
422 .send(ScheduleRequest {
423 message: 2,
424 run_at: Instant::now(),
425 })
426 .await
427 .unwrap();
428 assert!(poll!(runner.as_mut()).is_pending());
429 sched_tx
430 .send(ScheduleRequest {
431 message: 3,
432 run_at: Instant::now(),
433 })
434 .await
435 .unwrap();
436 assert!(poll!(runner.as_mut()).is_pending());
437 assert_eq!(*count.lock().unwrap(), 2);
439
440 advance(Duration::from_secs(3)).await;
441 assert!(poll!(runner.as_mut()).is_pending());
442 assert_eq!(*count.lock().unwrap(), 3);
444 advance(Duration::from_secs(3)).await;
445 assert!(poll!(runner.as_mut()).is_pending());
446
447 sched_tx
449 .send(ScheduleRequest {
450 message: 3,
451 run_at: Instant::now(),
452 })
453 .await
454 .unwrap();
455 advance(Duration::from_secs(3)).await;
456 assert!(poll!(runner.as_mut()).is_pending());
457 assert_eq!(*count.lock().unwrap(), 4);
458
459 let (mut sched_tx, sched_rx) = mpsc::unbounded();
460 let mut runner = Box::pin(
461 Runner::new(scheduler(sched_rx), 1, |_| {
462 DurationalFuture::new(Duration::from_secs(2))
463 })
464 .for_each(|_| async {}),
465 );
466
467 sched_tx
468 .send(ScheduleRequest {
469 message: 1,
470 run_at: Instant::now(),
471 })
472 .await
473 .unwrap();
474 assert!(poll!(runner.as_mut()).is_pending());
475
476 drop(sched_tx);
479 assert_eq!(poll!(runner.as_mut()), Poll::Pending);
480 }
481}