mz_persist/
retry.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
9
//! Retry utilities.
//!
//! TODO: The structure of this module intentionally mirrors [mz_ore::retry], so
//! that it could be merged into it if anyone were so inclined.
14
15use std::time::{Duration, SystemTime, UNIX_EPOCH};
16
17use rand::prelude::SmallRng;
18use rand::{Rng, SeedableRng};
19
/// Configures a retry operation.
///
/// The series of sleeps described by this configuration (see
/// [`Retry::into_retry_stream`]) is: `fixed_sleep` (unless it is zero), then
/// `initial_backoff`, then each following backoff is the previous one
/// multiplied by `multiplier` and capped at `clamp_backoff`. Note that
/// `initial_backoff` itself is used as-is, without clamping.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Retry {
    /// An initial fixed sleep, before the exponential backoff.
    ///
    /// This is skipped if set to [Duration::ZERO].
    pub fixed_sleep: Duration,
    /// The initial backoff for the exponential backoff retries.
    pub initial_backoff: Duration,
    /// The factor by which each backoff is multiplied to produce the next one.
    pub multiplier: u32,
    /// Clamps the maximum backoff for the retry operation.
    pub clamp_backoff: Duration,
    /// A seed for the random jitter.
    pub seed: u64,
}
36
37impl Retry {
38    /// The default retry configuration for persist.
39    ///
40    /// Uses the given SystemTime to initialize the seed for random jitter.
41    pub fn persist_defaults(now: SystemTime) -> Self {
42        Retry {
43            fixed_sleep: Duration::ZERO,
44            // Chosen to meet the following arbitrary criteria: a power of two
45            // that's close to the AWS Aurora latency of 6ms.
46            initial_backoff: Duration::from_millis(4),
47            multiplier: 2,
48            // Chosen to meet the following arbitrary criteria: between 10s and
49            // 60s.
50            clamp_backoff: Duration::from_secs(16),
51            seed: now
52                .duration_since(UNIX_EPOCH)
53                .map_or(0, |x| u64::from(x.subsec_nanos())),
54        }
55    }
56
57    /// Convert into [`RetryStream`]
58    pub fn into_retry_stream(self) -> RetryStream {
59        let rng = SmallRng::seed_from_u64(self.seed);
60        let backoff = (self.fixed_sleep == Duration::ZERO).then_some(self.initial_backoff);
61        RetryStream {
62            cfg: self,
63            rng,
64            attempt: 0,
65            backoff,
66        }
67    }
68}
69
70/// A series of exponential, jittered, clamped sleeps.
71#[derive(Debug)]
72pub struct RetryStream {
73    cfg: Retry,
74    rng: SmallRng,
75    attempt: usize,
76    // None if the next sleep is `cfg.fixed_sleep`.
77    backoff: Option<Duration>,
78}
79
80impl RetryStream {
81    /// How many times [Self::sleep] has been called.
82    pub fn attempt(&self) -> usize {
83        self.attempt
84    }
85
86    /// The next sleep (without jitter for easy printing in logs).
87    pub fn next_sleep(&self) -> Duration {
88        self.backoff.unwrap_or(self.cfg.fixed_sleep)
89    }
90
91    /// Executes the next sleep in the series.
92    ///
93    /// This isn't cancel-safe, so it consumes and returns self, to prevent
94    /// accidental mis-use.
95    pub async fn sleep(mut self) -> Self {
96        // Should the jitter be configurable?
97        let jitter = self.rng.gen_range(0.9..=1.1);
98        let sleep = self.next_sleep().mul_f64(jitter);
99        tokio::time::sleep(sleep).await;
100        self.advance()
101    }
102
103    // Only exposed for testing.
104    fn advance(mut self) -> Self {
105        self.attempt += 1;
106        self.backoff = Some(match self.backoff {
107            None => self.cfg.initial_backoff,
108            Some(x) => std::cmp::min(x * self.cfg.multiplier, self.cfg.clamp_backoff),
109        });
110        self
111    }
112}
113
#[cfg(test)]
mod tests {
    use super::*;

    #[mz_ore::test]
    fn retry_stream() {
        // Walks the stream built from `cfg`, checking each un-jittered sleep
        // against the expected number of milliseconds.
        #[track_caller]
        fn check(cfg: Retry, expected_ms: &[u64]) {
            let mut stream = cfg.into_retry_stream();
            for &ms in expected_ms {
                assert_eq!(stream.next_sleep(), Duration::from_millis(ms));
                stream = stream.advance();
            }
        }

        // No fixed sleep: the series starts at `initial_backoff` and doubles
        // until it reaches the 16s clamp.
        check(
            Retry {
                fixed_sleep: Duration::ZERO,
                initial_backoff: Duration::from_millis(1_200),
                multiplier: 2,
                clamp_backoff: Duration::from_secs(16),
                seed: 0,
            },
            &[1_200, 2_400, 4_800, 9_600, 16_000, 16_000],
        );

        // A non-zero fixed sleep comes first, followed by the exponential
        // series.
        check(
            Retry {
                fixed_sleep: Duration::from_millis(1_200),
                initial_backoff: Duration::from_millis(100),
                multiplier: 2,
                clamp_backoff: Duration::from_secs(16),
                seed: 0,
            },
            &[
                1_200, 100, 200, 400, 800, 1_600, 3_200, 6_400, 12_800, 16_000, 16_000,
            ],
        );
    }
}