differential_dataflow/trace/implementations/
ord_neu.rs

1//! Trace and batch implementations based on sorted ranges.
2//!
3//! The types and type aliases in this module start with either
4//!
5//! * `OrdVal`: Collections whose data have the form `(key, val)` where `key` is ordered.
6//! * `OrdKey`: Collections whose data have the form `key` where `key` is ordered.
7//!
8//! Although `OrdVal` is more general than `OrdKey`, the latter has a simpler representation
9//! and should consume fewer resources (computation and memory) when it applies.
10
11use std::rc::Rc;
12
13use crate::containers::TimelyStack;
14use crate::trace::implementations::chunker::{ColumnationChunker, VecChunker};
15use crate::trace::implementations::spine_fueled::Spine;
16use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger, ColMerger};
17use crate::trace::rc_blanket_impls::RcBuilder;
18
19use super::{Layout, Vector, TStack};
20
21pub use self::val_batch::{OrdValBatch, OrdValBuilder};
22pub use self::key_batch::{OrdKeyBatch, OrdKeyBuilder};
23
24/// A trace implementation using a spine of ordered lists.
25pub type OrdValSpine<K, V, T, R> = Spine<Rc<OrdValBatch<Vector<((K,V),T,R)>>>>;
26/// A batcher using ordered lists.
27pub type OrdValBatcher<K, V, T, R> = MergeBatcher<Vec<((K,V),T,R)>, VecChunker<((K,V),T,R)>, VecMerger<(K, V), T, R>>;
28/// A builder using ordered lists.
29pub type RcOrdValBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<Vector<((K,V),T,R)>, Vec<((K,V),T,R)>>>;
30
31// /// A trace implementation for empty values using a spine of ordered lists.
32// pub type OrdKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<Vector<((K,()),T,R)>>>>;
33
34/// A trace implementation backed by columnar storage.
35pub type ColValSpine<K, V, T, R> = Spine<Rc<OrdValBatch<TStack<((K,V),T,R)>>>>;
36/// A batcher for columnar storage.
37pub type ColValBatcher<K, V, T, R> = MergeBatcher<Vec<((K,V),T,R)>, ColumnationChunker<((K,V),T,R)>, ColMerger<(K,V),T,R>>;
38/// A builder for columnar storage.
39pub type ColValBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<TStack<((K,V),T,R)>, TimelyStack<((K,V),T,R)>>>;
40
41/// A trace implementation using a spine of ordered lists.
42pub type OrdKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<Vector<((K,()),T,R)>>>>;
43/// A batcher for ordered lists.
44pub type OrdKeyBatcher<K, T, R> = MergeBatcher<Vec<((K,()),T,R)>, VecChunker<((K,()),T,R)>, VecMerger<(K, ()), T, R>>;
45/// A builder for ordered lists.
46pub type RcOrdKeyBuilder<K, T, R> = RcBuilder<OrdKeyBuilder<Vector<((K,()),T,R)>, Vec<((K,()),T,R)>>>;
47
48// /// A trace implementation for empty values using a spine of ordered lists.
49// pub type OrdKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<Vector<((K,()),T,R)>>>>;
50
51/// A trace implementation backed by columnar storage.
52pub type ColKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<TStack<((K,()),T,R)>>>>;
53/// A batcher for columnar storage
54pub type ColKeyBatcher<K, T, R> = MergeBatcher<Vec<((K,()),T,R)>, ColumnationChunker<((K,()),T,R)>, ColMerger<(K,()),T,R>>;
55/// A builder for columnar storage
56pub type ColKeyBuilder<K, T, R> = RcBuilder<OrdKeyBuilder<TStack<((K,()),T,R)>, TimelyStack<((K,()),T,R)>>>;
57
58// /// A trace implementation backed by columnar storage.
59// pub type ColKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<TStack<((K,()),T,R)>>>>;
60
61pub use layers::{Vals, Upds};
62/// Layers are containers of lists of some type.
63///
64/// The intent is that they "attach" to an outer layer which has as many values
65/// as the layer has lists, thereby associating a list with each outer value.
66/// A sequence of layers, each matching the number of values in its predecessor,
67/// forms a layered trie: a tree with values of some type on nodes at each depth.
68///
69/// We will form tries here by layering `[Keys, Vals, Upds]` or `[Keys, Upds]`.
70pub mod layers {
71
72    use serde::{Deserialize, Serialize};
73    use crate::trace::implementations::BatchContainer;
74
    /// A container for non-empty lists of values.
    ///
    /// Attaches to an outer layer of keys, with one list of values per key,
    /// forming one level of a layered trie (see the module documentation).
    #[derive(Debug, Serialize, Deserialize)]
    pub struct Vals<O, V> {
        /// Offsets used to provide indexes from keys to values.
        ///
        /// The length of this list is one longer than `keys`, so that we can avoid bounds logic.
        pub offs: O,
        /// Concatenated ordered lists of values, bracketed by offsets in `offs`.
        pub vals: V,
    }
85
    impl<O: for<'a> BatchContainer<ReadItem<'a> = usize>, V: BatchContainer> Default for Vals<O, V> {
        /// An empty `Vals`: zero lists, with the leading zero offset already installed.
        fn default() -> Self { Self::with_capacity(0, 0) }
    }
89
90    impl<O: for<'a> BatchContainer<ReadItem<'a> = usize>, V: BatchContainer> Vals<O, V> {
91        /// Lower and upper bounds in `self.vals` of the indexed list.
92        pub fn bounds(&self, index: usize) -> (usize, usize) {
93            (self.offs.index(index), self.offs.index(index+1))
94        }
95        /// Retrieves a value using relative indexes.
96        ///
97        /// The first index identifies a list, and the second an item within the list.
98        /// The method adds the list's lower bound to the item index, and then calls
99        /// `get_abs`. Using absolute indexes within the list's bounds can be more
100        /// efficient than using relative indexing.
101        pub fn get_rel(&self, list_idx: usize, item_idx: usize) -> V::ReadItem<'_> {
102            self.get_abs(self.bounds(list_idx).0 + item_idx)
103        }
104
105        /// Number of lists in the container.
106        pub fn len(&self) -> usize { self.offs.len() - 1 }
107        /// Retrieves a value using an absolute rather than relative index.
108        pub fn get_abs(&self, index: usize) -> V::ReadItem<'_> {
109            self.vals.index(index)
110        }
111        /// Allocates with capacities for a number of lists and values.
112        pub fn with_capacity(o_size: usize, v_size: usize) -> Self {
113            let mut offs = <O as BatchContainer>::with_capacity(o_size);
114            offs.push_ref(0);
115            Self {
116                offs,
117                vals: <V as BatchContainer>::with_capacity(v_size),
118            }
119        }
120        /// Allocates with enough capacity to contain two inputs.
121        pub fn merge_capacity(this: &Self, that: &Self) -> Self {
122            let mut offs = <O as BatchContainer>::with_capacity(this.offs.len() + that.offs.len());
123            offs.push_ref(0);
124            Self {
125                offs,
126                vals: <V as BatchContainer>::merge_capacity(&this.vals, &that.vals),
127            }
128        }
129    }
130
    /// A container for non-empty lists of updates.
    ///
    /// This container uses the special representation of an empty slice to stand in for
    /// "the previous single element". An empty slice is an otherwise invalid representation.
    #[derive(Debug, Serialize, Deserialize)]
    pub struct Upds<O, T, D> {
        /// Offsets used to provide indexes from values to updates.
        pub offs: O,
        /// Concatenated ordered lists of update times, bracketed by offsets in `offs`.
        pub times: T,
        /// Concatenated ordered lists of update diffs, bracketed by offsets in `offs`.
        pub diffs: D,
    }
