Skip to main content

timely/progress/
frontier.rs

1//! Tracks minimal sets of mutually incomparable elements of a partial order.
2
3use serde::{Deserialize, Serialize};
4use smallvec::SmallVec;
5
6use crate::progress::ChangeBatch;
7use crate::order::{PartialOrder, TotalOrder};
8
9/// A set of mutually incomparable elements.
10///
11/// An antichain is a set of partially ordered elements, each of which is incomparable to the others.
12/// This antichain implementation allows you to repeatedly introduce elements to the antichain, and
13/// which will evict larger elements to maintain the *minimal* antichain, those incomparable elements
14/// no greater than any other element.
15///
16/// Two antichains are equal if they contain the same set of elements, even if in different orders.
17/// This can make equality testing quadratic, though linear in the common case that the sequences
18/// are identical.
19#[derive(Debug, Serialize, Deserialize)]
20pub struct Antichain<T> {
21    elements: SmallVec<[T; 1]>
22}
23
24impl<T: PartialOrder> Antichain<T> {
25    /// Updates the `Antichain` if the element is not greater than or equal to some present element.
26    ///
27    /// Returns `true` if element is added to the set
28    ///
29    /// # Examples
30    ///
31    ///```
32    /// use timely::progress::frontier::Antichain;
33    ///
34    /// let mut frontier = Antichain::new();
35    /// assert!(frontier.insert(2));
36    /// assert!(!frontier.insert(3));
37    ///```
38    pub fn insert(&mut self, element: T) -> bool {
39        if !self.elements.iter().any(|x| x.less_equal(&element)) {
40            self.elements.retain(|x| !element.less_equal(x));
41            self.elements.push(element);
42            true
43        }
44        else {
45            false
46        }
47    }
48
49    /// Updates the `Antichain` if the element is not greater than or equal to some present element.
50    ///
51    /// Returns `true` if element is added to the set
52    ///
53    /// Accepts a reference to an element, which is cloned when inserting.
54    ///
55    /// # Examples
56    ///
57    ///```
58    /// use timely::progress::frontier::Antichain;
59    ///
60    /// let mut frontier = Antichain::new();
61    /// assert!(frontier.insert_ref(&2));
62    /// assert!(!frontier.insert(3));
63    ///```
64    pub fn insert_ref(&mut self, element: &T) -> bool where T: Clone {
65        if !self.elements.iter().any(|x| x.less_equal(element)) {
66            self.elements.retain(|x| !element.less_equal(x));
67            self.elements.push(element.clone());
68            true
69        }
70        else {
71            false
72        }
73    }
74
75    /// Updates the `Antichain` if the element is not greater than or equal to some present element.
76    /// If the antichain needs updating, it uses the `to_owned` closure to convert the element into
77    /// a `T`.
78    ///
79    /// Returns `true` if element is added to the set
80    ///
81    /// # Examples
82    ///
83    ///```
84    /// use timely::progress::frontier::Antichain;
85    ///
86    /// let mut frontier = Antichain::new();
87    /// assert!(frontier.insert_with(&2, |x| *x));
88    /// assert!(!frontier.insert(3));
89    ///```
90    pub fn insert_with<O: PartialOrder<T>, F: FnOnce(&O) -> T>(&mut self, element: &O, to_owned: F) -> bool where T: PartialOrder<O> {
91        if !self.elements.iter().any(|x| x.less_equal(element)) {
92            self.elements.retain(|x| !element.less_equal(x));
93            self.elements.push(to_owned(element));
94            true
95        }
96        else {
97            false
98        }
99    }
100
101    /// Reserves capacity for at least additional more elements to be inserted in the given `Antichain`
102    pub fn reserve(&mut self, additional: usize) {
103        self.elements.reserve(additional);
104    }
105
106    /// Performs a sequence of insertion and returns `true` iff any insertion does.
107    ///
108    /// # Examples
109    ///
110    ///```
111    /// use timely::progress::frontier::Antichain;
112    ///
113    /// let mut frontier = Antichain::new();
114    /// assert!(frontier.extend(Some(3)));
115    /// assert!(frontier.extend(vec![2, 5]));
116    /// assert!(!frontier.extend(vec![3, 4]));
117    ///```
118    pub fn extend<I: IntoIterator<Item=T>>(&mut self, iterator: I) -> bool {
119        let mut added = false;
120        for element in iterator {
121            added = self.insert(element) || added;
122        }
123        added
124    }
125
126    /// Returns `true` if any item in the antichain is strictly less than the argument.
127    ///
128    /// # Examples
129    ///
130    ///```
131    /// use timely::progress::frontier::Antichain;
132    ///
133    /// let mut frontier = Antichain::from_elem(2);
134    /// assert!(frontier.less_than(&3));
135    /// assert!(!frontier.less_than(&2));
136    /// assert!(!frontier.less_than(&1));
137    ///
138    /// frontier.clear();
139    /// assert!(!frontier.less_than(&3));
140    ///```
141    #[inline]
142    pub fn less_than(&self, time: &T) -> bool {
143        self.elements.iter().any(|x| x.less_than(time))
144    }
145
146    /// Returns `true` if any item in the antichain is less than or equal to the argument.
147    ///
148    /// # Examples
149    ///
150    ///```
151    /// use timely::progress::frontier::Antichain;
152    ///
153    /// let mut frontier = Antichain::from_elem(2);
154    /// assert!(frontier.less_equal(&3));
155    /// assert!(frontier.less_equal(&2));
156    /// assert!(!frontier.less_equal(&1));
157    ///
158    /// frontier.clear();
159    /// assert!(!frontier.less_equal(&3));
160    ///```
161    #[inline]
162    pub fn less_equal(&self, time: &T) -> bool {
163        self.elements.iter().any(|x| x.less_equal(time))
164    }
165
166    /// Returns `true` if every element of `other` is greater or equal to some element of `self`.
167    #[deprecated(since="0.12.0", note="please use `PartialOrder::less_equal` instead")]
168    #[inline]
169    pub fn dominates(&self, other: &Antichain<T>) -> bool {
170        <Self as PartialOrder>::less_equal(self, other)
171    }
172}
173
174impl<T: PartialOrder> std::iter::FromIterator<T> for Antichain<T> {
175    fn from_iter<I>(iterator: I) -> Self
176    where
177        I: IntoIterator<Item=T>
178    {
179        let mut result = Self::new();
180        result.extend(iterator);
181        result
182    }
183}
184
185impl<T> Antichain<T> {
186
187    /// Creates a new empty `Antichain`.
188    ///
189    /// # Examples
190    ///
191    ///```
192    /// use timely::progress::frontier::Antichain;
193    ///
194    /// let mut frontier = Antichain::<u32>::new();
195    ///```
196    pub fn new() -> Antichain<T> { Antichain { elements: SmallVec::new() } }
197
198    /// Creates a new empty `Antichain` with space for `capacity` elements.
199    ///
200    /// # Examples
201    ///
202    ///```
203    /// use timely::progress::frontier::Antichain;
204    ///
205    /// let mut frontier = Antichain::<u32>::with_capacity(10);
206    ///```
207    pub fn with_capacity(capacity: usize) -> Self {
208        Self {
209            elements: SmallVec::with_capacity(capacity),
210        }
211    }
212
213    /// Creates a new singleton `Antichain`.
214    ///
215    /// # Examples
216    ///
217    ///```
218    /// use timely::progress::frontier::Antichain;
219    ///
220    /// let mut frontier = Antichain::from_elem(2);
221    ///```
222    pub fn from_elem(element: T) -> Antichain<T> {
223        let mut elements = SmallVec::with_capacity(1);
224        elements.push(element);
225        Antichain { elements }
226    }
227
228    /// Clears the contents of the antichain.
229    ///
230    /// # Examples
231    ///
232    ///```
233    /// use timely::progress::frontier::Antichain;
234    ///
235    /// let mut frontier = Antichain::from_elem(2);
236    /// frontier.clear();
237    /// assert!(frontier.elements().is_empty());
238    ///```
239    pub fn clear(&mut self) { self.elements.clear() }
240
241    /// Sorts the elements so that comparisons between antichains can be made.
242    pub fn sort(&mut self) where T: Ord { self.elements.sort() }
243
244    /// Reveals the elements in the antichain.
245    ///
246    /// This method is redundant with `<Antichain<T> as Deref>`, but the method
247    /// is in such broad use that we probably don't want to deprecate it without
248    /// some time to fix all things.
249    ///
250    /// # Examples
251    ///
252    ///```
253    /// use timely::progress::frontier::Antichain;
254    ///
255    /// let mut frontier = Antichain::from_elem(2);
256    /// assert_eq!(frontier.elements(), &[2]);
257    ///```
258    #[inline] pub fn elements(&self) -> &[T] { &self[..] }
259
260    /// Reveals the elements in the antichain.
261    ///
262    /// # Examples
263    ///
264    ///```
265    /// use timely::progress::frontier::Antichain;
266    ///
267    /// let mut frontier = Antichain::from_elem(2);
268    /// assert_eq!(&*frontier.borrow(), &[2]);
269    ///```
270    #[inline] pub fn borrow(&self) -> AntichainRef<'_, T> { AntichainRef::new(&self.elements) }}
271
272impl<T: PartialEq> PartialEq for Antichain<T> {
273    fn eq(&self, other: &Self) -> bool {
274        // Lengths should be the same, with the option for fast acceptance if identical.
275        self.elements().len() == other.elements().len() &&
276        (
277            self.elements().iter().zip(other.elements().iter()).all(|(t1,t2)| t1 == t2) ||
278            self.elements().iter().all(|t1| other.elements().iter().any(|t2| t1.eq(t2)))
279        )
280    }
281}
282
283impl<T: Eq> Eq for Antichain<T> { }
284
285impl<T: PartialOrder> PartialOrder for Antichain<T> {
286    fn less_equal(&self, other: &Self) -> bool {
287        other.elements().iter().all(|t2| self.elements().iter().any(|t1| t1.less_equal(t2)))
288    }
289}
290
291impl<T: Clone> Clone for Antichain<T> {
292    fn clone(&self) -> Self {
293        Antichain { elements: self.elements.clone() }
294    }
295    fn clone_from(&mut self, source: &Self) {
296        self.elements.clone_from(&source.elements)
297    }
298}
299
300impl<T> Default for Antichain<T> {
301    fn default() -> Self {
302        Self::new()
303    }
304}
305
306impl<T: TotalOrder> TotalOrder for Antichain<T> { }
307
308impl<T: TotalOrder> Antichain<T> {
309    /// Convert to the at most one element the antichain contains.
310    pub fn into_option(mut self) -> Option<T> {
311        debug_assert!(self.len() <= 1);
312        self.elements.pop()
313    }
314    /// Return a reference to the at most one element the antichain contains.
315    pub fn as_option(&self) -> Option<&T> {
316        debug_assert!(self.len() <= 1);
317        self.elements.last()
318    }
319}
320
321impl<T: Ord+std::hash::Hash> std::hash::Hash for Antichain<T> {
322    fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
323        let mut temp = self.elements.iter().collect::<Vec<_>>();
324        temp.sort();
325        for element in temp {
326            element.hash(state);
327        }
328    }
329}
330
331impl<T: PartialOrder> From<Vec<T>> for Antichain<T> {
332    fn from(vec: Vec<T>) -> Self {
333        // TODO: We could reuse `vec` with some care.
334        let mut temp = Antichain::new();
335        for elem in vec.into_iter() { temp.insert(elem); }
336        temp
337    }
338}
339
340impl<T> From<Antichain<T>> for SmallVec<[T; 1]> {
341    fn from(val: Antichain<T>) -> Self {
342        val.elements
343    }
344}
345
346impl<T> ::std::ops::Deref for Antichain<T> {
347    type Target = [T];
348    fn deref(&self) -> &Self::Target {
349        &self.elements
350    }
351}
352
353impl<T> ::std::iter::IntoIterator for Antichain<T> {
354    type Item = T;
355    type IntoIter = smallvec::IntoIter<[T; 1]>;
356    fn into_iter(self) -> Self::IntoIter {
357        self.elements.into_iter()
358    }
359}
360
361/// An antichain based on a multiset whose elements frequencies can be updated.
362///
363/// The `MutableAntichain` maintains frequencies for many elements of type `T`, and exposes the set
364/// of elements with positive count not greater than any other elements with positive count. The
365/// antichain may both advance and retreat; the changes do not all need to be to elements greater or
366/// equal to some elements of the frontier.
367///
368/// The type `T` must implement `PartialOrder` as well as `Ord`. The implementation of the `Ord` trait
369/// is used to efficiently organize the updates for cancellation, and to efficiently determine the lower
370/// bounds, and only needs to not contradict the `PartialOrder` implementation (that is, if `PartialOrder`
371/// orders two elements, then so does the `Ord` implementation).
372///
373/// The `MutableAntichain` implementation is done with the intent that updates to it are done in batches,
374/// and it is acceptable to rebuild the frontier from scratch when a batch of updates change it. This means
375/// that it can be expensive to maintain a large number of counts and change few elements near the frontier.
376#[derive(Clone, Debug, Serialize, Deserialize)]
377pub struct MutableAntichain<T> {
378    updates: ChangeBatch<T>,
379    frontier: Vec<T>,
380    changes: ChangeBatch<T>,
381}
382
383impl<T> MutableAntichain<T> {
384    /// Creates a new empty `MutableAntichain`.
385    ///
386    /// # Examples
387    ///
388    ///```
389    /// use timely::progress::frontier::MutableAntichain;
390    ///
391    /// let frontier = MutableAntichain::<usize>::new();
392    /// assert!(frontier.is_empty());
393    ///```
394    #[inline]
395    pub fn new() -> MutableAntichain<T> {
396        MutableAntichain {
397            updates: ChangeBatch::new(),
398            frontier:  Vec::new(),
399            changes: ChangeBatch::new(),
400        }
401    }
402
403    /// Removes all elements.
404    ///
405    /// # Examples
406    ///
407    ///```
408    /// use timely::progress::frontier::MutableAntichain;
409    ///
410    /// let mut frontier = MutableAntichain::<usize>::new();
411    /// frontier.clear();
412    /// assert!(frontier.is_empty());
413    ///```
414    #[inline]
415    pub fn clear(&mut self) {
416        self.updates.clear();
417        self.frontier.clear();
418        self.changes.clear();
419    }
420
421    /// Reveals the minimal elements with positive count.
422    ///
423    /// # Examples
424    ///
425    ///```
426    /// use timely::progress::frontier::MutableAntichain;
427    ///
428    /// let mut frontier = MutableAntichain::<usize>::new();
429    /// assert!(frontier.frontier().len() == 0);
430    ///```
431    #[inline]
432    pub fn frontier(&self) -> AntichainRef<'_, T> {
433        AntichainRef::new(&self.frontier)
434    }
435
436    /// Creates a new singleton `MutableAntichain`.
437    ///
438    /// # Examples
439    ///
440    ///```
441    /// use timely::progress::frontier::{AntichainRef, MutableAntichain};
442    ///
443    /// let mut frontier = MutableAntichain::from_elem(0u64);
444    /// assert!(frontier.frontier() == AntichainRef::new(&[0u64]));
445    ///```
446    #[inline]
447    pub fn from_elem(bottom: T) -> MutableAntichain<T>
448    where
449        T: Ord+Clone,
450    {
451        MutableAntichain {
452            updates: ChangeBatch::new_from(bottom.clone(), 1),
453            frontier: vec![bottom],
454            changes: ChangeBatch::new(),
455        }
456    }
457
458    /// Returns `true` if there are no elements in the `MutableAntichain`.
459    ///
460    /// # Examples
461    ///
462    ///```
463    /// use timely::progress::frontier::MutableAntichain;
464    ///
465    /// let mut frontier = MutableAntichain::<usize>::new();
466    /// assert!(frontier.is_empty());
467    ///```
468    #[inline]
469    pub fn is_empty(&self) -> bool {
470        self.frontier.is_empty()
471    }
472
473    /// Returns `true` if any item in the `MutableAntichain` is strictly less than the argument.
474    ///
475    /// # Examples
476    ///
477    ///```
478    /// use timely::progress::frontier::MutableAntichain;
479    ///
480    /// let mut frontier = MutableAntichain::from_elem(1u64);
481    /// assert!(!frontier.less_than(&0));
482    /// assert!(!frontier.less_than(&1));
483    /// assert!(frontier.less_than(&2));
484    ///```
485    #[inline]
486    pub fn less_than<O>(&self, time: &O) -> bool
487    where
488        T: PartialOrder<O>,
489    {
490        self.frontier().less_than(time)
491    }
492
493    /// Returns `true` if any item in the `MutableAntichain` is less than or equal to the argument.
494    ///
495    /// # Examples
496    ///
497    ///```
498    /// use timely::progress::frontier::MutableAntichain;
499    ///
500    /// let mut frontier = MutableAntichain::from_elem(1u64);
501    /// assert!(!frontier.less_equal(&0));
502    /// assert!(frontier.less_equal(&1));
503    /// assert!(frontier.less_equal(&2));
504    ///```
505    #[inline]
506    pub fn less_equal<O>(&self, time: &O) -> bool
507    where
508        T: PartialOrder<O>,
509    {
510        self.frontier().less_equal(time)
511    }
512
513    /// Applies updates to the antichain and enumerates any changes.
514    ///
515    /// # Examples
516    ///
517    ///```
518    /// use timely::progress::frontier::{AntichainRef, MutableAntichain};
519    ///
520    /// let mut frontier = MutableAntichain::from_elem(1u64);
521    /// let changes =
522    /// frontier
523    ///     .update_iter(vec![(1, -1), (2, 7)])
524    ///     .collect::<Vec<_>>();
525    ///
526    /// assert!(frontier.frontier() == AntichainRef::new(&[2]));
527    /// assert!(changes == vec![(1, -1), (2, 1)]);
528    ///```
529    #[inline]
530    pub fn update_iter<I>(&mut self, updates: I) -> smallvec::Drain<'_, [(T, i64); 2]>
531    where
532        T: Clone + PartialOrder + Ord,
533        I: IntoIterator<Item = (T, i64)>,
534    {
535        // track whether a rebuild is needed.
536        let mut rebuild_required = false;
537        for (time, delta) in updates {
538            // If we do not yet require a rebuild, test whether we might require one
539            // and set the flag in that case.
540            if !rebuild_required {
541                rebuild_required = self.requires_rebuild(&time, delta);
542            }
543
544            self.updates.update(time, delta);
545        }
546
547        if rebuild_required {
548            self.rebuild()
549        }
550        self.changes.drain()
551    }
552
    /// Tests whether applying `(time, delta)` will require a frontier rebuild.
    ///
    /// Returns `false` (no rebuild needed for this update) in exactly two situations:
    ///
    /// * some frontier element is strictly less than `time`, or
    /// * `delta` is negative and no frontier element is less than or equal to `time`.
    ///
    /// In every other situation it returns `true`. With an empty frontier this means `true`
    /// for any non-negative `delta` and `false` for a negative one.
    ///
    /// Factored out of [`Self::update_iter`] so it is generic only over `T` and not
    /// the iterator type, deduplicating the inlined `frontier.iter().any(...)` bodies
    /// across `update_iter` monomorphizations.
    fn requires_rebuild(&self, time: &T, delta: i64) -> bool
    where
        T: PartialOrder,
    {
        // Single-pass `for` loop (instead of two `Iterator::any` calls) avoids
        // monomorphizing `slice::Iter::any` over per-call-site closure types and
        // traverses `self.frontier` at most once.
        //
        // `beyond_frontier`: some frontier element is strictly less than `time`.
        // `before_frontier`: no frontier element is less than or equal to `time`.
        let mut beyond_frontier = false;
        let mut before_frontier = true;
        for f in &self.frontier {
            if !beyond_frontier && f.less_than(time) {
                beyond_frontier = true;
            }
            if before_frontier && f.less_equal(time) {
                before_frontier = false;
            }
            // Each flag flips at most once, so once both have flipped the remaining
            // elements cannot change the answer.
            if beyond_frontier && !before_frontier {
                break;
            }
        }
        // NOTE(review): presumably the two exempted cases are the updates that cannot
        // alter the set of minimal positive-count elements -- confirm against the design.
        !(beyond_frontier || (delta < 0 && before_frontier))
    }
