differential_dataflow/trace/implementations/
ord_neu.rs

1//! Trace and batch implementations based on sorted ranges.
2//!
3//! The types and type aliases in this module start with either
4//!
5//! * `OrdVal`: Collections whose data have the form `(key, val)` where `key` is ordered.
6//! * `OrdKey`: Collections whose data have the form `key` where `key` is ordered.
7//!
8//! Although `OrdVal` is more general than `OrdKey`, the latter has a simpler representation
9//! and should consume fewer resources (computation and memory) when it applies.
10
11use std::rc::Rc;
12
13use crate::containers::TimelyStack;
14use crate::trace::implementations::chunker::{ColumnationChunker, VecChunker};
15use crate::trace::implementations::spine_fueled::Spine;
16use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger, ColMerger};
17use crate::trace::rc_blanket_impls::RcBuilder;
18
19use super::{Layout, Vector, TStack};
20
21pub use self::val_batch::{OrdValBatch, OrdValBuilder};
22pub use self::key_batch::{OrdKeyBatch, OrdKeyBuilder};
23
24/// A trace implementation using a spine of ordered lists.
25pub type OrdValSpine<K, V, T, R> = Spine<Rc<OrdValBatch<Vector<((K,V),T,R)>>>>;
26/// A batcher using ordered lists.
27pub type OrdValBatcher<K, V, T, R> = MergeBatcher<Vec<((K,V),T,R)>, VecChunker<((K,V),T,R)>, VecMerger<(K, V), T, R>>;
28/// A builder using ordered lists.
29pub type RcOrdValBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<Vector<((K,V),T,R)>, Vec<((K,V),T,R)>>>;
30
31// /// A trace implementation for empty values using a spine of ordered lists.
32// pub type OrdKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<Vector<((K,()),T,R)>>>>;
33
34/// A trace implementation backed by columnar storage.
35pub type ColValSpine<K, V, T, R> = Spine<Rc<OrdValBatch<TStack<((K,V),T,R)>>>>;
36/// A batcher for columnar storage.
37pub type ColValBatcher<K, V, T, R> = MergeBatcher<Vec<((K,V),T,R)>, ColumnationChunker<((K,V),T,R)>, ColMerger<(K,V),T,R>>;
38/// A builder for columnar storage.
39pub type ColValBuilder<K, V, T, R> = RcBuilder<OrdValBuilder<TStack<((K,V),T,R)>, TimelyStack<((K,V),T,R)>>>;
40
41/// A trace implementation using a spine of ordered lists.
42pub type OrdKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<Vector<((K,()),T,R)>>>>;
43/// A batcher for ordered lists.
44pub type OrdKeyBatcher<K, T, R> = MergeBatcher<Vec<((K,()),T,R)>, VecChunker<((K,()),T,R)>, VecMerger<(K, ()), T, R>>;
45/// A builder for ordered lists.
46pub type RcOrdKeyBuilder<K, T, R> = RcBuilder<OrdKeyBuilder<Vector<((K,()),T,R)>, Vec<((K,()),T,R)>>>;
47
48// /// A trace implementation for empty values using a spine of ordered lists.
49// pub type OrdKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<Vector<((K,()),T,R)>>>>;
50
51/// A trace implementation backed by columnar storage.
52pub type ColKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<TStack<((K,()),T,R)>>>>;
53/// A batcher for columnar storage
54pub type ColKeyBatcher<K, T, R> = MergeBatcher<Vec<((K,()),T,R)>, ColumnationChunker<((K,()),T,R)>, ColMerger<(K,()),T,R>>;
55/// A builder for columnar storage
56pub type ColKeyBuilder<K, T, R> = RcBuilder<OrdKeyBuilder<TStack<((K,()),T,R)>, TimelyStack<((K,()),T,R)>>>;
57
58// /// A trace implementation backed by columnar storage.
59// pub type ColKeySpine<K, T, R> = Spine<Rc<OrdKeyBatch<TStack<((K,()),T,R)>>>>;
60
61pub use layers::{Vals, Upds};
/// Layers are containers of lists of some type.
///
/// The intent is that they "attach" to an outer layer which has as many values
/// as the layer has lists, thereby associating a list with each outer value.
/// A sequence of layers, each matching the number of values in its predecessor,
/// forms a layered trie: a tree with values of some type on nodes at each depth.
///
/// We will form tries here by layering `[Keys, Vals, Upds]` or `[Keys, Upds]`.
pub mod layers {

    use serde::{Deserialize, Serialize};
    use crate::trace::implementations::BatchContainer;

    /// A container for non-empty lists of values.
    ///
    /// List `i` occupies the positions `offs[i] .. offs[i+1]` of `vals`.
    #[derive(Debug, Serialize, Deserialize)]
    pub struct Vals<O, V> {
        /// Offsets used to provide indexes from keys to values.
        ///
        /// The length of this list is one longer than `keys`, so that we can avoid bounds logic.
        pub offs: O,
        /// Concatenated ordered lists of values, bracketed by offsets in `offs`.
        pub vals: V,
    }

    impl<O: for<'a> BatchContainer<ReadItem<'a> = usize>, V: BatchContainer> Default for Vals<O, V> {
        // Even an empty `Vals` holds the initial offset `0` pushed by `with_capacity`.
        fn default() -> Self { Self::with_capacity(0, 0) }
    }

    impl<O: for<'a> BatchContainer<ReadItem<'a> = usize>, V: BatchContainer> Vals<O, V> {
        /// Lower and upper bounds in `self.vals` of the indexed list.
        pub fn bounds(&self, index: usize) -> (usize, usize) {
            (self.offs.index(index), self.offs.index(index+1))
        }
        /// Retrieves a value using relative indexes.
        ///
        /// The first index identifies a list, and the second an item within the list.
        /// The method adds the list's lower bound to the item index, and then calls
        /// `get_abs`. Using absolute indexes within the list's bounds can be more
        /// efficient than using relative indexing.
        pub fn get_rel(&self, list_idx: usize, item_idx: usize) -> V::ReadItem<'_> {
            self.get_abs(self.bounds(list_idx).0 + item_idx)
        }