144
    impl<O: for<'a> BatchContainer<ReadItem<'a> = usize>, T: BatchContainer, D: BatchContainer> Default for Upds<O, T, D> {
        /// An empty `Upds`: zero lists, with the leading zero offset already installed.
        fn default() -> Self { Self::with_capacity(0, 0) }
    }
148    impl<O: for<'a> BatchContainer<ReadItem<'a> = usize>, T: BatchContainer, D: BatchContainer> Upds<O, T, D> {
149        /// Lower and upper bounds in `self.times` and `self.diffs` of the indexed list.
150        pub fn bounds(&self, index: usize) -> (usize, usize) {
151            let mut lower = self.offs.index(index);
152            let upper = self.offs.index(index+1);
153            // We use equal lower and upper to encode "singleton update; just before here".
154            // It should only apply when there is a prior element, so `lower` should be greater than zero.
155            if lower == upper {
156                assert!(lower > 0);
157                lower -= 1;
158            }
159            (lower, upper)
160        }
161        /// Retrieves a value using relative indexes.
162        ///
163        /// The first index identifies a list, and the second an item within the list.
164        /// The method adds the list's lower bound to the item index, and then calls
165        /// `get_abs`. Using absolute indexes within the list's bounds can be more
166        /// efficient than using relative indexing.
167        pub fn get_rel(&self, list_idx: usize, item_idx: usize) -> (T::ReadItem<'_>, D::ReadItem<'_>) {
168            self.get_abs(self.bounds(list_idx).0 + item_idx)
169        }
170
171        /// Number of lists in the container.
172        pub fn len(&self) -> usize { self.offs.len() - 1 }
173        /// Retrieves a value using an absolute rather than relative index.
174        pub fn get_abs(&self, index: usize) -> (T::ReadItem<'_>, D::ReadItem<'_>) {
175            (self.times.index(index), self.diffs.index(index))
176        }
177        /// Allocates with capacities for a number of lists and values.
178        pub fn with_capacity(o_size: usize, u_size: usize) -> Self {
179            let mut offs = <O as BatchContainer>::with_capacity(o_size);
180            offs.push_ref(0);
181            Self {
182                offs,
183                times: <T as BatchContainer>::with_capacity(u_size),
184                diffs: <D as BatchContainer>::with_capacity(u_size),
185            }
186        }
187        /// Allocates with enough capacity to contain two inputs.
188        pub fn merge_capacity(this: &Self, that: &Self) -> Self {
189            let mut offs = <O as BatchContainer>::with_capacity(this.offs.len() + that.offs.len());
190            offs.push_ref(0);
191            Self {
192                offs,
193                times: <T as BatchContainer>::merge_capacity(&this.times, &that.times),
194                diffs: <D as BatchContainer>::merge_capacity(&this.diffs, &that.diffs),
195            }
196        }
197    }
198
    /// Helper type for constructing `Upds` containers.
    pub struct UpdsBuilder<T: BatchContainer, D: BatchContainer> {
        /// Local stash of updates, to use for consolidation.
        ///
        /// We could emulate a `ChangeBatch` here, with related compaction smarts.
        /// A `ChangeBatch` itself needs an `i64` diff type, which we do not have.
        stash: Vec<(T::Owned, D::Owned)>,
        /// Total number of consolidated updates.
        ///
        /// Tracked independently of the sealed containers to account for the
        /// singleton optimization, which compresses duplicate update lists.
        total: usize,

        /// Time container to stage singleton times for evaluation.
        time_con: T,
        /// Diff container to stage singleton diffs for evaluation.
        diff_con: D,
    }
216
217    impl<T: BatchContainer, D: BatchContainer> Default for UpdsBuilder<T, D> {
218        fn default() -> Self { Self { stash: Vec::default(), total: 0, time_con: BatchContainer::with_capacity(1), diff_con: BatchContainer::with_capacity(1) } }
219    }
220
221
    impl<T, D> UpdsBuilder<T, D>
    where
        T: BatchContainer<Owned: Ord>,
        D: BatchContainer<Owned: crate::difference::Semigroup>,
    {
        /// Stages one update, but does not seal the set of updates.
        pub fn push(&mut self, time: T::Owned, diff: D::Owned) {
            self.stash.push((time, diff));
        }

        /// Consolidate and insert (if non-empty) the stashed updates.
        ///
        /// The return indicates whether the results were indeed non-empty.
        pub fn seal<O: for<'a> BatchContainer<ReadItem<'a> = usize>>(&mut self, upds: &mut Upds<O, T, D>) -> bool {
            use crate::consolidation;
            consolidation::consolidate(&mut self.stash);
            // If everything consolidates away, return false.
            if self.stash.is_empty() { return false; }
            // If there is a singleton, we may be able to optimize:
            // a singleton equal to the most recently sealed update can be encoded by
            // repeating the previous offset (see `Upds::bounds`), writing no new data.
            if self.stash.len() == 1 {
                let (time, diff) = self.stash.last().unwrap();
                // Stage the owned values in unit containers, so that they can be
                // compared against the borrowed forms held in `upds`.
                self.time_con.clear(); self.time_con.push_own(time);
                self.diff_con.clear(); self.diff_con.push_own(diff);
                if upds.times.last() == self.time_con.get(0) && upds.diffs.last() == self.diff_con.get(0) {
                    self.total += 1;
                    self.stash.clear();
                    // Equal lower and upper offsets signal "re-use the preceding update".
                    upds.offs.push_ref(upds.times.len());
                    return true;
                }
            }
            // Conventional; move `stash` into `updates`.
            self.total += self.stash.len();
            for (time, diff) in self.stash.drain(..) {
                upds.times.push_own(&time);
                upds.diffs.push_own(&diff);
            }
            upds.offs.push_ref(upds.times.len());
            true
        }

        /// Completes the building and returns the total updates sealed.
        pub fn total(&self) -> usize { self.total }
    }
