1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Retry utilities.
//!
//! TODO: The structure of this intentionally mirrors [mz_ore::retry], so that
//! it could be merged in if anyone were so inclined.

use std::time::{Duration, SystemTime, UNIX_EPOCH};

use rand::prelude::SmallRng;
use rand::{Rng, SeedableRng};

/// Configures a retry operation.
///
/// A `Retry` is plain configuration; call [`Retry::into_retry_stream`] to turn
/// it into the stateful series of sleeps.
#[derive(Debug, Clone, PartialEq)]
pub struct Retry {
    /// An initial fixed sleep, before the exponential backoff.
    ///
    /// This is skipped if set to [Duration::ZERO], in which case the series
    /// starts directly at `initial_backoff`.
    pub fixed_sleep: Duration,
    /// The initial backoff for the exponential backoff retries.
    ///
    /// This value is used as-is for the first backoff sleep; `clamp_backoff`
    /// is only applied to the later, multiplied values.
    pub initial_backoff: Duration,
    /// The backoff multiplier.
    ///
    /// Each backoff sleep is the previous one multiplied by this factor.
    pub multiplier: u32,
    /// Clamps the maximum backoff for the retry operation.
    ///
    /// Once the multiplied backoff would exceed this value, this value is used
    /// instead.
    pub clamp_backoff: Duration,
    /// A seed for the random jitter.
    ///
    /// Used to seed the random number generator of the resulting stream.
    pub seed: u64,
}

impl Retry {
    /// Builds the stock retry configuration used by persist.
    ///
    /// The jitter seed is taken from the sub-second nanoseconds of `now`
    /// measured against the Unix epoch. If `now` lies before the epoch the
    /// seed falls back to 0.
    pub fn persist_defaults(now: SystemTime) -> Self {
        let seed = match now.duration_since(UNIX_EPOCH) {
            Ok(since_epoch) => u64::from(since_epoch.subsec_nanos()),
            Err(_) => 0,
        };
        Retry {
            // No fixed sleep: go straight to the exponential backoff.
            fixed_sleep: Duration::ZERO,
            // Arbitrary criteria for this value: a power of two that is close
            // to the AWS Aurora latency of 6ms.
            initial_backoff: Duration::from_millis(4),
            multiplier: 2,
            // Arbitrary criteria for this value: somewhere between 10s and
            // 60s.
            clamp_backoff: Duration::from_secs(16),
            seed,
        }
    }

    /// Consumes this configuration and produces the [`RetryStream`] it
    /// describes.
    ///
    /// The stream starts with the fixed sleep when one is configured, and
    /// with the initial backoff otherwise.
    pub fn into_retry_stream(self) -> RetryStream {
        // `None` tells the stream that its first sleep is the fixed sleep.
        let backoff = if self.fixed_sleep == Duration::ZERO {
            Some(self.initial_backoff)
        } else {
            None
        };
        let rng = SmallRng::seed_from_u64(self.seed);
        RetryStream {
            rng,
            backoff,
            attempt: 0,
            cfg: self,
        }
    }
}

/// A series of exponential, jittered, clamped sleeps.
///
/// Created from a [`Retry`] configuration via [`Retry::into_retry_stream`].
#[derive(Debug)]
pub struct RetryStream {
    // The configuration this stream was created from.
    cfg: Retry,
    // Random number generator used to jitter each sleep; seeded from
    // `cfg.seed`.
    rng: SmallRng,
    // How many times the stream has been advanced so far.
    attempt: usize,
    // None if the next sleep is `cfg.fixed_sleep`.
    backoff: Option<Duration>,
}

impl RetryStream {
    /// How many times [Self::sleep] has been called.
    pub fn attempt(&self) -> usize {
        self.attempt
    }

    /// The next sleep (without jitter for easy printing in logs).
    ///
    /// This is the configured fixed sleep while no backoff has been started
    /// yet, and the current backoff otherwise.
    pub fn next_sleep(&self) -> Duration {
        self.backoff.unwrap_or(self.cfg.fixed_sleep)
    }

    /// Executes the next sleep in the series.
    ///
    /// The duration reported by [Self::next_sleep] is scaled by a random
    /// factor in the range `0.9..=1.1` before sleeping, and the stream is then
    /// advanced to the following step.
    ///
    /// This isn't cancel-safe, so it consumes and returns self, to prevent
    /// accidental mis-use.
    pub async fn sleep(mut self) -> Self {
        // Should the jitter be configurable?
        let jitter = self.rng.gen_range(0.9..=1.1);
        let sleep = self.next_sleep().mul_f64(jitter);
        tokio::time::sleep(sleep).await;
        self.advance()
    }

    /// Moves the stream to its next step without sleeping.
    ///
    /// Increments the attempt counter and computes the following backoff: the
    /// initial backoff if the step just taken was the fixed sleep, otherwise
    /// the previous backoff times the multiplier, clamped to
    /// `cfg.clamp_backoff`.
    // Only exposed for testing.
    fn advance(mut self) -> Self {
        self.attempt += 1;
        self.backoff = Some(match self.backoff {
            None => self.cfg.initial_backoff,
            // `Duration * u32` panics on overflow, which would happen before
            // the clamp gets a chance to apply. Saturate instead so that very
            // large clamps or multipliers settle at the clamp value.
            Some(x) => std::cmp::min(
                x.saturating_mul(self.cfg.multiplier),
                self.cfg.clamp_backoff,
            ),
        });
        self
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    // Verifies the un-jittered sleep schedule for a config without and with a
    // fixed initial sleep.
    #[mz_ore::test]
    fn retry_stream() {
        // Steps through the stream built from `cfg`, asserting that each
        // successive `next_sleep` matches the corresponding expected value (in
        // milliseconds).
        #[track_caller]
        fn testcase(cfg: Retry, expected_sleep_ms: Vec<u64>) {
            let mut stream = cfg.into_retry_stream();
            for want in expected_sleep_ms.into_iter().map(Duration::from_millis) {
                assert_eq!(stream.next_sleep(), want);
                stream = stream.advance();
            }
        }

        // No fixed sleep: doubles from the initial backoff until clamped.
        let backoff_only = Retry {
            fixed_sleep: Duration::ZERO,
            initial_backoff: Duration::from_millis(1_200),
            multiplier: 2,
            clamp_backoff: Duration::from_secs(16),
            seed: 0,
        };
        testcase(
            backoff_only,
            vec![1_200, 2_400, 4_800, 9_600, 16_000, 16_000],
        );

        // Fixed sleep first, then the exponential backoff until clamped.
        let fixed_then_backoff = Retry {
            fixed_sleep: Duration::from_millis(1_200),
            initial_backoff: Duration::from_millis(100),
            multiplier: 2,
            clamp_backoff: Duration::from_secs(16),
            seed: 0,
        };
        testcase(
            fixed_then_backoff,
            vec![
                1_200, 100, 200, 400, 800, 1_600, 3_200, 6_400, 12_800, 16_000, 16_000,
            ],
        );
    }
}