timely/progress/frontier.rs
1//! Tracks minimal sets of mutually incomparable elements of a partial order.
2
3use serde::{Deserialize, Serialize};
4use smallvec::SmallVec;
5
6use crate::progress::ChangeBatch;
7use crate::order::{PartialOrder, TotalOrder};
8
9/// A set of mutually incomparable elements.
10///
11/// An antichain is a set of partially ordered elements, each of which is incomparable to the others.
12/// This antichain implementation allows you to repeatedly introduce elements to the antichain, and
13/// which will evict larger elements to maintain the *minimal* antichain, those incomparable elements
14/// no greater than any other element.
15///
16/// Two antichains are equal if they contain the same set of elements, even if in different orders.
17/// This can make equality testing quadratic, though linear in the common case that the sequences
18/// are identical.
19#[derive(Debug, Serialize, Deserialize)]
20pub struct Antichain<T> {
21 elements: SmallVec<[T; 1]>
22}
23
24impl<T: PartialOrder> Antichain<T> {
25 /// Updates the `Antichain` if the element is not greater than or equal to some present element.
26 ///
27 /// Returns `true` if element is added to the set
28 ///
29 /// # Examples
30 ///
31 ///```
32 /// use timely::progress::frontier::Antichain;
33 ///
34 /// let mut frontier = Antichain::new();
35 /// assert!(frontier.insert(2));
36 /// assert!(!frontier.insert(3));
37 ///```
38 pub fn insert(&mut self, element: T) -> bool {
39 if !self.elements.iter().any(|x| x.less_equal(&element)) {
40 self.elements.retain(|x| !element.less_equal(x));
41 self.elements.push(element);
42 true
43 }
44 else {
45 false
46 }
47 }
48
49 /// Updates the `Antichain` if the element is not greater than or equal to some present element.
50 ///
51 /// Returns `true` if element is added to the set
52 ///
53 /// Accepts a reference to an element, which is cloned when inserting.
54 ///
55 /// # Examples
56 ///
57 ///```
58 /// use timely::progress::frontier::Antichain;
59 ///
60 /// let mut frontier = Antichain::new();
61 /// assert!(frontier.insert_ref(&2));
62 /// assert!(!frontier.insert(3));
63 ///```
64 pub fn insert_ref(&mut self, element: &T) -> bool where T: Clone {
65 if !self.elements.iter().any(|x| x.less_equal(element)) {
66 self.elements.retain(|x| !element.less_equal(x));
67 self.elements.push(element.clone());
68 true
69 }
70 else {
71 false
72 }
73 }
74
75 /// Updates the `Antichain` if the element is not greater than or equal to some present element.
76 /// If the antichain needs updating, it uses the `to_owned` closure to convert the element into
77 /// a `T`.
78 ///
79 /// Returns `true` if element is added to the set
80 ///
81 /// # Examples
82 ///
83 ///```
84 /// use timely::progress::frontier::Antichain;
85 ///
86 /// let mut frontier = Antichain::new();
87 /// assert!(frontier.insert_with(&2, |x| *x));
88 /// assert!(!frontier.insert(3));
89 ///```
90 pub fn insert_with<O: PartialOrder<T>, F: FnOnce(&O) -> T>(&mut self, element: &O, to_owned: F) -> bool where T: PartialOrder<O> {
91 if !self.elements.iter().any(|x| x.less_equal(element)) {
92 self.elements.retain(|x| !element.less_equal(x));
93 self.elements.push(to_owned(element));
94 true
95 }
96 else {
97 false
98 }
99 }
100
101 /// Reserves capacity for at least additional more elements to be inserted in the given `Antichain`
102 pub fn reserve(&mut self, additional: usize) {
103 self.elements.reserve(additional);
104 }
105
106 /// Performs a sequence of insertion and returns `true` iff any insertion does.
107 ///
108 /// # Examples
109 ///
110 ///```
111 /// use timely::progress::frontier::Antichain;
112 ///
113 /// let mut frontier = Antichain::new();
114 /// assert!(frontier.extend(Some(3)));
115 /// assert!(frontier.extend(vec![2, 5]));
116 /// assert!(!frontier.extend(vec![3, 4]));
117 ///```
118 pub fn extend<I: IntoIterator<Item=T>>(&mut self, iterator: I) -> bool {
119 let mut added = false;
120 for element in iterator {
121 added = self.insert(element) || added;
122 }
123 added
124 }
125
126 /// Returns `true` if any item in the antichain is strictly less than the argument.
127 ///
128 /// # Examples
129 ///
130 ///```
131 /// use timely::progress::frontier::Antichain;
132 ///
133 /// let mut frontier = Antichain::from_elem(2);
134 /// assert!(frontier.less_than(&3));
135 /// assert!(!frontier.less_than(&2));
136 /// assert!(!frontier.less_than(&1));
137 ///
138 /// frontier.clear();
139 /// assert!(!frontier.less_than(&3));
140 ///```
141 #[inline]
142 pub fn less_than(&self, time: &T) -> bool {
143 self.elements.iter().any(|x| x.less_than(time))
144 }
145
146 /// Returns `true` if any item in the antichain is less than or equal to the argument.
147 ///
148 /// # Examples
149 ///
150 ///```
151 /// use timely::progress::frontier::Antichain;
152 ///
153 /// let mut frontier = Antichain::from_elem(2);
154 /// assert!(frontier.less_equal(&3));
155 /// assert!(frontier.less_equal(&2));
156 /// assert!(!frontier.less_equal(&1));
157 ///
158 /// frontier.clear();
159 /// assert!(!frontier.less_equal(&3));
160 ///```
161 #[inline]
162 pub fn less_equal(&self, time: &T) -> bool {
163 self.elements.iter().any(|x| x.less_equal(time))
164 }
165
166 /// Returns `true` if every element of `other` is greater or equal to some element of `self`.
167 #[deprecated(since="0.12.0", note="please use `PartialOrder::less_equal` instead")]
168 #[inline]
169 pub fn dominates(&self, other: &Antichain<T>) -> bool {
170 <Self as PartialOrder>::less_equal(self, other)
171 }
172}
173
174impl<T: PartialOrder> std::iter::FromIterator<T> for Antichain<T> {
175 fn from_iter<I>(iterator: I) -> Self
176 where
177 I: IntoIterator<Item=T>
178 {
179 let mut result = Self::new();
180 result.extend(iterator);
181 result
182 }
183}
184
185impl<T> Antichain<T> {
186
187 /// Creates a new empty `Antichain`.
188 ///
189 /// # Examples
190 ///
191 ///```
192 /// use timely::progress::frontier::Antichain;
193 ///
194 /// let mut frontier = Antichain::<u32>::new();
195 ///```
196 pub fn new() -> Antichain<T> { Antichain { elements: SmallVec::new() } }
197
198 /// Creates a new empty `Antichain` with space for `capacity` elements.
199 ///
200 /// # Examples
201 ///
202 ///```
203 /// use timely::progress::frontier::Antichain;
204 ///
205 /// let mut frontier = Antichain::<u32>::with_capacity(10);
206 ///```
207 pub fn with_capacity(capacity: usize) -> Self {
208 Self {
209 elements: SmallVec::with_capacity(capacity),
210 }
211 }
212
213 /// Creates a new singleton `Antichain`.
214 ///
215 /// # Examples
216 ///
217 ///```
218 /// use timely::progress::frontier::Antichain;
219 ///
220 /// let mut frontier = Antichain::from_elem(2);
221 ///```
222 pub fn from_elem(element: T) -> Antichain<T> {
223 let mut elements = SmallVec::with_capacity(1);
224 elements.push(element);
225 Antichain { elements }
226 }
227
228 /// Clears the contents of the antichain.
229 ///
230 /// # Examples
231 ///
232 ///```
233 /// use timely::progress::frontier::Antichain;
234 ///
235 /// let mut frontier = Antichain::from_elem(2);
236 /// frontier.clear();
237 /// assert!(frontier.elements().is_empty());
238 ///```
239 pub fn clear(&mut self) { self.elements.clear() }
240
241 /// Sorts the elements so that comparisons between antichains can be made.
242 pub fn sort(&mut self) where T: Ord { self.elements.sort() }
243
244 /// Reveals the elements in the antichain.
245 ///
246 /// This method is redundant with `<Antichain<T> as Deref>`, but the method
247 /// is in such broad use that we probably don't want to deprecate it without
248 /// some time to fix all things.
249 ///
250 /// # Examples
251 ///
252 ///```
253 /// use timely::progress::frontier::Antichain;
254 ///
255 /// let mut frontier = Antichain::from_elem(2);
256 /// assert_eq!(frontier.elements(), &[2]);
257 ///```
258 #[inline] pub fn elements(&self) -> &[T] { &self[..] }
259
260 /// Reveals the elements in the antichain.
261 ///
262 /// # Examples
263 ///
264 ///```
265 /// use timely::progress::frontier::Antichain;
266 ///
267 /// let mut frontier = Antichain::from_elem(2);
268 /// assert_eq!(&*frontier.borrow(), &[2]);
269 ///```
270 #[inline] pub fn borrow(&self) -> AntichainRef<'_, T> { AntichainRef::new(&self.elements) }}
271
272impl<T: PartialEq> PartialEq for Antichain<T> {
273 fn eq(&self, other: &Self) -> bool {
274 // Lengths should be the same, with the option for fast acceptance if identical.
275 self.elements().len() == other.elements().len() &&
276 (
277 self.elements().iter().zip(other.elements().iter()).all(|(t1,t2)| t1 == t2) ||
278 self.elements().iter().all(|t1| other.elements().iter().any(|t2| t1.eq(t2)))
279 )
280 }
281}
282
283impl<T: Eq> Eq for Antichain<T> { }
284
285impl<T: PartialOrder> PartialOrder for Antichain<T> {
286 fn less_equal(&self, other: &Self) -> bool {
287 other.elements().iter().all(|t2| self.elements().iter().any(|t1| t1.less_equal(t2)))
288 }
289}
290
291impl<T: Clone> Clone for Antichain<T> {
292 fn clone(&self) -> Self {
293 Antichain { elements: self.elements.clone() }
294 }
295 fn clone_from(&mut self, source: &Self) {
296 self.elements.clone_from(&source.elements)
297 }
298}
299
300impl<T> Default for Antichain<T> {
301 fn default() -> Self {
302 Self::new()
303 }
304}
305
306impl<T: TotalOrder> TotalOrder for Antichain<T> { }
307
308impl<T: TotalOrder> Antichain<T> {
309 /// Convert to the at most one element the antichain contains.
310 pub fn into_option(mut self) -> Option<T> {
311 debug_assert!(self.len() <= 1);
312 self.elements.pop()
313 }
314 /// Return a reference to the at most one element the antichain contains.
315 pub fn as_option(&self) -> Option<&T> {
316 debug_assert!(self.len() <= 1);
317 self.elements.last()
318 }
319}
320
321impl<T: Ord+std::hash::Hash> std::hash::Hash for Antichain<T> {
322 fn hash<H: std::hash::Hasher>(&self, state: &mut H) {
323 let mut temp = self.elements.iter().collect::<Vec<_>>();
324 temp.sort();
325 for element in temp {
326 element.hash(state);
327 }
328 }
329}
330
331impl<T: PartialOrder> From<Vec<T>> for Antichain<T> {
332 fn from(vec: Vec<T>) -> Self {
333 // TODO: We could reuse `vec` with some care.
334 let mut temp = Antichain::new();
335 for elem in vec.into_iter() { temp.insert(elem); }
336 temp
337 }
338}
339
340impl<T> From<Antichain<T>> for SmallVec<[T; 1]> {
341 fn from(val: Antichain<T>) -> Self {
342 val.elements
343 }
344}
345
346impl<T> ::std::ops::Deref for Antichain<T> {
347 type Target = [T];
348 fn deref(&self) -> &Self::Target {
349 &self.elements
350 }
351}
352
353impl<T> ::std::iter::IntoIterator for Antichain<T> {
354 type Item = T;
355 type IntoIter = smallvec::IntoIter<[T; 1]>;
356 fn into_iter(self) -> Self::IntoIter {
357 self.elements.into_iter()
358 }
359}
360
361/// An antichain based on a multiset whose elements frequencies can be updated.
362///
363/// The `MutableAntichain` maintains frequencies for many elements of type `T`, and exposes the set
364/// of elements with positive count not greater than any other elements with positive count. The
365/// antichain may both advance and retreat; the changes do not all need to be to elements greater or
366/// equal to some elements of the frontier.
367///
368/// The type `T` must implement `PartialOrder` as well as `Ord`. The implementation of the `Ord` trait
369/// is used to efficiently organize the updates for cancellation, and to efficiently determine the lower
370/// bounds, and only needs to not contradict the `PartialOrder` implementation (that is, if `PartialOrder`
371/// orders two elements, then so does the `Ord` implementation).
372///
373/// The `MutableAntichain` implementation is done with the intent that updates to it are done in batches,
374/// and it is acceptable to rebuild the frontier from scratch when a batch of updates change it. This means
375/// that it can be expensive to maintain a large number of counts and change few elements near the frontier.
376#[derive(Clone, Debug, Serialize, Deserialize)]
377pub struct MutableAntichain<T> {
378 updates: ChangeBatch<T>,
379 frontier: Vec<T>,
380 changes: ChangeBatch<T>,
381}
382
383impl<T> MutableAntichain<T> {
384 /// Creates a new empty `MutableAntichain`.
385 ///
386 /// # Examples
387 ///
388 ///```
389 /// use timely::progress::frontier::MutableAntichain;
390 ///
391 /// let frontier = MutableAntichain::<usize>::new();
392 /// assert!(frontier.is_empty());
393 ///```
394 #[inline]
395 pub fn new() -> MutableAntichain<T> {
396 MutableAntichain {
397 updates: ChangeBatch::new(),
398 frontier: Vec::new(),
399 changes: ChangeBatch::new(),
400 }
401 }
402
403 /// Removes all elements.
404 ///
405 /// # Examples
406 ///
407 ///```
408 /// use timely::progress::frontier::MutableAntichain;
409 ///
410 /// let mut frontier = MutableAntichain::<usize>::new();
411 /// frontier.clear();
412 /// assert!(frontier.is_empty());
413 ///```
414 #[inline]
415 pub fn clear(&mut self) {
416 self.updates.clear();
417 self.frontier.clear();
418 self.changes.clear();
419 }
420
421 /// Reveals the minimal elements with positive count.
422 ///
423 /// # Examples
424 ///
425 ///```
426 /// use timely::progress::frontier::MutableAntichain;
427 ///
428 /// let mut frontier = MutableAntichain::<usize>::new();
429 /// assert!(frontier.frontier().len() == 0);
430 ///```
431 #[inline]
432 pub fn frontier(&self) -> AntichainRef<'_, T> {
433 AntichainRef::new(&self.frontier)
434 }
435
436 /// Creates a new singleton `MutableAntichain`.
437 ///
438 /// # Examples
439 ///
440 ///```
441 /// use timely::progress::frontier::{AntichainRef, MutableAntichain};
442 ///
443 /// let mut frontier = MutableAntichain::from_elem(0u64);
444 /// assert!(frontier.frontier() == AntichainRef::new(&[0u64]));
445 ///```
446 #[inline]
447 pub fn from_elem(bottom: T) -> MutableAntichain<T>
448 where
449 T: Ord+Clone,
450 {
451 MutableAntichain {
452 updates: ChangeBatch::new_from(bottom.clone(), 1),
453 frontier: vec![bottom],
454 changes: ChangeBatch::new(),
455 }
456 }
457
458 /// Returns `true` if there are no elements in the `MutableAntichain`.
459 ///
460 /// # Examples
461 ///
462 ///```
463 /// use timely::progress::frontier::MutableAntichain;
464 ///
465 /// let mut frontier = MutableAntichain::<usize>::new();
466 /// assert!(frontier.is_empty());
467 ///```
468 #[inline]
469 pub fn is_empty(&self) -> bool {
470 self.frontier.is_empty()
471 }
472
473 /// Returns `true` if any item in the `MutableAntichain` is strictly less than the argument.
474 ///
475 /// # Examples
476 ///
477 ///```
478 /// use timely::progress::frontier::MutableAntichain;
479 ///
480 /// let mut frontier = MutableAntichain::from_elem(1u64);
481 /// assert!(!frontier.less_than(&0));
482 /// assert!(!frontier.less_than(&1));
483 /// assert!(frontier.less_than(&2));
484 ///```
485 #[inline]
486 pub fn less_than<O>(&self, time: &O) -> bool
487 where
488 T: PartialOrder<O>,
489 {
490 self.frontier().less_than(time)
491 }
492
493 /// Returns `true` if any item in the `MutableAntichain` is less than or equal to the argument.
494 ///
495 /// # Examples
496 ///
497 ///```
498 /// use timely::progress::frontier::MutableAntichain;
499 ///
500 /// let mut frontier = MutableAntichain::from_elem(1u64);
501 /// assert!(!frontier.less_equal(&0));
502 /// assert!(frontier.less_equal(&1));
503 /// assert!(frontier.less_equal(&2));
504 ///```
505 #[inline]
506 pub fn less_equal<O>(&self, time: &O) -> bool
507 where
508 T: PartialOrder<O>,
509 {
510 self.frontier().less_equal(time)
511 }
512
513 /// Applies updates to the antichain and enumerates any changes.
514 ///
515 /// # Examples
516 ///
517 ///```
518 /// use timely::progress::frontier::{AntichainRef, MutableAntichain};
519 ///
520 /// let mut frontier = MutableAntichain::from_elem(1u64);
521 /// let changes =
522 /// frontier
523 /// .update_iter(vec![(1, -1), (2, 7)])
524 /// .collect::<Vec<_>>();
525 ///
526 /// assert!(frontier.frontier() == AntichainRef::new(&[2]));
527 /// assert!(changes == vec![(1, -1), (2, 1)]);
528 ///```
529 #[inline]
530 pub fn update_iter<I>(&mut self, updates: I) -> smallvec::Drain<'_, [(T, i64); 2]>
531 where
532 T: Clone + PartialOrder + Ord,
533 I: IntoIterator<Item = (T, i64)>,
534 {
535 // track whether a rebuild is needed.
536 let mut rebuild_required = false;
537 for (time, delta) in updates {
538 // If we do not yet require a rebuild, test whether we might require one
539 // and set the flag in that case.
540 if !rebuild_required {
541 rebuild_required = self.requires_rebuild(&time, delta);
542 }
543
544 self.updates.update(time, delta);
545 }
546
547 if rebuild_required {
548 self.rebuild()
549 }
550 self.changes.drain()
551 }
552
553 /// Tests whether applying `(time, delta)` will require a frontier rebuild.
554 ///
555 /// Factored out of [`Self::update_iter`] so it is generic only over `T` and not
556 /// the iterator type, deduplicating the inlined `frontier.iter().any(...)` bodies
557 /// across `update_iter` monomorphizations.
558 fn requires_rebuild(&self, time: &T, delta: i64) -> bool
559 where
560 T: PartialOrder,
561 {
562 // Single-pass `for` loop (instead of two `Iterator::any` calls) avoids
563 // monomorphizing `slice::Iter::any` over per-call-site closure types and
564 // traverses `self.frontier` at most once.
565 let mut beyond_frontier = false;
566 let mut before_frontier = true;
567 for f in &self.frontier {
568 if !beyond_frontier && f.less_than(time) {
569 beyond_frontier = true;
570 }
571 if before_frontier && f.less_equal(time) {
572 before_frontier = false;
573 }
574 if beyond_frontier && !before_frontier {
575 break;
576 }
577 }
578 !(beyond_frontier || (delta < 0 && before_frontier))
579 }
580
581 /// Rebuilds `self.frontier` from `self.updates`.
582 ///
583 /// This method is meant to be used for bulk updates to the frontier, and does more work than one might do
584 /// for single updates, but is meant to be an efficient way to process multiple updates together. This is
585 /// especially true when we want to apply very large numbers of updates.
586 fn rebuild(&mut self)
587 where
588 T: Clone + PartialOrder + Ord,
589 {
590 for time in self.frontier.drain(..) {
591 self.changes.update(time, -1);
592 }
593
594 // build new frontier using strictly positive times.
595 // as the times are sorted, we don't need to worry that we might displace frontier elements.
596 for time in self.updates.iter().filter(|x| x.1 > 0) {
597 if !self.frontier.iter().any(|f| f.less_equal(&time.0)) {
598 self.frontier.push(time.0.clone());
599 }
600 }
601
602 for time in self.frontier.iter() {
603 self.changes.update(time.clone(), 1);
604 }
605 }
606
607 /// Reports the count for a queried time.
608 pub fn count_for<O>(&self, query_time: &O) -> i64
609 where
610 T: PartialEq<O>,
611 {
612 self.updates
613 .unstable_internal_updates()
614 .iter()
615 .filter(|td| td.0.eq(query_time))
616 .map(|td| td.1)
617 .sum()
618 }
619
620 /// Reports the updates that form the frontier. Returns an iterator of timestamps and their frequency.
621 ///
622 /// Rebuilds the internal representation before revealing times and frequencies.
623 pub fn updates(&mut self) -> impl Iterator<Item=&(T, i64)>
624 where
625 T: Clone + PartialOrder + Ord,
626 {
627 self.rebuild();
628 self.updates.iter()
629 }
630}
631
632impl<T> Default for MutableAntichain<T> {
633 fn default() -> Self {
634 Self::new()
635 }
636}
637
638/// Extension trait for filtering time changes through antichains.
639pub trait MutableAntichainFilter<T: PartialOrder+Ord+Clone> {
640 /// Filters time changes through an antichain.
641 ///
642 /// # Examples
643 ///
644 /// ```
645 /// use timely::progress::frontier::{MutableAntichain, MutableAntichainFilter};
646 ///
647 /// let mut frontier = MutableAntichain::from_elem(1u64);
648 /// let changes =
649 /// vec![(1, -1), (2, 7)]
650 /// .filter_through(&mut frontier)
651 /// .collect::<Vec<_>>();
652 ///
653 /// assert!(changes == vec![(1, -1), (2, 1)]);
654 /// ```
655 fn filter_through(self, antichain: &mut MutableAntichain<T>) -> smallvec::Drain<'_, [(T,i64); 2]>;
656}
657
658impl<T: PartialOrder+Ord+Clone, I: IntoIterator<Item=(T,i64)>> MutableAntichainFilter<T> for I {
659 fn filter_through(self, antichain: &mut MutableAntichain<T>) -> smallvec::Drain<'_, [(T,i64); 2]> {
660 antichain.update_iter(self)
661 }
662}
663
664impl<T: PartialOrder+Ord+Clone> From<Antichain<T>> for MutableAntichain<T> {
665 fn from(antichain: Antichain<T>) -> Self {
666 let mut result = MutableAntichain::new();
667 result.update_iter(antichain.into_iter().map(|time| (time, 1)));
668 result
669 }
670}
671impl<'a, T: PartialOrder+Ord+Clone> From<AntichainRef<'a, T>> for MutableAntichain<T> {
672 fn from(antichain: AntichainRef<'a, T>) -> Self {
673 let mut result = MutableAntichain::new();
674 result.update_iter(antichain.into_iter().map(|time| (time.clone(), 1)));
675 result
676 }
677}
678
679impl<T> std::iter::FromIterator<(T, i64)> for MutableAntichain<T>
680where
681 T: Clone + PartialOrder + Ord,
682{
683 fn from_iter<I>(iterator: I) -> Self
684 where
685 I: IntoIterator<Item=(T, i64)>,
686 {
687 let mut result = Self::new();
688 result.update_iter(iterator);
689 result
690 }
691}
692
/// A borrowed view of the elements of an antichain.
///
/// The wrapper only holds a slice reference; it does not own the elements.
#[derive(Debug)]
pub struct AntichainRef<'a, T: 'a> {
    /// The borrowed elements of the antichain.
    frontier: &'a [T],
}
699
700impl<'a, T: 'a> Clone for AntichainRef<'a, T> {
701 fn clone(&self) -> Self { *self }
702}
703
704impl<'a, T: 'a> Copy for AntichainRef<'a, T> { }
705
706impl<'a, T: 'a> AntichainRef<'a, T> {
707 /// Create a new `AntichainRef` from a reference to a slice of elements forming the frontier.
708 ///
709 /// This method does not check that this antichain has any particular properties, for example
710 /// that there are no elements strictly less than other elements.
711 pub fn new(frontier: &'a [T]) -> Self {
712 Self {
713 frontier,
714 }
715 }
716
717 /// Constructs an owned antichain from the antichain reference.
718 ///
719 /// # Examples
720 ///
721 ///```
722 /// use timely::progress::{Antichain, frontier::AntichainRef};
723 ///
724 /// let frontier = AntichainRef::new(&[1u64]);
725 /// assert_eq!(frontier.to_owned(), Antichain::from_elem(1u64));
726 ///```
727 pub fn to_owned(&self) -> Antichain<T> where T: Clone {
728 Antichain {
729 elements: self.frontier.into()
730 }
731 }
732}
733
734impl<T> AntichainRef<'_, T> {
735
736 /// Returns `true` if any item in the `AntichainRef` is strictly less than the argument.
737 ///
738 /// # Examples
739 ///
740 ///```
741 /// use timely::progress::frontier::AntichainRef;
742 ///
743 /// let frontier = AntichainRef::new(&[1u64]);
744 /// assert!(!frontier.less_than(&0));
745 /// assert!(!frontier.less_than(&1));
746 /// assert!(frontier.less_than(&2));
747 ///```
748 #[inline]
749 pub fn less_than<O>(&self, time: &O) -> bool where T: PartialOrder<O> {
750 self.iter().any(|x| x.less_than(time))
751 }
752
753 /// Returns `true` if any item in the `AntichainRef` is less than or equal to the argument.
754 #[inline]
755 ///
756 /// # Examples
757 ///
758 ///```
759 /// use timely::progress::frontier::AntichainRef;
760 ///
761 /// let frontier = AntichainRef::new(&[1u64]);
762 /// assert!(!frontier.less_equal(&0));
763 /// assert!(frontier.less_equal(&1));
764 /// assert!(frontier.less_equal(&2));
765 ///```
766 pub fn less_equal<O>(&self, time: &O) -> bool where T: PartialOrder<O> {
767 self.iter().any(|x| x.less_equal(time))
768 }
769}
770
771impl<T: PartialEq> PartialEq for AntichainRef<'_, T> {
772 fn eq(&self, other: &Self) -> bool {
773 // Lengths should be the same, with the option for fast acceptance if identical.
774 self.len() == other.len() &&
775 (
776 self.iter().zip(other.iter()).all(|(t1,t2)| t1 == t2) ||
777 self.iter().all(|t1| other.iter().any(|t2| t1.eq(t2)))
778 )
779 }
780}
781
782impl<T: Eq> Eq for AntichainRef<'_, T> { }
783
784impl<T: PartialOrder> PartialOrder for AntichainRef<'_, T> {
785 fn less_equal(&self, other: &Self) -> bool {
786 other.iter().all(|t2| self.iter().any(|t1| t1.less_equal(t2)))
787 }
788}
789
790impl<T: TotalOrder> TotalOrder for AntichainRef<'_, T> { }
791
792impl<T: TotalOrder> AntichainRef<'_, T> {
793 /// Return a reference to the at most one element the antichain contains.
794 pub fn as_option(&self) -> Option<&T> {
795 debug_assert!(self.len() <= 1);
796 self.frontier.last()
797 }
798}
799
800impl<T> ::std::ops::Deref for AntichainRef<'_, T> {
801 type Target = [T];
802 fn deref(&self) -> &Self::Target {
803 self.frontier
804 }
805}
806
807impl<'a, T: 'a> ::std::iter::IntoIterator for &'a AntichainRef<'a, T> {
808 type Item = &'a T;
809 type IntoIter = ::std::slice::Iter<'a, T>;
810 fn into_iter(self) -> Self::IntoIter {
811 self.iter()
812 }
813}
814
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use super::*;

    /// A pair ordered component-wise by `PartialOrder` (and lexicographically by the derived `Ord`).
    #[derive(PartialEq, Eq, PartialOrd, Ord, Hash)]
    struct Elem(char, usize);

    impl PartialOrder for Elem {
        fn less_equal(&self, other: &Self) -> bool {
            // Less-or-equal only when neither component exceeds its counterpart.
            !(self.0 > other.0 || self.1 > other.1)
        }
    }

    /// Antichains with the same elements must hash alike regardless of insertion order.
    #[test]
    fn antichain_hash() {
        let mut hashed = HashSet::new();
        hashed.insert(Antichain::from(vec![Elem('a', 2), Elem('b', 1)]));

        // The same elements are found in either insertion order.
        let present = vec![
            vec![Elem('a', 2), Elem('b', 1)],
            vec![Elem('b', 1), Elem('a', 2)],
        ];
        for elems in present {
            assert!(hashed.contains(&Antichain::from(elems)));
        }

        // Antichains with different contents are not found.
        let absent = vec![
            vec![Elem('a', 2)],
            vec![Elem('a', 1)],
            vec![Elem('b', 2)],
            vec![Elem('a', 1), Elem('b', 2)],
            vec![Elem('c', 3)],
            vec![],
        ];
        for elems in absent {
            assert!(!hashed.contains(&Antichain::from(elems)));
        }
    }

    /// Repeated cancelling updates must not grow the internal update list without bound.
    #[test]
    fn mutable_compaction() {
        let mut mutable = MutableAntichain::new();
        for _ in 0 .. 6 {
            mutable.update_iter(Some((7, 1)));
        }
        for _ in 0 .. 5 {
            mutable.update_iter(Some((8, 1)));
        }
        for _ in 0 .. 1000 {
            mutable.update_iter(Some((9, 1)));
            mutable.update_iter(Some((9, -1)));
        }
        assert!(mutable.updates.unstable_internal_updates().len() <= 32);
    }
}