governor/state/
in_memory.rs1use crate::nanos::Nanos;
2use crate::state::{NotKeyed, StateStore};
3use core::fmt;
4use core::fmt::Debug;
5use core::num::NonZeroU64;
6use core::sync::atomic::Ordering;
7use core::time::Duration;
8
9use portable_atomic::AtomicU64;
10
/// State held in memory as a single atomic 64-bit integer.
///
/// The methods in this file treat the stored integer as a nanosecond count:
/// it is converted to and from `Nanos`, and the `Debug` output renders it
/// through `Duration::from_nanos`. A stored value of zero is presented to
/// decision closures as "no previous value" (`None`).
///
/// The derived `Default` builds the state from the atomic's own default.
#[derive(Default)]
pub struct InMemoryState(AtomicU64);
21
22impl InMemoryState {
23 pub(crate) fn measure_and_replace_one<T, F, E>(&self, mut f: F) -> Result<T, E>
24 where
25 F: FnMut(Option<Nanos>) -> Result<(T, Nanos), E>,
26 {
27 let mut prev = self.0.load(Ordering::Acquire);
28 let mut decision = f(NonZeroU64::new(prev).map(|n| n.get().into()));
29 while let Ok((result, new_data)) = decision {
30 match self.0.compare_exchange_weak(
31 prev,
32 new_data.into(),
33 Ordering::Release,
34 Ordering::Relaxed,
35 ) {
36 Ok(_) => return Ok(result),
37 Err(next_prev) => prev = next_prev,
38 }
39 decision = f(NonZeroU64::new(prev).map(|n| n.get().into()));
40 }
41 decision.map(|(result, _)| result)
44 }
45
46 pub(crate) fn is_older_than(&self, nanos: Nanos) -> bool {
47 self.0.load(Ordering::Relaxed) <= nanos.into()
48 }
49}
50
51impl StateStore for InMemoryState {
53 type Key = NotKeyed;
54
55 fn measure_and_replace<T, F, E>(&self, _key: &Self::Key, f: F) -> Result<T, E>
56 where
57 F: Fn(Option<Nanos>) -> Result<(T, Nanos), E>,
58 {
59 self.measure_and_replace_one(f)
60 }
61}
62
63impl Debug for InMemoryState {
64 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
65 let d = Duration::from_nanos(self.0.load(Ordering::Relaxed));
66 write!(f, "InMemoryState({d:?})")
67 }
68}
69
#[cfg(test)]
// The join handles are deliberately collected into a `Vec` before joining so
// that every thread is spawned before the first `join()` blocks.
#[allow(clippy::needless_collect)]
mod test {

    use assertables::assert_gt;

    use super::*;

    /// Spawns `n_threads` threads that each perform `tries_per_thread`
    /// increment-by-one replacements on one shared state.
    ///
    /// Returns `(final stored value, total number of closure invocations)`.
    /// The second number exceeds the first whenever a compare-exchange failed
    /// and the decision closure had to be run again.
    #[cfg(feature = "std")]
    fn try_triggering_collisions(n_threads: u64, tries_per_thread: u64) -> (u64, u64) {
        use std::sync::Arc;
        use std::thread;

        let mut state = Arc::new(InMemoryState(AtomicU64::new(0)));

        let handles: Vec<thread::JoinHandle<u64>> = (0..n_threads)
            .map(|_| {
                let worker_state = Arc::clone(&state);
                thread::spawn(move || {
                    // Counts every invocation of the decision closure,
                    // including re-runs after a failed exchange.
                    let mut calls = 0_u64;
                    for _ in 0..tries_per_thread {
                        let outcome = worker_state.measure_and_replace_one(|previous| {
                            calls += 1;
                            let bumped = previous.map_or(0, Nanos::as_u64) + 1;
                            Ok::<((), Nanos), ()>(((), Nanos::from(bumped)))
                        });
                        assert!(outcome.is_ok());
                    }
                    calls
                })
            })
            .collect();

        let mut total_calls = 0_u64;
        for handle in handles {
            total_calls += handle.join().unwrap();
        }

        // All workers have finished and dropped their clones, so the `Arc`
        // is uniquely owned again and can be borrowed mutably.
        let final_value = *Arc::get_mut(&mut state).unwrap().0.get_mut();
        (final_value, total_calls)
    }

    /// Runs the collision helper with a growing per-thread workload until the
    /// closure has been invoked more often than the stored value was
    /// incremented, then asserts that this happened.
    #[cfg(feature = "std")]
    #[test]
    fn stresstest_collisions() {
        const THREADS: u64 = 8;
        const MAX_TRIES: u64 = 20_000_000;
        // The workload grows in 100 equal steps up to `MAX_TRIES`.
        const STEP: usize = (MAX_TRIES / 100) as usize;

        let (mut value, mut hits) = (0_u64, 0_u64);
        for tries in (0..MAX_TRIES).step_by(STEP) {
            let (stored, calls) = try_triggering_collisions(THREADS, tries);
            value = stored;
            hits = calls;
            // Every successful replacement adds exactly one.
            assert_eq!(value, tries * THREADS);
            if hits > value {
                break;
            }
            println!("Didn't trigger a collision in {tries} iterations");
        }
        assert_gt!(hits, value);
    }

    /// The `Debug` implementation produces non-empty output.
    #[test]
    fn in_memory_state_impls() {
        use alloc::format;
        let state = InMemoryState(AtomicU64::new(0));
        let rendered = format!("{state:?}");
        assert_gt!(rendered.len(), 0);
    }
}