mz_storage/source/
probe.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Support for sending frontier probes to upstream systems.
11
12use std::time::Duration;
13
14use mz_ore::now::{EpochMillis, NowFn};
15use mz_repr::Timestamp;
16use tracing::trace;
17
/// A ticker to drive source upstream probing.
///
/// This type works similarly to [`tokio::time::Interval`] but returns timestamps from its
/// [`Ticker::tick`] method that can be used as probe timestamps. These timestamps are rounded down
/// to the nearest multiple of the tick interval, to reduce the number of unique timestamps emitted
/// by sources, thereby reducing churn in downstream dataflows.
///
/// The ticker also supports usage in non-async contexts, using [`Ticker::tick_blocking`].
///
/// The tick interval is determined by the result of the `get_interval` closure. It is updated
/// after each tick, allowing it to be changed dynamically during the operation of the ticker.
pub(super) struct Ticker<G> {
    /// The current tick interval, in milliseconds.
    interval: EpochMillis,
    /// The clock used to read the current time.
    now: NowFn,
    /// The (rounded) time of the most recent tick, or `None` before the first tick.
    last_tick: Option<EpochMillis>,
    /// Closure supplying the tick interval; re-evaluated after each tick.
    get_interval: G,
}
35
36impl<G: Fn() -> Duration> Ticker<G> {
37    pub fn new(get_interval: G, now: NowFn) -> Self {
38        let interval = get_interval().as_millis().try_into().unwrap();
39        Self {
40            interval,
41            now,
42            last_tick: None,
43            get_interval,
44        }
45    }
46
47    /// Wait until it is time for the next probe, returning a suitable probe timestamp.
48    ///
49    /// This method tries to resolve as close as possible to the returned probe timestamp, though
50    /// it is not guaranteed to always succeed. If a tick is missed, it is skipped entirely.
51    pub async fn tick(&mut self) -> Timestamp {
52        let target = self.next_tick_target();
53
54        let mut now = (self.now)();
55        while now < target {
56            let wait = Duration::from_millis(target - now);
57            tokio::time::sleep(wait).await;
58            now = (self.now)();
59        }
60
61        trace!(target, now, "probe ticker skew: {}ms", now - target);
62        self.apply_tick(now)
63    }
64
65    /// Wait until it is time for the next probe, returning a suitable probe timestamp.
66    ///
67    /// Blocking version of [`Ticker::tick`].
68    pub fn tick_blocking(&mut self) -> Timestamp {
69        let target = self.next_tick_target();
70
71        let mut now = (self.now)();
72        while now < target {
73            let wait = Duration::from_millis(target - now);
74            std::thread::sleep(wait);
75            now = (self.now)();
76        }
77
78        trace!(target, now, "probe ticker skew: {}ms", now - target);
79        self.apply_tick(now)
80    }
81
82    /// Return the desired time of the next tick.
83    fn next_tick_target(&self) -> EpochMillis {
84        let target = match self.last_tick {
85            Some(ms) => ms + self.interval,
86            None => (self.now)(),
87        };
88        self.round_to_interval(target)
89    }
90
91    /// Apply a tick at the given time, returning the probe timestamp.
92    fn apply_tick(&mut self, time: EpochMillis) -> Timestamp {
93        let time = self.round_to_interval(time);
94        self.last_tick = Some(time);
95
96        // Refresh the interval for the next tick.
97        self.interval = (self.get_interval)().as_millis().try_into().unwrap();
98        trace!("probe ticker interval: {}ms", self.interval);
99
100        time.into()
101    }
102
103    fn round_to_interval(&self, ms: EpochMillis) -> EpochMillis {
104        ms - (ms % self.interval)
105    }
106}