Skip to main content

mz_timely_util/
order.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Traits and types for partially ordered sets.
17
18use std::cmp::Ordering;
19use std::fmt::{self, Debug};
20use std::hash::Hash;
21
22use differential_dataflow::containers::CopyRegion;
23use serde::{Deserialize, Serialize};
24use timely::order::Product;
25use timely::progress::Antichain;
26use timely::progress::timestamp::{PathSummary, Refines, Timestamp};
27use timely::{ExchangeData, PartialOrder};
28use uuid::Uuid;
29
30use mz_ore::cast::CastFrom;
31
32/// A partially ordered timestamp that is partitioned by an arbitrary number of partitions
33/// identified by `P`. The construction allows for efficient representation of frontiers with
34/// Antichains.
35///
36/// A `Partitioned<P, T>` timestamp is internally the product order of an `Interval<P>` and a bare
37/// timestamp `T`. An `Interval<P>` represents an inclusive range of values from the type `P` and
38/// its partial order corresponds to the subset order.
39///
40/// Effectively, the minimum `Partitioned` timestamp will start out with the maximum possible
41/// `Interval<P>` on one side and the minimum timestamp `T` on the other side. Users of this
42/// timestamp can selectively downgrade the timestamp by advancing `T`, shrinking the interval, or
43/// both.
44///
45/// Antichains of this type are efficient in storage. In the worst case, where all chosen
46/// partitions have gaps between them, the produced antichain has twice as many elements as
47/// partitions. This is because the "dead space" between the selected partitions must have a
48/// representative timestamp in order for that space to be useable in the future.
49#[derive(
50    Debug,
51    Copy,
52    Clone,
53    PartialEq,
54    Eq,
55    PartialOrd,
56    Ord,
57    Hash,
58    Serialize,
59    Deserialize
60)]
61pub struct Partitioned<P, T>(Product<Interval<P>, T>);
62
63impl<P: Clone + PartialOrd, T> Partitioned<P, T> {
64    /// Constructs a new timestamp for a specific partition.
65    pub fn new_singleton(partition: P, timestamp: T) -> Self {
66        let interval = Interval {
67            lower: partition.clone(),
68            upper: partition,
69        };
70        Self(Product::new(interval, timestamp))
71    }
72
73    /// Constructs a new timestamp for a partition range.
74    pub fn new_range(lower: P, upper: P, timestamp: T) -> Self {
75        assert!(lower <= upper, "invalid range bounds");
76        Self(Product::new(Interval { lower, upper }, timestamp))
77    }
78
79    /// Returns the interval component of this partitioned timestamp.
80    pub fn interval(&self) -> &Interval<P> {
81        &self.0.outer
82    }
83
84    /// Returns the timestamp component of this partitioned timestamp.
85    pub fn timestamp(&self) -> &T {
86        &self.0.inner
87    }
88
89    pub fn timestamp_mut(&mut self) -> &mut T {
90        &mut self.0.inner
91    }
92}
93
94impl<P: Clone + PartialOrd + Step, T: Clone> Partitioned<P, T> {
95    /// Returns up to two partitions that contain the interval before and/or
96    /// after given partition point, neither of which contain the point.
97    pub fn split(&self, point: &P) -> (Option<Self>, Option<Self>) {
98        let (before, after) = self.interval().split(point);
99        let mapper = |interval| Self(Product::new(interval, self.timestamp().clone()));
100        (before.map(mapper), after.map(mapper))
101    }
102}
103
104impl<P: fmt::Display, T: fmt::Display> fmt::Display for Partitioned<P, T> {
105    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
106        f.write_str("(")?;
107        self.0.outer.fmt(f)?;
108        f.write_str(", ")?;
109        self.0.inner.fmt(f)?;
110        f.write_str(")")?;
111        Ok(())
112    }
113}
114
115impl<P, T: Timestamp> Timestamp for Partitioned<P, T>
116where
117    P: Extrema + Clone + Debug + ExchangeData + Hash + Ord,
118{
119    type Summary = ();
120    fn minimum() -> Self {
121        Self(Timestamp::minimum())
122    }
123}
124impl<P, T: Timestamp> Refines<()> for Partitioned<P, T>
125where
126    P: Extrema + Clone + Debug + ExchangeData + Hash + Ord,
127{
128    fn to_inner(_other: ()) -> Self {
129        Self::minimum()
130    }
131
132    fn to_outer(self) {}
133
134    fn summarize(_path: Self::Summary) {}
135}
136
137impl<P: Ord + Eq, T: PartialOrder> PartialOrder for Partitioned<P, T> {
138    #[inline]
139    fn less_equal(&self, other: &Self) -> bool {
140        self.0.less_equal(&other.0)
141    }
142}
143impl<P: Clone, T: Timestamp> PathSummary<Partitioned<P, T>> for () {
144    #[inline]
145    fn results_in(&self, src: &Partitioned<P, T>) -> Option<Partitioned<P, T>> {
146        Some(src.clone())
147    }
148
149    #[inline]
150    fn followed_by(&self, _other: &Self) -> Option<Self> {
151        Some(())
152    }
153}
154
155impl<P: Copy, T: Copy> columnation::Columnation for Partitioned<P, T> {
156    type InnerRegion = CopyRegion<Partitioned<P, T>>;
157}
158
/// Types that have a smallest and a largest value.
pub trait Extrema {
    /// Returns the smallest value of this type.
    fn minimum() -> Self;
    /// Returns the largest value of this type.
    fn maximum() -> Self;
}
166
167impl Extrema for u64 {
168    fn minimum() -> Self {
169        Self::MIN
170    }
171    fn maximum() -> Self {
172        Self::MAX
173    }
174}
175
176impl Extrema for i32 {
177    fn minimum() -> Self {
178        Self::MIN
179    }
180    fn maximum() -> Self {
181        Self::MAX
182    }
183}
184
185impl Extrema for Uuid {
186    fn minimum() -> Self {
187        Self::nil()
188    }
189    fn maximum() -> Self {
190        Self::max()
191    }
192}
193
// TODO: Switch to the std one when it's no longer unstable: https://doc.rust-lang.org/std/iter/trait.Step.html
/// Types whose values can be stepped backward and forward by a count, with a checked
/// (`Option`-returning) result.
pub trait Step: Sized {
    /// Returns the value `count` steps before `self`, or `None` if the implementation cannot
    /// produce one (the `Uuid` implementation returns `None` when the subtraction underflows).
    fn backward_checked(&self, count: usize) -> Option<Self>;
    /// Returns the value `count` steps after `self`, or `None` if the implementation cannot
    /// produce one (the `Uuid` implementation returns `None` when the addition overflows).
    fn forward_checked(&self, count: usize) -> Option<Self>;
}
204
205impl Step for Uuid {
206    fn backward_checked(&self, count: usize) -> Option<Self> {
207        self.as_u128()
208            .checked_sub(u128::cast_from(count))
209            .map(Self::from_u128)
210    }
211    fn forward_checked(&self, count: usize) -> Option<Self> {
212        self.as_u128()
213            .checked_add(u128::cast_from(count))
214            .map(Self::from_u128)
215    }
216}
217
218#[derive(
219    Debug,
220    Copy,
221    Clone,
222    PartialEq,
223    Eq,
224    PartialOrd,
225    Ord,
226    Hash,
227    Serialize,
228    Deserialize
229)]
230/// A type representing an inclusive interval of type `P`, ordered under the subset relation.
231pub struct Interval<P> {
232    pub lower: P,
233    pub upper: P,
234}
235
236impl<P: Eq> Interval<P> {
237    /// Returns the contained element if it's a singleton set.
238    pub fn singleton(&self) -> Option<&P> {
239        if self.lower == self.upper {
240            Some(&self.lower)
241        } else {
242            None
243        }
244    }
245}
246
247impl<P: PartialOrd> Interval<P> {
248    pub fn contains(&self, other: &P) -> bool {
249        self.lower <= *other && *other <= self.upper
250    }
251}
252
253impl<P: Step + PartialOrd + Clone> Interval<P> {
254    /// Returns up to two intervals that contain the range before and/or after given point,
255    /// neither of which contain the point.
256    pub fn split(&self, point: &P) -> (Option<Self>, Option<Self>) {
257        let before = match point.backward_checked(1) {
258            Some(bef) if self.lower <= bef => Some(Interval {
259                lower: self.lower.clone(),
260                upper: bef,
261            }),
262            _ => None,
263        };
264        let after = match point.forward_checked(1) {
265            Some(aft) if self.upper >= aft => Some(Interval {
266                lower: aft,
267                upper: self.upper.clone(),
268            }),
269            _ => None,
270        };
271        (before, after)
272    }
273}
274
275impl<P: Ord + Eq> PartialOrder for Interval<P> {
276    #[inline]
277    fn less_equal(&self, other: &Self) -> bool {
278        self.lower <= other.lower && other.upper <= self.upper
279    }
280}
281
282impl<P: Clone> PathSummary<Interval<P>> for () {
283    #[inline]
284    fn results_in(&self, src: &Interval<P>) -> Option<Interval<P>> {
285        Some(src.clone())
286    }
287
288    #[inline]
289    fn followed_by(&self, _other: &Self) -> Option<Self> {
290        Some(())
291    }
292}
293
294impl<P> Timestamp for Interval<P>
295where
296    P: Extrema + Clone + Debug + ExchangeData + Hash + Ord,
297{
298    type Summary = ();
299
300    #[inline]
301    fn minimum() -> Self {
302        Self {
303            lower: P::minimum(),
304            upper: P::maximum(),
305        }
306    }
307}
308
309impl<P: fmt::Display> fmt::Display for Interval<P> {
310    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
311        f.write_str("[")?;
312        self.lower.fmt(f)?;
313        f.write_str(", ")?;
314        self.upper.fmt(f)?;
315        f.write_str("]")?;
316        Ok(())
317    }
318}
319
/// A wrapper that reverses the ordering of the wrapped value.
///
/// It can be used with `Antichain` when the maximum inclusive frontier needs to be
/// maintained, as opposed to the minimum inclusive one.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Reverse<T>(pub T);
326
327impl<T: PartialOrder> PartialOrder for Reverse<T> {
328    #[inline]
329    fn less_equal(&self, other: &Self) -> bool {
330        PartialOrder::less_equal(&other.0, &self.0)
331    }
332}
333impl<T: PartialOrd> PartialOrd for Reverse<T> {
334    #[inline]
335    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
336        other.0.partial_cmp(&self.0)
337    }
338}
339
340impl<T: Ord> Ord for Reverse<T> {
341    #[inline]
342    fn cmp(&self, other: &Self) -> Ordering {
343        other.0.cmp(&self.0)
344    }
345}
346
#[cfg(test)]
mod test {
    use timely::progress::Antichain;

