differential_dataflow/trace/implementations/
mod.rs

1//! Implementations of `Trace` and associated traits.
2//!
3//! The `Trace` trait provides access to an ordered collection of `(key, val, time, diff)` tuples, but
4//! there is substantial flexibility in implementations of this trait. Depending on characteristics of
5//! the data, we may wish to represent the data in different ways. This module contains several of these
6//! implementations, and combiners for merging the results of different traces.
7//!
8//! As examples of implementations,
9//!
10//! *  The `trie` module is meant to represent general update tuples, with no particular assumptions made
11//!    about their contents. It organizes the data first by key, then by val, and then leaves the rest
12//!    in an unordered pile.
13//!
14//! *  The `keys` module is meant for collections whose value type is `()`, which is to say there is no
15//!    (key, val) structure on the records; all of them are just viewed as "keys".
16//!
17//! *  The `time` module is meant for collections with a single time value. This can remove repetition
18//!    from the representation, at the cost of requiring more instances and run-time merging.
19//!
20//! *  The `base` module is meant for collections with a single time value equivalent to the least time.
21//!    These collections must always accumulate to non-negative collections, and as such we can indicate
22//!    the frequency of an element by its multiplicity. This removes both the time and weight from the
23//!    representation, but is only appropriate for a subset (often substantial) of the data.
24//!
25//! Each of these representations is best suited for different data, but they can be combined to get the
26//! benefits of each, as appropriate. There are several `Cursor` combiners, `CursorList` and `CursorPair`,
27//! for homogeneous and inhomogeneous cursors, respectively.
28//!
//! # Musings
30//!
31//! What is less clear is how to transfer updates between the representations at merge time in a tasteful
32//! way. Perhaps we could put an ordering on the representations, each pair with a dominant representation,
33//! and part of merging the latter filters updates into the former. Although back and forth might be
34//! appealing, more thinking is required to negotiate all of these policies.
35//!
36//! One option would be to require the layer builder to handle these smarts. Merging is currently done by
37//! the layer as part of custom code, but we could make it simply be "iterate through cursor, push results
38//! into 'ordered builder'". Then the builder would be bright enough to emit a "batch" for the composite
39//! trace, rather than just a batch of the type merged.
40
41pub mod spine_fueled;
42
43pub mod merge_batcher;
44pub mod ord_neu;
45pub mod rhh;
46pub mod huffman_container;
47pub mod chunker;
48
49// Opinionated takes on default spines.
50pub use self::ord_neu::OrdValSpine as ValSpine;
51pub use self::ord_neu::OrdValBatcher as ValBatcher;
52pub use self::ord_neu::RcOrdValBuilder as ValBuilder;
53pub use self::ord_neu::OrdKeySpine as KeySpine;
54pub use self::ord_neu::OrdKeyBatcher as KeyBatcher;
55pub use self::ord_neu::RcOrdKeyBuilder as KeyBuilder;
56
57use std::convert::TryInto;
58
59use columnation::Columnation;
60use serde::{Deserialize, Serialize};
61use timely::Container;
62use timely::container::PushInto;
63use timely::progress::Timestamp;
64
65use crate::containers::TimelyStack;
66use crate::lattice::Lattice;
67use crate::difference::Semigroup;
68
/// A type that names constituent update types.
///
/// An update is logically a `((key, val), time, diff)` tuple; this trait gives a
/// name to the type of each of its four components so that a single type parameter
/// can stand in for all of them.
pub trait Update {
    /// Key by which data are grouped.
    type Key: Ord + Clone + 'static;
    /// Values associated with the key.
    type Val: Ord + Clone + 'static;
    /// Time at which updates occur.
    type Time: Ord + Clone + Lattice + timely::progress::Timestamp;
    /// Way in which updates occur.
    type Diff: Ord + Semigroup + 'static;
}
80
81impl<K,V,T,R> Update for ((K, V), T, R)
82where
83    K: Ord+Clone+'static,
84    V: Ord+Clone+'static,
85    T: Ord+Clone+Lattice+timely::progress::Timestamp,
86    R: Ord+Semigroup+'static,
87{
88    type Key = K;
89    type Val = V;
90    type Time = T;
91    type Diff = R;
92}
93
/// A type with opinions on how updates should be laid out.
///
/// A layout picks one [`BatchContainer`] for each component of an update
/// (keys, vals, times, diffs), plus a container of `usize` offsets.
pub trait Layout {
    /// Container for update keys.
    type KeyContainer: BatchContainer;
    /// Container for update vals.
    type ValContainer: BatchContainer;
    /// Container for times; its owned type must be a lattice timestamp.
    type TimeContainer: BatchContainer<Owned: Lattice + timely::progress::Timestamp>;
    /// Container for diffs; its owned type must be a semigroup.
    type DiffContainer: BatchContainer<Owned: Semigroup + 'static>;
    /// Container for offsets, which are always read back as plain `usize`.
    type OffsetContainer: for<'a> BatchContainer<ReadItem<'a> = usize>;
}
107
/// A type bearing a layout.
///
/// Implementing this trait is what makes the [`LayoutExt`] aliases available on a type.
pub trait WithLayout {
    /// The layout.
    type Layout: Layout;
}
113
/// Automatically implemented trait for types with layouts.
///
/// Provides short aliases for the owned and borrowed forms of each update component,
/// re-exports the layout's containers, and offers helpers to turn borrowed items into
/// owned ones. It is blanket-implemented for every [`WithLayout`] type.
pub trait LayoutExt : WithLayout<Layout: Layout<KeyContainer = Self::KeyContainer, ValContainer = Self::ValContainer, TimeContainer = Self::TimeContainer, DiffContainer = Self::DiffContainer>> {
    /// Alias for an owned key of a layout.
    type KeyOwn;
    /// Alias for a borrowed key of a layout.
    type Key<'a>: Copy + Ord;
    /// Alias for an owned val of a layout.
    type ValOwn: Clone + Ord;
    /// Alias for a borrowed val of a layout.
    type Val<'a>: Copy + Ord;
    /// Alias for an owned time of a layout.
    type Time: Lattice + timely::progress::Timestamp;
    /// Alias for a borrowed time of a layout.
    type TimeGat<'a>: Copy + Ord;
    /// Alias for an owned diff of a layout.
    type Diff: Semigroup + 'static;
    /// Alias for a borrowed diff of a layout.
    type DiffGat<'a>: Copy + Ord;