        /// Number of lists in the container.
        ///
        /// `offs` always holds one more entry than there are lists, so the subtraction cannot underflow.
        pub fn len(&self) -> usize { self.offs.len() - 1 }
        /// Retrieves a value using an absolute rather than relative index.
        pub fn get_abs(&self, index: usize) -> V::ReadItem<'_> {
            self.vals.index(index)
        }
        /// Allocates with capacities for a number of lists and values.
        pub fn with_capacity(o_size: usize, v_size: usize) -> Self {
            let mut offs = <O as BatchContainer>::with_capacity(o_size);
            // Seed the initial lower bound, so that `bounds(0)` is well defined.
            offs.push_ref(0);
            Self {
                offs,
                vals: <V as BatchContainer>::with_capacity(v_size),
            }
        }
        /// Allocates with enough capacity to contain two inputs.
        pub fn merge_capacity(this: &Self, that: &Self) -> Self {
            let mut offs = <O as BatchContainer>::with_capacity(this.offs.len() + that.offs.len());
            // Seed the initial lower bound, as in `with_capacity`.
            offs.push_ref(0);
            Self {
                offs,
                vals: <V as BatchContainer>::merge_capacity(&this.vals, &that.vals),
            }
        }
    }

    /// A container for non-empty lists of updates.
    ///
    /// This container uses the special representation of an empty slice to stand in for
    /// "the previous single element". An empty slice is an otherwise invalid representation.
    #[derive(Debug, Serialize, Deserialize)]
    pub struct Upds<O, T, D> {
        /// Offsets used to provide indexes from values to updates.
        pub offs: O,
        /// Concatenated ordered lists of update times, bracketed by offsets in `offs`.
        pub times: T,
        /// Concatenated ordered lists of update diffs, bracketed by offsets in `offs`.
        pub diffs: D,
    }

    impl<O: for<'a> BatchContainer<ReadItem<'a> = usize>, T: BatchContainer, D: BatchContainer> Default for Upds<O, T, D> {
        // Even an empty `Upds` holds the initial offset `0` pushed by `with_capacity`.
        fn default() -> Self { Self::with_capacity(0, 0) }
    }
    impl<O: for<'a> BatchContainer<ReadItem<'a> = usize>, T: BatchContainer, D: BatchContainer> Upds<O, T, D> {
        /// Lower and upper bounds in `self.times` and `self.diffs` of the indexed list.
        ///
        /// This method decodes the singleton compression: an empty range stands in for
        /// a repeat of the single update just before it (see the type-level comment).
        pub fn bounds(&self, index: usize) -> (usize, usize) {
            let mut lower = self.offs.index(index);
            let upper = self.offs.index(index+1);
            // We use equal lower and upper to encode "singleton update; just before here".
            // It should only apply when there is a prior element, so `lower` should be greater than zero.
            if lower == upper {
                assert!(lower > 0);
                lower -= 1;
            }
            (lower, upper)
        }
        /// Retrieves a value using relative indexes.
        ///
        /// The first index identifies a list, and the second an item within the list.
        /// The method adds the list's lower bound to the item index, and then calls
        /// `get_abs`. Using absolute indexes within the list's bounds can be more
        /// efficient than using relative indexing.
        pub fn get_rel(&self, list_idx: usize, item_idx: usize) -> (T::ReadItem<'_>, D::ReadItem<'_>) {
            self.get_abs(self.bounds(list_idx).0 + item_idx)
        }

        /// Number of lists in the container.
        ///
        /// `offs` always holds one more entry than there are lists, so the subtraction cannot underflow.
        pub fn len(&self) -> usize { self.offs.len() - 1 }
        /// Retrieves a value using an absolute rather than relative index.
        pub fn get_abs(&self, index: usize) -> (T::ReadItem<'_>, D::ReadItem<'_>) {
            (self.times.index(index), self.diffs.index(index))
        }
        /// Allocates with capacities for a number of lists and values.
        pub fn with_capacity(o_size: usize, u_size: usize) -> Self {
            let mut offs = <O as BatchContainer>::with_capacity(o_size);
            // Seed the initial lower bound, so that `bounds(0)` is well defined.
            offs.push_ref(0);
            Self {
                offs,
                times: <T as BatchContainer>::with_capacity(u_size),
                diffs: <D as BatchContainer>::with_capacity(u_size),
            }
        }
        /// Allocates with enough capacity to contain two inputs.
        pub fn merge_capacity(this: &Self, that: &Self) -> Self {
            let mut offs = <O as BatchContainer>::with_capacity(this.offs.len() + that.offs.len());
            // Seed the initial lower bound, as in `with_capacity`.
            offs.push_ref(0);
            Self {
                offs,
                times: <T as BatchContainer>::merge_capacity(&this.times, &that.times),
                diffs: <D as BatchContainer>::merge_capacity(&this.diffs, &that.diffs),
            }
        }
    }

    /// Helper type for constructing `Upds` containers.
    pub struct UpdsBuilder<T: BatchContainer, D: BatchContainer> {
        /// Local stash of updates, to use for consolidation.
        ///
        /// We could emulate a `ChangeBatch` here, with related compaction smarts.
        /// A `ChangeBatch` itself needs an `i64` diff type, which we have not.
        stash: Vec<(T::Owned, D::Owned)>,
        /// Total number of consolidated updates.
        ///
        /// Tracked independently to account for duplicate compression.
        total: usize,

        /// Time container to stage singleton times for evaluation.
        time_con: T,
        /// Diff container to stage singleton times for evaluation.
        diff_con: D,
    }

    impl<T: BatchContainer, D: BatchContainer> Default for UpdsBuilder<T, D> {
        // The staging containers only ever hold one element, so capacity one suffices.
        fn default() -> Self { Self { stash: Vec::default(), total: 0, time_con: BatchContainer::with_capacity(1), diff_con: BatchContainer::with_capacity(1) } }
    }


    impl<T, D> UpdsBuilder<T, D>
    where
        T: BatchContainer<Owned: Ord>,
        D: BatchContainer<Owned: crate::difference::Semigroup>,
    {
        /// Stages one update, but does not seal the set of updates.
        pub fn push(&mut self, time: T::Owned, diff: D::Owned) {
            self.stash.push((time, diff));
        }

        /// Consolidate and insert (if non-empty) the stashed updates.
        ///
        /// The return indicates whether the results were indeed non-empty.
        pub fn seal<O: for<'a> BatchContainer<ReadItem<'a> = usize>>(&mut self, upds: &mut Upds<O, T, D>) -> bool {
            use crate::consolidation;
            consolidation::consolidate(&mut self.stash);
            // If everything consolidates away, return false.
            if self.stash.is_empty() { return false; }
            // If there is a singleton, we may be able to optimize.
            if self.stash.len() == 1 {
                let (time, diff) = self.stash.last().unwrap();
                // Stage the owned update in containers, so we can compare `ReadItem`s
                // against the tail of `upds` without requiring owned comparisons.
                self.time_con.clear(); self.time_con.push_own(time);
                self.diff_con.clear(); self.diff_con.push_own(diff);
                if upds.times.last() == self.time_con.get(0) && upds.diffs.last() == self.diff_con.get(0) {
                    self.total += 1;
                    self.stash.clear();
                    // Push an offset equal to the current length: the resulting empty
                    // range encodes "repeat the preceding single update" (see `bounds`).
                    upds.offs.push_ref(upds.times.len());
                    return true;
                }
            }
            // Conventional; move `stash` into `updates`.
            self.total += self.stash.len();
            for (time, diff) in self.stash.drain(..) {
                upds.times.push_own(&time);
                upds.diffs.push_own(&diff);
            }
            upds.offs.push_ref(upds.times.len());
            true
        }

        /// Completes the building and returns the total updates sealed.
        pub fn total(&self) -> usize { self.total }
    }
}
266
267/// Types related to forming batches with values.
268pub mod val_batch {
269
270    use std::marker::PhantomData;
271    use serde::{Deserialize, Serialize};
272    use timely::container::PushInto;
273    use timely::progress::{Antichain, frontier::AntichainRef};
274
275    use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
276    use crate::trace::implementations::{BatchContainer, BuilderInput};
277    use crate::trace::implementations::layout;
278
279    use super::{Layout, Vals, Upds, layers::UpdsBuilder};
280
    /// An immutable collection of update tuples, from a contiguous interval of logical times.
    ///
    /// The storage is a three-layer trie: `keys`, then for each key a list of values in
    /// `vals`, then for each value a list of `(time, diff)` updates in `upds`.
    #[derive(Debug, Serialize, Deserialize)]
    #[serde(bound = "
        L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
        L::ValContainer: Serialize + for<'a> Deserialize<'a>,
        L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
        L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
        L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
    ")]
    pub struct OrdValStorage<L: Layout> {
        /// An ordered list of keys.
        pub keys: L::KeyContainer,
        /// For each key in `keys`, a list of values.
        pub vals: Vals<L::OffsetContainer, L::ValContainer>,
        /// For each val in `vals`, a list of (time, diff) updates.
        pub upds: Upds<L::OffsetContainer, L::TimeContainer, L::DiffContainer>,
    }
298
    /// An immutable collection of update tuples, from a contiguous interval of logical times.
    ///
    /// The `L` parameter captures how the updates should be laid out, and `C` determines which
    /// merge batcher to select.
    #[derive(Serialize, Deserialize)]
    #[serde(bound = "
        L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
        L::ValContainer: Serialize + for<'a> Deserialize<'a>,
        L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
        L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
        L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
    ")]
    pub struct OrdValBatch<L: Layout> {
        /// The updates themselves.
        pub storage: OrdValStorage<L>,
        /// Description of the update times this layer represents.
        pub description: Description<layout::Time<L>>,
        /// The number of updates reflected in the batch.
        ///
        /// We track this separately from `storage` because due to the singleton optimization,
        /// we may have many more updates than `storage.updates.len()`. It should equal that
        /// length, plus the number of singleton optimizations employed.
        pub updates: usize,
    }
323
    // Associates the batch with its layout, for use by layout-generic trait impls.
    impl<L: Layout> WithLayout for OrdValBatch<L> {
        type Layout = L;
    }
327
328    impl<L: Layout> BatchReader for OrdValBatch<L> {
329
330        type Cursor = OrdValCursor<L>;
331        fn cursor(&self) -> Self::Cursor {
332            OrdValCursor {
333                key_cursor: 0,
334                val_cursor: 0,
335                phantom: PhantomData,
336            }
337        }
338        fn len(&self) -> usize {
339            // Normally this would be `self.updates.len()`, but we have a clever compact encoding.
340            // Perhaps we should count such exceptions to the side, to provide a correct accounting.
341            self.updates
342        }
343        fn description(&self) -> &Description<layout::Time<L>> { &self.description }
344    }
345
346    impl<L: Layout> Batch for OrdValBatch<L> {
347        type Merger = OrdValMerger<L>;
348
349        fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<layout::Time<L>>) -> Self::Merger {
350            OrdValMerger::new(self, other, compaction_frontier)
351        }
352
353        fn empty(lower: Antichain<Self::Time>, upper: Antichain<Self::Time>) -> Self {
354            use timely::progress::Timestamp;
355            Self {
356                storage: OrdValStorage {
357                    keys: L::KeyContainer::with_capacity(0),
358                    vals: Default::default(),
359                    upds: Default::default(),
360                },
361                description: Description::new(lower, upper, Antichain::from_elem(Self::Time::minimum())),
362                updates: 0,
363            }
364        }
365    }
366
    /// State for an in-progress merge.
    pub struct OrdValMerger<L: Layout> {
        /// Key position to merge next in the first batch.
        key_cursor1: usize,
        /// Key position to merge next in the second batch.
        key_cursor2: usize,
        /// Result that we are currently assembling.
        result: OrdValStorage<L>,
        /// Description of the batch under construction (its `since` guides compaction).
        description: Description<layout::Time<L>>,
        /// Staging area to consolidate owned times and diffs, before sealing.
        staging: UpdsBuilder<L::TimeContainer, L::DiffContainer>,
    }