265}
266
267/// Types related to forming batches with values.
268pub mod val_batch {
269
270    use std::marker::PhantomData;
271    use serde::{Deserialize, Serialize};
272    use timely::container::PushInto;
273    use timely::progress::{Antichain, frontier::AntichainRef};
274
275    use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
276    use crate::trace::implementations::{BatchContainer, BuilderInput};
277    use crate::trace::implementations::layout;
278
279    use super::{Layout, Vals, Upds, layers::UpdsBuilder};
280
    /// An immutable collection of update tuples, from a contiguous interval of logical times.
    ///
    /// Storage forms a three-level trie: ordered `keys`, a `Vals` layer with one value
    /// list per key, and an `Upds` layer with one update list per value.
    #[derive(Debug, Serialize, Deserialize)]
    #[serde(bound = "
        L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
        L::ValContainer: Serialize + for<'a> Deserialize<'a>,
        L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
        L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
        L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
    ")]
    pub struct OrdValStorage<L: Layout> {
        /// An ordered list of keys.
        pub keys: L::KeyContainer,
        /// For each key in `keys`, a list of values.
        pub vals: Vals<L::OffsetContainer, L::ValContainer>,
        /// For each val in `vals`, a list of (time, diff) updates.
        pub upds: Upds<L::OffsetContainer, L::TimeContainer, L::DiffContainer>,
    }
298
    /// An immutable collection of update tuples, from a contiguous interval of logical times.
    ///
    /// The `L` parameter captures how the updates should be laid out, and `C` determines which
    /// merge batcher to select.
    #[derive(Serialize, Deserialize)]
    #[serde(bound = "
        L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
        L::ValContainer: Serialize + for<'a> Deserialize<'a>,
        L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
        L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
        L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
    ")]
    pub struct OrdValBatch<L: Layout> {
        /// The updates themselves.
        pub storage: OrdValStorage<L>,
        /// Description of the update times this layer represents.
        pub description: Description<layout::Time<L>>,
        /// The number of updates reflected in the batch.
        ///
        /// We track this separately from `storage` because due to the singleton optimization,
        /// we may have many more updates than `storage.upds` physically stores. It should equal
        /// that stored length, plus the number of singleton optimizations employed.
        pub updates: usize,
    }
323
    // Ties the batch to its layout `L`.
    impl<L: Layout> WithLayout for OrdValBatch<L> {
        type Layout = L;
    }
327
    impl<L: Layout> BatchReader for OrdValBatch<L> {

        type Cursor = OrdValCursor<L>;
        /// A cursor positioned at the batch's first key and value.
        fn cursor(&self) -> Self::Cursor {
            OrdValCursor {
                key_cursor: 0,
                val_cursor: 0,
                phantom: PhantomData,
            }
        }
        fn len(&self) -> usize {
            // Normally this would be the stored update count, but the singleton optimization
            // compresses repeated update lists, so we report the separately tracked total.
            self.updates
        }
        fn description(&self) -> &Description<layout::Time<L>> { &self.description }
    }
345
346    impl<L: Layout> Batch for OrdValBatch<L> {
347        type Merger = OrdValMerger<L>;
348
349        fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<layout::Time<L>>) -> Self::Merger {
350            OrdValMerger::new(self, other, compaction_frontier)
351        }
352
353        fn empty(lower: Antichain<Self::Time>, upper: Antichain<Self::Time>) -> Self {
354            use timely::progress::Timestamp;
355            Self {
356                storage: OrdValStorage {
357                    keys: L::KeyContainer::with_capacity(0),
358                    vals: Default::default(),
359                    upds: Default::default(),
360                },
361                description: Description::new(lower, upper, Antichain::from_elem(Self::Time::minimum())),
362                updates: 0,
363            }
364        }
365    }
366
    /// State for an in-progress merge.
    pub struct OrdValMerger<L: Layout> {
        /// Key position to merge next in the first batch.
        key_cursor1: usize,
        /// Key position to merge next in the second batch.
        key_cursor2: usize,
        /// Result that we are currently assembling.
        result: OrdValStorage<L>,
        /// Description for the merged batch; its `since` is used to advance merged times.
        description: Description<layout::Time<L>>,
        /// Staging area to consolidate owned times and diffs, before sealing.
        staging: UpdsBuilder<L::TimeContainer, L::DiffContainer>,
    }
380
    impl<L: Layout> Merger<OrdValBatch<L>> for OrdValMerger<L>
    where
        OrdValBatch<L>: Batch<Time=layout::Time<L>>,
    {
        fn new(batch1: &OrdValBatch<L>, batch2: &OrdValBatch<L>, compaction_frontier: AntichainRef<layout::Time<L>>) -> Self {

            // The batches must abut in time: the first's upper bound is the second's lower.
            assert!(batch1.upper() == batch2.lower());
            use crate::lattice::Lattice;
            // The merged `since` is the join of both inputs' `since` frontiers and the
            // requested compaction frontier.
            let mut since = batch1.description().since().join(batch2.description().since());
            since = since.join(&compaction_frontier.to_owned());

            let description = Description::new(batch1.lower().clone(), batch2.upper().clone(), since);

            let batch1 = &batch1.storage;
            let batch2 = &batch2.storage;

            OrdValMerger {
                key_cursor1: 0,
                key_cursor2: 0,
                result: OrdValStorage {
                    keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys),
                    vals: Vals::merge_capacity(&batch1.vals, &batch2.vals),
                    upds: Upds::merge_capacity(&batch1.upds, &batch2.upds),
                },
                description,
                staging: UpdsBuilder::default(),
            }
        }
        fn done(self) -> OrdValBatch<L> {
            OrdValBatch {
                // The staging builder counted every sealed update, including singletons.
                updates: self.staging.total(),
                storage: self.result,
                description: self.description,
            }
        }
        fn work(&mut self, source1: &OrdValBatch<L>, source2: &OrdValBatch<L>, fuel: &mut isize) {

            // An (incomplete) indication of the amount of work we've done so far.
            let starting_updates = self.staging.total();
            let mut effort = 0isize;

            // While both mergees are still active, perform single-key merges.
            while self.key_cursor1 < source1.storage.keys.len() && self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
                self.merge_key(&source1.storage, &source2.storage);
                // An (incomplete) accounting of the work we've done.
                effort = (self.staging.total() - starting_updates) as isize;
            }

            // Merging is complete, and only copying remains.
            // Key-by-key copying allows effort interruption, and compaction.
            while self.key_cursor1 < source1.storage.keys.len() && effort < *fuel {
                self.copy_key(&source1.storage, self.key_cursor1);
                self.key_cursor1 += 1;
                effort = (self.staging.total() - starting_updates) as isize;
            }
            while self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
                self.copy_key(&source2.storage, self.key_cursor2);
                self.key_cursor2 += 1;
                effort = (self.staging.total() - starting_updates) as isize;
            }

            // Charge the effort we expended against the fuel budget.
            *fuel -= effort;
        }
    }
