1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
910//! Retry utilities.
11//!
12//! TODO: The structure of this intentionally mirrors [mz_ore::retry], so that
13//! it could be merged in if anyone were so inclined.
1415use std::time::{Duration, SystemTime, UNIX_EPOCH};
1617use rand::prelude::SmallRng;
18use rand::{Rng, SeedableRng};
/// Configures a retry operation.
///
/// A stream built from this configuration yields an optional one-off
/// `fixed_sleep`, followed by an exponential series: the first backoff is
/// `initial_backoff`, and every later backoff is the previous one multiplied
/// by `multiplier`, capped at `clamp_backoff`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Retry {
    /// An initial fixed sleep, before the exponential backoff.
    ///
    /// This is skipped if set to [Duration::ZERO].
    pub fixed_sleep: Duration,
    /// The initial backoff for the exponential backoff retries.
    pub initial_backoff: Duration,
    /// The backoff multiplier.
    pub multiplier: u32,
    /// Clamps the maximum backoff for the retry operation.
    pub clamp_backoff: Duration,
    /// A seed for the random jitter.
    pub seed: u64,
}
3637impl Retry {
38/// The default retry configuration for persist.
39 ///
40 /// Uses the given SystemTime to initialize the seed for random jitter.
41pub fn persist_defaults(now: SystemTime) -> Self {
42 Retry {
43 fixed_sleep: Duration::ZERO,
44// Chosen to meet the following arbitrary criteria: a power of two
45 // that's close to the AWS Aurora latency of 6ms.
46initial_backoff: Duration::from_millis(4),
47 multiplier: 2,
48// Chosen to meet the following arbitrary criteria: between 10s and
49 // 60s.
50clamp_backoff: Duration::from_secs(16),
51 seed: now
52 .duration_since(UNIX_EPOCH)
53 .map_or(0, |x| u64::from(x.subsec_nanos())),
54 }
55 }
5657/// Convert into [`RetryStream`]
58pub fn into_retry_stream(self) -> RetryStream {
59let rng = SmallRng::seed_from_u64(self.seed);
60let backoff = (self.fixed_sleep == Duration::ZERO).then_some(self.initial_backoff);
61 RetryStream {
62 cfg: self,
63 rng,
64 attempt: 0,
65 backoff,
66 }
67 }
68}
6970/// A series of exponential, jittered, clamped sleeps.
71#[derive(Debug)]
72pub struct RetryStream {
73 cfg: Retry,
74 rng: SmallRng,
75 attempt: usize,
76// None if the next sleep is `cfg.fixed_sleep`.
77backoff: Option<Duration>,
78}
7980impl RetryStream {
81/// How many times [Self::sleep] has been called.
82pub fn attempt(&self) -> usize {
83self.attempt
84 }
8586/// The next sleep (without jitter for easy printing in logs).
87pub fn next_sleep(&self) -> Duration {
88self.backoff.unwrap_or(self.cfg.fixed_sleep)
89 }
9091/// Executes the next sleep in the series.
92 ///
93 /// This isn't cancel-safe, so it consumes and returns self, to prevent
94 /// accidental mis-use.
95pub async fn sleep(mut self) -> Self {
96// Should the jitter be configurable?
97let jitter = self.rng.gen_range(0.9..=1.1);
98let sleep = self.next_sleep().mul_f64(jitter);
99 tokio::time::sleep(sleep).await;
100self.advance()
101 }
102103// Only exposed for testing.
104fn advance(mut self) -> Self {
105self.attempt += 1;
106self.backoff = Some(match self.backoff {
107None => self.cfg.initial_backoff,
108Some(x) => std::cmp::min(x * self.cfg.multiplier, self.cfg.clamp_backoff),
109 });
110self
111}
112}
#[cfg(test)]
mod tests {
    use super::*;

    #[mz_ore::test]
    fn retry_stream() {
        // Builds a stream from `r` and checks that the un-jittered sleeps it
        // reports match `expected_sleep_ms`, in order.
        #[track_caller]
        fn testcase(r: Retry, expected_sleep_ms: Vec<u64>) {
            let mut stream = r.into_retry_stream();
            for want_ms in expected_sleep_ms {
                assert_eq!(stream.next_sleep(), Duration::from_millis(want_ms));
                stream = stream.advance();
            }
        }

        // No fixed sleep: the series starts at the initial backoff and
        // doubles until it reaches the clamp.
        let without_fixed_sleep = Retry {
            fixed_sleep: Duration::ZERO,
            initial_backoff: Duration::from_millis(1_200),
            multiplier: 2,
            clamp_backoff: Duration::from_secs(16),
            seed: 0,
        };
        testcase(
            without_fixed_sleep,
            vec![1_200, 2_400, 4_800, 9_600, 16_000, 16_000],
        );

        // With a fixed sleep: it comes first, then the exponential series.
        let with_fixed_sleep = Retry {
            fixed_sleep: Duration::from_millis(1_200),
            initial_backoff: Duration::from_millis(100),
            multiplier: 2,
            clamp_backoff: Duration::from_secs(16),
            seed: 0,
        };
        testcase(
            with_fixed_sleep,
            vec![
                1_200, 100, 200, 400, 800, 1_600, 3_200, 6_400, 12_800, 16_000, 16_000,
            ],
        );
    }
}