380
    impl<L: Layout> Merger<OrdValBatch<L>> for OrdValMerger<L>
    where
        OrdValBatch<L>: Batch<Time=layout::Time<L>>,
    {
        /// Prepares a merger for `batch1` followed by `batch2`.
        ///
        /// The batches must abut: `batch1.upper()` equals `batch2.lower()`. The output
        /// `since` is the join of both inputs' `since` frontiers and the supplied
        /// compaction frontier, and times will be advanced to it during the merge.
        fn new(batch1: &OrdValBatch<L>, batch2: &OrdValBatch<L>, compaction_frontier: AntichainRef<layout::Time<L>>) -> Self {

            assert!(batch1.upper() == batch2.lower());
            use crate::lattice::Lattice;
            let mut since = batch1.description().since().join(batch2.description().since());
            since = since.join(&compaction_frontier.to_owned());

            let description = Description::new(batch1.lower().clone(), batch2.upper().clone(), since);

            // Shadow the batches by their storage; only storage is consulted below.
            let batch1 = &batch1.storage;
            let batch2 = &batch2.storage;

            OrdValMerger {
                key_cursor1: 0,
                key_cursor2: 0,
                result: OrdValStorage {
                    keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys),
                    vals: Vals::merge_capacity(&batch1.vals, &batch2.vals),
                    upds: Upds::merge_capacity(&batch1.upds, &batch2.upds),
                },
                description,
                staging: UpdsBuilder::default(),
            }
        }
        /// Finalizes the merge, packaging the assembled storage as a batch.
        fn done(self) -> OrdValBatch<L> {
            OrdValBatch {
                updates: self.staging.total(),
                storage: self.result,
                description: self.description,
            }
        }
        /// Performs a bounded amount of merge work, decrementing `fuel` by the work done.
        ///
        /// Progress is measured by the number of updates sealed; keys whose updates wholly
        /// cancel contribute no measured effort, which the comments below acknowledge as
        /// an incomplete accounting.
        fn work(&mut self, source1: &OrdValBatch<L>, source2: &OrdValBatch<L>, fuel: &mut isize) {

            // An (incomplete) indication of the amount of work we've done so far.
            let starting_updates = self.staging.total();
            let mut effort = 0isize;

            // While both mergees are still active, perform single-key merges.
            while self.key_cursor1 < source1.storage.keys.len() && self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
                self.merge_key(&source1.storage, &source2.storage);
                // An (incomplete) accounting of the work we've done.
                effort = (self.staging.total() - starting_updates) as isize;
            }

            // Merging is complete, and only copying remains.
            // Key-by-key copying allows effort interruption, and compaction.
            while self.key_cursor1 < source1.storage.keys.len() && effort < *fuel {
                self.copy_key(&source1.storage, self.key_cursor1);
                self.key_cursor1 += 1;
                effort = (self.staging.total() - starting_updates) as isize;
            }
            while self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
                self.copy_key(&source2.storage, self.key_cursor2);
                self.key_cursor2 += 1;
                effort = (self.staging.total() - starting_updates) as isize;
            }

            *fuel -= effort;
        }
    }
