Skip to main content

mz_repr/
refresh_schedule.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::time::Duration;
11
12use serde::{Deserialize, Serialize};
13
14use crate::Timestamp;
15
16#[derive(
17    Clone,
18    Debug,
19    Default,
20    Serialize,
21    Deserialize,
22    Eq,
23    PartialEq,
24    Ord,
25    PartialOrd
26)]
27pub struct RefreshSchedule {
28    // `REFRESH EVERY`s
29    pub everies: Vec<RefreshEvery>,
30    // `REFRESH AT`s
31    pub ats: Vec<Timestamp>,
32}
33
34impl RefreshSchedule {
35    /// Rounds up the timestamp to the time of the next refresh.
36    /// Returns None if there is no next refresh.
37    /// It saturates, i.e., if the next refresh would be larger than the maximum timestamp, then it
38    /// returns the maximum timestamp.
39    /// Note that this function is monotonic.
40    pub fn round_up_timestamp(&self, timestamp: Timestamp) -> Option<Timestamp> {
41        let everies = self.everies.iter();
42        let next_everies = everies.map(|every| every.round_up_timestamp(timestamp));
43        let next_ats = self.ats.iter().copied().filter(|at| *at >= timestamp);
44        next_everies.chain(next_ats).min()
45    }
46
47    /// Rounds down `timestamp - 1` to the time of the previous refresh.
48    /// Returns None if there is no previous refresh.
49    /// It saturates, i.e., if the previous refresh would be smaller than the minimum timestamp,
50    /// then it returns the minimum timestamp.
51    /// Note that this fn is monotonic.
52    pub fn round_down_timestamp_m1(&self, timestamp: Timestamp) -> Option<Timestamp> {
53        let everies = self.everies.iter();
54        let prev_everies = everies.map(|every| every.round_down_timestamp_m1(timestamp));
55        // Note that we use `<` instead of `<=`. This is because we are rounding
56        // `timestamp - 1`, and not simply `timestamp`.
57        let prev_ats = self.ats.iter().copied().filter(|at| *at < timestamp);
58        prev_everies.chain(prev_ats).max()
59    }
60
61    /// Returns the time of the last refresh. Returns None if there is no last refresh (e.g., for a
62    /// periodic refresh).
63    ///
64    /// (If there is no last refresh, then we have a `REFRESH EVERY`, in which case the saturating
65    /// roundup puts a refresh at the maximum possible timestamp. This means that it would make
66    /// some sense to return the maximum possible timestamp instead of None. Indeed, some of our
67    /// callers handle our None return value in exactly this way. However, some other callers do
68    /// something else with None, and therefore we don't want to hardcode this behavior in this
69    /// function.)
70    pub fn last_refresh(&self) -> Option<Timestamp> {
71        if self.everies.is_empty() {
72            self.ats.iter().max().cloned()
73        } else {
74            None
75        }
76    }
77
78    /// Returns whether the schedule is empty, i.e., no `EVERY` or `AT`.
79    pub fn is_empty(&self) -> bool {
80        self.everies.is_empty() && self.ats.is_empty()
81    }
82}
83
84#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
85pub struct RefreshEvery {
86    pub interval: Duration,
87    pub aligned_to: Timestamp,
88}
89
90impl RefreshEvery {
91    /// Rounds up the timestamp to the time of the next refresh, according to the given periodic
92    /// refresh schedule. It saturates, i.e., if the rounding would make it overflow, then it
93    /// returns the maximum possible timestamp.
94    ///
95    /// # Panics
96    /// - if the refresh interval converted to milliseconds cast to u64 overflows;
97    /// - if the interval is 0.
98    /// (These should be checked in HIR planning.)
99    pub fn round_up_timestamp(&self, timestamp: Timestamp) -> Timestamp {
100        let interval = u64::try_from(self.interval.as_millis()).unwrap();
101        let aligned_to = u64::from(self.aligned_to);
102        let timestamp = u64::from(timestamp);
103
104        let result = if timestamp > aligned_to {
105            let rounded = Self::round_up_to_multiple_of_interval(interval, timestamp - aligned_to);
106            aligned_to.saturating_add(rounded)
107        } else {
108            // Note: `timestamp == aligned_to` has to be handled here, because in the other branch
109            // `x - 1` in `round_up_to_multiple_of_interval` would underflow.
110            //
111            // Also, no need to check for overflows here, since all the numbers are either between
112            // `timestamp` and `aligned_to`, or not greater than `aligned_to - timestamp`.
113            aligned_to - Self::round_down_to_multiple_of_interval(interval, aligned_to - timestamp)
114        };
115        // TODO: Downgrade these to non-logging soft asserts when we have built more confidence in the code.
116        assert!(result >= timestamp);
117        assert!(result - timestamp < interval);
118        Timestamp::new(result)
119    }
120
121    /// Rounds down `timestamp - 1` to the time of the previous refresh, according to the given
122    /// periodic refresh schedule. It saturates, i.e., if the rounding would make it underflow, then
123    /// it returns the minimum possible timestamp.
124    ///
125    /// # Panics
126    /// - if the refresh interval converted to milliseconds cast to u64 overflows;
127    /// - if the interval is 0.
128    /// (These should be checked in HIR planning.)
129    pub fn round_down_timestamp_m1(&self, timestamp: Timestamp) -> Timestamp {
130        let interval = u64::try_from(self.interval.as_millis()).unwrap();
131        let aligned_to = u64::from(self.aligned_to);
132        let timestamp = u64::from(timestamp.saturating_sub(1));
133
134        let result = if timestamp >= aligned_to {
135            // Note: `timestamp == aligned_to` has to be handled here, because in the other branch
136            // `x - 1` in `round_up_to_multiple_of_interval` would underflow.
137            //
138            // Also, No need to check for overflows here, since all the numbers are either between
139            // `aligned_to` and `timestamp`, or not greater than `timestamp - aligned_to`.
140            aligned_to + Self::round_down_to_multiple_of_interval(interval, timestamp - aligned_to)
141        } else {
142            let rounded = Self::round_up_to_multiple_of_interval(interval, aligned_to - timestamp);
143            aligned_to.saturating_sub(rounded)
144        };
145        // TODO: Downgrade these to non-logging soft asserts when we have built more confidence in the code.
146        assert!(result <= timestamp);
147        assert!(timestamp - result < interval);
148        Timestamp::new(result)
149    }
150
151    /// Rounds up `x` to the nearest multiple of `interval`.
152    /// `x` must not be 0.
153    ///
154    /// It saturates, i.e., if the rounding would make it overflow, then it
155    /// returns the maximum possible timestamp.
156    fn round_up_to_multiple_of_interval(interval: u64, x: u64) -> u64 {
157        assert_ne!(x, 0);
158        (((x - 1) / interval) + 1).saturating_mul(interval)
159    }
160
161    /// Rounds down `x` to the nearest multiple of `interval`.
162    fn round_down_to_multiple_of_interval(interval: u64, x: u64) -> u64 {
163        x / interval * interval
164    }
165}
166
#[cfg(test)]
mod tests {
    use crate::Timestamp;
    use crate::adt::interval::Interval;
    use crate::refresh_schedule::{RefreshEvery, RefreshSchedule};
    use std::str::FromStr;

