differential_dataflow/lattice.rs
//! Partially ordered elements with a least upper bound.
//!
//! Lattices form the basis of differential dataflow's efficient execution in the presence of
//! iterative sub-computations. All logical times in differential dataflow must implement the
//! `Lattice` trait, and all reasoning in operators is done in terms of `Lattice` methods.
6
7use timely::order::PartialOrder;
8use timely::progress::{Antichain, frontier::AntichainRef};
9
10/// A bounded partially ordered type supporting joins and meets.
11pub trait Lattice : PartialOrder {
12
13 /// The smallest element greater than or equal to both arguments.
14 ///
15 /// # Examples
16 ///
17 /// ```
18 /// # use timely::PartialOrder;
19 /// # use timely::order::Product;
20 /// # use differential_dataflow::lattice::Lattice;
21 /// # fn main() {
22 ///
23 /// let time1 = Product::new(3, 7);
24 /// let time2 = Product::new(4, 6);
25 /// let join = time1.join(&time2);
26 ///
27 /// assert_eq!(join, Product::new(4, 7));
28 /// # }
29 /// ```
30 #[must_use]
31 fn join(&self, other: &Self) -> Self;
32
33 /// Updates `self` to the smallest element greater than or equal to both arguments.
34 ///
35 /// # Examples
36 ///
37 /// ```
38 /// # use timely::PartialOrder;
39 /// # use timely::order::Product;
40 /// # use differential_dataflow::lattice::Lattice;
41 /// # fn main() {
42 ///
43 /// let mut time1 = Product::new(3, 7);
44 /// let time2 = Product::new(4, 6);
45 /// time1.join_assign(&time2);
46 ///
47 /// assert_eq!(time1, Product::new(4, 7));
48 /// # }
49 /// ```
50 fn join_assign(&mut self, other: &Self) where Self: Sized {
51 *self = self.join(other);
52 }
53
54 /// The largest element less than or equal to both arguments.
55 ///
56 /// # Examples
57 ///
58 /// ```
59 /// # use timely::PartialOrder;
60 /// # use timely::order::Product;
61 /// # use differential_dataflow::lattice::Lattice;
62 /// # fn main() {
63 ///
64 /// let time1 = Product::new(3, 7);
65 /// let time2 = Product::new(4, 6);
66 /// let meet = time1.meet(&time2);
67 ///
68 /// assert_eq!(meet, Product::new(3, 6));
69 /// # }
70 /// ```
71 #[must_use]
72 fn meet(&self, other: &Self) -> Self;
73
74 /// Updates `self` to the largest element less than or equal to both arguments.
75 ///
76 /// # Examples
77 ///
78 /// ```
79 /// # use timely::PartialOrder;
80 /// # use timely::order::Product;
81 /// # use differential_dataflow::lattice::Lattice;
82 /// # fn main() {
83 ///
84 /// let mut time1 = Product::new(3, 7);
85 /// let time2 = Product::new(4, 6);
86 /// time1.meet_assign(&time2);
87 ///
88 /// assert_eq!(time1, Product::new(3, 6));
89 /// # }
90 /// ```
91 fn meet_assign(&mut self, other: &Self) where Self: Sized {
92 *self = self.meet(other);
93 }
94
95 /// Advances self to the largest time indistinguishable under `frontier`.
96 ///
97 /// This method produces the "largest" lattice element with the property that for every
98 /// lattice element greater than some element of `frontier`, both the result and `self`
99 /// compare identically to the lattice element. The result is the "largest" element in
100 /// the sense that any other element with the same property (compares identically to times
101 /// greater or equal to `frontier`) must be less or equal to the result.
102 ///
103 /// When provided an empty frontier `self` is not modified.
104 ///
105 /// # Examples
106 ///
107 /// ```
108 /// # use timely::PartialOrder;
109 /// # use timely::order::Product;
110 /// # use differential_dataflow::lattice::Lattice;
111 /// # fn main() {
112 ///
113 /// use timely::progress::frontier::{Antichain, AntichainRef};
114 ///
115 /// let time = Product::new(3, 7);
116 /// let mut advanced = Product::new(3, 7);
117 /// let frontier = Antichain::from(vec![Product::new(4, 8), Product::new(5, 3)]);
118 /// advanced.advance_by(frontier.borrow());
119 ///
120 /// // `time` and `advanced` are indistinguishable to elements >= an element of `frontier`
121 /// for i in 0 .. 10 {
122 /// for j in 0 .. 10 {
123 /// let test = Product::new(i, j);
124 /// // for `test` in the future of `frontier` ..
125 /// if frontier.less_equal(&test) {
126 /// assert_eq!(time.less_equal(&test), advanced.less_equal(&test));
127 /// }
128 /// }
129 /// }
130 ///
131 /// assert_eq!(advanced, Product::new(4, 7));
132 /// # }
133 /// ```
134 #[inline]
135 fn advance_by(&mut self, frontier: AntichainRef<Self>) where Self: Sized {
136 let mut iter = frontier.iter();
137 if let Some(first) = iter.next() {
138 let mut result = self.join(first);
139 for f in iter {
140 result.meet_assign(&self.join(f));
141 }
142 *self = result;
143 }
144 }
145}
146
147use timely::order::Product;
148
149impl<T1: Lattice, T2: Lattice> Lattice for Product<T1, T2> {
150 #[inline]
151 fn join(&self, other: &Product<T1, T2>) -> Product<T1, T2> {
152 Product {
153 outer: self.outer.join(&other.outer),
154 inner: self.inner.join(&other.inner),
155 }
156 }
157 #[inline]
158 fn meet(&self, other: &Product<T1, T2>) -> Product<T1, T2> {
159 Product {
160 outer: self.outer.meet(&other.outer),
161 inner: self.inner.meet(&other.inner),
162 }
163 }
164}
165
/// Types that possess a single greatest element.
pub trait Maximum {
    /// Returns the one element that no other element of the type exceeds.
    fn maximum() -> Self;
}
171
172/// Implements `Maximum` for elements with a `MAX` associated constant.
173macro_rules! implement_maximum {
174 ($($index_type:ty,)*) => (
175 $(
176 impl Maximum for $index_type {
177 fn maximum() -> Self { Self::MAX }
178 }
179 )*
180 )
181}
182
183implement_maximum!(usize, u128, u64, u32, u16, u8, isize, i128, i64, i32, i16, i8, Duration,);
184impl Maximum for () { fn maximum() -> () { () }}
185
186use timely::progress::Timestamp;
187
188// Tuples have the annoyance that they are only a lattice for `T2` with maximal elements,
189// as the `meet` operator on `(x, _)` and `(y, _)` would be `(x meet y, maximum())`.
190impl<T1: Lattice+Clone, T2: Lattice+Clone+Maximum+Timestamp> Lattice for (T1, T2) {
191 #[inline]
192 fn join(&self, other: &(T1, T2)) -> (T1, T2) {
193 if self.0.eq(&other.0) {
194 (self.0.clone(), self.1.join(&other.1))
195 } else if self.0.less_than(&other.0) {
196 other.clone()
197 } else if other.0.less_than(&self.0) {
198 self.clone()
199 } else {
200 (self.0.join(&other.0), T2::minimum())
201 }
202 }
203 #[inline]
204 fn meet(&self, other: &(T1, T2)) -> (T1, T2) {
205 if self.0.eq(&other.0) {
206 (self.0.clone(), self.1.meet(&other.1))
207 } else if self.0.less_than(&other.0) {
208 self.clone()
209 } else if other.0.less_than(&self.0) {
210 other.clone()
211 } else {
212 (self.0.meet(&other.0), T2::maximum())
213 }
214 }
215}
216
217macro_rules! implement_lattice {
218 ($index_type:ty, $minimum:expr) => (
219 impl Lattice for $index_type {
220 #[inline] fn join(&self, other: &Self) -> Self { ::std::cmp::max(*self, *other) }
221 #[inline] fn meet(&self, other: &Self) -> Self { ::std::cmp::min(*self, *other) }
222 }
223 )
224}
225
226use std::time::Duration;
227
228implement_lattice!(Duration, Duration::new(0, 0));
229implement_lattice!(usize, 0);
230implement_lattice!(u128, 0);
231implement_lattice!(u64, 0);
232implement_lattice!(u32, 0);
233implement_lattice!(u16, 0);
234implement_lattice!(u8, 0);
235implement_lattice!(isize, 0);
236implement_lattice!(i128, 0);
237implement_lattice!(i64, 0);
238implement_lattice!(i32, 0);
239implement_lattice!(i16, 0);
240implement_lattice!(i8, 0);
241implement_lattice!((), ());
242
243/// Returns the "smallest" minimal antichain "greater or equal" to both inputs.
244///
245/// This method is primarily meant for cases where one cannot use the methods
246/// of `Antichain`'s `PartialOrder` implementation, such as when one has only
247/// references rather than owned antichains.
248///
249/// # Examples
250///
251/// ```
252/// # use timely::PartialOrder;
253/// # use timely::order::Product;
254/// # use differential_dataflow::lattice::Lattice;
255/// # use differential_dataflow::lattice::antichain_join;
256/// # fn main() {
257///
258/// let f1 = &[Product::new(3, 7), Product::new(5, 6)];
259/// let f2 = &[Product::new(4, 6)];
260/// let join = antichain_join(f1, f2);
261/// assert_eq!(&*join.elements(), &[Product::new(4, 7), Product::new(5, 6)]);
262/// # }
263/// ```
264pub fn antichain_join<T: Lattice>(one: &[T], other: &[T]) -> Antichain<T> {
265 let mut upper = Antichain::new();
266 antichain_join_into(one, other, &mut upper);
267 upper
268}
269
270/// Returns the "smallest" minimal antichain "greater or equal" to both inputs.
271///
272/// This method is primarily meant for cases where one cannot use the methods
273/// of `Antichain`'s `PartialOrder` implementation, such as when one has only
274/// references rather than owned antichains.
275///
276/// This function is similar to [antichain_join] but reuses an existing allocation.
277/// The provided antichain is cleared before inserting elements.
278///
279/// # Examples
280///
281/// ```
282/// # use timely::PartialOrder;
283/// # use timely::order::Product;
284/// # use timely::progress::Antichain;
285/// # use differential_dataflow::lattice::Lattice;
286/// # use differential_dataflow::lattice::antichain_join_into;
287/// # fn main() {
288///
289/// let mut join = Antichain::new();
290/// let f1 = &[Product::new(3, 7), Product::new(5, 6)];
291/// let f2 = &[Product::new(4, 6)];
292/// antichain_join_into(f1, f2, &mut join);
293/// assert_eq!(&*join.elements(), &[Product::new(4, 7), Product::new(5, 6)]);
294/// # }
295/// ```
296pub fn antichain_join_into<T: Lattice>(one: &[T], other: &[T], upper: &mut Antichain<T>) {
297 upper.clear();
298 for time1 in one {
299 for time2 in other {
300 upper.insert(time1.join(time2));
301 }
302 }
303}
304
305/// Returns the "greatest" minimal antichain "less or equal" to both inputs.
306///
307/// This method is primarily meant for cases where one cannot use the methods
308/// of `Antichain`'s `PartialOrder` implementation, such as when one has only
309/// references rather than owned antichains.
310///
311/// # Examples
312///
313/// ```
314/// # use timely::PartialOrder;
315/// # use timely::order::Product;
316/// # use differential_dataflow::lattice::Lattice;
317/// # use differential_dataflow::lattice::antichain_meet;
318/// # fn main() {
319///
320/// let f1 = &[Product::new(3, 7), Product::new(5, 6)];
321/// let f2 = &[Product::new(4, 6)];
322/// let meet = antichain_meet(f1, f2);
323/// assert_eq!(&*meet.elements(), &[Product::new(3, 7), Product::new(4, 6)]);
324/// # }
325/// ```
326pub fn antichain_meet<T: Lattice+Clone>(one: &[T], other: &[T]) -> Antichain<T> {
327 let mut upper = Antichain::new();
328 for time1 in one {
329 upper.insert(time1.clone());
330 }
331 for time2 in other {
332 upper.insert(time2.clone());
333 }
334 upper
335}
336
337impl<T: Lattice+Clone> Lattice for Antichain<T> {
338 fn join(&self, other: &Self) -> Self {
339 let mut upper = Antichain::new();
340 for time1 in self.elements().iter() {
341 for time2 in other.elements().iter() {
342 upper.insert(time1.join(time2));
343 }
344 }
345 upper
346 }
347 fn meet(&self, other: &Self) -> Self {
348 let mut upper = Antichain::new();
349 for time1 in self.elements().iter() {
350 upper.insert(time1.clone());
351 }
352 for time2 in other.elements().iter() {
353 upper.insert(time2.clone());
354 }
355 upper
356 }
357}