mz_timely_util/
order.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Traits and types for partially ordered sets.
17
18use std::cmp::Ordering;
19use std::fmt::{self, Debug};
20use std::hash::Hash;
21
22use differential_dataflow::containers::CopyRegion;
23use serde::{Deserialize, Serialize};
24use timely::order::Product;
25use timely::progress::Antichain;
26use timely::progress::timestamp::{PathSummary, Refines, Timestamp};
27use timely::{ExchangeData, PartialOrder};
28use uuid::Uuid;
29
30use mz_ore::cast::CastFrom;
31
32/// A partially ordered timestamp that is partitioned by an arbitrary number of partitions
33/// identified by `P`. The construction allows for efficient representation of frontiers with
34/// Antichains.
35///
36/// A `Partitioned<P, T>` timestamp is internally the product order of an `Interval<P>` and a bare
37/// timestamp `T`. An `Interval<P>` represents an inclusive range of values from the type `P` and
38/// its partial order corresponds to the subset order.
39///
40/// Effectively, the minimum `Partitioned` timestamp will start out with the maximum possible
41/// `Interval<P>` on one side and the minimum timestamp `T` on the other side. Users of this
42/// timestamp can selectively downgrade the timestamp by advancing `T`, shrinking the interval, or
43/// both.
44///
45/// Antichains of this type are efficient in storage. In the worst case, where all chosen
46/// partitions have gaps between them, the produced antichain has twice as many elements as
47/// partitions. This is because the "dead space" between the selected partitions must have a
48/// representative timestamp in order for that space to be useable in the future.
49#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
50pub struct Partitioned<P, T>(Product<Interval<P>, T>);
51
52impl<P: Clone + PartialOrd, T> Partitioned<P, T> {
53    /// Constructs a new timestamp for a specific partition.
54    pub fn new_singleton(partition: P, timestamp: T) -> Self {
55        let interval = Interval {
56            lower: partition.clone(),
57            upper: partition,
58        };
59        Self(Product::new(interval, timestamp))
60    }
61
62    /// Constructs a new timestamp for a partition range.
63    pub fn new_range(lower: P, upper: P, timestamp: T) -> Self {
64        assert!(lower <= upper, "invalid range bounds");
65        Self(Product::new(Interval { lower, upper }, timestamp))
66    }
67
68    /// Returns the interval component of this partitioned timestamp.
69    pub fn interval(&self) -> &Interval<P> {
70        &self.0.outer
71    }
72
73    /// Returns the timestamp component of this partitioned timestamp.
74    pub fn timestamp(&self) -> &T {
75        &self.0.inner
76    }
77
78    pub fn timestamp_mut(&mut self) -> &mut T {
79        &mut self.0.inner
80    }
81}
82
83impl<P: Clone + PartialOrd + Step, T: Clone> Partitioned<P, T> {
84    /// Returns up to two partitions that contain the interval before and/or
85    /// after given partition point, neither of which contain the point.
86    pub fn split(&self, point: &P) -> (Option<Self>, Option<Self>) {
87        let (before, after) = self.interval().split(point);
88        let mapper = |interval| Self(Product::new(interval, self.timestamp().clone()));
89        (before.map(mapper), after.map(mapper))
90    }
91}
92
93impl<P: fmt::Display, T: fmt::Display> fmt::Display for Partitioned<P, T> {
94    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
95        f.write_str("(")?;
96        self.0.outer.fmt(f)?;
97        f.write_str(", ")?;
98        self.0.inner.fmt(f)?;
99        f.write_str(")")?;
100        Ok(())
101    }
102}
103
104impl<P, T: Timestamp> Timestamp for Partitioned<P, T>
105where
106    P: Extrema + Clone + Debug + ExchangeData + Hash + Ord,
107{
108    type Summary = ();
109    fn minimum() -> Self {
110        Self(Timestamp::minimum())
111    }
112}
113impl<P, T: Timestamp> Refines<()> for Partitioned<P, T>
114where
115    P: Extrema + Clone + Debug + ExchangeData + Hash + Ord,
116{
117    fn to_inner(_other: ()) -> Self {
118        Self::minimum()
119    }
120
121    fn to_outer(self) {}
122
123    fn summarize(_path: Self::Summary) {}
124}
125
126impl<P: Ord + Eq, T: PartialOrder> PartialOrder for Partitioned<P, T> {
127    #[inline]
128    fn less_equal(&self, other: &Self) -> bool {
129        self.0.less_equal(&other.0)
130    }
131}
132impl<P: Clone, T: Timestamp> PathSummary<Partitioned<P, T>> for () {
133    #[inline]
134    fn results_in(&self, src: &Partitioned<P, T>) -> Option<Partitioned<P, T>> {
135        Some(src.clone())
136    }
137
138    #[inline]
139    fn followed_by(&self, _other: &Self) -> Option<Self> {
140        Some(())
141    }
142}
143
144impl<P: Copy, T: Copy> columnation::Columnation for Partitioned<P, T> {
145    type InnerRegion = CopyRegion<Partitioned<P, T>>;
146}
147
/// Types that have a smallest and a largest value.
///
/// Implementors are expected to satisfy `minimum() <= maximum()`. `Interval::minimum` uses
/// `[minimum(), maximum()]` as the widest possible interval, so these should bound every value
/// of the type.
pub trait Extrema {
    /// Returns the smallest value of this type.
    fn minimum() -> Self;
    /// Returns the largest value of this type.
    fn maximum() -> Self;
}
155
156impl Extrema for u64 {
157    fn minimum() -> Self {
158        Self::MIN
159    }
160    fn maximum() -> Self {
161        Self::MAX
162    }
163}
164
165impl Extrema for i32 {
166    fn minimum() -> Self {
167        Self::MIN
168    }
169    fn maximum() -> Self {
170        Self::MAX
171    }
172}
173
174impl Extrema for Uuid {
175    fn minimum() -> Self {
176        Self::nil()
177    }
178    fn maximum() -> Self {
179        Self::max()
180    }
181}
182
// TODO: Switch to the std one when it's no longer unstable: https://doc.rust-lang.org/std/iter/trait.Step.html
/// Types whose values form a discrete sequence, so each value has a well-defined predecessor
/// and successor. Modelled on the unstable `std::iter::Step`.
pub trait Step
where
    Self: Sized,
{
    /// Returns the value `count` steps before `self`.
    ///
    /// Returns `None` if that value would fall below the type's range.
    fn backward_checked(&self, count: usize) -> Option<Self>;
    /// Returns the value `count` steps after `self`.
    ///
    /// Returns `None` if that value would fall above the type's range.
    fn forward_checked(&self, count: usize) -> Option<Self>;
}
193
194impl Step for Uuid {
195    fn backward_checked(&self, count: usize) -> Option<Self> {
196        self.as_u128()
197            .checked_sub(u128::cast_from(count))
198            .map(Self::from_u128)
199    }
200    fn forward_checked(&self, count: usize) -> Option<Self> {
201        self.as_u128()
202            .checked_add(u128::cast_from(count))
203            .map(Self::from_u128)
204    }
205}
206
207#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
208/// A type representing an inclusive interval of type `P`, ordered under the subset relation.
209pub struct Interval<P> {
210    pub lower: P,
211    pub upper: P,
212}
213
214impl<P: Eq> Interval<P> {
215    /// Returns the contained element if it's a singleton set.
216    pub fn singleton(&self) -> Option<&P> {
217        if self.lower == self.upper {
218            Some(&self.lower)
219        } else {
220            None
221        }
222    }
223}
224
225impl<P: PartialOrd> Interval<P> {
226    pub fn contains(&self, other: &P) -> bool {
227        self.lower <= *other && *other <= self.upper
228    }
229}
230
231impl<P: Step + PartialOrd + Clone> Interval<P> {
232    /// Returns up to two intervals that contain the range before and/or after given point,
233    /// neither of which contain the point.
234    pub fn split(&self, point: &P) -> (Option<Self>, Option<Self>) {
235        let before = match point.backward_checked(1) {
236            Some(bef) if self.lower <= bef => Some(Interval {
237                lower: self.lower.clone(),
238                upper: bef,
239            }),
240            _ => None,
241        };
242        let after = match point.forward_checked(1) {
243            Some(aft) if self.upper >= aft => Some(Interval {
244                lower: aft,
245                upper: self.upper.clone(),
246            }),
247            _ => None,
248        };
249        (before, after)
250    }
251}
252
253impl<P: Ord + Eq> PartialOrder for Interval<P> {
254    #[inline]
255    fn less_equal(&self, other: &Self) -> bool {
256        self.lower <= other.lower && other.upper <= self.upper
257    }
258}
259
260impl<P: Clone> PathSummary<Interval<P>> for () {
261    #[inline]
262    fn results_in(&self, src: &Interval<P>) -> Option<Interval<P>> {
263        Some(src.clone())
264    }
265
266    #[inline]
267    fn followed_by(&self, _other: &Self) -> Option<Self> {
268        Some(())
269    }
270}
271
272impl<P> Timestamp for Interval<P>
273where
274    P: Extrema + Clone + Debug + ExchangeData + Hash + Ord,
275{
276    type Summary = ();
277
278    #[inline]
279    fn minimum() -> Self {
280        Self {
281            lower: P::minimum(),
282            upper: P::maximum(),
283        }
284    }
285}
286
287impl<P: fmt::Display> fmt::Display for Interval<P> {
288    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
289        f.write_str("[")?;
290        self.lower.fmt(f)?;
291        f.write_str(", ")?;
292        self.upper.fmt(f)?;
293        f.write_str("]")?;
294        Ok(())
295    }
296}
297
/// A helper struct for reverse partial ordering.
///
/// This struct is a helper that can be used with `Antichain` when the maximum inclusive frontier
/// needs to be maintained as opposed to the minimum inclusive one. Its `PartialOrder`,
/// `PartialOrd` and `Ord` implementations all compare the wrapped values in the opposite
/// direction.
#[derive(Debug, Clone, Eq, PartialEq)]
pub struct Reverse<T>(pub T);
304
305impl<T: PartialOrder> PartialOrder for Reverse<T> {
306    #[inline]
307    fn less_equal(&self, other: &Self) -> bool {
308        PartialOrder::less_equal(&other.0, &self.0)
309    }
310}
311impl<T: PartialOrd> PartialOrd for Reverse<T> {
312    #[inline]
313    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
314        other.0.partial_cmp(&self.0)
315    }
316}
317
318impl<T: Ord> Ord for Reverse<T> {
319    #[inline]
320    fn cmp(&self, other: &Self) -> Ordering {
321        other.0.cmp(&self.0)
322    }
323}
324
#[cfg(test)]
mod test {
    use timely::progress::Antichain;