    /// Shorthand for building a `Timestamp` from raw milliseconds.
    fn ts(t: u64) -> Timestamp {
        Timestamp::new(t)
    }

    /// Builds a `REFRESH EVERY` entry, parsing `interval` with `Interval::from_str`.
    fn every(interval: &str, aligned_to: u64) -> RefreshEvery {
        RefreshEvery {
            interval: Interval::from_str(interval).unwrap().duration().unwrap(),
            aligned_to: ts(aligned_to),
        }
    }

    #[mz_ore::test]
    fn test_round_up_down_timestamp() {
        let test = |schedule: RefreshSchedule| {
            move |expected_round_down_ts: Option<u64>,
                  expected_round_up_ts: Option<u64>,
                  input_ts| {
                assert_eq!(
                    expected_round_down_ts.map(ts),
                    schedule.round_down_timestamp_m1(ts(input_ts)),
                );
                assert_eq!(
                    expected_round_up_ts.map(ts),
                    schedule.round_up_timestamp(ts(input_ts))
                );
            }
        };
        {
            let schedule = RefreshSchedule {
                everies: vec![],
                ats: vec![ts(123), ts(456)],
            };
            let test = test(schedule);
            test(None, Some(123), 0);
            test(None, Some(123), 50);
            test(None, Some(123), 122);
            test(None, Some(123), 123);
            test(Some(123), Some(456), 124);
            test(Some(123), Some(456), 130);
            test(Some(123), Some(456), 455);
            test(Some(123), Some(456), 456);
            test(Some(456), None, 457);
            test(Some(456), None, 12345678);
            test(Some(456), None, u64::MAX - 1000);
            test(Some(456), None, u64::MAX - 1);
            test(Some(456), None, u64::MAX);
        }
        {
            let schedule = RefreshSchedule {
                everies: vec![every("100 milliseconds", 500)],
                ats: vec![],
            };
            let test = test(schedule);
            test(Some(0), Some(0), 0);
            test(Some(0), Some(100), 1);
            test(Some(0), Some(100), 2);
            test(Some(0), Some(100), 99);
            test(Some(0), Some(100), 100);
            test(Some(100), Some(200), 101);
            test(Some(100), Some(200), 102);
            test(Some(100), Some(200), 199);
            test(Some(100), Some(200), 200);
            test(Some(200), Some(300), 201);
            test(Some(300), Some(400), 400);
            test(Some(400), Some(500), 401);
            test(Some(400), Some(500), 450);
            test(Some(400), Some(500), 499);
            test(Some(400), Some(500), 500);
            test(Some(500), Some(600), 501);
            test(Some(500), Some(600), 599);
            test(Some(500), Some(600), 600);
            test(Some(600), Some(700), 601);
            test(Some(5434532500), Some(5434532600), 5434532599);
            test(Some(5434532500), Some(5434532600), 5434532600);
            test(Some(5434532600), Some(5434532700), 5434532601);
            test(Some(18446744073709551600), Some(u64::MAX), u64::MAX - 1);
            test(Some(18446744073709551600), Some(u64::MAX), u64::MAX);
        }
        {
            let schedule = RefreshSchedule {
                everies: vec![every("100 milliseconds", 542)],
                ats: vec![],
            };
            let test = test(schedule);
            test(Some(0), Some(42), 0);
            test(Some(0), Some(42), 1);
            test(Some(0), Some(42), 41);
            test(Some(0), Some(42), 42);
            test(Some(42), Some(142), 43);
            test(Some(342), Some(442), 441);
            test(Some(342), Some(442), 442);
            test(Some(442), Some(542), 443);
            test(Some(442), Some(542), 541);
            test(Some(442), Some(542), 542);
            test(Some(542), Some(642), 543);
            test(Some(18446744073709551542), Some(u64::MAX), u64::MAX - 1);
            test(Some(18446744073709551542), Some(u64::MAX), u64::MAX);
        }
        {
            let schedule = RefreshSchedule {
                everies: vec![
                    every("100 milliseconds", 400),
                    every("100 milliseconds", 542),
                ],
                ats: vec![ts(2), ts(300), ts(400), ts(471), ts(541), ts(123456)],
            };
            let test = test(schedule);
            test(Some(0), Some(0), 0);
            test(Some(0), Some(2), 1);
            test(Some(0), Some(2), 2);
            test(Some(2), Some(42), 3);
            test(Some(2), Some(42), 41);
            test(Some(2), Some(42), 42);
            test(Some(42), Some(100), 43);
            test(Some(42), Some(100), 99);
            test(Some(42), Some(100), 100);
            test(Some(100), Some(142), 101);
            test(Some(100), Some(142), 141);
            test(Some(100), Some(142), 142);
            test(Some(142), Some(200), 143);
            test(Some(242), Some(300), 243);
            test(Some(242), Some(300), 299);
            test(Some(242), Some(300), 300);
            test(Some(300), Some(342), 301);
            test(Some(342), Some(400), 343);
            test(Some(342), Some(400), 399);
            test(Some(342), Some(400), 400);
            test(Some(400), Some(442), 401);
            test(Some(400), Some(442), 441);
            test(Some(400), Some(442), 442);
            test(Some(442), Some(471), 443);
            test(Some(442), Some(471), 470);
            test(Some(442), Some(471), 471);
            test(Some(471), Some(500), 472);
            test(Some(471), Some(500), 480);
            test(Some(471), Some(500), 500);
            test(Some(500), Some(541), 501);
            test(Some(500), Some(541), 540);
            test(Some(500), Some(541), 541);
            test(Some(541), Some(542), 542);
            test(Some(542), Some(600), 543);
            test(Some(65442), Some(65500), 65454);
            test(Some(87800), Some(87842), 87831);
            test(Some(123400), Some(123442), 123442);
            test(Some(123442), Some(123456), 123443);
            test(Some(123442), Some(123456), 123456);
            test(Some(123456), Some(123500), 123457);
            test(Some(18446744073709551600), Some(u64::MAX), u64::MAX - 1);
            test(Some(18446744073709551600), Some(u64::MAX), u64::MAX);
        }
    }

    /// Covers `last_refresh` and `is_empty`, which the rounding test above does not exercise.
    #[mz_ore::test]
    fn test_last_refresh_and_is_empty() {
        let empty = RefreshSchedule::default();
        assert!(empty.is_empty());
        assert_eq!(empty.last_refresh(), None);
        assert_eq!(empty.round_up_timestamp(ts(0)), None);
        assert_eq!(empty.round_down_timestamp_m1(ts(0)), None);

        // The last refresh is the maximum `AT`, regardless of the order of `ats`.
        let ats_only = RefreshSchedule {
            everies: vec![],
            ats: vec![ts(456), ts(123)],
        };
        assert!(!ats_only.is_empty());
        assert_eq!(ats_only.last_refresh(), Some(ts(456)));

        // Any `EVERY` means there is no last refresh.
        let periodic = RefreshSchedule {
            everies: vec![every("100 milliseconds", 500)],
            ats: vec![ts(123)],
        };
        assert!(!periodic.is_empty());
        assert_eq!(periodic.last_refresh(), None);
    }
}