    /// Container for update keys.
    type KeyContainer: for<'a> BatchContainer<ReadItem<'a> = Self::Key<'a>, Owned = Self::KeyOwn>;
    /// Container for update vals.
    type ValContainer: for<'a> BatchContainer<ReadItem<'a> = Self::Val<'a>, Owned = Self::ValOwn>;
    /// Container for times.
    type TimeContainer: for<'a> BatchContainer<ReadItem<'a> = Self::TimeGat<'a>, Owned = Self::Time>;
    /// Container for diffs.
    type DiffContainer: for<'a> BatchContainer<ReadItem<'a> = Self::DiffGat<'a>, Owned = Self::Diff>;

    /// Construct an owned key from a reference.
    fn owned_key(key: Self::Key<'_>) -> Self::KeyOwn;
    /// Construct an owned val from a reference.
    fn owned_val(val: Self::Val<'_>) -> Self::ValOwn;
    /// Construct an owned time from a reference.
    fn owned_time(time: Self::TimeGat<'_>) -> Self::Time;
    /// Construct an owned diff from a reference.
    fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff;

    /// Clones a reference time onto an existing owned time, overwriting `onto`.
    fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time);
}
154
155impl<L: WithLayout> LayoutExt for L {
156    type KeyOwn = <<L::Layout as Layout>::KeyContainer as BatchContainer>::Owned;
157    type Key<'a> = <<L::Layout as Layout>::KeyContainer as BatchContainer>::ReadItem<'a>;
158    type ValOwn = <<L::Layout as Layout>::ValContainer as BatchContainer>::Owned;
159    type Val<'a> = <<L::Layout as Layout>::ValContainer as BatchContainer>::ReadItem<'a>;
160    type Time = <<L::Layout as Layout>::TimeContainer as BatchContainer>::Owned;
161    type TimeGat<'a> = <<L::Layout as Layout>::TimeContainer as BatchContainer>::ReadItem<'a>;
162    type Diff = <<L::Layout as Layout>::DiffContainer as BatchContainer>::Owned;
163    type DiffGat<'a> = <<L::Layout as Layout>::DiffContainer as BatchContainer>::ReadItem<'a>;
164
165    type KeyContainer = <L::Layout as Layout>::KeyContainer;
166    type ValContainer = <L::Layout as Layout>::ValContainer;
167    type TimeContainer = <L::Layout as Layout>::TimeContainer;
168    type DiffContainer = <L::Layout as Layout>::DiffContainer;
169
170    #[inline(always)] fn owned_key(key: Self::Key<'_>) -> Self::KeyOwn { <Self::Layout as Layout>::KeyContainer::into_owned(key) }
171    #[inline(always)] fn owned_val(val: Self::Val<'_>) -> Self::ValOwn { <Self::Layout as Layout>::ValContainer::into_owned(val) }
172    #[inline(always)] fn owned_time(time: Self::TimeGat<'_>) -> Self::Time { <Self::Layout as Layout>::TimeContainer::into_owned(time) }
173    #[inline(always)] fn owned_diff(diff: Self::DiffGat<'_>) -> Self::Diff { <Self::Layout as Layout>::DiffContainer::into_owned(diff) }
174    #[inline(always)] fn clone_time_onto(time: Self::TimeGat<'_>, onto: &mut Self::Time) { <Self::Layout as Layout>::TimeContainer::clone_onto(time, onto) }
175
176}
177
178// An easy way to provide an explicit layout: as a 5-tuple.
179// Valuable when one wants to perform layout surgery.
180impl<KC, VC, TC, DC, OC> Layout for (KC, VC, TC, DC, OC)
181where
182    KC: BatchContainer,
183    VC: BatchContainer,
184    TC: BatchContainer<Owned: Lattice + timely::progress::Timestamp>,
185    DC: BatchContainer<Owned: Semigroup>,
186    OC: for<'a> BatchContainer<ReadItem<'a> = usize>,
187{
188    type KeyContainer = KC;
189    type ValContainer = VC;
190    type TimeContainer = TC;
191    type DiffContainer = DC;
192    type OffsetContainer = OC;
193}
194
/// Aliases for types provided by the containers within a `Layout`.
///
/// For clarity, prefer `use layout;` and then `layout::Key<L>` to retain the layout context.
pub mod layout {
    use crate::trace::implementations::{BatchContainer, Layout};

    /// Alias for an owned key of a layout.
    pub type Key<L> = <<L as Layout>::KeyContainer as BatchContainer>::Owned;
    /// Alias for a borrowed key of a layout.
    pub type KeyRef<'a, L> = <<L as Layout>::KeyContainer as BatchContainer>::ReadItem<'a>;
    /// Alias for an owned val of a layout.
    pub type Val<L> = <<L as Layout>::ValContainer as BatchContainer>::Owned;
    /// Alias for a borrowed val of a layout.
    pub type ValRef<'a, L> = <<L as Layout>::ValContainer as BatchContainer>::ReadItem<'a>;
    /// Alias for an owned time of a layout.
    pub type Time<L> = <<L as Layout>::TimeContainer as BatchContainer>::Owned;
    /// Alias for a borrowed time of a layout.
    pub type TimeRef<'a, L> = <<L as Layout>::TimeContainer as BatchContainer>::ReadItem<'a>;
    /// Alias for an owned diff of a layout.
    pub type Diff<L> = <<L as Layout>::DiffContainer as BatchContainer>::Owned;
    /// Alias for a borrowed diff of a layout.
    pub type DiffRef<'a, L> = <<L as Layout>::DiffContainer as BatchContainer>::ReadItem<'a>;
}
218
/// A layout that uses vectors.
///
/// This is a marker type: it holds no data and only carries the update type `U`.
pub struct Vector<U: Update> {
    // Ties the layout to its update type without storing a value of it.
    phantom: std::marker::PhantomData<U>,
}
223
224impl<U: Update<Diff: Ord>> Layout for Vector<U> {
225    type KeyContainer = Vec<U::Key>;
226    type ValContainer = Vec<U::Val>;
227    type TimeContainer = Vec<U::Time>;
228    type DiffContainer = Vec<U::Diff>;
229    type OffsetContainer = OffsetList;
230}
231
/// A layout based on timely stacks.
///
/// This is a marker type: it holds no data and only carries the update type `U`.
pub struct TStack<U: Update> {
    // Ties the layout to its update type without storing a value of it.
    phantom: std::marker::PhantomData<U>,
}
236
237impl<U> Layout for TStack<U>
238where
239    U: Update<
240        Key: Columnation,
241        Val: Columnation,
242        Time: Columnation,
243        Diff: Columnation + Ord,
244    >,
245{
246    type KeyContainer = TimelyStack<U::Key>;
247    type ValContainer = TimelyStack<U::Val>;
248    type TimeContainer = TimelyStack<U::Time>;
249    type DiffContainer = TimelyStack<U::Diff>;
250    type OffsetContainer = OffsetList;
251}
252
/// A list of unsigned integers that uses `u32` elements as long as they are small enough, and switches to `u64` once they are not.
///
/// Logically the list is the concatenation of `zero_prefix` zeros, then the
/// contents of `smol`, then the contents of `chonk`.
#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Serialize, Deserialize)]
pub struct OffsetList {
    /// Length of a prefix of zero elements.
    pub zero_prefix: usize,
    /// Offsets that fit within a `u32`.
    pub smol: Vec<u32>,
    /// Offsets that either do not fit in a `u32`, or are inserted after some offset that did not fit.
    pub chonk: Vec<u64>,
}
263
264impl std::fmt::Debug for OffsetList {
265    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
266        f.debug_list().entries(self.into_iter()).finish()
267    }
268}
269
270impl OffsetList {
271    /// Allocate a new list with a specified capacity.
272    pub fn with_capacity(cap: usize) -> Self {
273        Self {
274            zero_prefix: 0,
275            smol: Vec::with_capacity(cap),
276            chonk: Vec::new(),
277        }
278    }
279    /// Inserts the offset, as a `u32` if that is still on the table.
280    pub fn push(&mut self, offset: usize) {
281        if self.smol.is_empty() && self.chonk.is_empty() && offset == 0 {
282            self.zero_prefix += 1;
283        }
284        else if self.chonk.is_empty() {
285            if let Ok(smol) = offset.try_into() {
286                self.smol.push(smol);
287            }
288            else {
289                self.chonk.push(offset.try_into().unwrap())
290            }
291        }
292        else {
293            self.chonk.push(offset.try_into().unwrap())
294        }
295    }
296    /// Like `std::ops::Index`, which we cannot implement as it must return a `&usize`.
297    pub fn index(&self, index: usize) -> usize {
298        if index < self.zero_prefix {
299            0
300        }
301        else if index - self.zero_prefix < self.smol.len() {
302            self.smol[index - self.zero_prefix].try_into().unwrap()
303        }
304        else {
305            self.chonk[index - self.zero_prefix - self.smol.len()].try_into().unwrap()
306        }
307    }
308    /// The number of offsets in the list.
309    pub fn len(&self) -> usize {
310        self.zero_prefix + self.smol.len() + self.chonk.len()
311    }
312}
313
314impl<'a> IntoIterator for &'a OffsetList {
315    type Item = usize;
316    type IntoIter = OffsetListIter<'a>;
317
318    fn into_iter(self) -> Self::IntoIter {
319        OffsetListIter {list: self, index: 0 }
320    }
321}
322
/// An iterator for [`OffsetList`].
pub struct OffsetListIter<'a> {
    // The list being traversed.
    list: &'a OffsetList,
    // Position of the next offset to yield.
    index: usize,
}
328
329impl<'a> Iterator for OffsetListIter<'a> {
330    type Item = usize;
331
332    fn next(&mut self) -> Option<Self::Item> {
333        if self.index < self.list.len() {
334            let res = Some(self.list.index(self.index));
335            self.index += 1;
336            res
337        } else {
338            None
339        }
340    }
341}
342
343impl PushInto<usize> for OffsetList {
344    fn push_into(&mut self, item: usize) {
345        self.push(item);
346    }
347}
348
349impl BatchContainer for OffsetList {
350    type Owned = usize;
351    type ReadItem<'a> = usize;
352
353    #[inline(always)]
354    fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned { item }
355    #[inline(always)]
356    fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }
357
358    fn push_ref(&mut self, item: Self::ReadItem<'_>) { self.push_into(item) }
359    fn push_own(&mut self, item: &Self::Owned) { self.push_into(*item) }
360
361    fn clear(&mut self) { self.zero_prefix = 0; self.smol.clear(); self.chonk.clear(); }
362
363    fn with_capacity(size: usize) -> Self {
364        Self::with_capacity(size)
365    }
366
367    fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
368        Self::with_capacity(cont1.len() + cont2.len())
369    }
370
371    fn index(&self, index: usize) -> Self::ReadItem<'_> {
372        self.index(index)
373    }
374
375    fn len(&self) -> usize {
376        self.len()
377    }
378}
379
/// Behavior to split an update into principal components.
///
/// `K` and `V` are the key and value containers whose read items the split-out
/// keys and values must be comparable against.
pub trait BuilderInput<K: BatchContainer, V: BatchContainer>: Container {
    /// Key portion
    type Key<'a>: Ord;
    /// Value portion
    type Val<'a>: Ord;
    /// Time
    type Time;
    /// Diff
    type Diff;

