1use std::cmp::Ordering;
19use std::fmt::{self, Debug};
20use std::hash::Hash;
21
22use differential_dataflow::containers::CopyRegion;
23use serde::{Deserialize, Serialize};
24use timely::order::Product;
25use timely::progress::Antichain;
26use timely::progress::timestamp::{PathSummary, Refines, Timestamp};
27use timely::{ExchangeData, PartialOrder};
28use uuid::Uuid;
29
30use mz_ore::cast::CastFrom;
31
32#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
50pub struct Partitioned<P, T>(Product<Interval<P>, T>);
51
52impl<P: Clone + PartialOrd, T> Partitioned<P, T> {
53    pub fn new_singleton(partition: P, timestamp: T) -> Self {
55        let interval = Interval {
56            lower: partition.clone(),
57            upper: partition,
58        };
59        Self(Product::new(interval, timestamp))
60    }
61
62    pub fn new_range(lower: P, upper: P, timestamp: T) -> Self {
64        assert!(lower <= upper, "invalid range bounds");
65        Self(Product::new(Interval { lower, upper }, timestamp))
66    }
67
68    pub fn interval(&self) -> &Interval<P> {
70        &self.0.outer
71    }
72
73    pub fn timestamp(&self) -> &T {
75        &self.0.inner
76    }
77
78    pub fn timestamp_mut(&mut self) -> &mut T {
79        &mut self.0.inner
80    }
81}
82
83impl<P: Clone + PartialOrd + Step, T: Clone> Partitioned<P, T> {
84    pub fn split(&self, point: &P) -> (Option<Self>, Option<Self>) {
87        let (before, after) = self.interval().split(point);
88        let mapper = |interval| Self(Product::new(interval, self.timestamp().clone()));
89        (before.map(mapper), after.map(mapper))
90    }
91}
92
93impl<P: fmt::Display, T: fmt::Display> fmt::Display for Partitioned<P, T> {
94    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
95        f.write_str("(")?;
96        self.0.outer.fmt(f)?;
97        f.write_str(", ")?;
98        self.0.inner.fmt(f)?;
99        f.write_str(")")?;
100        Ok(())
101    }
102}
103
104impl<P, T: Timestamp> Timestamp for Partitioned<P, T>
105where
106    P: Extrema + Clone + Debug + ExchangeData + Hash + Ord,
107{
108    type Summary = ();
109    fn minimum() -> Self {
110        Self(Timestamp::minimum())
111    }
112}
113impl<P, T: Timestamp> Refines<()> for Partitioned<P, T>
114where
115    P: Extrema + Clone + Debug + ExchangeData + Hash + Ord,
116{
117    fn to_inner(_other: ()) -> Self {
118        Self::minimum()
119    }
120
121    fn to_outer(self) {}
122
123    fn summarize(_path: Self::Summary) {}
124}
125
126impl<P: Ord + Eq, T: PartialOrder> PartialOrder for Partitioned<P, T> {
127    #[inline]
128    fn less_equal(&self, other: &Self) -> bool {
129        self.0.less_equal(&other.0)
130    }
131}
132impl<P: Clone, T: Timestamp> PathSummary<Partitioned<P, T>> for () {
133    #[inline]
134    fn results_in(&self, src: &Partitioned<P, T>) -> Option<Partitioned<P, T>> {
135        Some(src.clone())
136    }
137
138    #[inline]
139    fn followed_by(&self, _other: &Self) -> Option<Self> {
140        Some(())
141    }
142}
143
144impl<P: Copy, T: Copy> columnation::Columnation for Partitioned<P, T> {
145    type InnerRegion = CopyRegion<Partitioned<P, T>>;
146}
147
/// A type that has a minimum and a maximum value.
pub trait Extrema {
    /// The minimum value of this type.
    fn minimum() -> Self;
    /// The maximum value of this type.
    fn maximum() -> Self;
}

/// `u64` spans `u64::MIN..=u64::MAX`.
impl Extrema for u64 {
    fn minimum() -> Self {
        u64::MIN
    }