445
    // Helper methods in support of merging batches.
    impl<L: Layout> OrdValMerger<L> {
        /// Copy the next key in `source`.
        ///
        /// The method extracts the key in `source` at `cursor`, and merges it in to `self`.
        /// If the result does not wholly cancel, the key will be present in `self` with the
        /// compacted values and updates.
        ///
        /// The caller should be certain to update the cursor, as this method does not do this.
        fn copy_key(&mut self, source: &OrdValStorage<L>, cursor: usize) {
            // Capture the initial number of values to determine if the merge was ultimately non-empty.
            let init_vals = self.result.vals.vals.len();
            let (mut lower, upper) = source.vals.bounds(cursor);
            while lower < upper {
                // Updates may cancel after compaction; keep only values with surviving updates.
                self.stash_updates_for_val(source, lower);
                if self.staging.seal(&mut self.result.upds) {
                    self.result.vals.vals.push_ref(source.vals.get_abs(lower));
                }
                lower += 1;
            }

            // If we have pushed any values, copy the key as well.
            if self.result.vals.vals.len() > init_vals {
                self.result.keys.push_ref(source.keys.index(cursor));
                self.result.vals.offs.push_ref(self.result.vals.vals.len());
            }
        }
        /// Merge the next key in each of `source1` and `source2` into `self`, updating the appropriate cursors.
        ///
        /// This method only merges a single key. It applies all compaction necessary, and may result in no output
        /// if the updates cancel either directly or after compaction.
        fn merge_key(&mut self, source1: &OrdValStorage<L>, source2: &OrdValStorage<L>) {
            use ::std::cmp::Ordering;
            match source1.keys.index(self.key_cursor1).cmp(&source2.keys.index(self.key_cursor2)) {
                Ordering::Less => {
                    self.copy_key(source1, self.key_cursor1);
                    self.key_cursor1 += 1;
                },
                Ordering::Equal => {
                    // Keys are equal; must merge all values from both sources for this one key.
                    let (lower1, upper1) = source1.vals.bounds(self.key_cursor1);
                    let (lower2, upper2) = source2.vals.bounds(self.key_cursor2);
                    if let Some(off) = self.merge_vals((source1, lower1, upper1), (source2, lower2, upper2)) {
                        self.result.keys.push_ref(source1.keys.index(self.key_cursor1));
                        self.result.vals.offs.push_ref(off);
                    }
                    // Increment cursors in either case; the keys are merged.
                    self.key_cursor1 += 1;
                    self.key_cursor2 += 1;
                },
                Ordering::Greater => {
                    self.copy_key(source2, self.key_cursor2);
                    self.key_cursor2 += 1;
                },
            }
        }
        /// Merge two ranges of values into `self`.
        ///
        /// If the compacted result contains values with non-empty updates, the function returns
        /// an offset that should be recorded to indicate the upper extent of the result values.
        fn merge_vals(
            &mut self,
            (source1, mut lower1, upper1): (&OrdValStorage<L>, usize, usize),
            (source2, mut lower2, upper2): (&OrdValStorage<L>, usize, usize),
        ) -> Option<usize> {
            // Capture the initial number of values to determine if the merge was ultimately non-empty.
            let init_vals = self.result.vals.vals.len();
            while lower1 < upper1 && lower2 < upper2 {
                // We compare values, and fold in updates for the lowest values;
                // if they are non-empty post-consolidation, we write the value.
                // We could multi-way merge and it wouldn't be very complicated.
                use ::std::cmp::Ordering;
                match source1.vals.get_abs(lower1).cmp(&source2.vals.get_abs(lower2)) {
                    Ordering::Less => {
                        // Extend stash by updates, with logical compaction applied.
                        self.stash_updates_for_val(source1, lower1);
                        if self.staging.seal(&mut self.result.upds) {
                            self.result.vals.vals.push_ref(source1.vals.get_abs(lower1));
                        }
                        lower1 += 1;
                    },
                    Ordering::Equal => {
                        // Same value in both sources: stash both update lists and seal once.
                        self.stash_updates_for_val(source1, lower1);
                        self.stash_updates_for_val(source2, lower2);
                        if self.staging.seal(&mut self.result.upds) {
                            self.result.vals.vals.push_ref(source1.vals.get_abs(lower1));
                        }
                        lower1 += 1;
                        lower2 += 1;
                    },
                    Ordering::Greater => {
                        // Extend stash by updates, with logical compaction applied.
                        self.stash_updates_for_val(source2, lower2);
                        if self.staging.seal(&mut self.result.upds) {
                            self.result.vals.vals.push_ref(source2.vals.get_abs(lower2));
                        }
                        lower2 += 1;
                    },
                }
            }
            // Merging is complete, but we may have remaining elements to push.
            while lower1 < upper1 {
                self.stash_updates_for_val(source1, lower1);
                if self.staging.seal(&mut self.result.upds) {
                    self.result.vals.vals.push_ref(source1.vals.get_abs(lower1));
                }
                lower1 += 1;
            }
            while lower2 < upper2 {
                self.stash_updates_for_val(source2, lower2);
                if self.staging.seal(&mut self.result.upds) {
                    self.result.vals.vals.push_ref(source2.vals.get_abs(lower2));
                }
                lower2 += 1;
            }

            // Values being pushed indicate non-emptiness.
            if self.result.vals.vals.len() > init_vals {
                Some(self.result.vals.vals.len())
            } else {
                None
            }
        }

        /// Transfer updates for an indexed value in `source` into `self`, with compaction applied.
        fn stash_updates_for_val(&mut self, source: &OrdValStorage<L>, index: usize) {
            let (lower, upper) = source.upds.bounds(index);
            for i in lower .. upper {
                // NB: Here is where we would need to look back if `lower == upper`.
                let (time, diff) = source.upds.get_abs(i);
                use crate::lattice::Lattice;
                // Advance each time to the merged `since` frontier, enabling consolidation.
                let mut new_time: layout::Time<L> = L::TimeContainer::into_owned(time);
                new_time.advance_by(self.description.since().borrow());
                self.staging.push(new_time, L::DiffContainer::into_owned(diff));
            }
        }
    }