445
    // Helper methods in support of merging batches.
    impl<L: Layout> OrdValMerger<L> {
        /// Copy the next key in `source`.
        ///
        /// The method extracts the key in `source` at `cursor`, and merges it in to `self`.
        /// If the result does not wholly cancel, the key will be present in `self` with the
        /// compacted values and updates.
        ///
        /// The caller should be certain to update the cursor, as this method does not do this.
        fn copy_key(&mut self, source: &OrdValStorage<L>, cursor: usize) {
            // Capture the initial number of values to determine if the merge was ultimately non-empty.
            let init_vals = self.result.vals.vals.len();
            let (mut lower, upper) = source.vals.bounds(cursor);
            while lower < upper {
                // Even a copy re-stages and seals updates, so that compaction to the
                // output `since` (and possible cancellation) is applied.
                self.stash_updates_for_val(source, lower);
                if self.staging.seal(&mut self.result.upds) {
                    self.result.vals.vals.push_ref(source.vals.get_abs(lower));
                }
                lower += 1;
            }

            // If we have pushed any values, copy the key as well.
            if self.result.vals.vals.len() > init_vals {
                self.result.keys.push_ref(source.keys.index(cursor));
                self.result.vals.offs.push_ref(self.result.vals.vals.len());
            }
        }
        /// Merge the next key in each of `source1` and `source2` into `self`, updating the appropriate cursors.
        ///
        /// This method only merges a single key. It applies all compaction necessary, and may result in no output
        /// if the updates cancel either directly or after compaction.
        fn merge_key(&mut self, source1: &OrdValStorage<L>, source2: &OrdValStorage<L>) {
            use ::std::cmp::Ordering;
            match source1.keys.index(self.key_cursor1).cmp(&source2.keys.index(self.key_cursor2)) {
                Ordering::Less => {
                    self.copy_key(source1, self.key_cursor1);
                    self.key_cursor1 += 1;
                },
                Ordering::Equal => {
                    // Keys are equal; must merge all values from both sources for this one key.
                    let (lower1, upper1) = source1.vals.bounds(self.key_cursor1);
                    let (lower2, upper2) = source2.vals.bounds(self.key_cursor2);
                    if let Some(off) = self.merge_vals((source1, lower1, upper1), (source2, lower2, upper2)) {
                        self.result.keys.push_ref(source1.keys.index(self.key_cursor1));
                        self.result.vals.offs.push_ref(off);
                    }
                    // Increment cursors in either case; the keys are merged.
                    self.key_cursor1 += 1;
                    self.key_cursor2 += 1;
                },
                Ordering::Greater => {
                    self.copy_key(source2, self.key_cursor2);
                    self.key_cursor2 += 1;
                },
            }
        }
        /// Merge two ranges of values into `self`.
        ///
        /// If the compacted result contains values with non-empty updates, the function returns
        /// an offset that should be recorded to indicate the upper extent of the result values.
        fn merge_vals(
            &mut self,
            (source1, mut lower1, upper1): (&OrdValStorage<L>, usize, usize),
            (source2, mut lower2, upper2): (&OrdValStorage<L>, usize, usize),
        ) -> Option<usize> {
            // Capture the initial number of values to determine if the merge was ultimately non-empty.
            let init_vals = self.result.vals.vals.len();
            while lower1 < upper1 && lower2 < upper2 {
                // We compare values, and fold in updates for the lowest values;
                // if they are non-empty post-consolidation, we write the value.
                // We could multi-way merge and it wouldn't be very complicated.
                use ::std::cmp::Ordering;
                match source1.vals.get_abs(lower1).cmp(&source2.vals.get_abs(lower2)) {
                    Ordering::Less => {
                        // Extend stash by updates, with logical compaction applied.
                        self.stash_updates_for_val(source1, lower1);
                        if self.staging.seal(&mut self.result.upds) {
                            self.result.vals.vals.push_ref(source1.vals.get_abs(lower1));
                        }
                        lower1 += 1;
                    },
                    Ordering::Equal => {
                        // Equal values: stash updates from both sources, and seal once,
                        // so that updates from the two inputs consolidate against each other.
                        self.stash_updates_for_val(source1, lower1);
                        self.stash_updates_for_val(source2, lower2);
                        if self.staging.seal(&mut self.result.upds) {
                            self.result.vals.vals.push_ref(source1.vals.get_abs(lower1));
                        }
                        lower1 += 1;
                        lower2 += 1;
                    },
                    Ordering::Greater => {
                        // Extend stash by updates, with logical compaction applied.
                        self.stash_updates_for_val(source2, lower2);
                        if self.staging.seal(&mut self.result.upds) {
                            self.result.vals.vals.push_ref(source2.vals.get_abs(lower2));
                        }
                        lower2 += 1;
                    },
                }
            }
            // Merging is complete, but we may have remaining elements to push.
            while lower1 < upper1 {
                self.stash_updates_for_val(source1, lower1);
                if self.staging.seal(&mut self.result.upds) {
                    self.result.vals.vals.push_ref(source1.vals.get_abs(lower1));
                }
                lower1 += 1;
            }
            while lower2 < upper2 {
                self.stash_updates_for_val(source2, lower2);
                if self.staging.seal(&mut self.result.upds) {
                    self.result.vals.vals.push_ref(source2.vals.get_abs(lower2));
                }
                lower2 += 1;
            }

            // Values being pushed indicate non-emptiness.
            if self.result.vals.vals.len() > init_vals {
                Some(self.result.vals.vals.len())
            } else {
                None
            }
        }

        /// Transfer updates for an indexed value in `source` into `self`, with compaction applied.
        fn stash_updates_for_val(&mut self, source: &OrdValStorage<L>, index: usize) {
            let (lower, upper) = source.upds.bounds(index);
            for i in lower .. upper {
                // NB: Here is where we would need to look back if `lower == upper`.
                let (time, diff) = source.upds.get_abs(i);
                use crate::lattice::Lattice;
                // Advance the time to the output `since`, so equal times can consolidate.
                let mut new_time: layout::Time<L> = L::TimeContainer::into_owned(time);
                new_time.advance_by(self.description.since().borrow());
                self.staging.push(new_time, L::DiffContainer::into_owned(diff));
            }
        }
    }
