Skip to main content

mz_row_spine/
lib.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Types and traits in support of containers for row-encoded byte slices.
11//!
12//! This includes the vanilla `bytes_container` that holds byte slices in contiguous
13//! allocations, as well as a `dictionary` encoding wrapper that is able to rewrite
14//! the byte slices to use spare tags in each column to reference common values.
15
16pub use self::dictionary::DatumContainer;
17pub use self::dictionary::DatumSeq;
18pub use self::offset_opt::OffsetOptimized;
19pub use self::spines::{
20    RowBatcher, RowBuilder, RowRowBatcher, RowRowBuilder, RowRowColPagedBuilder, RowRowSpine,
21    RowSpine, RowValBatcher, RowValBuilder, RowValSpine, ValRowBatcher, ValRowBuilder,
22    ValRowColPagedBuilder, ValRowSpine,
23};
24use differential_dataflow::trace::implementations::OffsetList;
25
/// Enable per-column dictionary compression in row containers.
///
/// Starts out `false`. The `build_codec` helper in `dictionary::builders` reads the flag with
/// `Ordering::Relaxed` and returns `None` (no codec is built) while it is `false`.
pub static DICTIONARY_COMPRESSION: std::sync::atomic::AtomicBool =
    std::sync::atomic::AtomicBool::new(false);
29
30/// Spines specialized to contain `Row` types in keys and values.
31mod spines {
32    use std::rc::Rc;
33
34    use columnation::Columnation;
35    use differential_dataflow::trace::implementations::Layout;
36    use differential_dataflow::trace::implementations::Update;
37    use differential_dataflow::trace::implementations::merge_batcher::MergeBatcher;
38    use differential_dataflow::trace::implementations::ord_neu::{
39        OrdKeyBatch, OrdValBatch, OrdValBuilder,
40    };
41    use differential_dataflow::trace::implementations::spine_fueled::Spine;
42    use differential_dataflow::trace::rc_blanket_impls::RcBuilder;
43    use mz_repr::Row;
44    use mz_timely_util::columnar::Column;
45    use mz_timely_util::columnation::{ColInternalMerger, ColumnationStack};
46
47    use crate::{DatumContainer, OffsetOptimized};
48
49    /// Batcher matching `mz_compute::typedefs::KeyValBatcher`, redeclared
50    /// locally so this crate does not need to depend on `mz_compute`.
51    type KeyValBatcher<K, V, T, D> = MergeBatcher<ColInternalMerger<(K, V), T, D>>;
52    type KeyBatcher<K, T, D> = KeyValBatcher<K, (), T, D>;
53
54    pub type RowRowSpine<T, R> = Spine<Rc<OrdValBatch<RowRowLayout<((Row, Row), T, R)>>>>;
55    pub type RowRowBatcher<T, R> = KeyValBatcher<Row, Row, T, R>;
56    pub type RowRowBuilder<T, R> = RcBuilder<crate::dictionary::builders::RowRowBuilder<T, R>>;
57
58    /// `RowRowBuilder` variant that consumes [`Column`] chunks. Pairs with
59    /// [`Col2ValPagedBatcher`] for the spillable arrange path. This is the stock
60    /// (non-dictionary) builder; dictionary-compressing the paged path is a follow-up.
61    ///
62    /// [`Col2ValPagedBatcher`]: mz_timely_util::columnar::Col2ValPagedBatcher
63    pub type RowRowColPagedBuilder<T, R> =
64        RcBuilder<OrdValBuilder<RowRowLayout<((Row, Row), T, R)>, Column<((Row, Row), T, R)>>>;
65
66    pub type RowValSpine<V, T, R> = Spine<Rc<OrdValBatch<RowValLayout<((Row, V), T, R)>>>>;
67    pub type RowValBatcher<V, T, R> = KeyValBatcher<Row, V, T, R>;
68    pub type RowValBuilder<V, T, R> =
69        RcBuilder<crate::dictionary::builders::RowValBuilder<V, T, R>>;
70
71    pub type RowSpine<T, R> = Spine<Rc<OrdKeyBatch<RowLayout<((Row, ()), T, R)>>>>;
72    pub type RowBatcher<T, R> = KeyBatcher<Row, T, R>;
73    pub type RowBuilder<T, R> = RcBuilder<crate::dictionary::builders::RowBuilder<T, R>>;
74
75    pub type ValRowSpine<K, T, R> = Spine<Rc<OrdValBatch<ValRowLayout<((K, Row), T, R)>>>>;
76    pub type ValRowBatcher<K, T, R> = KeyValBatcher<K, Row, T, R>;
77    pub type ValRowBuilder<K, T, R> =
78        RcBuilder<crate::dictionary::builders::ValRowBuilder<K, T, R>>;
79
80    /// `ValRowBuilder` variant that consumes [`Column`] chunks. Pairs with
81    /// `Col2ValPagedBatcher<K, Row, T, R>` for the spillable arrange path where
82    /// keys are arbitrary `Columnar` values (e.g. `UpsertKey`) and values are
83    /// packed `Row` bytes.
84    pub type ValRowColPagedBuilder<K, T, R> =
85        RcBuilder<OrdValBuilder<ValRowLayout<((K, Row), T, R)>, Column<((K, Row), T, R)>>>;
86
87    /// A layout based on timely stacks
88    pub struct RowRowLayout<U: Update<Key = Row, Val = Row>> {
89        phantom: std::marker::PhantomData<U>,
90    }
91    pub struct RowValLayout<U: Update<Key = Row>> {
92        phantom: std::marker::PhantomData<U>,
93    }
94    pub struct RowLayout<U: Update<Key = Row, Val = ()>> {
95        phantom: std::marker::PhantomData<U>,
96    }
97    /// Mirror of [`RowValLayout`] with the roles swapped: arbitrary `Columnation`
98    /// keys with `Row` values stored as packed bytes in a [`DatumContainer`].
99    pub struct ValRowLayout<U: Update<Val = Row>> {
100        phantom: std::marker::PhantomData<U>,
101    }
102
103    impl<U: Update<Key = Row, Val = Row>> Layout for RowRowLayout<U>
104    where
105        U::Time: Columnation,
106        U::Diff: Columnation,
107    {
108        type KeyContainer = DatumContainer;
109        type ValContainer = DatumContainer;
110        type TimeContainer = ColumnationStack<U::Time>;
111        type DiffContainer = ColumnationStack<U::Diff>;
112        type OffsetContainer = OffsetOptimized;
113    }
114    impl<U: Update<Key = Row>> Layout for RowValLayout<U>
115    where
116        U::Val: Columnation,
117        U::Time: Columnation,
118        U::Diff: Columnation,
119    {
120        type KeyContainer = DatumContainer;
121        type ValContainer = ColumnationStack<U::Val>;
122        type TimeContainer = ColumnationStack<U::Time>;
123        type DiffContainer = ColumnationStack<U::Diff>;
124        type OffsetContainer = OffsetOptimized;
125    }
126    impl<U: Update<Key = Row, Val = ()>> Layout for RowLayout<U>
127    where
128        U::Time: Columnation,
129        U::Diff: Columnation,
130    {
131        type KeyContainer = DatumContainer;
132        type ValContainer = ColumnationStack<()>;
133        type TimeContainer = ColumnationStack<U::Time>;
134        type DiffContainer = ColumnationStack<U::Diff>;
135        type OffsetContainer = OffsetOptimized;
136    }
137    impl<U: Update<Val = Row>> Layout for ValRowLayout<U>
138    where
139        U::Key: Columnation,
140        U::Time: Columnation,
141        U::Diff: Columnation,
142    {
143        type KeyContainer = ColumnationStack<U::Key>;
144        type ValContainer = DatumContainer;
145        type TimeContainer = ColumnationStack<U::Time>;
146        type DiffContainer = ColumnationStack<U::Diff>;
147        type OffsetContainer = OffsetOptimized;
148    }
149}
150
#[cfg(test)]
mod tests {
    use crate::DatumContainer;
    use differential_dataflow::trace::implementations::BatchContainer;
    use mz_repr::adt::date::Date;
    use mz_repr::adt::interval::Interval;
    use mz_repr::{Datum, Row, SqlScalarType};

