governor/state/keyed/
future.rs

1use std::prelude::v1::*;
2
3use crate::{
4    clock, errors::InsufficientCapacity, middleware::RateLimitingMiddleware,
5    state::keyed::KeyedStateStore, Jitter, NotUntil, RateLimiter,
6};
7use futures_timer::Delay;
8use std::{hash::Hash, num::NonZeroU32};
9
10#[cfg(feature = "std")]
11/// # Keyed rate limiters - `async`/`await`
12impl<K, S, C, MW> RateLimiter<K, S, C, MW>
13where
14    K: Hash + Eq + Clone,
15    S: KeyedStateStore<K>,
16    C: clock::ReasonablyRealtime,
17    MW: RateLimitingMiddleware<C::Instant, NegativeOutcome = NotUntil<C::Instant>>,
18{
19    /// Asynchronously resolves as soon as the rate limiter allows it.
20    ///
21    /// When polled, the returned future either resolves immediately (in the case where the rate
22    /// limiter allows it), or else triggers an asynchronous delay, after which the rate limiter
23    /// is polled again. This means that the future might resolve at some later time (depending
24    /// on what other measurements are made on the rate limiter).
25    ///
26    /// If multiple futures are dispatched against the rate limiter, it is advisable to use
27    /// [`until_ready_with_jitter`](#method.until_ready_with_jitter), to avoid thundering herds.
28    pub async fn until_key_ready(&self, key: &K) -> MW::PositiveOutcome {
29        self.until_key_ready_with_jitter(key, Jitter::NONE).await
30    }
31
32    /// Asynchronously resolves as soon as the rate limiter allows it, with a randomized wait
33    /// period.
34    ///
35    /// When polled, the returned future either resolves immediately (in the case where the rate
36    /// limiter allows it), or else triggers an asynchronous delay, after which the rate limiter
37    /// is polled again. This means that the future might resolve at some later time (depending
38    /// on what other measurements are made on the rate limiter).
39    ///
40    /// This method allows for a randomized additional delay between polls of the rate limiter,
41    /// which can help reduce the likelihood of thundering herd effects if multiple tasks try to
42    /// wait on the same rate limiter.
43    pub async fn until_key_ready_with_jitter(
44        &self,
45        key: &K,
46        jitter: Jitter,
47    ) -> MW::PositiveOutcome {
48        loop {
49            match self.check_key(key) {
50                Ok(x) => {
51                    return x;
52                }
53                Err(negative) => {
54                    let delay = Delay::new(jitter + negative.wait_time_from(self.clock.now()));
55                    delay.await;
56                }
57            }
58        }
59    }
60
61    /// Asynchronously resolves as soon as the rate limiter allows it.
62    ///
63    /// This is similar to `until_key_ready` except it waits for an abitrary number
64    /// of `n` cells to be available.
65    ///
66    /// Returns `InsufficientCapacity` if the `n` provided exceeds the maximum
67    /// capacity of the rate limiter.
68    pub async fn until_key_n_ready(
69        &self,
70        key: &K,
71        n: NonZeroU32,
72    ) -> Result<MW::PositiveOutcome, InsufficientCapacity> {
73        self.until_key_n_ready_with_jitter(key, n, Jitter::NONE)
74            .await
75    }
76
77    /// Asynchronously resolves as soon as the rate limiter allows it, with a
78    /// randomized wait period.
79    ///
80    /// This is similar to `until_key_ready_with_jitter` except it waits for an
81    /// abitrary number of `n` cells to be available.
82    ///
83    /// Returns `InsufficientCapacity` if the `n` provided exceeds the maximum
84    /// capacity of the rate limiter.
85    pub async fn until_key_n_ready_with_jitter(
86        &self,
87        key: &K,
88        n: NonZeroU32,
89        jitter: Jitter,
90    ) -> Result<MW::PositiveOutcome, InsufficientCapacity> {
91        loop {
92            match self.check_key_n(key, n)? {
93                Ok(x) => {
94                    return Ok(x);
95                }
96                Err(negative) => {
97                    let delay = Delay::new(jitter + negative.wait_time_from(self.clock.now()));
98                    delay.await;
99                }
100            }
101        }
102    }
103}