583
    /// A cursor for navigating a single layer.
    pub struct OrdValCursor<L: Layout> {
        /// Absolute position of the current key.
        key_cursor: usize,
        /// Absolute position of the current value.
        val_cursor: usize,
        /// Phantom marker for Rust happiness.
        phantom: PhantomData<L>,
    }
593
594    use crate::trace::implementations::WithLayout;
    // Ties the cursor to its layout `L`.
    impl<L: Layout> WithLayout for OrdValCursor<L> {
        type Layout = L;
    }
598
599    impl<L: Layout> Cursor for OrdValCursor<L> {
600
601        type Storage = OrdValBatch<L>;
602
603        fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> { storage.storage.keys.get(self.key_cursor) }
604        fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> { if self.val_valid(storage) { Some(self.val(storage)) } else { None } }
605
606        fn key<'a>(&self, storage: &'a OrdValBatch<L>) -> Self::Key<'a> { storage.storage.keys.index(self.key_cursor) }
607        fn val<'a>(&self, storage: &'a OrdValBatch<L>) -> Self::Val<'a> { storage.storage.vals.get_abs(self.val_cursor) }
608        fn map_times<L2: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &OrdValBatch<L>, mut logic: L2) {
609            let (lower, upper) = storage.storage.upds.bounds(self.val_cursor);
610            for index in lower .. upper {
611                let (time, diff) = storage.storage.upds.get_abs(index);
612                logic(time, diff);
613            }
614        }
615        fn key_valid(&self, storage: &OrdValBatch<L>) -> bool { self.key_cursor < storage.storage.keys.len() }
616        fn val_valid(&self, storage: &OrdValBatch<L>) -> bool { self.val_cursor < storage.storage.vals.bounds(self.key_cursor).1 }
617        fn step_key(&mut self, storage: &OrdValBatch<L>){
618            self.key_cursor += 1;
619            if self.key_valid(storage) {
620                self.rewind_vals(storage);
621            }
622            else {
623                self.key_cursor = storage.storage.keys.len();
624            }
625        }
626        fn seek_key(&mut self, storage: &OrdValBatch<L>, key: Self::Key<'_>) {
627            self.key_cursor += storage.storage.keys.advance(self.key_cursor, storage.storage.keys.len(), |x| <L::KeyContainer as BatchContainer>::reborrow(x).lt(&<L::KeyContainer as BatchContainer>::reborrow(key)));
628            if self.key_valid(storage) {
629                self.rewind_vals(storage);
630            }
631        }
632        fn step_val(&mut self, storage: &OrdValBatch<L>) {
633            self.val_cursor += 1;
634            if !self.val_valid(storage) {
635                self.val_cursor = storage.storage.vals.bounds(self.key_cursor).1;
636            }
637        }
        fn seek_val(&mut self, storage: &OrdValBatch<L>, val: Self::Val<'_>) {
            // Advance the value cursor past all values strictly less than `val`,
            // staying within the current key's range of values.
            self.val_cursor += storage.storage.vals.vals.advance(self.val_cursor, storage.storage.vals.bounds(self.key_cursor).1, |x| <L::ValContainer as BatchContainer>::reborrow(x).lt(&<L::ValContainer as BatchContainer>::reborrow(val)));
        }
641        fn rewind_keys(&mut self, storage: &OrdValBatch<L>) {
642            self.key_cursor = 0;
643            if self.key_valid(storage) {
644                self.rewind_vals(storage)
645            }
646        }
        fn rewind_vals(&mut self, storage: &OrdValBatch<L>) {
            // Reposition at the lower bound of the current key's value range.
            self.val_cursor = storage.storage.vals.bounds(self.key_cursor).0;
        }
650    }
651
    /// A builder for creating layers from unsorted update tuples.
    pub struct OrdValBuilder<L: Layout, CI> {
        /// The in-progress result.
        ///
        /// This is public to allow container implementors to set and inspect their container.
        pub result: OrdValStorage<L>,
        /// Updates staged for the most recently pushed (key, val) pair, sealed into
        /// `result` when a new value or key begins (or when the builder completes).
        staging: UpdsBuilder<L::TimeContainer, L::DiffContainer>,
        /// Records the chunk input type `CI` without storing any of it.
        _marker: PhantomData<CI>,
    }
