Skip to main content

mz_row_spine/
lib.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Packed-bytes differential dataflow spine [`Layout`]s for `Row`-valued
11//! arrangements.
12//!
13//! The spines defined here store `Row` keys and values as concatenated bytes
14//! in a single contiguous backing region (via `mz_ore::region::Region`, which
15//! uses lgalloc when available), instead of as separately-allocated heap
16//! objects. This gives cursor lookups block locality and lets the OS evict
17//! cold pages cleanly under memory pressure.
18//!
19//! [`Layout`]: differential_dataflow::trace::implementations::Layout
20
21pub use self::container::DatumContainer;
22pub use self::container::DatumSeq;
23pub use self::offset_opt::OffsetOptimized;
24pub use self::spines::{
25    RowBatcher, RowBuilder, RowRowBatcher, RowRowBuilder, RowRowColPagedBuilder, RowRowSpine,
26    RowSpine, RowValBatcher, RowValBuilder, RowValSpine, ValRowBatcher, ValRowBuilder, ValRowSpine,
27};
28use differential_dataflow::trace::implementations::OffsetList;
29
/// Spines specialized to contain `Row` types in keys and values.
///
/// Each family below is a triple of type aliases (spine, batcher, builder)
/// built on a private [`Layout`] that picks the containers used for keys,
/// values, times, diffs, and offsets.
mod spines {
    use std::rc::Rc;

    use columnation::Columnation;
    use differential_dataflow::trace::implementations::Layout;
    use differential_dataflow::trace::implementations::Update;
    use differential_dataflow::trace::implementations::merge_batcher::MergeBatcher;
    use differential_dataflow::trace::implementations::ord_neu::{OrdKeyBatch, OrdKeyBuilder};
    use differential_dataflow::trace::implementations::ord_neu::{OrdValBatch, OrdValBuilder};
    use differential_dataflow::trace::implementations::spine_fueled::Spine;
    use differential_dataflow::trace::rc_blanket_impls::RcBuilder;
    use mz_repr::Row;
    use mz_timely_util::columnar::Column;
    use mz_timely_util::columnation::{ColInternalMerger, ColumnationStack};

    use crate::{DatumContainer, OffsetOptimized};

    /// Batcher matching `mz_compute::typedefs::KeyValBatcher`, redeclared
    /// locally so this crate does not need to depend on `mz_compute`.
    type KeyValBatcher<K, V, T, D> = MergeBatcher<ColInternalMerger<(K, V), T, D>>;
    /// Key-only batcher: a [`KeyValBatcher`] whose value type is `()`.
    type KeyBatcher<K, T, D> = KeyValBatcher<K, (), T, D>;

    /// Spine of `Rc`-shared `OrdValBatch`es with `Row` keys and `Row` values,
    /// laid out according to [`RowRowLayout`].
    pub type RowRowSpine<T, R> = Spine<Rc<OrdValBatch<RowRowLayout<((Row, Row), T, R)>>>>;
    /// Merge batcher for `((Row, Row), T, R)` updates.
    pub type RowRowBatcher<T, R> = KeyValBatcher<Row, Row, T, R>;
    /// Builder for [`RowRowSpine`] batches, parameterized by `ColumnationStack`
    /// chunks of `((Row, Row), T, R)` updates.
    pub type RowRowBuilder<T, R> = RcBuilder<
        OrdValBuilder<RowRowLayout<((Row, Row), T, R)>, ColumnationStack<((Row, Row), T, R)>>,
    >;

    /// `RowRowBuilder` variant that consumes [`Column`] chunks. Pairs with
    /// [`Col2ValPagedBatcher`] for the spillable arrange path.
    ///
    /// [`Col2ValPagedBatcher`]: mz_timely_util::columnar::Col2ValPagedBatcher
    pub type RowRowColPagedBuilder<T, R> =
        RcBuilder<OrdValBuilder<RowRowLayout<((Row, Row), T, R)>, Column<((Row, Row), T, R)>>>;

