differential_dataflow/trace/implementations/rhh.rs

//! Batch implementation based on Robin Hood Hashing.
//!
//! Items are ordered by `(hash(Key), Key)` rather than `Key`, which means
//! that these implementations should only be used with each other, under
//! the same `hash` function, or for types that also order by `(hash(X), X)`,
//! for example wrapped types that implement `Ord` that way.

use std::rc::Rc;
use std::cmp::Ordering;

use serde::{Deserialize, Serialize};

use crate::Hashable;
use crate::containers::TimelyStack;
use crate::trace::implementations::chunker::{ColumnationChunker, VecChunker};
use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger, ColMerger};
use crate::trace::implementations::spine_fueled::Spine;
use crate::trace::rc_blanket_impls::RcBuilder;

use super::{Update, Layout, Vector, TStack};

use self::val_batch::{RhhValBatch, RhhValBuilder};
/// A trace implementation using a spine of ordered lists.
pub type VecSpine<K, V, T, R> = Spine<Rc<RhhValBatch<Vector<((K,V),T,R)>>>>;
/// A batcher for ordered lists.
pub type VecBatcher<K,V,T,R> = MergeBatcher<Vec<((K,V),T,R)>, VecChunker<((K,V),T,R)>, VecMerger<(K, V), T, R>>;
/// A builder for ordered lists.
pub type VecBuilder<K,V,T,R> = RcBuilder<RhhValBuilder<Vector<((K,V),T,R)>, Vec<((K,V),T,R)>>>;

// /// A trace implementation for empty values using a spine of ordered lists.
// pub type OrdKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<Vector<((K,()),T,R)>>>>;

/// A trace implementation backed by columnar storage.
pub type ColSpine<K, V, T, R> = Spine<Rc<RhhValBatch<TStack<((K,V),T,R)>>>>;
/// A batcher for columnar storage.
pub type ColBatcher<K,V,T,R> = MergeBatcher<Vec<((K,V),T,R)>, ColumnationChunker<((K,V),T,R)>, ColMerger<(K,V),T,R>>;
/// A builder for columnar storage.
pub type ColBuilder<K,V,T,R> = RcBuilder<RhhValBuilder<TStack<((K,V),T,R)>, TimelyStack<((K,V),T,R)>>>;

// /// A trace implementation backed by columnar storage.
// pub type ColKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<TStack<((K,()),T,R)>>>>;

/// A carrier trait indicating that the type's `Ord` and `PartialOrd` implementations are by `Hashable::hashed()`.
pub trait HashOrdered: Hashable { }

impl<'a, T: std::hash::Hash + HashOrdered> HashOrdered for &'a T { }

/// A hash-ordered wrapper that modifies `Ord` and `PartialOrd`.
#[derive(Copy, Clone, Eq, PartialEq, Debug, Default, Serialize, Deserialize)]
pub struct HashWrapper<T: std::hash::Hash + Hashable> {
    /// The inner value, freely modifiable.
    pub inner: T
}

impl<T: PartialOrd + std::hash::Hash + Hashable<Output: PartialOrd>> PartialOrd for HashWrapper<T> {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        let this_hash = self.inner.hashed();
        let that_hash = other.inner.hashed();
        (this_hash, &self.inner).partial_cmp(&(that_hash, &other.inner))
    }
}

impl<T: Ord + PartialOrd + std::hash::Hash + Hashable<Output: PartialOrd>> Ord for HashWrapper<T> {
    fn cmp(&self, other: &Self) -> Ordering {
        self.partial_cmp(other).unwrap()
    }
}

impl<T: std::hash::Hash + Hashable> HashOrdered for HashWrapper<T> { }

impl<T: std::hash::Hash + Hashable> Hashable for HashWrapper<T> {
    type Output = T::Output;
    fn hashed(&self) -> Self::Output { self.inner.hashed() }
}

impl<T: std::hash::Hash + Hashable> HashOrdered for &HashWrapper<T> { }

impl<T: std::hash::Hash + Hashable> Hashable for &HashWrapper<T> {
    type Output = T::Output;
    fn hashed(&self) -> Self::Output { self.inner.hashed() }
}
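
// A minimal illustrative check (an editorial addition, not upstream code) that
// `HashWrapper` compares by `(hashed(), inner)` rather than by `inner` alone.
// It relies on the crate's blanket `Hashable` implementation for `Hash` types.
#[cfg(test)]
mod hash_wrapper_tests {
    use super::HashWrapper;
    use crate::Hashable;

    #[test]
    fn orders_by_hash_then_key() {
        let a = HashWrapper { inner: 3u64 };
        let b = HashWrapper { inner: 4u64 };
        // The wrapper's `PartialOrd` should agree with the lexicographic
        // ordering on `(hash, value)` pairs.
        let expected = (a.inner.hashed(), a.inner) < (b.inner.hashed(), b.inner);
        assert_eq!(a < b, expected);
    }
}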

mod val_batch {

    use std::convert::TryInto;
    use std::marker::PhantomData;
    use serde::{Deserialize, Serialize};
    use timely::container::PushInto;
    use timely::progress::{Antichain, frontier::AntichainRef};

    use crate::hashable::Hashable;
    use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
    use crate::trace::implementations::{BatchContainer, BuilderInput};
    use crate::IntoOwned;

    use super::{Layout, Update, HashOrdered};

    /// Update tuples organized as a Robin Hood Hash map, ordered by `(hash(Key), Key, Val, Time)`.
    ///
    /// Specifically, this means that we attempt to place any `Key` at `alloc_len * (hash(Key) / 2^64)`,
    /// and spill onward if the slot is occupied. The cleverness of RHH is that you may instead evict
    /// someone else, in order to maintain the ordering up above. In fact, that is basically the rule:
    /// when there is a conflict, evict the greater of the two and attempt to place it in the next slot.
    ///
    /// This RHH implementation uses a repeated `keys_offs` offset to indicate an absent element, as all
    /// keys for valid updates must have some associated values with updates. This is the same type of
    /// optimization made for repeated updates, and it rules out (here) using that trick for repeated values.
    ///
    /// We will use the `Hashable` trait here, but any consistent hash function should work out ok.
    /// We specifically want to use the highest bits of the result (we will) because the low bits have
    /// likely been spent shuffling the data between workers (by key), and are likely low entropy.
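    ///
    /// For example (illustrative): with keys whose desired slots are `[0, 0, 3]`,
    /// the layout is `[k0, k1, _, k2]`, where `k1` spilled one slot past its desired
    /// location and `_` is a default "dummy" key, marked dead by a repeated offset
    /// in `keys_offs`.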
    #[derive(Debug, Serialize, Deserialize)]
    pub struct RhhValStorage<L: Layout>
    where
        <L::Target as Update>::Key: Default + HashOrdered,
    {

        /// The requested capacity for `keys`. We use this when determining where a key with a certain hash
        /// would most like to end up. The `BatchContainer` trait does not provide a `capacity()` method,
        /// otherwise we would just use that.
        pub key_capacity: usize,
        /// A number large enough that when it divides any `u64` the result is at most `self.key_capacity`.
        /// When that capacity is zero or one, this is set to zero instead.
        pub divisor: u64,
        /// The number of present keys, distinct from `keys.len()`, which also counts the dummy keys
        /// inserted as padding.
        pub key_count: usize,

        /// An ordered list of keys, corresponding to entries in `keys_offs`.
        pub keys: L::KeyContainer,
        /// Offsets used to provide indexes from keys to values.
        ///
        /// The length of this list is one longer than `keys`, so that we can avoid bounds logic.
        pub keys_offs: L::OffsetContainer,
        /// Concatenated ordered lists of values, bracketed by offsets in `keys_offs`.
        pub vals: L::ValContainer,
        /// Offsets used to provide indexes from values to updates.
        ///
        /// This list has a special representation in which any empty range indicates the singleton
        /// element just before the range, as if the start were decremented by one. The empty
        /// range is otherwise an invalid representation, and we borrow it to compactly encode
        /// single common update values (e.g. in a snapshot, the minimal time and a diff of one).
        ///
        /// The length of this list is one longer than `vals`, so that we can avoid bounds logic.
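        ///
        /// For example (illustrative): values `[a, b]` with `vals_offs = [0, 2, 2]`
        /// give `a` the updates in `times[0 .. 2]`, while the empty range `2 .. 2`
        /// for `b` is decoded as the singleton `times[1 .. 2]`: `b` repeats the most
        /// recently recorded update.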
        pub vals_offs: L::OffsetContainer,
        /// Concatenated ordered lists of update times, bracketed by offsets in `vals_offs`.
        pub times: L::TimeContainer,
        /// Concatenated ordered lists of update diffs, bracketed by offsets in `vals_offs`.
        pub diffs: L::DiffContainer,
    }

    impl<L: Layout> RhhValStorage<L>
    where
        <L::Target as Update>::Key: Default + HashOrdered,
        for<'a> <L::KeyContainer as BatchContainer>::ReadItem<'a>: HashOrdered,
    {
        /// Lower and upper bounds in `self.vals` corresponding to the key at `index`.
        fn values_for_key(&self, index: usize) -> (usize, usize) {
            let lower = self.keys_offs.index(index);
            let upper = self.keys_offs.index(index+1);
            // Looking up values for an invalid key indicates something is wrong.
            assert!(lower < upper, "{:?} v {:?} at {:?}", lower, upper, index);
            (lower, upper)
        }
        /// Lower and upper bounds in `self.times` and `self.diffs` corresponding to the value at `index`.
        fn updates_for_value(&self, index: usize) -> (usize, usize) {
            let mut lower = self.vals_offs.index(index);
            let upper = self.vals_offs.index(index+1);
            // We use equal lower and upper to encode "singleton update; just before here".
            // It should only apply when there is a prior element, so `lower` should be greater than zero.
            if lower == upper {
                assert!(lower > 0);
                lower -= 1;
            }
            (lower, upper)
        }

        /// Inserts the key at its desired location, or nearby.
        ///
        /// Because there may be collisions, the key may be placed just after its desired location.
        /// If necessary, this method will introduce default keys and copy the offsets to create space
        /// after which to insert the key. These dummy keys are indicated by repeated offsets in `keys_offs`.
        ///
        /// If `offset` is specified, we will insert it at the appropriate location. If it is not specified,
        /// we leave `keys_offs` ready to receive it as the next `push`. This is so that builders that may
        /// not know the final offset at the moment of key insertion can prepare for receiving the offset.
        fn insert_key(&mut self, key: <L::KeyContainer as BatchContainer>::ReadItem<'_>, offset: Option<usize>) {
            let desired = self.desired_location(&key);
            // Were we to push the key now, it would land at `self.keys.len()`; while that is
            // less than `desired`, push additional blank entries in.
            while self.keys.len() < desired {
                // We insert a default (dummy) key and repeat the offset to indicate this.
                let current_offset = self.keys_offs.index(self.keys.len());
                self.keys.push(<<L::Target as Update>::Key as Default>::default());
                self.keys_offs.push(current_offset);
            }

            // Now we insert the key, even if it is no longer at the desired location because of contention.
            // If an offset has been supplied we insert it, and otherwise leave it for future determination.
            self.keys.push(key);
            if let Some(offset) = offset {
                self.keys_offs.push(offset);
            }
            self.key_count += 1;
        }

        /// Indicates the desired location of the key, derived from its hash.
        fn desired_location<K: Hashable>(&self, key: &K) -> usize {
            if self.divisor == 0 { 0 }
            else {
                (key.hashed().into() / self.divisor).try_into().expect("divisor not large enough to force u64 into usize")
            }
        }
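
        // Worked example (an editorial illustration, not upstream code): with
        // `key_capacity = 8`, `divisor_for_capacity` yields `(2^63 / 8) << 1 = 2^61`,
        // and a key whose 64-bit hash is `0x4000_0000_0000_0000` (that is, `2^62`)
        // is steered to slot `2^62 / 2^61 = 2`.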

        /// Returns true if one should advance one's index in the search for `key`.
        fn advance_key(&self, index: usize, key: <L::KeyContainer as BatchContainer>::ReadItem<'_>) -> bool {
            // Ideally this short-circuits, as `self.keys[index]` is bogus data.
            !self.live_key(index) || self.keys.index(index).lt(&<L::KeyContainer as BatchContainer>::reborrow(key))
        }

        /// Indicates that a key is valid, rather than dead space, by looking for a valid offset range.
        fn live_key(&self, index: usize) -> bool {
            self.keys_offs.index(index) != self.keys_offs.index(index+1)
        }

        /// Advances `index` until it references a live key, or is `keys.len()`.
        fn advance_to_live_key(&self, index: &mut usize) {
            while *index < self.keys.len() && !self.live_key(*index) {
                *index += 1;
            }
        }

        /// A value large enough that any `u64` divided by it is at most `capacity`.
        ///
        /// This is roughly `2^64 / capacity`, except in the cases where `capacity` is zero or one.
        /// In those cases, we'll return `0` to communicate the exception, for which we should
        /// just return `0` when announcing a target location (and a zero capacity that we insert
        /// into becomes a bug).
        fn divisor_for_capacity(capacity: usize) -> u64 {
            let capacity: u64 = capacity.try_into().expect("usize exceeds u64");
            if capacity == 0 || capacity == 1 { 0 }
            else {
                ((1 << 63) / capacity) << 1
            }
        }
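
        // Illustrative check of the arithmetic (an editorial addition): for
        // `capacity = 4`, `divisor = (2^63 / 4) << 1 = 2^62`, and every hash
        // `h: u64` satisfies `h / 2^62 <= 3 < capacity`. For capacities that are
        // not powers of two the quotient can reach `capacity` itself (e.g.
        // `capacity = 3`), which the additive slop in the builder absorbs.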
    }

    /// An immutable collection of update tuples, from a contiguous interval of logical times.
    ///
    /// The `L` parameter captures how the updates should be laid out.
    #[derive(Serialize, Deserialize)]
    #[serde(bound = "
        L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
        L::ValContainer: Serialize + for<'a> Deserialize<'a>,
        L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
        L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
        L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
    ")]
    pub struct RhhValBatch<L: Layout>
    where
        <L::Target as Update>::Key: Default + HashOrdered,
    {
        /// The updates themselves.
        pub storage: RhhValStorage<L>,
        /// Description of the update times this layer represents.
        pub description: Description<<L::Target as Update>::Time>,
        /// The number of updates reflected in the batch.
        ///
        /// We track this separately from `storage` because, due to the singleton optimization,
        /// we may have many more updates than `storage.times.len()`. It should equal that
        /// length, plus the number of singleton optimizations employed.
        pub updates: usize,
    }

    impl<L: Layout> BatchReader for RhhValBatch<L>
    where
        <L::Target as Update>::Key: Default + HashOrdered,
        for<'a> <L::KeyContainer as BatchContainer>::ReadItem<'a>: HashOrdered,
    {
        type Key<'a> = <L::KeyContainer as BatchContainer>::ReadItem<'a>;
        type Val<'a> = <L::ValContainer as BatchContainer>::ReadItem<'a>;
        type Time = <L::Target as Update>::Time;
        type TimeGat<'a> = <L::TimeContainer as BatchContainer>::ReadItem<'a>;
        type Diff = <L::Target as Update>::Diff;
        type DiffGat<'a> = <L::DiffContainer as BatchContainer>::ReadItem<'a>;

        type Cursor = RhhValCursor<L>;
        fn cursor(&self) -> Self::Cursor {
            let mut cursor = RhhValCursor {
                key_cursor: 0,
                val_cursor: 0,
                phantom: std::marker::PhantomData,
            };
            // Start at the first live key without skipping slot zero: `step_key`
            // pre-increments the cursor, which would pass over a live key at index 0.
            cursor.rewind_keys(self);
            cursor
        }
        fn len(&self) -> usize {
            // Normally this would be the length of the update storage, but the compact
            // singleton encoding means `storage.times.len()` undercounts; we maintain
            // the correct accounting in `self.updates`.
            self.updates
        }
        fn description(&self) -> &Description<<L::Target as Update>::Time> { &self.description }
    }

    impl<L: Layout> Batch for RhhValBatch<L>
    where
        <L::Target as Update>::Key: Default + HashOrdered,
        for<'a> <L::KeyContainer as BatchContainer>::ReadItem<'a>: HashOrdered,
    {
        type Merger = RhhValMerger<L>;

        fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self::Merger {
            RhhValMerger::new(self, other, compaction_frontier)
        }

        fn empty(lower: Antichain<Self::Time>, upper: Antichain<Self::Time>) -> Self {
            use timely::progress::Timestamp;
            Self {
                storage: RhhValStorage {
                    keys: L::KeyContainer::with_capacity(0),
                    keys_offs: L::OffsetContainer::with_capacity(0),
                    vals: L::ValContainer::with_capacity(0),
                    vals_offs: L::OffsetContainer::with_capacity(0),
                    times: L::TimeContainer::with_capacity(0),
                    diffs: L::DiffContainer::with_capacity(0),
                    key_count: 0,
                    key_capacity: 0,
                    divisor: 0,
                },
                description: Description::new(lower, upper, Antichain::from_elem(Self::Time::minimum())),
                updates: 0,
            }
        }
    }

    /// State for an in-progress merge.
    pub struct RhhValMerger<L: Layout>
    where
        <L::Target as Update>::Key: Default + HashOrdered,
    {
        /// Key position to merge next in the first batch.
        key_cursor1: usize,
        /// Key position to merge next in the second batch.
        key_cursor2: usize,
        /// Result that we are currently assembling.
        result: RhhValStorage<L>,
        /// Description of the result batch.
        description: Description<<L::Target as Update>::Time>,

        /// Local stash of updates, to use for consolidation.
        ///
        /// We could emulate a `ChangeBatch` here, with related compaction smarts.
        /// A `ChangeBatch` itself needs an `i64` diff type, which we do not have here.
        update_stash: Vec<(<L::Target as Update>::Time, <L::Target as Update>::Diff)>,
        /// Counts the number of singleton-optimized entries, so that we can correctly count the updates.
        singletons: usize,
    }

    impl<L: Layout> Merger<RhhValBatch<L>> for RhhValMerger<L>
    where
        <L::Target as Update>::Key: Default + HashOrdered,
        RhhValBatch<L>: Batch<Time=<L::Target as Update>::Time>,
        for<'a> <L::KeyContainer as BatchContainer>::ReadItem<'a>: HashOrdered,
    {
        fn new(batch1: &RhhValBatch<L>, batch2: &RhhValBatch<L>, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self {

            assert!(batch1.upper() == batch2.lower());
            use crate::lattice::Lattice;
            let mut since = batch1.description().since().join(batch2.description().since());
            since = since.join(&compaction_frontier.to_owned());

            let description = Description::new(batch1.lower().clone(), batch2.upper().clone(), since);

            // This is a massive overestimate on the number of keys, but we don't have better information.
            // An over-estimate can be a massive problem as well, with sparse regions being hard to cross.
            let max_cap = batch1.len() + batch2.len();
            let rhh_cap = 2 * max_cap;

            let batch1 = &batch1.storage;
            let batch2 = &batch2.storage;

            let mut storage = RhhValStorage {
                keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys),
                keys_offs: L::OffsetContainer::with_capacity(batch1.keys_offs.len() + batch2.keys_offs.len()),
                vals: L::ValContainer::merge_capacity(&batch1.vals, &batch2.vals),
                vals_offs: L::OffsetContainer::with_capacity(batch1.vals_offs.len() + batch2.vals_offs.len()),
                times: L::TimeContainer::merge_capacity(&batch1.times, &batch2.times),
                diffs: L::DiffContainer::merge_capacity(&batch1.diffs, &batch2.diffs),
                key_count: 0,
                key_capacity: rhh_cap,
                divisor: RhhValStorage::<L>::divisor_for_capacity(rhh_cap),
            };

            // Mark explicit types because type inference fails to resolve them.
            let keys_offs: &mut L::OffsetContainer = &mut storage.keys_offs;
            keys_offs.push(0);
            let vals_offs: &mut L::OffsetContainer = &mut storage.vals_offs;
            vals_offs.push(0);

            RhhValMerger {
                key_cursor1: 0,
                key_cursor2: 0,
                result: storage,
                description,
                update_stash: Vec::new(),
                singletons: 0,
            }
        }
        fn done(self) -> RhhValBatch<L> {
            RhhValBatch {
                updates: self.result.times.len() + self.singletons,
                storage: self.result,
                description: self.description,
            }
        }
        fn work(&mut self, source1: &RhhValBatch<L>, source2: &RhhValBatch<L>, fuel: &mut isize) {

            // An (incomplete) indication of the amount of work we've done so far.
            let starting_updates = self.result.times.len();
            let mut effort = 0isize;

            source1.storage.advance_to_live_key(&mut self.key_cursor1);
            source2.storage.advance_to_live_key(&mut self.key_cursor2);

            // While both mergees are still active, perform single-key merges.
            while self.key_cursor1 < source1.storage.keys.len() && self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
                self.merge_key(&source1.storage, &source2.storage);
                source1.storage.advance_to_live_key(&mut self.key_cursor1);
                source2.storage.advance_to_live_key(&mut self.key_cursor2);
                // An (incomplete) accounting of the work we've done.
                effort = (self.result.times.len() - starting_updates) as isize;
            }

            // Merging is complete, and only copying remains.
            // Key-by-key copying allows effort interruption, and compaction.
            while self.key_cursor1 < source1.storage.keys.len() && effort < *fuel {
                self.copy_key(&source1.storage, self.key_cursor1);
                self.key_cursor1 += 1;
                source1.storage.advance_to_live_key(&mut self.key_cursor1);
                effort = (self.result.times.len() - starting_updates) as isize;
            }
            while self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
                self.copy_key(&source2.storage, self.key_cursor2);
                self.key_cursor2 += 1;
                source2.storage.advance_to_live_key(&mut self.key_cursor2);
                effort = (self.result.times.len() - starting_updates) as isize;
            }

            *fuel -= effort;
        }
    }
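
    // Sketch of fueled merging (an editorial illustration, not upstream code).
    // Following the `spine_fueled` convention, callers grant fuel in installments,
    // and a call to `work` that returns with fuel left over indicates the merge
    // has completed, after which `done` may be called:
    //
    //     let mut merger = RhhValMerger::new(&batch1, &batch2, compaction_frontier);
    //     let mut fuel = 1_000_000isize;
    //     merger.work(&batch1, &batch2, &mut fuel);
    //     if fuel > 0 { let merged = merger.done(); }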

    // Helper methods in support of merging batches.
    impl<L: Layout> RhhValMerger<L>
    where
        <L::Target as Update>::Key: Default + HashOrdered,
        for<'a> <L::KeyContainer as BatchContainer>::ReadItem<'a>: HashOrdered,
    {
        /// Copy the next key in `source`.
        ///
        /// The method extracts the key in `source` at `cursor`, and merges it in to `self`.
        /// If the result does not wholly cancel, the key will be present in `self` with the
        /// compacted values and updates.
        ///
        /// The caller should be certain to update the cursor, as this method does not do this.
        fn copy_key(&mut self, source: &RhhValStorage<L>, cursor: usize) {
            // Capture the initial number of values to determine if the merge was ultimately non-empty.
            let init_vals = self.result.vals.len();
            let (mut lower, upper) = source.values_for_key(cursor);
            while lower < upper {
                self.stash_updates_for_val(source, lower);
                if let Some(off) = self.consolidate_updates() {
                    self.result.vals_offs.push(off);
                    self.result.vals.push(source.vals.index(lower));
                }
                lower += 1;
            }

            // If we have pushed any values, copy the key as well.
            if self.result.vals.len() > init_vals {
                self.result.insert_key(source.keys.index(cursor), Some(self.result.vals.len()));
            }
        }
        /// Merge the next key in each of `source1` and `source2` into `self`, updating the appropriate cursors.
        ///
        /// This method only merges a single key. It applies all compaction necessary, and may result in no output
        /// if the updates cancel either directly or after compaction.
        fn merge_key(&mut self, source1: &RhhValStorage<L>, source2: &RhhValStorage<L>) {

            use ::std::cmp::Ordering;
            match source1.keys.index(self.key_cursor1).cmp(&source2.keys.index(self.key_cursor2)) {
                Ordering::Less => {
                    self.copy_key(source1, self.key_cursor1);
                    self.key_cursor1 += 1;
                },
                Ordering::Equal => {
                    // Keys are equal; must merge all values from both sources for this one key.
                    let (lower1, upper1) = source1.values_for_key(self.key_cursor1);
                    let (lower2, upper2) = source2.values_for_key(self.key_cursor2);
                    if let Some(off) = self.merge_vals((source1, lower1, upper1), (source2, lower2, upper2)) {
                        self.result.insert_key(source1.keys.index(self.key_cursor1), Some(off));
                    }
                    // Increment cursors in either case; the keys are merged.
                    self.key_cursor1 += 1;
                    self.key_cursor2 += 1;
                },
                Ordering::Greater => {
                    self.copy_key(source2, self.key_cursor2);
                    self.key_cursor2 += 1;
                },
            }
        }
        /// Merge two ranges of values into `self`.
        ///
        /// If the compacted result contains values with non-empty updates, the function returns
        /// an offset that should be recorded to indicate the upper extent of the result values.
        fn merge_vals(
            &mut self,
            (source1, mut lower1, upper1): (&RhhValStorage<L>, usize, usize),
            (source2, mut lower2, upper2): (&RhhValStorage<L>, usize, usize),
        ) -> Option<usize> {
            // Capture the initial number of values to determine if the merge was ultimately non-empty.
            let init_vals = self.result.vals.len();
            while lower1 < upper1 && lower2 < upper2 {
                // We compare values, and fold in updates for the lowest values;
                // if they are non-empty post-consolidation, we write the value.
                // We could multi-way merge and it wouldn't be very complicated.
                use ::std::cmp::Ordering;
                match source1.vals.index(lower1).cmp(&source2.vals.index(lower2)) {
                    Ordering::Less => {
                        // Extend stash by updates, with logical compaction applied.
                        self.stash_updates_for_val(source1, lower1);
                        if let Some(off) = self.consolidate_updates() {
                            self.result.vals_offs.push(off);
                            self.result.vals.push(source1.vals.index(lower1));
                        }
                        lower1 += 1;
                    },
                    Ordering::Equal => {
                        self.stash_updates_for_val(source1, lower1);
                        self.stash_updates_for_val(source2, lower2);
                        if let Some(off) = self.consolidate_updates() {
                            self.result.vals_offs.push(off);
                            self.result.vals.push(source1.vals.index(lower1));
                        }
                        lower1 += 1;
                        lower2 += 1;
                    },
                    Ordering::Greater => {
                        // Extend stash by updates, with logical compaction applied.
                        self.stash_updates_for_val(source2, lower2);
                        if let Some(off) = self.consolidate_updates() {
                            self.result.vals_offs.push(off);
                            self.result.vals.push(source2.vals.index(lower2));
                        }
                        lower2 += 1;
                    },
                }
            }
            // Merging is complete, but we may have remaining elements to push.
            while lower1 < upper1 {
                self.stash_updates_for_val(source1, lower1);
                if let Some(off) = self.consolidate_updates() {
                    self.result.vals_offs.push(off);
                    self.result.vals.push(source1.vals.index(lower1));
                }
                lower1 += 1;
            }
            while lower2 < upper2 {
                self.stash_updates_for_val(source2, lower2);
                if let Some(off) = self.consolidate_updates() {
                    self.result.vals_offs.push(off);
                    self.result.vals.push(source2.vals.index(lower2));
                }
                lower2 += 1;
            }

            // Values being pushed indicate non-emptiness.
            if self.result.vals.len() > init_vals {
                Some(self.result.vals.len())
            } else {
                None
            }
        }

        /// Transfer updates for an indexed value in `source` into `self`, with compaction applied.
        fn stash_updates_for_val(&mut self, source: &RhhValStorage<L>, index: usize) {
            let (lower, upper) = source.updates_for_value(index);
            for i in lower .. upper {
                // NB: Here is where we would need to look back if `lower == upper`.
                let time = source.times.index(i);
                let diff = source.diffs.index(i);
                let mut new_time = time.into_owned();
                use crate::lattice::Lattice;
                new_time.advance_by(self.description.since().borrow());
                self.update_stash.push((new_time, diff.into_owned()));
            }
        }

        /// Consolidates `self.update_stash` and produces the offset to record, if any.
        fn consolidate_updates(&mut self) -> Option<usize> {
            use crate::consolidation;
            consolidation::consolidate(&mut self.update_stash);
            if !self.update_stash.is_empty() {
                // If there is a single element, equal to a just-prior recorded update,
                // we push nothing and report an unincremented offset to encode this case.
                let time_diff = self.result.times.last().zip(self.result.diffs.last());
                let last_eq = self.update_stash.last().zip(time_diff).map(|((t1, d1), (t2, d2))| {
                    let t1 = <<L::TimeContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(t1);
                    let d1 = <<L::DiffContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(d1);
                    t1.eq(&t2) && d1.eq(&d2)
                });
                if self.update_stash.len() == 1 && last_eq.unwrap_or(false) {
                    // Just clear out update_stash, as we won't drain it here.
                    self.update_stash.clear();
                    self.singletons += 1;
                }
                else {
                    // Conventional; move `update_stash` into `updates`.
                    for (time, diff) in self.update_stash.drain(..) {
                        self.result.times.push(time);
                        self.result.diffs.push(diff);
                    }
                }
                Some(self.result.times.len())
            } else {
                None
            }
        }
    }


    /// A cursor through a Robin Hood Hashed list of keys, vals, and such.
    ///
    /// The important detail is that not all of `keys` represent valid keys.
    /// We must consult the offsets in `keys_offs` to see if the associated data is valid.
    /// Importantly, we should skip over invalid keys, rather than report them as
    /// invalid through `key_valid`: that method is meant to indicate the end of
    /// the cursor, rather than internal state.
    pub struct RhhValCursor<L: Layout>
    where
        <L::Target as Update>::Key: Default + HashOrdered,
    {
        /// Absolute position of the current key.
        key_cursor: usize,
        /// Absolute position of the current value.
        val_cursor: usize,
        /// Phantom marker for Rust happiness.
        phantom: PhantomData<L>,
    }

    impl<L: Layout> Cursor for RhhValCursor<L>
    where
        <L::Target as Update>::Key: Default + HashOrdered,
        for<'a> <L::KeyContainer as BatchContainer>::ReadItem<'a>: HashOrdered,
    {
        type Key<'a> = <L::KeyContainer as BatchContainer>::ReadItem<'a>;
        type Val<'a> = <L::ValContainer as BatchContainer>::ReadItem<'a>;
        type Time = <L::Target as Update>::Time;
        type TimeGat<'a> = <L::TimeContainer as BatchContainer>::ReadItem<'a>;
        type Diff = <L::Target as Update>::Diff;
        type DiffGat<'a> = <L::DiffContainer as BatchContainer>::ReadItem<'a>;

        type Storage = RhhValBatch<L>;

        fn get_key<'a>(&self, storage: &'a RhhValBatch<L>) -> Option<Self::Key<'a>> { storage.storage.keys.get(self.key_cursor) }
        fn get_val<'a>(&self, storage: &'a RhhValBatch<L>) -> Option<Self::Val<'a>> { if self.val_valid(storage) { storage.storage.vals.get(self.val_cursor) } else { None } }
        fn key<'a>(&self, storage: &'a RhhValBatch<L>) -> Self::Key<'a> { storage.storage.keys.index(self.key_cursor) }
        fn val<'a>(&self, storage: &'a RhhValBatch<L>) -> Self::Val<'a> { storage.storage.vals.index(self.val_cursor) }
        fn map_times<L2: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &RhhValBatch<L>, mut logic: L2) {
            let (lower, upper) = storage.storage.updates_for_value(self.val_cursor);
            for index in lower .. upper {
                let time = storage.storage.times.index(index);
                let diff = storage.storage.diffs.index(index);
                logic(time, diff);
            }
        }
        fn key_valid(&self, storage: &RhhValBatch<L>) -> bool { self.key_cursor < storage.storage.keys.len() }
        fn val_valid(&self, storage: &RhhValBatch<L>) -> bool { self.val_cursor < storage.storage.values_for_key(self.key_cursor).1 }
        fn step_key(&mut self, storage: &RhhValBatch<L>) {
            // We advance the cursor by one for certain, and then as long as we need to find a valid key.
            self.key_cursor += 1;
            storage.storage.advance_to_live_key(&mut self.key_cursor);

            if self.key_valid(storage) {
                self.rewind_vals(storage);
            }
            else {
                self.key_cursor = storage.storage.keys.len();
            }
        }
        fn seek_key(&mut self, storage: &RhhValBatch<L>, key: Self::Key<'_>) {
            // self.key_cursor += storage.storage.keys.advance(self.key_cursor, storage.storage.keys.len(), |x| x.lt(key));
            let desired = storage.storage.desired_location(&key);
            // Advance the cursor, if `desired` is ahead of it.
            if self.key_cursor < desired {
                self.key_cursor = desired;
            }
            // Advance the cursor as long as we have not found a value greater or equal to `key`.
            // We may have already passed `key`, and confirmed its absence, but our goal is to
            // find the next key afterwards so that users can, for example, alternately iterate.
            while self.key_valid(storage) && storage.storage.advance_key(self.key_cursor, key) {
                // TODO: Based on our encoding, we could skip logarithmically over empty regions by galloping
                //       through `storage.keys_offs`, which stays put for dead space.
                self.key_cursor += 1;
            }

            if self.key_valid(storage) {
                self.rewind_vals(storage);
            }
        }
        fn step_val(&mut self, storage: &RhhValBatch<L>) {
            self.val_cursor += 1;
            if !self.val_valid(storage) {
                self.val_cursor = storage.storage.values_for_key(self.key_cursor).1;
            }
        }
        fn seek_val(&mut self, storage: &RhhValBatch<L>, val: Self::Val<'_>) {
            self.val_cursor += storage.storage.vals.advance(self.val_cursor, storage.storage.values_for_key(self.key_cursor).1, |x| <L::ValContainer as BatchContainer>::reborrow(x).lt(&<L::ValContainer as BatchContainer>::reborrow(val)));
        }
        fn rewind_keys(&mut self, storage: &RhhValBatch<L>) {
            self.key_cursor = 0;
            storage.storage.advance_to_live_key(&mut self.key_cursor);

            if self.key_valid(storage) {
                self.rewind_vals(storage)
            }
        }
        fn rewind_vals(&mut self, storage: &RhhValBatch<L>) {
            self.val_cursor = storage.storage.values_for_key(self.key_cursor).0;
        }
    }
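
    // Sketch of cursor traversal (an editorial illustration; `batch` is any
    // `RhhValBatch<L>`). Dead RHH slots are skipped internally, so the loop
    // below observes only live keys:
    //
    //     let mut cursor = batch.cursor();
    //     while cursor.key_valid(&batch) {
    //         while cursor.val_valid(&batch) {
    //             cursor.map_times(&batch, |time, diff| { /* inspect each update */ });
    //             cursor.step_val(&batch);
    //         }
    //         cursor.step_key(&batch);
    //     }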

    /// A builder for creating layers from unsorted update tuples.
    pub struct RhhValBuilder<L: Layout, CI>
    where
        <L::Target as Update>::Key: Default + HashOrdered,
    {
        result: RhhValStorage<L>,
        singleton: Option<(<L::Target as Update>::Time, <L::Target as Update>::Diff)>,
        /// Counts the number of singleton optimizations we performed.
        ///
        /// This number allows us to correctly gauge the total number of updates reflected in a batch,
        /// even though `times.len()` may be much shorter than this amount.
        singletons: usize,
        _marker: PhantomData<CI>,
    }

    impl<L: Layout, CI> RhhValBuilder<L, CI>
    where
        <L::Target as Update>::Key: Default + HashOrdered,
    {
        /// Pushes a single update, which may set `self.singleton` rather than push.
        ///
        /// This operation is meant to be equivalent to pushing `time` and `diff` onto
        /// `self.result.times` and `self.result.diffs`. However, for "clever" reasons it
        /// does not always do this. Instead, it looks for opportunities to encode a singleton
        /// update with an "absent" update: repeating the most recent offset. This otherwise
        /// invalid state encodes "look back one element".
        ///
        /// When `self.singleton` is `Some`, it means that we have seen one update and it matched the
        /// previously pushed update exactly. In that case, we do not push the update into `updates`.
        /// The update tuple is retained in `self.singleton` in case we see another update and need
        /// to recover the singleton to push it into `updates` to join the second update.
        fn push_update(&mut self, time: <L::Target as Update>::Time, diff: <L::Target as Update>::Diff) {
            // If a just-pushed update exactly equals `(time, diff)` we can avoid pushing it.
            let t1 = <<L::TimeContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(&time);
            let d1 = <<L::DiffContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(&diff);
            if self.result.times.last().map(|t| t == t1).unwrap_or(false) && self.result.diffs.last().map(|d| d == d1).unwrap_or(false) {
                assert!(self.singleton.is_none());
                self.singleton = Some((time, diff));
            }
            else {
                // If we have pushed a single element, we need to copy it out to meet this one.
                if let Some((time, diff)) = self.singleton.take() {
                    self.result.times.push(time);
                    self.result.diffs.push(diff);
                }
                self.result.times.push(time);
                self.result.diffs.push(diff);
            }
        }
    }
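
    // Illustrative trace (an editorial addition): pushing the same `(t, d)` for two
    // consecutive values leaves `times = [t]` and `diffs = [d]`; the second value's
    // `vals_offs` entry then repeats the first's, which `updates_for_value` later
    // decodes as "look back one element".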

    impl<L: Layout, CI> Builder for RhhValBuilder<L, CI>
    where
        <L::Target as Update>::Key: Default + HashOrdered,
        CI: for<'a> BuilderInput<L::KeyContainer, L::ValContainer, Key<'a> = <L::Target as Update>::Key, Time=<L::Target as Update>::Time, Diff=<L::Target as Update>::Diff>,
        for<'a> L::ValContainer: PushInto<CI::Val<'a>>,
        for<'a> <L::KeyContainer as BatchContainer>::ReadItem<'a>: HashOrdered + IntoOwned<'a, Owned = <L::Target as Update>::Key>,
    {
        type Input = CI;
        type Time = <L::Target as Update>::Time;
        type Output = RhhValBatch<L>;

        fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self {

            // Double the capacity for RHH; probably excessive.
            let rhh_capacity = 2 * keys;
            let divisor = RhhValStorage::<L>::divisor_for_capacity(rhh_capacity);
            // We want some additive slop, in case we spill over.
            // This number is magically chosen based on nothing in particular.
            // Worst case, we will re-alloc and copy if we spill beyond this.
            let keys = rhh_capacity + 10;

            // We don't introduce zero offsets as they will be introduced by the first `push` call.
            Self {
                result: RhhValStorage {
                    keys: L::KeyContainer::with_capacity(keys),
                    keys_offs: L::OffsetContainer::with_capacity(keys + 1),
                    vals: L::ValContainer::with_capacity(vals),
                    vals_offs: L::OffsetContainer::with_capacity(vals + 1),
                    times: L::TimeContainer::with_capacity(upds),
                    diffs: L::DiffContainer::with_capacity(upds),
                    key_count: 0,
                    key_capacity: rhh_capacity,
                    divisor,
                },
                singleton: None,
                singletons: 0,
                _marker: PhantomData,
            }
        }

        #[inline]
        fn push(&mut self, chunk: &mut Self::Input) {
            for item in chunk.drain() {
                let (key, val, time, diff) = CI::into_parts(item);
                // Perhaps this is a continuation of an already received key.
                if self.result.keys.last().map(|k| CI::key_eq(&key, k)).unwrap_or(false) {
                    // Perhaps this is a continuation of an already received value.
                    if self.result.vals.last().map(|v| CI::val_eq(&val, v)).unwrap_or(false) {
                        self.push_update(time, diff);
                    } else {
                        // New value; complete representation of prior value.
                        self.result.vals_offs.push(self.result.times.len());
                        if self.singleton.take().is_some() { self.singletons += 1; }
                        self.push_update(time, diff);
                        self.result.vals.push(val);
                    }
                } else {
                    // New key; complete representation of prior key.
                    self.result.vals_offs.push(self.result.times.len());
                    if self.singleton.take().is_some() { self.singletons += 1; }
                    self.result.keys_offs.push(self.result.vals.len());
                    self.push_update(time, diff);
                    self.result.vals.push(val);
                    // Insert the key, but with no specified offset.
                    self.result.insert_key(IntoOwned::borrow_as(&key), None);
                }
            }
        }

        #[inline(never)]
        fn done(mut self, description: Description<Self::Time>) -> RhhValBatch<L> {
            // Record the final offsets.
            self.result.vals_offs.push(self.result.times.len());
            // Remove any pending singleton, and if it was set increment our count.
            if self.singleton.take().is_some() { self.singletons += 1; }
            self.result.keys_offs.push(self.result.vals.len());
            RhhValBatch {
                updates: self.result.times.len() + self.singletons,
                storage: self.result,
                description,
            }
        }

        fn seal(chain: &mut Vec<Self::Input>, description: Description<Self::Time>) -> Self::Output {
            let (keys, vals, upds) = Self::Input::key_val_upd_counts(&chain[..]);
            let mut builder = Self::with_capacity(keys, vals, upds);
            for mut chunk in chain.drain(..) {
                builder.push(&mut chunk);
            }

            builder.done(description)
        }
    }
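
    // A minimal property check (an editorial addition) of the divisor arithmetic
    // used by `divisor_for_capacity`: dividing any `u64` hash by the divisor must
    // land at most at `capacity`. The logic is restated standalone because the
    // method is private to `RhhValStorage` and generic over `Layout`.
    #[cfg(test)]
    mod divisor_tests {
        #[test]
        fn divisor_bounds_locations() {
            fn divisor_for_capacity(capacity: u64) -> u64 {
                if capacity == 0 || capacity == 1 { 0 } else { ((1u64 << 63) / capacity) << 1 }
            }
            for cap in [2u64, 3, 10, 1024] {
                let div = divisor_for_capacity(cap);
                // Two floor operations mean the quotient can reach `cap` itself,
                // hence "at most" rather than "less than".
                assert!(u64::MAX / div <= cap);
            }
        }
    }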

}

mod key_batch {

    // Copy the above, once it works!

}