    /// Packs lists of datums into a `Row`, pushes the row into a fresh
    /// `DatumContainer`, and checks that reading entry 0 back yields the same
    /// datums. Covers the empty row, every `interesting_datums()` value of every
    /// enumerated `SqlScalarType`, and a hand-picked list of boundary values.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // unsupported operation: integer-to-pointer casts and `ptr::with_exposed_provenance` are not supported
    fn test_round_trip() {
        // Round-trips one list of datums through a single-row container.
        fn round_trip(datums: Vec<Datum>) {
            let row = Row::pack(datums.clone());

            let mut container = DatumContainer::with_capacity(row.byte_len());
            container.push_own(&row);

            // Printing reads every stored byte. When run under miri this catches
            // undefined bytes written to data, e.g. by calling push_copy! on a type
            // which contains undefined padding values.
            // NOTE(review): this test is currently `ignore`d under miri (see the
            // attribute above), so that check is not exercised there — confirm
            // whether that is still intended.
            println!("{:?}", container.index(0).iter.data);

            let datums2 = container.index(0).collect::<Vec<_>>();
            assert_eq!(datums, datums2);
        }

        round_trip(vec![]);
        round_trip(
            SqlScalarType::enumerate()
                .iter()
                .flat_map(|r#type| r#type.interesting_datums())
                .collect(),
        );
        round_trip(vec![
            Datum::Null,
            Datum::Null,
            Datum::False,
            Datum::True,
            Datum::Int16(-21),
            Datum::Int32(-42),
            Datum::Int64(-2_147_483_648 - 42),
            Datum::UInt8(0),
            Datum::UInt8(1),
            Datum::UInt16(0),
            Datum::UInt16(1),
            Datum::UInt16(1 << 8),
            Datum::UInt32(0),
            Datum::UInt32(1),
            Datum::UInt32(1 << 8),
            Datum::UInt32(1 << 16),
            Datum::UInt32(1 << 24),
            Datum::UInt64(0),
            Datum::UInt64(1),
            Datum::UInt64(1 << 8),
            Datum::UInt64(1 << 16),
            Datum::UInt64(1 << 24),
            Datum::UInt64(1 << 32),
            Datum::UInt64(1 << 40),
            Datum::UInt64(1 << 48),
            Datum::UInt64(1 << 56),
            Datum::Date(Date::from_pg_epoch(365 * 45 + 21).unwrap()),
            Datum::Interval(Interval {
                months: 312,
                ..Default::default()
            }),
            Datum::Interval(Interval::new(0, 0, 1_012_312)),
            Datum::Bytes(&[]),
            Datum::Bytes(&[0, 2, 1, 255]),
            Datum::String(""),
            Datum::String("العَرَبِيَّة"),
        ]);
    }

    /// Exercises the *compressed* encode→decode paths, which the dyncfg-gated
    /// `test_round_trip` never reaches (it installs no codec). We drive the codec
    /// directly: observe a sample, build a codec via both `new_from([c1, c2])`
    /// (the merge path) and `new_safe` (the safe-tag path), then round-trip every
    /// row through it. We additionally assert the dictionary actually engaged, so
    /// the test keeps covering the compressed branch rather than silently
    /// degrading to raw fall-through.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // integer-to-pointer casts in row decoding are unsupported under miri
    fn test_codec_round_trip() {
        use crate::row_codec::ColumnsCodec;

        // Rows with a small set of repeated, multi-byte string values, so the
        // dictionary installs entries (MisraGries keeps values with len > 1 and
        // count > 1). Mixing in an integer column exercises the raw fall-through
        // (and thus the new soundness `debug_assert`) alongside dictionary hits.
        let values = ["apple", "banana", "cherry"];
        let rows: Vec<Row> = (0..3_000)
            .map(|i| {
                Row::pack_slice(&[
                    Datum::String(values[i % values.len()]),
                    Datum::Int64(i64::try_from(i).unwrap()),
                    Datum::String(values[(i / 7) % values.len()]),
                ])
            })
            .collect();

        // Accumulate statistics in two independent observers, so the merge in
        // `new_from([&stats1, &stats2])` is actually exercised.
        let mut stats1 = ColumnsCodec::default();
        let mut stats2 = ColumnsCodec::default();
        let mut scratch = Vec::new();
        for (i, row) in rows.iter().enumerate() {
            scratch.clear();
            // Even-indexed rows feed `stats1`, odd-indexed rows feed `stats2`.
            let stats = if i % 2 == 0 { &mut stats1 } else { &mut stats2 };
            stats.encode(ColumnsCodec::borrow_row(row), &mut scratch);
        }

        let merged = ColumnsCodec::new_from([&stats1, &stats2]);
        let safe = stats1.new_safe();
        for mut codec in [merged, safe] {
            // Set once any encoded row comes out shorter than its raw bytes.
            let mut compressed_any = false;
            for row in &rows {
                let mut buf = Vec::new();
                codec.encode(ColumnsCodec::borrow_row(row), &mut buf);

                let decoded = codec.decode(&buf).collect::<Vec<_>>();
                let expected = ColumnsCodec::borrow_row(row).collect::<Vec<_>>();
                assert_eq!(decoded, expected, "round-trip mismatch for {row:?}");

                compressed_any |= buf.len() < row.data().len();
            }
            assert!(
                compressed_any,
                "dictionary never engaged; test no longer covers the compressed path",
            );
        }
    }

    /// Regression test for a dictionary-codec soundness bug in the safe-install
    /// path (`new_safe`), reachable with the paged batcher enabled.
    ///
    /// A from-scratch container stores its pre-install rows *raw* while gathering
    /// statistics, then installs a *safe* codec via `new_safe`. `new_safe` used to
    /// discard the first-byte bitmap gathered over those raw rows. That bitmap is
    /// soundness-critical: a later `new_from` merge consults it to decide which
    /// one-byte tags are free to hand out as dictionary keys. With the bitmap
    /// dropped, the merge could assign a dictionary tag equal to a raw datum's
    /// first byte, after which `decode` resolves that literal datum to the
    /// dictionary entry — returning the wrong value.
    ///
    /// We drive the lifecycle directly: observe short strings (first byte
    /// `StringTiny`) into the pre-install statistics, install a safe codec, then
    /// feed it many distinct *long* strings (first byte `StringShort`)
    /// post-install so the merge has heavy hitters to compress. Merging via
    /// `new_from` and re-encoding the short strings then exercises the raw
    /// fall-through whose first byte the merge must not have claimed as a tag.
    /// Before the fix the `StringTiny` tag was handed out and the round-trip
    /// produced a long string (and tripped `encode`'s soundness `debug_assert`).
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // integer-to-pointer casts in row decoding are unsupported under miri
    fn test_safe_codec_merge_bitmap_carryover() {
        use crate::row_codec::ColumnsCodec;

        // Short strings: length < 256, so they encode with the `StringTiny` tag.
        // Unique, so MisraGries never makes them dictionary entries; they always
        // fall through raw, exposing their first byte.
        let short_rows: Vec<Row> = (0..256)
            .map(|i| Row::pack_slice(&[Datum::String(&format!("s{i}"))]))
            .collect();
        // Long strings: length >= 256, so they encode with the `StringShort` tag —
        // a *different* first byte than the short strings. Distinct values, each
        // repeated, so the post-install codec accrues many heavy hitters and the
        // merge assigns dictionary tags across the low byte range, reaching the
        // short strings' `StringTiny` tag unless the bitmap reserves it.
        let long_values: Vec<String> = (0..64).map(|i| format!("{i:0>300}")).collect();

        // Pre-install statistics observe only the short strings' first bytes.
        let mut stats = ColumnsCodec::default();
        let mut scratch = Vec::new();
        for row in &short_rows {
            scratch.clear();
            stats.encode(ColumnsCodec::borrow_row(row), &mut scratch);
        }

        // Install a safe codec, then feed it the long strings post-install so it
        // accrues heavy hitters (and observes only the `StringShort` first byte).
        let mut safe = stats.new_safe();
        for _ in 0..8 {
            for v in &long_values {
                let row = Row::pack_slice(&[Datum::String(v)]);
                scratch.clear();
                safe.encode(ColumnsCodec::borrow_row(&row), &mut scratch);
            }
        }

        // Merge, then round-trip the short strings. With the bitmap carried over,
        // no dictionary tag collides with the short strings' first byte; without
        // it, one does.
        let mut merged = ColumnsCodec::new_from([&safe]);
        for row in &short_rows {
            let mut buf = Vec::new();
            merged.encode(ColumnsCodec::borrow_row(row), &mut buf);
            let decoded = merged.decode(&buf).collect::<Vec<_>>();
            let expected = ColumnsCodec::borrow_row(row).collect::<Vec<_>>();
            assert_eq!(decoded, expected, "round-trip mismatch for {row:?}");
        }
    }

    /// Confirms the structural assumption underpinning `SAFE_TAG_BASE`: every
    /// datum the row format produces encodes with a first byte strictly less
    /// than `SAFE_TAG_BASE`. If `mz_repr` ever introduces a tag that crosses
    /// the boundary, `DictionaryCodec::new_safe` would assign a dictionary tag
    /// that collides with a literal datum first-byte, breaking decoding.
    #[mz_ore::test]
    fn test_safe_tag_base() {
        use crate::row_codec::SAFE_TAG_BASE;
        // Packs a single datum into its own row and inspects the first encoded byte.
        let check = |datum: Datum| {
            let row = Row::pack_slice(&[datum]);
            let data = row.data();
            assert!(!data.is_empty(), "empty encoding for {datum:?}");
            assert!(
                data[0] < SAFE_TAG_BASE,
                "datum {datum:?} encodes with first byte {} >= SAFE_TAG_BASE ({}); \
                 a new row tag has crossed the safe boundary",
                data[0],
                SAFE_TAG_BASE,
            );
        };
        // Only the `interesting_datums()` of each enumerated scalar type are sampled.
        for ty in SqlScalarType::enumerate().iter() {
            for datum in ty.interesting_datums() {
                check(datum);
            }
        }
    }
}
379
380/// A `[u8]`-specialized container.
381mod bytes_container {
382
383    use differential_dataflow::trace::implementations::BatchContainer;
384    use timely::container::PushInto;
385
386    use mz_ore::region::Region;
387
388    /// A slice container with four bytes overhead per slice.
389    pub struct BytesContainer {
390        /// Total length of `batches`, maintained because recomputation is expensive.
391        length: usize,
392        batches: Vec<BytesBatch>,
393    }
394
395    impl BytesContainer {
396        /// Visit contained allocations to determine their size and capacity.
397        #[inline]
398        pub fn heap_size(&self, mut callback: impl FnMut(usize, usize)) {
399            // Calculate heap size for local, stash, and stash entries
400            callback(
401                self.batches.len() * std::mem::size_of::<BytesBatch>(),
402                self.batches.capacity() * std::mem::size_of::<BytesBatch>(),
403            );
404            for batch in self.batches.iter() {
405                batch.offsets.heap_size(&mut callback);
406                callback(batch.storage.len(), batch.storage.capacity());
407            }
408        }
409    }
410
411    impl BatchContainer for BytesContainer {
412        type Owned = Vec<u8>;
413        type ReadItem<'a> = &'a [u8];
414
415        #[inline]
416        fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned {
417            item.to_vec()
418        }
419
420        #[inline]
421        fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) {
422            other.clear();
423            other.extend_from_slice(item);
424        }
425
426        #[inline(always)]
427        fn push_ref(&mut self, item: Self::ReadItem<'_>) {
428            self.push_into(item);
429        }
430
431        #[inline(always)]
432        fn push_own(&mut self, item: &Self::Owned) {
433            self.push_into(item.as_slice())
434        }
435
436        fn clear(&mut self) {
437            self.batches.clear();
438            self.batches.push(BytesBatch::with_capacities(0, 0));
439            self.length = 0;
440        }
441
442        fn with_capacity(size: usize) -> Self {
443            Self {
444                length: 0,
445                batches: vec![BytesBatch::with_capacities(size, size)],
446            }
447        }
448
449        fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
450            let mut item_cap = 1;
451            let mut byte_cap = 0;
452            for batch in cont1.batches.iter() {
453                item_cap += batch.offsets.len() - 1;
454                byte_cap += batch.storage.len();
455            }
456            for batch in cont2.batches.iter() {
457                item_cap += batch.offsets.len() - 1;
458                byte_cap += batch.storage.len();
459            }
460            Self {
461                length: 0,
462                batches: vec![BytesBatch::with_capacities(item_cap, byte_cap)],
463            }
464        }
465
466        #[inline(always)]
467        fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> {
468            item
469        }
470
471        #[inline]
472        fn index(&self, mut index: usize) -> Self::ReadItem<'_> {
473            for batch in self.batches.iter() {
474                if index < batch.len() {
475                    return batch.index(index);
476                }
477                index -= batch.len();
478            }
479            panic!("Index out of bounds");
480        }
481
482        #[inline(always)]
483        fn len(&self) -> usize {
484            self.length
485        }
486    }
487
488    impl PushInto<&[u8]> for BytesContainer {
489        #[inline]
490        fn push_into(&mut self, item: &[u8]) {
491            self.length += 1;
492            if let Some(batch) = self.batches.last_mut() {
493                let success = batch.try_push(item);
494                if !success {
495                    // double the lengths from `batch`.
496                    let item_cap = 2 * batch.offsets.len();
497                    let byte_cap = std::cmp::max(2 * batch.storage.capacity(), item.len());
498                    let mut new_batch = BytesBatch::with_capacities(item_cap, byte_cap);
499                    assert!(new_batch.try_push(item));
500                    self.batches.push(new_batch);
501                }
502            }
503        }
504    }
505
506    /// A batch of slice storage.
507    ///
508    /// The backing storage for this batch will not be resized.
509    pub struct BytesBatch {
510        offsets: crate::OffsetOptimized,
511        storage: Region<u8>,
512        len: usize,
513    }
514
515    impl BytesBatch {
516        /// Either accepts the slice and returns true,
517        /// or does not and returns false.
518        fn try_push(&mut self, slice: &[u8]) -> bool {
519            if self.storage.len() + slice.len() <= self.storage.capacity() {
520                self.storage.extend_from_slice(slice);
521                self.offsets.push_into(self.storage.len());
522                self.len += 1;
523                true
524            } else {
525                false
526            }
527        }
528        #[inline]
529        fn index(&self, index: usize) -> &[u8] {
530            let lower = self.offsets.index(index);
531            let upper = self.offsets.index(index + 1);
532            &self.storage[lower..upper]
533        }
534        #[inline(always)]
535        fn len(&self) -> usize {
536            debug_assert_eq!(self.len, self.offsets.len() - 1);
537            self.len
538        }
539
540        fn with_capacities(item_cap: usize, byte_cap: usize) -> Self {
541            // TODO: be wary of `byte_cap` greater than 2^32.
542            let mut offsets = crate::OffsetOptimized::with_capacity(item_cap + 1);
543            offsets.push_into(0);
544            Self {
545                offsets,
546                storage: Region::new_auto(byte_cap.next_power_of_two()),
547                len: 0,
548            }
549        }
550    }
551}
552
553mod offset_opt {
554    use differential_dataflow::trace::implementations::BatchContainer;
555    use differential_dataflow::trace::implementations::OffsetList;
556    use timely::container::PushInto;
557
558    enum OffsetStride {
559        Empty,
560        Zero,
561        Striding(usize, usize),
562        Saturated(usize, usize, usize),
563    }
564
565    impl OffsetStride {
566        /// Accepts or rejects a newly pushed element.
567        #[inline]
568        fn push(&mut self, item: usize) -> bool {
569            match self {
570                OffsetStride::Empty => {
571                    if item == 0 {
572                        *self = OffsetStride::Zero;
573                        true
574                    } else {
575                        false
576                    }
577                }
578                OffsetStride::Zero => {
579                    *self = OffsetStride::Striding(item, 2);
580                    true
581                }
582                OffsetStride::Striding(stride, count) => {
583                    if item == *stride * *count {
584                        *count += 1;
585                        true
586                    } else if item == *stride * (*count - 1) {
587                        *self = OffsetStride::Saturated(*stride, *count, 1);
588                        true
589                    } else {
590                        false
591                    }
592                }
593                OffsetStride::Saturated(stride, count, reps) => {
594                    if item == *stride * (*count - 1) {
595                        *reps += 1;
596                        true
597                    } else {
598                        false
599                    }
600                }
601            }
602        }
603
604        #[inline]
605        fn index(&self, index: usize) -> usize {
606            match self {
607                OffsetStride::Empty => {
608                    panic!("Empty OffsetStride")
609                }
610                OffsetStride::Zero => 0,
611                OffsetStride::Striding(stride, _steps) => *stride * index,
612                OffsetStride::Saturated(stride, steps, _reps) => {
613                    if index < *steps {
614                        *stride * index
615                    } else {
616                        *stride * (*steps - 1)
617                    }
618                }
619            }
620        }
621
622        #[inline]
623        fn len(&self) -> usize {
624            match self {
625                OffsetStride::Empty => 0,
626                OffsetStride::Zero => 1,
627                OffsetStride::Striding(_stride, steps) => *steps,
628                OffsetStride::Saturated(_stride, steps, reps) => *steps + *reps,
629            }
630        }
631    }
632
633    pub struct OffsetOptimized {
634        strided: OffsetStride,
635        spilled: OffsetList,
636    }
637
638    impl BatchContainer for OffsetOptimized {
639        type Owned = usize;
640        type ReadItem<'a> = usize;
641
642        #[inline]
643        fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned {
644            item
645        }
646
647        #[inline]
648        fn push_ref(&mut self, item: Self::ReadItem<'_>) {
649            self.push_into(item)
650        }
651
652        #[inline]
653        fn push_own(&mut self, item: &Self::Owned) {
654            self.push_into(*item)
655        }
656
657        fn clear(&mut self) {
658            self.strided = OffsetStride::Empty;
659            self.spilled.clear();
660        }
661
662        fn with_capacity(_size: usize) -> Self {
663            Self {
664                strided: OffsetStride::Empty,
665                spilled: OffsetList::with_capacity(0),
666            }
667        }
668
669        fn merge_capacity(_cont1: &Self, _cont2: &Self) -> Self {
670            Self {
671                strided: OffsetStride::Empty,
672                spilled: OffsetList::with_capacity(0),
673            }
674        }
675
676        #[inline]
677        fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> {
678            item
679        }
680
681        #[inline]
682        fn index(&self, index: usize) -> Self::ReadItem<'_> {
683            if index < self.strided.len() {
684                self.strided.index(index)
685            } else {
686                self.spilled.index(index - self.strided.len())
687            }
688        }
689
690        #[inline]
691        fn len(&self) -> usize {
692            self.strided.len() + self.spilled.len()
693        }
694    }
695
696    impl PushInto<usize> for OffsetOptimized {
697        #[inline]
698        fn push_into(&mut self, item: usize) {
699            if !self.spilled.is_empty() {
700                self.spilled.push(item);
701            } else {
702                let inserted = self.strided.push(item);
703                if !inserted {
704                    self.spilled.push(item);
705                }
706            }
707        }
708    }
709
710    impl OffsetOptimized {
711        pub fn heap_size(&self, callback: impl FnMut(usize, usize)) {
712            crate::offset_list_size(&self.spilled, callback);
713        }
714    }
715}
716
717/// Helper to compute the size of an [`OffsetList`] in memory.
718#[inline]
719pub(crate) fn offset_list_size(data: &OffsetList, mut callback: impl FnMut(usize, usize)) {
720    // Private `vec_size` because we should only use it where data isn't region-allocated.
721    // `T: Copy` makes sure the implementation is correct even if types change!
722    #[inline(always)]
723    fn vec_size<T: Copy>(data: &Vec<T>, mut callback: impl FnMut(usize, usize)) {
724        let size_of_t = std::mem::size_of::<T>();
725        callback(data.len() * size_of_t, data.capacity() * size_of_t);
726    }
727
728    vec_size(&data.smol, &mut callback);
729    vec_size(&data.chonk, callback);
730}
731
732/// A `Row`-specialized container using dictionary compression.
733///
734/// The approach is to establish for each column lists of common values, and to use "unoccupied"
735/// tags in the row encoding (e.g. where we would indicate types) to replace these common values.
736/// This substitution is opt-in, in that we don't need to do it, and in particular do not do it
737/// while we are collecting preliminary information about common values, and then start to use it
738/// once we believe we have enough information. Once we have started to use the substitutions we
739/// cannot change the meaning of a reserved byte pattern, for the container we are populating.
740///
741/// Each from-scratch container observes `STATS_THRESHOLD` records before establishing a mapping
742/// from spare tags to common values. Containers that are formed from merging other containers
743/// use those input containers' common values to populate a codec and use it immediately.
744///
745/// The dictionary behavior is controlled by the `DICTIONARY_COMPRESSION` flag, which if disabled
746/// prevents the construction of codecs, which when absent simply cause the wrapper to behave as
747/// a no-op that fails to use any spare tags for common values. The flag is set once, when a
748/// replica is created (from compute's `InstanceConfig::arrangement_dictionary_compression`, itself
749/// captured from the `enable_arrangement_dictionary_compression_alpha` dyncfg at that moment), and is
750/// not changed for the life of the process; flipping the dyncfg only affects replicas created
751/// afterwards. Even with the flag fixed, a single replica can hold a mix of compressed and
752/// uncompressed containers — e.g. containers that never observed enough records to install a
753/// codec, or that were merged from uncompressed inputs.
754mod dictionary {
755
756    use differential_dataflow::trace::implementations::BatchContainer;
757
758    use mz_repr::{Row, RowRef};
759
760    use super::row_codec::{ColumnsCodec, ColumnsIter};
761
    /// Wrapper types that exist to support the creation of dictionary codecs.
    ///
    /// These types interpose at the `seal()` call, to traverse the data that is being sealed and
    /// then construct codecs that are used to encode the row-shaped keys and values. There are
    /// several variants, corresponding to the RowRow, RowVal, ValRow, and Row-only spine types.
767    pub mod builders {
768
769        use columnation::Columnation;
770        use differential_dataflow::difference::Semigroup;
771        use differential_dataflow::lattice::Lattice;
772        use differential_dataflow::trace::Builder;
773        use differential_dataflow::trace::Description;
774        use differential_dataflow::trace::implementations::ord_neu::{OrdKeyBatch, OrdKeyBuilder};
775        use differential_dataflow::trace::implementations::ord_neu::{OrdValBatch, OrdValBuilder};
776        use mz_timely_util::columnation::ColumnationStack as TimelyStack;
777        use timely::progress::Timestamp;
778
779        use mz_repr::Row;
780
781        use super::super::row_codec::ColumnsCodec;
782        use super::{DatumContainer, DatumSeq};
783        use crate::DICTIONARY_COMPRESSION;
784        use crate::spines::{RowLayout, RowRowLayout, RowValLayout, ValRowLayout};
785
786        /// Gather encoding statistics across `rows` and produce a codec from them.
787        ///
788        /// Returns `None` when dictionary compression is disabled.
789        fn build_codec<'a>(rows: impl IntoIterator<Item = &'a Row>) -> Option<ColumnsCodec> {
790            if !DICTIONARY_COMPRESSION.load(std::sync::atomic::Ordering::Relaxed) {
791                return None;
792            }
793            let mut stats = ColumnsCodec::default();
794            let mut buf = Vec::default();
795            for row in rows {
796                if !row.is_empty() {
797                    stats.encode(DatumSeq::borrow_as(row).bytes_iter(), &mut buf);
798                    buf.clear();
799                }
800            }
801            Some(ColumnsCodec::new_from([&stats]))
802        }
803
804        pub struct RowRowBuilder<
805            T: Lattice + Timestamp + Columnation,
806            R: Ord + Semigroup + Columnation + 'static,
807        > {
808            inner: OrdValBuilder<RowRowLayout<((Row, Row), T, R)>, TimelyStack<((Row, Row), T, R)>>,
809        }
810
811        impl<T: Lattice + Timestamp + Columnation, R: Ord + Semigroup + Columnation + 'static>
812            Builder for RowRowBuilder<T, R>
813        {
814            type Input = TimelyStack<((Row, Row), T, R)>;
815            type Time = T;
816            type Output = OrdValBatch<RowRowLayout<((Row, Row), T, R)>>;
817
818            fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self {
819                Self {
820                    inner: Builder::with_capacity(keys, vals, upds),
821                }
822            }
823            fn push(&mut self, chunk: &mut Self::Input) {
824                self.inner.push(chunk)
825            }
826            fn done(self, description: Description<Self::Time>) -> Self::Output {
827                self.inner.done(description)
828            }
829            fn seal(
830                chain: &mut Vec<Self::Input>,
831                description: Description<Self::Time>,
832            ) -> Self::Output {
833                let key_codec = build_codec(
834                    chain
835                        .iter()
836                        .flat_map(|link| link.iter().map(|((k, _), _, _)| k)),
837                );
838                let val_codec = build_codec(
839                    chain
840                        .iter()
841                        .flat_map(|link| link.iter().map(|((_, v), _, _)| v)),
842                );
843
844                use differential_dataflow::trace::implementations::BuilderInput;
845
846                let (keys, vals, upds) = <Self::Input as BuilderInput<
847                    DatumContainer,
848                    DatumContainer,
849                >>::key_val_upd_counts(&chain[..]);
850                let mut builder = Self::with_capacity(keys, vals, upds);
851                // The seal path installs a codec directly, so the per-container stats
852                // gatherer (which `with_capacity` may have allocated) is dead weight and
853                // would contradict the `stats: None once codec installed` invariant.
854                builder.inner.result.keys.codec = key_codec;
855                builder.inner.result.keys.stats = None;
856                builder.inner.result.vals.vals.codec = val_codec;
857                builder.inner.result.vals.vals.stats = None;
858
859                for mut chunk in chain.drain(..) {
860                    builder.push(&mut chunk);
861                }
862
863                builder.done(description)
864            }
865        }
866
867        pub struct RowValBuilder<
868            V: Ord + Clone + Columnation + 'static,
869            T: Lattice + Timestamp + Columnation,
870            R: Ord + Semigroup + Columnation + 'static,
871        > {
872            inner: OrdValBuilder<RowValLayout<((Row, V), T, R)>, TimelyStack<((Row, V), T, R)>>,
873        }
874
875        impl<
876            V: Ord + Clone + Columnation,
877            T: Lattice + Timestamp + Columnation,
878            R: Ord + Semigroup + Columnation + 'static,
879        > Builder for RowValBuilder<V, T, R>
880        {
881            type Input = TimelyStack<((Row, V), T, R)>;
882            type Time = T;
883            type Output = OrdValBatch<RowValLayout<((Row, V), T, R)>>;
884
885            fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self {
886                Self {
887                    inner: Builder::with_capacity(keys, vals, upds),
888                }
889            }
890            fn push(&mut self, chunk: &mut Self::Input) {
891                self.inner.push(chunk)
892            }
893            fn done(self, description: Description<Self::Time>) -> Self::Output {
894                self.inner.done(description)
895            }
896            fn seal(
897                chain: &mut Vec<Self::Input>,
898                description: Description<Self::Time>,
899            ) -> Self::Output {
900                let key_codec = build_codec(
901                    chain
902                        .iter()
903                        .flat_map(|link| link.iter().map(|((k, _), _, _)| k)),
904                );
905
906                use differential_dataflow::trace::implementations::BuilderInput;
907
908                let (keys, vals, upds) = <Self::Input as BuilderInput<
909                    DatumContainer,
910                    TimelyStack<V>,
911                >>::key_val_upd_counts(&chain[..]);
912                let mut builder = Self::with_capacity(keys, vals, upds);
913                // See `RowRowBuilder::seal`: drop the now-redundant stats gatherer.
914                builder.inner.result.keys.codec = key_codec;
915                builder.inner.result.keys.stats = None;
916
917                for mut chunk in chain.drain(..) {
918                    builder.push(&mut chunk);
919                }
920
921                builder.done(description)
922            }
923        }
924
925        pub struct RowBuilder<
926            T: Lattice + Timestamp + Columnation,
927            R: Ord + Semigroup + Columnation + 'static,
928        > {
929            inner: OrdKeyBuilder<RowLayout<((Row, ()), T, R)>, TimelyStack<((Row, ()), T, R)>>,
930        }
931
932        impl<T: Lattice + Timestamp + Columnation, R: Ord + Semigroup + Columnation + 'static>
933            Builder for RowBuilder<T, R>
934        {
935            type Input = TimelyStack<((Row, ()), T, R)>;
936            type Time = T;
937            type Output = OrdKeyBatch<RowLayout<((Row, ()), T, R)>>;
938
939            fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self {
940                Self {
941                    inner: Builder::with_capacity(keys, vals, upds),
942                }
943            }
944            fn push(&mut self, chunk: &mut Self::Input) {
945                self.inner.push(chunk)
946            }
947            fn done(self, description: Description<Self::Time>) -> Self::Output {
948                self.inner.done(description)
949            }
950            fn seal(
951                chain: &mut Vec<Self::Input>,
952                description: Description<Self::Time>,
953            ) -> Self::Output {
954                let key_codec = build_codec(
955                    chain
956                        .iter()
957                        .flat_map(|link| link.iter().map(|((k, _), _, _)| k)),
958                );
959
960                use differential_dataflow::trace::implementations::BuilderInput;
961
962                let (keys, vals, upds) = <Self::Input as BuilderInput<
963                    DatumContainer,
964                    TimelyStack<()>,
965                >>::key_val_upd_counts(&chain[..]);
966                let mut builder = Self::with_capacity(keys, vals, upds);
967                // See `RowRowBuilder::seal`: drop the now-redundant stats gatherer.
968                builder.inner.result.keys.codec = key_codec;
969                builder.inner.result.keys.stats = None;
970
971                for mut chunk in chain.drain(..) {
972                    builder.push(&mut chunk);
973                }
974
975                builder.done(description)
976            }
977        }
978
979        /// Mirror of [`RowValBuilder`] with the roles swapped: arbitrary keys and
980        /// `Row` *values*, so the dictionary codec is built for and installed on the
981        /// value container.
982        pub struct ValRowBuilder<
983            K: Ord + Clone + Columnation + 'static,
984            T: Lattice + Timestamp + Columnation,
985            R: Ord + Semigroup + Columnation + 'static,
986        > {
987            inner: OrdValBuilder<ValRowLayout<((K, Row), T, R)>, TimelyStack<((K, Row), T, R)>>,
988        }
989
990        impl<
991            K: Ord + Clone + Columnation,
992            T: Lattice + Timestamp + Columnation,
993            R: Ord + Semigroup + Columnation + 'static,
994        > Builder for ValRowBuilder<K, T, R>
995        {
996            type Input = TimelyStack<((K, Row), T, R)>;
997            type Time = T;
998            type Output = OrdValBatch<ValRowLayout<((K, Row), T, R)>>;
999
1000            fn with_capacity(keys: usize, vals: usize, upds: usize) -> Self {
1001                Self {
1002                    inner: Builder::with_capacity(keys, vals, upds),
1003                }
1004            }
1005            fn push(&mut self, chunk: &mut Self::Input) {
1006                self.inner.push(chunk)
1007            }
1008            fn done(self, description: Description<Self::Time>) -> Self::Output {
1009                self.inner.done(description)
1010            }
1011            fn seal(
1012                chain: &mut Vec<Self::Input>,
1013                description: Description<Self::Time>,
1014            ) -> Self::Output {
1015                let val_codec = build_codec(
1016                    chain
1017                        .iter()
1018                        .flat_map(|link| link.iter().map(|((_, v), _, _)| v)),
1019                );
1020
1021                use differential_dataflow::trace::implementations::BuilderInput;
1022
1023                let (keys, vals, upds) = <Self::Input as BuilderInput<
1024                    TimelyStack<K>,
1025                    DatumContainer,
1026                >>::key_val_upd_counts(&chain[..]);
1027                let mut builder = Self::with_capacity(keys, vals, upds);
1028                // See `RowRowBuilder::seal`: drop the now-redundant stats gatherer.
1029                builder.inner.result.vals.vals.codec = val_codec;
1030                builder.inner.result.vals.vals.stats = None;
1031
1032                for mut chunk in chain.drain(..) {
1033                    builder.push(&mut chunk);
1034                }
1035
1036                builder.done(description)
1037            }
1038        }
1039    }
1040
1041    pub struct DatumContainer {
1042        /// Encoder/decoder used to translate between row bytes and the stored bytes.
1043        /// `None` until enough pushes have been observed (or if compression is disabled).
1044        codec: Option<ColumnsCodec>,
1045        /// The stored, possibly-encoded, row bytes.
1046        inner: super::bytes_container::BytesContainer,
1047        /// Staging buffer for ingested `Row` types.
1048        staging: Vec<u8>,
1049        /// Statistics gatherer, used to build a safe codec after enough pushes.
1050        /// `None` once the codec has been installed or if compression is disabled.
1051        stats: Option<ColumnsCodec>,
1052    }
1053
1054    impl BatchContainer for DatumContainer {
1055        type Owned = Row;
1056        type ReadItem<'a> = DatumSeq<'a>;
1057
1058        fn with_capacity(size: usize) -> Self {
1059            let stats = if crate::DICTIONARY_COMPRESSION.load(std::sync::atomic::Ordering::Relaxed)
1060            {
1061                Some(Default::default())
1062            } else {
1063                None
1064            };
1065
1066            Self {
1067                codec: None,
1068                inner: BatchContainer::with_capacity(size),
1069                staging: Vec::new(),
1070                stats,
1071            }
1072        }
1073        fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
1074            // We only build a merged codec when *both* inputs carry one. A codec is
1075            // sound only for the data whose tag usage it observed, so we cannot reuse
1076            // one side's codec to decode the other side's rows. When exactly one side
1077            // is compressed we conservatively produce an uncompressed container rather
1078            // than risk a tag collision; the merged container re-gathers stats and may
1079            // install a fresh codec later via the `STATS_THRESHOLD` path.
1080            let codec = match (&cont1.codec, &cont2.codec) {
1081                (Some(c1), Some(c2)) => Some(ColumnsCodec::new_from([c1, c2])),
1082                _ => None,
1083            };
1084
1085            Self {
1086                codec,
1087                inner: BatchContainer::merge_capacity(&cont1.inner, &cont2.inner),
1088                staging: Vec::new(),
1089                stats: None,
1090            }
1091        }
1092        #[inline]
1093        fn index(&self, index: usize) -> Self::ReadItem<'_> {
1094            let data = self.inner.index(index);
1095            let iter = if let Some(codec) = &self.codec {
1096                codec.decode(data)
1097            } else {
1098                // Safety: without a codec we only push rows or datumseqs into `self.inner`.
1099                // Each retrieved byte slice should be row-encoded data, as long as we have
1100                // not unset the codec in the interim.
1101                unsafe { ColumnsIter::without_codec(data) }
1102            };
1103            DatumSeq { iter }
1104        }
1105        #[inline(always)]
1106        fn len(&self) -> usize {
1107            self.inner.len()
1108        }
1109
1110        #[inline(always)]
1111        fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> {
1112            item
1113        }
1114
1115        #[inline(always)]
1116        fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned {
1117            // Fast path: unencoded data is already row-formatted bytes.
1118            if item.iter.index.is_none() {
1119                // SAFETY: `iter.data` is raw row-encoded bytes when there is no codec.
1120                return unsafe { Row::from_bytes_unchecked(item.iter.data) };
1121            }
1122            Row::pack(item)
1123        }
1124
1125        #[inline(always)]
1126        fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) {
1127            // Fast path: unencoded data is already row-formatted bytes.
1128            if item.iter.index.is_none() {
1129                let mut packer = other.packer();
1130                // SAFETY: `iter.data` is raw row-encoded bytes when there is no codec.
1131                unsafe { packer.extend_by_slice_unchecked(item.iter.data) };
1132                return;
1133            }
1134            other.packer().extend(item);
1135        }
1136
1137        #[inline(always)]
1138        fn push_ref(&mut self, item: Self::ReadItem<'_>) {
1139            // Fast path: both sides unencoded — push raw bytes directly.
1140            if self.codec.is_none() && self.stats.is_none() && item.iter.index.is_none() {
1141                self.inner.push_ref(item.iter.data);
1142                return;
1143            }
1144            self.push_into(item);
1145        }
1146
1147        #[inline(always)]
1148        fn push_own(&mut self, item: &Self::Owned) {
1149            // Fast path: container is unencoded — push raw row bytes directly.
1150            if self.codec.is_none() && self.stats.is_none() {
1151                self.inner.push_ref(item.data());
1152                return;
1153            }
1154            self.push_into(item);
1155        }
1156
1157        #[inline(always)]
1158        fn clear(&mut self) {
1159            self.inner.clear();
1160            self.staging.clear();
1161            // Reset to the same state as a fresh `with_capacity`: drop any installed
1162            // codec and restore stats gathering (if compression is enabled). Keeping a
1163            // now-empty codec would leave `codec.is_some()`, which permanently routes
1164            // pushes down the encode path with an empty dictionary and prevents the
1165            // `STATS_THRESHOLD` install logic from ever re-engaging compression.
1166            self.codec = None;
1167            self.stats = if crate::DICTIONARY_COMPRESSION.load(std::sync::atomic::Ordering::Relaxed)
1168            {
1169                Some(Default::default())
1170            } else {
1171                None
1172            };
1173        }
1174    }
1175
1176    impl DatumContainer {
1177        /// Visit contained allocations to determine their size and capacity.
1178        #[inline]
1179        pub fn heap_size(&self, mut callback: impl FnMut(usize, usize)) {
1180            self.inner.heap_size(&mut callback);
1181            // The staging buffer and the (possibly absent) codec and stats gatherer all
1182            // hold heap allocations that the bare `inner` accounting misses.
1183            callback(self.staging.len(), self.staging.capacity());
1184            if let Some(codec) = &self.codec {
1185                codec.heap_size(&mut callback);
1186            }
1187            if let Some(stats) = &self.stats {
1188                stats.heap_size(&mut callback);
1189            }
1190        }
1191    }
1192
1193    use timely::container::PushInto;
1194    impl PushInto<Row> for DatumContainer {
1195        #[inline(always)]
1196        fn push_into(&mut self, item: Row) {
1197            self.push_into(&item);
1198        }
1199    }
1200
1201    impl PushInto<&Row> for DatumContainer {
1202        #[inline(always)]
1203        fn push_into(&mut self, item: &Row) {
1204            self.push_into(DatumSeq::borrow_as(item));
1205        }
1206    }
1207
1208    impl PushInto<&RowRef> for DatumContainer {
1209        #[inline(always)]
1210        fn push_into(&mut self, item: &RowRef) {
1211            self.push_into(DatumSeq::borrow_as(item));
1212        }
1213    }
1214
1215    /// Number of pushes a from-scratch container observes before it turns its
1216    /// gathered stats into a safe codec.
1217    ///
1218    /// A safe codec has at most `256 - SAFE_TAG_BASE` (= 134) dictionary slots per
1219    /// column, so we only need to identify ~134 genuinely-popular values. The
1220    /// `MisraGries` summary retains up to `2 * k` (= 1024) distinct candidates
1221    /// between tidies and reduces to `k` (= 512), comfortably more than 134, so the
1222    /// threshold just needs to be large enough that heavy hitters accumulate counts
1223    /// well above 1 before we freeze the codec. 64Ki pushes gives that headroom while
1224    /// keeping the pre-codec (uncompressed) window short.
1225    const STATS_THRESHOLD: usize = 64 * 1024;
1226
1227    impl PushInto<DatumSeq<'_>> for DatumContainer {
1228        #[inline]
1229        fn push_into(&mut self, item: DatumSeq<'_>) {
1230            // Fast path: container and item are both unencoded.
1231            // This is the hot path when dictionary compression is disabled.
1232            if self.codec.is_none() && self.stats.is_none() && item.iter.index.is_none() {
1233                self.inner.push_ref(item.iter.data);
1234                return;
1235            }
1236
1237            // Check if we've gathered enough stats to install a safe codec.
1238            if self.codec.is_none() && self.stats.is_some() && self.inner.len() >= STATS_THRESHOLD {
1239                let stats = self.stats.take().unwrap();
1240                self.codec = Some(stats.new_safe());
1241            }
1242
1243            if let Some(codec) = &mut self.codec {
1244                // Encode using the installed codec.
1245                codec.encode(item.bytes_iter(), &mut self.staging);
1246            } else if let Some(stats) = &mut self.stats {
1247                // Stats-gathering phase: feed the statistics but store raw bytes.
1248                // `observe` updates the heavy-hitter/tag summaries without encoding, so
1249                // we copy each row exactly once (below) instead of also encoding it into
1250                // a buffer we would immediately discard.
1251                stats.observe(item.bytes_iter());
1252                for slice in item.bytes_iter() {
1253                    self.staging.extend_from_slice(slice);
1254                }
1255            } else {
1256                // No codec, no stats: raw copy.
1257                for slice in item.bytes_iter() {
1258                    self.staging.extend_from_slice(slice);
1259                }
1260            }
1261            self.inner.push_ref(&self.staging[..]);
1262            self.staging.clear();
1263        }
1264    }
1265
1266    use mz_repr::{Datum, read_datum};
1267
1268    /// A reference that can be resolved to a sequence of `Datum`s.
1269    ///
1270    /// This type must "compare" as if decoded to a `Row`, which means it needs to track
1271    /// various nuances of `Row::cmp`, which at the moment is first by length, and then by
1272    /// the raw binary slice backing the row. Neither of those are explicit in this struct.
1273    /// We will need to produce them in order to perform comparisons.
1274    #[derive(Debug)]
1275    pub struct DatumSeq<'a> {
1276        pub iter: ColumnsIter<'a>,
1277    }
1278
1279    impl<'a> DatumSeq<'a> {
1280        #[inline(always)]
1281        fn borrow_as(other: &'a RowRef) -> Self {
1282            Self {
1283                iter: ColumnsCodec::borrow_row(other),
1284            }
1285        }
1286
1287        /// Borrow a `Row` as a `DatumSeq` so that it can be used to seek into a
1288        /// trace whose key/value container is a [`DatumContainer`].
1289        #[inline]
1290        pub fn from_row(row: &'a Row) -> Self {
1291            Self::borrow_as(row)
1292        }
1293
1294        #[inline]
1295        pub fn to_row(&self) -> Row {
1296            // Fast path: unencoded data is already row-formatted bytes.
1297            if self.iter.index.is_none() {
1298                return unsafe { Row::from_bytes_unchecked(self.iter.data) };
1299            }
1300            Row::pack(*self)
1301        }
1302    }
1303
1304    impl<'a> Copy for DatumSeq<'a> {}
1305    impl<'a> Clone for DatumSeq<'a> {
1306        #[inline(always)]
1307        fn clone(&self) -> Self {
1308            *self
1309        }
1310    }
1311
1312    use std::cmp::Ordering;
1313    impl<'a, 'b> PartialEq<DatumSeq<'a>> for DatumSeq<'b> {
1314        #[inline(always)]
1315        fn eq(&self, other: &DatumSeq<'a>) -> bool {
1316            // Fast path: both sides are unencoded raw row bytes.
1317            if self.iter.index.is_none() && other.iter.index.is_none() {
1318                return self.iter.data == other.iter.data;
1319            }
1320            Iterator::eq(self.iter, other.iter)
1321        }
1322    }
1323    impl<'a> Eq for DatumSeq<'a> {}
1324    impl<'a, 'b> PartialOrd<DatumSeq<'a>> for DatumSeq<'b> {
1325        #[inline(always)]
1326        fn partial_cmp(&self, other: &DatumSeq<'a>) -> Option<Ordering> {
1327            // Fast path: both sides are unencoded raw row bytes.
1328            if self.iter.index.is_none() && other.iter.index.is_none() {
1329                let left = self.iter.data;
1330                let right = other.iter.data;
1331                return Some(match left.len().cmp(&right.len()) {
1332                    Ordering::Equal => left.cmp(right),
1333                    other => other,
1334                });
1335            }
1336            // Slow path: at least one side is dictionary-encoded.
1337            // Fused length + lexicographic comparison in a single pass per side.
1338            // Row ordering is: shorter < longer; equal lengths compared lexicographically.
1339            //
1340            // We compare byte-by-byte (via `flatten`) rather than slice-by-slice on
1341            // purpose: a dictionary tag expands to a multi-byte value on one side while
1342            // the other side may store those same bytes raw, so the per-column slice
1343            // boundaries do not line up between the two iterators. Decoding to a flat
1344            // byte stream is the only representation in which both sides are directly
1345            // comparable. This path is cold — it only runs when at least one operand is
1346            // dictionary-encoded; the common unencoded case is handled by the fast path
1347            // above with a single slice comparison.
1348            let mut left = self.iter.flatten();
1349            let mut right = other.iter.flatten();
1350            let mut first_diff = Ordering::Equal;
1351            loop {
1352                match (left.next(), right.next()) {
1353                    (Some(l), Some(r)) => {
1354                        if first_diff == Ordering::Equal {
1355                            first_diff = l.cmp(r);
1356                        }
1357                    }
1358                    // Left exhausted first: left is shorter, so Less.
1359                    (None, Some(_)) => return Some(Ordering::Less),
1360                    // Right exhausted first: right is shorter, so Greater.
1361                    (Some(_), None) => return Some(Ordering::Greater),
1362                    // Same length: use first lexicographic difference.
1363                    (None, None) => return Some(first_diff),
1364                }
1365            }
1366        }
1367    }
1368    impl<'a> Ord for DatumSeq<'a> {
1369        #[inline(always)]
1370        fn cmp(&self, other: &Self) -> Ordering {
1371            self.partial_cmp(other).unwrap()
1372        }
1373    }
1374
1375    impl<'a> PartialEq<&'a Row> for DatumSeq<'a> {
1376        #[inline(always)]
1377        fn eq(&self, other: &&'a Row) -> bool {
1378            self.eq(&Self::borrow_as(*other))
1379        }
1380    }
1381
1382    // Lifetimes decoupled (`'b` independent of `'a`): the arrange machinery
1383    // requires `for<'b> DatumSeq<'a>: PartialEq<&'b RowRef>`, i.e. a fixed
1384    // `DatumSeq` must compare against a `&RowRef` of any lifetime.
1385    impl<'a, 'b> PartialEq<&'b RowRef> for DatumSeq<'a> {
1386        #[inline(always)]
1387        fn eq(&self, other: &&'b RowRef) -> bool {
1388            self.eq(&DatumSeq::borrow_as(*other))
1389        }
1390    }
1391
1392    impl<'a> DatumSeq<'a> {
1393        #[inline(always)]
1394        pub fn bytes_iter(self) -> ColumnsIter<'a> {
1395            self.iter
1396        }
1397    }
1398
1399    impl<'a> Iterator for DatumSeq<'a> {
1400        type Item = Datum<'a>;
1401        #[inline(always)]
1402        fn next(&mut self) -> Option<Self::Item> {
1403            // Delegate to `ColumnsIter`, which handles both the codec and no-codec
1404            // cases. The no-codec scan hot path is served directly by `extend_datums`
1405            // (which decodes without going through this iterator), so the only callers
1406            // left here are the codec-encoded `extend_datums`/`to_row` paths and tests;
1407            // none warrant a dedicated no-codec fast path.
1408            self.iter
1409                .next()
1410                .map(|mut bytes| unsafe { read_datum(&mut bytes) })
1411        }
1412    }
1413
1414    use mz_repr::RowArena;
1415    use mz_repr::fixed_length::ExtendDatums;
1416    impl<'long> ExtendDatums for DatumSeq<'long> {
1417        #[inline]
1418        fn extend_datums<'a>(
1419            &'a self,
1420            _arena: &'a RowArena,
1421            target: &mut Vec<Datum<'a>>,
1422            max: Option<usize>,
1423        ) {
1424            // Branch on codec presence ONCE per row rather than once per datum.
1425            // With no codec (the common, feature-off case) push raw datums in a
1426            // tight loop, matching the pre-dictionary path; with a codec, fall
1427            // back to the per-column iterator. This keeps the codec check out of
1428            // the per-datum loop — the source of the feature-off scan overhead.
1429            if self.iter.index.is_none() {
1430                let mut data = self.iter.data;
1431                match max {
1432                    Some(max) => {
1433                        let mut n = 0;
1434                        while n < max && !data.is_empty() {
1435                            target.push(unsafe { read_datum(&mut data) });
1436                            n += 1;
1437                        }
1438                    }
1439                    None => {
1440                        while !data.is_empty() {
1441                            target.push(unsafe { read_datum(&mut data) });
1442                        }
1443                    }
1444                }
1445            } else {
1446                match max {
1447                    Some(max) => target.extend((*self).take(max)),
1448                    None => target.extend(*self),
1449                }
1450            }
1451        }
1452    }
1453}
1454
1455/// Traits abstracting the processes of encoding and decoding row-encoded byte sequences.
1456///
1457/// It is unsafe to use these types to encode byte sequences that are not row-encoded,
1458/// as they are parsed out of contiguous `[u8]` slices using `mz_repr::read_datum`.
1459mod row_codec {
1460
1461    pub use self::misra_gries::MisraGries;
1462    pub use columns::{ColumnsCodec, ColumnsIter};
1463    pub use dictionary::DictionaryCodec;
1464    #[cfg(test)]
1465    pub use dictionary::SAFE_TAG_BASE;
1466
1467    // The codecs encode and decode `[u8]` data specific to the `[Row]` encoding. They
1468    // soundly decode data they themselves encoded from valid `[Row]` data, but may be
1469    // unsound if asked to decode data that was not row-encoded, or was encoded with a
1470    // different codec. `ColumnsCodec` (a per-column wrapper around `DictionaryCodec`) is
1471    // the only codec the spine instantiates; the methods are inherent rather than behind
1472    // a `Codec` trait because nothing ever dispatches over codecs generically.
1473
1474    mod columns {
1475
1476        use mz_repr::{RowRef, read_datum};
1477
1478        use super::DictionaryCodec;
1479
1480        /// Independently encodes each column.
1481        #[derive(Default, Debug)]
1482        pub struct ColumnsCodec {
1483            columns: Vec<DictionaryCodec>,
1484        }
1485
1486        impl ColumnsCodec {
1487            /// Decode a row-encoded byte slice into per-column byte slices.
1488            pub(crate) fn decode<'a>(&'a self, bytes: &'a [u8]) -> ColumnsIter<'a> {
1489                ColumnsIter {
1490                    index: Some(self),
1491                    column: 0,
1492                    data: bytes,
1493                }
1494            }
1495            /// Encode a sequence of column byte slices, updating per-column statistics.
1496            pub(crate) fn encode<'a, I>(&mut self, iter: I, output: &mut Vec<u8>)
1497            where
1498                I: IntoIterator<Item = &'a [u8]>,
1499            {
1500                for (index, bytes) in iter.into_iter().enumerate() {
1501                    if self.columns.len() <= index {
1502                        self.columns.push(Default::default());
1503                    }
1504                    self.columns[index].encode(std::iter::once(bytes), output);
1505                }
1506            }
1507
1508            /// Construct a codec valid for the union of the supplied codecs' data.
1509            pub(crate) fn new_from<'a>(stats: impl IntoIterator<Item = &'a Self>) -> Self {
1510                // An empty `stats` iterator yields a zero-column codec, which encodes and
1511                // decodes nothing; callers merging no inputs get an inert (but sound) codec.
1512                let stats = stats.into_iter().collect::<Vec<_>>();
1513                let cols = stats.iter().map(|s| s.columns.len()).max().unwrap_or(0);
1514                let mut columns = Vec::with_capacity(cols);
1515                let default: DictionaryCodec = Default::default();
1516                for index in 0..cols {
1517                    columns.push(DictionaryCodec::new_from(
1518                        stats
1519                            .iter()
1520                            .map(|s| s.columns.get(index).unwrap_or(&default)),
1521                    ));
1522                }
1523                Self { columns }
1524            }
1525
1526            /// Reveal a row's bytes for fast-path comparison, with no codec to consult.
1527            #[inline(always)]
1528            pub(crate) fn borrow_row(row: &RowRef) -> ColumnsIter<'_> {
1529                ColumnsIter {
1530                    index: None,
1531                    column: 0,
1532                    data: row.data(),
1533                }
1534            }
1535        }
1536
1537        impl ColumnsCodec {
1538            /// Visit contained allocations to determine their size and capacity.
1539            pub(crate) fn heap_size(&self, callback: &mut impl FnMut(usize, usize)) {
1540                let elem = std::mem::size_of::<DictionaryCodec>();
1541                callback(self.columns.len() * elem, self.columns.capacity() * elem);
1542                for column in &self.columns {
1543                    column.heap_size(callback);
1544                }
1545            }
1546        }
1547
1548        impl ColumnsCodec {
1549            /// Record a row's column values in the statistics without encoding.
1550            ///
1551            /// Used during the stats-gathering phase, where we want the heavy-hitter
1552            /// and tag-usage information but store the row raw, so encoding into a
1553            /// throwaway buffer would be pure waste.
1554            #[inline]
1555            pub(crate) fn observe<'a, I>(&mut self, iter: I)
1556            where
1557                I: IntoIterator<Item = &'a [u8]>,
1558            {
1559                for (index, bytes) in iter.into_iter().enumerate() {
1560                    if self.columns.len() <= index {
1561                        self.columns.push(Default::default());
1562                    }
1563                    self.columns[index].observe(bytes);
1564                }
1565            }
1566        }
1567
1568        impl ColumnsCodec {
1569            /// Construct a codec using only structurally safe tags.
1570            ///
1571            /// Consumes `self`: this is only ever called on stats that have just been
1572            /// `take`n out of a container and are about to be discarded, so we move the
1573            /// per-column `MisraGries` summaries through rather than cloning them.
1574            pub(crate) fn new_safe(self) -> Self {
1575                let columns = self
1576                    .columns
1577                    .into_iter()
1578                    .map(DictionaryCodec::new_safe)
1579                    .collect();
1580                Self { columns }
1581            }
1582        }
1583
1584        #[derive(Debug, Copy, Clone)]
1585        pub struct ColumnsIter<'a> {
1586            // `None` when iterating an owned row directly, with no codec to consult.
1587            pub index: Option<&'a ColumnsCodec>,
1588            pub column: usize,
1589            pub data: &'a [u8],
1590        }
1591
1592        impl<'a> Iterator for ColumnsIter<'a> {
1593            type Item = &'a [u8];
1594            #[inline(always)]
1595            fn next(&mut self) -> Option<Self::Item> {
1596                if self.data.is_empty() {
1597                    None
1598                } else if let Some(bytes) = self
1599                    .index
1600                    .as_ref()
1601                    .and_then(|i| i.columns.get(self.column))
1602                    .and_then(|i| i.decode.get(self.data[0].into()))
1603                {
1604                    self.data = &self.data[1..];
1605                    self.column += 1;
1606                    Some(bytes)
1607                } else {
1608                    let mut data = self.data;
1609                    let data_len = data.len();
1610                    unsafe {
1611                        read_datum(&mut data);
1612                    }
1613                    let (prev, next) = self.data.split_at(data_len - data.len());
1614                    self.data = next;
1615                    self.column += 1;
1616                    Some(prev)
1617                }
1618            }
1619        }
1620
1621        impl<'a> ColumnsIter<'a> {
1622            /// Create a column iterator without a codec.
1623            ///
1624            /// This requires the data to be row-formatted, and it will be erroneous otherwise.
1625            #[inline(always)]
1626            pub unsafe fn without_codec(data: &'a [u8]) -> Self {
1627                Self {
1628                    index: None,
1629                    column: 0,
1630                    data,
1631                }
1632            }
1633        }
1634    }
1635
1636    /// A dictionary encoding codec for `[Row]` data.
1637    ///
1638    /// The dictionary harvests unused tags within each column and uses them to
1639    /// represent popular values within that column. There are two mechanisms it
1640    /// uses to accomplish this:
1641    ///
1642    /// 1. Statically free tags: `SAFE_TAG_BASE` is taken as an exclusive upper bound
1643    ///    on the tags that will be used by `[Row]`, and tags greater or equal to this
1644    ///    value are always safe to use.
1645    /// 2. Dynamically free tags: having seen an entire collection, we can use any
1646    ///    tag not otherwise used by the collection, as it would not be ambiguous.
1647    ///
1648    /// It goes without saying that if either of these approaches are incorrect,
1649    /// there are calamitous unsoundness implications.
1650    mod dictionary {
1651
1652        use std::collections::BTreeMap;
1653
1654        pub use super::{BytesMap, MisraGries};
1655
1656        /// First byte value that is structurally unused by the datum encoding.
1657        /// All byte values >= this are safe to use as dictionary tags without
1658        /// observing the data, since no datum's first byte can have this value.
1659        ///
1660        /// `mz_repr`'s `Row` `Tag` enum currently has 94 variants (discriminants
1661        /// 0..=93), so the truly tight bound is 94. We deliberately pick a larger,
1662        /// round-ish constant to leave headroom for new tags without having to also
1663        /// bump the safe set, and the `test_safe_tag_base` test pins the real
1664        /// invariant: every datum the row format produces must encode with a first
1665        /// byte strictly less than this value. If a future tag crosses the boundary
1666        /// that test fails loudly rather than silently corrupting decoding.
1667        pub const SAFE_TAG_BASE: u8 = 122;
1668
1669        /// Per-column dictionary codec. Encodes column byte slices, replacing popular
1670        /// values with spare tags; decoding is performed by `ColumnsIter` reading the
1671        /// `decode` map directly.
1672        #[derive(Default, Debug)]
1673        pub struct DictionaryCodec {
1674            encode: BTreeMap<Vec<u8>, u8>,
1675            pub decode: BytesMap,
1676            stats: (MisraGries<Vec<u8>>, [u64; 4]),
1677        }
1678
1679        impl DictionaryCodec {
1680            /// Encode a sequence of byte slices.
1681            ///
1682            /// Encoding also records statistics about the structure of the input.
1683            ///
1684            /// Decoding has no symmetric method here: a column's bytes are decoded by
1685            /// `ColumnsIter`, which consults the `decode` map directly.
1686            pub(super) fn encode<'a, I>(&mut self, iter: I, output: &mut Vec<u8>)
1687            where
1688                I: IntoIterator<Item = &'a [u8]>,
1689            {
1690                for bytes in iter.into_iter() {
1691                    debug_assert!(
1692                        !bytes.is_empty(),
1693                        "row encoding never yields empty column slices",
1694                    );
1695                    // If we have an index referencing `bytes`, use the index key.
1696                    if let Some(b) = self.encode.get(bytes) {
1697                        output.push(*b);
1698                    } else {
1699                        // Raw fall-through. Soundness rests on `bytes[0]` never being a
1700                        // tag we hand out as a dictionary key: `new_from`/`new_safe` only
1701                        // assign dictionary tags from first-byte values that were never
1702                        // observed (or are `>= SAFE_TAG_BASE`, which no datum first-byte
1703                        // can equal). If a literal datum's first byte collided with a
1704                        // dictionary tag, `decode` would resolve it to the dictionary
1705                        // entry instead of reading the datum. This `debug_assert` makes
1706                        // the load-bearing "no later first-byte outside the observed
1707                        // union" invariant self-checking.
1708                        debug_assert!(
1709                            self.decode.get(bytes[0].into()).is_none(),
1710                            "raw datum first-byte {} collides with a dictionary tag; \
1711                             decode would be ambiguous",
1712                            bytes[0],
1713                        );
1714                        output.extend(bytes);
1715                    }
1716                    self.observe(bytes);
1717                }
1718            }
1719
1720            /// Construct a new encoder from supplied statistics.
1721            pub(super) fn new_from<'a>(stats: impl IntoIterator<Item = &'a Self>) -> Self {
1722                // Collect most popular bytes from combined containers.
1723                let mut mg = MisraGries::default();
1724                let mut tags: [u64; 4] = [0; 4];
1725                for stat in stats.into_iter() {
1726                    for (thing, count) in stat.stats.0.clone().done() {
1727                        mg.update(thing, count);
1728                    }
1729                    tags[0] |= stat.stats.1[0];
1730                    tags[1] |= stat.stats.1[1];
1731                    tags[2] |= stat.stats.1[2];
1732                    tags[3] |= stat.stats.1[3];
1733                }
1734                let mut mg = mg
1735                    .done()
1736                    .into_iter()
1737                    .filter(|(next_bytes, count)| next_bytes.len() > 1 && count > &1);
1738                // Establish encoding and decoding rules.
1739                let mut encode = BTreeMap::new();
1740                let mut decode = BytesMap::default();
1741                for tag in 0..=255 {
1742                    let tag_idx: usize = (tag % 4).into();
1743                    let shift = tag >> 2;
1744                    if (tags[tag_idx] >> shift) & 0x01 != 0 {
1745                        // Tag is used by a literal datum first-byte; reserve the slot.
1746                        decode.push(None);
1747                    } else if let Some((next_bytes, _count)) = mg.next() {
1748                        decode.push(Some(&next_bytes[..]));
1749                        encode.insert(next_bytes, tag);
1750                    } else {
1751                        // Unused tag, but the heavy-hitter supply is exhausted. We must
1752                        // still push a slot so that `decode`'s index stays aligned with
1753                        // the tag value: every iteration pushes exactly once, keeping the
1754                        // map length 256 and `decode.get(tag)` addressable by tag.
1755                        decode.push(None);
1756                    }
1757                }
1758
1759                Self {
1760                    encode,
1761                    decode,
1762                    stats: (MisraGries::default(), [0u64; 4]),
1763                }
1764            }
1765        }
1766
1767        impl DictionaryCodec {
1768            /// Visit contained allocations to determine their size and capacity.
1769            ///
1770            /// `BTreeMap` exposes no capacity, so its node storage is approximated as
1771            /// one logical entry's worth of bytes per element; the dominant terms (the
1772            /// owned key bytes and the `decode` map's byte arena) are accounted exactly.
1773            pub fn heap_size(&self, callback: &mut impl FnMut(usize, usize)) {
1774                let entry = std::mem::size_of::<(Vec<u8>, u8)>();
1775                callback(self.encode.len() * entry, self.encode.len() * entry);
1776                for key in self.encode.keys() {
1777                    callback(key.len(), key.capacity());
1778                }
1779                self.decode.heap_size(callback);
1780                self.stats.0.heap_size(callback);
1781            }
1782
1783            /// Record a single column value in this codec's statistics without
1784            /// producing any encoded output.
1785            ///
1786            /// Statistics come in two decoupled parts, with very different costs and
1787            /// purposes:
1788            ///
1789            /// 1. The tag bitmap (`stats.1`) records which first-byte values have been
1790            ///    observed. It is cheap (four `u64` ORs) and *soundness critical*:
1791            ///    `new_from`'s dynamic-tag path only hands out tags that this bitmap
1792            ///    reports as unused, so it must stay accurate for the entire life of the
1793            ///    codec, including on the hot encode path.
1794            /// 2. The MisraGries summary (`stats.0`) tracks heavy hitters and only
1795            ///    affects *which* values a future codec compresses, never correctness.
1796            ///    It is the expensive part (a `BTreeMap` insert per column per row). We
1797            ///    keep feeding it after install, on the hot encode path, on purpose: a
1798            ///    later merge rebuilds the merged codec from these summaries via
1799            ///    `new_from`. If we froze the summary at install time, then as the
1800            ///    collection evolves — records cancel under consolidation, the popular
1801            ///    set drifts — the codec could never reclaim slots for newly-popular
1802            ///    values and would eventually be left compressing values that no longer
1803            ///    occur, ceasing to compress the ones that do.
1804            #[inline]
1805            pub fn observe(&mut self, bytes: &[u8]) {
1806                debug_assert!(
1807                    !bytes.is_empty(),
1808                    "row encoding never yields empty column slices",
1809                );
1810                let tag = bytes[0];
1811                let tag_idx: usize = (tag % 4).into();
1812                self.stats.1[tag_idx] |= 1 << (tag >> 2);
1813                self.stats.0.insert_ref(bytes);
1814            }
1815
1816            /// Construct a codec using only structurally safe tags (>= SAFE_TAG_BASE).
1817            /// These tags never collide with datum first-bytes, so the codec can be
1818            /// installed without observing all data first.
1819            pub(super) fn new_safe(stats: Self) -> Self {
1820                // The container stores its pre-install rows raw, so the first-byte
1821                // bitmap (`stats.1`) gathered while observing them must carry over to
1822                // the installed codec. The bitmap is soundness-critical: a later
1823                // `new_from` merge consults it to decide which one-byte tags are free
1824                // to hand out as dictionary keys. If we dropped it here, the merge
1825                // could assign a dictionary tag equal to a pre-install datum's first
1826                // byte, after which `decode` would resolve that literal datum to the
1827                // dictionary entry. The MisraGries summary (`stats.0`), by contrast,
1828                // is consumed below to seed the dictionary and is reset, since the
1829                // installed codec re-accumulates it from rows it sees post-install.
1830                let (mg, observed_tags) = stats.stats;
1831                let mut mg = mg
1832                    .done()
1833                    .into_iter()
1834                    .filter(|(next_bytes, count)| next_bytes.len() > 1 && count > &1);
1835                let mut encode = BTreeMap::new();
1836                let mut decode = BytesMap::default();
1837                // Fill slots 0..SAFE_TAG_BASE with None (reserved for datum tags).
1838                for _ in 0..SAFE_TAG_BASE {
1839                    decode.push(None);
1840                }
1841                // Assign dictionary entries to safe tags.
1842                for tag in SAFE_TAG_BASE..=255 {
1843                    if let Some((next_bytes, _count)) = mg.next() {
1844                        decode.push(Some(&next_bytes[..]));
1845                        encode.insert(next_bytes, tag);
1846                    }
1847                }
1848                Self {
1849                    encode,
1850                    decode,
1851                    stats: (MisraGries::default(), observed_tags),
1852                }
1853            }
1854        }
1855    }
1856
/// A map from `0 .. something` to `Option<&[u8]>`.
///
/// Entries are appended in index order and looked up by index. All values share
/// one byte buffer; `offsets[i]..offsets[i + 1]` delimits entry `i`, and an empty
/// range means "absent". Pushing an empty slice is therefore equivalent to
/// pushing `None`.
#[derive(Debug)]
pub struct BytesMap {
    /// Entry boundaries into `bytes`; always starts with a leading `0`.
    offsets: Vec<usize>,
    /// Concatenation of every pushed value.
    bytes: Vec<u8>,
}
impl Default for BytesMap {
    /// An empty map: no entries, and the single leading offset `0`.
    #[inline(always)]
    fn default() -> Self {
        Self {
            bytes: Vec::new(),
            offsets: vec![0],
        }
    }
}
impl BytesMap {
    /// Append the next entry; `None` (or an empty slice) records an absent value.
    #[inline]
    fn push(&mut self, input: Option<&[u8]>) {
        self.bytes.extend_from_slice(input.unwrap_or(&[]));
        self.offsets.push(self.bytes.len());
    }
    /// Visit contained allocations to determine their size and capacity.
    fn heap_size(&self, callback: &mut impl FnMut(usize, usize)) {
        let word = std::mem::size_of::<usize>();
        callback(self.offsets.len() * word, self.offsets.capacity() * word);
        callback(self.bytes.len(), self.bytes.capacity());
    }
    /// Look up entry `index`, returning `None` if it is absent or out of range.
    #[inline]
    fn get(&self, index: usize) -> Option<&[u8]> {
        // Entry `index` exists only if both of its boundary offsets do.
        match self.offsets.get(index..)? {
            [lower, upper, ..] if lower < upper => Some(&self.bytes[*lower..*upper]),
            _ => None,
        }
    }
}
1904
mod misra_gries {