    /// Split an item into separate parts.
    fn into_parts<'a>(item: Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff);

    /// Test that the key equals a key in the layout's key container.
    fn key_eq(this: &Self::Key<'_>, other: K::ReadItem<'_>) -> bool;

    /// Test that the value equals a value in the layout's value container.
    fn val_eq(this: &Self::Val<'_>, other: V::ReadItem<'_>) -> bool;

    /// Count the number of distinct keys, (key, val) pairs, and total updates.
    ///
    /// The visible implementations compare only adjacent items, so the first two
    /// counts are exact only if equal keys, and equal (key, val) pairs, are adjacent
    /// across the whole chain.
    fn key_val_upd_counts(chain: &[Self]) -> (usize, usize, usize);
}
403
404impl<K,KBC,V,VBC,T,R> BuilderInput<KBC, VBC> for Vec<((K, V), T, R)>
405where
406    K: Ord + Clone + 'static,
407    KBC: for<'a> BatchContainer<ReadItem<'a>: PartialEq<&'a K>>,
408    V: Ord + Clone + 'static,
409    VBC: for<'a> BatchContainer<ReadItem<'a>: PartialEq<&'a V>>,
410    T: Timestamp + Lattice + Clone + 'static,
411    R: Ord + Semigroup + 'static,
412{
413    type Key<'a> = K;
414    type Val<'a> = V;
415    type Time = T;
416    type Diff = R;
417
418    fn into_parts<'a>(((key, val), time, diff): Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff) {
419        (key, val, time, diff)
420    }
421
422    fn key_eq(this: &K, other: KBC::ReadItem<'_>) -> bool {
423        KBC::reborrow(other) == this
424    }
425
426    fn val_eq(this: &V, other: VBC::ReadItem<'_>) -> bool {
427        VBC::reborrow(other) == this
428    }
429
430    fn key_val_upd_counts(chain: &[Self]) -> (usize, usize, usize) {
431        let mut keys = 0;
432        let mut vals = 0;
433        let mut upds = 0;
434        let mut prev_keyval = None;
435        for link in chain.iter() {
436            for ((key, val), _, _) in link.iter() {
437                if let Some((p_key, p_val)) = prev_keyval {
438                    if p_key != key {
439                        keys += 1;
440                        vals += 1;
441                    } else if p_val != val {
442                        vals += 1;
443                    }
444                } else {
445                    keys += 1;
446                    vals += 1;
447                }
448                upds += 1;
449                prev_keyval = Some((key, val));
450            }
451        }
452        (keys, vals, upds)
453    }
454}
455
456impl<K,V,T,R> BuilderInput<K, V> for TimelyStack<((K::Owned, V::Owned), T, R)>
457where
458    K: for<'a> BatchContainer<
459        ReadItem<'a>: PartialEq<&'a K::Owned>,
460        Owned: Ord + Columnation + Clone + 'static,
461    >,
462    V: for<'a> BatchContainer<
463        ReadItem<'a>: PartialEq<&'a V::Owned>,
464        Owned: Ord + Columnation + Clone + 'static,
465    >,
466    T: Timestamp + Lattice + Columnation + Clone + 'static,
467    R: Ord + Clone + Semigroup + Columnation + 'static,
468{
469    type Key<'a> = &'a K::Owned;
470    type Val<'a> = &'a V::Owned;
471    type Time = T;
472    type Diff = R;
473
474    fn into_parts<'a>(((key, val), time, diff): Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff) {
475        (key, val, time.clone(), diff.clone())
476    }
477
478    fn key_eq(this: &&K::Owned, other: K::ReadItem<'_>) -> bool {
479        K::reborrow(other) == *this
480    }
481
482    fn val_eq(this: &&V::Owned, other: V::ReadItem<'_>) -> bool {
483        V::reborrow(other) == *this
484    }
485
486    fn key_val_upd_counts(chain: &[Self]) -> (usize, usize, usize) {
487        let mut keys = 0;
488        let mut vals = 0;
489        let mut upds = 0;
490        let mut prev_keyval = None;
491        for link in chain.iter() {
492            for ((key, val), _, _) in link.iter() {
493                if let Some((p_key, p_val)) = prev_keyval {
494                    if p_key != key {
495                        keys += 1;
496                        vals += 1;
497                    } else if p_val != val {
498                        vals += 1;
499                    }
500                } else {
501                    keys += 1;
502                    vals += 1;
503                }
504                upds += 1;
505                prev_keyval = Some((key, val));
506            }
507        }
508        (keys, vals, upds)
509    }
510}
511
512pub use self::containers::{BatchContainer, SliceContainer};
513
514/// Containers for data that resemble `Vec<T>`, with leaner implementations.
515pub mod containers {
516
517    use columnation::Columnation;
518    use timely::container::PushInto;
519
520    use crate::containers::TimelyStack;
521
    /// A general-purpose container resembling `Vec<T>`.
    ///
    /// Items are written either by reference (`push_ref`) or from an owned value
    /// (`push_own`), and read back by position as `ReadItem<'_>`, which may be a
    /// borrowed view rather than the owned type.
    pub trait BatchContainer: 'static {
        /// An owned instance of `Self::ReadItem<'_>`.
        type Owned: Clone + Ord;

        /// The type that can be read back out of the container.
        type ReadItem<'a>: Copy + Ord;


        /// Conversion from an instance of this type to the owned type.
        #[must_use]
        fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned;
        /// Clones `item` onto an existing instance of the owned type.
        ///
        /// The default overwrites `other` with a freshly built owned value;
        /// implementations may override it to reuse `other`'s resources.
        #[inline(always)]
        fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) {
            *other = Self::into_owned(item);
        }

        /// Push a borrowed item into this container.
        fn push_ref(&mut self, item: Self::ReadItem<'_>);
        /// Push a reference to an owned item into this container.
        fn push_own(&mut self, item: &Self::Owned);

        /// Clears the container. May not release resources.
        fn clear(&mut self);

        /// Creates a new container with sufficient capacity.
        fn with_capacity(size: usize) -> Self;
        /// Creates a new container with sufficient capacity to hold the contents of both arguments.
        fn merge_capacity(cont1: &Self, cont2: &Self) -> Self;

        /// Converts a read item into one with a narrower lifetime.
        fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b>;

        /// Reference to the element at this position.
        fn index(&self, index: usize) -> Self::ReadItem<'_>;

        /// Reference to the element at this position, if it exists.
        fn get(&self, index: usize) -> Option<Self::ReadItem<'_>> {
            if index < self.len() {
                Some(self.index(index))
            }
            else { None }
        }

        /// Number of contained elements.
        fn len(&self) -> usize;
        /// Returns the last item if the container is non-empty.
        fn last(&self) -> Option<Self::ReadItem<'_>> {
            if self.len() > 0 {
                Some(self.index(self.len()-1))
            }
            else {
                None
            }
        }
        /// Indicates if the length is zero.
        fn is_empty(&self) -> bool { self.len() == 0 }

        /// Reports the number of elements satisfying the predicate.
        ///
        /// Only positions in `start .. end` are examined, and the result is the
        /// count of elements from `start` onward for which `function` holds.
        ///
        /// This method *relies strongly* on the assumption that the predicate
        /// stays false once it becomes false, a joint property of the predicate
        /// and the layout of `Self`. This allows `advance` to use exponential search to
        /// count the number of elements in time logarithmic in the result.
        fn advance<F: for<'a> Fn(Self::ReadItem<'a>)->bool>(&self, start: usize, end: usize, function: F) -> usize {

            let small_limit = 8;

            // Exponential search if the answer isn't within `small_limit`.
            if end > start + small_limit && function(self.index(start + small_limit)) {

                // The predicate holds at offset `small_limit`, so (by the monotonicity
                // assumption) it holds for the first `small_limit + 1` elements.
                let mut index = small_limit + 1;
                if start + index < end && function(self.index(start + index)) {

                    // advance in exponentially growing steps; throughout, the predicate
                    // is known to hold at offset `index`.
                    let mut step = 1;
                    while start + index + step < end && function(self.index(start + index + step)) {
                        index += step;
                        step <<= 1;
                    }

                    // advance in exponentially shrinking steps.
                    step >>= 1;
                    while step > 0 {
                        if start + index + step < end && function(self.index(start + index + step)) {
                            index += step;
                        }
                        step >>= 1;
                    }

                    // `index` is the last offset found to satisfy the predicate;
                    // convert it to a count.
                    index += 1;
                }

                index
            }
            else {
                // Short case: scan at most `small_limit` elements one by one.
                let limit = std::cmp::min(end, start + small_limit);
                (start .. limit).filter(|x| function(self.index(*x))).count()
            }
        }
    }
