mz_repr/
refresh_schedule.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use proptest::arbitrary::{Arbitrary, any};
11use proptest::prelude::{BoxedStrategy, Strategy};
12use std::time::Duration;
13
14use mz_proto::IntoRustIfSome;
15use mz_proto::{ProtoType, RustType, TryFromProtoError};
16use serde::{Deserialize, Serialize};
17
18use crate::Timestamp;
19
20include!(concat!(env!("OUT_DIR"), "/mz_repr.refresh_schedule.rs"));
21
/// A refresh schedule, made up of any number of periodic (`REFRESH EVERY`) and one-off
/// (`REFRESH AT`) entries.
///
/// The derived `Default` is the schedule with no entries at all.
#[derive(Clone, Debug, Default, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub struct RefreshSchedule {
    /// The `REFRESH EVERY`s: periodic refreshes.
    pub everies: Vec<RefreshEvery>,
    /// The `REFRESH AT`s: refreshes at specific timestamps.
    pub ats: Vec<Timestamp>,
}
29
30impl RefreshSchedule {
31    /// Rounds up the timestamp to the time of the next refresh.
32    /// Returns None if there is no next refresh.
33    /// It saturates, i.e., if the next refresh would be larger than the maximum timestamp, then it
34    /// returns the maximum timestamp.
35    /// Note that this function is monotonic.
36    pub fn round_up_timestamp(&self, timestamp: Timestamp) -> Option<Timestamp> {
37        let everies = self.everies.iter();
38        let next_everies = everies.map(|every| every.round_up_timestamp(timestamp));
39        let next_ats = self.ats.iter().copied().filter(|at| *at >= timestamp);
40        next_everies.chain(next_ats).min()
41    }
42
43    /// Rounds down `timestamp - 1` to the time of the previous refresh.
44    /// Returns None if there is no previous refresh.
45    /// It saturates, i.e., if the previous refresh would be smaller than the minimum timestamp,
46    /// then it returns the minimum timestamp.
47    /// Note that this fn is monotonic.
48    pub fn round_down_timestamp_m1(&self, timestamp: Timestamp) -> Option<Timestamp> {
49        let everies = self.everies.iter();
50        let prev_everies = everies.map(|every| every.round_down_timestamp_m1(timestamp));
51        // Note that we use `<` instead of `<=`. This is because we are rounding
52        // `timestamp - 1`, and not simply `timestamp`.
53        let prev_ats = self.ats.iter().copied().filter(|at| *at < timestamp);
54        prev_everies.chain(prev_ats).max()
55    }
56
57    /// Returns the time of the last refresh. Returns None if there is no last refresh (e.g., for a
58    /// periodic refresh).
59    ///
60    /// (If there is no last refresh, then we have a `REFRESH EVERY`, in which case the saturating
61    /// roundup puts a refresh at the maximum possible timestamp. This means that it would make
62    /// some sense to return the maximum possible timestamp instead of None. Indeed, some of our
63    /// callers handle our None return value in exactly this way. However, some other callers do
64    /// something else with None, and therefore we don't want to hardcode this behavior in this
65    /// function.)
66    pub fn last_refresh(&self) -> Option<Timestamp> {
67        if self.everies.is_empty() {
68            self.ats.iter().max().cloned()
69        } else {
70            None
71        }
72    }
73
74    /// Returns whether the schedule is empty, i.e., no `EVERY` or `AT`.
75    pub fn is_empty(&self) -> bool {
76        self.everies.is_empty() && self.ats.is_empty()
77    }
78}
79
/// A single periodic refresh: refreshes happen every `interval`, on a grid that passes through
/// `aligned_to`.
#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
pub struct RefreshEvery {
    /// The time between two consecutive refreshes. The rounding methods on this type work with
    /// this value converted to whole milliseconds, and panic if that is 0 or doesn't fit in a
    /// `u64`.
    pub interval: Duration,
    /// A timestamp at which a refresh falls; the other refreshes are whole multiples of
    /// `interval` before and after it.
    pub aligned_to: Timestamp,
}
85
86impl RefreshEvery {
87    /// Rounds up the timestamp to the time of the next refresh, according to the given periodic
88    /// refresh schedule. It saturates, i.e., if the rounding would make it overflow, then it
89    /// returns the maximum possible timestamp.
90    ///
91    /// # Panics
92    /// - if the refresh interval converted to milliseconds cast to u64 overflows;
93    /// - if the interval is 0.
94    /// (These should be checked in HIR planning.)
95    pub fn round_up_timestamp(&self, timestamp: Timestamp) -> Timestamp {
96        let interval = u64::try_from(self.interval.as_millis()).unwrap();
97        let aligned_to = u64::from(self.aligned_to);
98        let timestamp = u64::from(timestamp);
99
100        let result = if timestamp > aligned_to {
101            let rounded = Self::round_up_to_multiple_of_interval(interval, timestamp - aligned_to);
102            aligned_to.saturating_add(rounded)
103        } else {
104            // Note: `timestamp == aligned_to` has to be handled here, because in the other branch
105            // `x - 1` in `round_up_to_multiple_of_interval` would underflow.
106            //
107            // Also, no need to check for overflows here, since all the numbers are either between
108            // `timestamp` and `aligned_to`, or not greater than `aligned_to - timestamp`.
109            aligned_to - Self::round_down_to_multiple_of_interval(interval, aligned_to - timestamp)
110        };
111        // TODO: Downgrade these to non-logging soft asserts when we have built more confidence in the code.
112        assert!(result >= timestamp);
113        assert!(result - timestamp < interval);
114        Timestamp::new(result)
115    }
116
117    /// Rounds down `timestamp - 1` to the time of the previous refresh, according to the given
118    /// periodic refresh schedule. It saturates, i.e., if the rounding would make it underflow, then
119    /// it returns the minimum possible timestamp.
120    ///
121    /// # Panics
122    /// - if the refresh interval converted to milliseconds cast to u64 overflows;
123    /// - if the interval is 0.
124    /// (These should be checked in HIR planning.)
125    pub fn round_down_timestamp_m1(&self, timestamp: Timestamp) -> Timestamp {
126        let interval = u64::try_from(self.interval.as_millis()).unwrap();
127        let aligned_to = u64::from(self.aligned_to);
128        let timestamp = u64::from(timestamp.saturating_sub(1));
129
130        let result = if timestamp >= aligned_to {
131            // Note: `timestamp == aligned_to` has to be handled here, because in the other branch
132            // `x - 1` in `round_up_to_multiple_of_interval` would underflow.
133            //
134            // Also, No need to check for overflows here, since all the numbers are either between
135            // `aligned_to` and `timestamp`, or not greater than `timestamp - aligned_to`.
136            aligned_to + Self::round_down_to_multiple_of_interval(interval, timestamp - aligned_to)
137        } else {
138            let rounded = Self::round_up_to_multiple_of_interval(interval, aligned_to - timestamp);
139            aligned_to.saturating_sub(rounded)
140        };
141        // TODO: Downgrade these to non-logging soft asserts when we have built more confidence in the code.
142        assert!(result <= timestamp);
143        assert!(timestamp - result < interval);
144        Timestamp::new(result)
145    }
146
147    /// Rounds up `x` to the nearest multiple of `interval`.
148    /// `x` must not be 0.
149    ///
150    /// It saturates, i.e., if the rounding would make it overflow, then it
151    /// returns the maximum possible timestamp.
152    fn round_up_to_multiple_of_interval(interval: u64, x: u64) -> u64 {
153        assert_ne!(x, 0);
154        (((x - 1) / interval) + 1).saturating_mul(interval)
155    }
156
157    /// Rounds down `x` to the nearest multiple of `interval`.
158    fn round_down_to_multiple_of_interval(interval: u64, x: u64) -> u64 {
159        x / interval * interval
160    }
161}
162
163impl RustType<ProtoRefreshSchedule> for RefreshSchedule {
164    fn into_proto(&self) -> ProtoRefreshSchedule {
165        ProtoRefreshSchedule {
166            everies: self.everies.into_proto(),
167            ats: self.ats.into_proto(),
168        }
169    }
170
171    fn from_proto(proto: ProtoRefreshSchedule) -> Result<Self, TryFromProtoError> {
172        Ok(RefreshSchedule {
173            everies: proto.everies.into_rust()?,
174            ats: proto.ats.into_rust()?,
175        })
176    }
177}
178
179impl RustType<ProtoRefreshEvery> for RefreshEvery {
180    fn into_proto(&self) -> ProtoRefreshEvery {
181        ProtoRefreshEvery {
182            interval: Some(self.interval.into_proto()),
183            aligned_to: Some(self.aligned_to.into_proto()),
184        }
185    }
186
187    fn from_proto(proto: ProtoRefreshEvery) -> Result<Self, TryFromProtoError> {
188        Ok(RefreshEvery {
189            interval: proto
190                .interval
191                .into_rust_if_some("ProtoRefreshEvery::interval")?,
192            aligned_to: proto
193                .aligned_to
194                .into_rust_if_some("ProtoRefreshEvery::aligned_to")?,
195        })
196    }
197}
198
199impl Arbitrary for RefreshSchedule {
200    type Strategy = BoxedStrategy<Self>;
201    type Parameters = ();
202
203    fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
204        (
205            proptest::collection::vec(any::<RefreshEvery>(), 0..4),
206            proptest::collection::vec(any::<Timestamp>(), 0..4),
207        )
208            .prop_map(|(everies, ats)| RefreshSchedule { everies, ats })
209            .boxed()
210    }
211}
212
213impl Arbitrary for RefreshEvery {
214    type Strategy = BoxedStrategy<Self>;
215    type Parameters = ();
216
217    fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
218        (any::<Duration>(), any::<Timestamp>())
219            .prop_map(|(interval, aligned_to)| RefreshEvery {
220                interval,
221                aligned_to,
222            })
223            .boxed()
224    }
225}
226
#[cfg(test)]
mod tests {
    use crate::Timestamp;
    use crate::adt::interval::Interval;
    use crate::refresh_schedule::{RefreshEvery, RefreshSchedule};
    use std::str::FromStr;

