1//! A retry "budget" for allowing only a certain amount of retries over time.
23use std::{
4 fmt,
5 sync::{
6 atomic::{AtomicIsize, Ordering},
7 Mutex,
8 },
9 time::Duration,
10};
11use tokio::time::Instant;
/// Represents a "budget" for retrying requests.
///
/// This is useful for limiting the amount of retries a service can perform
/// over a period of time, or per a certain number of requests attempted.
pub struct Budget {
    /// Time-windowed store that holds the balance deposits and withdrawals
    /// are applied to.
    bucket: Bucket,
    /// Amount added to the bucket by each call to `deposit`.
    deposit_amount: isize,
    /// Amount that must be available for `withdraw` to succeed; the same
    /// amount is removed from the bucket on success.
    withdraw_amount: isize,
}
/// Indicates that it is not currently allowed to "withdraw" another retry
/// from the [`Budget`].
#[derive(Debug)]
pub struct Overdrawn {
    // Private unit field: carries no data, but because it is not `pub` the
    // struct cannot be constructed with a struct literal outside this module.
    _inner: (),
}
/// Balance storage for a `Budget`: a ring of per-window slots plus a pending
/// counter for the window that is currently being written.
#[derive(Debug)]
struct Bucket {
    /// Tracks which slot is current and when it became current. The lock is
    /// taken whenever expired windows are rolled over.
    generation: Mutex<Generation>,
    /// Constant baseline that is added to the balance every time it is
    /// summed. `Budget::new` derives it from `min_per_sec`, the TTL in whole
    /// seconds and the withdraw amount.
    reserve: isize,
    /// Slots of the TTL, divided evenly.
    slots: Box<[AtomicIsize]>,
    /// The amount of time represented by each slot.
    window: Duration,
    /// The changes for the current slot, to be committed
    /// after the slot expires.
    writer: AtomicIsize,
}
/// Position of a `Bucket` within its ring of slots.
#[derive(Debug)]
struct Generation {
    /// Index of the slot that the pending `writer` value is stored into the
    /// next time a window expires.
    index: usize,
    /// The instant at which the generation was last advanced (initially the
    /// time the bucket was created).
    time: Instant,
}
5152// ===== impl Budget =====
5354impl Budget {
55/// Create a [`Budget`] that allows for a certain percent of the total
56 /// requests to be retried.
57 ///
58 /// - The `ttl` is the duration of how long a single `deposit` should be
59 /// considered. Must be between 1 and 60 seconds.
60 /// - The `min_per_sec` is the minimum rate of retries allowed to accomodate
61 /// clients that have just started issuing requests, or clients that do
62 /// not issue many requests per window.
63 /// - The `retry_percent` is the percentage of calls to `deposit` that can
64 /// be retried. This is in addition to any retries allowed for via
65 /// `min_per_sec`. Must be between 0 and 1000.
66 ///
67 /// As an example, if `0.1` is used, then for every 10 calls to `deposit`,
68 /// 1 retry will be allowed. If `2.0` is used, then every `deposit`
69 /// allows for 2 retries.
70pub fn new(ttl: Duration, min_per_sec: u32, retry_percent: f32) -> Self {
71// assertions taken from finagle
72assert!(ttl >= Duration::from_secs(1));
73assert!(ttl <= Duration::from_secs(60));
74assert!(retry_percent >= 0.0);
75assert!(retry_percent <= 1000.0);
76assert!(min_per_sec < ::std::i32::MAX as u32);
7778let (deposit_amount, withdraw_amount) = if retry_percent == 0.0 {
79// If there is no percent, then you gain nothing from deposits.
80 // Withdrawals can only be made against the reserve, over time.
81(0, 1)
82 } else if retry_percent <= 1.0 {
83 (1, (1.0 / retry_percent) as isize)
84 } else {
85// Support for when retry_percent is between 1.0 and 1000.0,
86 // meaning for every deposit D, D*retry_percent withdrawals
87 // can be made.
88(1000, (1000.0 / retry_percent) as isize)
89 };
90let reserve = (min_per_sec as isize)
91 .saturating_mul(ttl.as_secs() as isize) // ttl is between 1 and 60 seconds
92.saturating_mul(withdraw_amount);
9394// AtomicIsize isn't clone, so the slots need to be built in a loop...
95let windows = 10u32;
96let mut slots = Vec::with_capacity(windows as usize);
97for _ in 0..windows {
98 slots.push(AtomicIsize::new(0));
99 }
100101 Budget {
102 bucket: Bucket {
103 generation: Mutex::new(Generation {
104 index: 0,
105 time: Instant::now(),
106 }),
107 reserve,
108 slots: slots.into_boxed_slice(),
109 window: ttl / windows,
110 writer: AtomicIsize::new(0),
111 },
112 deposit_amount,
113 withdraw_amount,
114 }
115 }
116117/// Store a "deposit" in the budget, which will be used to permit future
118 /// withdrawals.
119pub fn deposit(&self) {
120self.bucket.put(self.deposit_amount);
121 }
122123/// Check whether there is enough "balance" in the budget to issue a new
124 /// retry.
125 ///
126 /// If there is not enough, an `Err(Overdrawn)` is returned.
127pub fn withdraw(&self) -> Result<(), Overdrawn> {
128if self.bucket.try_get(self.withdraw_amount) {
129Ok(())
130 } else {
131Err(Overdrawn { _inner: () })
132 }
133 }
134}
135136impl Default for Budget {
137fn default() -> Budget {
138 Budget::new(Duration::from_secs(10), 10, 0.2)
139 }
140}
141142impl fmt::Debug for Budget {
143fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
144 f.debug_struct("Budget")
145 .field("deposit", &self.deposit_amount)
146 .field("withdraw", &self.withdraw_amount)
147 .field("balance", &self.bucket.sum())
148 .finish()
149 }
150}
151152// ===== impl Bucket =====
153154impl Bucket {
155fn put(&self, amt: isize) {
156self.expire();
157self.writer.fetch_add(amt, Ordering::SeqCst);
158 }
159160fn try_get(&self, amt: isize) -> bool {
161debug_assert!(amt >= 0);
162163self.expire();
164165let sum = self.sum();
166if sum >= amt {
167self.writer.fetch_add(-amt, Ordering::SeqCst);
168true
169} else {
170false
171}
172 }
173174fn expire(&self) {
175let mut gen = self.generation.lock().expect("generation lock");
176177let now = Instant::now();
178let diff = now.saturating_duration_since(gen.time);
179if diff < self.window {
180// not expired yet
181return;
182 }
183184let to_commit = self.writer.swap(0, Ordering::SeqCst);
185self.slots[gen.index].store(to_commit, Ordering::SeqCst);
186187let mut diff = diff;
188let mut idx = (gen.index + 1) % self.slots.len();
189while diff > self.window {
190self.slots[idx].store(0, Ordering::SeqCst);
191 diff -= self.window;
192 idx = (idx + 1) % self.slots.len();
193 }
194195 gen.index = idx;
196 gen.time = now;
197 }
198199fn sum(&self) -> isize {
200let current = self.writer.load(Ordering::SeqCst);
201let windowed_sum: isize = self
202.slots
203 .iter()
204 .map(|slot| slot.load(Ordering::SeqCst))
205// fold() is used instead of sum() to determine overflow behavior
206.fold(0, isize::saturating_add);
207208 current
209 .saturating_add(windowed_sum)
210 .saturating_add(self.reserve)
211 }
212}
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::time;

    /// Without a reserve or any deposits, nothing can be withdrawn.
    #[test]
    fn empty() {
        let budget = Budget::new(Duration::from_secs(1), 0, 1.0);
        budget.withdraw().unwrap_err();
    }

    /// A deposit stops counting once more than the TTL has passed.
    #[tokio::test]
    async fn leaky() {
        time::pause();

        let ttl = Duration::from_secs(1);
        let budget = Budget::new(ttl, 0, 1.0);
        budget.deposit();

        time::advance(Duration::from_secs(3)).await;

        budget.withdraw().unwrap_err();
    }

    /// Deposits expire slot by slot rather than all at once.
    #[tokio::test]
    async fn slots() {
        time::pause();

        let ttl = Duration::from_secs(1);
        let budget = Budget::new(ttl, 0, 0.5);
        budget.deposit();
        budget.deposit();
        time::advance(Duration::from_millis(901)).await;
        // just over 900ms later, the deposits should still be valid
        budget.withdraw().unwrap();

        // wait long enough to start from a blank slate
        time::advance(Duration::from_millis(2001)).await;

        budget.deposit();
        time::advance(Duration::from_millis(301)).await;
        budget.deposit();
        time::advance(Duration::from_millis(801)).await;
        budget.deposit();

        // the 1st deposit has expired, but the 2nd is still valid and
        // combines with the 3rd
        budget.withdraw().unwrap();
    }

    /// The reserve alone permits `min_per_sec * ttl` withdrawals.
    #[tokio::test]
    async fn reserve() {
        let budget = Budget::new(Duration::from_secs(1), 5, 1.0);
        for _ in 0..5 {
            budget.withdraw().unwrap();
        }

        budget.withdraw().unwrap_err();
    }
}