625
626    // All `T: Clone` also implement `ToOwned<Owned = T>`, but without the constraint Rust
627    // struggles to understand why the owned type must be `T` (i.e. the one blanket impl).
628    impl<T: Ord + Clone + 'static> BatchContainer for Vec<T> {
629        type Owned = T;
630        type ReadItem<'a> = &'a T;
631
632        #[inline(always)] fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned { item.clone() }
633        #[inline(always)] fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) { other.clone_from(item); }
634
635        fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }
636
637        fn push_ref(&mut self, item: Self::ReadItem<'_>) { self.push_into(item) }
638        fn push_own(&mut self, item: &Self::Owned) { self.push_into(item.clone()) }
639
640        fn clear(&mut self) { self.clear() }
641
642        fn with_capacity(size: usize) -> Self {
643            Vec::with_capacity(size)
644        }
645        fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
646            Vec::with_capacity(cont1.len() + cont2.len())
647        }
648        fn index(&self, index: usize) -> Self::ReadItem<'_> {
649            &self[index]
650        }
651        fn get(&self, index: usize) -> Option<Self::ReadItem<'_>> {
652            <[T]>::get(&self, index)
653        }
654        fn len(&self) -> usize {
655            self[..].len()
656        }
657    }
658
659    // The `ToOwned` requirement exists to satisfy `self.reserve_items`, who must for now
660    // be presented with the actual contained type, rather than a type that borrows into it.
661    impl<T: Clone + Ord + Columnation + 'static> BatchContainer for TimelyStack<T> {
662        type Owned = T;
663        type ReadItem<'a> = &'a T;
664
665        #[inline(always)] fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned { item.clone() }
666        #[inline(always)] fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) { other.clone_from(item); }
667
668        fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }
669
670        fn push_ref(&mut self, item: Self::ReadItem<'_>) { self.push_into(item) }
671        fn push_own(&mut self, item: &Self::Owned) { self.push_into(item) }
672
673        fn clear(&mut self) { self.clear() }
674
675        fn with_capacity(size: usize) -> Self {
676            Self::with_capacity(size)
677        }
678        fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
679            let mut new = Self::default();
680            new.reserve_regions(std::iter::once(cont1).chain(std::iter::once(cont2)));
681            new
682        }
683        fn index(&self, index: usize) -> Self::ReadItem<'_> {
684            &self[index]
685        }
686        fn len(&self) -> usize {
687            self[..].len()
688        }
689    }
690
    /// A container that accepts slices `[B]`.
    ///
    /// All slices are stored back to back in one vector, with a separate list of
    /// offsets marking where each slice begins and ends.
    pub struct SliceContainer<B> {
        /// Offsets that bound each contained slice.
        ///
        /// The length will be one greater than the number of contained slices,
        /// starting with zero and ending with `self.inner.len()`.
        offsets: Vec<usize>,
        /// The elements of all contained slices, concatenated in insertion order.
        inner: Vec<B>,
    }
