1use std::time::Duration;
11
12use serde::{Deserialize, Serialize};
13
14use crate::Timestamp;
15
16#[derive(Clone, Debug, Default, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
17pub struct RefreshSchedule {
18 pub everies: Vec<RefreshEvery>,
20 pub ats: Vec<Timestamp>,
22}
23
24impl RefreshSchedule {
25 pub fn round_up_timestamp(&self, timestamp: Timestamp) -> Option<Timestamp> {
31 let everies = self.everies.iter();
32 let next_everies = everies.map(|every| every.round_up_timestamp(timestamp));
33 let next_ats = self.ats.iter().copied().filter(|at| *at >= timestamp);
34 next_everies.chain(next_ats).min()
35 }
36
37 pub fn round_down_timestamp_m1(&self, timestamp: Timestamp) -> Option<Timestamp> {
43 let everies = self.everies.iter();
44 let prev_everies = everies.map(|every| every.round_down_timestamp_m1(timestamp));
45 let prev_ats = self.ats.iter().copied().filter(|at| *at < timestamp);
48 prev_everies.chain(prev_ats).max()
49 }
50
51 pub fn last_refresh(&self) -> Option<Timestamp> {
61 if self.everies.is_empty() {
62 self.ats.iter().max().cloned()
63 } else {
64 None
65 }
66 }
67
68 pub fn is_empty(&self) -> bool {
70 self.everies.is_empty() && self.ats.is_empty()
71 }
72}
73
74#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
75pub struct RefreshEvery {
76 pub interval: Duration,
77 pub aligned_to: Timestamp,
78}
79
80impl RefreshEvery {
81 pub fn round_up_timestamp(&self, timestamp: Timestamp) -> Timestamp {
90 let interval = u64::try_from(self.interval.as_millis()).unwrap();
91 let aligned_to = u64::from(self.aligned_to);
92 let timestamp = u64::from(timestamp);
93
94 let result = if timestamp > aligned_to {
95 let rounded = Self::round_up_to_multiple_of_interval(interval, timestamp - aligned_to);
96 aligned_to.saturating_add(rounded)
97 } else {
98 aligned_to - Self::round_down_to_multiple_of_interval(interval, aligned_to - timestamp)
104 };
105 assert!(result >= timestamp);
107 assert!(result - timestamp < interval);
108 Timestamp::new(result)
109 }
110
111 pub fn round_down_timestamp_m1(&self, timestamp: Timestamp) -> Timestamp {
120 let interval = u64::try_from(self.interval.as_millis()).unwrap();
121 let aligned_to = u64::from(self.aligned_to);
122 let timestamp = u64::from(timestamp.saturating_sub(1));
123
124 let result = if timestamp >= aligned_to {
125 aligned_to + Self::round_down_to_multiple_of_interval(interval, timestamp - aligned_to)
131 } else {
132 let rounded = Self::round_up_to_multiple_of_interval(interval, aligned_to - timestamp);
133 aligned_to.saturating_sub(rounded)
134 };
135 assert!(result <= timestamp);
137 assert!(timestamp - result < interval);
138 Timestamp::new(result)
139 }
140
141 fn round_up_to_multiple_of_interval(interval: u64, x: u64) -> u64 {
147 assert_ne!(x, 0);
148 (((x - 1) / interval) + 1).saturating_mul(interval)
149 }
150
151 fn round_down_to_multiple_of_interval(interval: u64, x: u64) -> u64 {
153 x / interval * interval
154 }
155}
156
#[cfg(test)]
mod tests {
    use crate::Timestamp;
    use crate::adt::interval::Interval;
    use crate::refresh_schedule::{RefreshEvery, RefreshSchedule};
    use std::str::FromStr;

    /// One expectation: `(expected round-down result, expected round-up result, input)`.
    type Case = (Option<u64>, Option<u64>, u64);

    /// Shorthand for building a `Timestamp` from a raw `u64`.
    fn ts(t: u64) -> Timestamp {
        Timestamp::new(t)
    }

    /// Builds a refresh that fires every 100 milliseconds, aligned to `aligned_to`.
    fn every_100ms(aligned_to: u64) -> RefreshEvery {
        RefreshEvery {
            interval: Interval::from_str("100 milliseconds")
                .unwrap()
                .duration()
                .unwrap(),
            aligned_to: ts(aligned_to),
        }
    }

    /// Checks both rounding directions of `schedule` against every case, in order.
    fn check(schedule: &RefreshSchedule, cases: &[Case]) {
        for &(expected_round_down_ts, expected_round_up_ts, input_ts) in cases {
            assert_eq!(
                expected_round_down_ts.map(ts),
                schedule.round_down_timestamp_m1(ts(input_ts)),
            );
            assert_eq!(
                expected_round_up_ts.map(ts),
                schedule.round_up_timestamp(ts(input_ts))
            );
        }
    }

