mz_storage/source/probe.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Support for sending frontier probes to upstream systems.
11
12use std::time::Duration;
13
14use mz_ore::now::{EpochMillis, NowFn};
15use mz_repr::Timestamp;
16use tracing::trace;
17
18/// A ticker to drive source upstream probing.
19///
20/// This type works similar to [`tokio::time::Interval`] but returns timestamps from its
21/// [`Ticker::tick`] method that can be used as probe timestamps. These timestamps are rounded down
22/// to the nearest multiple of the tick interval, to reduce the amount of unique timestamps emitted
23/// by sources, thereby reducing churn in downstream dataflows.
24///
25/// The ticker also supports usage in non-async contexts, using [`Ticker::tick_blocking`].
26///
27/// The tick interval is determined by the result of the `get_interval` closure. It is updated
28/// after each tick, allowing it to be changed dynamically during the operation of the ticker.
29pub(super) struct Ticker<G> {
30 interval: EpochMillis,
31 now: NowFn,
32 last_tick: Option<EpochMillis>,
33 get_interval: G,
34}
35
36impl<G: Fn() -> Duration> Ticker<G> {
37 pub fn new(get_interval: G, now: NowFn) -> Self {
38 let interval = get_interval().as_millis().try_into().unwrap();
39 Self {
40 interval,
41 now,
42 last_tick: None,
43 get_interval,
44 }
45 }
46
47 /// Wait until it is time for the next probe, returning a suitable probe timestamp.
48 ///
49 /// This method tries to resolve as close as possible to the returned probe timestamp, though
50 /// it is not guaranteed to always succeed. If a tick is missed, it is skipped entirely.
51 pub async fn tick(&mut self) -> Timestamp {
52 let target = self.next_tick_target();
53
54 let mut now = (self.now)();
55 while now < target {
56 let wait = Duration::from_millis(target - now);
57 tokio::time::sleep(wait).await;
58 now = (self.now)();
59 }
60
61 trace!(target, now, "probe ticker skew: {}ms", now - target);
62 self.apply_tick(now)
63 }
64
65 /// Wait until it is time for the next probe, returning a suitable probe timestamp.
66 ///
67 /// Blocking version of [`Ticker::tick`].
68 pub fn tick_blocking(&mut self) -> Timestamp {
69 let target = self.next_tick_target();
70
71 let mut now = (self.now)();
72 while now < target {
73 let wait = Duration::from_millis(target - now);
74 std::thread::sleep(wait);
75 now = (self.now)();
76 }
77
78 trace!(target, now, "probe ticker skew: {}ms", now - target);
79 self.apply_tick(now)
80 }
81
82 /// Return the desired time of the next tick.
83 fn next_tick_target(&self) -> EpochMillis {
84 let target = match self.last_tick {
85 Some(ms) => ms + self.interval,
86 None => (self.now)(),
87 };
88 self.round_to_interval(target)
89 }
90
91 /// Apply a tick at the given time, returning the probe timestamp.
92 fn apply_tick(&mut self, time: EpochMillis) -> Timestamp {
93 let time = self.round_to_interval(time);
94 self.last_tick = Some(time);
95
96 // Refresh the interval for the next tick.
97 self.interval = (self.get_interval)().as_millis().try_into().unwrap();
98 trace!("probe ticker interval: {}ms", self.interval);
99
100 time.into()
101 }
102
103 fn round_to_interval(&self, ms: EpochMillis) -> EpochMillis {
104 ms - (ms % self.interval)
105 }
106}