    /// Shorthand for building a `Timestamp` from a raw `u64`.
    fn ts(t: u64) -> Timestamp {
        Timestamp::new(t)
    }

    /// Builds a `RefreshEvery` with an interval of 100 milliseconds and the given alignment.
    fn every_100ms(aligned_to: u64) -> RefreshEvery {
        RefreshEvery {
            interval: Interval::from_str("100 milliseconds")
                .unwrap()
                .duration()
                .unwrap(),
            aligned_to: ts(aligned_to),
        }
    }

    /// Returns a checker for `schedule`. The checker takes
    /// `(expected round-down, expected round-up, input)` and asserts that
    /// `round_down_timestamp_m1(input)` and `round_up_timestamp(input)` give the expected
    /// results. On failure, the message names the input and the schedule, since the assertion
    /// location alone is the same for every case.
    fn checker(schedule: RefreshSchedule) -> impl Fn(Option<u64>, Option<u64>, u64) {
        move |expected_round_down_ts: Option<u64>,
              expected_round_up_ts: Option<u64>,
              input_ts: u64| {
            assert_eq!(
                expected_round_down_ts.map(ts),
                schedule.round_down_timestamp_m1(ts(input_ts)),
                "round_down_timestamp_m1({}) on {:?}",
                input_ts,
                schedule,
            );
            assert_eq!(
                expected_round_up_ts.map(ts),
                schedule.round_up_timestamp(ts(input_ts)),
                "round_up_timestamp({}) on {:?}",
                input_ts,
                schedule,
            );
        }
    }