701
702    impl<B: Ord + Clone + 'static> PushInto<&[B]> for SliceContainer<B> {
703        fn push_into(&mut self, item: &[B]) {
704            for x in item.iter() {
705                self.inner.push_into(x);
706            }
707            self.offsets.push(self.inner.len());
708        }
709    }
710
711    impl<B: Ord + Clone + 'static> PushInto<&Vec<B>> for SliceContainer<B> {
712        fn push_into(&mut self, item: &Vec<B>) {
713            self.push_into(&item[..]);
714        }
715    }
716
717    impl<B> BatchContainer for SliceContainer<B>
718    where
719        B: Ord + Clone + Sized + 'static,
720    {
721        type Owned = Vec<B>;
722        type ReadItem<'a> = &'a [B];
723
724        #[inline(always)] fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned { item.to_vec() }
725        #[inline(always)] fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) { other.clone_from_slice(item); }
726
727        fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }
728
729        fn push_ref(&mut self, item: Self::ReadItem<'_>) { self.push_into(item) }
730        fn push_own(&mut self, item: &Self::Owned) { self.push_into(item) }
731
732        fn clear(&mut self) {
733            self.offsets.clear();
734            self.offsets.push(0);
735            self.inner.clear();
736        }
737
738        fn with_capacity(size: usize) -> Self {
739            let mut offsets = Vec::with_capacity(size + 1);
740            offsets.push(0);
741            Self {
742                offsets,
743                inner: Vec::with_capacity(size),
744            }
745        }
746        fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
747            let mut offsets = Vec::with_capacity(cont1.inner.len() + cont2.inner.len() + 1);
748            offsets.push(0);
749            Self {
750                offsets,
751                inner: Vec::with_capacity(cont1.inner.len() + cont2.inner.len()),
752            }
753        }
754        fn index(&self, index: usize) -> Self::ReadItem<'_> {
755            let lower = self.offsets[index];
756            let upper = self.offsets[index+1];
757            &self.inner[lower .. upper]
758        }
759        fn len(&self) -> usize {
760            self.offsets.len() - 1
761        }
762    }
763
764    /// Default implementation introduces a first offset.
765    impl<B> Default for SliceContainer<B> {
766        fn default() -> Self {
767            Self {
768                offsets: vec![0],
769                inner: Default::default(),
770            }
771        }
772    }
773}