    use super::*;

    /// Asserts that neither `a <= b` nor `b <= a` holds under `PartialOrder`.
    fn assert_incomparable<T: PartialOrder>(a: &T, b: &T) {
        assert!(!PartialOrder::less_equal(a, b));
        assert!(!PartialOrder::less_equal(b, a));
    }

    #[mz_ore::test]
    fn basic_properties() {
        // The minimum spans every partition at the minimum time; it is `<=` but not `<` itself.
        let minimum: Partitioned<u64, u64> = Partitioned::minimum();
        assert_eq!(minimum, Partitioned::new_range(0, u64::MAX, 0));
        assert!(PartialOrder::less_equal(&minimum, &minimum));
        assert!(!PartialOrder::less_than(&minimum, &minimum));

        // Disjoint partition ranges are pairwise incomparable.
        let lower = Partitioned::new_range(0, 9, 0);
        let partition10 = Partitioned::new_singleton(10, 0);
        let upper = Partitioned::new_range(11, u64::MAX, 0);
        assert_incomparable(&lower, &partition10);
        assert_incomparable(&lower, &upper);
        assert_incomparable(&partition10, &upper);

        // Point 5 lies inside `lower`, so it is strictly greater than `lower` ...
        let partition5 = Partitioned::new_singleton(5, 0);
        assert!(PartialOrder::less_than(&lower, &partition5));
        // ... but it is disjoint from, and hence incomparable with, `upper`.
        assert_incomparable(&upper, &partition5);

        // Likewise for a sub-range of `lower`.
        let sub_range = Partitioned::new_range(2, 4, 0);
        assert!(PartialOrder::less_than(&lower, &sub_range));
        assert_incomparable(&upper, &sub_range);

        // `less_equal` is reflexive.
        for element in [&lower, &partition5, &upper] {
            assert!(PartialOrder::less_equal(element, element));
        }
    }

