timely/progress/frontier.rs
1//! Tracks minimal sets of mutually incomparable elements of a partial order.
2
3use serde::{Deserialize, Serialize};
4use smallvec::SmallVec;
5
6use crate::progress::ChangeBatch;
7use crate::order::{PartialOrder, TotalOrder};
8
/// A set of mutually incomparable elements.
///
/// An antichain is a set of partially ordered elements, each of which is incomparable to the others.
/// This antichain implementation allows you to repeatedly introduce elements to the antichain, and
/// it will evict larger elements to maintain the *minimal* antichain: those incomparable elements
/// no greater than any other element.
///
/// Two antichains are equal if they contain the same set of elements, even if in different orders.
/// This can make equality testing quadratic, though linear in the common case that the sequences
/// are identical.
#[derive(Debug, Serialize, Deserialize)]
pub struct Antichain<T> {
    // Backing storage for the elements: a `SmallVec` declared with an inline array of length one.
    elements: SmallVec<[T; 1]>
}
23
24impl<T: PartialOrder> Antichain<T> {
25 /// Updates the `Antichain` if the element is not greater than or equal to some present element.
26 ///
27 /// Returns `true` if element is added to the set
28 ///
29 /// # Examples
30 ///
31 ///```
32 /// use timely::progress::frontier::Antichain;
33 ///
34 /// let mut frontier = Antichain::new();
35 /// assert!(frontier.insert(2));
36 /// assert!(!frontier.insert(3));
37 ///```
38 pub fn insert(&mut self, element: T) -> bool {
39 if !self.elements.iter().any(|x| x.less_equal(&element)) {
40 self.elements.retain(|x| !element.less_equal(x));
41 self.elements.push(element);
42 true
43 }
44 else {
45 false
46 }
47 }
48
49 /// Updates the `Antichain` if the element is not greater than or equal to some present element.
50 ///
51 /// Returns `true` if element is added to the set
52 ///
53 /// Accepts a reference to an element, which is cloned when inserting.
54 ///
55 /// # Examples
56 ///
57 ///```
58 /// use timely::progress::frontier::Antichain;
59 ///
60 /// let mut frontier = Antichain::new();
61 /// assert!(frontier.insert_ref(&2));
62 /// assert!(!frontier.insert(3));
63 ///```
64 pub fn insert_ref(&mut self, element: &T) -> bool where T: Clone {
65 if !self.elements.iter().any(|x| x.less_equal(element)) {
66 self.elements.retain(|x| !element.less_equal(x));
67 self.elements.push(element.clone());
68 true
69 }
70 else {
71 false
72 }
73 }
74
75 /// Updates the `Antichain` if the element is not greater than or equal to some present element.
76 /// If the antichain needs updating, it uses the `to_owned` closure to convert the element into
77 /// a `T`.
78 ///
79 /// Returns `true` if element is added to the set
80 ///
81 /// # Examples
82 ///
83 ///```
84 /// use timely::progress::frontier::Antichain;
85 ///
86 /// let mut frontier = Antichain::new();
87 /// assert!(frontier.insert_with(&2, |x| *x));
88 /// assert!(!frontier.insert(3));
89 ///```
90 pub fn insert_with<O: PartialOrder<T>, F: FnOnce(&O) -> T>(&mut self, element: &O, to_owned: F) -> bool where T: PartialOrder<O> {
91 if !self.elements.iter().any(|x| x.less_equal(element)) {
92 self.elements.retain(|x| !element.less_equal(x));
93 self.elements.push(to_owned(element));
94 true
95 }
96 else {
97 false
98 }
99 }
100
101 /// Reserves capacity for at least additional more elements to be inserted in the given `Antichain`
102 pub fn reserve(&mut self, additional: usize) {
103 self.elements.reserve(additional);
104 }
105
106 /// Performs a sequence of insertion and returns `true` iff any insertion does.
107 ///
108 /// # Examples
109 ///
110 ///```
111 /// use timely::progress::frontier::Antichain;
112 ///
113 /// let mut frontier = Antichain::new();
114 /// assert!(frontier.extend(Some(3)));
115 /// assert!(frontier.extend(vec![2, 5]));
116 /// assert!(!frontier.extend(vec![3, 4]));
117 ///```
118 pub fn extend<I: IntoIterator<Item=T>>(&mut self, iterator: I) -> bool {
119 let mut added = false;
120 for element in iterator {
121 added = self.insert(element) || added;
122 }
123 added
124 }
125
126 /// Returns `true` if any item in the antichain is strictly less than the argument.
127 ///
128 /// # Examples
129 ///
130 ///```
131 /// use timely::progress::frontier::Antichain;
132 ///
133 /// let mut frontier = Antichain::from_elem(2);
134 /// assert!(frontier.less_than(&3));
135 /// assert!(!frontier.less_than(&2));
136 /// assert!(!frontier.less_than(&1));
137 ///
138 /// frontier.clear();
139 /// assert!(!frontier.less_than(&3));
140 ///```
141 #[inline]
142 pub fn less_than(&self, time: &T) -> bool {
143 self.elements.iter().any(|x| x.less_than(time))
144 }
145
146 /// Returns `true` if any item in the antichain is less than or equal to the argument.
147 ///
148 /// # Examples
149 ///
150 ///```
151 /// use timely::progress::frontier::Antichain;
152 ///
153 /// let mut frontier = Antichain::from_elem(2);
154 /// assert!(frontier.less_equal(&3));
155 /// assert!(frontier.less_equal(&2));
156 /// assert!(!frontier.less_equal(&1));
157 ///
158 /// frontier.clear();
159 /// assert!(!frontier.less_equal(&3));
160 ///```
161 #[inline]
162 pub fn less_equal(&self, time: &T) -> bool {
163 self.elements.iter().any(|x| x.less_equal(time))
164 }
165
166 /// Returns `true` if every element of `other` is greater or equal to some element of `self`.
167 #[deprecated(since="0.12.0", note="please use `PartialOrder::less_equal` instead")]
168 #[inline]
169 pub fn dominates(&self, other: &Antichain<T>) -> bool {
170 <Self as PartialOrder>::less_equal(self, other)
171 }
172}
173
174impl<T: PartialOrder> std::iter::FromIterator<T> for Antichain<T> {
175 fn from_iter<I>(iterator: I) -> Self
176 where
177 I: IntoIterator<Item=T>
178 {
179 let mut result = Self::new();
180 result.extend(iterator);
181 result
182 }
183}
184
185impl<T> Antichain<T> {
186
187 /// Creates a new empty `Antichain`.
188 ///
189 /// # Examples
190 ///
191 ///```
192 /// use timely::progress::frontier::Antichain;
193 ///
194 /// let mut frontier = Antichain::<u32>::new();
195 ///```
196 pub fn new() -> Antichain<T> { Antichain { elements: SmallVec::new() } }
197
198 /// Creates a new empty `Antichain` with space for `capacity` elements.
199 ///
200 /// # Examples
201 ///
202 ///```
203 /// use timely::progress::frontier::Antichain;
204 ///
205 /// let mut frontier = Antichain::<u32>::with_capacity(10);
206 ///```
207 pub fn with_capacity(capacity: usize) -> Self {
208 Self {
209 elements: SmallVec::with_capacity(capacity),
210 }
211 }
212
213 /// Creates a new singleton `Antichain`.
214 ///
215 /// # Examples
216 ///
217 ///```
218 /// use timely::progress::frontier::Antichain;
219 ///
220 /// let mut frontier = Antichain::from_elem(2);
221 ///```
222 pub fn from_elem(element: T) -> Antichain<T> {
223 let mut elements = SmallVec::with_capacity(1);
224 elements.push(element);
225 Antichain { elements }
226 }
227
228 /// Clears the contents of the antichain.
229 ///
230 /// # Examples
231 ///
232 ///```
233 /// use timely::progress::frontier::Antichain;
234 ///
235 /// let mut frontier = Antichain::from_elem(2);
236 /// frontier.clear();
237 /// assert!(frontier.elements().is_empty());
238 ///```
239 pub fn clear(&mut self) { self.elements.clear() }
240
241 /// Sorts the elements so that comparisons between antichains can be made.
242 pub fn sort(&mut self) where T: Ord { self.elements.sort() }
243
244 /// Reveals the elements in the antichain.
245 ///
246 /// This method is redundant with `<Antichain<T> as Deref>`, but the method
247 /// is in such broad use that we probably don't want to deprecate it without
248 /// some time to fix all things.
249 ///
250 /// # Examples
251 ///
252 ///```
253 /// use timely::progress::frontier::Antichain;
254 ///
255 /// let mut frontier = Antichain::from_elem(2);
256 /// assert_eq!(frontier.elements(), &[2]);
257 ///```
258 #[inline] pub fn elements(&self) -> &[T] { &self[..] }
259
260 /// Reveals the elements in the antichain.
261 ///
262 /// # Examples
263 ///
264 ///```
265 /// use timely::progress::frontier::Antichain;
266 ///
267 /// let mut frontier = Antichain::from_elem(2);
268 /// assert_eq!(&*frontier.borrow(), &[2]);
269 ///```
270 #[inline] pub fn borrow(&self) -> AntichainRef<T> { AntichainRef::new(&self.elements) }}
271
272impl<T: PartialEq> PartialEq for Antichain<T> {
273 fn eq(&self, other: &Self) -> bool {
274 // Lengths should be the same, with the option for fast acceptance if identical.
275 self.elements().len() == other.elements().len() &&
276 (
277 self.elements().iter().zip(other.elements().iter()).all(|(t1,t2)| t1 == t2) ||
278 self.elements().iter().all(|t1| other.elements().iter().any(|t2| t1.eq(t2)))
279 )
280 }
281}
282
283impl<T: Eq> Eq for Antichain<T> { }
284
285impl<T: PartialOrder> PartialOrder for Antichain<T> {
286 fn less_equal(&self, other: &Self) -> bool {
287 other.elements().iter().all(|t2| self.elements().iter().any(|t1| t1.less_equal(t2)))
288 }
289}
290
291impl<T: Clone> Clone for Antichain<T> {
292 fn clone(&self) -> Self {
293 Antichain { elements: self.elements.clone() }
294 }
295 fn clone_from(&mut self, source: &Self) {
296 self.elements.clone_from(&source.elements)
297 }
298}
299
300impl<T> Default for Antichain<T> {
301 fn default() -> Self {
302 Self::new()
303 }
304}
305
306impl<T: TotalOrder> TotalOrder for Antichain<T> { }
307
308impl<T: TotalOrder> Antichain<T> {
309 /// Convert to the at most one element the antichain contains.
310 pub fn into_option(mut self) -> Option<T> {
311 debug_assert!(self.len() <= 1);
312 self.elements.pop()
313 }
314 /// Return a reference to the at most one element the antichain contains.
315 pub fn as_option(&self) -> Option<&T> {
316 debug_assert!(self.len() <= 1);
317 self.elements.last()
318 }
319}
320
321impl<T: Ord+std::hash::Hash> std::hash::Hash for Antichain<T> {
322 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
323 let mut temp = self.elements.iter().collect::<Vec<_>>();
324 temp.sort();
325 for element in temp {
326 element.hash(state);
327 }
328 }
329}
330
331impl<T: PartialOrder> From<Vec<T>> for Antichain<T> {
332 fn from(vec: Vec<T>) -> Self {
333 // TODO: We could reuse `vec` with some care.
334 let mut temp = Antichain::new();
335 for elem in vec.into_iter() { temp.insert(elem); }
336 temp
337 }
338}
339
340impl<T> From<Antichain<T>> for SmallVec<[T; 1]> {
341 fn from(val: Antichain<T>) -> Self {
342 val.elements
343 }
344}
345
346impl<T> ::std::ops::Deref for Antichain<T> {
347 type Target = [T];
348 fn deref(&self) -> &Self::Target {
349 &self.elements
350 }
351}
352
353impl<T> ::std::iter::IntoIterator for Antichain<T> {
354 type Item = T;
355 type IntoIter = smallvec::IntoIter<[T; 1]>;
356 fn into_iter(self) -> Self::IntoIter {
357 self.elements.into_iter()
358 }
359}
360
/// An antichain based on a multiset whose elements' frequencies can be updated.
///
/// The `MutableAntichain` maintains frequencies for many elements of type `T`, and exposes the set
/// of elements with positive count not greater than any other elements with positive count. The
/// antichain may both advance and retreat; the changes do not all need to be to elements greater or
/// equal to some elements of the frontier.
///
/// The type `T` must implement `PartialOrder` as well as `Ord`. The implementation of the `Ord` trait
/// is used to efficiently organize the updates for cancellation, and to efficiently determine the lower
/// bounds, and only needs to not contradict the `PartialOrder` implementation (that is, if `PartialOrder`
/// orders two elements, then so does the `Ord` implementation).
///
/// The `MutableAntichain` implementation is done with the intent that updates to it are done in batches,
/// and it is acceptable to rebuild the frontier from scratch when a batch of updates change it. This means
/// that it can be expensive to maintain a large number of counts and change few elements near the frontier.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct MutableAntichain<T> {
    // Accumulated `(element, count)` updates; `rebuild` derives the frontier from these.
    updates: ChangeBatch<T>,
    // The current frontier, as revealed by `frontier()`.
    frontier: Vec<T>,
    // Frontier changes recorded by `rebuild` (`-1` for each retired element, `+1` for each new
    // one) and drained by `update_iter`.
    changes: ChangeBatch<T>,
}
382
383impl<T> MutableAntichain<T> {
384 /// Creates a new empty `MutableAntichain`.
385 ///
386 /// # Examples
387 ///
388 ///```
389 /// use timely::progress::frontier::MutableAntichain;
390 ///
391 /// let frontier = MutableAntichain::<usize>::new();
392 /// assert!(frontier.is_empty());
393 ///```
394 #[inline]
395 pub fn new() -> MutableAntichain<T> {
396 MutableAntichain {
397 updates: ChangeBatch::new(),
398 frontier: Vec::new(),
399 changes: ChangeBatch::new(),
400 }
401 }
402
403 /// Removes all elements.
404 ///
405 /// # Examples
406 ///
407 ///```
408 /// use timely::progress::frontier::MutableAntichain;
409 ///
410 /// let mut frontier = MutableAntichain::<usize>::new();
411 /// frontier.clear();
412 /// assert!(frontier.is_empty());
413 ///```
414 #[inline]
415 pub fn clear(&mut self) {
416 self.updates.clear();
417 self.frontier.clear();
418 self.changes.clear();
419 }
420
421 /// Reveals the minimal elements with positive count.
422 ///
423 /// # Examples
424 ///
425 ///```
426 /// use timely::progress::frontier::MutableAntichain;
427 ///
428 /// let mut frontier = MutableAntichain::<usize>::new();
429 /// assert!(frontier.frontier().len() == 0);
430 ///```
431 #[inline]
432 pub fn frontier(&self) -> AntichainRef<'_, T> {
433 AntichainRef::new(&self.frontier)
434 }
435
436 /// Creates a new singleton `MutableAntichain`.
437 ///
438 /// # Examples
439 ///
440 ///```
441 /// use timely::progress::frontier::{AntichainRef, MutableAntichain};
442 ///
443 /// let mut frontier = MutableAntichain::new_bottom(0u64);
444 /// assert!(frontier.frontier() == AntichainRef::new(&[0u64]));
445 ///```
446 #[inline]
447 pub fn new_bottom(bottom: T) -> MutableAntichain<T>
448 where
449 T: Ord+Clone,
450 {
451 MutableAntichain {
452 updates: ChangeBatch::new_from(bottom.clone(), 1),
453 frontier: vec![bottom],
454 changes: ChangeBatch::new(),
455 }
456 }
457
458 /// Returns `true` if there are no elements in the `MutableAntichain`.
459 ///
460 /// # Examples
461 ///
462 ///```
463 /// use timely::progress::frontier::MutableAntichain;
464 ///
465 /// let mut frontier = MutableAntichain::<usize>::new();
466 /// assert!(frontier.is_empty());
467 ///```
468 #[inline]
469 pub fn is_empty(&self) -> bool {
470 self.frontier.is_empty()
471 }
472
473 /// Returns `true` if any item in the `MutableAntichain` is strictly less than the argument.
474 ///
475 /// # Examples
476 ///
477 ///```
478 /// use timely::progress::frontier::MutableAntichain;
479 ///
480 /// let mut frontier = MutableAntichain::new_bottom(1u64);
481 /// assert!(!frontier.less_than(&0));
482 /// assert!(!frontier.less_than(&1));
483 /// assert!(frontier.less_than(&2));
484 ///```
485 #[inline]
486 pub fn less_than<O>(&self, time: &O) -> bool
487 where
488 T: PartialOrder<O>,
489 {
490 self.frontier().less_than(time)
491 }
492
493 /// Returns `true` if any item in the `MutableAntichain` is less than or equal to the argument.
494 ///
495 /// # Examples
496 ///
497 ///```
498 /// use timely::progress::frontier::MutableAntichain;
499 ///
500 /// let mut frontier = MutableAntichain::new_bottom(1u64);
501 /// assert!(!frontier.less_equal(&0));
502 /// assert!(frontier.less_equal(&1));
503 /// assert!(frontier.less_equal(&2));
504 ///```
505 #[inline]
506 pub fn less_equal<O>(&self, time: &O) -> bool
507 where
508 T: PartialOrder<O>,
509 {
510 self.frontier().less_equal(time)
511 }
512
513 /// Applies updates to the antichain and enumerates any changes.
514 ///
515 /// # Examples
516 ///
517 ///```
518 /// use timely::progress::frontier::{AntichainRef, MutableAntichain};
519 ///
520 /// let mut frontier = MutableAntichain::new_bottom(1u64);
521 /// let changes =
522 /// frontier
523 /// .update_iter(vec![(1, -1), (2, 7)])
524 /// .collect::<Vec<_>>();
525 ///
526 /// assert!(frontier.frontier() == AntichainRef::new(&[2]));
527 /// assert!(changes == vec![(1, -1), (2, 1)]);
528 ///```
529 #[inline]
530 pub fn update_iter<I>(&mut self, updates: I) -> smallvec::Drain<'_, [(T, i64); 2]>
531 where
532 T: Clone + PartialOrder + Ord,
533 I: IntoIterator<Item = (T, i64)>,
534 {
535 let updates = updates.into_iter();
536
537 // track whether a rebuild is needed.
538 let mut rebuild_required = false;
539 for (time, delta) in updates {
540
541 // If we do not yet require a rebuild, test whether we might require one
542 // and set the flag in that case.
543 if !rebuild_required {
544 let beyond_frontier = self.frontier.iter().any(|f| f.less_than(&time));
545 let before_frontier = !self.frontier.iter().any(|f| f.less_equal(&time));
546 rebuild_required = !(beyond_frontier || (delta < 0 && before_frontier));
547 }
548
549 self.updates.update(time, delta);
550 }
551
552 if rebuild_required {
553 self.rebuild()
554 }
555 self.changes.drain()
556 }
557
558 /// Rebuilds `self.frontier` from `self.updates`.
559 ///
560 /// This method is meant to be used for bulk updates to the frontier, and does more work than one might do
561 /// for single updates, but is meant to be an efficient way to process multiple updates together. This is
562 /// especially true when we want to apply very large numbers of updates.
563 fn rebuild(&mut self)
564 where
565 T: Clone + PartialOrder + Ord,
566 {
567 for time in self.frontier.drain(..) {
568 self.changes.update(time, -1);
569 }
570
571 // build new frontier using strictly positive times.
572 // as the times are sorted, we don't need to worry that we might displace frontier elements.
573 for time in self.updates.iter().filter(|x| x.1 > 0) {
574 if !self.frontier.iter().any(|f| f.less_equal(&time.0)) {
575 self.frontier.push(time.0.clone());
576 }
577 }
578
579 for time in self.frontier.iter() {
580 self.changes.update(time.clone(), 1);
581 }
582 }
583
584 /// Reports the count for a queried time.
585 pub fn count_for<O>(&self, query_time: &O) -> i64
586 where
587 T: PartialEq<O>,
588 {
589 self.updates
590 .unstable_internal_updates()
591 .iter()
592 .filter(|td| td.0.eq(query_time))
593 .map(|td| td.1)
594 .sum()
595 }
596
597 /// Reports the updates that form the frontier. Returns an iterator of timestamps and their frequency.
598 ///
599 /// Rebuilds the internal representation before revealing times and frequencies.
600 pub fn updates(&mut self) -> impl Iterator<Item=&(T, i64)>
601 where
602 T: Clone + PartialOrder + Ord,
603 {
604 self.rebuild();
605 self.updates.iter()
606 }
607}
608
609impl<T> Default for MutableAntichain<T> {
610 fn default() -> Self {
611 Self::new()
612 }
613}
614
/// Extension trait for filtering time changes through antichains.
///
/// The type parameter `T` is the element type of the `MutableAntichain` being filtered through.
pub trait MutableAntichainFilter<T: PartialOrder+Ord+Clone> {
    /// Filters time changes through an antichain.
    ///
    /// Consumes `self` and returns a draining iterator of `(T, i64)` pairs that borrows from
    /// `antichain`.
    ///
    /// # Examples
    ///
    /// ```
    /// use timely::progress::frontier::{MutableAntichain, MutableAntichainFilter};
    ///
    /// let mut frontier = MutableAntichain::new_bottom(1u64);
    /// let changes =
    /// vec![(1, -1), (2, 7)]
    ///     .filter_through(&mut frontier)
    ///     .collect::<Vec<_>>();
    ///
    /// assert!(changes == vec![(1, -1), (2, 1)]);
    /// ```
    fn filter_through(self, antichain: &mut MutableAntichain<T>) -> smallvec::Drain<[(T,i64); 2]>;
}
634
635impl<T: PartialOrder+Ord+Clone, I: IntoIterator<Item=(T,i64)>> MutableAntichainFilter<T> for I {
636 fn filter_through(self, antichain: &mut MutableAntichain<T>) -> smallvec::Drain<[(T,i64); 2]> {
637 antichain.update_iter(self)
638 }
639}
640
641impl<T: PartialOrder+Ord+Clone> From<Antichain<T>> for MutableAntichain<T> {
642 fn from(antichain: Antichain<T>) -> Self {
643 let mut result = MutableAntichain::new();
644 result.update_iter(antichain.into_iter().map(|time| (time, 1)));
645 result
646 }
647}
648impl<'a, T: PartialOrder+Ord+Clone> From<AntichainRef<'a, T>> for MutableAntichain<T> {
649 fn from(antichain: AntichainRef<'a, T>) -> Self {
650 let mut result = MutableAntichain::new();
651 result.update_iter(antichain.into_iter().map(|time| (time.clone(), 1)));
652 result
653 }
654}
655
656impl<T> std::iter::FromIterator<(T, i64)> for MutableAntichain<T>
657where
658 T: Clone + PartialOrder + Ord,
659{
660 fn from_iter<I>(iterator: I) -> Self
661 where
662 I: IntoIterator<Item=(T, i64)>,
663 {
664 let mut result = Self::new();
665 result.update_iter(iterator);
666 result
667 }
668}
669
/// A wrapper for elements of an antichain.
///
/// The wrapper borrows a slice of elements; `AntichainRef::new` does not check that the slice
/// actually forms an antichain.
#[derive(Debug)]
pub struct AntichainRef<'a, T: 'a> {
    /// Elements contained in the antichain.
    frontier: &'a [T],
}
676
impl<'a, T: 'a> Clone for AntichainRef<'a, T> {
    /// Returns a copy of the wrapper; the referenced elements themselves are not cloned.
    fn clone(&self) -> Self { *self }
}