    use super::*;

    /// The concrete timestamp type exercised by these tests.
    type Ts = Partitioned<u64, u64>;

    #[mz_ore::test]
    fn basic_properties() {
        // The minimum spans every partition at time zero and is only weakly below itself.
        let minimum = Ts::minimum();
        assert_eq!(minimum, Ts::new_range(0, u64::MAX, 0));
        assert!(PartialOrder::less_equal(&minimum, &minimum));
        assert!(!PartialOrder::less_than(&minimum, &minimum));

        // Three disjoint pieces of the partition space: no ordered pair of distinct pieces
        // may be comparable.
        let lower = Ts::new_range(0, 9, 0);
        let partition10 = Ts::new_singleton(10, 0);
        let upper = Ts::new_range(11, u64::MAX, 0);
        let pieces = [&lower, &partition10, &upper];
        for (i, a) in pieces.iter().enumerate() {
            for (j, b) in pieces.iter().enumerate() {
                if i != j {
                    assert!(!PartialOrder::less_equal(*a, *b));
                }
            }
        }

        // A point inside the lower range is strictly greater than that range...
        let partition5 = Ts::new_singleton(5, 0);
        assert!(PartialOrder::less_than(&lower, &partition5));
        // ...and incomparable with the upper range.
        assert!(!PartialOrder::less_equal(&upper, &partition5));
        assert!(!PartialOrder::less_equal(&partition5, &upper));

        // The same holds for a sub-range of the lower range.
        let sub_range = Ts::new_range(2, 4, 0);
        assert!(PartialOrder::less_than(&lower, &sub_range));
        assert!(!PartialOrder::less_equal(&upper, &sub_range));
        assert!(!PartialOrder::less_equal(&sub_range, &upper));

        // Less-than-or-equal is reflexive.
        for ts in [&lower, &partition5, &upper] {
            assert!(PartialOrder::less_equal(ts, ts));
        }
    }

