differential_dataflow/
lattice.rs

1//! Partially ordered elements with a least upper bound.
2//!
3//! Lattices form the basis of differential dataflow's efficient execution in the presence of
4//! iterative sub-computations. All logical times in differential dataflow must implement the
5//! `Lattice` trait, and all reasoning in operators are done it terms of `Lattice` methods.
6
7use timely::order::PartialOrder;
8use timely::progress::{Antichain, frontier::AntichainRef};
9
10/// A bounded partially ordered type supporting joins and meets.
11pub trait Lattice : PartialOrder {
12
13    /// The smallest element greater than or equal to both arguments.
14    ///
15    /// # Examples
16    ///
17    /// ```
18    /// # use timely::PartialOrder;
19    /// # use timely::order::Product;
20    /// # use differential_dataflow::lattice::Lattice;
21    /// # fn main() {
22    ///
23    /// let time1 = Product::new(3, 7);
24    /// let time2 = Product::new(4, 6);
25    /// let join = time1.join(&time2);
26    ///
27    /// assert_eq!(join, Product::new(4, 7));
28    /// # }
29    /// ```
30    fn join(&self, other: &Self) -> Self;
31
32    /// Updates `self` to the smallest element greater than or equal to both arguments.
33    ///
34    /// # Examples
35    ///
36    /// ```
37    /// # use timely::PartialOrder;
38    /// # use timely::order::Product;
39    /// # use differential_dataflow::lattice::Lattice;
40    /// # fn main() {
41    ///
42    /// let mut time1 = Product::new(3, 7);
43    /// let time2 = Product::new(4, 6);
44    /// time1.join_assign(&time2);
45    ///
46    /// assert_eq!(time1, Product::new(4, 7));
47    /// # }
48    /// ```
49    fn join_assign(&mut self, other: &Self) where Self: Sized {
50        *self = self.join(other);
51    }
52
53    /// The largest element less than or equal to both arguments.
54    ///
55    /// # Examples
56    ///
57    /// ```
58    /// # use timely::PartialOrder;
59    /// # use timely::order::Product;
60    /// # use differential_dataflow::lattice::Lattice;
61    /// # fn main() {
62    ///
63    /// let time1 = Product::new(3, 7);
64    /// let time2 = Product::new(4, 6);
65    /// let meet = time1.meet(&time2);
66    ///
67    /// assert_eq!(meet, Product::new(3, 6));
68    /// # }
69    /// ```
70    fn meet(&self, other: &Self) -> Self;
71
72    /// Updates `self` to the largest element less than or equal to both arguments.
73    ///
74    /// # Examples
75    ///
76    /// ```
77    /// # use timely::PartialOrder;
78    /// # use timely::order::Product;
79    /// # use differential_dataflow::lattice::Lattice;
80    /// # fn main() {
81    ///
82    /// let mut time1 = Product::new(3, 7);
83    /// let time2 = Product::new(4, 6);
84    /// time1.meet_assign(&time2);
85    ///
86    /// assert_eq!(time1, Product::new(3, 6));
87    /// # }
88    /// ```
89    fn meet_assign(&mut self, other: &Self) where Self: Sized  {
90        *self = self.meet(other);
91    }
92
93    /// Advances self to the largest time indistinguishable under `frontier`.
94    ///
95    /// This method produces the "largest" lattice element with the property that for every
96    /// lattice element greater than some element of `frontier`, both the result and `self`
97    /// compare identically to the lattice element. The result is the "largest" element in
98    /// the sense that any other element with the same property (compares identically to times
99    /// greater or equal to `frontier`) must be less or equal to the result.
100    ///
101    /// When provided an empty frontier `self` is not modified.
102    ///
103    /// # Examples
104    ///
105    /// ```
106    /// # use timely::PartialOrder;
107    /// # use timely::order::Product;
108    /// # use differential_dataflow::lattice::Lattice;
109    /// # fn main() {
110    ///
111    /// use timely::progress::frontier::{Antichain, AntichainRef};
112    ///
113    /// let time = Product::new(3, 7);
114    /// let mut advanced = Product::new(3, 7);
115    /// let frontier = Antichain::from(vec![Product::new(4, 8), Product::new(5, 3)]);
116    /// advanced.advance_by(frontier.borrow());
117    ///
118    /// // `time` and `advanced` are indistinguishable to elements >= an element of `frontier`
119    /// for i in 0 .. 10 {
120    ///     for j in 0 .. 10 {
121    ///         let test = Product::new(i, j);
122    ///         // for `test` in the future of `frontier` ..
123    ///         if frontier.less_equal(&test) {
124    ///             assert_eq!(time.less_equal(&test), advanced.less_equal(&test));
125    ///         }
126    ///     }
127    /// }
128    ///
129    /// assert_eq!(advanced, Product::new(4, 7));
130    /// # }
131    /// ```
132    #[inline]
133    fn advance_by(&mut self, frontier: AntichainRef<Self>) where Self: Sized {
134        let mut iter = frontier.iter();
135        if let Some(first) = iter.next() {
136            let mut result = self.join(first);
137            for f in iter {
138                result.meet_assign(&self.join(f));
139            }
140            *self = result;
141        }
142    }
143}
144
145use timely::order::Product;
146
147impl<T1: Lattice, T2: Lattice> Lattice for Product<T1, T2> {
148    #[inline]
149    fn join(&self, other: &Product<T1, T2>) -> Product<T1, T2> {
150        Product {
151            outer: self.outer.join(&other.outer),
152            inner: self.inner.join(&other.inner),
153        }
154    }
155    #[inline]
156    fn meet(&self, other: &Product<T1, T2>) -> Product<T1, T2> {
157        Product {
158            outer: self.outer.meet(&other.outer),
159            inner: self.inner.meet(&other.inner),
160        }
161    }
162}
163
164/// A type that has a unique maximum element.
165pub trait Maximum {
166    /// The unique maximal element of the set.
167    fn maximum() -> Self;
168}
169
170/// Implements `Maximum` for elements with a `MAX` associated constant.
171macro_rules! implement_maximum {
172    ($($index_type:ty,)*) => (
173        $(
174            impl Maximum for $index_type {
175                fn maximum() -> Self { Self::MAX }
176            }
177        )*
178    )
179}
180
181implement_maximum!(usize, u128, u64, u32, u16, u8, isize, i128, i64, i32, i16, i8, Duration,);
182impl Maximum for () { fn maximum() -> () { () }}
183
184use timely::progress::Timestamp;
185
186// Tuples have the annoyance that they are only a lattice for `T2` with maximal elements,
187// as the `meet` operator on `(x, _)` and `(y, _)` would be `(x meet y, maximum())`.
188impl<T1: Lattice+Clone, T2: Lattice+Clone+Maximum+Timestamp> Lattice for (T1, T2) {
189    #[inline]
190    fn join(&self, other: &(T1, T2)) -> (T1, T2) {
191        if self.0.eq(&other.0) {
192            (self.0.clone(), self.1.join(&other.1))
193        } else if self.0.less_than(&other.0) {
194            other.clone()
195        } else if other.0.less_than(&self.0) {
196            self.clone()
197        } else {
198            (self.0.join(&other.0), T2::minimum())
199        }
200    }
201    #[inline]
202    fn meet(&self, other: &(T1, T2)) -> (T1, T2) {
203        if self.0.eq(&other.0) {
204            (self.0.clone(), self.1.meet(&other.1))
205        } else if self.0.less_than(&other.0) {
206            self.clone()
207        } else if other.0.less_than(&self.0) {
208            other.clone()
209        } else {
210            (self.0.meet(&other.0), T2::maximum())
211        }
212    }
213}
214
215macro_rules! implement_lattice {
216    ($index_type:ty, $minimum:expr) => (
217        impl Lattice for $index_type {
218            #[inline] fn join(&self, other: &Self) -> Self { ::std::cmp::max(*self, *other) }
219            #[inline] fn meet(&self, other: &Self) -> Self { ::std::cmp::min(*self, *other) }
220        }
221    )
222}
223
224use std::time::Duration;
225
226implement_lattice!(Duration, Duration::new(0, 0));
227implement_lattice!(usize, 0);
228implement_lattice!(u128, 0);
229implement_lattice!(u64, 0);
230implement_lattice!(u32, 0);
231implement_lattice!(u16, 0);
232implement_lattice!(u8, 0);
233implement_lattice!(isize, 0);
234implement_lattice!(i128, 0);
235implement_lattice!(i64, 0);
236implement_lattice!(i32, 0);
237implement_lattice!(i16, 0);
238implement_lattice!(i8, 0);
239implement_lattice!((), ());
240
241/// Returns the "smallest" minimal antichain "greater or equal" to both inputs.
242///
243/// This method is primarily meant for cases where one cannot use the methods
244/// of `Antichain`'s `PartialOrder` implementation, such as when one has only
245/// references rather than owned antichains.
246///
247/// # Examples
248///
249/// ```
250/// # use timely::PartialOrder;
251/// # use timely::order::Product;
252/// # use differential_dataflow::lattice::Lattice;
253/// # use differential_dataflow::lattice::antichain_join;
254/// # fn main() {
255///
256/// let f1 = &[Product::new(3, 7), Product::new(5, 6)];
257/// let f2 = &[Product::new(4, 6)];
258/// let join = antichain_join(f1, f2);
259/// assert_eq!(&*join.elements(), &[Product::new(4, 7), Product::new(5, 6)]);
260/// # }
261/// ```
262pub fn antichain_join<T: Lattice>(one: &[T], other: &[T]) -> Antichain<T> {
263    let mut upper = Antichain::new();
264    antichain_join_into(one, other, &mut upper);
265    upper
266}
267
268/// Returns the "smallest" minimal antichain "greater or equal" to both inputs.
269///
270/// This method is primarily meant for cases where one cannot use the methods
271/// of `Antichain`'s `PartialOrder` implementation, such as when one has only
272/// references rather than owned antichains.
273///
274/// This function is similar to [antichain_join] but reuses an existing allocation.
275/// The provided antichain is cleared before inserting elements.
276///
277/// # Examples
278///
279/// ```
280/// # use timely::PartialOrder;
281/// # use timely::order::Product;
282/// # use timely::progress::Antichain;
283/// # use differential_dataflow::lattice::Lattice;
284/// # use differential_dataflow::lattice::antichain_join_into;
285/// # fn main() {
286///
287/// let mut join = Antichain::new();
288/// let f1 = &[Product::new(3, 7), Product::new(5, 6)];
289/// let f2 = &[Product::new(4, 6)];
290/// antichain_join_into(f1, f2, &mut join);
291/// assert_eq!(&*join.elements(), &[Product::new(4, 7), Product::new(5, 6)]);
292/// # }
293/// ```
294pub fn antichain_join_into<T: Lattice>(one: &[T], other: &[T], upper: &mut Antichain<T>) {
295    upper.clear();
296    for time1 in one {
297        for time2 in other {
298            upper.insert(time1.join(time2));
299        }
300    }
301}
302
303/// Returns the "greatest" minimal antichain "less or equal" to both inputs.
304///
305/// This method is primarily meant for cases where one cannot use the methods
306/// of `Antichain`'s `PartialOrder` implementation, such as when one has only
307/// references rather than owned antichains.
308///
309/// # Examples
310///
311/// ```
312/// # use timely::PartialOrder;
313/// # use timely::order::Product;
314/// # use differential_dataflow::lattice::Lattice;
315/// # use differential_dataflow::lattice::antichain_meet;
316/// # fn main() {
317///
318/// let f1 = &[Product::new(3, 7), Product::new(5, 6)];
319/// let f2 = &[Product::new(4, 6)];
320/// let meet = antichain_meet(f1, f2);
321/// assert_eq!(&*meet.elements(), &[Product::new(3, 7), Product::new(4, 6)]);
322/// # }
323/// ```
324pub fn antichain_meet<T: Lattice+Clone>(one: &[T], other: &[T]) -> Antichain<T> {
325    let mut upper = Antichain::new();
326    for time1 in one {
327        upper.insert(time1.clone());
328    }
329    for time2 in other {
330        upper.insert(time2.clone());
331    }
332    upper
333}
334
335impl<T: Lattice+Clone> Lattice for Antichain<T> {
336    fn join(&self, other: &Self) -> Self {
337        let mut upper = Antichain::new();
338        for time1 in self.elements().iter() {
339            for time2 in other.elements().iter() {
340                upper.insert(time1.join(time2));
341            }
342        }
343        upper
344    }
345    fn meet(&self, other: &Self) -> Self {
346        let mut upper = Antichain::new();
347        for time1 in self.elements().iter() {
348            upper.insert(time1.clone());
349        }
350        for time2 in other.elements().iter() {
351            upper.insert(time2.clone());
352        }
353        upper
354    }
355}