// `Clone` and `Copy` are written out by hand with no bound on `T` beyond `T: 'a`.
impl<'a, T: 'a> Copy for AntichainRef<'a, T> { }
682
683impl<'a, T: 'a> AntichainRef<'a, T> {
684 /// Create a new `AntichainRef` from a reference to a slice of elements forming the frontier.
685 ///
686 /// This method does not check that this antichain has any particular properties, for example
687 /// that there are no elements strictly less than other elements.
688 pub fn new(frontier: &'a [T]) -> Self {
689 Self {
690 frontier,
691 }
692 }
693
694 /// Constructs an owned antichain from the antichain reference.
695 ///
696 /// # Examples
697 ///
698 ///```
699 /// use timely::progress::{Antichain, frontier::AntichainRef};
700 ///
701 /// let frontier = AntichainRef::new(&[1u64]);
702 /// assert_eq!(frontier.to_owned(), Antichain::from_elem(1u64));
703 ///```
704 pub fn to_owned(&self) -> Antichain<T> where T: Clone {
705 Antichain {
706 elements: self.frontier.into()
707 }
708 }
709}
710
711impl<T> AntichainRef<'_, T> {
712
713 /// Returns `true` if any item in the `AntichainRef` is strictly less than the argument.
714 ///
715 /// # Examples
716 ///
717 ///```
718 /// use timely::progress::frontier::AntichainRef;
719 ///
720 /// let frontier = AntichainRef::new(&[1u64]);
721 /// assert!(!frontier.less_than(&0));
722 /// assert!(!frontier.less_than(&1));
723 /// assert!(frontier.less_than(&2));
724 ///```
725 #[inline]
726 pub fn less_than<O>(&self, time: &O) -> bool where T: PartialOrder<O> {
727 self.iter().any(|x| x.less_than(time))
728 }
729
730 /// Returns `true` if any item in the `AntichainRef` is less than or equal to the argument.
731 #[inline]
732 ///
733 /// # Examples
734 ///
735 ///```
736 /// use timely::progress::frontier::AntichainRef;
737 ///
738 /// let frontier = AntichainRef::new(&[1u64]);
739 /// assert!(!frontier.less_equal(&0));
740 /// assert!(frontier.less_equal(&1));
741 /// assert!(frontier.less_equal(&2));
742 ///```
743 pub fn less_equal<O>(&self, time: &O) -> bool where T: PartialOrder<O> {
744 self.iter().any(|x| x.less_equal(time))
745 }
746}
747
748impl<T: PartialEq> PartialEq for AntichainRef<'_, T> {
749 fn eq(&self, other: &Self) -> bool {
750 // Lengths should be the same, with the option for fast acceptance if identical.
751 self.len() == other.len() &&
752 (
753 self.iter().zip(other.iter()).all(|(t1,t2)| t1 == t2) ||
754 self.iter().all(|t1| other.iter().any(|t2| t1.eq(t2)))
755 )
756 }
757}
758
759impl<T: Eq> Eq for AntichainRef<'_, T> { }
760
761impl<T: PartialOrder> PartialOrder for AntichainRef<'_, T> {
762 fn less_equal(&self, other: &Self) -> bool {
763 other.iter().all(|t2| self.iter().any(|t1| t1.less_equal(t2)))
764 }
765}
766
767impl<T: TotalOrder> TotalOrder for AntichainRef<'_, T> { }
768
769impl<T: TotalOrder> AntichainRef<'_, T> {
770 /// Return a reference to the at most one element the antichain contains.
771 pub fn as_option(&self) -> Option<&T> {
772 debug_assert!(self.len() <= 1);
773 self.frontier.last()
774 }
775}
776
impl<T> ::std::ops::Deref for AntichainRef<'_, T> {
    type Target = [T];
    /// Exposes the wrapped slice, which makes slice methods such as `len` and `iter` callable
    /// directly on the wrapper.
    fn deref(&self) -> &Self::Target {
        self.frontier
    }
}
783
784impl<'a, T: 'a> ::std::iter::IntoIterator for &'a AntichainRef<'a, T> {
785 type Item = &'a T;
786 type IntoIter = ::std::slice::Iter<'a, T>;
787 fn into_iter(self) -> Self::IntoIter {
788 self.iter()
789 }
790}
791
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use super::*;

    /// Test element compared field by field (product order) for `PartialOrder`.
    #[derive(PartialEq, Eq, PartialOrd, Ord, Hash)]
    struct Elem(char, usize);

    impl PartialOrder for Elem {
        fn less_equal(&self, other: &Self) -> bool {
            let (Elem(c1, n1), Elem(c2, n2)) = (self, other);
            c1 <= c2 && n1 <= n2
        }
    }

    /// Antichains holding the same elements must be found in a `HashSet` whatever their order.
    #[test]
    fn antichain_hash() {
        let mut hashed = HashSet::new();
        hashed.insert(Antichain::from(vec![Elem('a', 2), Elem('b', 1)]));

        // The same elements, in either order, must be found.
        let present = vec![
            vec![Elem('a', 2), Elem('b', 1)],
            vec![Elem('b', 1), Elem('a', 2)],
        ];
        for elements in present {
            assert!(hashed.contains(&Antichain::from(elements)));
        }

        // Anything else must not be found.
        let absent = vec![
            vec![Elem('a', 2)],
            vec![Elem('a', 1)],
            vec![Elem('b', 2)],
            vec![Elem('a', 1), Elem('b', 2)],
            vec![Elem('c', 3)],
            vec![],
        ];
        for elements in absent {
            assert!(!hashed.contains(&Antichain::from(elements)));
        }
    }

    /// Repeated cancelling updates must not let the internal update list grow without bound.
    #[test]
    fn mutable_compaction() {
        let mut mutable = MutableAntichain::new();
        for _ in 0 .. 6 {
            mutable.update_iter(Some((7, 1)));
        }
        for _ in 0 .. 5 {
            mutable.update_iter(Some((8, 1)));
        }
        for _ in 0 .. 1000 {
            mutable.update_iter(Some((9, 1)));
            mutable.update_iter(Some((9, -1)));
        }
        assert!(mutable.updates.unstable_internal_updates().len() <= 32);
    }
}
843}