1use std::time::{Duration, SystemTime, UNIX_EPOCH};
16
17use rand::prelude::SmallRng;
18use rand::{Rng, SeedableRng};
19
20#[derive(Debug, Clone, PartialEq)]
22pub struct Retry {
23 pub fixed_sleep: Duration,
27 pub initial_backoff: Duration,
29 pub multiplier: u32,
31 pub clamp_backoff: Duration,
33 pub seed: u64,
35}
36
37impl Retry {
38 pub fn persist_defaults(now: SystemTime) -> Self {
42 Retry {
43 fixed_sleep: Duration::ZERO,
44 initial_backoff: Duration::from_millis(4),
47 multiplier: 2,
48 clamp_backoff: Duration::from_secs(16),
51 seed: now
52 .duration_since(UNIX_EPOCH)
53 .map_or(0, |x| u64::from(x.subsec_nanos())),
54 }
55 }
56
57 pub fn into_retry_stream(self) -> RetryStream {
59 let rng = SmallRng::seed_from_u64(self.seed);
60 let backoff = (self.fixed_sleep == Duration::ZERO).then_some(self.initial_backoff);
61 RetryStream {
62 cfg: self,
63 rng,
64 attempt: 0,
65 backoff,
66 }
67 }
68}
69
70#[derive(Debug)]
72pub struct RetryStream {
73 cfg: Retry,
74 rng: SmallRng,
75 attempt: usize,
76 backoff: Option<Duration>,
78}
79
80impl RetryStream {
81 pub fn attempt(&self) -> usize {
83 self.attempt
84 }
85
86 pub fn next_sleep(&self) -> Duration {
88 self.backoff.unwrap_or(self.cfg.fixed_sleep)
89 }
90
91 pub async fn sleep(mut self) -> Self {
96 let jitter = self.rng.gen_range(0.9..=1.1);
98 let sleep = self.next_sleep().mul_f64(jitter);
99 tokio::time::sleep(sleep).await;
100 self.advance()
101 }
102
103 fn advance(mut self) -> Self {
105 self.attempt += 1;
106 self.backoff = Some(match self.backoff {
107 None => self.cfg.initial_backoff,
108 Some(x) => std::cmp::min(x * self.cfg.multiplier, self.cfg.clamp_backoff),
109 });
110 self
111 }
112}
113
114#[cfg(test)]
115mod tests {
116 use super::*;
117
118 #[mz_ore::test]
119 fn retry_stream() {
120 #[track_caller]
121 fn testcase(r: Retry, expected_sleep_ms: Vec<u64>) {
122 let mut r = r.into_retry_stream();
123 for expected_sleep_ms in expected_sleep_ms {
124 let expected = Duration::from_millis(expected_sleep_ms);
125 let actual = r.next_sleep();
126 assert_eq!(actual, expected);
127 r = r.advance();
128 }
129 }
130
131 testcase(
132 Retry {
133 fixed_sleep: Duration::ZERO,
134 initial_backoff: Duration::from_millis(1_200),
135 multiplier: 2,
136 clamp_backoff: Duration::from_secs(16),
137 seed: 0,
138 },
139 vec![1_200, 2_400, 4_800, 9_600, 16_000, 16_000],
140 );
141 testcase(
142 Retry {
143 fixed_sleep: Duration::from_millis(1_200),
144 initial_backoff: Duration::from_millis(100),
145 multiplier: 2,
146 clamp_backoff: Duration::from_secs(16),
147 seed: 0,
148 },
149 vec![
150 1_200, 100, 200, 400, 800, 1_600, 3_200, 6_400, 12_800, 16_000, 16_000,
151 ],
152 );
153 }
154}