differential_dataflow/trace/implementations/
mod.rs

1//! Implementations of `Trace` and associated traits.
2//!
3//! The `Trace` trait provides access to an ordered collection of `(key, val, time, diff)` tuples, but
4//! there is substantial flexibility in implementations of this trait. Depending on characteristics of
5//! the data, we may wish to represent the data in different ways. This module contains several of these
6//! implementations, and combiners for merging the results of different traces.
7//!
8//! As examples of implementations,
9//!
10//! *  The `trie` module is meant to represent general update tuples, with no particular assumptions made
11//!    about their contents. It organizes the data first by key, then by val, and then leaves the rest
12//!    in an unordered pile.
13//!
14//! *  The `keys` module is meant for collections whose value type is `()`, which is to say there is no
15//!    (key, val) structure on the records; all of them are just viewed as "keys".
16//!
17//! *  The `time` module is meant for collections with a single time value. This can remove repetition
18//!    from the representation, at the cost of requiring more instances and run-time merging.
19//!
20//! *  The `base` module is meant for collections with a single time value equivalent to the least time.
21//!    These collections must always accumulate to non-negative collections, and as such we can indicate
22//!    the frequency of an element by its multiplicity. This removes both the time and weight from the
23//!    representation, but is only appropriate for a subset (often substantial) of the data.
24//!
25//! Each of these representations is best suited for different data, but they can be combined to get the
26//! benefits of each, as appropriate. There are several `Cursor` combiners, `CursorList` and `CursorPair`,
27//! for homogeneous and inhomogeneous cursors, respectively.
28//!
//! # Musings
30//!
31//! What is less clear is how to transfer updates between the representations at merge time in a tasteful
32//! way. Perhaps we could put an ordering on the representations, each pair with a dominant representation,
33//! and part of merging the latter filters updates into the former. Although back and forth might be
34//! appealing, more thinking is required to negotiate all of these policies.
35//!
36//! One option would be to require the layer builder to handle these smarts. Merging is currently done by
37//! the layer as part of custom code, but we could make it simply be "iterate through cursor, push results
38//! into 'ordered builder'". Then the builder would be bright enough to emit a "batch" for the composite
39//! trace, rather than just a batch of the type merged.
40
41pub mod spine_fueled;
42
43pub mod merge_batcher;
44pub mod ord_neu;
45pub mod rhh;
46pub mod huffman_container;
47pub mod chunker;
48
49// Opinionated takes on default spines.
50pub use self::ord_neu::OrdValSpine as ValSpine;
51pub use self::ord_neu::OrdValBatcher as ValBatcher;
52pub use self::ord_neu::RcOrdValBuilder as ValBuilder;
53pub use self::ord_neu::OrdKeySpine as KeySpine;
54pub use self::ord_neu::OrdKeyBatcher as KeyBatcher;
55pub use self::ord_neu::RcOrdKeyBuilder as KeyBuilder;
56
57use std::borrow::{ToOwned};
58use std::convert::TryInto;
59
60use columnation::Columnation;
61use serde::{Deserialize, Serialize};
62use timely::Container;
63use timely::container::PushInto;
64use timely::progress::Timestamp;
65
66use crate::containers::TimelyStack;
67use crate::lattice::Lattice;
68use crate::difference::Semigroup;
69
70/// A type that names constituent update types.
71pub trait Update {
72    /// Key by which data are grouped.
73    type Key: Ord + Clone + 'static;
74    /// Values associated with the key.
75    type Val: Ord + Clone + 'static;
76    /// Time at which updates occur.
77    type Time: Ord + Clone + Lattice + timely::progress::Timestamp;
78    /// Way in which updates occur.
79    type Diff: Ord + Semigroup + 'static;
80}
81
82impl<K,V,T,R> Update for ((K, V), T, R)
83where
84    K: Ord+Clone+'static,
85    V: Ord+Clone+'static,
86    T: Ord+Clone+Lattice+timely::progress::Timestamp,
87    R: Ord+Semigroup+'static,
88{
89    type Key = K;
90    type Val = V;
91    type Time = T;
92    type Diff = R;
93}
94
95/// A type with opinions on how updates should be laid out.
96pub trait Layout {
97    /// The represented update.
98    type Target: Update + ?Sized;
99    /// Container for update keys.
100    // NB: The `PushInto` constraint is only required by `rhh.rs` to push default values.
101    type KeyContainer: BatchContainer + PushInto<<Self::Target as Update>::Key>;
102    /// Container for update vals.
103    type ValContainer: BatchContainer;
104    /// Container for times.
105    type TimeContainer: BatchContainer<Owned = <Self::Target as Update>::Time> + PushInto<<Self::Target as Update>::Time>;
106    /// Container for diffs.
107    type DiffContainer: BatchContainer<Owned = <Self::Target as Update>::Diff> + PushInto<<Self::Target as Update>::Diff>;
108    /// Container for offsets.
109    type OffsetContainer: for<'a> BatchContainer<ReadItem<'a> = usize>;
110}
111
112/// A layout that uses vectors
113pub struct Vector<U: Update> {
114    phantom: std::marker::PhantomData<U>,
115}
116
117impl<U: Update> Layout for Vector<U>
118where
119    U::Diff: Ord,
120{
121    type Target = U;
122    type KeyContainer = Vec<U::Key>;
123    type ValContainer = Vec<U::Val>;
124    type TimeContainer = Vec<U::Time>;
125    type DiffContainer = Vec<U::Diff>;
126    type OffsetContainer = OffsetList;
127}
128
129/// A layout based on timely stacks
130pub struct TStack<U: Update> {
131    phantom: std::marker::PhantomData<U>,
132}
133
134impl<U: Update> Layout for TStack<U>
135where
136    U::Key: Columnation,
137    U::Val: Columnation,
138    U::Time: Columnation,
139    U::Diff: Columnation + Ord,
140{
141    type Target = U;
142    type KeyContainer = TimelyStack<U::Key>;
143    type ValContainer = TimelyStack<U::Val>;
144    type TimeContainer = TimelyStack<U::Time>;
145    type DiffContainer = TimelyStack<U::Diff>;
146    type OffsetContainer = OffsetList;
147}
148
149/// A type with a preferred container.
150///
151/// Examples include types that implement `Clone` who prefer
152pub trait PreferredContainer : ToOwned {
153    /// The preferred container for the type.
154    type Container: BatchContainer + PushInto<Self::Owned>;
155}
156
157impl<T: Ord + Clone + 'static> PreferredContainer for T {
158    type Container = Vec<T>;
159}
160
161impl<T: Ord + Clone + 'static> PreferredContainer for [T] {
162    type Container = SliceContainer<T>;
163}
164
/// An update and layout description based on preferred containers.
///
/// `K` and `V` may be unsized (for example slices); `T` and `D` are the time and
/// difference types. The struct holds no data.
pub struct Preferred<K: ?Sized, V: ?Sized, T, D> {
    /// Zero-sized marker. `K` and `V` are wrapped in `Box` because a tuple may
    /// only contain an unsized type in its last position.
    phantom: std::marker::PhantomData<(Box<K>, Box<V>, T, D)>,
}
169
170impl<K,V,T,R> Update for Preferred<K, V, T, R>
171where
172    K: ToOwned + ?Sized,
173    K::Owned: Ord+Clone+'static,
174    V: ToOwned + ?Sized,
175    V::Owned: Ord+Clone+'static,
176    T: Ord+Clone+Lattice+timely::progress::Timestamp,
177    R: Ord+Clone+Semigroup+'static,
178{
179    type Key = K::Owned;
180    type Val = V::Owned;
181    type Time = T;
182    type Diff = R;
183}
184
185impl<K, V, T, D> Layout for Preferred<K, V, T, D>
186where
187    K: Ord+ToOwned+PreferredContainer + ?Sized,
188    K::Owned: Ord+Clone+'static,
189    V: Ord+ToOwned+PreferredContainer + ?Sized,
190    V::Owned: Ord+Clone+'static,
191    T: Ord+Clone+Lattice+timely::progress::Timestamp,
192    D: Ord+Clone+Semigroup+'static,
193{
194    type Target = Preferred<K, V, T, D>;
195    type KeyContainer = K::Container;
196    type ValContainer = V::Container;
197    type TimeContainer = Vec<T>;
198    type DiffContainer = Vec<D>;
199    type OffsetContainer = OffsetList;
200}
201
202/// A list of unsigned integers that uses `u32` elements as long as they are small enough, and switches to `u64` once they are not.
203#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Serialize, Deserialize)]
204pub struct OffsetList {
205    /// Length of a prefix of zero elements.
206    pub zero_prefix: usize,
207    /// Offsets that fit within a `u32`.
208    pub smol: Vec<u32>,
209    /// Offsets that either do not fit in a `u32`, or are inserted after some offset that did not fit.
210    pub chonk: Vec<u64>,
211}
212
213impl std::fmt::Debug for OffsetList {
214    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
215        f.debug_list().entries(self.into_iter()).finish()
216    }
217}
218
219impl OffsetList {
220    /// Allocate a new list with a specified capacity.
221    pub fn with_capacity(cap: usize) -> Self {
222        Self {
223            zero_prefix: 0,
224            smol: Vec::with_capacity(cap),
225            chonk: Vec::new(),
226        }
227    }
228    /// Inserts the offset, as a `u32` if that is still on the table.
229    pub fn push(&mut self, offset: usize) {
230        if self.smol.is_empty() && self.chonk.is_empty() && offset == 0 {
231            self.zero_prefix += 1;
232        }
233        else if self.chonk.is_empty() {
234            if let Ok(smol) = offset.try_into() {
235                self.smol.push(smol);
236            }
237            else {
238                self.chonk.push(offset.try_into().unwrap())
239            }
240        }
241        else {
242            self.chonk.push(offset.try_into().unwrap())
243        }
244    }
245    /// Like `std::ops::Index`, which we cannot implement as it must return a `&usize`.
246    pub fn index(&self, index: usize) -> usize {
247        if index < self.zero_prefix {
248            0
249        }
250        else if index - self.zero_prefix < self.smol.len() {
251            self.smol[index - self.zero_prefix].try_into().unwrap()
252        }
253        else {
254            self.chonk[index - self.zero_prefix - self.smol.len()].try_into().unwrap()
255        }
256    }
257    /// The number of offsets in the list.
258    pub fn len(&self) -> usize {
259        self.zero_prefix + self.smol.len() + self.chonk.len()
260    }
261}
262
263impl<'a> IntoIterator for &'a OffsetList {
264    type Item = usize;
265    type IntoIter = OffsetListIter<'a>;
266
267    fn into_iter(self) -> Self::IntoIter {
268        OffsetListIter {list: self, index: 0 }
269    }
270}
271
272/// An iterator for [`OffsetList`].
273pub struct OffsetListIter<'a> {
274    list: &'a OffsetList,
275    index: usize,
276}
277
278impl<'a> Iterator for OffsetListIter<'a> {
279    type Item = usize;
280
281    fn next(&mut self) -> Option<Self::Item> {
282        if self.index < self.list.len() {
283            let res = Some(self.list.index(self.index));
284            self.index += 1;
285            res
286        } else {
287            None
288        }
289    }
290}
291
292impl PushInto<usize> for OffsetList {
293    fn push_into(&mut self, item: usize) {
294        self.push(item);
295    }
296}
297
298impl BatchContainer for OffsetList {
299    type Owned = usize;
300    type ReadItem<'a> = usize;
301
302    fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }
303
304    fn with_capacity(size: usize) -> Self {
305        Self::with_capacity(size)
306    }
307
308    fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
309        Self::with_capacity(cont1.len() + cont2.len())
310    }
311
312    fn index(&self, index: usize) -> Self::ReadItem<'_> {
313        self.index(index)
314    }
315
316    fn len(&self) -> usize {
317        self.len()
318    }
319}
320
321/// Behavior to split an update into principal components.
322pub trait BuilderInput<K: BatchContainer, V: BatchContainer>: Container {
323    /// Key portion
324    type Key<'a>: Ord;
325    /// Value portion
326    type Val<'a>: Ord;
327    /// Time
328    type Time;
329    /// Diff
330    type Diff;
331
332    /// Split an item into separate parts.
333    fn into_parts<'a>(item: Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff);
334
335    /// Test that the key equals a key in the layout's key container.
336    fn key_eq(this: &Self::Key<'_>, other: K::ReadItem<'_>) -> bool;
337
338    /// Test that the value equals a key in the layout's value container.
339    fn val_eq(this: &Self::Val<'_>, other: V::ReadItem<'_>) -> bool;
340
341    /// Count the number of distinct keys, (key, val) pairs, and total updates.
342    fn key_val_upd_counts(chain: &[Self]) -> (usize, usize, usize);
343}
344
345impl<K,KBC,V,VBC,T,R> BuilderInput<KBC, VBC> for Vec<((K, V), T, R)>
346where
347    K: Ord + Clone + 'static,
348    KBC: BatchContainer,
349    for<'a> KBC::ReadItem<'a>: PartialEq<&'a K>,
350    V: Ord + Clone + 'static,
351    VBC: BatchContainer,
352    for<'a> VBC::ReadItem<'a>: PartialEq<&'a V>,
353    T: Timestamp + Lattice + Clone + 'static,
354    R: Ord + Semigroup + 'static,
355{
356    type Key<'a> = K;
357    type Val<'a> = V;
358    type Time = T;
359    type Diff = R;
360
361    fn into_parts<'a>(((key, val), time, diff): Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff) {
362        (key, val, time, diff)
363    }
364
365    fn key_eq(this: &K, other: KBC::ReadItem<'_>) -> bool {
366        KBC::reborrow(other) == this
367    }
368
369    fn val_eq(this: &V, other: VBC::ReadItem<'_>) -> bool {
370        VBC::reborrow(other) == this
371    }
372
373    fn key_val_upd_counts(chain: &[Self]) -> (usize, usize, usize) {
374        let mut keys = 0;
375        let mut vals = 0;
376        let mut upds = 0;
377        let mut prev_keyval = None;
378        for link in chain.iter() {
379            for ((key, val), _, _) in link.iter() {
380                if let Some((p_key, p_val)) = prev_keyval {
381                    if p_key != key {
382                        keys += 1;
383                        vals += 1;
384                    } else if p_val != val {
385                        vals += 1;
386                    }
387                } else {
388                    keys += 1;
389                    vals += 1;
390                }
391                upds += 1;
392                prev_keyval = Some((key, val));
393            }
394        }
395        (keys, vals, upds)
396    }
397}
398
399impl<K,V,T,R> BuilderInput<K, V> for TimelyStack<((K::Owned, V::Owned), T, R)>
400where
401    K: BatchContainer,
402    for<'a> K::ReadItem<'a>: PartialEq<&'a K::Owned>,
403    K::Owned: Ord + Columnation + Clone + 'static,
404    V: BatchContainer,
405    for<'a> V::ReadItem<'a>: PartialEq<&'a V::Owned>,
406    V::Owned: Ord + Columnation + Clone + 'static,
407    T: Timestamp + Lattice + Columnation + Clone + 'static,
408    R: Ord + Clone + Semigroup + Columnation + 'static,
409{
410    type Key<'a> = &'a K::Owned;
411    type Val<'a> = &'a V::Owned;
412    type Time = T;
413    type Diff = R;
414
415    fn into_parts<'a>(((key, val), time, diff): Self::Item<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff) {
416        (key, val, time.clone(), diff.clone())
417    }
418
419    fn key_eq(this: &&K::Owned, other: K::ReadItem<'_>) -> bool {
420        K::reborrow(other) == *this
421    }
422
423    fn val_eq(this: &&V::Owned, other: V::ReadItem<'_>) -> bool {
424        V::reborrow(other) == *this
425    }
426
427    fn key_val_upd_counts(chain: &[Self]) -> (usize, usize, usize) {
428        let mut keys = 0;
429        let mut vals = 0;
430        let mut upds = 0;
431        let mut prev_keyval = None;
432        for link in chain.iter() {
433            for ((key, val), _, _) in link.iter() {
434                if let Some((p_key, p_val)) = prev_keyval {
435                    if p_key != key {
436                        keys += 1;
437                        vals += 1;
438                    } else if p_val != val {
439                        vals += 1;
440                    }
441                } else {
442                    keys += 1;
443                    vals += 1;
444                }
445                upds += 1;
446                prev_keyval = Some((key, val));
447            }
448        }
449        (keys, vals, upds)
450    }
451}
452
453pub use self::containers::{BatchContainer, SliceContainer};
454
455/// Containers for data that resemble `Vec<T>`, with leaner implementations.
456pub mod containers {
457
458    use columnation::Columnation;
459    use timely::container::PushInto;
460
461    use crate::containers::TimelyStack;
462    use crate::IntoOwned;
463
464    /// A general-purpose container resembling `Vec<T>`.
465    pub trait BatchContainer: for<'a> PushInto<Self::ReadItem<'a>> + 'static {
466        /// An owned instance of `Self::ReadItem<'_>`.
467        type Owned;
468
469        /// The type that can be read back out of the container.
470        type ReadItem<'a>: Copy + Ord + IntoOwned<'a, Owned = Self::Owned>;
471
472        /// Push an item into this container
473        fn push<D>(&mut self, item: D) where Self: PushInto<D> {
474            self.push_into(item);
475        }
476        /// Creates a new container with sufficient capacity.
477        fn with_capacity(size: usize) -> Self;
478        /// Creates a new container with sufficient capacity.
479        fn merge_capacity(cont1: &Self, cont2: &Self) -> Self;
480
481        /// Converts a read item into one with a narrower lifetime.
482        fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b>;
483
484        /// Reference to the element at this position.
485        fn index(&self, index: usize) -> Self::ReadItem<'_>;
486        /// Number of contained elements
487        fn len(&self) -> usize;
488        /// Returns the last item if the container is non-empty.
489        fn last(&self) -> Option<Self::ReadItem<'_>> {
490            if self.len() > 0 {
491                Some(self.index(self.len()-1))
492            }
493            else {
494                None
495            }
496        }
497        /// Indicates if the length is zero.
498        fn is_empty(&self) -> bool { self.len() == 0 }
499
500        /// Reports the number of elements satisfying the predicate.
501        ///
502        /// This methods *relies strongly* on the assumption that the predicate
503        /// stays false once it becomes false, a joint property of the predicate
504        /// and the layout of `Self. This allows `advance` to use exponential search to
505        /// count the number of elements in time logarithmic in the result.
506        fn advance<F: for<'a> Fn(Self::ReadItem<'a>)->bool>(&self, start: usize, end: usize, function: F) -> usize {
507
508            let small_limit = 8;
509
510            // Exponential search if the answer isn't within `small_limit`.
511            if end > start + small_limit && function(self.index(start + small_limit)) {
512
513                // start with no advance
514                let mut index = small_limit + 1;
515                if start + index < end && function(self.index(start + index)) {
516
517                    // advance in exponentially growing steps.
518                    let mut step = 1;
519                    while start + index + step < end && function(self.index(start + index + step)) {
520                        index += step;
521                        step <<= 1;
522                    }
523
524                    // advance in exponentially shrinking steps.
525                    step >>= 1;
526                    while step > 0 {
527                        if start + index + step < end && function(self.index(start + index + step)) {
528                            index += step;
529                        }
530                        step >>= 1;
531                    }
532
533                    index += 1;
534                }
535
536                index
537            }
538            else {
539                let limit = std::cmp::min(end, start + small_limit);
540                (start .. limit).filter(|x| function(self.index(*x))).count()
541            }
542        }
543    }
544
545    // All `T: Clone` also implement `ToOwned<Owned = T>`, but without the constraint Rust
546    // struggles to understand why the owned type must be `T` (i.e. the one blanket impl).
547    impl<T: Ord + Clone + 'static> BatchContainer for Vec<T> {
548        type Owned = T;
549        type ReadItem<'a> = &'a T;
550
551        fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }
552
553        fn with_capacity(size: usize) -> Self {
554            Vec::with_capacity(size)
555        }
556        fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
557            Vec::with_capacity(cont1.len() + cont2.len())
558        }
559        fn index(&self, index: usize) -> Self::ReadItem<'_> {
560            &self[index]
561        }
562        fn len(&self) -> usize {
563            self[..].len()
564        }
565    }
566
567    // The `ToOwned` requirement exists to satisfy `self.reserve_items`, who must for now
568    // be presented with the actual contained type, rather than a type that borrows into it.
569    impl<T: Clone + Ord + Columnation + 'static> BatchContainer for TimelyStack<T> {
570        type Owned = T;
571        type ReadItem<'a> = &'a T;
572
573        fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }
574
575        fn with_capacity(size: usize) -> Self {
576            Self::with_capacity(size)
577        }
578        fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
579            let mut new = Self::default();
580            new.reserve_regions(std::iter::once(cont1).chain(std::iter::once(cont2)));
581            new
582        }
583        fn index(&self, index: usize) -> Self::ReadItem<'_> {
584            &self[index]
585        }
586        fn len(&self) -> usize {
587            self[..].len()
588        }
589    }
590
591    /// A container that accepts slices `[B::Item]`.
592    pub struct SliceContainer<B> {
593        /// Offsets that bound each contained slice.
594        ///
595        /// The length will be one greater than the number of contained slices,
596        /// starting with zero and ending with `self.inner.len()`.
597        offsets: Vec<usize>,
598        /// An inner container for sequences of `B` that dereferences to a slice.
599        inner: Vec<B>,
600    }
601
602    impl<B: Ord + Clone + 'static> PushInto<&[B]> for SliceContainer<B> {
603        fn push_into(&mut self, item: &[B]) {
604            for x in item.iter() {
605                self.inner.push_into(x);
606            }
607            self.offsets.push(self.inner.len());
608        }
609    }
610
611    impl<B: Ord + Clone + 'static> PushInto<&Vec<B>> for SliceContainer<B> {
612        fn push_into(&mut self, item: &Vec<B>) {
613            self.push_into(&item[..]);
614        }
615    }
616
617    impl<B> PushInto<Vec<B>> for SliceContainer<B> {
618        fn push_into(&mut self, item: Vec<B>) {
619            for x in item.into_iter() {
620                self.inner.push(x);
621            }
622            self.offsets.push(self.inner.len());
623        }
624    }
625
626    impl<B> BatchContainer for SliceContainer<B>
627    where
628        B: Ord + Clone + Sized + 'static,
629    {
630        type Owned = Vec<B>;
631        type ReadItem<'a> = &'a [B];
632
633        fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }
634
635        fn with_capacity(size: usize) -> Self {
636            let mut offsets = Vec::with_capacity(size + 1);
637            offsets.push(0);
638            Self {
639                offsets,
640                inner: Vec::with_capacity(size),
641            }
642        }
643        fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
644            let mut offsets = Vec::with_capacity(cont1.inner.len() + cont2.inner.len() + 1);
645            offsets.push(0);
646            Self {
647                offsets,
648                inner: Vec::with_capacity(cont1.inner.len() + cont2.inner.len()),
649            }
650        }
651        fn index(&self, index: usize) -> Self::ReadItem<'_> {
652            let lower = self.offsets[index];
653            let upper = self.offsets[index+1];
654            &self.inner[lower .. upper]
655        }
656        fn len(&self) -> usize {
657            self.offsets.len() - 1
658        }
659    }
660
661    /// Default implementation introduces a first offset.
662    impl<B> Default for SliceContainer<B> {
663        fn default() -> Self {
664            Self {
665                offsets: vec![0],
666                inner: Default::default(),
667            }
668        }
669    }
670}