583
    /// A cursor for navigating a single layer.
    ///
    /// Positions are absolute indexes into the batch's key and value containers;
    /// all data access goes through the storage passed to each method.
    pub struct OrdValCursor<L: Layout> {
        /// Absolute position of the current key.
        key_cursor: usize,
        /// Absolute position of the current value.
        val_cursor: usize,
        /// Phantom marker for Rust happiness.
        phantom: PhantomData<L>,
    }
593
594    use crate::trace::implementations::WithLayout;
    // Associates the cursor with its layout, for use by layout-generic trait impls.
    impl<L: Layout> WithLayout for OrdValCursor<L> {
        type Layout = L;
    }
598
599    impl<L: Layout> Cursor for OrdValCursor<L> {
600
601        type Storage = OrdValBatch<L>;
602
603        fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> { storage.storage.keys.get(self.key_cursor) }
604        fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Val<'a>> { if self.val_valid(storage) { Some(self.val(storage)) } else { None } }
605
606        fn key<'a>(&self, storage: &'a OrdValBatch<L>) -> Self::Key<'a> { storage.storage.keys.index(self.key_cursor) }
607        fn val<'a>(&self, storage: &'a OrdValBatch<L>) -> Self::Val<'a> { storage.storage.vals.get_abs(self.val_cursor) }
608        fn map_times<L2: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &OrdValBatch<L>, mut logic: L2) {
609            let (lower, upper) = storage.storage.upds.bounds(self.val_cursor);
610            for index in lower .. upper {
611                let (time, diff) = storage.storage.upds.get_abs(index);
612                logic(time, diff);
613            }
614        }
615        fn key_valid(&self, storage: &OrdValBatch<L>) -> bool { self.key_cursor < storage.storage.keys.len() }
616        fn val_valid(&self, storage: &OrdValBatch<L>) -> bool { self.val_cursor < storage.storage.vals.bounds(self.key_cursor).1 }
617        fn step_key(&mut self, storage: &OrdValBatch<L>){
618            self.key_cursor += 1;
619            if self.key_valid(storage) {
620                self.rewind_vals(storage);
621            }
622            else {
623                self.key_cursor = storage.storage.keys.len();
624            }
625        }
626        fn seek_key(&mut self, storage: &OrdValBatch<L>, key: Self::Key<'_>) {
627            self.key_cursor += storage.storage.keys.advance(self.key_cursor, storage.storage.keys.len(), |x| <L::KeyContainer as BatchContainer>::reborrow(x).lt(&<L::KeyContainer as BatchContainer>::reborrow(key)));
628            if self.key_valid(storage) {
629                self.rewind_vals(storage);
630            }
631        }
632        fn step_val(&mut self, storage: &OrdValBatch<L>) {
633            self.val_cursor += 1;
634            if !self.val_valid(storage) {
635                self.val_cursor = storage.storage.vals.bounds(self.key_cursor).1;
636            }
637        }
        /// Advances the value cursor to the first value of the current key not less than `val`.
        fn seek_val(&mut self, storage: &OrdValBatch<L>, val: Self::Val<'_>) {
            // The search is bounded by the current key's value upper bound.
            self.val_cursor += storage.storage.vals.vals.advance(self.val_cursor, storage.storage.vals.bounds(self.key_cursor).1, |x| <L::ValContainer as BatchContainer>::reborrow(x).lt(&<L::ValContainer as BatchContainer>::reborrow(val)));
        }
        /// Rewinds to the first key, and to its first value if any key exists.
        fn rewind_keys(&mut self, storage: &OrdValBatch<L>) {
            self.key_cursor = 0;
            if self.key_valid(storage) {
                self.rewind_vals(storage)
            }
        }
        /// Rewinds the value cursor to the start of the current key's value range.
        fn rewind_vals(&mut self, storage: &OrdValBatch<L>) {
            self.val_cursor = storage.storage.vals.bounds(self.key_cursor).0;
        }
650    }
651
    /// A builder for creating layers from unsorted update tuples.
    pub struct OrdValBuilder<L: Layout, CI> {
        /// The in-progress result.
        ///
        /// This is public to allow container implementors to set and inspect their container.
        pub result: OrdValStorage<L>,
        /// Updates staged for the current (key, val) pair, sealed into `result.upds`
        /// whenever the pair changes; its running total supplies the batch's update count.
        staging: UpdsBuilder<L::TimeContainer, L::DiffContainer>,
        /// Records the chunk type `CI` accepted by `push`, without storing one.
        _marker: PhantomData<CI>,
    }
