1use std::cmp::Ordering;
19use std::fmt::{self, Debug};
20use std::hash::Hash;
21
22use differential_dataflow::containers::CopyRegion;
23use serde::{Deserialize, Serialize};
24use timely::order::Product;
25use timely::progress::Antichain;
26use timely::progress::timestamp::{PathSummary, Refines, Timestamp};
27use timely::{ExchangeData, PartialOrder};
28use uuid::Uuid;
29
30use mz_ore::cast::CastFrom;
31
32#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
50pub struct Partitioned<P, T>(Product<Interval<P>, T>);
51
52impl<P: Clone + PartialOrd, T> Partitioned<P, T> {
53 pub fn new_singleton(partition: P, timestamp: T) -> Self {
55 let interval = Interval {
56 lower: partition.clone(),
57 upper: partition,
58 };
59 Self(Product::new(interval, timestamp))
60 }
61
62 pub fn new_range(lower: P, upper: P, timestamp: T) -> Self {
64 assert!(lower <= upper, "invalid range bounds");
65 Self(Product::new(Interval { lower, upper }, timestamp))
66 }
67
68 pub fn interval(&self) -> &Interval<P> {
70 &self.0.outer
71 }
72
73 pub fn timestamp(&self) -> &T {
75 &self.0.inner
76 }
77
78 pub fn timestamp_mut(&mut self) -> &mut T {
79 &mut self.0.inner
80 }
81}
82
83impl<P: Clone + PartialOrd + Step, T: Clone> Partitioned<P, T> {
84 pub fn split(&self, point: &P) -> (Option<Self>, Option<Self>) {
87 let (before, after) = self.interval().split(point);
88 let mapper = |interval| Self(Product::new(interval, self.timestamp().clone()));
89 (before.map(mapper), after.map(mapper))
90 }
91}
92
93impl<P: fmt::Display, T: fmt::Display> fmt::Display for Partitioned<P, T> {
94 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
95 f.write_str("(")?;
96 self.0.outer.fmt(f)?;
97 f.write_str(", ")?;
98 self.0.inner.fmt(f)?;
99 f.write_str(")")?;
100 Ok(())
101 }
102}
103
104impl<P, T: Timestamp> Timestamp for Partitioned<P, T>
105where
106 P: Extrema + Clone + Debug + ExchangeData + Hash + Ord,
107{
108 type Summary = ();
109 fn minimum() -> Self {
110 Self(Timestamp::minimum())
111 }
112}
113impl<P, T: Timestamp> Refines<()> for Partitioned<P, T>
114where
115 P: Extrema + Clone + Debug + ExchangeData + Hash + Ord,
116{
117 fn to_inner(_other: ()) -> Self {
118 Self::minimum()
119 }
120
121 fn to_outer(self) {}
122
123 fn summarize(_path: Self::Summary) {}
124}
125
126impl<P: Ord + Eq, T: PartialOrder> PartialOrder for Partitioned<P, T> {
127 #[inline]
128 fn less_equal(&self, other: &Self) -> bool {
129 self.0.less_equal(&other.0)
130 }
131}
132impl<P: Clone, T: Timestamp> PathSummary<Partitioned<P, T>> for () {
133 #[inline]
134 fn results_in(&self, src: &Partitioned<P, T>) -> Option<Partitioned<P, T>> {
135 Some(src.clone())
136 }
137
138 #[inline]
139 fn followed_by(&self, _other: &Self) -> Option<Self> {
140 Some(())
141 }
142}
143
144impl<P: Copy, T: Copy> columnation::Columnation for Partitioned<P, T> {
145 type InnerRegion = CopyRegion<Partitioned<P, T>>;
146}
147
148pub trait Extrema {
150 fn minimum() -> Self;
152 fn maximum() -> Self;
154}
155
156impl Extrema for u64 {
157 fn minimum() -> Self {
158 Self::MIN
159 }
160 fn maximum() -> Self {
161 Self::MAX
162 }
163}
164
165impl Extrema for i32 {
166 fn minimum() -> Self {
167 Self::MIN
168 }
169 fn maximum() -> Self {
170 Self::MAX
171 }
172}
173
174impl Extrema for Uuid {
175 fn minimum() -> Self {
176 Self::nil()
177 }
178 fn maximum() -> Self {
179 Self::max()
180 }
181}
182
183pub trait Step
185where
186 Self: Sized,
187{
188 fn backward_checked(&self, count: usize) -> Option<Self>;
190 fn forward_checked(&self, count: usize) -> Option<Self>;
192}
193
194impl Step for Uuid {
195 fn backward_checked(&self, count: usize) -> Option<Self> {
196 self.as_u128()
197 .checked_sub(u128::cast_from(count))
198 .map(Self::from_u128)
199 }
200 fn forward_checked(&self, count: usize) -> Option<Self> {
201 self.as_u128()
202 .checked_add(u128::cast_from(count))
203 .map(Self::from_u128)
204 }
205}
206
207#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
208pub struct Interval<P> {
210 pub lower: P,
211 pub upper: P,
212}
213
214impl<P: Eq> Interval<P> {
215 pub fn singleton(&self) -> Option<&P> {
217 if self.lower == self.upper {
218 Some(&self.lower)
219 } else {
220 None
221 }
222 }
223}
224
225impl<P: PartialOrd> Interval<P> {
226 pub fn contains(&self, other: &P) -> bool {
227 self.lower <= *other && *other <= self.upper
228 }
229}
230
231impl<P: Step + PartialOrd + Clone> Interval<P> {
232 pub fn split(&self, point: &P) -> (Option<Self>, Option<Self>) {
235 let before = match point.backward_checked(1) {
236 Some(bef) if self.lower <= bef => Some(Interval {
237 lower: self.lower.clone(),
238 upper: bef,
239 }),
240 _ => None,
241 };
242 let after = match point.forward_checked(1) {
243 Some(aft) if self.upper >= aft => Some(Interval {
244 lower: aft,
245 upper: self.upper.clone(),
246 }),
247 _ => None,
248 };
249 (before, after)
250 }
251}
252
253impl<P: Ord + Eq> PartialOrder for Interval<P> {
254 #[inline]
255 fn less_equal(&self, other: &Self) -> bool {
256 self.lower <= other.lower && other.upper <= self.upper
257 }
258}
259
260impl<P: Clone> PathSummary<Interval<P>> for () {
261 #[inline]
262 fn results_in(&self, src: &Interval<P>) -> Option<Interval<P>> {
263 Some(src.clone())
264 }
265
266 #[inline]
267 fn followed_by(&self, _other: &Self) -> Option<Self> {
268 Some(())
269 }
270}
271
272impl<P> Timestamp for Interval<P>
273where
274 P: Extrema + Clone + Debug + ExchangeData + Hash + Ord,
275{
276 type Summary = ();
277
278 #[inline]
279 fn minimum() -> Self {
280 Self {
281 lower: P::minimum(),
282 upper: P::maximum(),
283 }
284 }
285}
286
287impl<P: fmt::Display> fmt::Display for Interval<P> {
288 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
289 f.write_str("[")?;
290 self.lower.fmt(f)?;
291 f.write_str(", ")?;
292 self.upper.fmt(f)?;
293 f.write_str("]")?;
294 Ok(())
295 }
296}
297
298#[derive(Debug, Clone, Eq, PartialEq)]
303pub struct Reverse<T>(pub T);
304
305impl<T: PartialOrder> PartialOrder for Reverse<T> {
306 #[inline]
307 fn less_equal(&self, other: &Self) -> bool {
308 PartialOrder::less_equal(&other.0, &self.0)
309 }
310}
311impl<T: PartialOrd> PartialOrd for Reverse<T> {
312 #[inline]
313 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
314 other.0.partial_cmp(&self.0)
315 }
316}
317
318impl<T: Ord> Ord for Reverse<T> {
319 #[inline]
320 fn cmp(&self, other: &Self) -> Ordering {
321 other.0.cmp(&self.0)
322 }
323}
324
325#[cfg(test)]
326mod test {
327 use timely::progress::Antichain;
328
329 use super::*;
330
331 #[mz_ore::test]
332 fn basic_properties() {
333 let minimum: Partitioned<u64, u64> = Partitioned::minimum();
334 assert_eq!(minimum, Partitioned::new_range(0, u64::MAX, 0));
335 assert!(PartialOrder::less_equal(&minimum, &minimum));
336 assert!(!PartialOrder::less_than(&minimum, &minimum));
337
338 let lower = Partitioned::new_range(0, 9, 0);
340 let partition10 = Partitioned::new_singleton(10, 0);
341 let upper = Partitioned::new_range(11, u64::MAX, 0);
342 assert!(!PartialOrder::less_equal(&lower, &partition10));
343 assert!(!PartialOrder::less_equal(&partition10, &lower));
344 assert!(!PartialOrder::less_equal(&lower, &upper));
345 assert!(!PartialOrder::less_equal(&upper, &lower));
346 assert!(!PartialOrder::less_equal(&partition10, &upper));
347 assert!(!PartialOrder::less_equal(&upper, &partition10));
348
349 let partition5 = Partitioned::new_singleton(5, 0);
350 assert!(PartialOrder::less_than(&lower, &partition5));
352 assert!(!PartialOrder::less_equal(&upper, &partition5));
354 assert!(!PartialOrder::less_equal(&partition5, &upper));
355
356 let sub_range = Partitioned::new_range(2, 4, 0);
357 assert!(PartialOrder::less_than(&lower, &sub_range));
359 assert!(!PartialOrder::less_equal(&upper, &sub_range));
361 assert!(!PartialOrder::less_equal(&sub_range, &upper));
362
363 assert!(PartialOrder::less_equal(&lower, &lower));
365 assert!(PartialOrder::less_equal(&partition5, &partition5));
366 assert!(PartialOrder::less_equal(&upper, &upper));
367 }
368
369 #[mz_ore::test]
370 fn antichain_properties() {
371 let mut frontier = Antichain::new();
372
373 frontier.extend([
375 Partitioned::new_range(0, 9, 5),
376 Partitioned::new_singleton(10, 5),
377 Partitioned::new_range(11, u64::MAX, 5),
378 ]);
379 assert_eq!(frontier.len(), 3);
380
381 frontier.insert(Partitioned::new_range(0, u64::MAX, 4));
383 assert_eq!(
384 frontier,
385 Antichain::from_elem(Partitioned::new_range(0, u64::MAX, 4))
386 );
387
388 let frontier = Antichain::from_iter([
390 Partitioned::new_range(0, 9, 5),
391 Partitioned::new_singleton(10, 10),
392 Partitioned::new_range(11, u64::MAX, 5),
393 ]);
394
395 assert!(frontier.less_than(&Partitioned::new_singleton(10, 11)));
397 assert!(frontier.less_than(&Partitioned::new_singleton(0, 6)));
399 assert!(!frontier.less_than(&Partitioned::new_singleton(0, 4)));
401 assert!(frontier.less_than(&Partitioned::new_range(3, 5, 6)));
403 assert!(!frontier.less_than(&Partitioned::new_range(3, 5, 4)));
405 }
406}
407
408pub fn refine_antichain<T: Timestamp, Inner: Timestamp + Refines<T>>(
412 frontier: &Antichain<T>,
413) -> Antichain<Inner> {
414 Antichain::from_iter(frontier.iter().map(|t| Refines::to_inner(t.clone())))
415}