mz_storage/source/probe.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Support for sending frontier probes to upstream systems.
11
12use std::time::Duration;
13
14use mz_ore::now::{EpochMillis, NowFn};
15use mz_repr::Timestamp;
16use tracing::trace;
17
18/// A ticker to drive source upstream probing.
19///
20/// This type works similar to [`tokio::time::Interval`] but returns timestamps from its
21/// [`Ticker::tick`] method that can be used as probe timestamps. These timestamps are rounded down
22/// to the nearest multiple of the tick interval, to reduce the amount of unique timestamps emitted
23/// by sources, thereby reducing churn in downstream dataflows.
24///
25/// The ticker also supports usage in non-async contexts, using [`Ticker::tick_blocking`].
26///
27/// The tick interval is determined by the result of the `get_interval` closure. It is updated
28/// after each tick, allowing it to be changed dynamically during the operation of the ticker.
29pub(super) struct Ticker<G> {
30 interval: EpochMillis,
31 now: NowFn,
32 last_tick: Option<EpochMillis>,
33 get_interval: G,
34}
35
36impl<G: Fn() -> Duration> Ticker<G> {
37 pub fn new(get_interval: G, now: NowFn) -> Self {
38 let mut ticker = Self {
39 interval: Default::default(),
40 now,
41 last_tick: None,
42 get_interval,
43 };
44 ticker.refresh_interval();
45 ticker
46 }
47
48 /// Wait until it is time for the next probe, returning a suitable probe timestamp.
49 ///
50 /// This method tries to resolve as close as possible to the returned probe timestamp, though
51 /// it is not guaranteed to always succeed. If a tick is missed, it is skipped entirely.
52 pub async fn tick(&mut self) -> Timestamp {
53 let target = self.next_tick_target();
54
55 let mut now = (self.now)();
56 while now < target {
57 let wait = Duration::from_millis(target - now);
58 tokio::time::sleep(wait).await;
59 now = (self.now)();
60 }
61
62 trace!(target, now, "probe ticker skew: {}ms", now - target);
63 self.apply_tick(now)
64 }
65
66 /// Wait until it is time for the next probe, returning a suitable probe timestamp.
67 ///
68 /// Blocking version of [`Ticker::tick`].
69 pub fn tick_blocking(&mut self) -> Timestamp {
70 let target = self.next_tick_target();
71
72 let mut now = (self.now)();
73 while now < target {
74 let wait = Duration::from_millis(target - now);
75 std::thread::sleep(wait);
76 now = (self.now)();
77 }
78
79 trace!(target, now, "probe ticker skew: {}ms", now - target);
80 self.apply_tick(now)
81 }
82
83 fn refresh_interval(&mut self) {
84 let ms = (self.get_interval)().as_millis().try_into().unwrap();
85 self.interval = std::cmp::max(ms, 1);
86 }
87
88 /// Return the desired time of the next tick.
89 fn next_tick_target(&self) -> EpochMillis {
90 let target = match self.last_tick {
91 Some(ms) => ms + self.interval,
92 None => (self.now)(),
93 };
94 self.round_to_interval(target)
95 }
96
97 /// Apply a tick at the given time, returning the probe timestamp.
98 fn apply_tick(&mut self, time: EpochMillis) -> Timestamp {
99 let time = self.round_to_interval(time);
100 self.last_tick = Some(time);
101
102 // Refresh the interval for the next tick.
103 self.refresh_interval();
104 trace!("probe ticker interval: {}ms", self.interval);
105
106 time.into()
107 }
108
109 fn round_to_interval(&self, ms: EpochMillis) -> EpochMillis {
110 ms - (ms % self.interval)
111 }
112}