    #[mz_ore::test]
    fn antichain_properties() {
        let mut frontier = Antichain::new();

        // Incomparable elements at timestamp 5 all remain in the antichain.
        frontier.extend([
            Partitioned::new_range(0, 9, 5),
            Partitioned::new_singleton(10, 5),
            Partitioned::new_range(11, u64::MAX, 5),
        ]);
        assert_eq!(frontier.len(), 3);

        // The full range at timestamp 4 is below every element, so it displaces them all.
        frontier.insert(Partitioned::new_range(0, u64::MAX, 4));
        assert_eq!(
            frontier,
            Antichain::from_elem(Partitioned::new_range(0, u64::MAX, 4))
        );

        // Partition 10 has been downgraded to timestamp 10; the rest remain at timestamp 5.
        let downgraded = Antichain::from_iter([
            Partitioned::new_range(0, 9, 5),
            Partitioned::new_singleton(10, 10),
            Partitioned::new_range(11, u64::MAX, 5),
        ]);

        // Each probe is paired with whether the frontier should be strictly less than it.
        let probes = [
            // A later time at partition 10.
            (Partitioned::new_singleton(10, 11), true),
            // Partition 0 (covered by the `[0, 9]` element) after time 5.
            (Partitioned::new_singleton(0, 6), true),
            // Partition 0 before time 5.
            (Partitioned::new_singleton(0, 4), false),
            // The range [3, 5] after time 5.
            (Partitioned::new_range(3, 5, 6), true),
            // The range [3, 5] before time 5.
            (Partitioned::new_range(3, 5, 4), false),
        ];
        for (probe, expected) in probes {
            assert_eq!(downgraded.less_than(&probe), expected);
        }
    }
}
407
408/// Refine an `Antichain<T>` into a `Antichain<Inner>`, using a `Refines`
409/// implementation (in the case of tuple-style timestamps, this usually
410/// means appending a minimum time).
411pub fn refine_antichain<T: Timestamp, Inner: Timestamp + Refines<T>>(
412    frontier: &Antichain<T>,
413) -> Antichain<Inner> {
414    Antichain::from_iter(frontier.iter().map(|t| Refines::to_inner(t.clone())))
415}