    #[mz_ore::test]
    fn test_round_up_down_timestamp() {
        // Only explicit timestamps.
        let only_ats = RefreshSchedule {
            everies: Vec::new(),
            ats: vec![ts(123), ts(456)],
        };
        check(
            &only_ats,
            &[
                (None, Some(123), 0),
                (None, Some(123), 50),
                (None, Some(123), 122),
                (None, Some(123), 123),
                (Some(123), Some(456), 124),
                (Some(123), Some(456), 130),
                (Some(123), Some(456), 455),
                (Some(123), Some(456), 456),
                (Some(456), None, 457),
                (Some(456), None, 12345678),
                (Some(456), None, u64::MAX - 1000),
                (Some(456), None, u64::MAX - 1),
                (Some(456), None, u64::MAX),
            ],
        );

        // A single periodic refresh whose alignment is a multiple of the interval.
        let aligned_500 = RefreshSchedule {
            everies: vec![every_100ms(500)],
            ats: Vec::new(),
        };
        check(
            &aligned_500,
            &[
                (Some(0), Some(0), 0),
                (Some(0), Some(100), 1),
                (Some(0), Some(100), 2),
                (Some(0), Some(100), 99),
                (Some(0), Some(100), 100),
                (Some(100), Some(200), 101),
                (Some(100), Some(200), 102),
                (Some(100), Some(200), 199),
                (Some(100), Some(200), 200),
                (Some(200), Some(300), 201),
                (Some(300), Some(400), 400),
                (Some(400), Some(500), 401),
                (Some(400), Some(500), 450),
                (Some(400), Some(500), 499),
                (Some(400), Some(500), 500),
                (Some(500), Some(600), 501),
                (Some(500), Some(600), 599),
                (Some(500), Some(600), 600),
                (Some(600), Some(700), 601),
                (Some(5434532500), Some(5434532600), 5434532599),
                (Some(5434532500), Some(5434532600), 5434532600),
                (Some(5434532600), Some(5434532700), 5434532601),
                (Some(18446744073709551600), Some(u64::MAX), u64::MAX - 1),
                (Some(18446744073709551600), Some(u64::MAX), u64::MAX),
            ],
        );

        // A single periodic refresh whose alignment is offset from the interval.
        let aligned_542 = RefreshSchedule {
            everies: vec![every_100ms(542)],
            ats: Vec::new(),
        };
        check(
            &aligned_542,
            &[
                (Some(0), Some(42), 0),
                (Some(0), Some(42), 1),
                (Some(0), Some(42), 41),
                (Some(0), Some(42), 42),
                (Some(42), Some(142), 43),
                (Some(342), Some(442), 441),
                (Some(342), Some(442), 442),
                (Some(442), Some(542), 443),
                (Some(442), Some(542), 541),
                (Some(442), Some(542), 542),
                (Some(542), Some(642), 543),
                (Some(18446744073709551542), Some(u64::MAX), u64::MAX - 1),
                (Some(18446744073709551542), Some(u64::MAX), u64::MAX),
            ],
        );

        // Two periodic refreshes combined with explicit timestamps.
        let combined = RefreshSchedule {
            everies: vec![every_100ms(400), every_100ms(542)],
            ats: vec![ts(2), ts(300), ts(400), ts(471), ts(541), ts(123456)],
        };
        check(
            &combined,
            &[
                (Some(0), Some(0), 0),
                (Some(0), Some(2), 1),
                (Some(0), Some(2), 2),
                (Some(2), Some(42), 3),
                (Some(2), Some(42), 41),
                (Some(2), Some(42), 42),
                (Some(42), Some(100), 43),
                (Some(42), Some(100), 99),
                (Some(42), Some(100), 100),
                (Some(100), Some(142), 101),
                (Some(100), Some(142), 141),
                (Some(100), Some(142), 142),
                (Some(142), Some(200), 143),
                (Some(242), Some(300), 243),
                (Some(242), Some(300), 299),
                (Some(242), Some(300), 300),
                (Some(300), Some(342), 301),
                (Some(342), Some(400), 343),
                (Some(342), Some(400), 399),
                (Some(342), Some(400), 400),
                (Some(400), Some(442), 401),
                (Some(400), Some(442), 441),
                (Some(400), Some(442), 442),
                (Some(442), Some(471), 443),
                (Some(442), Some(471), 470),
                (Some(442), Some(471), 471),
                (Some(471), Some(500), 472),
                (Some(471), Some(500), 480),
                (Some(471), Some(500), 500),
                (Some(500), Some(541), 501),
                (Some(500), Some(541), 540),
                (Some(500), Some(541), 541),
                (Some(541), Some(542), 542),
                (Some(542), Some(600), 543),
                (Some(65442), Some(65500), 65454),
                (Some(87800), Some(87842), 87831),
                (Some(123400), Some(123442), 123442),
                (Some(123442), Some(123456), 123443),
                (Some(123442), Some(123456), 123456),
                (Some(123456), Some(123500), 123457),
                (Some(18446744073709551600), Some(u64::MAX), u64::MAX - 1),
                (Some(18446744073709551600), Some(u64::MAX), u64::MAX),
            ],
        );
    }
}