    /// Spine of `Rc`-shared `OrdValBatch`es with `Row` keys and values of type
    /// `V`, laid out according to [`RowValLayout`].
    pub type RowValSpine<V, T, R> = Spine<Rc<OrdValBatch<RowValLayout<((Row, V), T, R)>>>>;
    /// Merge batcher for `((Row, V), T, R)` updates.
    pub type RowValBatcher<V, T, R> = KeyValBatcher<Row, V, T, R>;
    /// Builder for [`RowValSpine`] batches, parameterized by `ColumnationStack`
    /// chunks of `((Row, V), T, R)` updates.
    pub type RowValBuilder<V, T, R> = RcBuilder<
        OrdValBuilder<RowValLayout<((Row, V), T, R)>, ColumnationStack<((Row, V), T, R)>>,
    >;

    /// Spine of `Rc`-shared `OrdKeyBatch`es with `Row` keys and unit values,
    /// laid out according to [`RowLayout`].
    pub type RowSpine<T, R> = Spine<Rc<OrdKeyBatch<RowLayout<((Row, ()), T, R)>>>>;
    /// Merge batcher for `((Row, ()), T, R)` updates.
    pub type RowBatcher<T, R> = KeyBatcher<Row, T, R>;
    /// Builder for [`RowSpine`] batches, parameterized by `ColumnationStack`
    /// chunks of `((Row, ()), T, R)` updates.
    pub type RowBuilder<T, R> =
        RcBuilder<OrdKeyBuilder<RowLayout<((Row, ()), T, R)>, ColumnationStack<((Row, ()), T, R)>>>;

    /// Spine of `Rc`-shared `OrdValBatch`es with keys of type `K` and `Row`
    /// values, laid out according to [`ValRowLayout`].
    pub type ValRowSpine<K, T, R> = Spine<Rc<OrdValBatch<ValRowLayout<((K, Row), T, R)>>>>;
    /// Merge batcher for `((K, Row), T, R)` updates.
    pub type ValRowBatcher<K, T, R> = KeyValBatcher<K, Row, T, R>;
    /// Builder for [`ValRowSpine`] batches, parameterized by `ColumnationStack`
    /// chunks of `((K, Row), T, R)` updates.
    pub type ValRowBuilder<K, T, R> = RcBuilder<
        OrdValBuilder<ValRowLayout<((K, Row), T, R)>, ColumnationStack<((K, Row), T, R)>>,
    >;

    /// A layout for `Row` keys and `Row` values: both are stored in
    /// [`DatumContainer`]s, while times and diffs use `ColumnationStack`s.
    pub struct RowRowLayout<U: Update<Key = Row, Val = Row>> {
        // Zero-sized marker; the layout only carries the update type `U`.
        phantom: std::marker::PhantomData<U>,
    }
    /// A layout for `Row` keys stored in a [`DatumContainer`], with values of
    /// an arbitrary `Columnation` type stored in a `ColumnationStack`.
    pub struct RowValLayout<U: Update<Key = Row>> {
        // Zero-sized marker; the layout only carries the update type `U`.
        phantom: std::marker::PhantomData<U>,
    }
    /// A layout for `Row` keys stored in a [`DatumContainer`], with unit values.
    pub struct RowLayout<U: Update<Key = Row, Val = ()>> {
        // Zero-sized marker; the layout only carries the update type `U`.
        phantom: std::marker::PhantomData<U>,
    }
    /// Mirror of [`RowValLayout`] with the roles swapped: arbitrary `Columnation`
    /// keys with `Row` values stored as packed bytes in a [`DatumContainer`].
    pub struct ValRowLayout<U: Update<Val = Row>> {
        // Zero-sized marker; the layout only carries the update type `U`.
        phantom: std::marker::PhantomData<U>,
    }

