mz_timestamp_oracle/retry.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Retry utilities.
11//!
12//! TODO: The structure of this intentionally mirrors [mz_ore::retry] (and
13//! `mz_persist::retry`, from which it is copied), so that it could be merged in
14//! if anyone were so inclined.
15
16use std::time::{Duration, SystemTime, UNIX_EPOCH};
17
18use rand::prelude::SmallRng;
19use rand::{Rng, SeedableRng};
20
21/// Configures a retry operation.
22#[derive(Debug, Clone, PartialEq)]
23pub struct Retry {
24 /// The initial backoff for the retry operation.
25 pub initial_backoff: Duration,
26 /// The backoff multiplier.
27 pub multiplier: u32,
28 /// Clamps the maximum backoff for the retry operation.
29 pub clamp_backoff: Duration,
30 /// A seed for the random jitter.
31 pub seed: u64,
32}
33
34impl Retry {
35 /// The default retry configuration for timestamp oracles.
36 ///
37 /// Uses the given SystemTime to initialize the seed for random jitter.
38 pub fn oracle_defaults(now: SystemTime) -> Self {
39 Retry {
40 // Chosen to meet the following arbitrary criteria: a power of two
41 // that's close to the AWS Aurora latency of 6ms.
42 initial_backoff: Duration::from_millis(4),
43 multiplier: 2,
44 // Chosen to meet the following arbitrary criteria: between 10s and
45 // 60s.
46 clamp_backoff: Duration::from_secs(16),
47 seed: now
48 .duration_since(UNIX_EPOCH)
49 .map_or(0, |x| u64::from(x.subsec_nanos())),
50 }
51 }
52
53 /// Convert into [`RetryStream`]
54 pub fn into_retry_stream(self) -> RetryStream {
55 let rng = SmallRng::seed_from_u64(self.seed);
56 let backoff = self.initial_backoff;
57 RetryStream {
58 cfg: self,
59 rng,
60 attempt: 0,
61 backoff,
62 }
63 }
64}
65
66/// A series of exponential, jittered, clamped sleeps.
67#[derive(Debug)]
68pub struct RetryStream {
69 cfg: Retry,
70 rng: SmallRng,
71 attempt: usize,
72 backoff: Duration,
73}
74
75impl RetryStream {
76 /// How many times [Self::sleep] has been called.
77 pub fn attempt(&self) -> usize {
78 self.attempt
79 }
80
81 /// The next sleep (without jitter for easy printing in logs).
82 pub fn next_sleep(&self) -> Duration {
83 self.backoff
84 }
85
86 /// Executes the next sleep in the series.
87 ///
88 /// This isn't cancel-safe, so it consumes and returns self, to prevent
89 /// accidental mis-use.
90 pub async fn sleep(mut self) -> Self {
91 // Should the jitter be configurable?
92 let jitter = self.rng.gen_range(0.9..=1.1);
93 let sleep = self.backoff.mul_f64(jitter);
94 tokio::time::sleep(sleep).await;
95 self.attempt += 1;
96 self.backoff = std::cmp::min(self.backoff * self.cfg.multiplier, self.cfg.clamp_backoff);
97 self
98 }
99}