mz_timestamp_oracle/
retry.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Retry utilities.
11//!
12//! TODO: The structure of this intentionally mirrors [mz_ore::retry] (and
13//! `mz_persist::retry`, from which it is copied), so that it could be merged in
14//! if anyone were so inclined.
15
16use std::time::{Duration, SystemTime, UNIX_EPOCH};
17
18use rand::prelude::SmallRng;
19use rand::{Rng, SeedableRng};
20
21/// Configures a retry operation.
22#[derive(Debug, Clone, PartialEq)]
23pub struct Retry {
24    /// The initial backoff for the retry operation.
25    pub initial_backoff: Duration,
26    /// The backoff multiplier.
27    pub multiplier: u32,
28    /// Clamps the maximum backoff for the retry operation.
29    pub clamp_backoff: Duration,
30    /// A seed for the random jitter.
31    pub seed: u64,
32}
33
34impl Retry {
35    /// The default retry configuration for timestamp oracles.
36    ///
37    /// Uses the given SystemTime to initialize the seed for random jitter.
38    pub fn oracle_defaults(now: SystemTime) -> Self {
39        Retry {
40            // Chosen to meet the following arbitrary criteria: a power of two
41            // that's close to the AWS Aurora latency of 6ms.
42            initial_backoff: Duration::from_millis(4),
43            multiplier: 2,
44            // Chosen to meet the following arbitrary criteria: between 10s and
45            // 60s.
46            clamp_backoff: Duration::from_secs(16),
47            seed: now
48                .duration_since(UNIX_EPOCH)
49                .map_or(0, |x| u64::from(x.subsec_nanos())),
50        }
51    }
52
53    /// Convert into [`RetryStream`]
54    pub fn into_retry_stream(self) -> RetryStream {
55        let rng = SmallRng::seed_from_u64(self.seed);
56        let backoff = self.initial_backoff;
57        RetryStream {
58            cfg: self,
59            rng,
60            attempt: 0,
61            backoff,
62        }
63    }
64}
65
66/// A series of exponential, jittered, clamped sleeps.
67#[derive(Debug)]
68pub struct RetryStream {
69    cfg: Retry,
70    rng: SmallRng,
71    attempt: usize,
72    backoff: Duration,
73}
74
75impl RetryStream {
76    /// How many times [Self::sleep] has been called.
77    pub fn attempt(&self) -> usize {
78        self.attempt
79    }
80
81    /// The next sleep (without jitter for easy printing in logs).
82    pub fn next_sleep(&self) -> Duration {
83        self.backoff
84    }
85
86    /// Executes the next sleep in the series.
87    ///
88    /// This isn't cancel-safe, so it consumes and returns self, to prevent
89    /// accidental mis-use.
90    pub async fn sleep(mut self) -> Self {
91        // Should the jitter be configurable?
92        let jitter = self.rng.gen_range(0.9..=1.1);
93        let sleep = self.backoff.mul_f64(jitter);
94        tokio::time::sleep(sleep).await;
95        self.attempt += 1;
96        self.backoff = std::cmp::min(self.backoff * self.cfg.multiplier, self.cfg.clamp_backoff);
97        self
98    }
99}