661
    impl<L, CI> Builder for OrdValBuilder<L, CI>
    where
        L: for<'a> Layout<
            KeyContainer: PushInto<CI::Key<'a>>,
            ValContainer: PushInto<CI::Val<'a>>,
        >,
        CI: for<'a> BuilderInput<L::KeyContainer, L::ValContainer, Time=layout::Time<L>, Diff=layout::Diff<L>>,
    {

        type Input = CI;
        type Time = layout::Time<L>;
        type Output = OrdValBatch<L>;

        /// Allocates a builder with capacity for `keys` keys, `vals` values, and `upds` updates.
        fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self {
            Self {
                result: OrdValStorage {
                    keys: L::KeyContainer::with_capacity(keys),
                    // Offset containers are sized one larger than their items, for the final bound.
                    vals: Vals::with_capacity(keys + 1, vals),
                    upds: Upds::with_capacity(vals + 1, upds),
                },
                staging: UpdsBuilder::default(),
                _marker: PhantomData,
            }
        }

        /// Absorbs the (key, val, time, diff) tuples of `chunk`, which must arrive in (key, val) order.
        ///
        /// Updates for the current (key, val) pair accumulate in `self.staging`, and are sealed
        /// into `self.result` whenever a new value or key begins.
        #[inline]
        fn push(&mut self, chunk: &mut Self::Input) {
            for item in chunk.drain() {
                let (key, val, time, diff) = CI::into_parts(item);

                // Pre-load the first update.
                if self.result.keys.is_empty() {
                    self.result.vals.vals.push_into(val);
                    self.result.keys.push_into(key);
                    self.staging.push(time, diff);
                }
                // Perhaps this is a continuation of an already received key.
                else if self.result.keys.last().map(|k| CI::key_eq(&key, k)).unwrap_or(false) {
                    // Perhaps this is a continuation of an already received value.
                    if self.result.vals.vals.last().map(|v| CI::val_eq(&val, v)).unwrap_or(false) {
                        self.staging.push(time, diff);
                    } else {
                        // New value; complete representation of prior value.
                        self.staging.seal(&mut self.result.upds);
                        self.staging.push(time, diff);
                        self.result.vals.vals.push_into(val);
                    }
                } else {
                    // New key; complete representation of prior key.
                    self.staging.seal(&mut self.result.upds);
                    self.staging.push(time, diff);
                    // Record the upper bound of the prior key's values before starting the new key.
                    self.result.vals.offs.push_ref(self.result.vals.vals.len());
                    self.result.vals.vals.push_into(val);
                    self.result.keys.push_into(key);
                }
            }
        }

        /// Completes the batch, sealing any staged updates and the final value bound.
        #[inline(never)]
        fn done(mut self, description: Description<Self::Time>) -> OrdValBatch<L> {
            self.staging.seal(&mut self.result.upds);
            self.result.vals.offs.push_ref(self.result.vals.vals.len());
            OrdValBatch {
                // The total update count accumulated by the staging builder across all seals.
                updates: self.staging.total(),
                storage: self.result,
                description,
            }
        }

        /// Builds a batch from a chain of input chunks, pre-sized by their reported counts.
        fn seal(chain: &mut Vec<Self::Input>, description: Description<Self::Time>) -> Self::Output {
            let (keys, vals, upds) = Self::Input::key_val_upd_counts(&chain[..]);
            let mut builder = Self::with_capacity(keys, vals, upds);
            for mut chunk in chain.drain(..) {
                builder.push(&mut chunk);
            }

            builder.done(description)
        }
    }
741}
742
743/// Types related to forming batches of keys.
744pub mod key_batch {
745
746    use std::marker::PhantomData;
747    use serde::{Deserialize, Serialize};
748    use timely::container::PushInto;
749    use timely::progress::{Antichain, frontier::AntichainRef};
750
751    use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
752    use crate::trace::implementations::{BatchContainer, BuilderInput};
753    use crate::trace::implementations::layout;
754
755    use super::{Layout, Upds, layers::UpdsBuilder};
756
    /// An immutable collection of update tuples, from a contiguous interval of logical times.
    #[derive(Debug, Serialize, Deserialize)]
    #[serde(bound = "
        L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
        L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
        L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
        L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
    ")]
    pub struct OrdKeyStorage<L: Layout> {
        /// An ordered list of keys, each with its updates recorded in `upds`.
        pub keys: L::KeyContainer,
        /// For each key in `keys`, a list of (time, diff) updates.
        pub upds: Upds<L::OffsetContainer, L::TimeContainer, L::DiffContainer>,
    }
771
    /// An immutable collection of update tuples, from a contiguous interval of logical times.
    ///
    /// The `L` parameter captures how the updates should be laid out, and `C` determines which
    /// merge batcher to select.
    #[derive(Serialize, Deserialize)]
    #[serde(bound = "
        L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
        L::ValContainer: Serialize + for<'a> Deserialize<'a>,
        L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
        L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
        L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
    ")]
    pub struct OrdKeyBatch<L: Layout> {
        /// The updates themselves.
        pub storage: OrdKeyStorage<L>,
        /// Description of the update times this layer represents.
        pub description: Description<layout::Time<L>>,
        /// The number of updates reflected in the batch.
        ///
        /// We track this separately from `storage` because due to the singleton optimization,
        /// we may have many more updates than `storage` physically records. It should equal
        /// the stored count, plus the number of singleton optimizations employed.
        pub updates: usize,

        /// Single value to return if asked.
        ///
        /// Key-only batches store no per-key values; cursors hand back this one default value.
        pub value: L::ValContainer,
    }
799
800    impl<L: Layout<ValContainer: BatchContainer<Owned: Default>>> OrdKeyBatch<L> {
801        /// Creates a container with one value, to slot in to `self.value`.
802        pub fn create_value() -> L::ValContainer {
803            let mut value = L::ValContainer::with_capacity(1);
804            value.push_own(&Default::default());
805            value
806        }
807    }
808
    // Associates the batch with its layout `L`, from which layout-derived trait types follow.
    impl<L: Layout<ValContainer: BatchContainer<Owned: Default>>> WithLayout for OrdKeyBatch<L> {
        type Layout = L;
    }
812
    impl<L: Layout<ValContainer: BatchContainer<Owned: Default>>> BatchReader for OrdKeyBatch<L> {

        type Cursor = OrdKeyCursor<L>;
        /// A cursor positioned at the first key, with its single value not yet stepped past.
        fn cursor(&self) -> Self::Cursor {
            OrdKeyCursor {
                key_cursor: 0,
                val_stepped: false,
                phantom: std::marker::PhantomData,
            }
        }
        /// The number of updates in the batch.
        fn len(&self) -> usize {
            // We report the separately tracked count rather than a container length, because the
            // compact encoding (singleton optimization) can elide updates from physical storage.
            self.updates
        }
        fn description(&self) -> &Description<layout::Time<L>> { &self.description }
    }
830
    impl<L: Layout<ValContainer: BatchContainer<Owned: Default>>> Batch for OrdKeyBatch<L> {
        type Merger = OrdKeyMerger<L>;

        /// Initiates a merge of `self` and `other`, compacting times to `compaction_frontier`.
        fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<layout::Time<L>>) -> Self::Merger {
            OrdKeyMerger::new(self, other, compaction_frontier)
        }

        /// An empty batch for the interval `[lower, upper)`, with `since` at the minimum time.
        fn empty(lower: Antichain<Self::Time>, upper: Antichain<Self::Time>) -> Self {
            use timely::progress::Timestamp;
            Self {
                storage: OrdKeyStorage {
                    keys: L::KeyContainer::with_capacity(0),
                    upds: Upds::default(),
                },
                description: Description::new(lower, upper, Antichain::from_elem(Self::Time::minimum())),
                updates: 0,
                value: Self::create_value(),
            }
        }
    }
