differential_dataflow/trace/implementations/ord_neu.rs

//! Trace and batch implementations based on sorted ranges.
//!
//! The types and type aliases in this module start with either
//!
//! * `OrdVal`: Collections whose data have the form `(key, val)` where `key` is ordered.
//! * `OrdKey`: Collections whose data have the form `key` where `key` is ordered.
//!
//! Although `OrdVal` is more general than `OrdKey`, the latter has a simpler representation
//! and should consume fewer resources (computation and memory) when it applies.

use std::rc::Rc;

use crate::containers::TimelyStack;
use crate::trace::implementations::chunker::{ColumnationChunker, VecChunker};
use crate::trace::implementations::spine_fueled::Spine;
use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger, ColMerger};
use crate::trace::rc_blanket_impls::RcBuilder;

use super::{Update, Layout, Vector, TStack, Preferred};

pub use self::val_batch::{OrdValBatch, OrdValBuilder};
pub use self::key_batch::{OrdKeyBatch, OrdKeyBuilder};

/// A trace implementation using a spine of ordered lists.
pub type OrdValSpine<K, V, T, R> = Spine<Rc<OrdValBatch<Vector<((K,V),T,R)>>>>;
/// A batcher using ordered lists.
pub type OrdValBatcher<K, V, T, R> = MergeBatcher<Vec<((K,V),T,R)>, VecChunker<((K,V),T,R)>, VecMerger<(K, V), T, R>>;
/// A builder using ordered lists.
pub type RcOrdValBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<Vector<((K,V),T,R)>, Vec<((K,V),T,R)>>>;

// /// A trace implementation for empty values using a spine of ordered lists.
// pub type OrdKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<Vector<((K,()),T,R)>>>>;

/// A trace implementation backed by columnar storage.
pub type ColValSpine<K, V, T, R> = Spine<Rc<OrdValBatch<TStack<((K,V),T,R)>>>>;
/// A batcher for columnar storage.
pub type ColValBatcher<K, V, T, R> = MergeBatcher<Vec<((K,V),T,R)>, ColumnationChunker<((K,V),T,R)>, ColMerger<(K,V),T,R>>;
/// A builder for columnar storage.
pub type ColValBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<TStack<((K,V),T,R)>, TimelyStack<((K,V),T,R)>>>;

/// A trace implementation using a spine of ordered lists.
pub type OrdKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<Vector<((K,()),T,R)>>>>;
/// A batcher for ordered lists.
pub type OrdKeyBatcher<K, T, R> = MergeBatcher<Vec<((K,()),T,R)>, VecChunker<((K,()),T,R)>, VecMerger<(K, ()), T, R>>;
/// A builder for ordered lists.
pub type RcOrdKeyBuilder<K, T, R> = RcBuilder<OrdKeyBuilder<Vector<((K,()),T,R)>, Vec<((K,()),T,R)>>>;

// /// A trace implementation for empty values using a spine of ordered lists.
// pub type OrdKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<Vector<((K,()),T,R)>>>>;

/// A trace implementation backed by columnar storage.
pub type ColKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<TStack<((K,()),T,R)>>>>;
/// A batcher for columnar storage.
pub type ColKeyBatcher<K, T, R> = MergeBatcher<Vec<((K,()),T,R)>, ColumnationChunker<((K,()),T,R)>, ColMerger<(K,()),T,R>>;
/// A builder for columnar storage.
pub type ColKeyBuilder<K, T, R> = RcBuilder<OrdKeyBuilder<TStack<((K,()),T,R)>, TimelyStack<((K,()),T,R)>>>;

/// A trace implementation backed by columnar storage.
pub type PreferredSpine<K, V, T, R> = Spine<Rc<OrdValBatch<Preferred<K,V,T,R>>>>;
/// A batcher for columnar storage.
pub type PreferredBatcher<K, V, T, R> = MergeBatcher<Vec<((<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R)>, ColumnationChunker<((<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R)>, ColMerger<(<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R>>;
/// A builder for columnar storage.
pub type PreferredBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<Preferred<K,V,T,R>, TimelyStack<((<K as ToOwned>::Owned,<V as ToOwned>::Owned),T,R)>>>;

// /// A trace implementation backed by columnar storage.
// pub type ColKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<TStack<((K,()),T,R)>>>>;

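// Hedged, illustrative sketch (not part of this module's API): the aliases above are
// intended to be used as a matched triple -- a batcher to sort incoming updates, a
// builder to form batches, and a spine to hold them. The concrete parameters below
// (`String` keys, `u64` values and times, `isize` diffs) are arbitrary choices for
// illustration only.
#[cfg(test)]
#[allow(dead_code)]
mod alias_example {
    use super::{OrdValBatcher, OrdValSpine, RcOrdValBuilder};
    // A trace of (String, u64) records with u64 times and isize diffs, together with the
    // batcher and builder one would hand to an arrangement operator alongside the spine.
    type ExampleSpine = OrdValSpine<String, u64, u64, isize>;
    type ExampleBatcher = OrdValBatcher<String, u64, u64, isize>;
    type ExampleBuilder = RcOrdValBuilder<String, u64, u64, isize>;
}
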
/// Types related to forming batches with values.
pub mod val_batch {

    use std::marker::PhantomData;
    use serde::{Deserialize, Serialize};
    use timely::container::PushInto;
    use timely::progress::{Antichain, frontier::AntichainRef};

    use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
    use crate::trace::implementations::{BatchContainer, BuilderInput};
    use crate::IntoOwned;

    use super::{Layout, Update};

    /// An immutable collection of update tuples, from a contiguous interval of logical times.
    #[derive(Debug, Serialize, Deserialize)]
    pub struct OrdValStorage<L: Layout> {
        /// An ordered list of keys, corresponding to entries in `keys_offs`.
        pub keys: L::KeyContainer,
        /// Offsets used to provide indexes from keys to values.
        ///
        /// The length of this list is one longer than `keys`, so that we can avoid bounds logic.
        pub keys_offs: L::OffsetContainer,
        /// Concatenated ordered lists of values, bracketed by offsets in `keys_offs`.
        pub vals: L::ValContainer,
        /// Offsets used to provide indexes from values to updates.
        ///
        /// This list has a special representation in which any empty range indicates the singleton
        /// element just before the range, as if the start were decremented by one. The empty
        /// range is otherwise an invalid representation, and we borrow it to compactly encode
        /// single common update values (e.g. in a snapshot, the minimal time and a diff of one).
        ///
        /// The length of this list is one longer than `vals`, so that we can avoid bounds logic.
        pub vals_offs: L::OffsetContainer,
        /// Concatenated ordered lists of update times, bracketed by offsets in `vals_offs`.
        pub times: L::TimeContainer,
        /// Concatenated ordered lists of update diffs, bracketed by offsets in `vals_offs`.
        pub diffs: L::DiffContainer,
    }

    impl<L: Layout> OrdValStorage<L> {
        /// Lower and upper bounds in `self.vals` corresponding to the key at `index`.
        fn values_for_key(&self, index: usize) -> (usize, usize) {
            (self.keys_offs.index(index), self.keys_offs.index(index+1))
        }
        /// Lower and upper bounds in `self.times` and `self.diffs` corresponding to the value at `index`.
        fn updates_for_value(&self, index: usize) -> (usize, usize) {
            let mut lower = self.vals_offs.index(index);
            let upper = self.vals_offs.index(index+1);
            // We use equal lower and upper to encode "singleton update; just before here".
            // It should only apply when there is a prior element, so `lower` should be greater than zero.
            if lower == upper {
                assert!(lower > 0);
                lower -= 1;
            }
            (lower, upper)
        }
    }
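
    // Hedged, self-contained sketch of the offset convention implemented above, using a
    // plain `&[usize]` in place of `L::OffsetContainer`. With offsets `[0, 2, 2, 5]`,
    // index 1 yields the otherwise invalid empty range 2 .. 2, which the convention
    // reinterprets as the singleton range 1 .. 2, i.e. it reuses the update just before it.
    // This module exists only to illustrate the logic of `updates_for_value`.
    #[cfg(test)]
    mod offset_convention_example {
        /// Decode the update bounds for `index`, applying the singleton convention.
        fn updates_for(offs: &[usize], index: usize) -> (usize, usize) {
            let (mut lower, upper) = (offs[index], offs[index + 1]);
            if lower == upper {
                assert!(lower > 0);
                lower -= 1;
            }
            (lower, upper)
        }
        #[test]
        fn singleton_ranges() {
            let offs = [0, 2, 2, 5];
            assert_eq!(updates_for(&offs, 0), (0, 2));
            assert_eq!(updates_for(&offs, 1), (1, 2)); // empty range borrows the prior update
            assert_eq!(updates_for(&offs, 2), (2, 5));
        }
    }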

    /// An immutable collection of update tuples, from a contiguous interval of logical times.
    ///
    /// The `L` parameter captures how the updates should be laid out, and `C` determines which
    /// merge batcher to select.
    #[derive(Serialize, Deserialize)]
    #[serde(bound = "
        L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
        L::ValContainer: Serialize + for<'a> Deserialize<'a>,
        L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
        L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
        L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
    ")]
    pub struct OrdValBatch<L: Layout> {
        /// The updates themselves.
        pub storage: OrdValStorage<L>,
        /// Description of the update times this layer represents.
        pub description: Description<<L::Target as Update>::Time>,
        /// The number of updates reflected in the batch.
        ///
        /// We track this separately from `storage` because, due to the singleton optimization,
        /// we may have many more updates than `storage.times.len()`. It should equal that
        /// length, plus the number of singleton optimizations employed.
        pub updates: usize,
    }

    impl<L: Layout> BatchReader for OrdValBatch<L> {
        type Key<'a> = <L::KeyContainer as BatchContainer>::ReadItem<'a>;
        type Val<'a> = <L::ValContainer as BatchContainer>::ReadItem<'a>;
        type Time = <L::Target as Update>::Time;
        type TimeGat<'a> = <L::TimeContainer as BatchContainer>::ReadItem<'a>;
        type Diff = <L::Target as Update>::Diff;
        type DiffGat<'a> = <L::DiffContainer as BatchContainer>::ReadItem<'a>;

        type Cursor = OrdValCursor<L>;
        fn cursor(&self) -> Self::Cursor {
            OrdValCursor {
                key_cursor: 0,
                val_cursor: 0,
                phantom: PhantomData,
            }
        }
        fn len(&self) -> usize {
            // Normally this would be the length of the stored updates, but the singleton
            // optimization means some updates are not stored explicitly; `self.updates`
            // accounts for them as well.
            self.updates
        }
        fn description(&self) -> &Description<<L::Target as Update>::Time> { &self.description }
    }

    impl<L: Layout> Batch for OrdValBatch<L> {
        type Merger = OrdValMerger<L>;

        fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self::Merger {
            OrdValMerger::new(self, other, compaction_frontier)
        }

        fn empty(lower: Antichain<Self::Time>, upper: Antichain<Self::Time>) -> Self {
            use timely::progress::Timestamp;
            Self {
                storage: OrdValStorage {
                    keys: L::KeyContainer::with_capacity(0),
                    keys_offs: L::OffsetContainer::with_capacity(0),
                    vals: L::ValContainer::with_capacity(0),
                    vals_offs: L::OffsetContainer::with_capacity(0),
                    times: L::TimeContainer::with_capacity(0),
                    diffs: L::DiffContainer::with_capacity(0),
                },
                description: Description::new(lower, upper, Antichain::from_elem(Self::Time::minimum())),
                updates: 0,
            }
        }
    }

    /// State for an in-progress merge.
    pub struct OrdValMerger<L: Layout> {
        /// Key position to merge next in the first batch.
        key_cursor1: usize,
        /// Key position to merge next in the second batch.
        key_cursor2: usize,
        /// The result that we are currently assembling.
        result: OrdValStorage<L>,
        /// Description of the merged batch.
        description: Description<<L::Target as Update>::Time>,

        /// Local stash of updates, to use for consolidation.
        ///
        /// We could emulate a `ChangeBatch` here, with related compaction smarts.
        /// A `ChangeBatch` itself needs an `i64` diff type, which we do not have.
        update_stash: Vec<(<L::Target as Update>::Time, <L::Target as Update>::Diff)>,
        /// Counts the number of singleton-optimized entries, so that we may correctly count the updates.
        singletons: usize,
    }

    impl<L: Layout> Merger<OrdValBatch<L>> for OrdValMerger<L>
    where
        OrdValBatch<L>: Batch<Time=<L::Target as Update>::Time>,
        for<'a> <L::TimeContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Time>,
        for<'a> <L::DiffContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Diff>,
    {
        fn new(batch1: &OrdValBatch<L>, batch2: &OrdValBatch<L>, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self {

            assert!(batch1.upper() == batch2.lower());
            use crate::lattice::Lattice;
            let mut since = batch1.description().since().join(batch2.description().since());
            since = since.join(&compaction_frontier.to_owned());

            let description = Description::new(batch1.lower().clone(), batch2.upper().clone(), since);

            let batch1 = &batch1.storage;
            let batch2 = &batch2.storage;

            let mut storage = OrdValStorage {
                keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys),
                keys_offs: L::OffsetContainer::with_capacity(batch1.keys_offs.len() + batch2.keys_offs.len()),
                vals: L::ValContainer::merge_capacity(&batch1.vals, &batch2.vals),
                vals_offs: L::OffsetContainer::with_capacity(batch1.vals_offs.len() + batch2.vals_offs.len()),
                times: L::TimeContainer::merge_capacity(&batch1.times, &batch2.times),
                diffs: L::DiffContainer::merge_capacity(&batch1.diffs, &batch2.diffs),
            };

            // Mark explicit types because type inference fails to resolve it.
            let keys_offs: &mut L::OffsetContainer = &mut storage.keys_offs;
            keys_offs.push(0);
            let vals_offs: &mut L::OffsetContainer = &mut storage.vals_offs;
            vals_offs.push(0);

            OrdValMerger {
                key_cursor1: 0,
                key_cursor2: 0,
                result: storage,
                description,
                update_stash: Vec::new(),
                singletons: 0,
            }
        }
        fn done(self) -> OrdValBatch<L> {
            OrdValBatch {
                updates: self.result.times.len() + self.singletons,
                storage: self.result,
                description: self.description,
            }
        }
        fn work(&mut self, source1: &OrdValBatch<L>, source2: &OrdValBatch<L>, fuel: &mut isize) {

            // An (incomplete) indication of the amount of work we've done so far.
            let starting_updates = self.result.times.len();
            let mut effort = 0isize;

            // While both mergees are still active, perform single-key merges.
            while self.key_cursor1 < source1.storage.keys.len() && self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
                self.merge_key(&source1.storage, &source2.storage);
                // An (incomplete) accounting of the work we've done.
                effort = (self.result.times.len() - starting_updates) as isize;
            }

            // Merging is complete, and only copying remains.
            // Key-by-key copying allows effort interruption, and compaction.
            while self.key_cursor1 < source1.storage.keys.len() && effort < *fuel {
                self.copy_key(&source1.storage, self.key_cursor1);
                self.key_cursor1 += 1;
                effort = (self.result.times.len() - starting_updates) as isize;
            }
            while self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
                self.copy_key(&source2.storage, self.key_cursor2);
                self.key_cursor2 += 1;
                effort = (self.result.times.len() - starting_updates) as isize;
            }

            *fuel -= effort;
        }
    }
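
    // Hedged sketch of the fuel protocol used by `work` above, reduced to merging two
    // sorted vectors. Effort is measured as elements produced (as `work` measures
    // produced updates) and deducted from `fuel`; a caller that sees unspent fuel can
    // conclude the merge has finished. None of these names exist in the real code.
    #[cfg(test)]
    mod fueled_merge_example {
        fn work(a: &mut Vec<u64>, b: &mut Vec<u64>, out: &mut Vec<u64>, fuel: &mut isize) {
            let start = out.len();
            while (!a.is_empty() || !b.is_empty()) && ((out.len() - start) as isize) < *fuel {
                match (a.first(), b.first()) {
                    (Some(x), Some(y)) if x <= y => out.push(a.remove(0)),
                    (Some(_), Some(_)) | (None, Some(_)) => out.push(b.remove(0)),
                    (Some(_), None) => out.push(a.remove(0)),
                    (None, None) => unreachable!(),
                }
            }
            *fuel -= (out.len() - start) as isize;
        }
        #[test]
        fn merges_in_installments() {
            let (mut a, mut b, mut out) = (vec![1, 3, 5], vec![2, 4, 6], Vec::new());
            loop {
                let mut fuel = 2;
                work(&mut a, &mut b, &mut out, &mut fuel);
                if fuel > 0 { break; } // unspent fuel signals completion
            }
            assert_eq!(out, vec![1, 2, 3, 4, 5, 6]);
        }
    }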

    // Helper methods in support of merging batches.
    impl<L: Layout> OrdValMerger<L> {
        /// Copy the next key in `source`.
        ///
        /// The method extracts the key in `source` at `cursor`, and merges it into `self`.
        /// If the result does not wholly cancel, the key will be present in `self` with the
        /// compacted values and updates.
        ///
        /// The caller should be certain to update the cursor, as this method does not do this.
        fn copy_key(&mut self, source: &OrdValStorage<L>, cursor: usize) {
            // Capture the initial number of values to determine if the merge was ultimately non-empty.
            let init_vals = self.result.vals.len();
            let (mut lower, upper) = source.values_for_key(cursor);
            while lower < upper {
                self.stash_updates_for_val(source, lower);
                if let Some(off) = self.consolidate_updates() {
                    self.result.vals_offs.push(off);
                    self.result.vals.push(source.vals.index(lower));
                }
                lower += 1;
            }

            // If we have pushed any values, copy the key as well.
            if self.result.vals.len() > init_vals {
                self.result.keys.push(source.keys.index(cursor));
                self.result.keys_offs.push(self.result.vals.len());
            }
        }
        /// Merge the next key in each of `source1` and `source2` into `self`, updating the appropriate cursors.
        ///
        /// This method only merges a single key. It applies all compaction necessary, and may result in no output
        /// if the updates cancel either directly or after compaction.
        fn merge_key(&mut self, source1: &OrdValStorage<L>, source2: &OrdValStorage<L>) {
            use ::std::cmp::Ordering;
            match source1.keys.index(self.key_cursor1).cmp(&source2.keys.index(self.key_cursor2)) {
                Ordering::Less => {
                    self.copy_key(source1, self.key_cursor1);
                    self.key_cursor1 += 1;
                },
                Ordering::Equal => {
                    // Keys are equal; must merge all values from both sources for this one key.
                    let (lower1, upper1) = source1.values_for_key(self.key_cursor1);
                    let (lower2, upper2) = source2.values_for_key(self.key_cursor2);
                    if let Some(off) = self.merge_vals((source1, lower1, upper1), (source2, lower2, upper2)) {
                        self.result.keys.push(source1.keys.index(self.key_cursor1));
                        self.result.keys_offs.push(off);
                    }
                    // Increment cursors in either case; the keys are merged.
                    self.key_cursor1 += 1;
                    self.key_cursor2 += 1;
                },
                Ordering::Greater => {
                    self.copy_key(source2, self.key_cursor2);
                    self.key_cursor2 += 1;
                },
            }
        }
        /// Merge two ranges of values into `self`.
        ///
        /// If the compacted result contains values with non-empty updates, the function returns
        /// an offset that should be recorded to indicate the upper extent of the result values.
        fn merge_vals(
            &mut self,
            (source1, mut lower1, upper1): (&OrdValStorage<L>, usize, usize),
            (source2, mut lower2, upper2): (&OrdValStorage<L>, usize, usize),
        ) -> Option<usize> {
            // Capture the initial number of values to determine if the merge was ultimately non-empty.
            let init_vals = self.result.vals.len();
            while lower1 < upper1 && lower2 < upper2 {
                // We compare values, and fold in updates for the lowest values;
                // if they are non-empty post-consolidation, we write the value.
                // We could multi-way merge and it wouldn't be very complicated.
                use ::std::cmp::Ordering;
                match source1.vals.index(lower1).cmp(&source2.vals.index(lower2)) {
                    Ordering::Less => {
                        // Extend stash by updates, with logical compaction applied.
                        self.stash_updates_for_val(source1, lower1);
                        if let Some(off) = self.consolidate_updates() {
                            self.result.vals_offs.push(off);
                            self.result.vals.push(source1.vals.index(lower1));
                        }
                        lower1 += 1;
                    },
                    Ordering::Equal => {
                        self.stash_updates_for_val(source1, lower1);
                        self.stash_updates_for_val(source2, lower2);
                        if let Some(off) = self.consolidate_updates() {
                            self.result.vals_offs.push(off);
                            self.result.vals.push(source1.vals.index(lower1));
                        }
                        lower1 += 1;
                        lower2 += 1;
                    },
                    Ordering::Greater => {
                        // Extend stash by updates, with logical compaction applied.
                        self.stash_updates_for_val(source2, lower2);
                        if let Some(off) = self.consolidate_updates() {
                            self.result.vals_offs.push(off);
                            self.result.vals.push(source2.vals.index(lower2));
                        }
                        lower2 += 1;
                    },
                }
            }
            // Merging is complete, but we may have remaining elements to push.
            while lower1 < upper1 {
                self.stash_updates_for_val(source1, lower1);
                if let Some(off) = self.consolidate_updates() {
                    self.result.vals_offs.push(off);
                    self.result.vals.push(source1.vals.index(lower1));
                }
                lower1 += 1;
            }
            while lower2 < upper2 {
                self.stash_updates_for_val(source2, lower2);
                if let Some(off) = self.consolidate_updates() {
                    self.result.vals_offs.push(off);
                    self.result.vals.push(source2.vals.index(lower2));
                }
                lower2 += 1;
            }

            // Values being pushed indicate non-emptiness.
            if self.result.vals.len() > init_vals {
                Some(self.result.vals.len())
            } else {
                None
            }
        }

        /// Transfer updates for an indexed value in `source` into `self`, with compaction applied.
        fn stash_updates_for_val(&mut self, source: &OrdValStorage<L>, index: usize) {
            let (lower, upper) = source.updates_for_value(index);
            for i in lower .. upper {
                // NB: Here is where we would need to look back if `lower == upper`.
                let time = source.times.index(i);
                let diff = source.diffs.index(i);
                use crate::lattice::Lattice;
                let mut new_time: <L::Target as Update>::Time = time.into_owned();
                new_time.advance_by(self.description.since().borrow());
                self.update_stash.push((new_time, diff.into_owned()));
            }
        }

        /// Consolidates `self.update_stash` and produces the offset to record, if any.
        fn consolidate_updates(&mut self) -> Option<usize> {
            use crate::consolidation;
            consolidation::consolidate(&mut self.update_stash);
            if !self.update_stash.is_empty() {
                // If there is a single element, equal to a just-prior recorded update,
                // we push nothing and report an unincremented offset to encode this case.
                let time_diff = self.result.times.last().zip(self.result.diffs.last());
                let last_eq = self.update_stash.last().zip(time_diff).map(|((t1, d1), (t2, d2))| {
                    let t1 = <<L::TimeContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(t1);
                    let d1 = <<L::DiffContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(d1);
                    t1.eq(&t2) && d1.eq(&d2)
                });
                if self.update_stash.len() == 1 && last_eq.unwrap_or(false) {
                    // Just clear out update_stash, as we won't drain it here.
                    self.update_stash.clear();
                    self.singletons += 1;
                }
                else {
                    // Conventional; move `update_stash` into `updates`.
                    for (time, diff) in self.update_stash.drain(..) {
                        self.result.times.push(time);
                        self.result.diffs.push(diff);
                    }
                }
                Some(self.result.times.len())
            } else {
                None
            }
        }
    }
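
    // Hedged stand-in for the consolidation step used by `consolidate_updates` above:
    // sort by time, sum diffs for equal times, and drop zeros. The real code defers to
    // `crate::consolidation::consolidate`; this sketch exists only to illustrate the effect
    // on a concrete `(u64, i64)` stash.
    #[cfg(test)]
    mod consolidation_example {
        fn consolidate(updates: &mut Vec<(u64, i64)>) {
            updates.sort_by_key(|&(time, _)| time);
            let mut result: Vec<(u64, i64)> = Vec::new();
            for (time, diff) in updates.drain(..) {
                match result.last_mut() {
                    // Accumulate diffs for repeated times.
                    Some((last, total)) if *last == time => *total += diff,
                    _ => result.push((time, diff)),
                }
            }
            // Updates that cancel entirely are dropped.
            result.retain(|&(_, diff)| diff != 0);
            *updates = result;
        }
        #[test]
        fn sums_and_cancels() {
            let mut updates = vec![(2, 1), (1, 1), (2, -1), (1, 2)];
            consolidate(&mut updates);
            assert_eq!(updates, vec![(1, 3)]);
        }
    }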

    /// A cursor for navigating a single layer.
    pub struct OrdValCursor<L: Layout> {
        /// Absolute position of the current key.
        key_cursor: usize,
        /// Absolute position of the current value.
        val_cursor: usize,
        /// Phantom marker for Rust happiness.
        phantom: PhantomData<L>,
    }

    impl<L: Layout> Cursor for OrdValCursor<L> {

        type Key<'a> = <L::KeyContainer as BatchContainer>::ReadItem<'a>;
        type Val<'a> = <L::ValContainer as BatchContainer>::ReadItem<'a>;
        type Time = <L::Target as Update>::Time;
        type TimeGat<'a> = <L::TimeContainer as BatchContainer>::ReadItem<'a>;
        type Diff = <L::Target as Update>::Diff;
        type DiffGat<'a> = <L::DiffContainer as BatchContainer>::ReadItem<'a>;

        type Storage = OrdValBatch<L>;

        fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> { storage.storage.keys.get(self.key_cursor) }
        fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> { if self.val_valid(storage) { Some(self.val(storage)) } else { None } }

        fn key<'a>(&self, storage: &'a OrdValBatch<L>) -> Self::Key<'a> { storage.storage.keys.index(self.key_cursor) }
        fn val<'a>(&self, storage: &'a OrdValBatch<L>) -> Self::Val<'a> { storage.storage.vals.index(self.val_cursor) }
        fn map_times<L2: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &OrdValBatch<L>, mut logic: L2) {
            let (lower, upper) = storage.storage.updates_for_value(self.val_cursor);
            for index in lower .. upper {
                let time = storage.storage.times.index(index);
                let diff = storage.storage.diffs.index(index);
                logic(time, diff);
            }
        }
        fn key_valid(&self, storage: &OrdValBatch<L>) -> bool { self.key_cursor < storage.storage.keys.len() }
        fn val_valid(&self, storage: &OrdValBatch<L>) -> bool { self.val_cursor < storage.storage.values_for_key(self.key_cursor).1 }
        fn step_key(&mut self, storage: &OrdValBatch<L>) {
            self.key_cursor += 1;
            if self.key_valid(storage) {
                self.rewind_vals(storage);
            }
            else {
                self.key_cursor = storage.storage.keys.len();
            }
        }
        fn seek_key(&mut self, storage: &OrdValBatch<L>, key: Self::Key<'_>) {
            self.key_cursor += storage.storage.keys.advance(self.key_cursor, storage.storage.keys.len(), |x| <L::KeyContainer as BatchContainer>::reborrow(x).lt(&<L::KeyContainer as BatchContainer>::reborrow(key)));
            if self.key_valid(storage) {
                self.rewind_vals(storage);
            }
        }
        fn step_val(&mut self, storage: &OrdValBatch<L>) {
            self.val_cursor += 1;
            if !self.val_valid(storage) {
                self.val_cursor = storage.storage.values_for_key(self.key_cursor).1;
            }
        }
        fn seek_val(&mut self, storage: &OrdValBatch<L>, val: Self::Val<'_>) {
            self.val_cursor += storage.storage.vals.advance(self.val_cursor, storage.storage.values_for_key(self.key_cursor).1, |x| <L::ValContainer as BatchContainer>::reborrow(x).lt(&<L::ValContainer as BatchContainer>::reborrow(val)));
        }
        fn rewind_keys(&mut self, storage: &OrdValBatch<L>) {
            self.key_cursor = 0;
            if self.key_valid(storage) {
                self.rewind_vals(storage)
            }
        }
        fn rewind_vals(&mut self, storage: &OrdValBatch<L>) {
            self.val_cursor = storage.storage.values_for_key(self.key_cursor).0;
        }
    }

    /// A builder for creating layers from unsorted update tuples.
    pub struct OrdValBuilder<L: Layout, CI> {
        /// The in-progress result.
        ///
        /// This is public to allow container implementors to set and inspect their container.
        pub result: OrdValStorage<L>,
        /// A pending update that matches the most recently pushed one, retained for the singleton optimization.
        singleton: Option<(<L::Target as Update>::Time, <L::Target as Update>::Diff)>,
        /// Counts the number of singleton optimizations we performed.
        ///
        /// This number allows us to correctly gauge the total number of updates reflected in a batch,
        /// even though the number of stored updates may be much smaller than that total.
        singletons: usize,
        _marker: PhantomData<CI>,
    }

    impl<L: Layout, CI> OrdValBuilder<L, CI> {
        /// Pushes a single update, which may set `self.singleton` rather than push.
        ///
        /// This operation is meant to be equivalent to pushing `time` and `diff` onto
        /// `self.result.times` and `self.result.diffs`. However, for "clever" reasons it does not
        /// always do this. Instead, it looks for opportunities to encode a singleton update with
        /// an "absent" update: repeating the most recent offset. This otherwise invalid state
        /// encodes "look back one element".
        ///
        /// When `self.singleton` is `Some`, it means that we have seen one update and it matched the
        /// previously pushed update exactly. In that case, we do not push the update into the update
        /// containers. The update tuple is retained in `self.singleton` in case we see another update
        /// and need to recover the singleton, pushing it into the update containers ahead of the
        /// second update.
        fn push_update(&mut self, time: <L::Target as Update>::Time, diff: <L::Target as Update>::Diff) {
            // If a just-pushed update exactly equals `(time, diff)` we can avoid pushing it.
            if self.result.times.last().map(|t| t == <<L::TimeContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(&time)) == Some(true) &&
               self.result.diffs.last().map(|d| d == <<L::DiffContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(&diff)) == Some(true)
            {
                assert!(self.singleton.is_none());
                self.singleton = Some((time, diff));
            }
            else {
                // If we have pushed a single element, we need to copy it out to meet this one.
                if let Some((time, diff)) = self.singleton.take() {
                    self.result.times.push(time);
                    self.result.diffs.push(diff);
                }
                self.result.times.push(time);
                self.result.diffs.push(diff);
            }
        }
    }
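
    // Hedged restatement of `push_update` above over plain vectors, so the singleton
    // bookkeeping is easy to follow in isolation. It is not wired into `OrdValBuilder`.
    #[cfg(test)]
    mod push_update_example {
        /// Mimics `push_update`: an exact repeat of the last update is stashed, not pushed.
        fn push_update(times: &mut Vec<u64>, diffs: &mut Vec<i64>, singleton: &mut Option<(u64, i64)>, time: u64, diff: i64) {
            if times.last() == Some(&time) && diffs.last() == Some(&diff) {
                assert!(singleton.is_none());
                *singleton = Some((time, diff));
            } else {
                // Recover a deferred singleton before pushing the new update.
                if let Some((t, d)) = singleton.take() {
                    times.push(t);
                    diffs.push(d);
                }
                times.push(time);
                diffs.push(diff);
            }
        }
        #[test]
        fn defers_then_recovers() {
            let (mut times, mut diffs, mut singleton) = (Vec::new(), Vec::new(), None);
            push_update(&mut times, &mut diffs, &mut singleton, 3, 1);
            push_update(&mut times, &mut diffs, &mut singleton, 3, 1); // deferred as a singleton
            assert_eq!((times.len(), singleton.is_some()), (1, true));
            push_update(&mut times, &mut diffs, &mut singleton, 4, 1); // the singleton is recovered first
            assert_eq!(times, vec![3, 3, 4]);
        }
    }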

    impl<L, CI> Builder for OrdValBuilder<L, CI>
    where
        L: for<'a> Layout<
            KeyContainer: PushInto<CI::Key<'a>>,
            ValContainer: PushInto<CI::Val<'a>>,
        >,
        CI: for<'a> BuilderInput<L::KeyContainer, L::ValContainer, Time=<L::Target as Update>::Time, Diff=<L::Target as Update>::Diff>,
        for<'a> <L::TimeContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Time>,
        for<'a> <L::DiffContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Diff>,
    {

        type Input = CI;
        type Time = <L::Target as Update>::Time;
        type Output = OrdValBatch<L>;

        fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self {
            // We don't introduce zero offsets as they will be introduced by the first `push` call.
            Self {
                result: OrdValStorage {
                    keys: L::KeyContainer::with_capacity(keys),
                    keys_offs: L::OffsetContainer::with_capacity(keys + 1),
                    vals: L::ValContainer::with_capacity(vals),
                    vals_offs: L::OffsetContainer::with_capacity(vals + 1),
                    times: L::TimeContainer::with_capacity(upds),
                    diffs: L::DiffContainer::with_capacity(upds),
                },
                singleton: None,
                singletons: 0,
                _marker: PhantomData,
            }
        }

        #[inline]
        fn push(&mut self, chunk: &mut Self::Input) {
            for item in chunk.drain() {
                let (key, val, time, diff) = CI::into_parts(item);
                // Perhaps this is a continuation of an already received key.
                if self.result.keys.last().map(|k| CI::key_eq(&key, k)).unwrap_or(false) {
                    // Perhaps this is a continuation of an already received value.
                    if self.result.vals.last().map(|v| CI::val_eq(&val, v)).unwrap_or(false) {
                        self.push_update(time, diff);
                    } else {
                        // New value; complete representation of prior value.
                        self.result.vals_offs.push(self.result.times.len());
                        if self.singleton.take().is_some() { self.singletons += 1; }
                        self.push_update(time, diff);
                        self.result.vals.push(val);
                    }
                } else {
                    // New key; complete representation of prior key.
                    self.result.vals_offs.push(self.result.times.len());
                    if self.singleton.take().is_some() { self.singletons += 1; }
                    self.result.keys_offs.push(self.result.vals.len());
                    self.push_update(time, diff);
                    self.result.vals.push(val);
                    self.result.keys.push(key);
                }
            }
        }

        #[inline(never)]
        fn done(mut self, description: Description<Self::Time>) -> OrdValBatch<L> {
            // Record the final offsets.
            self.result.vals_offs.push(self.result.times.len());
            // Remove any pending singleton, and if it was set increment our count.
            if self.singleton.take().is_some() { self.singletons += 1; }
            self.result.keys_offs.push(self.result.vals.len());
            OrdValBatch {
                updates: self.result.times.len() + self.singletons,
                storage: self.result,
                description,
            }
        }

        fn seal(chain: &mut Vec<Self::Input>, description: Description<Self::Time>) -> Self::Output {
            let (keys, vals, upds) = Self::Input::key_val_upd_counts(&chain[..]);
            let mut builder = Self::with_capacity(keys, vals, upds);
            for mut chunk in chain.drain(..) {
                builder.push(&mut chunk);
            }

            builder.done(description)
        }
    }
}

/// Types related to forming batches with keys and no values.
mod key_batch {

    use std::marker::PhantomData;
    use serde::{Deserialize, Serialize};
    use timely::container::PushInto;
    use timely::progress::{Antichain, frontier::AntichainRef};

    use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
    use crate::trace::implementations::{BatchContainer, BuilderInput};
    use crate::IntoOwned;

    use super::{Layout, Update};

    /// An immutable collection of update tuples, from a contiguous interval of logical times.
    #[derive(Debug, Serialize, Deserialize)]
    pub struct OrdKeyStorage<L: Layout> {
        /// An ordered list of keys, corresponding to entries in `keys_offs`.
        pub keys: L::KeyContainer,
        /// Offsets used to provide indexes from keys to updates.
        ///
        /// This list has a special representation in which any empty range indicates the singleton
        /// element just before the range, as if the start were decremented by one. The empty
        /// range is otherwise an invalid representation, and we borrow it to compactly encode
        /// single common update values (e.g. in a snapshot, the minimal time and a diff of one).
        ///
        /// The length of this list is one longer than `keys`, so that we can avoid bounds logic.
        pub keys_offs: L::OffsetContainer,
        /// Concatenated ordered lists of update times, bracketed by offsets in `keys_offs`.
        pub times: L::TimeContainer,
        /// Concatenated ordered lists of update diffs, bracketed by offsets in `keys_offs`.
        pub diffs: L::DiffContainer,
    }

    impl<L: Layout> OrdKeyStorage<L> {
        /// Lower and upper bounds in `self.times` and `self.diffs` corresponding to the key at `index`.
        fn updates_for_key(&self, index: usize) -> (usize, usize) {
            let mut lower = self.keys_offs.index(index);
            let upper = self.keys_offs.index(index+1);
            // We use equal lower and upper to encode "singleton update; just before here".
            // It should only apply when there is a prior element, so `lower` should be greater than zero.
            if lower == upper {
                assert!(lower > 0);
                lower -= 1;
            }
            (lower, upper)
        }
    }

    /// An immutable collection of update tuples, from a contiguous interval of logical times.
    ///
    /// The `L` parameter captures how the updates should be laid out, and `C` determines which
    /// merge batcher to select.
    #[derive(Serialize, Deserialize)]
    #[serde(bound = "
        L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
        L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
        L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
        L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
    ")]
    pub struct OrdKeyBatch<L: Layout> {
        /// The updates themselves.
        pub storage: OrdKeyStorage<L>,
        /// Description of the update times this layer represents.
        pub description: Description<<L::Target as Update>::Time>,
        /// The number of updates reflected in the batch.
        ///
        /// We track this separately from `storage` because, due to the singleton optimization,
        /// we may have many more updates than `storage.times.len()`. It should equal that
        /// length, plus the number of singleton optimizations employed.
        pub updates: usize,
    }

    impl<L: Layout> BatchReader for OrdKeyBatch<L> {

        type Key<'a> = <L::KeyContainer as BatchContainer>::ReadItem<'a>;
        type Val<'a> = &'a ();
        type Time = <L::Target as Update>::Time;
        type TimeGat<'a> = <L::TimeContainer as BatchContainer>::ReadItem<'a>;
        type Diff = <L::Target as Update>::Diff;
        type DiffGat<'a> = <L::DiffContainer as BatchContainer>::ReadItem<'a>;

        type Cursor = OrdKeyCursor<L>;
        fn cursor(&self) -> Self::Cursor {
            OrdKeyCursor {
                key_cursor: 0,
                val_stepped: false,
                phantom: std::marker::PhantomData,
            }
        }
        fn len(&self) -> usize {
            // Normally this would be the length of the stored updates, but the singleton
            // optimization means some updates are not stored explicitly; `self.updates`
            // accounts for them as well.
            self.updates
        }
        fn description(&self) -> &Description<<L::Target as Update>::Time> { &self.description }
    }

    impl<L: Layout> Batch for OrdKeyBatch<L> {
        type Merger = OrdKeyMerger<L>;

        fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self::Merger {
            OrdKeyMerger::new(self, other, compaction_frontier)
        }

        fn empty(lower: Antichain<Self::Time>, upper: Antichain<Self::Time>) -> Self {
            use timely::progress::Timestamp;
            Self {
                storage: OrdKeyStorage {
                    keys: L::KeyContainer::with_capacity(0),
                    keys_offs: L::OffsetContainer::with_capacity(0),
                    times: L::TimeContainer::with_capacity(0),
                    diffs: L::DiffContainer::with_capacity(0),
                },
                description: Description::new(lower, upper, Antichain::from_elem(Self::Time::minimum())),
                updates: 0,
            }
        }
    }

    /// State for an in-progress merge.
    pub struct OrdKeyMerger<L: Layout> {
        /// Key position to merge next in the first batch.
        key_cursor1: usize,
        /// Key position to merge next in the second batch.
        key_cursor2: usize,
        /// The result that we are currently assembling.
        result: OrdKeyStorage<L>,
        /// Description of the merged batch.
        description: Description<<L::Target as Update>::Time>,

        /// Local stash of updates, to use for consolidation.
        ///
        /// We could emulate a `ChangeBatch` here, with related compaction smarts.
        /// A `ChangeBatch` itself needs an `i64` diff type, which we do not have.
        update_stash: Vec<(<L::Target as Update>::Time, <L::Target as Update>::Diff)>,
        /// Counts the number of singleton-optimized entries, so that we may correctly count the updates.
        singletons: usize,
    }

    impl<L: Layout> Merger<OrdKeyBatch<L>> for OrdKeyMerger<L>
    where
        OrdKeyBatch<L>: Batch<Time=<L::Target as Update>::Time>,
        for<'a> <L::TimeContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Time>,
        for<'a> <L::DiffContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Diff>,
    {
        fn new(batch1: &OrdKeyBatch<L>, batch2: &OrdKeyBatch<L>, compaction_frontier: AntichainRef<<L::Target as Update>::Time>) -> Self {

            assert!(batch1.upper() == batch2.lower());
            use crate::lattice::Lattice;
            let mut since = batch1.description().since().join(batch2.description().since());
            since = since.join(&compaction_frontier.to_owned());

            let description = Description::new(batch1.lower().clone(), batch2.upper().clone(), since);

            let batch1 = &batch1.storage;
            let batch2 = &batch2.storage;

            let mut storage = OrdKeyStorage {
                keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys),
                keys_offs: L::OffsetContainer::with_capacity(batch1.keys_offs.len() + batch2.keys_offs.len()),
                times: L::TimeContainer::merge_capacity(&batch1.times, &batch2.times),
                diffs: L::DiffContainer::merge_capacity(&batch1.diffs, &batch2.diffs),
            };

            let keys_offs: &mut L::OffsetContainer = &mut storage.keys_offs;
            keys_offs.push(0);

            OrdKeyMerger {
                key_cursor1: 0,
                key_cursor2: 0,
                result: storage,
                description,
                update_stash: Vec::new(),
                singletons: 0,
            }
        }
        fn done(self) -> OrdKeyBatch<L> {
            OrdKeyBatch {
                updates: self.result.times.len() + self.singletons,
                storage: self.result,
                description: self.description,
            }
        }
        fn work(&mut self, source1: &OrdKeyBatch<L>, source2: &OrdKeyBatch<L>, fuel: &mut isize) {

            // An (incomplete) indication of the amount of work we've done so far.
            let starting_updates = self.result.times.len();
            let mut effort = 0isize;

            // While both mergees are still active, perform single-key merges.
            while self.key_cursor1 < source1.storage.keys.len() && self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
                self.merge_key(&source1.storage, &source2.storage);
                // An (incomplete) accounting of the work we've done.
                effort = (self.result.times.len() - starting_updates) as isize;
            }

            // Merging is complete, and only copying remains.
            // Key-by-key copying allows effort interruption, and compaction.
            while self.key_cursor1 < source1.storage.keys.len() && effort < *fuel {
                self.copy_key(&source1.storage, self.key_cursor1);
                self.key_cursor1 += 1;
                effort = (self.result.times.len() - starting_updates) as isize;
            }
            while self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
                self.copy_key(&source2.storage, self.key_cursor2);
                self.key_cursor2 += 1;
                effort = (self.result.times.len() - starting_updates) as isize;
            }

            *fuel -= effort;
        }
    }

    // Helper methods in support of merging batches.
    impl<L: Layout> OrdKeyMerger<L> {
        /// Copy the next key in `source`.
        ///
        /// The method extracts the key in `source` at `cursor`, and merges it into `self`.
        /// If the result does not wholly cancel, the key will be present in `self` with the
        /// compacted updates.
        ///
        /// The caller should be certain to update the cursor, as this method does not do this.
        fn copy_key(&mut self, source: &OrdKeyStorage<L>, cursor: usize) {
            self.stash_updates_for_key(source, cursor);
            if let Some(off) = self.consolidate_updates() {
                self.result.keys_offs.push(off);
                self.result.keys.push(source.keys.index(cursor));
            }
        }
        /// Merge the next key in each of `source1` and `source2` into `self`, updating the appropriate cursors.
        ///
        /// This method only merges a single key. It applies all compaction necessary, and may result in no output
        /// if the updates cancel either directly or after compaction.
        fn merge_key(&mut self, source1: &OrdKeyStorage<L>, source2: &OrdKeyStorage<L>) {
            use ::std::cmp::Ordering;
            match source1.keys.index(self.key_cursor1).cmp(&source2.keys.index(self.key_cursor2)) {
                Ordering::Less => {
                    self.copy_key(source1, self.key_cursor1);
                    self.key_cursor1 += 1;
                },
                Ordering::Equal => {
                    // Keys are equal; must merge all updates from both sources for this one key.
                    self.stash_updates_for_key(source1, self.key_cursor1);
                    self.stash_updates_for_key(source2, self.key_cursor2);
                    if let Some(off) = self.consolidate_updates() {
                        self.result.keys_offs.push(off);
                        self.result.keys.push(source1.keys.index(self.key_cursor1));
                    }
                    // Increment cursors in either case; the keys are merged.
                    self.key_cursor1 += 1;
                    self.key_cursor2 += 1;
                },
                Ordering::Greater => {
                    self.copy_key(source2, self.key_cursor2);
                    self.key_cursor2 += 1;
                },
            }
        }

        /// Transfer updates for an indexed key in `source` into `self`, with compaction applied.
        fn stash_updates_for_key(&mut self, source: &OrdKeyStorage<L>, index: usize) {
            let (lower, upper) = source.updates_for_key(index);
            for i in lower .. upper {
                // NB: Here is where we would need to look back if `lower == upper`.
                let time = source.times.index(i);
                let diff = source.diffs.index(i);
                use crate::lattice::Lattice;
                let mut new_time = time.into_owned();
                new_time.advance_by(self.description.since().borrow());
                self.update_stash.push((new_time, diff.into_owned()));
            }
        }

        /// Consolidates `self.update_stash` and produces the offset to record, if any.
        fn consolidate_updates(&mut self) -> Option<usize> {
            use crate::consolidation;
            consolidation::consolidate(&mut self.update_stash);
            if !self.update_stash.is_empty() {
                // If there is a single element, equal to a just-prior recorded update,
                // we push nothing and report an unincremented offset to encode this case.
                let time_diff = self.result.times.last().zip(self.result.diffs.last());
                let last_eq = self.update_stash.last().zip(time_diff).map(|((t1, d1), (t2, d2))| {
                    let t1 = <<L::TimeContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(t1);
                    let d1 = <<L::DiffContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(d1);
                    t1.eq(&t2) && d1.eq(&d2)
                });
                if self.update_stash.len() == 1 && last_eq.unwrap_or(false) {
                    // Just clear out update_stash, as we won't drain it here.
                    self.update_stash.clear();
                    self.singletons += 1;
                }
                else {
                    // Conventional; move `update_stash` into `updates`.
                    for (time, diff) in self.update_stash.drain(..) {
                        self.result.times.push(time);
                        self.result.diffs.push(diff);
                    }
                }
                Some(self.result.times.len())
            } else {
                None
            }
        }
    }

    /// A cursor for navigating a single layer.
    pub struct OrdKeyCursor<L: Layout> {
        /// Absolute position of the current key.
        key_cursor: usize,
        /// If the value has been stepped for the key, there are no more values.
        val_stepped: bool,
        /// Phantom marker for Rust happiness.
        phantom: PhantomData<L>,
    }

    impl<L: Layout> Cursor for OrdKeyCursor<L> {
        type Key<'a> = <L::KeyContainer as BatchContainer>::ReadItem<'a>;
        type Val<'a> = &'a ();
        type Time = <L::Target as Update>::Time;
        type TimeGat<'a> = <L::TimeContainer as BatchContainer>::ReadItem<'a>;
        type Diff = <L::Target as Update>::Diff;
        type DiffGat<'a> = <L::DiffContainer as BatchContainer>::ReadItem<'a>;

        type Storage = OrdKeyBatch<L>;

        fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> { storage.storage.keys.get(self.key_cursor) }
        fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<&'a ()> { if self.val_valid(storage) { Some(&()) } else { None } }

        fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { storage.storage.keys.index(self.key_cursor) }
        fn val<'a>(&self, _storage: &'a Self::Storage) -> &'a () { &() }
        fn map_times<L2: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L2) {
            let (lower, upper) = storage.storage.updates_for_key(self.key_cursor);
            for index in lower .. upper {
                let time = storage.storage.times.index(index);
                let diff = storage.storage.diffs.index(index);
                logic(time, diff);
            }
        }
        fn key_valid(&self, storage: &Self::Storage) -> bool { self.key_cursor < storage.storage.keys.len() }
        fn val_valid(&self, _storage: &Self::Storage) -> bool { !self.val_stepped }
        fn step_key(&mut self, storage: &Self::Storage) {
            self.key_cursor += 1;
            if self.key_valid(storage) {
                self.rewind_vals(storage);
            }
            else {
                self.key_cursor = storage.storage.keys.len();
            }
        }
        fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) {
            self.key_cursor += storage.storage.keys.advance(self.key_cursor, storage.storage.keys.len(), |x| <L::KeyContainer as BatchContainer>::reborrow(x).lt(&<L::KeyContainer as BatchContainer>::reborrow(key)));
            if self.key_valid(storage) {
                self.rewind_vals(storage);
            }
        }
        fn step_val(&mut self, _storage: &Self::Storage) {
            self.val_stepped = true;
        }
        fn seek_val(&mut self, _storage: &Self::Storage, _val: Self::Val<'_>) { }
        fn rewind_keys(&mut self, storage: &Self::Storage) {
            self.key_cursor = 0;
            if self.key_valid(storage) {
                self.rewind_vals(storage)
            }
        }
        fn rewind_vals(&mut self, _storage: &Self::Storage) {
            self.val_stepped = false;
        }
    }

    /// A builder for creating layers from unsorted update tuples.
    pub struct OrdKeyBuilder<L: Layout, CI> {
        /// The in-progress result.
        ///
        /// This is public to allow container implementors to set and inspect their container.
        pub result: OrdKeyStorage<L>,
        /// A pending update that matches the most recently pushed one, retained for the singleton optimization.
        singleton: Option<(<L::Target as Update>::Time, <L::Target as Update>::Diff)>,
        /// Counts the number of singleton optimizations we performed.
        ///
        /// This number allows us to correctly gauge the total number of updates reflected in a batch,
        /// even though the number of stored updates may be much smaller than that total.
        singletons: usize,
        _marker: PhantomData<CI>,
    }

    impl<L: Layout, CI> OrdKeyBuilder<L, CI> {
        /// Pushes a single update, which may set `self.singleton` rather than push.
        ///
        /// This operation is meant to be equivalent to pushing `time` and `diff` onto
        /// `self.result.times` and `self.result.diffs`. However, for "clever" reasons it does not
        /// always do this. Instead, it looks for opportunities to encode a singleton update with
        /// an "absent" update: repeating the most recent offset. This otherwise invalid state
        /// encodes "look back one element".
        ///
        /// When `self.singleton` is `Some`, it means that we have seen one update and it matched the
        /// previously pushed update exactly. In that case, we do not push the update into the update
        /// containers. The update tuple is retained in `self.singleton` in case we see another update
        /// and need to recover the singleton, pushing it into the update containers ahead of the
        /// second update.
        fn push_update(&mut self, time: <L::Target as Update>::Time, diff: <L::Target as Update>::Diff) {
            // If a just-pushed update exactly equals `(time, diff)` we can avoid pushing it.
            let t1 = <<L::TimeContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(&time);
            let d1 = <<L::DiffContainer as BatchContainer>::ReadItem<'_> as IntoOwned>::borrow_as(&diff);
            if self.result.times.last().map(|t| t == t1).unwrap_or(false) && self.result.diffs.last().map(|d| d == d1).unwrap_or(false) {
                assert!(self.singleton.is_none());
                self.singleton = Some((time, diff));
            }
            else {
                // If we have pushed a single element, we need to copy it out to meet this one.
                if let Some((time, diff)) = self.singleton.take() {
                    self.result.times.push(time);
                    self.result.diffs.push(diff);
                }
                self.result.times.push(time);
                self.result.diffs.push(diff);
            }
        }
    }

    impl<L: Layout, CI> Builder for OrdKeyBuilder<L, CI>
    where
        L: for<'a> Layout<KeyContainer: PushInto<CI::Key<'a>>>,
        for<'a> <L::TimeContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Time>,
        for<'a> <L::DiffContainer as BatchContainer>::ReadItem<'a> : IntoOwned<'a, Owned = <L::Target as Update>::Diff>,
        CI: BuilderInput<L::KeyContainer, L::ValContainer, Time=<L::Target as Update>::Time, Diff=<L::Target as Update>::Diff>,
    {

        type Input = CI;
        type Time = <L::Target as Update>::Time;
        type Output = OrdKeyBatch<L>;

        fn with_capacity(keys: usize, _vals: usize, upds: usize) -> Self {
            // We don't introduce zero offsets as they will be introduced by the first `push` call.
            Self {
                result: OrdKeyStorage {
                    keys: L::KeyContainer::with_capacity(keys),
                    keys_offs: L::OffsetContainer::with_capacity(keys + 1),
                    times: L::TimeContainer::with_capacity(upds),
                    diffs: L::DiffContainer::with_capacity(upds),
                },
                singleton: None,
                singletons: 0,
                _marker: PhantomData,
            }
        }

        #[inline]
        fn push(&mut self, chunk: &mut Self::Input) {
            for item in chunk.drain() {
                let (key, _val, time, diff) = CI::into_parts(item);
                // Perhaps this is a continuation of an already received key.
                if self.result.keys.last().map(|k| CI::key_eq(&key, k)).unwrap_or(false) {
                    self.push_update(time, diff);
                } else {
                    // New key; complete representation of prior key.
                    self.result.keys_offs.push(self.result.times.len());
                    // Remove any pending singleton, and if it was set increment our count.
                    if self.singleton.take().is_some() { self.singletons += 1; }
                    self.push_update(time, diff);
                    self.result.keys.push(key);
                }
            }
        }

        #[inline(never)]
        fn done(mut self, description: Description<Self::Time>) -> OrdKeyBatch<L> {
            // Record the final offsets.
            self.result.keys_offs.push(self.result.times.len());
            // Remove any pending singleton, and if it was set increment our count.
            if self.singleton.take().is_some() { self.singletons += 1; }
            OrdKeyBatch {
                updates: self.result.times.len() + self.singletons,
                storage: self.result,
                description,
            }
        }

        fn seal(chain: &mut Vec<Self::Input>, description: Description<Self::Time>) -> Self::Output {
            let (keys, vals, upds) = Self::Input::key_val_upd_counts(&chain[..]);
            let mut builder = Self::with_capacity(keys, vals, upds);
            for mut chunk in chain.drain(..) {
                builder.push(&mut chunk);
            }

            builder.done(description)
        }
    }

}