    use std::collections::BTreeMap;

    /// Maintains a summary of "heavy hitters" in a presented collection of items.
    ///
    /// Uses a `BTreeMap` internally so that repeated observations of the same
    /// element only allocate once (on first sighting). Tidy is performed when
    /// the number of *distinct* elements exceeds `2 * k`, reducing to at most
    /// `k` entries.
    ///
    /// Every stored count is strictly positive: zero-weight updates are ignored
    /// and tidying discards entries whose count drops to zero.
    #[derive(Clone, Debug)]
    pub struct MisraGries<T: Ord> {
        /// Tracked elements and their (approximate) counts.
        inner: BTreeMap<T, usize>,
        /// Number of heavy hitters the summary aims to retain after a tidy.
        k: usize,
    }

    impl<T: Ord> Default for MisraGries<T> {
        /// An empty summary with `k = 512`.
        #[inline(always)]
        fn default() -> Self {
            Self {
                inner: BTreeMap::new(),
                k: 512,
            }
        }
    }

    impl<T: Ord> MisraGries<T> {
        /// Inserts an additional element to the summary.
        #[inline(always)]
        pub fn insert(&mut self, element: T) {
            self.update(element, 1);
        }

        /// Inserts `count` copies of an element to the summary.
        ///
        /// A `count` of zero is a no-op. Recording it would create an entry that
        /// carries no information and that `tidy` could never evict once more
        /// than `k` such entries exist (the subtracted weight would be zero),
        /// letting the map grow past `2 * k` and re-tidy on every call.
        #[inline]
        pub fn update(&mut self, element: T, count: usize) {
            if count == 0 {
                return;
            }
            *self.inner.entry(element).or_insert(0) += count;
            if self.inner.len() > 2 * self.k {
                self.tidy();
            }
        }