661
    impl<L, CI> Builder for OrdValBuilder<L, CI>
    where
        L: for<'a> Layout<
            KeyContainer: PushInto<CI::Key<'a>>,
            ValContainer: PushInto<CI::Val<'a>>,
        >,
        CI: for<'a> BuilderInput<L::KeyContainer, L::ValContainer, Time=layout::Time<L>, Diff=layout::Diff<L>>,
    {

        type Input = CI;
        type Time = layout::Time<L>;
        type Output = OrdValBatch<L>;

        /// Allocates a builder with room for `keys` keys, `vals` values, and `upds` updates.
        fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self {
            Self {
                result: OrdValStorage {
                    keys: L::KeyContainer::with_capacity(keys),
                    // Offset containers hold one more entry than the data they bound.
                    vals: Vals::with_capacity(keys + 1, vals),
                    upds: Upds::with_capacity(vals + 1, upds),
                },
                staging: UpdsBuilder::default(),
                _marker: PhantomData,
            }
        }

        /// Drains `chunk`, absorbing its (key, val, time, diff) tuples into the in-progress result.
        ///
        /// Only the most recently pushed key and value are compared against, so this relies on
        /// equal keys (and, within a key, equal values) arriving adjacently in the input.
        #[inline]
        fn push(&mut self, chunk: &mut Self::Input) {
            for item in chunk.drain() {
                let (key, val, time, diff) = CI::into_parts(item);

                // Pre-load the first update.
                if self.result.keys.is_empty() {
                    self.result.vals.vals.push_into(val);
                    self.result.keys.push_into(key);
                    self.staging.push(time, diff);
                }
                // Perhaps this is a continuation of an already received key.
                else if self.result.keys.last().map(|k| CI::key_eq(&key, k)).unwrap_or(false) {
                    // Perhaps this is a continuation of an already received value.
                    if self.result.vals.vals.last().map(|v| CI::val_eq(&val, v)).unwrap_or(false) {
                        self.staging.push(time, diff);
                    } else {
                        // New value; complete representation of prior value.
                        self.staging.seal(&mut self.result.upds);
                        self.staging.push(time, diff);
                        self.result.vals.vals.push_into(val);
                    }
                } else {
                    // New key; complete representation of prior key.
                    self.staging.seal(&mut self.result.upds);
                    self.staging.push(time, diff);
                    // Close out the prior key's value range before starting the new key.
                    self.result.vals.offs.push_ref(self.result.vals.vals.len());
                    self.result.vals.vals.push_into(val);
                    self.result.keys.push_into(key);
                }
            }
        }

        /// Seals any staged updates and the final key's value range, and assembles the batch.
        #[inline(never)]
        fn done(mut self, description: Description<Self::Time>) -> OrdValBatch<L> {
            self.staging.seal(&mut self.result.upds);
            self.result.vals.offs.push_ref(self.result.vals.vals.len());
            OrdValBatch {
                updates: self.staging.total(),
                storage: self.result,
                description,
            }
        }

        /// Builds a batch from a chain of input chunks, sizing the builder from their counts.
        fn seal(chain: &mut Vec<Self::Input>, description: Description<Self::Time>) -> Self::Output {
            let (keys, vals, upds) = Self::Input::key_val_upd_counts(&chain[..]);
            let mut builder = Self::with_capacity(keys, vals, upds);
            for mut chunk in chain.drain(..) {
                builder.push(&mut chunk);
            }

            builder.done(description)
        }
    }
741}
742
743/// Types related to forming batches of keys.
744pub mod key_batch {
745
746    use std::marker::PhantomData;
747    use serde::{Deserialize, Serialize};
748    use timely::container::PushInto;
749    use timely::progress::{Antichain, frontier::AntichainRef};
750
751    use crate::trace::{Batch, BatchReader, Builder, Cursor, Description, Merger};
752    use crate::trace::implementations::{BatchContainer, BuilderInput};
753    use crate::trace::implementations::layout;
754
755    use super::{Layout, Upds, layers::UpdsBuilder};
756
    /// An immutable collection of update tuples, from a contiguous interval of logical times.
    #[derive(Debug, Serialize, Deserialize)]
    #[serde(bound = "
        L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
        L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
        L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
        L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
    ")]
    pub struct OrdKeyStorage<L: Layout> {
        /// An ordered list of keys, each of which indexes a range in `upds`.
        pub keys: L::KeyContainer,
        /// For each key in `keys`, a list of (time, diff) updates.
        pub upds: Upds<L::OffsetContainer, L::TimeContainer, L::DiffContainer>,
    }
771
    /// An immutable collection of update tuples, from a contiguous interval of logical times.
    ///
    /// The `L` parameter captures how the updates should be laid out.
    #[derive(Serialize, Deserialize)]
    #[serde(bound = "
        L::KeyContainer: Serialize + for<'a> Deserialize<'a>,
        L::OffsetContainer: Serialize + for<'a> Deserialize<'a>,
        L::TimeContainer: Serialize + for<'a> Deserialize<'a>,
        L::DiffContainer: Serialize + for<'a> Deserialize<'a>,
    ")]
    pub struct OrdKeyBatch<L: Layout> {
        /// The updates themselves.
        pub storage: OrdKeyStorage<L>,
        /// Description of the update times this layer represents.
        pub description: Description<layout::Time<L>>,
        /// The number of updates reflected in the batch.
        ///
        /// We track this separately from `storage` because due to the singleton optimization,
        /// we may have many more updates than `storage` physically records. It should equal
        /// that stored count, plus the number of singleton optimizations employed.
        pub updates: usize,
    }
795
    /// `OrdKeyBatch` uses layout `L`, with values constrained to the unit type.
    impl<L: for<'a> Layout<ValContainer: BatchContainer<ReadItem<'a> = &'a ()>>> WithLayout for OrdKeyBatch<L> {
        type Layout = L;
    }
799
    impl<L: for<'a> Layout<ValContainer: BatchContainer<ReadItem<'a> = &'a ()>>> BatchReader for OrdKeyBatch<L> {

        type Cursor = OrdKeyCursor<L>;
        /// Returns a cursor at the first key, with its single (unit) value not yet stepped.
        fn cursor(&self) -> Self::Cursor {
            OrdKeyCursor {
                key_cursor: 0,
                val_stepped: false,
                phantom: std::marker::PhantomData,
            }
        }
        fn len(&self) -> usize {
            // Normally this would be the stored update count, but we have a clever compact encoding.
            // Perhaps we should count such exceptions to the side, to provide a correct accounting.
            self.updates
        }
        fn description(&self) -> &Description<layout::Time<L>> { &self.description }
    }
817
    impl<L: for<'a> Layout<ValContainer: BatchContainer<ReadItem<'a> = &'a ()>>> Batch for OrdKeyBatch<L> {
        type Merger = OrdKeyMerger<L>;

        /// Initiates a merge of `self` with `other`, compacting times by `compaction_frontier`.
        fn begin_merge(&self, other: &Self, compaction_frontier: AntichainRef<layout::Time<L>>) -> Self::Merger {
            OrdKeyMerger::new(self, other, compaction_frontier)
        }

        /// Produces an empty batch covering the interval from `lower` to `upper`.
        fn empty(lower: Antichain<Self::Time>, upper: Antichain<Self::Time>) -> Self {
            use timely::progress::Timestamp;
            Self {
                storage: OrdKeyStorage {
                    keys: L::KeyContainer::with_capacity(0),
                    upds: Upds::default(),
                },
                // An empty batch has experienced no compaction, hence the minimal `since`.
                description: Description::new(lower, upper, Antichain::from_elem(Self::Time::minimum())),
                updates: 0,
            }
        }
    }
