1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Retry utilities.
//!
//! TODO: The structure of this intentionally mirrors [mz_ore::retry] (and
//! `mz_persist::retry`, from which it is copied), so that it could be merged in
//! if anyone were so inclined.

use std::time::{Duration, SystemTime, UNIX_EPOCH};

use rand::prelude::SmallRng;
use rand::{Rng, SeedableRng};

/// Configures a retry operation.
///
/// This is plain configuration data: it describes an exponential backoff
/// (initial value, growth factor, upper bound) plus the seed used for the
/// random jitter applied to each sleep.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Retry {
    /// The initial backoff for the retry operation.
    pub initial_backoff: Duration,
    /// The backoff multiplier, applied to the backoff after every sleep.
    pub multiplier: u32,
    /// Clamps the maximum backoff for the retry operation.
    pub clamp_backoff: Duration,
    /// A seed for the random jitter.
    pub seed: u64,
}

impl Retry {
    /// The default retry configuration for timestamp oracles.
    ///
    /// Uses the given SystemTime to initialize the seed for random jitter:
    /// the seed is the sub-second nanosecond component of `now` relative to
    /// the Unix epoch, or 0 if `now` lies before the epoch.
    pub fn oracle_defaults(now: SystemTime) -> Self {
        let seed = now
            .duration_since(UNIX_EPOCH)
            .map_or(0, |since_epoch| u64::from(since_epoch.subsec_nanos()));
        Retry {
            // Chosen to meet the following arbitrary criteria: a power of two
            // that's close to the AWS Aurora latency of 6ms.
            initial_backoff: Duration::from_millis(4),
            multiplier: 2,
            // Chosen to meet the following arbitrary criteria: between 10s and
            // 60s.
            clamp_backoff: Duration::from_secs(16),
            seed,
        }
    }

    /// Convert into [`RetryStream`]
    ///
    /// The stream starts at attempt 0 with a jitter RNG seeded from `seed`.
    /// Its first backoff is `initial_backoff`, limited to `clamp_backoff` so
    /// that even the very first sleep respects the configured maximum.
    pub fn into_retry_stream(self) -> RetryStream {
        let rng = SmallRng::seed_from_u64(self.seed);
        // Apply the clamp up front: without it, a configuration whose initial
        // backoff exceeds the clamp would sleep longer than the maximum once.
        let backoff = std::cmp::min(self.initial_backoff, self.clamp_backoff);
        RetryStream {
            cfg: self,
            rng,
            attempt: 0,
            backoff,
        }
    }
}

/// A series of exponential, jittered, clamped sleeps.
///
/// Created via [`Retry::into_retry_stream`] and advanced by calling
/// [`RetryStream::sleep`], which consumes the stream and hands it back.
#[derive(Debug)]
pub struct RetryStream {
    // The configuration this stream was created from; supplies the
    // multiplier and the clamp used when advancing the backoff.
    cfg: Retry,
    // Source of the random jitter factor, seeded from `cfg.seed`.
    rng: SmallRng,
    // Number of sleeps completed so far.
    attempt: usize,
    // Duration of the next sleep, before jitter is applied.
    backoff: Duration,
}

impl RetryStream {
    /// How many times [Self::sleep] has completed.
    pub fn attempt(&self) -> usize {
        self.attempt
    }

    /// The next sleep (without jitter for easy printing in logs).
    pub fn next_sleep(&self) -> Duration {
        self.backoff
    }

    /// Executes the next sleep in the series.
    ///
    /// Sleeps for the current backoff scaled by a random jitter factor in
    /// `0.9..=1.1`, then bumps the attempt counter and grows the backoff by
    /// the configured multiplier, limited to the configured clamp.
    ///
    /// Neither step panics on overflow: durations too large to represent
    /// saturate at [`Duration::MAX`], so a very large clamp (e.g. "no clamp")
    /// is safe.
    ///
    /// This isn't cancel-safe, so it consumes and returns self, to prevent
    /// accidental mis-use.
    pub async fn sleep(mut self) -> Self {
        // Should the jitter be configurable?
        let jitter: f64 = self.rng.gen_range(0.9..=1.1);
        // Same value as `backoff.mul_f64(jitter)` whenever that is
        // representable, but saturates instead of panicking when it is not.
        let sleep = Duration::try_from_secs_f64(self.backoff.as_secs_f64() * jitter)
            .unwrap_or(Duration::MAX);
        tokio::time::sleep(sleep).await;
        self.attempt += 1;
        // `Duration * u32` panics on overflow; saturate first, then clamp.
        self.backoff = self
            .backoff
            .saturating_mul(self.cfg.multiplier)
            .min(self.cfg.clamp_backoff);
        self
    }
}