580
581    /// Rebuilds `self.frontier` from `self.updates`.
582    ///
583    /// This method is meant to be used for bulk updates to the frontier, and does more work than one might do
584    /// for single updates, but is meant to be an efficient way to process multiple updates together. This is
585    /// especially true when we want to apply very large numbers of updates.
586    fn rebuild(&mut self)
587    where
588        T: Clone + PartialOrder + Ord,
589    {
590        for time in self.frontier.drain(..) {
591            self.changes.update(time, -1);
592        }
593
594        // build new frontier using strictly positive times.
595        // as the times are sorted, we don't need to worry that we might displace frontier elements.
596        for time in self.updates.iter().filter(|x| x.1 > 0) {
597            if !self.frontier.iter().any(|f| f.less_equal(&time.0)) {
598                self.frontier.push(time.0.clone());
599            }
600        }
601
602        for time in self.frontier.iter() {
603            self.changes.update(time.clone(), 1);
604        }
605    }
606
607    /// Reports the count for a queried time.
608    pub fn count_for<O>(&self, query_time: &O) -> i64
609    where
610        T: PartialEq<O>,
611    {
612        self.updates
613            .unstable_internal_updates()
614            .iter()
615            .filter(|td| td.0.eq(query_time))
616            .map(|td| td.1)
617            .sum()
618    }
619
620    /// Reports the updates that form the frontier. Returns an iterator of timestamps and their frequency.
621    ///
622    /// Rebuilds the internal representation before revealing times and frequencies.
623    pub fn updates(&mut self) -> impl Iterator<Item=&(T, i64)>
624    where
625        T: Clone + PartialOrder + Ord,
626    {
627        self.rebuild();
628        self.updates.iter()
629    }
630}
631
632impl<T> Default for MutableAntichain<T> {
633    fn default() -> Self {
634        Self::new()
635    }
636}
637
638/// Extension trait for filtering time changes through antichains.
639pub trait MutableAntichainFilter<T: PartialOrder+Ord+Clone> {
640    /// Filters time changes through an antichain.
641    ///
642    /// # Examples
643    ///
644    /// ```
645    /// use timely::progress::frontier::{MutableAntichain, MutableAntichainFilter};
646    ///
647    /// let mut frontier = MutableAntichain::from_elem(1u64);
648    /// let changes =
649    /// vec![(1, -1), (2, 7)]
650    ///     .filter_through(&mut frontier)
651    ///     .collect::<Vec<_>>();
652    ///
653    /// assert!(changes == vec![(1, -1), (2, 1)]);
654    /// ```
655    fn filter_through(self, antichain: &mut MutableAntichain<T>) -> smallvec::Drain<'_, [(T,i64); 2]>;
656}
657
658impl<T: PartialOrder+Ord+Clone, I: IntoIterator<Item=(T,i64)>> MutableAntichainFilter<T> for I {
659    fn filter_through(self, antichain: &mut MutableAntichain<T>) -> smallvec::Drain<'_, [(T,i64); 2]> {
660        antichain.update_iter(self)
661    }
662}
663
664impl<T: PartialOrder+Ord+Clone> From<Antichain<T>> for MutableAntichain<T> {
665    fn from(antichain: Antichain<T>) -> Self {
666        let mut result = MutableAntichain::new();
667        result.update_iter(antichain.into_iter().map(|time| (time, 1)));
668        result
669    }
670}
671impl<'a, T: PartialOrder+Ord+Clone> From<AntichainRef<'a, T>> for MutableAntichain<T> {
672    fn from(antichain: AntichainRef<'a, T>) -> Self {
673        let mut result = MutableAntichain::new();
674        result.update_iter(antichain.into_iter().map(|time| (time.clone(), 1)));
675        result
676    }
677}
678
679impl<T> std::iter::FromIterator<(T, i64)> for MutableAntichain<T>
680where
681    T: Clone + PartialOrder + Ord,
682{
683    fn from_iter<I>(iterator: I) -> Self
684    where
685        I: IntoIterator<Item=(T, i64)>,
686    {
687        let mut result = Self::new();
688        result.update_iter(iterator);
689        result
690    }
691}
692
/// A borrowed view of the elements of an antichain.
///
/// The wrapper holds only a slice reference; it does not itself verify that the elements
/// form an antichain.
#[derive(Debug)]
pub struct AntichainRef<'a, T: 'a> {
    /// The borrowed elements making up the antichain.
    frontier: &'a [T],
}
699
700impl<'a, T: 'a> Clone for AntichainRef<'a, T> {
701    fn clone(&self) -> Self { *self }
702}
703
704impl<'a, T: 'a> Copy for AntichainRef<'a, T> { }
705
706impl<'a, T: 'a> AntichainRef<'a, T> {
707    /// Create a new `AntichainRef` from a reference to a slice of elements forming the frontier.
708    ///
709    /// This method does not check that this antichain has any particular properties, for example
710    /// that there are no elements strictly less than other elements.
711    pub fn new(frontier: &'a [T]) -> Self {
712        Self {
713            frontier,
714        }
715    }
716
717    /// Constructs an owned antichain from the antichain reference.
718    ///
719    /// # Examples
720    ///
721    ///```
722    /// use timely::progress::{Antichain, frontier::AntichainRef};
723    ///
724    /// let frontier = AntichainRef::new(&[1u64]);
725    /// assert_eq!(frontier.to_owned(), Antichain::from_elem(1u64));
726    ///```
727    pub fn to_owned(&self) -> Antichain<T> where T: Clone {
728        Antichain {
729            elements: self.frontier.into()
730        }
731    }
732}
733
734impl<T> AntichainRef<'_, T> {
735
736    /// Returns `true` if any item in the `AntichainRef` is strictly less than the argument.
737    ///
738    /// # Examples
739    ///
740    ///```
741    /// use timely::progress::frontier::AntichainRef;
742    ///
743    /// let frontier = AntichainRef::new(&[1u64]);
744    /// assert!(!frontier.less_than(&0));
745    /// assert!(!frontier.less_than(&1));
746    /// assert!(frontier.less_than(&2));
747    ///```
748    #[inline]
749    pub fn less_than<O>(&self, time: &O) -> bool where T: PartialOrder<O> {
750        self.iter().any(|x| x.less_than(time))
751    }
752
753    /// Returns `true` if any item in the `AntichainRef` is less than or equal to the argument.
754    #[inline]
755    ///
756    /// # Examples
757    ///
758    ///```
759    /// use timely::progress::frontier::AntichainRef;
760    ///
761    /// let frontier = AntichainRef::new(&[1u64]);
762    /// assert!(!frontier.less_equal(&0));
763    /// assert!(frontier.less_equal(&1));
764    /// assert!(frontier.less_equal(&2));
765    ///```
766    pub fn less_equal<O>(&self, time: &O) -> bool where T: PartialOrder<O> {
767        self.iter().any(|x| x.less_equal(time))
768    }
769}
770
771impl<T: PartialEq> PartialEq for AntichainRef<'_, T> {
772    fn eq(&self, other: &Self) -> bool {
773        // Lengths should be the same, with the option for fast acceptance if identical.
774        self.len() == other.len() &&
775        (
776            self.iter().zip(other.iter()).all(|(t1,t2)| t1 == t2) ||
777            self.iter().all(|t1| other.iter().any(|t2| t1.eq(t2)))
778        )
779    }
780}
781
782impl<T: Eq> Eq for AntichainRef<'_, T> { }
783
784impl<T: PartialOrder> PartialOrder for AntichainRef<'_, T> {
785    fn less_equal(&self, other: &Self) -> bool {
786        other.iter().all(|t2| self.iter().any(|t1| t1.less_equal(t2)))
787    }
788}
789
790impl<T: TotalOrder> TotalOrder for AntichainRef<'_, T> { }
791
792impl<T: TotalOrder> AntichainRef<'_, T> {
793    /// Return a reference to the at most one element the antichain contains.
794    pub fn as_option(&self) -> Option<&T> {
795        debug_assert!(self.len() <= 1);
796        self.frontier.last()
797    }
798}
799
800impl<T> ::std::ops::Deref for AntichainRef<'_, T> {
801    type Target = [T];
802    fn deref(&self) -> &Self::Target {
803        self.frontier
804    }
805}
806
807impl<'a, T: 'a> ::std::iter::IntoIterator for &'a AntichainRef<'a, T> {
808    type Item = &'a T;
809    type IntoIter = ::std::slice::Iter<'a, T>;
810    fn into_iter(self) -> Self::IntoIter {
811        self.iter()
812    }
813}
814
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use super::*;

    /// A pair that `PartialOrder` compares component-wise, so some pairs are incomparable.
    #[derive(PartialEq, Eq, PartialOrd, Ord, Hash)]
    struct Elem(char, usize);

    impl PartialOrder for Elem {
        fn less_equal(&self, other: &Self) -> bool {
            let first_ordered = self.0 <= other.0;
            let second_ordered = self.1 <= other.1;
            first_ordered && second_ordered
        }
    }

    /// Hash-set lookups must find an antichain regardless of element order, and must not
    /// find antichains with different contents.
    #[test]
    fn antichain_hash() {
        let chain = |elems: Vec<Elem>| Antichain::from(elems);

        let mut hashed = HashSet::new();
        hashed.insert(chain(vec![Elem('a', 2), Elem('b', 1)]));

        let present = vec![
            chain(vec![Elem('a', 2), Elem('b', 1)]),
            chain(vec![Elem('b', 1), Elem('a', 2)]),
        ];
        for antichain in present {
            assert!(hashed.contains(&antichain));
        }

        let absent = vec![
            chain(vec![Elem('a', 2)]),
            chain(vec![Elem('a', 1)]),
            chain(vec![Elem('b', 2)]),
            chain(vec![Elem('a', 1), Elem('b', 2)]),
            chain(vec![Elem('c', 3)]),
            chain(vec![]),
        ];
        for antichain in absent {
            assert!(!hashed.contains(&antichain));
        }
    }

    /// Repeated cancelling updates must not make the internal update list grow without bound.
    #[test]
    fn mutable_compaction() {
        let mut mutable = MutableAntichain::new();
        for _ in 0 .. 6 {
            mutable.update_iter(Some((7, 1)));
        }
        for _ in 0 .. 5 {
            mutable.update_iter(Some((8, 1)));
        }
        for _ in 0 .. 1000 {
            mutable.update_iter(Some((9, 1)));
            mutable.update_iter(Some((9, -1)));
        }
        assert!(mutable.updates.unstable_internal_updates().len() <= 32);
    }
}
866}