837
    /// State for an in-progress merge.
    pub struct OrdKeyMerger<L: Layout> {
        /// Key position to merge next in the first batch.
        key_cursor1: usize,
        /// Key position to merge next in the second batch.
        key_cursor2: usize,
        /// The result that we are currently assembling.
        result: OrdKeyStorage<L>,
        /// Description (bounds and `since`) of the batch under construction.
        description: Description<layout::Time<L>>,

        /// Local stash of updates, to use for consolidation.
        staging: UpdsBuilder<L::TimeContainer, L::DiffContainer>,
    }
852
    impl<L: Layout> Merger<OrdKeyBatch<L>> for OrdKeyMerger<L>
    where
        OrdKeyBatch<L>: Batch<Time=layout::Time<L>>,
    {
        /// Prepares a merger for two batches covering adjacent intervals of times.
        fn new(batch1: &OrdKeyBatch<L>, batch2: &OrdKeyBatch<L>, compaction_frontier: AntichainRef<layout::Time<L>>) -> Self {

            // The batches must abut: the first's upper frontier is the second's lower frontier.
            assert!(batch1.upper() == batch2.lower());
            use crate::lattice::Lattice;
            // The result's `since` joins both inputs' `since`s with the compaction frontier.
            let mut since = batch1.description().since().join(batch2.description().since());
            since = since.join(&compaction_frontier.to_owned());

            let description = Description::new(batch1.lower().clone(), batch2.upper().clone(), since);

            let batch1 = &batch1.storage;
            let batch2 = &batch2.storage;

            OrdKeyMerger {
                key_cursor1: 0,
                key_cursor2: 0,
                result: OrdKeyStorage {
                    // Allocate for the worst case, in which nothing cancels.
                    keys: L::KeyContainer::merge_capacity(&batch1.keys, &batch2.keys),
                    upds: Upds::merge_capacity(&batch1.upds, &batch2.upds),
                },
                description,
                staging: UpdsBuilder::default(),
            }
        }
        /// Finalizes the merge, assembling the result into a batch.
        fn done(self) -> OrdKeyBatch<L> {
            OrdKeyBatch {
                updates: self.staging.total(),
                storage: self.result,
                description: self.description,
            }
        }
        /// Performs at most `fuel` units of merge work, decrementing `fuel` by the work done.
        fn work(&mut self, source1: &OrdKeyBatch<L>, source2: &OrdKeyBatch<L>, fuel: &mut isize) {

            // An (incomplete) indication of the amount of work we've done so far.
            let starting_updates = self.staging.total();
            let mut effort = 0isize;

            // While both mergees are still active, perform single-key merges.
            while self.key_cursor1 < source1.storage.keys.len() && self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
                self.merge_key(&source1.storage, &source2.storage);
                // An (incomplete) accounting of the work we've done.
                effort = (self.staging.total() - starting_updates) as isize;
            }

            // Merging is complete, and only copying remains.
            // Key-by-key copying allows effort interruption, and compaction.
            while self.key_cursor1 < source1.storage.keys.len() && effort < *fuel {
                self.copy_key(&source1.storage, self.key_cursor1);
                self.key_cursor1 += 1;
                effort = (self.staging.total() - starting_updates) as isize;
            }
            while self.key_cursor2 < source2.storage.keys.len() && effort < *fuel {
                self.copy_key(&source2.storage, self.key_cursor2);
                self.key_cursor2 += 1;
                effort = (self.staging.total() - starting_updates) as isize;
            }

            *fuel -= effort;
        }
    }
916
    // Helper methods in support of merging batches.
    impl<L: Layout> OrdKeyMerger<L> {
        /// Copy the next key in `source`.
        ///
        /// The method extracts the key in `source` at `cursor`, and merges it in to `self`.
        /// If the result does not wholly cancel, the key will be present in `self` with the
        /// compacted values and updates.
        ///
        /// The caller should be certain to update the cursor, as this method does not do this.
        fn copy_key(&mut self, source: &OrdKeyStorage<L>, cursor: usize) {
            self.stash_updates_for_key(source, cursor);
            // Only emit the key if its compacted updates did not wholly cancel.
            if self.staging.seal(&mut self.result.upds) {
                self.result.keys.push_ref(source.keys.index(cursor));
            }
        }
        /// Merge the next key in each of `source1` and `source2` into `self`, updating the appropriate cursors.
        ///
        /// This method only merges a single key. It applies all compaction necessary, and may result in no output
        /// if the updates cancel either directly or after compaction.
        fn merge_key(&mut self, source1: &OrdKeyStorage<L>, source2: &OrdKeyStorage<L>) {
            use ::std::cmp::Ordering;
            match source1.keys.index(self.key_cursor1).cmp(&source2.keys.index(self.key_cursor2)) {
                Ordering::Less => {
                    self.copy_key(source1, self.key_cursor1);
                    self.key_cursor1 += 1;
                },
                Ordering::Equal => {
                    // Keys are equal; must merge all updates from both sources for this one key.
                    self.stash_updates_for_key(source1, self.key_cursor1);
                    self.stash_updates_for_key(source2, self.key_cursor2);
                    if self.staging.seal(&mut self.result.upds) {
                        self.result.keys.push_ref(source1.keys.index(self.key_cursor1));
                    }
                    // Increment cursors in either case; the keys are merged.
                    self.key_cursor1 += 1;
                    self.key_cursor2 += 1;
                },
                Ordering::Greater => {
                    self.copy_key(source2, self.key_cursor2);
                    self.key_cursor2 += 1;
                },
            }
        }

        /// Transfer updates for an indexed key in `source` into `self`, with compaction applied.
        fn stash_updates_for_key(&mut self, source: &OrdKeyStorage<L>, index: usize) {
            let (lower, upper) = source.upds.bounds(index);
            for i in lower .. upper {
                // NB: Here is where we would need to look back if `lower == upper`.
                let (time, diff) = source.upds.get_abs(i);
                use crate::lattice::Lattice;
                // Advance each time to the compaction frontier (`since`) before stashing.
                let mut new_time = L::TimeContainer::into_owned(time);
                new_time.advance_by(self.description.since().borrow());
                self.staging.push(new_time, L::DiffContainer::into_owned(diff));
            }
        }
    }
974
    /// A cursor for navigating a single layer.
    pub struct OrdKeyCursor<L: Layout> {
        /// Absolute position of the current key.
        key_cursor: usize,
        /// If the value has been stepped for the key, there are no more values.
        ///
        /// Each key has exactly one (unit) value, so a single flag suffices.
        val_stepped: bool,
        /// Phantom marker for Rust happiness.
        phantom: PhantomData<L>,
    }