        /// Completes the summary, and extracts the items and their counts.
        ///
        /// Items are ordered by descending count. The sort is stable, so items
        /// with equal counts keep the map's ascending key order, which makes the
        /// output deterministic.
        pub fn done(self) -> Vec<(T, usize)> {
            let mut result: Vec<_> = self.inner.into_iter().collect();
            result.sort_by(|x, y| y.1.cmp(&x.1));
            result
        }

        /// Reduces the summary down to at most `k` distinct items by
        /// subtracting the (k+1)-th largest count from all entries and
        /// discarding those that drop to zero or below.
        fn tidy(&mut self) {
            // With at most `k` entries there is no (k+1)-th count to subtract.
            if self.inner.len() <= self.k {
                return;
            }
            let mut counts: Vec<usize> = self.inner.values().copied().collect();
            // Only one order statistic is needed, so select it rather than
            // sorting everything. The comparator is reversed to rank descending.
            let (_, kth, _) = counts.select_nth_unstable_by(self.k, |a, b| b.cmp(a));
            let sub_weight = *kth;
            self.inner.retain(|_, count| {
                *count = count.saturating_sub(sub_weight);
                *count > 0
            });
        }
    }

    impl MisraGries<Vec<u8>> {
        /// Visit contained allocations to determine their size and capacity.
        ///
        /// `BTreeMap` exposes no capacity, so node storage is approximated as one
        /// logical entry per element; the owned key bytes are accounted exactly.
        pub fn heap_size(&self, callback: &mut impl FnMut(usize, usize)) {
            let entry = std::mem::size_of::<(Vec<u8>, usize)>();
            callback(self.inner.len() * entry, self.inner.len() * entry);
            for key in self.inner.keys() {
                callback(key.len(), key.capacity());
            }
        }

        /// Insert a borrowed byte slice, only allocating if the key is new.
        ///
        /// Bumping an existing key cannot change the number of distinct
        /// elements, so that path skips the tidy check entirely.
        #[inline]
        pub fn insert_ref(&mut self, element: &[u8]) {
            if let Some(count) = self.inner.get_mut(element) {
                *count += 1;
            } else {
                self.insert(element.to_owned());
            }
        }
    }

    impl<T: Ord> std::ops::AddAssign for MisraGries<T> {
        /// Folds every item of `rhs`, with its count, into `self`.
        fn add_assign(&mut self, rhs: Self) {
            for (element, count) in rhs.done() {
                self.update(element, count);
            }
        }
    }
}
2002}