    // Keys and values are packed `Row` bytes; times and diffs are columnation
    // stacks; offsets use the stride-compressed `OffsetOptimized` container.
    impl<U: Update<Key = Row, Val = Row>> Layout for RowRowLayout<U>
    where
        U::Time: Columnation,
        U::Diff: Columnation,
    {
        type KeyContainer = DatumContainer;
        type ValContainer = DatumContainer;
        type TimeContainer = ColumnationStack<U::Time>;
        type DiffContainer = ColumnationStack<U::Diff>;
        type OffsetContainer = OffsetOptimized;
    }
    // Only keys are packed `Row` bytes; values must be `Columnation` and live
    // in a columnation stack alongside times and diffs.
    impl<U: Update<Key = Row>> Layout for RowValLayout<U>
    where
        U::Val: Columnation,
        U::Time: Columnation,
        U::Diff: Columnation,
    {
        type KeyContainer = DatumContainer;
        type ValContainer = ColumnationStack<U::Val>;
        type TimeContainer = ColumnationStack<U::Time>;
        type DiffContainer = ColumnationStack<U::Diff>;
        type OffsetContainer = OffsetOptimized;
    }
    // Key-only layout: keys are packed `Row` bytes and the value container
    // holds unit values.
    impl<U: Update<Key = Row, Val = ()>> Layout for RowLayout<U>
    where
        U::Time: Columnation,
        U::Diff: Columnation,
    {
        type KeyContainer = DatumContainer;
        type ValContainer = ColumnationStack<()>;
        type TimeContainer = ColumnationStack<U::Time>;
        type DiffContainer = ColumnationStack<U::Diff>;
        type OffsetContainer = OffsetOptimized;
    }
    // Only values are packed `Row` bytes; keys must be `Columnation` and live
    // in a columnation stack alongside times and diffs.
    impl<U: Update<Val = Row>> Layout for ValRowLayout<U>
    where
        U::Key: Columnation,
        U::Time: Columnation,
        U::Diff: Columnation,
    {
        type KeyContainer = ColumnationStack<U::Key>;
        type ValContainer = DatumContainer;
        type TimeContainer = ColumnationStack<U::Time>;
        type DiffContainer = ColumnationStack<U::Diff>;
        type OffsetContainer = OffsetOptimized;
    }
}
146
/// A `Row`-specialized container that stores rows as packed bytes.
148mod container {
149
150    use std::cmp::Ordering;
151
152    use differential_dataflow::trace::implementations::BatchContainer;
153    use timely::container::PushInto;
154
155    use mz_repr::{Datum, Row, RowPacker, RowRef, read_datum};
156
157    use super::bytes_container::BytesContainer;
158
159    /// Container wrapping `BytesContainer` that traffics only in `Row`-formatted bytes.
160    ///
161    /// This type accepts only `Row`-formatted bytes in its `Push` implementation, and
162    /// in return provides a `DatumSeq` view of the bytes which can be decoded as `Datum`s.
163    pub struct DatumContainer {
164        bytes: BytesContainer,
165    }
166
167    impl DatumContainer {
168        /// Visit contained allocations to determine their size and capacity.
169        #[inline]
170        pub fn heap_size(&self, callback: impl FnMut(usize, usize)) {
171            self.bytes.heap_size(callback)
172        }
173    }
174
175    impl BatchContainer for DatumContainer {
176        type Owned = Row;
177        type ReadItem<'a> = DatumSeq<'a>;
178
179        #[inline(always)]
180        fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned {
181            item.to_row()
182        }
183
184        #[inline]
185        fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) {
186            let mut packer = other.packer();
187            item.copy_into(&mut packer);
188        }
189
190        #[inline(always)]
191        fn push_ref(&mut self, item: Self::ReadItem<'_>) {
192            self.bytes.push_into(item.bytes);
193        }
194
195        #[inline(always)]
196        fn push_own(&mut self, item: &Self::Owned) {
197            self.bytes.push_into(item.data());
198        }
199
200        #[inline(always)]
201        fn clear(&mut self) {
202            self.bytes.clear();
203        }
204
205        #[inline(always)]
206        fn with_capacity(size: usize) -> Self {
207            Self {
208                bytes: BytesContainer::with_capacity(size),
209            }
210        }
211
212        #[inline(always)]
213        fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
214            Self {
215                bytes: BytesContainer::merge_capacity(&cont1.bytes, &cont2.bytes),
216            }
217        }
218
219        #[inline(always)]
220        fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> {
221            item
222        }
223
224        #[inline(always)]
225        fn index(&self, index: usize) -> Self::ReadItem<'_> {
226            DatumSeq {
227                bytes: self.bytes.index(index),
228            }
229        }
230
231        #[inline(always)]
232        fn len(&self) -> usize {
233            self.bytes.len()
234        }
235    }
236
237    impl PushInto<Row> for DatumContainer {
238        fn push_into(&mut self, item: Row) {
239            self.push_into(&item);
240        }
241    }
242
243    impl PushInto<&Row> for DatumContainer {
244        fn push_into(&mut self, item: &Row) {
245            self.push_own(item);
246        }
247    }
248
249    impl PushInto<DatumSeq<'_>> for DatumContainer {
250        fn push_into(&mut self, item: DatumSeq<'_>) {
251            self.bytes.push_into(item.as_bytes())
252        }
253    }
254
255    impl PushInto<&RowRef> for DatumContainer {
256        fn push_into(&mut self, item: &RowRef) {
257            self.bytes.push_into(item.data())
258        }
259    }
260
261    #[derive(Debug)]
262    pub struct DatumSeq<'a> {
263        bytes: &'a [u8],
264    }
265
266    impl<'a> DatumSeq<'a> {
267        /// Borrow a `Row` as a `DatumSeq` so that it can be used to seek into a
268        /// trace whose key/value container is a [`DatumContainer`].
269        #[inline]
270        pub fn from_row(row: &'a Row) -> Self {
271            Self { bytes: row.data() }
272        }
273
274        #[inline]
275        pub fn copy_into(&self, row: &mut RowPacker) {
276            // SAFETY: `self.bytes` is a correctly formatted row.
277            unsafe { row.extend_by_slice_unchecked(self.bytes) }
278        }
279        #[inline]
280        fn as_bytes(&self) -> &'a [u8] {
281            self.bytes
282        }
283        #[inline]
284        pub fn to_row(&self) -> Row {
285            // SAFETY: `self.bytes` is a correctly formatted row.
286            unsafe { Row::from_bytes_unchecked(self.bytes) }
287        }
288    }
289
290    impl<'a> Copy for DatumSeq<'a> {}
291    impl<'a> Clone for DatumSeq<'a> {
292        #[inline(always)]
293        fn clone(&self) -> Self {
294            *self
295        }
296    }
297
298    impl<'a, 'b> PartialEq<DatumSeq<'a>> for DatumSeq<'b> {
299        #[inline]
300        fn eq(&self, other: &DatumSeq<'a>) -> bool {
301            self.bytes.eq(other.bytes)
302        }
303    }
304    impl<'a> PartialEq<&Row> for DatumSeq<'a> {
305        #[inline]
306        fn eq(&self, other: &&Row) -> bool {
307            self.bytes.eq(other.data())
308        }
309    }
310    impl<'a> PartialEq<&RowRef> for DatumSeq<'a> {
311        #[inline]
312        fn eq(&self, other: &&RowRef) -> bool {
313            self.bytes.eq(other.data())
314        }
315    }
316    impl<'a> Eq for DatumSeq<'a> {}
317    impl<'a, 'b> PartialOrd<DatumSeq<'a>> for DatumSeq<'b> {
318        #[inline]
319        fn partial_cmp(&self, other: &DatumSeq<'a>) -> Option<Ordering> {
320            Some(self.cmp(other))
321        }
322    }
323    impl<'a> Ord for DatumSeq<'a> {
324        #[inline]
325        fn cmp(&self, other: &Self) -> Ordering {
326            match self.bytes.len().cmp(&other.bytes.len()) {
327                std::cmp::Ordering::Less => std::cmp::Ordering::Less,
328                std::cmp::Ordering::Greater => std::cmp::Ordering::Greater,
329                std::cmp::Ordering::Equal => self.bytes.cmp(other.bytes),
330            }
331        }
332    }
333    impl<'a> Iterator for DatumSeq<'a> {
334        type Item = Datum<'a>;
335        #[inline]
336        fn next(&mut self) -> Option<Self::Item> {
337            if self.bytes.is_empty() {
338                None
339            } else {
340                let result = unsafe { read_datum(&mut self.bytes) };
341                Some(result)
342            }
343        }
344    }
345
346    use mz_repr::fixed_length::ToDatumIter;
347    impl<'long> ToDatumIter for DatumSeq<'long> {
348        type DatumIter<'short>
349            = DatumSeq<'short>
350        where
351            Self: 'short;
352        #[inline]
353        fn to_datum_iter<'short>(&'short self) -> Self::DatumIter<'short> {
354            *self
355        }
356    }
357
358    #[cfg(test)]
359    mod tests {
360        use crate::DatumContainer;
361        use differential_dataflow::trace::implementations::BatchContainer;
362        use mz_repr::adt::date::Date;
363        use mz_repr::adt::interval::Interval;
364        use mz_repr::{Datum, Row, SqlScalarType};
365
366        #[mz_ore::test]
367        #[cfg_attr(miri, ignore)] // unsupported operation: integer-to-pointer casts and `ptr::with_exposed_provenance` are not supported
368        fn test_round_trip() {
369            fn round_trip(datums: Vec<Datum>) {
370                let row = Row::pack(datums.clone());
371
372                let mut container = DatumContainer::with_capacity(row.byte_len());
373                container.push_own(&row);
374
375                // When run under miri this catches undefined bytes written to data
376                // eg by calling push_copy! on a type which contains undefined padding values
377                println!("{:?}", container.index(0).bytes);
378
379                let datums2 = container.index(0).collect::<Vec<_>>();
380                assert_eq!(datums, datums2);
381            }
382
383            round_trip(vec![]);
384            round_trip(
385                SqlScalarType::enumerate()
386                    .iter()
387                    .flat_map(|r#type| r#type.interesting_datums())
388                    .collect(),
389            );
390            round_trip(vec![
391                Datum::Null,
392                Datum::Null,
393                Datum::False,
394                Datum::True,
395                Datum::Int16(-21),
396                Datum::Int32(-42),
397                Datum::Int64(-2_147_483_648 - 42),
398                Datum::UInt8(0),
399                Datum::UInt8(1),
400                Datum::UInt16(0),
401                Datum::UInt16(1),
402                Datum::UInt16(1 << 8),
403                Datum::UInt32(0),
404                Datum::UInt32(1),
405                Datum::UInt32(1 << 8),
406                Datum::UInt32(1 << 16),
407                Datum::UInt32(1 << 24),
408                Datum::UInt64(0),
409                Datum::UInt64(1),
410                Datum::UInt64(1 << 8),
411                Datum::UInt64(1 << 16),
412                Datum::UInt64(1 << 24),
413                Datum::UInt64(1 << 32),
414                Datum::UInt64(1 << 40),
415                Datum::UInt64(1 << 48),
416                Datum::UInt64(1 << 56),
417                Datum::Date(Date::from_pg_epoch(365 * 45 + 21).unwrap()),
418                Datum::Interval(Interval {
419                    months: 312,
420                    ..Default::default()
421                }),
422                Datum::Interval(Interval::new(0, 0, 1_012_312)),
423                Datum::Bytes(&[]),
424                Datum::Bytes(&[0, 2, 1, 255]),
425                Datum::String(""),
426                Datum::String("العَرَبِيَّة"),
427            ]);
428        }
429    }
430}
431
432mod bytes_container {
433
434    use differential_dataflow::trace::implementations::BatchContainer;
435    use timely::container::PushInto;
436
437    use mz_ore::region::Region;
438
439    /// A slice container with four bytes overhead per slice.
440    pub struct BytesContainer {
441        /// Total length of `batches`, maintained because recomputation is expensive.
442        length: usize,
443        batches: Vec<BytesBatch>,
444    }
445
446    impl BytesContainer {
447        /// Visit contained allocations to determine their size and capacity.
448        #[inline]
449        pub fn heap_size(&self, mut callback: impl FnMut(usize, usize)) {
450            // Calculate heap size for local, stash, and stash entries
451            callback(
452                self.batches.len() * std::mem::size_of::<BytesBatch>(),
453                self.batches.capacity() * std::mem::size_of::<BytesBatch>(),
454            );
455            for batch in self.batches.iter() {
456                batch.offsets.heap_size(&mut callback);
457                callback(batch.storage.len(), batch.storage.capacity());
458            }
459        }
460    }
461
462    impl BatchContainer for BytesContainer {
463        type Owned = Vec<u8>;
464        type ReadItem<'a> = &'a [u8];
465
466        #[inline]
467        fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned {
468            item.to_vec()
469        }
470
471        #[inline]
472        fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) {
473            other.clear();
474            other.extend_from_slice(item);
475        }
476
477        #[inline(always)]
478        fn push_ref(&mut self, item: Self::ReadItem<'_>) {
479            self.push_into(item);
480        }
481
482        #[inline(always)]
483        fn push_own(&mut self, item: &Self::Owned) {
484            self.push_into(item.as_slice())
485        }
486
487        fn clear(&mut self) {
488            self.batches.clear();
489            self.batches.push(BytesBatch::with_capacities(0, 0));
490            self.length = 0;
491        }
492
493        fn with_capacity(size: usize) -> Self {
494            Self {
495                length: 0,
496                batches: vec![BytesBatch::with_capacities(size, size)],
497            }
498        }
499
500        fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
501            let mut item_cap = 1;
502            let mut byte_cap = 0;
503            for batch in cont1.batches.iter() {
504                item_cap += batch.offsets.len() - 1;
505                byte_cap += batch.storage.len();
506            }
507            for batch in cont2.batches.iter() {
508                item_cap += batch.offsets.len() - 1;
509                byte_cap += batch.storage.len();
510            }
511            Self {
512                length: 0,
513                batches: vec![BytesBatch::with_capacities(item_cap, byte_cap)],
514            }
515        }
516
517        #[inline(always)]
518        fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> {
519            item
520        }
521
522        #[inline]
523        fn index(&self, mut index: usize) -> Self::ReadItem<'_> {
524            for batch in self.batches.iter() {
525                if index < batch.len() {
526                    return batch.index(index);
527                }
528                index -= batch.len();
529            }
530            panic!("Index out of bounds");
531        }
532
533        #[inline(always)]
534        fn len(&self) -> usize {
535            self.length
536        }
537    }
538
539    impl PushInto<&[u8]> for BytesContainer {
540        #[inline]
541        fn push_into(&mut self, item: &[u8]) {
542            self.length += 1;
543            if let Some(batch) = self.batches.last_mut() {
544                let success = batch.try_push(item);
545                if !success {
546                    // double the lengths from `batch`.
547                    let item_cap = 2 * batch.offsets.len();
548                    let byte_cap = std::cmp::max(2 * batch.storage.capacity(), item.len());
549                    let mut new_batch = BytesBatch::with_capacities(item_cap, byte_cap);
550                    assert!(new_batch.try_push(item));
551                    self.batches.push(new_batch);
552                }
553            }
554        }
555    }
556
557    /// A batch of slice storage.
558    ///
559    /// The backing storage for this batch will not be resized.
560    pub struct BytesBatch {
561        offsets: crate::OffsetOptimized,
562        storage: Region<u8>,
563        len: usize,
564    }
565
566    impl BytesBatch {
567        /// Either accepts the slice and returns true,
568        /// or does not and returns false.
569        fn try_push(&mut self, slice: &[u8]) -> bool {
570            if self.storage.len() + slice.len() <= self.storage.capacity() {
571                self.storage.extend_from_slice(slice);
572                self.offsets.push_into(self.storage.len());
573                self.len += 1;
574                true
575            } else {
576                false
577            }
578        }
579        #[inline]
580        fn index(&self, index: usize) -> &[u8] {
581            let lower = self.offsets.index(index);
582            let upper = self.offsets.index(index + 1);
583            &self.storage[lower..upper]
584        }
585        #[inline(always)]
586        fn len(&self) -> usize {
587            debug_assert_eq!(self.len, self.offsets.len() - 1);
588            self.len
589        }
590
591        fn with_capacities(item_cap: usize, byte_cap: usize) -> Self {
592            // TODO: be wary of `byte_cap` greater than 2^32.
593            let mut offsets = crate::OffsetOptimized::with_capacity(item_cap + 1);
594            offsets.push_into(0);
595            Self {
596                offsets,
597                storage: Region::new_auto(byte_cap.next_power_of_two()),
598                len: 0,
599            }
600        }
601    }
602}
603
604mod offset_opt {
605    use differential_dataflow::trace::implementations::BatchContainer;
606    use differential_dataflow::trace::implementations::OffsetList;
607    use timely::container::PushInto;
608
609    enum OffsetStride {
610        Empty,
611        Zero,
612        Striding(usize, usize),
613        Saturated(usize, usize, usize),
614    }
615
616    impl OffsetStride {
617        /// Accepts or rejects a newly pushed element.
618        #[inline]
619        fn push(&mut self, item: usize) -> bool {
620            match self {
621                OffsetStride::Empty => {
622                    if item == 0 {
623                        *self = OffsetStride::Zero;
624                        true
625                    } else {
626                        false
627                    }
628                }
629                OffsetStride::Zero => {
630                    *self = OffsetStride::Striding(item, 2);
631                    true
632                }
633                OffsetStride::Striding(stride, count) => {
634                    if item == *stride * *count {
635                        *count += 1;
636                        true
637                    } else if item == *stride * (*count - 1) {
638                        *self = OffsetStride::Saturated(*stride, *count, 1);
639                        true
640                    } else {
641                        false
642                    }
643                }
644                OffsetStride::Saturated(stride, count, reps) => {
645                    if item == *stride * (*count - 1) {
646                        *reps += 1;
647                        true
648                    } else {
649                        false
650                    }
651                }
652            }
653        }
654
655        #[inline]
656        fn index(&self, index: usize) -> usize {
657            match self {
658                OffsetStride::Empty => {
659                    panic!("Empty OffsetStride")
660                }
661                OffsetStride::Zero => 0,
662                OffsetStride::Striding(stride, _steps) => *stride * index,
663                OffsetStride::Saturated(stride, steps, _reps) => {
664                    if index < *steps {
665                        *stride * index
666                    } else {
667                        *stride * (*steps - 1)
668                    }
669                }
670            }
671        }
672
673        #[inline]
674        fn len(&self) -> usize {
675            match self {
676                OffsetStride::Empty => 0,
677                OffsetStride::Zero => 1,
678                OffsetStride::Striding(_stride, steps) => *steps,
679                OffsetStride::Saturated(_stride, steps, reps) => *steps + *reps,
680            }
681        }
682    }
683
684    pub struct OffsetOptimized {
685        strided: OffsetStride,
686        spilled: OffsetList,
687    }
688
689    impl BatchContainer for OffsetOptimized {
690        type Owned = usize;
691        type ReadItem<'a> = usize;
692
693        #[inline]
694        fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned {
695            item
696        }
697
698        #[inline]
699        fn push_ref(&mut self, item: Self::ReadItem<'_>) {
700            self.push_into(item)
701        }
702
703        #[inline]
704        fn push_own(&mut self, item: &Self::Owned) {
705            self.push_into(*item)
706        }
707
708        fn clear(&mut self) {
709            self.strided = OffsetStride::Empty;
710            self.spilled.clear();
711        }
712
713        fn with_capacity(_size: usize) -> Self {
714            Self {
715                strided: OffsetStride::Empty,
716                spilled: OffsetList::with_capacity(0),
717            }
718        }
719
720        fn merge_capacity(_cont1: &Self, _cont2: &Self) -> Self {
721            Self {
722                strided: OffsetStride::Empty,
723                spilled: OffsetList::with_capacity(0),
724            }
725        }
726
727        #[inline]
728        fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> {
729            item
730        }
731
732        #[inline]
733        fn index(&self, index: usize) -> Self::ReadItem<'_> {
734            if index < self.strided.len() {
735                self.strided.index(index)
736            } else {
737                self.spilled.index(index - self.strided.len())
738            }
739        }
740
741        #[inline]
742        fn len(&self) -> usize {
743            self.strided.len() + self.spilled.len()
744        }
745    }
746
747    impl PushInto<usize> for OffsetOptimized {
748        #[inline]
749        fn push_into(&mut self, item: usize) {
750            if !self.spilled.is_empty() {
751                self.spilled.push(item);
752            } else {
753                let inserted = self.strided.push(item);
754                if !inserted {
755                    self.spilled.push(item);
756                }
757            }
758        }
759    }
760
761    impl OffsetOptimized {
762        pub fn heap_size(&self, callback: impl FnMut(usize, usize)) {
763            crate::offset_list_size(&self.spilled, callback);
764        }
765    }
766}
767
768/// Helper to compute the size of an [`OffsetList`] in memory.
769#[inline]
770pub(crate) fn offset_list_size(data: &OffsetList, mut callback: impl FnMut(usize, usize)) {
771    // Private `vec_size` because we should only use it where data isn't region-allocated.
772    // `T: Copy` makes sure the implementation is correct even if types change!
773    #[inline(always)]
774    fn vec_size<T: Copy>(data: &Vec<T>, mut callback: impl FnMut(usize, usize)) {
775        let size_of_t = std::mem::size_of::<T>();
776        callback(data.len() * size_of_t, data.capacity() * size_of_t);
777    }
778
779    vec_size(&data.smol, &mut callback);
780    vec_size(&data.chonk, callback);
781}