    fn maximum() -> Self {
        u64::MAX
    }
}

/// `i32` spans `i32::MIN..=i32::MAX`.
impl Extrema for i32 {
    fn minimum() -> Self {
        i32::MIN
    }

    fn maximum() -> Self {
        i32::MAX
    }
}
173
174impl Extrema for Uuid {
175    fn minimum() -> Self {
176        Self::nil()
177    }
178    fn maximum() -> Self {
179        Self::max()
180    }
181}
182
/// A type whose values can be stepped backward and forward by a count, with overflow reported
/// as `None`.
pub trait Step: Sized {
    /// Returns the value `count` steps before `self`, or `None` if there is no such value.
    fn backward_checked(&self, count: usize) -> Option<Self>;
    /// Returns the value `count` steps after `self`, or `None` if there is no such value.
    fn forward_checked(&self, count: usize) -> Option<Self>;
}
193
194impl Step for Uuid {
195    fn backward_checked(&self, count: usize) -> Option<Self> {
196        self.as_u128()
197            .checked_sub(u128::cast_from(count))
198            .map(Self::from_u128)
199    }
200    fn forward_checked(&self, count: usize) -> Option<Self> {
201        self.as_u128()
202            .checked_add(u128::cast_from(count))
203            .map(Self::from_u128)
204    }
205}
206
207#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
208pub struct Interval<P> {
210    pub lower: P,
211    pub upper: P,
212}
213
214impl<P: Eq> Interval<P> {
215    pub fn singleton(&self) -> Option<&P> {
217        if self.lower == self.upper {
218            Some(&self.lower)
219        } else {
220            None
221        }
222    }
223}
224
225impl<P: PartialOrd> Interval<P> {
226    pub fn contains(&self, other: &P) -> bool {
227        self.lower <= *other && *other <= self.upper
228    }
229}
230
231impl<P: Step + PartialOrd + Clone> Interval<P> {
232    pub fn split(&self, point: &P) -> (Option<Self>, Option<Self>) {
235        let before = match point.backward_checked(1) {
236            Some(bef) if self.lower <= bef => Some(Interval {
237                lower: self.lower.clone(),
238                upper: bef,
239            }),
240            _ => None,
241        };
242        let after = match point.forward_checked(1) {
243            Some(aft) if self.upper >= aft => Some(Interval {
244                lower: aft,
245                upper: self.upper.clone(),
246            }),
247            _ => None,
248        };
249        (before, after)
250    }
251}
252
253impl<P: Ord + Eq> PartialOrder for Interval<P> {
254    #[inline]
255    fn less_equal(&self, other: &Self) -> bool {
256        self.lower <= other.lower && other.upper <= self.upper
257    }
258}
259
260impl<P: Clone> PathSummary<Interval<P>> for () {
261    #[inline]
262    fn results_in(&self, src: &Interval<P>) -> Option<Interval<P>> {
263        Some(src.clone())
264    }
265
266    #[inline]
267    fn followed_by(&self, _other: &Self) -> Option<Self> {
268        Some(())
269    }
270}
271
272impl<P> Timestamp for Interval<P>
273where
274    P: Extrema + Clone + Debug + ExchangeData + Hash + Ord,
275{
276    type Summary = ();
277
278    #[inline]
279    fn minimum() -> Self {
280        Self {
281            lower: P::minimum(),
282            upper: P::maximum(),
283        }
284    }
285}
286
287impl<P: fmt::Display> fmt::Display for Interval<P> {
288    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
289        f.write_str("[")?;
290        self.lower.fmt(f)?;
291        f.write_str(", ")?;
292        self.upper.fmt(f)?;
293        f.write_str("]")?;
294        Ok(())
295    }
296}
297
298#[derive(Debug, Clone, Eq, PartialEq)]
303pub struct Reverse<T>(pub T);
304
305impl<T: PartialOrder> PartialOrder for Reverse<T> {
306    #[inline]
307    fn less_equal(&self, other: &Self) -> bool {
308        PartialOrder::less_equal(&other.0, &self.0)
309    }
310}
311impl<T: PartialOrd> PartialOrd for Reverse<T> {
312    #[inline]
313    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
314        other.0.partial_cmp(&self.0)
315    }
316}
317
318impl<T: Ord> Ord for Reverse<T> {
319    #[inline]
320    fn cmp(&self, other: &Self) -> Ordering {
321        other.0.cmp(&self.0)
322    }
323}
324
#[cfg(test)]
mod test {
    use timely::progress::Antichain;