851
    /// State for an in-progress merge.
    pub struct OrdKeyMerger<L: Layout> {
        /// Key position to merge next in the first batch.
        key_cursor1: usize,
        /// Key position to merge next in the second batch.
        key_cursor2: usize,
        /// Result that we are currently assembling.
        result: OrdKeyStorage<L>,
        /// Description (time interval and `since` frontier) of the merged output batch.
        description: Description<layout::Time<L>>,

        /// Local stash of updates, to use for consolidation.
        staging: UpdsBuilder<L::TimeContainer, L::DiffContainer>,
    }
866
    impl<L: Layout<ValContainer: BatchContainer<Owned: Default>>> Merger<OrdKeyBatch<L>> for OrdKeyMerger<L>
    where
        OrdKeyBatch<L>: Batch<Time=layout::Time<L>>,
    {
        /// Prepares a merge of two batches covering adjacent time intervals.
        fn new(batch1: &OrdKeyBatch<L>, batch2: &OrdKeyBatch<L>, compaction_frontier: AntichainRef<layout::Time<L>>) -> Self {

            // The batches must cover adjacent intervals of times.
            assert!(batch1.upper() == batch2.lower());
            use crate::lattice::Lattice;
            // The output `since` is the join of both inputs' frontiers and the compaction frontier.
            let mut since = batch1.description().since().join(batch2.description().since());
            since = since.join(&compaction_frontier.to_owned());

            let description = Description::new(batch1.lower().clone(), batch2.upper().clone(), since);

            let batch1 = &batch1.storage;
            let batch2 = &batch2.storage;

            OrdKeyMerger {
                key_cursor1: 0,
                key_cursor2: 0,
                result: OrdKeyStorage {
                    keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys),
                    upds: Upds::merge_capacity(&batch1.upds, &batch2.upds),
                },
                description,
                staging: UpdsBuilder::default(),
            }
        }
        /// Finalizes the merge, assembling the accumulated result into a batch.
        fn done(self) -> OrdKeyBatch<L> {
            OrdKeyBatch {
                updates: self.staging.total(),
                storage: self.result,
                description: self.description,
                value: OrdKeyBatch::<L>::create_value(),
            }
        }
        /// Performs a bounded amount of merge work, decrementing `fuel` by the effort expended.
        fn work(&mut self, source1: &OrdKeyBatch<L>, source2: &OrdKeyBatch<L>, fuel: &mut isize) {

            // An (incomplete) indication of the amount of work we've done so far.
            let starting_updates = self.staging.total();
            let mut effort = 0isize;

            // While both mergees are still active, perform single-key merges.
            while self.key_cursor1 < source1.storage.keys.len() && self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
                self.merge_key(&source1.storage, &source2.storage);
                // An (incomplete) accounting of the work we've done.
                effort = (self.staging.total() - starting_updates) as isize;
            }

            // Merging is complete, and only copying remains.
            // Key-by-key copying allows effort interruption, and compaction.
            while self.key_cursor1 < source1.storage.keys.len() && effort < *fuel {
                self.copy_key(&source1.storage, self.key_cursor1);
                self.key_cursor1 += 1;
                effort = (self.staging.total() - starting_updates) as isize;
            }
            while self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
                self.copy_key(&source2.storage, self.key_cursor2);
                self.key_cursor2 += 1;
                effort = (self.staging.total() - starting_updates) as isize;
            }

            *fuel -= effort;
        }
    }
931
    // Helper methods in support of merging batches.
    impl<L: Layout> OrdKeyMerger<L> {
        /// Copy the next key in `source`.
        ///
        /// The method extracts the key in `source` at `cursor`, and merges it in to `self`.
        /// If the result does not wholly cancel, the key will be present in `self` with the
        /// compacted values and updates.
        ///
        /// The caller should be certain to update the cursor, as this method does not do this.
        fn copy_key(&mut self, source: &OrdKeyStorage<L>, cursor: usize) {
            self.stash_updates_for_key(source, cursor);
            // `seal` reports whether any updates survived consolidation; only then record the key.
            if self.staging.seal(&mut self.result.upds) {
                self.result.keys.push_ref(source.keys.index(cursor));
            }
        }
        /// Merge the next key in each of `source1` and `source2` into `self`, updating the appropriate cursors.
        ///
        /// This method only merges a single key. It applies all compaction necessary, and may result in no output
        /// if the updates cancel either directly or after compaction.
        fn merge_key(&mut self, source1: &OrdKeyStorage<L>, source2: &OrdKeyStorage<L>) {
            use ::std::cmp::Ordering;
            match source1.keys.index(self.key_cursor1).cmp(&source2.keys.index(self.key_cursor2)) {
                Ordering::Less => {
                    self.copy_key(source1, self.key_cursor1);
                    self.key_cursor1 += 1;
                },
                Ordering::Equal => {
                    // Keys are equal; must merge all updates from both sources for this one key.
                    self.stash_updates_for_key(source1, self.key_cursor1);
                    self.stash_updates_for_key(source2, self.key_cursor2);
                    if self.staging.seal(&mut self.result.upds) {
                        self.result.keys.push_ref(source1.keys.index(self.key_cursor1));
                    }
                    // Increment cursors in either case; the keys are merged.
                    self.key_cursor1 += 1;
                    self.key_cursor2 += 1;
                },
                Ordering::Greater => {
                    self.copy_key(source2, self.key_cursor2);
                    self.key_cursor2 += 1;
                },
            }
        }

        /// Transfer updates for an indexed key in `source` into `self`, with compaction applied.
        fn stash_updates_for_key(&mut self, source: &OrdKeyStorage<L>, index: usize) {
            let (lower, upper) = source.upds.bounds(index);
            for i in lower .. upper {
                // NB: Here is where we would need to look back if `lower == upper`.
                let (time, diff) = source.upds.get_abs(i);
                use crate::lattice::Lattice;
                // Advance each time to the output `since` frontier, enabling consolidation.
                let mut new_time = L::TimeContainer::into_owned(time);
                new_time.advance_by(self.description.since().borrow());
                self.staging.push(new_time, L::DiffContainer::into_owned(diff));
            }
        }
    }
