1use std::time::Duration;
11
12use serde::{Deserialize, Serialize};
13
14use crate::Timestamp;
15
16#[derive(
17 Clone,
18 Debug,
19 Default,
20 Serialize,
21 Deserialize,
22 Eq,
23 PartialEq,
24 Ord,
25 PartialOrd
26)]
27pub struct RefreshSchedule {
28 pub everies: Vec<RefreshEvery>,
30 pub ats: Vec<Timestamp>,
32}
33
34impl RefreshSchedule {
35 pub fn round_up_timestamp(&self, timestamp: Timestamp) -> Option<Timestamp> {
41 let everies = self.everies.iter();
42 let next_everies = everies.map(|every| every.round_up_timestamp(timestamp));
43 let next_ats = self.ats.iter().copied().filter(|at| *at >= timestamp);
44 next_everies.chain(next_ats).min()
45 }
46
47 pub fn round_down_timestamp_m1(&self, timestamp: Timestamp) -> Option<Timestamp> {
53 let everies = self.everies.iter();
54 let prev_everies = everies.map(|every| every.round_down_timestamp_m1(timestamp));
55 let prev_ats = self.ats.iter().copied().filter(|at| *at < timestamp);
58 prev_everies.chain(prev_ats).max()
59 }
60
61 pub fn last_refresh(&self) -> Option<Timestamp> {
71 if self.everies.is_empty() {
72 self.ats.iter().max().cloned()
73 } else {
74 None
75 }
76 }
77
78 pub fn is_empty(&self) -> bool {
80 self.everies.is_empty() && self.ats.is_empty()
81 }
82}
83
84#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
85pub struct RefreshEvery {
86 pub interval: Duration,
87 pub aligned_to: Timestamp,
88}
89
90impl RefreshEvery {
91 pub fn round_up_timestamp(&self, timestamp: Timestamp) -> Timestamp {
100 let interval = u64::try_from(self.interval.as_millis()).unwrap();
101 let aligned_to = u64::from(self.aligned_to);
102 let timestamp = u64::from(timestamp);
103
104 let result = if timestamp > aligned_to {
105 let rounded = Self::round_up_to_multiple_of_interval(interval, timestamp - aligned_to);
106 aligned_to.saturating_add(rounded)
107 } else {
108 aligned_to - Self::round_down_to_multiple_of_interval(interval, aligned_to - timestamp)
114 };
115 assert!(result >= timestamp);
117 assert!(result - timestamp < interval);
118 Timestamp::new(result)
119 }
120
121 pub fn round_down_timestamp_m1(&self, timestamp: Timestamp) -> Timestamp {
130 let interval = u64::try_from(self.interval.as_millis()).unwrap();
131 let aligned_to = u64::from(self.aligned_to);
132 let timestamp = u64::from(timestamp.saturating_sub(1));
133
134 let result = if timestamp >= aligned_to {
135 aligned_to + Self::round_down_to_multiple_of_interval(interval, timestamp - aligned_to)
141 } else {
142 let rounded = Self::round_up_to_multiple_of_interval(interval, aligned_to - timestamp);
143 aligned_to.saturating_sub(rounded)
144 };
145 assert!(result <= timestamp);
147 assert!(timestamp - result < interval);
148 Timestamp::new(result)
149 }
150
151 fn round_up_to_multiple_of_interval(interval: u64, x: u64) -> u64 {
157 assert_ne!(x, 0);
158 (((x - 1) / interval) + 1).saturating_mul(interval)
159 }
160
161 fn round_down_to_multiple_of_interval(interval: u64, x: u64) -> u64 {
163 x / interval * interval
164 }
165}
166
167#[cfg(test)]
168mod tests {
169 use crate::Timestamp;
170 use crate::adt::interval::Interval;
171 use crate::refresh_schedule::{RefreshEvery, RefreshSchedule};
172 use std::str::FromStr;
173
174 #[mz_ore::test]
175 fn test_round_up_down_timestamp() {
176 let ts = |t: u64| Timestamp::new(t);
177 let test = |schedule: RefreshSchedule| {
178 move |expected_round_down_ts: Option<u64>,
179 expected_round_up_ts: Option<u64>,
180 input_ts| {
181 assert_eq!(
182 expected_round_down_ts.map(ts),
183 schedule.round_down_timestamp_m1(ts(input_ts)),
184 );
185 assert_eq!(
186 expected_round_up_ts.map(ts),
187 schedule.round_up_timestamp(ts(input_ts))
188 );
189 }
190 };
191 {
192 let schedule = RefreshSchedule {
193 everies: vec![],
194 ats: vec![ts(123), ts(456)],
195 };
196 let test = test(schedule);
197 test(None, Some(123), 0);
198 test(None, Some(123), 50);
199 test(None, Some(123), 122);
200 test(None, Some(123), 123);
201 test(Some(123), Some(456), 124);
202 test(Some(123), Some(456), 130);
203 test(Some(123), Some(456), 455);
204 test(Some(123), Some(456), 456);
205 test(Some(456), None, 457);
206 test(Some(456), None, 12345678);
207 test(Some(456), None, u64::MAX - 1000);
208 test(Some(456), None, u64::MAX - 1);
209 test(Some(456), None, u64::MAX);
210 }
211 {
212 let schedule = RefreshSchedule {
213 everies: vec![RefreshEvery {
214 interval: Interval::from_str("100 milliseconds")
215 .unwrap()
216 .duration()
217 .unwrap(),
218 aligned_to: ts(500),
219 }],
220 ats: vec![],
221 };
222 let test = test(schedule);
223 test(Some(0), Some(0), 0);
224 test(Some(0), Some(100), 1);
225 test(Some(0), Some(100), 2);
226 test(Some(0), Some(100), 99);
227 test(Some(0), Some(100), 100);
228 test(Some(100), Some(200), 101);
229 test(Some(100), Some(200), 102);
230 test(Some(100), Some(200), 199);
231 test(Some(100), Some(200), 200);
232 test(Some(200), Some(300), 201);
233 test(Some(300), Some(400), 400);
234 test(Some(400), Some(500), 401);
235 test(Some(400), Some(500), 450);
236 test(Some(400), Some(500), 499);
237 test(Some(400), Some(500), 500);
238 test(Some(500), Some(600), 501);
239 test(Some(500), Some(600), 599);
240 test(Some(500), Some(600), 600);
241 test(Some(600), Some(700), 601);
242 test(Some(5434532500), Some(5434532600), 5434532599);
243 test(Some(5434532500), Some(5434532600), 5434532600);
244 test(Some(5434532600), Some(5434532700), 5434532601);
245 test(Some(18446744073709551600), Some(u64::MAX), u64::MAX - 1);
246 test(Some(18446744073709551600), Some(u64::MAX), u64::MAX);
247 }
248 {
249 let schedule = RefreshSchedule {
250 everies: vec![RefreshEvery {
251 interval: Interval::from_str("100 milliseconds")
252 .unwrap()
253 .duration()
254 .unwrap(),
255 aligned_to: ts(542),
256 }],
257 ats: vec![],
258 };
259 let test = test(schedule);
260 test(Some(0), Some(42), 0);
261 test(Some(0), Some(42), 1);
262 test(Some(0), Some(42), 41);
263 test(Some(0), Some(42), 42);
264 test(Some(42), Some(142), 43);
265 test(Some(342), Some(442), 441);
266 test(Some(342), Some(442), 442);
267 test(Some(442), Some(542), 443);
268 test(Some(442), Some(542), 541);
269 test(Some(442), Some(542), 542);
270 test(Some(542), Some(642), 543);
271 test(Some(18446744073709551542), Some(u64::MAX), u64::MAX - 1);
272 test(Some(18446744073709551542), Some(u64::MAX), u64::MAX);
273 }
274 {
275 let schedule = RefreshSchedule {
276 everies: vec![
277 RefreshEvery {
278 interval: Interval::from_str("100 milliseconds")
279 .unwrap()
280 .duration()
281 .unwrap(),
282 aligned_to: ts(400),
283 },
284 RefreshEvery {
285 interval: Interval::from_str("100 milliseconds")
286 .unwrap()
287 .duration()
288 .unwrap(),
289 aligned_to: ts(542),
290 },
291 ],
292 ats: vec![ts(2), ts(300), ts(400), ts(471), ts(541), ts(123456)],
293 };
294 let test = test(schedule);
295 test(Some(0), Some(0), 0);
296 test(Some(0), Some(2), 1);
297 test(Some(0), Some(2), 2);
298 test(Some(2), Some(42), 3);
299 test(Some(2), Some(42), 41);
300 test(Some(2), Some(42), 42);
301 test(Some(42), Some(100), 43);
302 test(Some(42), Some(100), 99);
303 test(Some(42), Some(100), 100);
304 test(Some(100), Some(142), 101);
305 test(Some(100), Some(142), 141);
306 test(Some(100), Some(142), 142);
307 test(Some(142), Some(200), 143);
308 test(Some(242), Some(300), 243);
309 test(Some(242), Some(300), 299);
310 test(Some(242), Some(300), 300);
311 test(Some(300), Some(342), 301);
312 test(Some(342), Some(400), 343);
313 test(Some(342), Some(400), 399);
314 test(Some(342), Some(400), 400);
315 test(Some(400), Some(442), 401);
316 test(Some(400), Some(442), 441);
317 test(Some(400), Some(442), 442);
318 test(Some(442), Some(471), 443);
319 test(Some(442), Some(471), 470);
320 test(Some(442), Some(471), 471);
321 test(Some(471), Some(500), 472);
322 test(Some(471), Some(500), 480);
323 test(Some(471), Some(500), 500);
324 test(Some(500), Some(541), 501);
325 test(Some(500), Some(541), 540);
326 test(Some(500), Some(541), 541);
327 test(Some(541), Some(542), 542);
328 test(Some(542), Some(600), 543);
329 test(Some(65442), Some(65500), 65454);
330 test(Some(87800), Some(87842), 87831);
331 test(Some(123400), Some(123442), 123442);
332 test(Some(123442), Some(123456), 123443);
333 test(Some(123442), Some(123456), 123456);
334 test(Some(123456), Some(123500), 123457);
335 test(Some(18446744073709551600), Some(u64::MAX), u64::MAX - 1);
336 test(Some(18446744073709551600), Some(u64::MAX), u64::MAX);
337 }
338 }
339}