timely/progress/
frontier.rs

1//! Tracks minimal sets of mutually incomparable elements of a partial order.
2
3use serde::{Deserialize, Serialize};
4use smallvec::SmallVec;
5
6use crate::progress::ChangeBatch;
7use crate::order::{PartialOrder, TotalOrder};
8
9/// A set of mutually incomparable elements.
10///
11/// An antichain is a set of partially ordered elements, each of which is incomparable to the others.
12/// This antichain implementation allows you to repeatedly introduce elements to the antichain, and
13/// which will evict larger elements to maintain the *minimal* antichain, those incomparable elements
14/// no greater than any other element.
15///
16/// Two antichains are equal if they contain the same set of elements, even if in different orders.
17/// This can make equality testing quadratic, though linear in the common case that the sequences
18/// are identical.
19#[derive(Debug, Serialize, Deserialize)]
20pub struct Antichain<T> {
21    elements: SmallVec<[T; 1]>
22}
23
24impl<T: PartialOrder> Antichain<T> {
25    /// Updates the `Antichain` if the element is not greater than or equal to some present element.
26    ///
27    /// Returns `true` if element is added to the set
28    ///
29    /// # Examples
30    ///
31    ///```
32    /// use timely::progress::frontier::Antichain;
33    ///
34    /// let mut frontier = Antichain::new();
35    /// assert!(frontier.insert(2));
36    /// assert!(!frontier.insert(3));
37    ///```
38    pub fn insert(&mut self, element: T) -> bool {
39        if !self.elements.iter().any(|x| x.less_equal(&element)) {
40            self.elements.retain(|x| !element.less_equal(x));
41            self.elements.push(element);
42            true
43        }
44        else {
45            false
46        }
47    }
48
49    /// Updates the `Antichain` if the element is not greater than or equal to some present element.
50    ///
51    /// Returns `true` if element is added to the set
52    ///
53    /// Accepts a reference to an element, which is cloned when inserting.
54    ///
55    /// # Examples
56    ///
57    ///```
58    /// use timely::progress::frontier::Antichain;
59    ///
60    /// let mut frontier = Antichain::new();
61    /// assert!(frontier.insert_ref(&2));
62    /// assert!(!frontier.insert(3));
63    ///```
64    pub fn insert_ref(&mut self, element: &T) -> bool where T: Clone {
65        if !self.elements.iter().any(|x| x.less_equal(element)) {
66            self.elements.retain(|x| !element.less_equal(x));
67            self.elements.push(element.clone());
68            true
69        }
70        else {
71            false
72        }
73    }
74
75    /// Updates the `Antichain` if the element is not greater than or equal to some present element.
76    /// If the antichain needs updating, it uses the `to_owned` closure to convert the element into
77    /// a `T`.
78    ///
79    /// Returns `true` if element is added to the set
80    ///
81    /// # Examples
82    ///
83    ///```
84    /// use timely::progress::frontier::Antichain;
85    ///
86    /// let mut frontier = Antichain::new();
87    /// assert!(frontier.insert_with(&2, |x| *x));
88    /// assert!(!frontier.insert(3));
89    ///```
90    pub fn insert_with<O: PartialOrder<T>, F: FnOnce(&O) -> T>(&mut self, element: &O, to_owned: F) -> bool where T: PartialOrder<O> {
91        if !self.elements.iter().any(|x| x.less_equal(element)) {
92            self.elements.retain(|x| !element.less_equal(x));
93            self.elements.push(to_owned(element));
94            true
95        }
96        else {
97            false
98        }
99    }
100
101    /// Reserves capacity for at least additional more elements to be inserted in the given `Antichain`
102    pub fn reserve(&mut self, additional: usize) {
103        self.elements.reserve(additional);
104    }
105
106    /// Performs a sequence of insertion and returns `true` iff any insertion does.
107    ///
108    /// # Examples
109    ///
110    ///```
111    /// use timely::progress::frontier::Antichain;
112    ///
113    /// let mut frontier = Antichain::new();
114    /// assert!(frontier.extend(Some(3)));
115    /// assert!(frontier.extend(vec![2, 5]));
116    /// assert!(!frontier.extend(vec![3, 4]));
117    ///```
118    pub fn extend<I: IntoIterator<Item=T>>(&mut self, iterator: I) -> bool {
119        let mut added = false;
120        for element in iterator {
121            added = self.insert(element) || added;
122        }
123        added
124    }
125
126    /// Returns `true` if any item in the antichain is strictly less than the argument.
127    ///
128    /// # Examples
129    ///
130    ///```
131    /// use timely::progress::frontier::Antichain;
132    ///
133    /// let mut frontier = Antichain::from_elem(2);
134    /// assert!(frontier.less_than(&3));
135    /// assert!(!frontier.less_than(&2));
136    /// assert!(!frontier.less_than(&1));
137    ///
138    /// frontier.clear();
139    /// assert!(!frontier.less_than(&3));
140    ///```
141    #[inline]
142    pub fn less_than(&self, time: &T) -> bool {
143        self.elements.iter().any(|x| x.less_than(time))
144    }
145
146    /// Returns `true` if any item in the antichain is less than or equal to the argument.
147    ///
148    /// # Examples
149    ///
150    ///```
151    /// use timely::progress::frontier::Antichain;
152    ///
153    /// let mut frontier = Antichain::from_elem(2);
154    /// assert!(frontier.less_equal(&3));
155    /// assert!(frontier.less_equal(&2));
156    /// assert!(!frontier.less_equal(&1));
157    ///
158    /// frontier.clear();
159    /// assert!(!frontier.less_equal(&3));
160    ///```
161    #[inline]
162    pub fn less_equal(&self, time: &T) -> bool {
163        self.elements.iter().any(|x| x.less_equal(time))
164    }
165
166    /// Returns `true` if every element of `other` is greater or equal to some element of `self`.
167    #[deprecated(since="0.12.0", note="please use `PartialOrder::less_equal` instead")]
168    #[inline]
169    pub fn dominates(&self, other: &Antichain<T>) -> bool {
170        <Self as PartialOrder>::less_equal(self, other)
171    }
172}
173
174impl<T: PartialOrder> std::iter::FromIterator<T> for Antichain<T> {
175    fn from_iter<I>(iterator: I) -> Self
176    where
177        I: IntoIterator<Item=T>
178    {
179        let mut result = Self::new();
180        result.extend(iterator);
181        result
182    }
183}
184
185impl<T> Antichain<T> {
186
187    /// Creates a new empty `Antichain`.
188    ///
189    /// # Examples
190    ///
191    ///```
192    /// use timely::progress::frontier::Antichain;
193    ///
194    /// let mut frontier = Antichain::<u32>::new();
195    ///```
196    pub fn new() -> Antichain<T> { Antichain { elements: SmallVec::new() } }
197
198    /// Creates a new empty `Antichain` with space for `capacity` elements.
199    ///
200    /// # Examples
201    ///
202    ///```
203    /// use timely::progress::frontier::Antichain;
204    ///
205    /// let mut frontier = Antichain::<u32>::with_capacity(10);
206    ///```
207    pub fn with_capacity(capacity: usize) -> Self {
208        Self {
209            elements: SmallVec::with_capacity(capacity),
210        }
211    }
212
213    /// Creates a new singleton `Antichain`.
214    ///
215    /// # Examples
216    ///
217    ///```
218    /// use timely::progress::frontier::Antichain;
219    ///
220    /// let mut frontier = Antichain::from_elem(2);
221    ///```
222    pub fn from_elem(element: T) -> Antichain<T> { 
223        let mut elements = SmallVec::with_capacity(1);
224        elements.push(element);
225        Antichain { elements } 
226    }
227
228    /// Clears the contents of the antichain.
229    ///
230    /// # Examples
231    ///
232    ///```
233    /// use timely::progress::frontier::Antichain;
234    ///
235    /// let mut frontier = Antichain::from_elem(2);
236    /// frontier.clear();
237    /// assert!(frontier.elements().is_empty());
238    ///```
239    pub fn clear(&mut self) { self.elements.clear() }
240
241    /// Sorts the elements so that comparisons between antichains can be made.
242    pub fn sort(&mut self) where T: Ord { self.elements.sort() }
243
244    /// Reveals the elements in the antichain.
245    ///
246    /// This method is redundant with `<Antichain<T> as Deref>`, but the method
247    /// is in such broad use that we probably don't want to deprecate it without
248    /// some time to fix all things.
249    ///
250    /// # Examples
251    ///
252    ///```
253    /// use timely::progress::frontier::Antichain;
254    ///
255    /// let mut frontier = Antichain::from_elem(2);
256    /// assert_eq!(frontier.elements(), &[2]);
257    ///```
258    #[inline] pub fn elements(&self) -> &[T] { &self[..] }
259
260    /// Reveals the elements in the antichain.
261    ///
262    /// # Examples
263    ///
264    ///```
265    /// use timely::progress::frontier::Antichain;
266    ///
267    /// let mut frontier = Antichain::from_elem(2);
268    /// assert_eq!(&*frontier.borrow(), &[2]);
269    ///```
270    #[inline] pub fn borrow(&self) -> AntichainRef<T> { AntichainRef::new(&self.elements) }}
271
272impl<T: PartialEq> PartialEq for Antichain<T> {
273    fn eq(&self, other: &Self) -> bool {
274        // Lengths should be the same, with the option for fast acceptance if identical.
275        self.elements().len() == other.elements().len() &&
276        (
277            self.elements().iter().zip(other.elements().iter()).all(|(t1,t2)| t1 == t2) ||
278            self.elements().iter().all(|t1| other.elements().iter().any(|t2| t1.eq(t2)))
279        )
280    }
281}
282
283impl<T: Eq> Eq for Antichain<T> { }
284
285impl<T: PartialOrder> PartialOrder for Antichain<T> {
286    fn less_equal(&self, other: &Self) -> bool {
287        other.elements().iter().all(|t2| self.elements().iter().any(|t1| t1.less_equal(t2)))
288    }
289}
290
291impl<T: Clone> Clone for Antichain<T> {
292    fn clone(&self) -> Self {
293        Antichain { elements: self.elements.clone() }
294    }
295    fn clone_from(&mut self, source: &Self) {
296        self.elements.clone_from(&source.elements)
297    }
298}
299
300impl<T> Default for Antichain<T> {
301    fn default() -> Self {
302        Self::new()
303    }
304}
305
306impl<T: TotalOrder> TotalOrder for Antichain<T> { }
307
308impl<T: TotalOrder> Antichain<T> {
309    /// Convert to the at most one element the antichain contains.
310    pub fn into_option(mut self) -> Option<T> {
311        debug_assert!(self.len() <= 1);
312        self.elements.pop()
313    }
314    /// Return a reference to the at most one element the antichain contains.
315    pub fn as_option(&self) -> Option<&T> {
316        debug_assert!(self.len() <= 1);
317        self.elements.last()
318    }
319}
320
321impl<T: Ord+std::hash::Hash> std::hash::Hash for Antichain<T> {
322    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
323        let mut temp = self.elements.iter().collect::<Vec<_>>();
324        temp.sort();
325        for element in temp {
326            element.hash(state);
327        }
328    }
329}
330
331impl<T: PartialOrder> From<Vec<T>> for Antichain<T> {
332    fn from(vec: Vec<T>) -> Self {
333        // TODO: We could reuse `vec` with some care.
334        let mut temp = Antichain::new();
335        for elem in vec.into_iter() { temp.insert(elem); }
336        temp
337    }
338}
339
340impl<T> From<Antichain<T>> for SmallVec<[T; 1]> {
341    fn from(val: Antichain<T>) -> Self {
342        val.elements
343    }
344}
345
346impl<T> ::std::ops::Deref for Antichain<T> {
347    type Target = [T];
348    fn deref(&self) -> &Self::Target {
349        &self.elements
350    }
351}
352
353impl<T> ::std::iter::IntoIterator for Antichain<T> {
354    type Item = T;
355    type IntoIter = smallvec::IntoIter<[T; 1]>;
356    fn into_iter(self) -> Self::IntoIter {
357        self.elements.into_iter()
358    }
359}
360
361/// An antichain based on a multiset whose elements frequencies can be updated.
362///
363/// The `MutableAntichain` maintains frequencies for many elements of type `T`, and exposes the set
364/// of elements with positive count not greater than any other elements with positive count. The
365/// antichain may both advance and retreat; the changes do not all need to be to elements greater or
366/// equal to some elements of the frontier.
367///
368/// The type `T` must implement `PartialOrder` as well as `Ord`. The implementation of the `Ord` trait
369/// is used to efficiently organize the updates for cancellation, and to efficiently determine the lower
370/// bounds, and only needs to not contradict the `PartialOrder` implementation (that is, if `PartialOrder`
371/// orders two elements, then so does the `Ord` implementation).
372///
373/// The `MutableAntichain` implementation is done with the intent that updates to it are done in batches,
374/// and it is acceptable to rebuild the frontier from scratch when a batch of updates change it. This means
375/// that it can be expensive to maintain a large number of counts and change few elements near the frontier.
376#[derive(Clone, Debug, Serialize, Deserialize)]
377pub struct MutableAntichain<T> {
378    updates: ChangeBatch<T>,
379    frontier: Vec<T>,
380    changes: ChangeBatch<T>,
381}
382
383impl<T> MutableAntichain<T> {
384    /// Creates a new empty `MutableAntichain`.
385    ///
386    /// # Examples
387    ///
388    ///```
389    /// use timely::progress::frontier::MutableAntichain;
390    ///
391    /// let frontier = MutableAntichain::<usize>::new();
392    /// assert!(frontier.is_empty());
393    ///```
394    #[inline]
395    pub fn new() -> MutableAntichain<T> {
396        MutableAntichain {
397            updates: ChangeBatch::new(),
398            frontier:  Vec::new(),
399            changes: ChangeBatch::new(),
400        }
401    }
402
403    /// Removes all elements.
404    ///
405    /// # Examples
406    ///
407    ///```
408    /// use timely::progress::frontier::MutableAntichain;
409    ///
410    /// let mut frontier = MutableAntichain::<usize>::new();
411    /// frontier.clear();
412    /// assert!(frontier.is_empty());
413    ///```
414    #[inline]
415    pub fn clear(&mut self) {
416        self.updates.clear();
417        self.frontier.clear();
418        self.changes.clear();
419    }
420
421    /// Reveals the minimal elements with positive count.
422    ///
423    /// # Examples
424    ///
425    ///```
426    /// use timely::progress::frontier::MutableAntichain;
427    ///
428    /// let mut frontier = MutableAntichain::<usize>::new();
429    /// assert!(frontier.frontier().len() == 0);
430    ///```
431    #[inline]
432    pub fn frontier(&self) -> AntichainRef<'_, T> {
433        AntichainRef::new(&self.frontier)
434    }
435
436    /// Creates a new singleton `MutableAntichain`.
437    ///
438    /// # Examples
439    ///
440    ///```
441    /// use timely::progress::frontier::{AntichainRef, MutableAntichain};
442    ///
443    /// let mut frontier = MutableAntichain::new_bottom(0u64);
444    /// assert!(frontier.frontier() == AntichainRef::new(&[0u64]));
445    ///```
446    #[inline]
447    pub fn new_bottom(bottom: T) -> MutableAntichain<T> 
448    where
449        T: Ord+Clone,
450    {
451        MutableAntichain {
452            updates: ChangeBatch::new_from(bottom.clone(), 1),
453            frontier: vec![bottom],
454            changes: ChangeBatch::new(),
455        }
456    }
457
458    /// Returns `true` if there are no elements in the `MutableAntichain`.
459    ///
460    /// # Examples
461    ///
462    ///```
463    /// use timely::progress::frontier::MutableAntichain;
464    ///
465    /// let mut frontier = MutableAntichain::<usize>::new();
466    /// assert!(frontier.is_empty());
467    ///```
468    #[inline]
469    pub fn is_empty(&self) -> bool {
470        self.frontier.is_empty()
471    }
472
473    /// Returns `true` if any item in the `MutableAntichain` is strictly less than the argument.
474    ///
475    /// # Examples
476    ///
477    ///```
478    /// use timely::progress::frontier::MutableAntichain;
479    ///
480    /// let mut frontier = MutableAntichain::new_bottom(1u64);
481    /// assert!(!frontier.less_than(&0));
482    /// assert!(!frontier.less_than(&1));
483    /// assert!(frontier.less_than(&2));
484    ///```
485    #[inline]
486    pub fn less_than<O>(&self, time: &O) -> bool
487    where
488        T: PartialOrder<O>,
489    {
490        self.frontier().less_than(time)
491    }
492
493    /// Returns `true` if any item in the `MutableAntichain` is less than or equal to the argument.
494    ///
495    /// # Examples
496    ///
497    ///```
498    /// use timely::progress::frontier::MutableAntichain;
499    ///
500    /// let mut frontier = MutableAntichain::new_bottom(1u64);
501    /// assert!(!frontier.less_equal(&0));
502    /// assert!(frontier.less_equal(&1));
503    /// assert!(frontier.less_equal(&2));
504    ///```
505    #[inline]
506    pub fn less_equal<O>(&self, time: &O) -> bool
507    where
508        T: PartialOrder<O>,
509    {
510        self.frontier().less_equal(time)
511    }
512
513    /// Applies updates to the antichain and enumerates any changes.
514    ///
515    /// # Examples
516    ///
517    ///```
518    /// use timely::progress::frontier::{AntichainRef, MutableAntichain};
519    ///
520    /// let mut frontier = MutableAntichain::new_bottom(1u64);
521    /// let changes =
522    /// frontier
523    ///     .update_iter(vec![(1, -1), (2, 7)])
524    ///     .collect::<Vec<_>>();
525    ///
526    /// assert!(frontier.frontier() == AntichainRef::new(&[2]));
527    /// assert!(changes == vec![(1, -1), (2, 1)]);
528    ///```
529    #[inline]
530    pub fn update_iter<I>(&mut self, updates: I) -> smallvec::Drain<'_, [(T, i64); 2]>
531    where
532        T: Clone + PartialOrder + Ord,
533        I: IntoIterator<Item = (T, i64)>,
534    {
535        let updates = updates.into_iter();
536
537        // track whether a rebuild is needed.
538        let mut rebuild_required = false;
539        for (time, delta) in updates {
540
541            // If we do not yet require a rebuild, test whether we might require one
542            // and set the flag in that case.
543            if !rebuild_required {
544                let beyond_frontier = self.frontier.iter().any(|f| f.less_than(&time));
545                let before_frontier = !self.frontier.iter().any(|f| f.less_equal(&time));
546                rebuild_required = !(beyond_frontier || (delta < 0 && before_frontier));
547            }
548
549            self.updates.update(time, delta);
550        }
551
552        if rebuild_required {
553            self.rebuild()
554        }
555        self.changes.drain()
556    }
557
558    /// Rebuilds `self.frontier` from `self.updates`.
559    ///
560    /// This method is meant to be used for bulk updates to the frontier, and does more work than one might do
561    /// for single updates, but is meant to be an efficient way to process multiple updates together. This is
562    /// especially true when we want to apply very large numbers of updates.
563    fn rebuild(&mut self)
564    where
565        T: Clone + PartialOrder + Ord,
566    {
567        for time in self.frontier.drain(..) {
568            self.changes.update(time, -1);
569        }
570
571        // build new frontier using strictly positive times.
572        // as the times are sorted, we don't need to worry that we might displace frontier elements.
573        for time in self.updates.iter().filter(|x| x.1 > 0) {
574            if !self.frontier.iter().any(|f| f.less_equal(&time.0)) {
575                self.frontier.push(time.0.clone());
576            }
577        }
578
579        for time in self.frontier.iter() {
580            self.changes.update(time.clone(), 1);
581        }
582    }
583
584    /// Reports the count for a queried time.
585    pub fn count_for<O>(&self, query_time: &O) -> i64
586    where
587        T: PartialEq<O>,
588    {
589        self.updates
590            .unstable_internal_updates()
591            .iter()
592            .filter(|td| td.0.eq(query_time))
593            .map(|td| td.1)
594            .sum()
595    }
596
597    /// Reports the updates that form the frontier. Returns an iterator of timestamps and their frequency.
598    ///
599    /// Rebuilds the internal representation before revealing times and frequencies.
600    pub fn updates(&mut self) -> impl Iterator<Item=&(T, i64)>
601    where
602        T: Clone + PartialOrder + Ord,
603    {
604        self.rebuild();
605        self.updates.iter()
606    }
607}
608
609impl<T> Default for MutableAntichain<T> {
610    fn default() -> Self {
611        Self::new()
612    }
613}
614
615/// Extension trait for filtering time changes through antichains.
616pub trait MutableAntichainFilter<T: PartialOrder+Ord+Clone> {
617    /// Filters time changes through an antichain.
618    ///
619    /// # Examples
620    ///
621    /// ```
622    /// use timely::progress::frontier::{MutableAntichain, MutableAntichainFilter};
623    ///
624    /// let mut frontier = MutableAntichain::new_bottom(1u64);
625    /// let changes =
626    /// vec![(1, -1), (2, 7)]
627    ///     .filter_through(&mut frontier)
628    ///     .collect::<Vec<_>>();
629    ///
630    /// assert!(changes == vec![(1, -1), (2, 1)]);
631    /// ```
632    fn filter_through(self, antichain: &mut MutableAntichain<T>) -> smallvec::Drain<[(T,i64); 2]>;
633}
634
635impl<T: PartialOrder+Ord+Clone, I: IntoIterator<Item=(T,i64)>> MutableAntichainFilter<T> for I {
636    fn filter_through(self, antichain: &mut MutableAntichain<T>) -> smallvec::Drain<[(T,i64); 2]> {
637        antichain.update_iter(self)
638    }
639}
640
641impl<T: PartialOrder+Ord+Clone> From<Antichain<T>> for MutableAntichain<T> {
642    fn from(antichain: Antichain<T>) -> Self {
643        let mut result = MutableAntichain::new();
644        result.update_iter(antichain.into_iter().map(|time| (time, 1)));
645        result
646    }
647}
648impl<'a, T: PartialOrder+Ord+Clone> From<AntichainRef<'a, T>> for MutableAntichain<T> {
649    fn from(antichain: AntichainRef<'a, T>) -> Self {
650        let mut result = MutableAntichain::new();
651        result.update_iter(antichain.into_iter().map(|time| (time.clone(), 1)));
652        result
653    }
654}
655
656impl<T> std::iter::FromIterator<(T, i64)> for MutableAntichain<T>
657where
658    T: Clone + PartialOrder + Ord,
659{
660    fn from_iter<I>(iterator: I) -> Self
661    where
662        I: IntoIterator<Item=(T, i64)>,
663    {
664        let mut result = Self::new();
665        result.update_iter(iterator);
666        result
667    }
668}
669
670/// A wrapper for elements of an antichain.
671#[derive(Debug)]
672pub struct AntichainRef<'a, T: 'a> {
673    /// Elements contained in the antichain.
674    frontier: &'a [T],
675}
676
677impl<'a, T: 'a> Clone for AntichainRef<'a, T> {
678    fn clone(&self) -> Self { *self }
679}
680
681impl<'a, T: 'a> Copy for AntichainRef<'a, T> { }
682
683impl<'a, T: 'a> AntichainRef<'a, T> {
684    /// Create a new `AntichainRef` from a reference to a slice of elements forming the frontier.
685    ///
686    /// This method does not check that this antichain has any particular properties, for example
687    /// that there are no elements strictly less than other elements.
688    pub fn new(frontier: &'a [T]) -> Self {
689        Self {
690            frontier,
691        }
692    }
693
694    /// Constructs an owned antichain from the antichain reference.
695    ///
696    /// # Examples
697    ///
698    ///```
699    /// use timely::progress::{Antichain, frontier::AntichainRef};
700    ///
701    /// let frontier = AntichainRef::new(&[1u64]);
702    /// assert_eq!(frontier.to_owned(), Antichain::from_elem(1u64));
703    ///```
704    pub fn to_owned(&self) -> Antichain<T> where T: Clone {
705        Antichain {
706            elements: self.frontier.into()
707        }
708    }
709}
710
711impl<T> AntichainRef<'_, T> {
712
713    /// Returns `true` if any item in the `AntichainRef` is strictly less than the argument.
714    ///
715    /// # Examples
716    ///
717    ///```
718    /// use timely::progress::frontier::AntichainRef;
719    ///
720    /// let frontier = AntichainRef::new(&[1u64]);
721    /// assert!(!frontier.less_than(&0));
722    /// assert!(!frontier.less_than(&1));
723    /// assert!(frontier.less_than(&2));
724    ///```
725    #[inline]
726    pub fn less_than<O>(&self, time: &O) -> bool where T: PartialOrder<O> {
727        self.iter().any(|x| x.less_than(time))
728    }
729
730    /// Returns `true` if any item in the `AntichainRef` is less than or equal to the argument.
731    #[inline]
732    ///
733    /// # Examples
734    ///
735    ///```
736    /// use timely::progress::frontier::AntichainRef;
737    ///
738    /// let frontier = AntichainRef::new(&[1u64]);
739    /// assert!(!frontier.less_equal(&0));
740    /// assert!(frontier.less_equal(&1));
741    /// assert!(frontier.less_equal(&2));
742    ///```
743    pub fn less_equal<O>(&self, time: &O) -> bool where T: PartialOrder<O> {
744        self.iter().any(|x| x.less_equal(time))
745    }
746}
747
748impl<T: PartialEq> PartialEq for AntichainRef<'_, T> {
749    fn eq(&self, other: &Self) -> bool {
750        // Lengths should be the same, with the option for fast acceptance if identical.
751        self.len() == other.len() &&
752        (
753            self.iter().zip(other.iter()).all(|(t1,t2)| t1 == t2) ||
754            self.iter().all(|t1| other.iter().any(|t2| t1.eq(t2)))
755        )
756    }
757}
758
759impl<T: Eq> Eq for AntichainRef<'_, T> { }
760
761impl<T: PartialOrder> PartialOrder for AntichainRef<'_, T> {
762    fn less_equal(&self, other: &Self) -> bool {
763        other.iter().all(|t2| self.iter().any(|t1| t1.less_equal(t2)))
764    }
765}
766
767impl<T: TotalOrder> TotalOrder for AntichainRef<'_, T> { }
768
769impl<T: TotalOrder> AntichainRef<'_, T> {
770    /// Return a reference to the at most one element the antichain contains.
771    pub fn as_option(&self) -> Option<&T> {
772        debug_assert!(self.len() <= 1);
773        self.frontier.last()
774    }
775}
776
777impl<T> ::std::ops::Deref for AntichainRef<'_, T> {
778    type Target = [T];
779    fn deref(&self) -> &Self::Target {
780        self.frontier
781    }
782}
783
784impl<'a, T: 'a> ::std::iter::IntoIterator for &'a AntichainRef<'a, T> {
785    type Item = &'a T;
786    type IntoIter = ::std::slice::Iter<'a, T>;
787    fn into_iter(self) -> Self::IntoIter {
788        self.iter()
789    }
790}
791
792#[cfg(test)]
793mod tests {
794    use std::collections::HashSet;
795
796    use super::*;
797
798    #[derive(PartialEq, Eq, PartialOrd, Ord, Hash)]
799    struct Elem(char, usize);
800
801    impl PartialOrder for Elem {
802        fn less_equal(&self, other: &Self) -> bool {
803            self.0 <= other.0 && self.1 <= other.1
804        }
805    }
806
807    #[test]
808    fn antichain_hash() {
809        let mut hashed = HashSet::new();
810        hashed.insert(Antichain::from(vec![Elem('a', 2), Elem('b', 1)]));
811
812        assert!(hashed.contains(&Antichain::from(vec![Elem('a', 2), Elem('b', 1)])));
813        assert!(hashed.contains(&Antichain::from(vec![Elem('b', 1), Elem('a', 2)])));
814
815        assert!(!hashed.contains(&Antichain::from(vec![Elem('a', 2)])));
816        assert!(!hashed.contains(&Antichain::from(vec![Elem('a', 1)])));
817        assert!(!hashed.contains(&Antichain::from(vec![Elem('b', 2)])));
818        assert!(!hashed.contains(&Antichain::from(vec![Elem('a', 1), Elem('b', 2)])));
819        assert!(!hashed.contains(&Antichain::from(vec![Elem('c', 3)])));
820        assert!(!hashed.contains(&Antichain::from(vec![])));
821    }
822
823    #[test]
824    fn mutable_compaction() {
825        let mut mutable = MutableAntichain::new();
826        mutable.update_iter(Some((7, 1)));
827        mutable.update_iter(Some((7, 1)));
828        mutable.update_iter(Some((7, 1)));
829        mutable.update_iter(Some((7, 1)));
830        mutable.update_iter(Some((7, 1)));
831        mutable.update_iter(Some((7, 1)));
832        mutable.update_iter(Some((8, 1)));
833        mutable.update_iter(Some((8, 1)));
834        mutable.update_iter(Some((8, 1)));
835        mutable.update_iter(Some((8, 1)));
836        mutable.update_iter(Some((8, 1)));
837        for _ in 0 .. 1000 {
838            mutable.update_iter(Some((9, 1)));
839            mutable.update_iter(Some((9, -1)));
840        }
841        assert!(mutable.updates.unstable_internal_updates().len() <= 32);
842    }
843}