governor/state/
in_memory.rs

1use crate::nanos::Nanos;
2use crate::state::{NotKeyed, StateStore};
3use core::fmt;
4use core::fmt::Debug;
5use core::num::NonZeroU64;
6use core::sync::atomic::Ordering;
7use core::time::Duration;
8
9use portable_atomic::AtomicU64;
10
/// An in-memory representation of a GCRA's rate-limiting state.
///
/// Implemented using [`AtomicU64`] operations, this state representation can serve as the
/// building block for other in-memory state stores: e.g., this crate uses `InMemoryState`
/// as the per-key state it tracks in the keyed rate limiters it implements.
///
/// Internally, the number tracked here is the theoretical arrival time (a GCRA term) in number of
/// nanoseconds since the rate limiter was created.
///
/// A stored value of zero is reported as "no previous value": `measure_and_replace_one`
/// hands `None` to its closure whenever it reads zero.
#[derive(Default)]
pub struct InMemoryState(AtomicU64);
21
22impl InMemoryState {
23    pub(crate) fn measure_and_replace_one<T, F, E>(&self, mut f: F) -> Result<T, E>
24    where
25        F: FnMut(Option<Nanos>) -> Result<(T, Nanos), E>,
26    {
27        let mut prev = self.0.load(Ordering::Acquire);
28        let mut decision = f(NonZeroU64::new(prev).map(|n| n.get().into()));
29        while let Ok((result, new_data)) = decision {
30            match self.0.compare_exchange_weak(
31                prev,
32                new_data.into(),
33                Ordering::Release,
34                Ordering::Relaxed,
35            ) {
36                Ok(_) => return Ok(result),
37                Err(next_prev) => prev = next_prev,
38            }
39            decision = f(NonZeroU64::new(prev).map(|n| n.get().into()));
40        }
41        // This map shouldn't be needed, as we only get here in the error case, but the compiler
42        // can't see it.
43        decision.map(|(result, _)| result)
44    }
45
46    pub(crate) fn is_older_than(&self, nanos: Nanos) -> bool {
47        self.0.load(Ordering::Relaxed) <= nanos.into()
48    }
49}
50
51/// The InMemoryState is the canonical "direct" state store.
52impl StateStore for InMemoryState {
53    type Key = NotKeyed;
54
55    fn measure_and_replace<T, F, E>(&self, _key: &Self::Key, f: F) -> Result<T, E>
56    where
57        F: Fn(Option<Nanos>) -> Result<(T, Nanos), E>,
58    {
59        self.measure_and_replace_one(f)
60    }
61}
62
63impl Debug for InMemoryState {
64    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
65        let d = Duration::from_nanos(self.0.load(Ordering::Relaxed));
66        write!(f, "InMemoryState({d:?})")
67    }
68}
69
#[cfg(test)]
// The join handles have to be collected eagerly: every worker must be spawned before the
// first one is joined, otherwise the threads would run one after another.
#[allow(clippy::needless_collect)]
mod test {

    use assertables::assert_gt;

    use super::*;

    /// Spawns `n_threads` threads that each add one to a shared, zero-initialised state
    /// `tries_per_thread` times.
    ///
    /// Returns `(value, hits)`: the number left in the state after all threads have been
    /// joined, and the total number of times the update closure ran across all threads.
    #[cfg(feature = "std")]
    fn try_triggering_collisions(n_threads: u64, tries_per_thread: u64) -> (u64, u64) {
        use std::sync::Arc;
        use std::thread;

        let mut shared = Arc::new(InMemoryState(AtomicU64::new(0)));

        let workers: Vec<thread::JoinHandle<u64>> = (0..n_threads)
            .map(|_| {
                let state = Arc::clone(&shared);
                thread::spawn(move || {
                    // Counts closure invocations, including the ones that had to be retried.
                    let mut calls: u64 = 0;
                    for _ in 0..tries_per_thread {
                        let outcome = state.measure_and_replace_one(|old| {
                            calls += 1;
                            let bumped = old.map(Nanos::as_u64).unwrap_or(0) + 1;
                            Ok::<((), Nanos), ()>(((), Nanos::from(bumped)))
                        });
                        assert!(outcome.is_ok());
                    }
                    calls
                })
            })
            .collect();

        let mut hits: u64 = 0;
        for worker in workers {
            hits += worker.join().unwrap();
        }

        // Every worker has been joined, so this is the last handle to the state.
        let value = *Arc::get_mut(&mut shared).unwrap().0.get_mut();
        (value, hits)
    }

    /// Hammers one state from several threads with growing workloads until at least one
    /// update had to be retried, checking on every round that no increment was lost.
    #[cfg(feature = "std")]
    #[test]
    fn stresstest_collisions() {
        const THREADS: u64 = 8;
        const MAX_TRIES: u64 = 20_000_000;
        const STEP: usize = (MAX_TRIES / 100) as usize;

        let (mut value, mut hits) = (0, 0);
        for tries in (0..MAX_TRIES).step_by(STEP) {
            let (final_value, closure_calls) = try_triggering_collisions(THREADS, tries);
            value = final_value;
            hits = closure_calls;
            assert_eq!(value, tries * THREADS);
            // More closure calls than stored increments means some update was retried.
            if hits > value {
                break;
            }
            println!("Didn't trigger a collision in {tries} iterations");
        }
        assert_gt!(hits, value);
    }

    /// The `Debug` implementation produces non-empty output.
    #[test]
    fn in_memory_state_impls() {
        use alloc::format;

        let state = InMemoryState(AtomicU64::new(0));
        assert_gt!(format!("{state:?}").len(), 0);
    }
}