    use super::*;

    /// Checks the partial order between individual `Partitioned` values.
    #[mz_ore::test]
    fn basic_properties() {
        // The minimum spans every partition and is comparable with itself.
        let minimum: Partitioned<u64, u64> = Partitioned::minimum();
        assert_eq!(minimum, Partitioned::new_range(0, u64::MAX, 0));
        assert!(PartialOrder::less_equal(&minimum, &minimum));
        assert!(!PartialOrder::less_than(&minimum, &minimum));

        // Three pieces with non-overlapping intervals at the same timestamp.
        let lower = Partitioned::new_range(0, 9, 0);
        let partition10 = Partitioned::new_singleton(10, 0);
        let upper = Partitioned::new_range(11, u64::MAX, 0);

        // None of the pieces is ordered relative to another, in either direction.
        let disjoint_pairs = [
            (&lower, &partition10),
            (&lower, &upper),
            (&partition10, &upper),
        ];
        for (a, b) in disjoint_pairs {
            assert!(!PartialOrder::less_equal(a, b));
            assert!(!PartialOrder::less_equal(b, a));
        }

        // Values whose interval lies inside `lower` come after `lower` and are unrelated to
        // `upper`.
        let partition5 = Partitioned::new_singleton(5, 0);
        let sub_range = Partitioned::new_range(2, 4, 0);
        for contained in [&partition5, &sub_range] {
            assert!(PartialOrder::less_than(&lower, contained));
            assert!(!PartialOrder::less_equal(&upper, contained));
            assert!(!PartialOrder::less_equal(contained, &upper));
        }

        // Every value is less than or equal to itself.
        for element in [&lower, &partition5, &upper] {
            assert!(PartialOrder::less_equal(element, element));
        }
    }

    /// Checks how `Partitioned` values behave as elements of an `Antichain`.
    #[mz_ore::test]
    fn antichain_properties() {
        let mut frontier = Antichain::new();

        // Three mutually incomparable elements are all retained.
        frontier.extend([
            Partitioned::new_range(0, 9, 5),
            Partitioned::new_singleton(10, 5),
            Partitioned::new_range(11, u64::MAX, 5),
        ]);
        assert_eq!(frontier.len(), 3);

        // An element covering every partition at an earlier time replaces all of them.
        frontier.insert(Partitioned::new_range(0, u64::MAX, 4));
        assert_eq!(
            frontier,
            Antichain::from_elem(Partitioned::new_range(0, u64::MAX, 4))
        );

        let frontier = Antichain::from_iter([
            Partitioned::new_range(0, 9, 5),
            Partitioned::new_singleton(10, 10),
            Partitioned::new_range(11, u64::MAX, 5),
        ]);

        // Each probe is paired with whether the frontier is expected to be less than it.
        let probes = [
            (Partitioned::new_singleton(10, 11), true),
            (Partitioned::new_singleton(0, 6), true),
            (Partitioned::new_singleton(0, 4), false),
            (Partitioned::new_range(3, 5, 6), true),
            (Partitioned::new_range(3, 5, 4), false),
        ];
        for (time, expected) in &probes {
            assert_eq!(
                frontier.less_than(time),
                *expected,
                "unexpected result for {time:?}"
            );
        }
    }
}
407
408pub fn refine_antichain<T: Timestamp, Inner: Timestamp + Refines<T>>(
412    frontier: &Antichain<T>,
413) -> Antichain<Inner> {
414    Antichain::from_iter(frontier.iter().map(|t| Refines::to_inner(t.clone())))
415}