1use proptest::arbitrary::{Arbitrary, any};
11use proptest::prelude::{BoxedStrategy, Strategy};
12use std::time::Duration;
13
14use mz_proto::IntoRustIfSome;
15use mz_proto::{ProtoType, RustType, TryFromProtoError};
16use serde::{Deserialize, Serialize};
17
18use crate::Timestamp;
19
20include!(concat!(env!("OUT_DIR"), "/mz_repr.refresh_schedule.rs"));
21
22#[derive(Clone, Debug, Default, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
23pub struct RefreshSchedule {
24 pub everies: Vec<RefreshEvery>,
26 pub ats: Vec<Timestamp>,
28}
29
30impl RefreshSchedule {
31 pub fn round_up_timestamp(&self, timestamp: Timestamp) -> Option<Timestamp> {
37 let everies = self.everies.iter();
38 let next_everies = everies.map(|every| every.round_up_timestamp(timestamp));
39 let next_ats = self.ats.iter().copied().filter(|at| *at >= timestamp);
40 next_everies.chain(next_ats).min()
41 }
42
43 pub fn round_down_timestamp_m1(&self, timestamp: Timestamp) -> Option<Timestamp> {
49 let everies = self.everies.iter();
50 let prev_everies = everies.map(|every| every.round_down_timestamp_m1(timestamp));
51 let prev_ats = self.ats.iter().copied().filter(|at| *at < timestamp);
54 prev_everies.chain(prev_ats).max()
55 }
56
57 pub fn last_refresh(&self) -> Option<Timestamp> {
67 if self.everies.is_empty() {
68 self.ats.iter().max().cloned()
69 } else {
70 None
71 }
72 }
73
74 pub fn is_empty(&self) -> bool {
76 self.everies.is_empty() && self.ats.is_empty()
77 }
78}
79
80#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
81pub struct RefreshEvery {
82 pub interval: Duration,
83 pub aligned_to: Timestamp,
84}
85
86impl RefreshEvery {
87 pub fn round_up_timestamp(&self, timestamp: Timestamp) -> Timestamp {
96 let interval = u64::try_from(self.interval.as_millis()).unwrap();
97 let aligned_to = u64::from(self.aligned_to);
98 let timestamp = u64::from(timestamp);
99
100 let result = if timestamp > aligned_to {
101 let rounded = Self::round_up_to_multiple_of_interval(interval, timestamp - aligned_to);
102 aligned_to.saturating_add(rounded)
103 } else {
104 aligned_to - Self::round_down_to_multiple_of_interval(interval, aligned_to - timestamp)
110 };
111 assert!(result >= timestamp);
113 assert!(result - timestamp < interval);
114 Timestamp::new(result)
115 }
116
117 pub fn round_down_timestamp_m1(&self, timestamp: Timestamp) -> Timestamp {
126 let interval = u64::try_from(self.interval.as_millis()).unwrap();
127 let aligned_to = u64::from(self.aligned_to);
128 let timestamp = u64::from(timestamp.saturating_sub(1));
129
130 let result = if timestamp >= aligned_to {
131 aligned_to + Self::round_down_to_multiple_of_interval(interval, timestamp - aligned_to)
137 } else {
138 let rounded = Self::round_up_to_multiple_of_interval(interval, aligned_to - timestamp);
139 aligned_to.saturating_sub(rounded)
140 };
141 assert!(result <= timestamp);
143 assert!(timestamp - result < interval);
144 Timestamp::new(result)
145 }
146
147 fn round_up_to_multiple_of_interval(interval: u64, x: u64) -> u64 {
153 assert_ne!(x, 0);
154 (((x - 1) / interval) + 1).saturating_mul(interval)
155 }
156
157 fn round_down_to_multiple_of_interval(interval: u64, x: u64) -> u64 {
159 x / interval * interval
160 }
161}
162
163impl RustType<ProtoRefreshSchedule> for RefreshSchedule {
164 fn into_proto(&self) -> ProtoRefreshSchedule {
165 ProtoRefreshSchedule {
166 everies: self.everies.into_proto(),
167 ats: self.ats.into_proto(),
168 }
169 }
170
171 fn from_proto(proto: ProtoRefreshSchedule) -> Result<Self, TryFromProtoError> {
172 Ok(RefreshSchedule {
173 everies: proto.everies.into_rust()?,
174 ats: proto.ats.into_rust()?,
175 })
176 }
177}
178
179impl RustType<ProtoRefreshEvery> for RefreshEvery {
180 fn into_proto(&self) -> ProtoRefreshEvery {
181 ProtoRefreshEvery {
182 interval: Some(self.interval.into_proto()),
183 aligned_to: Some(self.aligned_to.into_proto()),
184 }
185 }
186
187 fn from_proto(proto: ProtoRefreshEvery) -> Result<Self, TryFromProtoError> {
188 Ok(RefreshEvery {
189 interval: proto
190 .interval
191 .into_rust_if_some("ProtoRefreshEvery::interval")?,
192 aligned_to: proto
193 .aligned_to
194 .into_rust_if_some("ProtoRefreshEvery::aligned_to")?,
195 })
196 }
197}
198
199impl Arbitrary for RefreshSchedule {
200 type Strategy = BoxedStrategy<Self>;
201 type Parameters = ();
202
203 fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
204 (
205 proptest::collection::vec(any::<RefreshEvery>(), 0..4),
206 proptest::collection::vec(any::<Timestamp>(), 0..4),
207 )
208 .prop_map(|(everies, ats)| RefreshSchedule { everies, ats })
209 .boxed()
210 }
211}
212
213impl Arbitrary for RefreshEvery {
214 type Strategy = BoxedStrategy<Self>;
215 type Parameters = ();
216
217 fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
218 (any::<Duration>(), any::<Timestamp>())
219 .prop_map(|(interval, aligned_to)| RefreshEvery {
220 interval,
221 aligned_to,
222 })
223 .boxed()
224 }
225}
226
#[cfg(test)]
mod tests {
    use crate::Timestamp;
    use crate::adt::interval::Interval;
    use crate::refresh_schedule::{RefreshEvery, RefreshSchedule};
    use std::str::FromStr;