    #[mz_ore::test]
    fn test_round_up_down_timestamp() {
        // Only `REFRESH AT`s.
        {
            let schedule = RefreshSchedule {
                everies: vec![],
                ats: vec![ts(123), ts(456)],
            };
            let test = checker(schedule);
            test(None, Some(123), 0);
            test(None, Some(123), 50);
            test(None, Some(123), 122);
            test(None, Some(123), 123);
            test(Some(123), Some(456), 124);
            test(Some(123), Some(456), 130);
            test(Some(123), Some(456), 455);
            test(Some(123), Some(456), 456);
            test(Some(456), None, 457);
            test(Some(456), None, 12345678);
            test(Some(456), None, u64::MAX - 1000);
            test(Some(456), None, u64::MAX - 1);
            test(Some(456), None, u64::MAX);
        }
        // A single `REFRESH EVERY`, aligned to a multiple of the interval.
        {
            let schedule = RefreshSchedule {
                everies: vec![every_100ms(500)],
                ats: vec![],
            };
            let test = checker(schedule);
            test(Some(0), Some(0), 0);
            test(Some(0), Some(100), 1);
            test(Some(0), Some(100), 2);
            test(Some(0), Some(100), 99);
            test(Some(0), Some(100), 100);
            test(Some(100), Some(200), 101);
            test(Some(100), Some(200), 102);
            test(Some(100), Some(200), 199);
            test(Some(100), Some(200), 200);
            test(Some(200), Some(300), 201);
            test(Some(300), Some(400), 400);
            test(Some(400), Some(500), 401);
            test(Some(400), Some(500), 450);
            test(Some(400), Some(500), 499);
            test(Some(400), Some(500), 500);
            test(Some(500), Some(600), 501);
            test(Some(500), Some(600), 599);
            test(Some(500), Some(600), 600);
            test(Some(600), Some(700), 601);
            test(Some(5434532500), Some(5434532600), 5434532599);
            test(Some(5434532500), Some(5434532600), 5434532600);
            test(Some(5434532600), Some(5434532700), 5434532601);
            test(Some(18446744073709551600), Some(u64::MAX), u64::MAX - 1);
            test(Some(18446744073709551600), Some(u64::MAX), u64::MAX);
        }
        // A single `REFRESH EVERY`, aligned to a non-multiple of the interval.
        {
            let schedule = RefreshSchedule {
                everies: vec![every_100ms(542)],
                ats: vec![],
            };
            let test = checker(schedule);
            test(Some(0), Some(42), 0);
            test(Some(0), Some(42), 1);
            test(Some(0), Some(42), 41);
            test(Some(0), Some(42), 42);
            test(Some(42), Some(142), 43);
            test(Some(342), Some(442), 441);
            test(Some(342), Some(442), 442);
            test(Some(442), Some(542), 443);
            test(Some(442), Some(542), 541);
            test(Some(442), Some(542), 542);
            test(Some(542), Some(642), 543);
            test(Some(18446744073709551542), Some(u64::MAX), u64::MAX - 1);
            test(Some(18446744073709551542), Some(u64::MAX), u64::MAX);
        }
        // Two `REFRESH EVERY`s mixed with several `REFRESH AT`s.
        {
            let schedule = RefreshSchedule {
                everies: vec![every_100ms(400), every_100ms(542)],
                ats: vec![ts(2), ts(300), ts(400), ts(471), ts(541), ts(123456)],
            };
            let test = checker(schedule);
            test(Some(0), Some(0), 0);
            test(Some(0), Some(2), 1);
            test(Some(0), Some(2), 2);
            test(Some(2), Some(42), 3);
            test(Some(2), Some(42), 41);
            test(Some(2), Some(42), 42);
            test(Some(42), Some(100), 43);
            test(Some(42), Some(100), 99);
            test(Some(42), Some(100), 100);
            test(Some(100), Some(142), 101);
            test(Some(100), Some(142), 141);
            test(Some(100), Some(142), 142);
            test(Some(142), Some(200), 143);
            test(Some(242), Some(300), 243);
            test(Some(242), Some(300), 299);
            test(Some(242), Some(300), 300);
            test(Some(300), Some(342), 301);
            test(Some(342), Some(400), 343);
            test(Some(342), Some(400), 399);
            test(Some(342), Some(400), 400);
            test(Some(400), Some(442), 401);
            test(Some(400), Some(442), 441);
            test(Some(400), Some(442), 442);
            test(Some(442), Some(471), 443);
            test(Some(442), Some(471), 470);
            test(Some(442), Some(471), 471);
            test(Some(471), Some(500), 472);
            test(Some(471), Some(500), 480);
            test(Some(471), Some(500), 500);
            test(Some(500), Some(541), 501);
            test(Some(500), Some(541), 540);
            test(Some(500), Some(541), 541);
            test(Some(541), Some(542), 542);
            test(Some(542), Some(600), 543);
            test(Some(65442), Some(65500), 65454);
            test(Some(87800), Some(87842), 87831);
            test(Some(123400), Some(123442), 123442);
            test(Some(123442), Some(123456), 123443);
            test(Some(123442), Some(123456), 123456);
            test(Some(123456), Some(123500), 123457);
            test(Some(18446744073709551600), Some(u64::MAX), u64::MAX - 1);
            test(Some(18446744073709551600), Some(u64::MAX), u64::MAX);
        }
    }
}