mz_repr/
refresh_schedule.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::time::Duration;
11
12use serde::{Deserialize, Serialize};
13
14use crate::Timestamp;
15
16#[derive(Clone, Debug, Default, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
17pub struct RefreshSchedule {
18    // `REFRESH EVERY`s
19    pub everies: Vec<RefreshEvery>,
20    // `REFRESH AT`s
21    pub ats: Vec<Timestamp>,
22}
23
24impl RefreshSchedule {
25    /// Rounds up the timestamp to the time of the next refresh.
26    /// Returns None if there is no next refresh.
27    /// It saturates, i.e., if the next refresh would be larger than the maximum timestamp, then it
28    /// returns the maximum timestamp.
29    /// Note that this function is monotonic.
30    pub fn round_up_timestamp(&self, timestamp: Timestamp) -> Option<Timestamp> {
31        let everies = self.everies.iter();
32        let next_everies = everies.map(|every| every.round_up_timestamp(timestamp));
33        let next_ats = self.ats.iter().copied().filter(|at| *at >= timestamp);
34        next_everies.chain(next_ats).min()
35    }
36
37    /// Rounds down `timestamp - 1` to the time of the previous refresh.
38    /// Returns None if there is no previous refresh.
39    /// It saturates, i.e., if the previous refresh would be smaller than the minimum timestamp,
40    /// then it returns the minimum timestamp.
41    /// Note that this fn is monotonic.
42    pub fn round_down_timestamp_m1(&self, timestamp: Timestamp) -> Option<Timestamp> {
43        let everies = self.everies.iter();
44        let prev_everies = everies.map(|every| every.round_down_timestamp_m1(timestamp));
45        // Note that we use `<` instead of `<=`. This is because we are rounding
46        // `timestamp - 1`, and not simply `timestamp`.
47        let prev_ats = self.ats.iter().copied().filter(|at| *at < timestamp);
48        prev_everies.chain(prev_ats).max()
49    }
50
51    /// Returns the time of the last refresh. Returns None if there is no last refresh (e.g., for a
52    /// periodic refresh).
53    ///
54    /// (If there is no last refresh, then we have a `REFRESH EVERY`, in which case the saturating
55    /// roundup puts a refresh at the maximum possible timestamp. This means that it would make
56    /// some sense to return the maximum possible timestamp instead of None. Indeed, some of our
57    /// callers handle our None return value in exactly this way. However, some other callers do
58    /// something else with None, and therefore we don't want to hardcode this behavior in this
59    /// function.)
60    pub fn last_refresh(&self) -> Option<Timestamp> {
61        if self.everies.is_empty() {
62            self.ats.iter().max().cloned()
63        } else {
64            None
65        }
66    }
67
68    /// Returns whether the schedule is empty, i.e., no `EVERY` or `AT`.
69    pub fn is_empty(&self) -> bool {
70        self.everies.is_empty() && self.ats.is_empty()
71    }
72}
73
74#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
75pub struct RefreshEvery {
76    pub interval: Duration,
77    pub aligned_to: Timestamp,
78}
79
80impl RefreshEvery {
81    /// Rounds up the timestamp to the time of the next refresh, according to the given periodic
82    /// refresh schedule. It saturates, i.e., if the rounding would make it overflow, then it
83    /// returns the maximum possible timestamp.
84    ///
85    /// # Panics
86    /// - if the refresh interval converted to milliseconds cast to u64 overflows;
87    /// - if the interval is 0.
88    /// (These should be checked in HIR planning.)
89    pub fn round_up_timestamp(&self, timestamp: Timestamp) -> Timestamp {
90        let interval = u64::try_from(self.interval.as_millis()).unwrap();
91        let aligned_to = u64::from(self.aligned_to);
92        let timestamp = u64::from(timestamp);
93
94        let result = if timestamp > aligned_to {
95            let rounded = Self::round_up_to_multiple_of_interval(interval, timestamp - aligned_to);
96            aligned_to.saturating_add(rounded)
97        } else {
98            // Note: `timestamp == aligned_to` has to be handled here, because in the other branch
99            // `x - 1` in `round_up_to_multiple_of_interval` would underflow.
100            //
101            // Also, no need to check for overflows here, since all the numbers are either between
102            // `timestamp` and `aligned_to`, or not greater than `aligned_to - timestamp`.
103            aligned_to - Self::round_down_to_multiple_of_interval(interval, aligned_to - timestamp)
104        };
105        // TODO: Downgrade these to non-logging soft asserts when we have built more confidence in the code.
106        assert!(result >= timestamp);
107        assert!(result - timestamp < interval);
108        Timestamp::new(result)
109    }
110
111    /// Rounds down `timestamp - 1` to the time of the previous refresh, according to the given
112    /// periodic refresh schedule. It saturates, i.e., if the rounding would make it underflow, then
113    /// it returns the minimum possible timestamp.
114    ///
115    /// # Panics
116    /// - if the refresh interval converted to milliseconds cast to u64 overflows;
117    /// - if the interval is 0.
118    /// (These should be checked in HIR planning.)
119    pub fn round_down_timestamp_m1(&self, timestamp: Timestamp) -> Timestamp {
120        let interval = u64::try_from(self.interval.as_millis()).unwrap();
121        let aligned_to = u64::from(self.aligned_to);
122        let timestamp = u64::from(timestamp.saturating_sub(1));
123
124        let result = if timestamp >= aligned_to {
125            // Note: `timestamp == aligned_to` has to be handled here, because in the other branch
126            // `x - 1` in `round_up_to_multiple_of_interval` would underflow.
127            //
128            // Also, No need to check for overflows here, since all the numbers are either between
129            // `aligned_to` and `timestamp`, or not greater than `timestamp - aligned_to`.
130            aligned_to + Self::round_down_to_multiple_of_interval(interval, timestamp - aligned_to)
131        } else {
132            let rounded = Self::round_up_to_multiple_of_interval(interval, aligned_to - timestamp);
133            aligned_to.saturating_sub(rounded)
134        };
135        // TODO: Downgrade these to non-logging soft asserts when we have built more confidence in the code.
136        assert!(result <= timestamp);
137        assert!(timestamp - result < interval);
138        Timestamp::new(result)
139    }
140
141    /// Rounds up `x` to the nearest multiple of `interval`.
142    /// `x` must not be 0.
143    ///
144    /// It saturates, i.e., if the rounding would make it overflow, then it
145    /// returns the maximum possible timestamp.
146    fn round_up_to_multiple_of_interval(interval: u64, x: u64) -> u64 {
147        assert_ne!(x, 0);
148        (((x - 1) / interval) + 1).saturating_mul(interval)
149    }
150
151    /// Rounds down `x` to the nearest multiple of `interval`.
152    fn round_down_to_multiple_of_interval(interval: u64, x: u64) -> u64 {
153        x / interval * interval
154    }
155}
156
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use crate::Timestamp;
    use crate::adt::interval::Interval;
    use crate::refresh_schedule::{RefreshEvery, RefreshSchedule};

    /// A single expectation, in the order
    /// `(expected round_down_timestamp_m1, expected round_up_timestamp, input timestamp)`.
    type Case = (Option<u64>, Option<u64>, u64);

    /// Shorthand for building a `Timestamp` from a raw `u64`.
    fn ts(t: u64) -> Timestamp {
        Timestamp::new(t)
    }

    /// Builds a `REFRESH EVERY '100 milliseconds'` entry aligned to `aligned_to`.
    fn every_100ms(aligned_to: u64) -> RefreshEvery {
        RefreshEvery {
            interval: Interval::from_str("100 milliseconds")
                .unwrap()
                .duration()
                .unwrap(),
            aligned_to: ts(aligned_to),
        }
    }

    /// Runs every case against `schedule`, checking the round-down result first and the
    /// round-up result second.
    fn check(schedule: &RefreshSchedule, cases: &[Case]) {
        for &(expected_round_down_ts, expected_round_up_ts, input_ts) in cases {
            assert_eq!(
                expected_round_down_ts.map(ts),
                schedule.round_down_timestamp_m1(ts(input_ts)),
            );
            assert_eq!(
                expected_round_up_ts.map(ts),
                schedule.round_up_timestamp(ts(input_ts))
            );
        }
    }

    #[mz_ore::test]
    fn test_round_up_down_timestamp() {
        // Only `REFRESH AT`s.
        let only_ats = RefreshSchedule {
            everies: vec![],
            ats: vec![ts(123), ts(456)],
        };
        check(
            &only_ats,
            &[
                (None, Some(123), 0),
                (None, Some(123), 50),
                (None, Some(123), 122),
                (None, Some(123), 123),
                (Some(123), Some(456), 124),
                (Some(123), Some(456), 130),
                (Some(123), Some(456), 455),
                (Some(123), Some(456), 456),
                (Some(456), None, 457),
                (Some(456), None, 12345678),
                (Some(456), None, u64::MAX - 1000),
                (Some(456), None, u64::MAX - 1),
                (Some(456), None, u64::MAX),
            ],
        );

        // A single `REFRESH EVERY` whose alignment is a multiple of the interval.
        let every_aligned_500 = RefreshSchedule {
            everies: vec![every_100ms(500)],
            ats: vec![],
        };
        check(
            &every_aligned_500,
            &[
                (Some(0), Some(0), 0),
                (Some(0), Some(100), 1),
                (Some(0), Some(100), 2),
                (Some(0), Some(100), 99),
                (Some(0), Some(100), 100),
                (Some(100), Some(200), 101),
                (Some(100), Some(200), 102),
                (Some(100), Some(200), 199),
                (Some(100), Some(200), 200),
                (Some(200), Some(300), 201),
                (Some(300), Some(400), 400),
                (Some(400), Some(500), 401),
                (Some(400), Some(500), 450),
                (Some(400), Some(500), 499),
                (Some(400), Some(500), 500),
                (Some(500), Some(600), 501),
                (Some(500), Some(600), 599),
                (Some(500), Some(600), 600),
                (Some(600), Some(700), 601),
                (Some(5434532500), Some(5434532600), 5434532599),
                (Some(5434532500), Some(5434532600), 5434532600),
                (Some(5434532600), Some(5434532700), 5434532601),
                (Some(18446744073709551600), Some(u64::MAX), u64::MAX - 1),
                (Some(18446744073709551600), Some(u64::MAX), u64::MAX),
            ],
        );

        // A single `REFRESH EVERY` whose alignment is not a multiple of the interval.
        let every_aligned_542 = RefreshSchedule {
            everies: vec![every_100ms(542)],
            ats: vec![],
        };
        check(
            &every_aligned_542,
            &[
                (Some(0), Some(42), 0),
                (Some(0), Some(42), 1),
                (Some(0), Some(42), 41),
                (Some(0), Some(42), 42),
                (Some(42), Some(142), 43),
                (Some(342), Some(442), 441),
                (Some(342), Some(442), 442),
                (Some(442), Some(542), 443),
                (Some(442), Some(542), 541),
                (Some(442), Some(542), 542),
                (Some(542), Some(642), 543),
                (Some(18446744073709551542), Some(u64::MAX), u64::MAX - 1),
                (Some(18446744073709551542), Some(u64::MAX), u64::MAX),
            ],
        );

        // Two `REFRESH EVERY`s combined with several `REFRESH AT`s.
        let mixed = RefreshSchedule {
            everies: vec![every_100ms(400), every_100ms(542)],
            ats: vec![ts(2), ts(300), ts(400), ts(471), ts(541), ts(123456)],
        };
        check(
            &mixed,
            &[
                (Some(0), Some(0), 0),
                (Some(0), Some(2), 1),
                (Some(0), Some(2), 2),
                (Some(2), Some(42), 3),
                (Some(2), Some(42), 41),
                (Some(2), Some(42), 42),
                (Some(42), Some(100), 43),
                (Some(42), Some(100), 99),
                (Some(42), Some(100), 100),
                (Some(100), Some(142), 101),
                (Some(100), Some(142), 141),
                (Some(100), Some(142), 142),
                (Some(142), Some(200), 143),
                (Some(242), Some(300), 243),
                (Some(242), Some(300), 299),
                (Some(242), Some(300), 300),
                (Some(300), Some(342), 301),
                (Some(342), Some(400), 343),
                (Some(342), Some(400), 399),
                (Some(342), Some(400), 400),
                (Some(400), Some(442), 401),
                (Some(400), Some(442), 441),
                (Some(400), Some(442), 442),
                (Some(442), Some(471), 443),
                (Some(442), Some(471), 470),
                (Some(442), Some(471), 471),
                (Some(471), Some(500), 472),
                (Some(471), Some(500), 480),
                (Some(471), Some(500), 500),
                (Some(500), Some(541), 501),
                (Some(500), Some(541), 540),
                (Some(500), Some(541), 541),
                (Some(541), Some(542), 542),
                (Some(542), Some(600), 543),
                (Some(65442), Some(65500), 65454),
                (Some(87800), Some(87842), 87831),
                (Some(123400), Some(123442), 123442),
                (Some(123442), Some(123456), 123443),
                (Some(123442), Some(123456), 123456),
                (Some(123456), Some(123500), 123457),
                (Some(18446744073709551600), Some(u64::MAX), u64::MAX - 1),
                (Some(18446744073709551600), Some(u64::MAX), u64::MAX),
            ],
        );
    }
}