984
985    use crate::trace::implementations::WithLayout;
    /// `OrdKeyCursor` navigates batches of layout `L`, with values constrained to the unit type.
    impl<L: for<'a> Layout<ValContainer: BatchContainer<ReadItem<'a> = &'a ()>>> WithLayout for OrdKeyCursor<L> {
        type Layout = L;
    }
989
    impl<L: for<'a> Layout<ValContainer: BatchContainer<ReadItem<'a> = &'a ()>>> Cursor for OrdKeyCursor<L> {

        type Storage = OrdKeyBatch<L>;

        fn get_key<'a>(&self, storage: &'a Self::Storage) -> Option<Self::Key<'a>> { storage.storage.keys.get(self.key_cursor) }
        /// The unit value is "present" exactly when it has not yet been stepped past.
        fn get_val<'a>(&self, storage: &'a Self::Storage) -> Option<&'a ()> { if self.val_valid(storage) { Some(&()) } else { None } }

        fn key<'a>(&self, storage: &'a Self::Storage) -> Self::Key<'a> { storage.storage.keys.index(self.key_cursor) }
        fn val<'a>(&self, _storage: &'a Self::Storage) -> &'a () { &() }
        /// Applies `logic` to each (time, diff) update recorded for the current key.
        fn map_times<L2: FnMut(Self::TimeGat<'_>, Self::DiffGat<'_>)>(&mut self, storage: &Self::Storage, mut logic: L2) {
            let (lower, upper) = storage.storage.upds.bounds(self.key_cursor);
            for index in lower .. upper {
                let (time, diff) = storage.storage.upds.get_abs(index);
                logic(time, diff);
            }
        }
        fn key_valid(&self, storage: &Self::Storage) -> bool { self.key_cursor < storage.storage.keys.len() }
        /// Each key has exactly one (unit) value, which remains valid until stepped past.
        fn val_valid(&self, _storage: &Self::Storage) -> bool { !self.val_stepped }
        fn step_key(&mut self, storage: &Self::Storage){
            self.key_cursor += 1;
            if self.key_valid(storage) {
                self.rewind_vals(storage);
            }
            else {
                // Clamp the cursor so repeated stepping cannot move past the end.
                self.key_cursor = storage.storage.keys.len();
            }
        }
        /// Advances the key cursor to the first key not less than `key`.
        fn seek_key(&mut self, storage: &Self::Storage, key: Self::Key<'_>) {
            self.key_cursor += storage.storage.keys.advance(self.key_cursor, storage.storage.keys.len(), |x| <L::KeyContainer as BatchContainer>::reborrow(x).lt(&<L::KeyContainer as BatchContainer>::reborrow(key)));
            if self.key_valid(storage) {
                self.rewind_vals(storage);
            }
        }
        /// Stepping past the single unit value marks it as exhausted.
        fn step_val(&mut self, _storage: &Self::Storage) {
            self.val_stepped = true;
        }
        /// With a single unit value per key there is nothing to seek within.
        fn seek_val(&mut self, _storage: &Self::Storage, _val: Self::Val<'_>) { }
        fn rewind_keys(&mut self, storage: &Self::Storage) {
            self.key_cursor = 0;
            if self.key_valid(storage) {
                self.rewind_vals(storage)
            }
        }
        fn rewind_vals(&mut self, _storage: &Self::Storage) {
            self.val_stepped = false;
        }
    }
1037
    /// A builder for creating layers from unsorted update tuples.
    pub struct OrdKeyBuilder<L: Layout, CI> {
        /// The in-progress result.
        ///
        /// This is public to allow container implementors to set and inspect their container.
        pub result: OrdKeyStorage<L>,
        /// Updates staged for the current key, sealed into `result.upds` when the key changes;
        /// its running total supplies the batch's update count.
        staging: UpdsBuilder<L::TimeContainer, L::DiffContainer>,
        /// Records the chunk type `CI` accepted by `push`, without storing one.
        _marker: PhantomData<CI>,
    }
1047
    impl<L: Layout, CI> Builder for OrdKeyBuilder<L, CI>
    where
        L: for<'a> Layout<KeyContainer: PushInto<CI::Key<'a>>>,
        CI: BuilderInput<L::KeyContainer, L::ValContainer, Time=layout::Time<L>, Diff=layout::Diff<L>>,
    {

        type Input = CI;
        type Time = layout::Time<L>;
        type Output = OrdKeyBatch<L>;

        /// Allocates a builder with room for `keys` keys and `upds` updates; values are unit.
        fn with_capacity(keys: usize, _vals: usize, upds: usize) -> Self {
            Self {
                result: OrdKeyStorage {
                    keys: L::KeyContainer::with_capacity(keys),
                    // Offset containers hold one more entry than the data they bound.
                    upds: Upds::with_capacity(keys+1, upds),
                },
                staging: UpdsBuilder::default(),
                _marker: PhantomData,
            }
        }

        /// Drains `chunk`, absorbing its (key, time, diff) tuples; values are ignored.
        ///
        /// Only the most recently pushed key is compared against, so this relies on
        /// equal keys arriving adjacently in the input.
        #[inline]
        fn push(&mut self, chunk: &mut Self::Input) {
            for item in chunk.drain() {
                let (key, _val, time, diff) = CI::into_parts(item);
                // Pre-load the first update.
                if self.result.keys.is_empty() {
                    self.result.keys.push_into(key);
                    self.staging.push(time, diff);
                }
                // Perhaps this is a continuation of an already received key.
                else if self.result.keys.last().map(|k| CI::key_eq(&key, k)).unwrap_or(false) {
                    self.staging.push(time, diff);
                } else {
                    // New key; complete representation of prior key.
                    self.staging.seal(&mut self.result.upds);
                    self.staging.push(time, diff);
                    self.result.keys.push_into(key);
                }
            }
        }

        /// Seals any staged updates and assembles the batch.
        #[inline(never)]
        fn done(mut self, description: Description<Self::Time>) -> OrdKeyBatch<L> {
            self.staging.seal(&mut self.result.upds);
            OrdKeyBatch {
                updates: self.staging.total(),
                storage: self.result,
                description,
            }
        }

        /// Builds a batch from a chain of input chunks, sizing the builder from their counts.
        fn seal(chain: &mut Vec<Self::Input>, description: Description<Self::Time>) -> Self::Output {
            let (keys, vals, upds) = Self::Input::key_val_upd_counts(&chain[..]);
            let mut builder = Self::with_capacity(keys, vals, upds);
            for mut chunk in chain.drain(..) {
                builder.push(&mut chunk);
            }

            builder.done(description)
        }
    }
1108
1109}