Skip to main content

mz_row_spine/
lib.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Packed-bytes differential dataflow spine [`Layout`]s for `Row`-valued
//! arrangements.
//!
//! The spines defined here store `Row` keys and values as concatenated bytes
//! in a single contiguous backing region (via `mz_ore::region::Region`, which
//! uses lgalloc when available), instead of as separately-allocated heap
//! objects. This gives cursor lookups block locality and lets the OS evict
//! cold pages cleanly under memory pressure.
//!
//! [`Layout`]: differential_dataflow::trace::implementations::Layout

21pub use self::container::DatumContainer;
22pub use self::container::DatumSeq;
23pub use self::offset_opt::OffsetOptimized;
24pub use self::spines::{
25    RowBatcher, RowBuilder, RowRowBatcher, RowRowBuilder, RowRowSpine, RowSpine, RowValBatcher,
26    RowValBuilder, RowValSpine, ValRowBatcher, ValRowBuilder, ValRowSpine,
27};
28use differential_dataflow::trace::implementations::OffsetList;
29
30/// Spines specialized to contain `Row` types in keys and values.
31mod spines {
32    use std::rc::Rc;
33
34    use columnation::Columnation;
35    use differential_dataflow::trace::implementations::Layout;
36    use differential_dataflow::trace::implementations::Update;
37    use differential_dataflow::trace::implementations::merge_batcher::MergeBatcher;
38    use differential_dataflow::trace::implementations::ord_neu::{OrdKeyBatch, OrdKeyBuilder};
39    use differential_dataflow::trace::implementations::ord_neu::{OrdValBatch, OrdValBuilder};
40    use differential_dataflow::trace::implementations::spine_fueled::Spine;
41    use differential_dataflow::trace::rc_blanket_impls::RcBuilder;
42    use mz_repr::Row;
43    use mz_timely_util::columnation::{ColInternalMerger, ColumnationChunker, ColumnationStack};
44
45    use crate::{DatumContainer, OffsetOptimized};
46
47    /// Batcher matching `mz_compute::typedefs::KeyValBatcher`, redeclared
48    /// locally so this crate does not need to depend on `mz_compute`.
49    type KeyValBatcher<K, V, T, D> = MergeBatcher<
50        Vec<((K, V), T, D)>,
51        ColumnationChunker<((K, V), T, D)>,
52        ColInternalMerger<(K, V), T, D>,
53    >;
54    type KeyBatcher<K, T, D> = KeyValBatcher<K, (), T, D>;
55
56    pub type RowRowSpine<T, R> = Spine<Rc<OrdValBatch<RowRowLayout<((Row, Row), T, R)>>>>;
57    pub type RowRowBatcher<T, R> = KeyValBatcher<Row, Row, T, R>;
58    pub type RowRowBuilder<T, R> = RcBuilder<
59        OrdValBuilder<RowRowLayout<((Row, Row), T, R)>, ColumnationStack<((Row, Row), T, R)>>,
60    >;
61
62    pub type RowValSpine<V, T, R> = Spine<Rc<OrdValBatch<RowValLayout<((Row, V), T, R)>>>>;
63    pub type RowValBatcher<V, T, R> = KeyValBatcher<Row, V, T, R>;
64    pub type RowValBuilder<V, T, R> = RcBuilder<
65        OrdValBuilder<RowValLayout<((Row, V), T, R)>, ColumnationStack<((Row, V), T, R)>>,
66    >;
67
68    pub type RowSpine<T, R> = Spine<Rc<OrdKeyBatch<RowLayout<((Row, ()), T, R)>>>>;
69    pub type RowBatcher<T, R> = KeyBatcher<Row, T, R>;
70    pub type RowBuilder<T, R> =
71        RcBuilder<OrdKeyBuilder<RowLayout<((Row, ()), T, R)>, ColumnationStack<((Row, ()), T, R)>>>;
72
73    pub type ValRowSpine<K, T, R> = Spine<Rc<OrdValBatch<ValRowLayout<((K, Row), T, R)>>>>;
74    pub type ValRowBatcher<K, T, R> = KeyValBatcher<K, Row, T, R>;
75    pub type ValRowBuilder<K, T, R> = RcBuilder<
76        OrdValBuilder<ValRowLayout<((K, Row), T, R)>, ColumnationStack<((K, Row), T, R)>>,
77    >;
78
79    /// A layout based on timely stacks
80    pub struct RowRowLayout<U: Update<Key = Row, Val = Row>> {
81        phantom: std::marker::PhantomData<U>,
82    }
83    pub struct RowValLayout<U: Update<Key = Row>> {
84        phantom: std::marker::PhantomData<U>,
85    }
86    pub struct RowLayout<U: Update<Key = Row, Val = ()>> {
87        phantom: std::marker::PhantomData<U>,
88    }
89    /// Mirror of [`RowValLayout`] with the roles swapped: arbitrary `Columnation`
90    /// keys with `Row` values stored as packed bytes in a [`DatumContainer`].
91    pub struct ValRowLayout<U: Update<Val = Row>> {
92        phantom: std::marker::PhantomData<U>,
93    }
94
95    impl<U: Update<Key = Row, Val = Row>> Layout for RowRowLayout<U>
96    where
97        U::Time: Columnation,
98        U::Diff: Columnation,
99    {
100        type KeyContainer = DatumContainer;
101        type ValContainer = DatumContainer;
102        type TimeContainer = ColumnationStack<U::Time>;
103        type DiffContainer = ColumnationStack<U::Diff>;
104        type OffsetContainer = OffsetOptimized;
105    }
106    impl<U: Update<Key = Row>> Layout for RowValLayout<U>
107    where
108        U::Val: Columnation,
109        U::Time: Columnation,
110        U::Diff: Columnation,
111    {
112        type KeyContainer = DatumContainer;
113        type ValContainer = ColumnationStack<U::Val>;
114        type TimeContainer = ColumnationStack<U::Time>;
115        type DiffContainer = ColumnationStack<U::Diff>;
116        type OffsetContainer = OffsetOptimized;
117    }
118    impl<U: Update<Key = Row, Val = ()>> Layout for RowLayout<U>
119    where
120        U::Time: Columnation,
121        U::Diff: Columnation,
122    {
123        type KeyContainer = DatumContainer;
124        type ValContainer = ColumnationStack<()>;
125        type TimeContainer = ColumnationStack<U::Time>;
126        type DiffContainer = ColumnationStack<U::Diff>;
127        type OffsetContainer = OffsetOptimized;
128    }
129    impl<U: Update<Val = Row>> Layout for ValRowLayout<U>
130    where
131        U::Key: Columnation,
132        U::Time: Columnation,
133        U::Diff: Columnation,
134    {
135        type KeyContainer = ColumnationStack<U::Key>;
136        type ValContainer = DatumContainer;
137        type TimeContainer = ColumnationStack<U::Time>;
138        type DiffContainer = ColumnationStack<U::Diff>;
139        type OffsetContainer = OffsetOptimized;
140    }
141}
142
143/// A `Row`-specialized container using dictionary compression.
144mod container {
145
146    use std::cmp::Ordering;
147
148    use differential_dataflow::trace::implementations::BatchContainer;
149    use timely::container::PushInto;
150
151    use mz_repr::{Datum, Row, RowPacker, read_datum};
152
153    use super::bytes_container::BytesContainer;
154
155    /// Container wrapping `BytesContainer` that traffics only in `Row`-formatted bytes.
156    ///
157    /// This type accepts only `Row`-formatted bytes in its `Push` implementation, and
158    /// in return provides a `DatumSeq` view of the bytes which can be decoded as `Datum`s.
159    pub struct DatumContainer {
160        bytes: BytesContainer,
161    }
162
163    impl DatumContainer {
164        /// Visit contained allocations to determine their size and capacity.
165        #[inline]
166        pub fn heap_size(&self, callback: impl FnMut(usize, usize)) {
167            self.bytes.heap_size(callback)
168        }
169    }
170
171    impl BatchContainer for DatumContainer {
172        type Owned = Row;
173        type ReadItem<'a> = DatumSeq<'a>;
174
175        #[inline(always)]
176        fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned {
177            item.to_row()
178        }
179
180        #[inline]
181        fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) {
182            let mut packer = other.packer();
183            item.copy_into(&mut packer);
184        }
185
186        #[inline(always)]
187        fn push_ref(&mut self, item: Self::ReadItem<'_>) {
188            self.bytes.push_into(item.bytes);
189        }
190
191        #[inline(always)]
192        fn push_own(&mut self, item: &Self::Owned) {
193            self.bytes.push_into(item.data());
194        }
195
196        #[inline(always)]
197        fn clear(&mut self) {
198            self.bytes.clear();
199        }
200
201        #[inline(always)]
202        fn with_capacity(size: usize) -> Self {
203            Self {
204                bytes: BytesContainer::with_capacity(size),
205            }
206        }
207
208        #[inline(always)]
209        fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
210            Self {
211                bytes: BytesContainer::merge_capacity(&cont1.bytes, &cont2.bytes),
212            }
213        }
214
215        #[inline(always)]
216        fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> {
217            item
218        }
219
220        #[inline(always)]
221        fn index(&self, index: usize) -> Self::ReadItem<'_> {
222            DatumSeq {
223                bytes: self.bytes.index(index),
224            }
225        }
226
227        #[inline(always)]
228        fn len(&self) -> usize {
229            self.bytes.len()
230        }
231    }
232
233    impl PushInto<Row> for DatumContainer {
234        fn push_into(&mut self, item: Row) {
235            self.push_into(&item);
236        }
237    }
238
239    impl PushInto<&Row> for DatumContainer {
240        fn push_into(&mut self, item: &Row) {
241            self.push_own(item);
242        }
243    }
244
245    impl PushInto<DatumSeq<'_>> for DatumContainer {
246        fn push_into(&mut self, item: DatumSeq<'_>) {
247            self.bytes.push_into(item.as_bytes())
248        }
249    }
250
251    #[derive(Debug)]
252    pub struct DatumSeq<'a> {
253        bytes: &'a [u8],
254    }
255
256    impl<'a> DatumSeq<'a> {
257        /// Borrow a `Row` as a `DatumSeq` so that it can be used to seek into a
258        /// trace whose key/value container is a [`DatumContainer`].
259        #[inline]
260        pub fn from_row(row: &'a Row) -> Self {
261            Self { bytes: row.data() }
262        }
263
264        #[inline]
265        pub fn copy_into(&self, row: &mut RowPacker) {
266            // SAFETY: `self.bytes` is a correctly formatted row.
267            unsafe { row.extend_by_slice_unchecked(self.bytes) }
268        }
269        #[inline]
270        fn as_bytes(&self) -> &'a [u8] {
271            self.bytes
272        }
273        #[inline]
274        pub fn to_row(&self) -> Row {
275            // SAFETY: `self.bytes` is a correctly formatted row.
276            unsafe { Row::from_bytes_unchecked(self.bytes) }
277        }
278    }
279
280    impl<'a> Copy for DatumSeq<'a> {}
281    impl<'a> Clone for DatumSeq<'a> {
282        #[inline(always)]
283        fn clone(&self) -> Self {
284            *self
285        }
286    }
287
288    impl<'a, 'b> PartialEq<DatumSeq<'a>> for DatumSeq<'b> {
289        #[inline]
290        fn eq(&self, other: &DatumSeq<'a>) -> bool {
291            self.bytes.eq(other.bytes)
292        }
293    }
294    impl<'a> PartialEq<&Row> for DatumSeq<'a> {
295        #[inline]
296        fn eq(&self, other: &&Row) -> bool {
297            self.bytes.eq(other.data())
298        }
299    }
300    impl<'a> Eq for DatumSeq<'a> {}
301    impl<'a, 'b> PartialOrd<DatumSeq<'a>> for DatumSeq<'b> {
302        #[inline]
303        fn partial_cmp(&self, other: &DatumSeq<'a>) -> Option<Ordering> {
304            Some(self.cmp(other))
305        }
306    }
307    impl<'a> Ord for DatumSeq<'a> {
308        #[inline]
309        fn cmp(&self, other: &Self) -> Ordering {
310            match self.bytes.len().cmp(&other.bytes.len()) {
311                std::cmp::Ordering::Less => std::cmp::Ordering::Less,
312                std::cmp::Ordering::Greater => std::cmp::Ordering::Greater,
313                std::cmp::Ordering::Equal => self.bytes.cmp(other.bytes),
314            }
315        }
316    }
317    impl<'a> Iterator for DatumSeq<'a> {
318        type Item = Datum<'a>;
319        #[inline]
320        fn next(&mut self) -> Option<Self::Item> {
321            if self.bytes.is_empty() {
322                None
323            } else {
324                let result = unsafe { read_datum(&mut self.bytes) };
325                Some(result)
326            }
327        }
328    }
329
330    use mz_repr::fixed_length::ToDatumIter;
331    impl<'long> ToDatumIter for DatumSeq<'long> {
332        type DatumIter<'short>
333            = DatumSeq<'short>
334        where
335            Self: 'short;
336        #[inline]
337        fn to_datum_iter<'short>(&'short self) -> Self::DatumIter<'short> {
338            *self
339        }
340    }
341
342    #[cfg(test)]
343    mod tests {
344        use crate::DatumContainer;
345        use differential_dataflow::trace::implementations::BatchContainer;
346        use mz_repr::adt::date::Date;
347        use mz_repr::adt::interval::Interval;
348        use mz_repr::{Datum, Row, SqlScalarType};
349
350        #[mz_ore::test]
351        #[cfg_attr(miri, ignore)] // unsupported operation: integer-to-pointer casts and `ptr::with_exposed_provenance` are not supported
352        fn test_round_trip() {
353            fn round_trip(datums: Vec<Datum>) {
354                let row = Row::pack(datums.clone());
355
356                let mut container = DatumContainer::with_capacity(row.byte_len());
357                container.push_own(&row);
358
359                // When run under miri this catches undefined bytes written to data
360                // eg by calling push_copy! on a type which contains undefined padding values
361                println!("{:?}", container.index(0).bytes);
362
363                let datums2 = container.index(0).collect::<Vec<_>>();
364                assert_eq!(datums, datums2);
365            }
366
367            round_trip(vec![]);
368            round_trip(
369                SqlScalarType::enumerate()
370                    .iter()
371                    .flat_map(|r#type| r#type.interesting_datums())
372                    .collect(),
373            );
374            round_trip(vec![
375                Datum::Null,
376                Datum::Null,
377                Datum::False,
378                Datum::True,
379                Datum::Int16(-21),
380                Datum::Int32(-42),
381                Datum::Int64(-2_147_483_648 - 42),
382                Datum::UInt8(0),
383                Datum::UInt8(1),
384                Datum::UInt16(0),
385                Datum::UInt16(1),
386                Datum::UInt16(1 << 8),
387                Datum::UInt32(0),
388                Datum::UInt32(1),
389                Datum::UInt32(1 << 8),
390                Datum::UInt32(1 << 16),
391                Datum::UInt32(1 << 24),
392                Datum::UInt64(0),
393                Datum::UInt64(1),
394                Datum::UInt64(1 << 8),
395                Datum::UInt64(1 << 16),
396                Datum::UInt64(1 << 24),
397                Datum::UInt64(1 << 32),
398                Datum::UInt64(1 << 40),
399                Datum::UInt64(1 << 48),
400                Datum::UInt64(1 << 56),
401                Datum::Date(Date::from_pg_epoch(365 * 45 + 21).unwrap()),
402                Datum::Interval(Interval {
403                    months: 312,
404                    ..Default::default()
405                }),
406                Datum::Interval(Interval::new(0, 0, 1_012_312)),
407                Datum::Bytes(&[]),
408                Datum::Bytes(&[0, 2, 1, 255]),
409                Datum::String(""),
410                Datum::String("العَرَبِيَّة"),
411            ]);
412        }
413    }
414}
415
416mod bytes_container {
417
418    use differential_dataflow::trace::implementations::BatchContainer;
419    use timely::container::PushInto;
420
421    use mz_ore::region::Region;
422
423    /// A slice container with four bytes overhead per slice.
424    pub struct BytesContainer {
425        /// Total length of `batches`, maintained because recomputation is expensive.
426        length: usize,
427        batches: Vec<BytesBatch>,
428    }
429
430    impl BytesContainer {
431        /// Visit contained allocations to determine their size and capacity.
432        #[inline]
433        pub fn heap_size(&self, mut callback: impl FnMut(usize, usize)) {
434            // Calculate heap size for local, stash, and stash entries
435            callback(
436                self.batches.len() * std::mem::size_of::<BytesBatch>(),
437                self.batches.capacity() * std::mem::size_of::<BytesBatch>(),
438            );
439            for batch in self.batches.iter() {
440                batch.offsets.heap_size(&mut callback);
441                callback(batch.storage.len(), batch.storage.capacity());
442            }
443        }
444    }
445
446    impl BatchContainer for BytesContainer {
447        type Owned = Vec<u8>;
448        type ReadItem<'a> = &'a [u8];
449
450        #[inline]
451        fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned {
452            item.to_vec()
453        }
454
455        #[inline]
456        fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) {
457            other.clear();
458            other.extend_from_slice(item);
459        }
460
461        #[inline(always)]
462        fn push_ref(&mut self, item: Self::ReadItem<'_>) {
463            self.push_into(item);
464        }
465
466        #[inline(always)]
467        fn push_own(&mut self, item: &Self::Owned) {
468            self.push_into(item.as_slice())
469        }
470
471        fn clear(&mut self) {
472            self.batches.clear();
473            self.batches.push(BytesBatch::with_capacities(0, 0));
474            self.length = 0;
475        }
476
477        fn with_capacity(size: usize) -> Self {
478            Self {
479                length: 0,
480                batches: vec![BytesBatch::with_capacities(size, size)],
481            }
482        }
483
484        fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
485            let mut item_cap = 1;
486            let mut byte_cap = 0;
487            for batch in cont1.batches.iter() {
488                item_cap += batch.offsets.len() - 1;
489                byte_cap += batch.storage.len();
490            }
491            for batch in cont2.batches.iter() {
492                item_cap += batch.offsets.len() - 1;
493                byte_cap += batch.storage.len();
494            }
495            Self {
496                length: 0,
497                batches: vec![BytesBatch::with_capacities(item_cap, byte_cap)],
498            }
499        }
500
501        #[inline(always)]
502        fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> {
503            item
504        }
505
506        #[inline]
507        fn index(&self, mut index: usize) -> Self::ReadItem<'_> {
508            for batch in self.batches.iter() {
509                if index < batch.len() {
510                    return batch.index(index);
511                }
512                index -= batch.len();
513            }
514            panic!("Index out of bounds");
515        }
516
517        #[inline(always)]
518        fn len(&self) -> usize {
519            self.length
520        }
521    }
522
523    impl PushInto<&[u8]> for BytesContainer {
524        #[inline]
525        fn push_into(&mut self, item: &[u8]) {
526            self.length += 1;
527            if let Some(batch) = self.batches.last_mut() {
528                let success = batch.try_push(item);
529                if !success {
530                    // double the lengths from `batch`.
531                    let item_cap = 2 * batch.offsets.len();
532                    let byte_cap = std::cmp::max(2 * batch.storage.capacity(), item.len());
533                    let mut new_batch = BytesBatch::with_capacities(item_cap, byte_cap);
534                    assert!(new_batch.try_push(item));
535                    self.batches.push(new_batch);
536                }
537            }
538        }
539    }
540
541    /// A batch of slice storage.
542    ///
543    /// The backing storage for this batch will not be resized.
544    pub struct BytesBatch {
545        offsets: crate::OffsetOptimized,
546        storage: Region<u8>,
547        len: usize,
548    }
549
550    impl BytesBatch {
551        /// Either accepts the slice and returns true,
552        /// or does not and returns false.
553        fn try_push(&mut self, slice: &[u8]) -> bool {
554            if self.storage.len() + slice.len() <= self.storage.capacity() {
555                self.storage.extend_from_slice(slice);
556                self.offsets.push_into(self.storage.len());
557                self.len += 1;
558                true
559            } else {
560                false
561            }
562        }
563        #[inline]
564        fn index(&self, index: usize) -> &[u8] {
565            let lower = self.offsets.index(index);
566            let upper = self.offsets.index(index + 1);
567            &self.storage[lower..upper]
568        }
569        #[inline(always)]
570        fn len(&self) -> usize {
571            debug_assert_eq!(self.len, self.offsets.len() - 1);
572            self.len
573        }
574
575        fn with_capacities(item_cap: usize, byte_cap: usize) -> Self {
576            // TODO: be wary of `byte_cap` greater than 2^32.
577            let mut offsets = crate::OffsetOptimized::with_capacity(item_cap + 1);
578            offsets.push_into(0);
579            Self {
580                offsets,
581                storage: Region::new_auto(byte_cap.next_power_of_two()),
582                len: 0,
583            }
584        }
585    }
586}
587
588mod offset_opt {
589    use differential_dataflow::trace::implementations::BatchContainer;
590    use differential_dataflow::trace::implementations::OffsetList;
591    use timely::container::PushInto;
592
593    enum OffsetStride {
594        Empty,
595        Zero,
596        Striding(usize, usize),
597        Saturated(usize, usize, usize),
598    }
599
600    impl OffsetStride {
601        /// Accepts or rejects a newly pushed element.
602        #[inline]
603        fn push(&mut self, item: usize) -> bool {
604            match self {
605                OffsetStride::Empty => {
606                    if item == 0 {
607                        *self = OffsetStride::Zero;
608                        true
609                    } else {
610                        false
611                    }
612                }
613                OffsetStride::Zero => {
614                    *self = OffsetStride::Striding(item, 2);
615                    true
616                }
617                OffsetStride::Striding(stride, count) => {
618                    if item == *stride * *count {
619                        *count += 1;
620                        true
621                    } else if item == *stride * (*count - 1) {
622                        *self = OffsetStride::Saturated(*stride, *count, 1);
623                        true
624                    } else {
625                        false
626                    }
627                }
628                OffsetStride::Saturated(stride, count, reps) => {
629                    if item == *stride * (*count - 1) {
630                        *reps += 1;
631                        true
632                    } else {
633                        false
634                    }
635                }
636            }
637        }
638
639        #[inline]
640        fn index(&self, index: usize) -> usize {
641            match self {
642                OffsetStride::Empty => {
643                    panic!("Empty OffsetStride")
644                }
645                OffsetStride::Zero => 0,
646                OffsetStride::Striding(stride, _steps) => *stride * index,
647                OffsetStride::Saturated(stride, steps, _reps) => {
648                    if index < *steps {
649                        *stride * index
650                    } else {
651                        *stride * (*steps - 1)
652                    }
653                }
654            }
655        }
656
657        #[inline]
658        fn len(&self) -> usize {
659            match self {
660                OffsetStride::Empty => 0,
661                OffsetStride::Zero => 1,
662                OffsetStride::Striding(_stride, steps) => *steps,
663                OffsetStride::Saturated(_stride, steps, reps) => *steps + *reps,
664            }
665        }
666    }
667
668    pub struct OffsetOptimized {
669        strided: OffsetStride,
670        spilled: OffsetList,
671    }
672
673    impl BatchContainer for OffsetOptimized {
674        type Owned = usize;
675        type ReadItem<'a> = usize;
676
677        #[inline]
678        fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned {
679            item
680        }
681
682        #[inline]
683        fn push_ref(&mut self, item: Self::ReadItem<'_>) {
684            self.push_into(item)
685        }
686
687        #[inline]
688        fn push_own(&mut self, item: &Self::Owned) {
689            self.push_into(*item)
690        }
691
692        fn clear(&mut self) {
693            self.strided = OffsetStride::Empty;
694            self.spilled.clear();
695        }
696
697        fn with_capacity(_size: usize) -> Self {
698            Self {
699                strided: OffsetStride::Empty,
700                spilled: OffsetList::with_capacity(0),
701            }
702        }
703
704        fn merge_capacity(_cont1: &Self, _cont2: &Self) -> Self {
705            Self {
706                strided: OffsetStride::Empty,
707                spilled: OffsetList::with_capacity(0),
708            }
709        }
710
711        #[inline]
712        fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> {
713            item
714        }
715
716        #[inline]
717        fn index(&self, index: usize) -> Self::ReadItem<'_> {
718            if index < self.strided.len() {
719                self.strided.index(index)
720            } else {
721                self.spilled.index(index - self.strided.len())
722            }
723        }
724
725        #[inline]
726        fn len(&self) -> usize {
727            self.strided.len() + self.spilled.len()
728        }
729    }
730
731    impl PushInto<usize> for OffsetOptimized {
732        #[inline]
733        fn push_into(&mut self, item: usize) {
734            if !self.spilled.is_empty() {
735                self.spilled.push(item);
736            } else {
737                let inserted = self.strided.push(item);
738                if !inserted {
739                    self.spilled.push(item);
740                }
741            }
742        }
743    }
744
745    impl OffsetOptimized {
746        pub fn heap_size(&self, callback: impl FnMut(usize, usize)) {
747            crate::offset_list_size(&self.spilled, callback);
748        }
749    }
750}
751
752/// Helper to compute the size of an [`OffsetList`] in memory.
753#[inline]
754pub(crate) fn offset_list_size(data: &OffsetList, mut callback: impl FnMut(usize, usize)) {
755    // Private `vec_size` because we should only use it where data isn't region-allocated.
756    // `T: Copy` makes sure the implementation is correct even if types change!
757    #[inline(always)]
758    fn vec_size<T: Copy>(data: &Vec<T>, mut callback: impl FnMut(usize, usize)) {
759        let size_of_t = std::mem::size_of::<T>();
760        callback(data.len() * size_of_t, data.capacity() * size_of_t);
761    }
762
763    vec_size(&data.smol, &mut callback);
764    vec_size(&data.chonk, callback);
765}