    #[mz_ore::test]
    fn antichain_properties() {
        let mut frontier = Antichain::new();

        // Three mutually incomparable elements at timestamp 5 all stay in the frontier.
        let at_five = [
            Ts::new_range(0, 9, 5),
            Ts::new_singleton(10, 5),
            Ts::new_range(11, u64::MAX, 5),
        ];
        frontier.extend(at_five);
        assert_eq!(frontier.len(), 3);

        // The full range at timestamp 4 is below all of them and replaces them.
        let everything_at_four = Ts::new_range(0, u64::MAX, 4);
        frontier.insert(everything_at_four);
        assert_eq!(frontier, Antichain::from_elem(Ts::new_range(0, u64::MAX, 4)));

        // A frontier where partition 10 has advanced to timestamp 10 while everything else
        // remains at timestamp 5.
        let frontier = Antichain::from_iter([
            Ts::new_range(0, 9, 5),
            Ts::new_singleton(10, 10),
            Ts::new_range(11, u64::MAX, 5),
        ]);

        // Later times of partition 10 are beyond the frontier.
        assert!(frontier.less_than(&Ts::new_singleton(10, 11)));
        // So is any other singleton partition at timestamp 6.
        assert!(frontier.less_than(&Ts::new_singleton(0, 6)));
        // A partition at timestamp 4 is not.
        assert!(!frontier.less_than(&Ts::new_singleton(0, 4)));
        // The partition range [3, 5] is beyond the frontier at timestamp 6...
        assert!(frontier.less_than(&Ts::new_range(3, 5, 6)));
        // ...but not at timestamp 4.
        assert!(!frontier.less_than(&Ts::new_range(3, 5, 4)));
    }
}
429
430/// Refine an `Antichain<T>` into a `Antichain<Inner>`, using a `Refines`
431/// implementation (in the case of tuple-style timestamps, this usually
432/// means appending a minimum time).
433pub fn refine_antichain<T: Timestamp, Inner: Timestamp + Refines<T>>(
434    frontier: &Antichain<T>,
435) -> Antichain<Inner> {
436    Antichain::from_iter(frontier.iter().map(|t| Refines::to_inner(t.clone())))
437}