governor/state/direct/future.rs
1use std::num::NonZeroU32;
2
3use super::RateLimiter;
4use crate::{
5 clock,
6 errors::InsufficientCapacity,
7 middleware::RateLimitingMiddleware,
8 state::{DirectStateStore, NotKeyed},
9 Jitter, NotUntil,
10};
11use futures_timer::Delay;
12
#[cfg(feature = "std")]
/// # Direct rate limiters - `async`/`await`
impl<S, C, MW> RateLimiter<NotKeyed, S, C, MW>
where
    S: DirectStateStore,
    C: clock::ReasonablyRealtime,
    MW: RateLimitingMiddleware<C::Instant, NegativeOutcome = NotUntil<C::Instant>>,
{
    /// Waits asynchronously until the rate limiter lets a single cell through.
    ///
    /// This simply forwards to
    /// [`until_ready_with_jitter`](#method.until_ready_with_jitter), passing `Jitter::NONE`.
    /// The returned future resolves right away if the rate limiter allows it; otherwise it
    /// awaits a delay and then checks the rate limiter again. Because other measurements may
    /// be made on the same rate limiter in the meantime, the future may resolve later than
    /// the first delay.
    ///
    /// When several futures wait on the same rate limiter, prefer
    /// [`until_ready_with_jitter`](#method.until_ready_with_jitter) to avoid thundering herds.
    pub async fn until_ready(&self) -> MW::PositiveOutcome {
        self.until_ready_with_jitter(Jitter::NONE).await
    }

    /// Waits asynchronously until the rate limiter lets a single cell through, adding
    /// `jitter` to every wait.
    ///
    /// Each round calls `check`: a positive outcome is returned immediately, while a negative
    /// outcome makes the future await a delay of `jitter` plus the wait time reported by that
    /// negative outcome, after which the rate limiter is checked again. The future may
    /// therefore resolve at some later time, depending on what other measurements are made
    /// on the rate limiter.
    ///
    /// The randomized additional delay between checks can help reduce the likelihood of
    /// thundering herd effects if multiple tasks wait on the same rate limiter.
    pub async fn until_ready_with_jitter(&self, jitter: Jitter) -> MW::PositiveOutcome {
        loop {
            // Either finish with the positive outcome or keep the negative one to learn how
            // long to wait before the next attempt.
            let not_yet = match self.check() {
                Ok(outcome) => return outcome,
                Err(not_yet) => not_yet,
            };

            // The wait is measured from the clock's current reading.
            let wait = not_yet.wait_time_from(self.clock.now());
            Delay::new(jitter + wait).await;
        }
    }

    /// Waits asynchronously until the rate limiter lets `n` cells through at once.
    ///
    /// This is similar to `until_ready`, except that it waits for an arbitrary number `n` of
    /// cells to be available. It forwards to `until_n_ready_with_jitter`, passing
    /// `Jitter::NONE`.
    ///
    /// # Errors
    ///
    /// Returns `InsufficientCapacity` if the `n` provided exceeds the maximum capacity of the
    /// rate limiter.
    pub async fn until_n_ready(
        &self,
        n: NonZeroU32,
    ) -> Result<MW::PositiveOutcome, InsufficientCapacity> {
        self.until_n_ready_with_jitter(n, Jitter::NONE).await
    }

    /// Waits asynchronously until the rate limiter lets `n` cells through at once, adding
    /// `jitter` to every wait.
    ///
    /// This is similar to `until_ready_with_jitter`, except that it waits for an arbitrary
    /// number `n` of cells to be available. Each round calls `check_n(n)`; on a negative
    /// outcome the future awaits `jitter` plus the reported wait time and then tries again.
    ///
    /// # Errors
    ///
    /// Returns `InsufficientCapacity` if the `n` provided exceeds the maximum capacity of the
    /// rate limiter.
    pub async fn until_n_ready_with_jitter(
        &self,
        n: NonZeroU32,
        jitter: Jitter,
    ) -> Result<MW::PositiveOutcome, InsufficientCapacity> {
        loop {
            // `?` hands an `InsufficientCapacity` error straight back to the caller; the
            // inner result says whether the cells were granted on this attempt.
            let not_yet = match self.check_n(n)? {
                Ok(outcome) => return Ok(outcome),
                Err(not_yet) => not_yet,
            };

            // The wait is measured from the clock's current reading.
            let wait = not_yet.wait_time_from(self.clock.now());
            Delay::new(jitter + wait).await;
        }
    }
}
99
#[cfg(test)]
mod test {
    use all_asserts::assert_gt;

    use super::*;

    /// Exercises the `Clone` and `Display` implementations of `InsufficientCapacity`.
    #[test]
    fn insufficient_capacity_impl_coverage() {
        let original = InsufficientCapacity(1);
        let duplicate = original.clone();
        // A clone must carry the same wrapped value.
        assert_eq!(original.0, duplicate.0);

        // The `Display` output must not be empty.
        let rendered = format!("{}", original);
        assert_gt!(rendered.len(), 0);
    }
}
112}