// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
//! Retry utilities.
//!
//! TODO: The structure of this intentionally mirrors [`mz_ore::retry`] (and
//! `mz_persist::retry`, from which it is copied), so that it could be merged in
//! if anyone were so inclined.
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use rand::prelude::SmallRng;
use rand::{Rng, SeedableRng};
/// Configures a retry operation.
#[derive(Debug, Clone, PartialEq)]
pub struct Retry {
    /// The initial backoff for the retry operation.
    pub initial_backoff: Duration,
    /// The factor by which the backoff is multiplied after each sleep.
    pub multiplier: u32,
    /// The upper bound on the backoff for the retry operation.
    pub clamp_backoff: Duration,
    /// A seed for the random jitter.
    pub seed: u64,
}
impl Retry {
    /// The default retry configuration for timestamp oracles.
    ///
    /// Uses the given [`SystemTime`] to initialize the seed for random jitter.
    pub fn oracle_defaults(now: SystemTime) -> Self {
        Retry {
            // Chosen to meet the following arbitrary criteria: a power of two
            // that's close to the AWS Aurora latency of 6ms.
            initial_backoff: Duration::from_millis(4),
            multiplier: 2,
            // Chosen to meet the following arbitrary criteria: between 10s and
            // 60s.
            clamp_backoff: Duration::from_secs(16),
            // Seed from the sub-second nanoseconds of `now`, falling back to 0
            // if `now` is earlier than the Unix epoch.
            seed: now
                .duration_since(UNIX_EPOCH)
                .map_or(0, |x| u64::from(x.subsec_nanos())),
        }
    }

    /// Converts this configuration into a [`RetryStream`].
    pub fn into_retry_stream(self) -> RetryStream {
        let rng = SmallRng::seed_from_u64(self.seed);
        let backoff = self.initial_backoff;
        RetryStream {
            cfg: self,
            rng,
            attempt: 0,
            backoff,
        }
    }
}
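// A minimal sketch, not part of this module's API: the un-jittered backoff
// schedule implied by a `Retry` configuration, assuming the same
// multiply-then-clamp update that `RetryStream::sleep` applies. The name
// `backoff_schedule` and its parameters are hypothetical and exist only for
// illustration.
```rust
use std::time::Duration;

/// Returns the first `steps` backoffs (without jitter) for the given settings.
fn backoff_schedule(
    initial_backoff: Duration,
    multiplier: u32,
    clamp_backoff: Duration,
    steps: usize,
) -> Vec<Duration> {
    let mut schedule = Vec::with_capacity(steps);
    let mut backoff = initial_backoff;
    for _ in 0..steps {
        schedule.push(backoff);
        // Same update as in the retry stream: grow, then cap at the clamp.
        backoff = std::cmp::min(backoff * multiplier, clamp_backoff);
    }
    schedule
}
```
// With the oracle defaults (4ms initial backoff, multiplier 2, 16s clamp), this
// schedule doubles from 4ms up to 8192ms over the first twelve sleeps and stays
// at 16s from the thirteenth sleep onward.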
/// A series of exponential, jittered, clamped sleeps.
#[derive(Debug)]
pub struct RetryStream {
    cfg: Retry,
    rng: SmallRng,
    attempt: usize,
    backoff: Duration,
}
impl RetryStream {
    /// How many times [`Self::sleep`] has been called.
    pub fn attempt(&self) -> usize {
        self.attempt
    }

    /// The next sleep (without jitter, for easy printing in logs).
    pub fn next_sleep(&self) -> Duration {
        self.backoff
    }

    /// Executes the next sleep in the series.
    ///
    /// This isn't cancel-safe, so it consumes and returns self to prevent
    /// accidental misuse.
    pub async fn sleep(mut self) -> Self {
        // Scale the backoff by a random factor in [0.9, 1.1].
        // Should the jitter be configurable?
        let jitter = self.rng.gen_range(0.9..=1.1);
        let sleep = self.backoff.mul_f64(jitter);
        tokio::time::sleep(sleep).await;
        self.attempt += 1;
        // Grow the backoff for the next attempt, capped at `clamp_backoff`.
        self.backoff = std::cmp::min(self.backoff * self.cfg.multiplier, self.cfg.clamp_backoff);
        self
    }
}
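// A minimal sketch, assuming the fixed jitter range of 0.9..=1.1 used in
// `RetryStream::sleep`: the shortest and longest actual sleep for a given
// un-jittered backoff. The helper `jitter_bounds` is hypothetical and only
// illustrates how far a logged `next_sleep` value can differ from the real
// sleep.
```rust
use std::time::Duration;

/// Returns the (minimum, maximum) sleep that a ±10% jitter can produce.
fn jitter_bounds(backoff: Duration) -> (Duration, Duration) {
    (backoff.mul_f64(0.9), backoff.mul_f64(1.1))
}
```
// For the 16s clamp in the oracle defaults, the actual sleep therefore falls
// between roughly 14.4s and 17.6s.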