989
    /// A cursor for navigating a single layer.
    pub struct OrdKeyCursor<L: Layout> {
        /// Absolute position of the current key.
        key_cursor: usize,
        /// If the value has been stepped for the key, there are no more values.
        ///
        /// Key-only batches expose exactly one (default) value per key, so a single
        /// boolean suffices in place of a value cursor.
        val_stepped: bool,
        /// Phantom marker for Rust happiness.
        phantom: PhantomData<L>,
    }
999
1000    use crate::trace::implementations::WithLayout;
    // Associates the cursor with its layout `L`, from which layout-derived trait types follow.
    impl<L: Layout<ValContainer: BatchContainer>> WithLayout for OrdKeyCursor<L> {
        type Layout = L;
    }
1004
    impl<L: for<'a> Layout<ValContainer: BatchContainer<Owned: Default>>> Cursor for OrdKeyCursor<L> {

        type Storage = OrdKeyBatch<L>;

        fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> { storage.storage.keys.get(self.key_cursor) }
        fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> { if self.val_valid(storage) { Some(self.val(storage)) } else { None } }

        fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { storage.storage.keys.index(self.key_cursor) }
        // Key-only batches have a single shared value, stored at position 0 of `storage.value`.
        fn val<'a>(&self, storage: &'a Self::Storage) -> Self::Val<'a> { storage.value.index(0) }
        fn map_times<L2: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L2) {
            // Apply `logic` to each (time, diff) update recorded for the current key.
            let (lower, upper) = storage.storage.upds.bounds(self.key_cursor);
            for index in lower .. upper {
                let (time, diff) = storage.storage.upds.get_abs(index);
                logic(time, diff);
            }
        }
        fn key_valid(&self, storage: &Self::Storage) -> bool { self.key_cursor < storage.storage.keys.len() }
        // There is exactly one value per key; it remains valid until stepped past.
        fn val_valid(&self, _storage: &Self::Storage) -> bool { !self.val_stepped }
        fn step_key(&mut self, storage: &Self::Storage){
            self.key_cursor += 1;
            if self.key_valid(storage) {
                self.rewind_vals(storage);
            }
            else {
                self.key_cursor = storage.storage.keys.len();
            }
        }
        fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) {
            // Advance past all keys strictly less than `key`.
            self.key_cursor += storage.storage.keys.advance(self.key_cursor, storage.storage.keys.len(), |x| <L::KeyContainer as BatchContainer>::reborrow(x).lt(&<L::KeyContainer as BatchContainer>::reborrow(key)));
            if self.key_valid(storage) {
                self.rewind_vals(storage);
            }
        }
        fn step_val(&mut self, _storage: &Self::Storage) {
            self.val_stepped = true;
        }
        // With a single value per key, seeking a value is a no-op.
        fn seek_val(&mut self, _storage: &Self::Storage, _val: Self::Val<'_>) { }
        fn rewind_keys(&mut self, storage: &Self::Storage) {
            self.key_cursor = 0;
            if self.key_valid(storage) {
                self.rewind_vals(storage)
            }
        }
        fn rewind_vals(&mut self, _storage: &Self::Storage) {
            self.val_stepped = false;
        }
    }
1052
    /// A builder for creating layers from unsorted update tuples.
    pub struct OrdKeyBuilder<L: Layout, CI> {
        /// The in-progress result.
        ///
        /// This is public to allow container implementors to set and inspect their container.
        pub result: OrdKeyStorage<L>,
        /// Updates staged for the most recently pushed key, sealed into `result`
        /// when a new key begins (or when the builder completes).
        staging: UpdsBuilder<L::TimeContainer, L::DiffContainer>,
        /// Records the chunk input type `CI` without storing any of it.
        _marker: PhantomData<CI>,
    }
1062
    impl<L: Layout, CI> Builder for OrdKeyBuilder<L, CI>
    where
        L: for<'a> Layout<KeyContainer: PushInto<CI::Key<'a>>>,
        L: Layout<ValContainer: BatchContainer<Owned: Default>>,
        CI: BuilderInput<L::KeyContainer, L::ValContainer, Time=layout::Time<L>, Diff=layout::Diff<L>>,
    {

        type Input = CI;
        type Time = layout::Time<L>;
        type Output = OrdKeyBatch<L>;

        /// Allocates a builder with capacity for `keys` keys and `upds` updates; values are unused.
        fn with_capacity(keys: usize, _vals: usize, upds: usize) -> Self {
            Self {
                result: OrdKeyStorage {
                    keys: L::KeyContainer::with_capacity(keys),
                    // The offset container is sized one larger than the keys, for the final bound.
                    upds: Upds::with_capacity(keys+1, upds),
                },
                staging: UpdsBuilder::default(),
                _marker: PhantomData,
            }
        }

        /// Absorbs the (key, time, diff) tuples of `chunk`, which must arrive in key order.
        ///
        /// Values are discarded. Updates for the current key accumulate in `self.staging`,
        /// and are sealed into `self.result` whenever a new key begins.
        #[inline]
        fn push(&mut self, chunk: &mut Self::Input) {
            for item in chunk.drain() {
                let (key, _val, time, diff) = CI::into_parts(item);
                // Pre-load the first update.
                if self.result.keys.is_empty() {
                    self.result.keys.push_into(key);
                    self.staging.push(time, diff);
                }
                // Perhaps this is a continuation of an already received key.
                else if self.result.keys.last().map(|k| CI::key_eq(&key, k)).unwrap_or(false) {
                    self.staging.push(time, diff);
                } else {
                    // New key; complete representation of prior key.
                    self.staging.seal(&mut self.result.upds);
                    self.staging.push(time, diff);
                    self.result.keys.push_into(key);
                }
            }
        }

        /// Completes the batch, sealing any staged updates for the final key.
        #[inline(never)]
        fn done(mut self, description: Description<Self::Time>) -> OrdKeyBatch<L> {
            self.staging.seal(&mut self.result.upds);
            OrdKeyBatch {
                updates: self.staging.total(),
                storage: self.result,
                description,
                value: OrdKeyBatch::<L>::create_value(),
            }
        }

        /// Builds a batch from a chain of input chunks, pre-sized by their reported counts.
        fn seal(chain: &mut Vec<Self::Input>, description: Description<Self::Time>) -> Self::Output {
            let (keys, vals, upds) = Self::Input::key_val_upd_counts(&chain[..]);
            let mut builder = Self::with_capacity(keys, vals, upds);
            for mut chunk in chain.drain(..) {
                builder.push(&mut chunk);
            }

            builder.done(description)
        }
    }
1125
1126}