    /// One expectation, in the order
    /// `(expected round_down_timestamp_m1, expected round_up_timestamp, input)`.
    type Case = (Option<u64>, Option<u64>, u64);

    /// Shorthand for building a `Timestamp` from a raw `u64`.
    fn ts(t: u64) -> Timestamp {
        Timestamp::new(t)
    }

    /// A refresh every 100 milliseconds, aligned to `aligned_to`.
    fn every_100ms(aligned_to: u64) -> RefreshEvery {
        RefreshEvery {
            interval: Interval::from_str("100 milliseconds")
                .unwrap()
                .duration()
                .unwrap(),
            aligned_to: ts(aligned_to),
        }
    }

    /// Checks every case against `schedule`: first the rounding down of
    /// `input - 1`, then the rounding up of `input`.
    fn check(schedule: &RefreshSchedule, cases: &[Case]) {
        for &(expected_round_down_ts, expected_round_up_ts, input_ts) in cases {
            assert_eq!(
                expected_round_down_ts.map(ts),
                schedule.round_down_timestamp_m1(ts(input_ts)),
            );
            assert_eq!(
                expected_round_up_ts.map(ts),
                schedule.round_up_timestamp(ts(input_ts))
            );
        }
    }

    #[mz_ore::test]
    fn test_round_up_down_timestamp() {
        // Only individual refresh timestamps.
        let only_ats = RefreshSchedule {
            everies: vec![],
            ats: vec![ts(123), ts(456)],
        };
        check(
            &only_ats,
            &[
                (None, Some(123), 0),
                (None, Some(123), 50),
                (None, Some(123), 122),
                (None, Some(123), 123),
                (Some(123), Some(456), 124),
                (Some(123), Some(456), 130),
                (Some(123), Some(456), 455),
                (Some(123), Some(456), 456),
                (Some(456), None, 457),
                (Some(456), None, 12345678),
                (Some(456), None, u64::MAX - 1000),
                (Some(456), None, u64::MAX - 1),
                (Some(456), None, u64::MAX),
            ],
        );

        // One periodic refresh whose alignment is a multiple of the interval.
        let aligned_on_grid = RefreshSchedule {
            everies: vec![every_100ms(500)],
            ats: vec![],
        };
        check(
            &aligned_on_grid,
            &[
                (Some(0), Some(0), 0),
                (Some(0), Some(100), 1),
                (Some(0), Some(100), 2),
                (Some(0), Some(100), 99),
                (Some(0), Some(100), 100),
                (Some(100), Some(200), 101),
                (Some(100), Some(200), 102),
                (Some(100), Some(200), 199),
                (Some(100), Some(200), 200),
                (Some(200), Some(300), 201),
                (Some(300), Some(400), 400),
                (Some(400), Some(500), 401),
                (Some(400), Some(500), 450),
                (Some(400), Some(500), 499),
                (Some(400), Some(500), 500),
                (Some(500), Some(600), 501),
                (Some(500), Some(600), 599),
                (Some(500), Some(600), 600),
                (Some(600), Some(700), 601),
                (Some(5434532500), Some(5434532600), 5434532599),
                (Some(5434532500), Some(5434532600), 5434532600),
                (Some(5434532600), Some(5434532700), 5434532601),
                (Some(18446744073709551600), Some(u64::MAX), u64::MAX - 1),
                (Some(18446744073709551600), Some(u64::MAX), u64::MAX),
            ],
        );

        // One periodic refresh whose alignment is offset from the interval grid.
        let aligned_off_grid = RefreshSchedule {
            everies: vec![every_100ms(542)],
            ats: vec![],
        };
        check(
            &aligned_off_grid,
            &[
                (Some(0), Some(42), 0),
                (Some(0), Some(42), 1),
                (Some(0), Some(42), 41),
                (Some(0), Some(42), 42),
                (Some(42), Some(142), 43),
                (Some(342), Some(442), 441),
                (Some(342), Some(442), 442),
                (Some(442), Some(542), 443),
                (Some(442), Some(542), 541),
                (Some(442), Some(542), 542),
                (Some(542), Some(642), 543),
                (Some(18446744073709551542), Some(u64::MAX), u64::MAX - 1),
                (Some(18446744073709551542), Some(u64::MAX), u64::MAX),
            ],
        );

        // Two periodic refreshes combined with individual refresh timestamps.
        let combined = RefreshSchedule {
            everies: vec![every_100ms(400), every_100ms(542)],
            ats: vec![ts(2), ts(300), ts(400), ts(471), ts(541), ts(123456)],
        };
        check(
            &combined,
            &[
                (Some(0), Some(0), 0),
                (Some(0), Some(2), 1),
                (Some(0), Some(2), 2),
                (Some(2), Some(42), 3),
                (Some(2), Some(42), 41),
                (Some(2), Some(42), 42),
                (Some(42), Some(100), 43),
                (Some(42), Some(100), 99),
                (Some(42), Some(100), 100),
                (Some(100), Some(142), 101),
                (Some(100), Some(142), 141),
                (Some(100), Some(142), 142),
                (Some(142), Some(200), 143),
                (Some(242), Some(300), 243),
                (Some(242), Some(300), 299),
                (Some(242), Some(300), 300),
                (Some(300), Some(342), 301),
                (Some(342), Some(400), 343),
                (Some(342), Some(400), 399),
                (Some(342), Some(400), 400),
                (Some(400), Some(442), 401),
                (Some(400), Some(442), 441),
                (Some(400), Some(442), 442),
                (Some(442), Some(471), 443),
                (Some(442), Some(471), 470),
                (Some(442), Some(471), 471),
                (Some(471), Some(500), 472),
                (Some(471), Some(500), 480),
                (Some(471), Some(500), 500),
                (Some(500), Some(541), 501),
                (Some(500), Some(541), 540),
                (Some(500), Some(541), 541),
                (Some(541), Some(542), 542),
                (Some(542), Some(600), 543),
                (Some(65442), Some(65500), 65454),
                (Some(87800), Some(87842), 87831),
                (Some(123400), Some(123442), 123442),
                (Some(123442), Some(123456), 123443),
                (Some(123442), Some(123456), 123456),
                (Some(123456), Some(123500), 123457),
                (Some(18446744073709551600), Some(u64::MAX), u64::MAX - 1),
                (Some(18446744073709551600), Some(u64::MAX), u64::MAX),
            ],
        );
    }
}