// mz_compute/row_spine.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10pub use self::container::DatumContainer;
11pub use self::container::DatumSeq;
12pub use self::offset_opt::OffsetOptimized;
13pub use self::spines::{
14    RowBatcher, RowBuilder, RowRowBatcher, RowRowBuilder, RowRowSpine, RowSpine, RowValBatcher,
15    RowValBuilder, RowValSpine,
16};
17use differential_dataflow::trace::implementations::OffsetList;
18
/// Spines specialized to contain `Row` types in keys and values.
mod spines {
    use std::rc::Rc;

    use columnation::Columnation;
    use differential_dataflow::trace::implementations::Layout;
    use differential_dataflow::trace::implementations::Update;
    use differential_dataflow::trace::implementations::ord_neu::{OrdKeyBatch, OrdKeyBuilder};
    use differential_dataflow::trace::implementations::ord_neu::{OrdValBatch, OrdValBuilder};
    use differential_dataflow::trace::implementations::spine_fueled::Spine;
    use differential_dataflow::trace::rc_blanket_impls::RcBuilder;
    use mz_repr::Row;
    use mz_timely_util::columnation::ColumnationStack;

    use crate::row_spine::{DatumContainer, OffsetOptimized};
    use crate::typedefs::{KeyBatcher, KeyValBatcher};

    /// Spine for `(Row, Row)` updates; both keys and values live in [`DatumContainer`]s.
    pub type RowRowSpine<T, R> = Spine<Rc<OrdValBatch<RowRowLayout<((Row, Row), T, R)>>>>;
    /// Batcher producing the batches consumed by [`RowRowSpine`].
    pub type RowRowBatcher<T, R> = KeyValBatcher<Row, Row, T, R>;
    /// Builder for `(Row, Row)` batches, wrapped in `Rc` for cheap sharing.
    pub type RowRowBuilder<T, R> = RcBuilder<
        OrdValBuilder<RowRowLayout<((Row, Row), T, R)>, ColumnationStack<((Row, Row), T, R)>>,
    >;

    /// Spine for `(Row, V)` updates: `Row` keys, arbitrary columnation-backed values.
    pub type RowValSpine<V, T, R> = Spine<Rc<OrdValBatch<RowValLayout<((Row, V), T, R)>>>>;
    /// Batcher producing the batches consumed by [`RowValSpine`].
    pub type RowValBatcher<V, T, R> = KeyValBatcher<Row, V, T, R>;
    /// Builder for `(Row, V)` batches, wrapped in `Rc` for cheap sharing.
    pub type RowValBuilder<V, T, R> = RcBuilder<
        OrdValBuilder<RowValLayout<((Row, V), T, R)>, ColumnationStack<((Row, V), T, R)>>,
    >;

    /// Spine for key-only `Row` updates (the value type is `()`).
    pub type RowSpine<T, R> = Spine<Rc<OrdKeyBatch<RowLayout<((Row, ()), T, R)>>>>;
    /// Batcher producing the batches consumed by [`RowSpine`].
    pub type RowBatcher<T, R> = KeyBatcher<Row, T, R>;
    /// Builder for key-only `Row` batches, wrapped in `Rc` for cheap sharing.
    pub type RowBuilder<T, R> =
        RcBuilder<OrdKeyBuilder<RowLayout<((Row, ()), T, R)>, ColumnationStack<((Row, ()), T, R)>>>;

    /// A layout based on timely stacks, with `Row` keys and `Row` values.
    pub struct RowRowLayout<U: Update<Key = Row, Val = Row>> {
        phantom: std::marker::PhantomData<U>,
    }
    /// A layout with `Row` keys and columnation-backed values of type `U::Val`.
    pub struct RowValLayout<U: Update<Key = Row>> {
        phantom: std::marker::PhantomData<U>,
    }
    /// A layout for key-only updates: `Row` keys and `()` values.
    pub struct RowLayout<U: Update<Key = Row, Val = ()>> {
        phantom: std::marker::PhantomData<U>,
    }

    impl<U: Update<Key = Row, Val = Row>> Layout for RowRowLayout<U>
    where
        U::Time: Columnation,
        U::Diff: Columnation,
    {
        // Keys and values share the byte-oriented `DatumContainer`.
        type KeyContainer = DatumContainer;
        type ValContainer = DatumContainer;
        type TimeContainer = ColumnationStack<U::Time>;
        type DiffContainer = ColumnationStack<U::Diff>;
        type OffsetContainer = OffsetOptimized;
    }
    impl<U: Update<Key = Row>> Layout for RowValLayout<U>
    where
        U::Val: Columnation,
        U::Time: Columnation,
        U::Diff: Columnation,
    {
        // Only the key uses `DatumContainer`; values stay in a columnation stack.
        type KeyContainer = DatumContainer;
        type ValContainer = ColumnationStack<U::Val>;
        type TimeContainer = ColumnationStack<U::Time>;
        type DiffContainer = ColumnationStack<U::Diff>;
        type OffsetContainer = OffsetOptimized;
    }
    impl<U: Update<Key = Row, Val = ()>> Layout for RowLayout<U>
    where
        U::Time: Columnation,
        U::Diff: Columnation,
    {
        type KeyContainer = DatumContainer;
        // `()` values: the container is effectively a counter.
        type ValContainer = ColumnationStack<()>;
        type TimeContainer = ColumnationStack<U::Time>;
        type DiffContainer = ColumnationStack<U::Diff>;
        type OffsetContainer = OffsetOptimized;
    }
}
99
/// A `Row`-specialized container that stores row-formatted bytes directly.
mod container {

    use std::cmp::Ordering;

    use differential_dataflow::trace::implementations::BatchContainer;
    use timely::container::PushInto;

    use mz_repr::{Datum, Row, RowPacker, read_datum};

    use super::bytes_container::BytesContainer;

    /// Container wrapping `BytesContainer` that traffics only in `Row`-formatted bytes.
    ///
    /// This type accepts only `Row`-formatted bytes in its `Push` implementation, and
    /// in return provides a `DatumSeq` view of the bytes which can be decoded as `Datum`s.
    pub struct DatumContainer {
        bytes: BytesContainer,
    }

    impl DatumContainer {
        /// Visit contained allocations to determine their size and capacity.
        #[inline]
        pub fn heap_size(&self, callback: impl FnMut(usize, usize)) {
            // All heap allocations belong to the wrapped byte container.
            self.bytes.heap_size(callback)
        }
    }

    impl BatchContainer for DatumContainer {
        type Owned = Row;
        type ReadItem<'a> = DatumSeq<'a>;

        /// Materialize an owned `Row` from a borrowed byte view.
        #[inline(always)]
        fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned {
            item.to_row()
        }

        /// Overwrite `other` with the datums of `item`, reusing its allocation.
        #[inline]
        fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) {
            // `packer()` clears the row, so `other`'s prior contents are replaced.
            let mut packer = other.packer();
            item.copy_into(&mut packer);
        }

        #[inline(always)]
        fn push_ref(&mut self, item: Self::ReadItem<'_>) {
            // `item.bytes` is already row-formatted; forward the raw bytes.
            self.bytes.push_into(item.bytes);
        }

        #[inline(always)]
        fn push_own(&mut self, item: &Self::Owned) {
            // A `Row`'s `data()` is exactly its row-formatted byte representation.
            self.bytes.push_into(item.data());
        }

        #[inline(always)]
        fn clear(&mut self) {
            self.bytes.clear();
        }

        /// Allocate a container with room for roughly `size` bytes.
        #[inline(always)]
        fn with_capacity(size: usize) -> Self {
            Self {
                bytes: BytesContainer::with_capacity(size),
            }
        }

        /// Allocate a container sized to hold the merged contents of two others.
        #[inline(always)]
        fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
            Self {
                bytes: BytesContainer::merge_capacity(&cont1.bytes, &cont2.bytes),
            }
        }

        /// Shorten the lifetime of a read item (`DatumSeq` is `Copy`, so this is free).
        #[inline(always)]
        fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> {
            item
        }

        #[inline(always)]
        fn index(&self, index: usize) -> Self::ReadItem<'_> {
            DatumSeq {
                bytes: self.bytes.index(index),
            }
        }

        #[inline(always)]
        fn len(&self) -> usize {
            self.bytes.len()
        }
    }

    impl PushInto<Row> for DatumContainer {
        fn push_into(&mut self, item: Row) {
            // Delegate to the by-reference implementation below.
            self.push_into(&item);
        }
    }

    impl PushInto<&Row> for DatumContainer {
        fn push_into(&mut self, item: &Row) {
            self.push_own(item);
        }
    }

    impl PushInto<DatumSeq<'_>> for DatumContainer {
        fn push_into(&mut self, item: DatumSeq<'_>) {
            self.bytes.push_into(item.as_bytes())
        }
    }

    /// A borrowed view of `Row`-formatted bytes that iterates as `Datum`s.
    ///
    /// Invariant: `bytes` is always a valid, complete row encoding; the `unsafe`
    /// decoding below relies on this.
    #[derive(Debug)]
    pub struct DatumSeq<'a> {
        bytes: &'a [u8],
    }

    impl<'a> DatumSeq<'a> {
        /// Append all datums of this sequence onto `row`.
        #[inline]
        pub fn copy_into(&self, row: &mut RowPacker) {
            // SAFETY: `self.bytes` is a correctly formatted row.
            unsafe { row.extend_by_slice_unchecked(self.bytes) }
        }
        /// The raw row-formatted bytes backing this sequence.
        #[inline]
        fn as_bytes(&self) -> &'a [u8] {
            self.bytes
        }
        /// Materialize an owned `Row` from these bytes.
        #[inline]
        pub fn to_row(&self) -> Row {
            // SAFETY: `self.bytes` is a correctly formatted row.
            unsafe { Row::from_bytes_unchecked(self.bytes) }
        }
    }

    impl<'a> Copy for DatumSeq<'a> {}
    impl<'a> Clone for DatumSeq<'a> {
        #[inline(always)]
        fn clone(&self) -> Self {
            *self
        }
    }

    // Equality and ordering compare the raw encoded bytes, never decoded datums.
    impl<'a, 'b> PartialEq<DatumSeq<'a>> for DatumSeq<'b> {
        #[inline]
        fn eq(&self, other: &DatumSeq<'a>) -> bool {
            self.bytes.eq(other.bytes)
        }
    }
    impl<'a> PartialEq<&Row> for DatumSeq<'a> {
        #[inline]
        fn eq(&self, other: &&Row) -> bool {
            self.bytes.eq(other.data())
        }
    }
    impl<'a> Eq for DatumSeq<'a> {}
    impl<'a, 'b> PartialOrd<DatumSeq<'a>> for DatumSeq<'b> {
        #[inline]
        fn partial_cmp(&self, other: &DatumSeq<'a>) -> Option<Ordering> {
            Some(self.cmp(other))
        }
    }
    impl<'a> Ord for DatumSeq<'a> {
        /// Shorter encodings sort first; equal lengths fall back to lexicographic bytes.
        /// NOTE(review): assumed to agree with `Row`'s own `Ord` — confirm in `mz_repr`.
        #[inline]
        fn cmp(&self, other: &Self) -> Ordering {
            match self.bytes.len().cmp(&other.bytes.len()) {
                std::cmp::Ordering::Less => std::cmp::Ordering::Less,
                std::cmp::Ordering::Greater => std::cmp::Ordering::Greater,
                std::cmp::Ordering::Equal => self.bytes.cmp(other.bytes),
            }
        }
    }
    impl<'a> Iterator for DatumSeq<'a> {
        type Item = Datum<'a>;
        /// Decode the next datum, advancing `self.bytes` past its encoding.
        #[inline]
        fn next(&mut self) -> Option<Self::Item> {
            if self.bytes.is_empty() {
                None
            } else {
                // SAFETY: `self.bytes` is the remainder of a correctly formatted row,
                // so it begins with a valid datum encoding.
                let result = unsafe { read_datum(&mut self.bytes) };
                Some(result)
            }
        }
    }

    use mz_repr::fixed_length::ToDatumIter;
    impl<'long> ToDatumIter for DatumSeq<'long> {
        type DatumIter<'short>
            = DatumSeq<'short>
        where
            Self: 'short;
        #[inline]
        fn to_datum_iter<'short>(&'short self) -> Self::DatumIter<'short> {
            // `DatumSeq` is itself an iterator and is `Copy`, so just copy it.
            *self
        }
    }

    #[cfg(test)]
    mod tests {
        use crate::row_spine::DatumContainer;
        use differential_dataflow::trace::implementations::BatchContainer;
        use mz_repr::adt::date::Date;
        use mz_repr::adt::interval::Interval;
        use mz_repr::{Datum, Row, SqlScalarType};

        #[mz_ore::test]
        #[cfg_attr(miri, ignore)] // unsupported operation: integer-to-pointer casts and `ptr::with_exposed_provenance` are not supported
        fn test_round_trip() {
            // Pack datums into a `Row`, push it through the container, and check
            // that decoding the stored bytes yields the original datums.
            fn round_trip(datums: Vec<Datum>) {
                let row = Row::pack(datums.clone());

                let mut container = DatumContainer::with_capacity(row.byte_len());
                container.push_own(&row);

                // When run under miri this catches undefined bytes written to data
                // eg by calling push_copy! on a type which contains undefined padding values
                println!("{:?}", container.index(0).bytes);

                let datums2 = container.index(0).collect::<Vec<_>>();
                assert_eq!(datums, datums2);
            }

            round_trip(vec![]);
            round_trip(
                SqlScalarType::enumerate()
                    .iter()
                    .flat_map(|r#type| r#type.interesting_datums())
                    .collect(),
            );
            round_trip(vec![
                Datum::Null,
                Datum::Null,
                Datum::False,
                Datum::True,
                Datum::Int16(-21),
                Datum::Int32(-42),
                Datum::Int64(-2_147_483_648 - 42),
                Datum::UInt8(0),
                Datum::UInt8(1),
                Datum::UInt16(0),
                Datum::UInt16(1),
                Datum::UInt16(1 << 8),
                Datum::UInt32(0),
                Datum::UInt32(1),
                Datum::UInt32(1 << 8),
                Datum::UInt32(1 << 16),
                Datum::UInt32(1 << 24),
                Datum::UInt64(0),
                Datum::UInt64(1),
                Datum::UInt64(1 << 8),
                Datum::UInt64(1 << 16),
                Datum::UInt64(1 << 24),
                Datum::UInt64(1 << 32),
                Datum::UInt64(1 << 40),
                Datum::UInt64(1 << 48),
                Datum::UInt64(1 << 56),
                Datum::Date(Date::from_pg_epoch(365 * 45 + 21).unwrap()),
                Datum::Interval(Interval {
                    months: 312,
                    ..Default::default()
                }),
                Datum::Interval(Interval::new(0, 0, 1_012_312)),
                Datum::Bytes(&[]),
                Datum::Bytes(&[0, 2, 1, 255]),
                Datum::String(""),
                Datum::String("العَرَبِيَّة"),
            ]);
        }
    }
}
365
/// Growable byte-slice storage built from fixed-capacity batches.
mod bytes_container {

    use differential_dataflow::trace::implementations::BatchContainer;
    use timely::container::PushInto;

    use mz_ore::region::Region;

    /// A slice container with four bytes overhead per slice.
    ///
    /// Invariant: `batches` is never empty — every constructor installs an initial
    /// batch, and `clear` re-installs one. `push_into` below relies on this.
    pub struct BytesContainer {
        /// Total length of `batches`, maintained because recomputation is expensive.
        length: usize,
        batches: Vec<BytesBatch>,
    }

    impl BytesContainer {
        /// Visit contained allocations to determine their size and capacity.
        #[inline]
        pub fn heap_size(&self, mut callback: impl FnMut(usize, usize)) {
            // Calculate heap size for local, stash, and stash entries
            callback(
                self.batches.len() * std::mem::size_of::<BytesBatch>(),
                self.batches.capacity() * std::mem::size_of::<BytesBatch>(),
            );
            for batch in self.batches.iter() {
                batch.offsets.heap_size(&mut callback);
                callback(batch.storage.len(), batch.storage.capacity());
            }
        }
    }

    impl BatchContainer for BytesContainer {
        type Owned = Vec<u8>;
        type ReadItem<'a> = &'a [u8];

        #[inline]
        fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned {
            item.to_vec()
        }

        #[inline]
        fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) {
            // Reuse `other`'s allocation rather than building a fresh `Vec`.
            other.clear();
            other.extend_from_slice(item);
        }

        #[inline(always)]
        fn push_ref(&mut self, item: Self::ReadItem<'_>) {
            self.push_into(item);
        }

        #[inline(always)]
        fn push_own(&mut self, item: &Self::Owned) {
            self.push_into(item.as_slice())
        }

        fn clear(&mut self) {
            // Drop all storage, then restore the non-empty `batches` invariant with
            // an empty zero-capacity batch.
            self.batches.clear();
            self.batches.push(BytesBatch::with_capacities(0, 0));
            self.length = 0;
        }

        /// Allocate a container with a single batch of roughly `size` items/bytes.
        fn with_capacity(size: usize) -> Self {
            Self {
                length: 0,
                batches: vec![BytesBatch::with_capacities(size, size)],
            }
        }

        /// Allocate an empty container sized to absorb the contents of both inputs
        /// into a single batch.
        fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
            let mut item_cap = 1;
            let mut byte_cap = 0;
            for batch in cont1.batches.iter() {
                // `offsets` stores item count + 1 entries (the leading zero).
                item_cap += batch.offsets.len() - 1;
                byte_cap += batch.storage.len();
            }
            for batch in cont2.batches.iter() {
                item_cap += batch.offsets.len() - 1;
                byte_cap += batch.storage.len();
            }
            Self {
                length: 0,
                batches: vec![BytesBatch::with_capacities(item_cap, byte_cap)],
            }
        }

        #[inline(always)]
        fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> {
            item
        }

        /// Locate the batch containing `index` by linear scan, then index into it.
        #[inline]
        fn index(&self, mut index: usize) -> Self::ReadItem<'_> {
            for batch in self.batches.iter() {
                if index < batch.len() {
                    return batch.index(index);
                }
                index -= batch.len();
            }
            panic!("Index out of bounds");
        }

        #[inline(always)]
        fn len(&self) -> usize {
            self.length
        }
    }

    impl PushInto<&[u8]> for BytesContainer {
        #[inline]
        fn push_into(&mut self, item: &[u8]) {
            // `batches` is never empty (see struct invariant), so the increment of
            // `length` is always matched by a stored item.
            self.length += 1;
            if let Some(batch) = self.batches.last_mut() {
                let success = batch.try_push(item);
                if !success {
                    // double the lengths from `batch`.
                    let item_cap = 2 * batch.offsets.len();
                    // Ensure the new batch can hold `item` even if it alone exceeds
                    // twice the previous capacity.
                    let byte_cap = std::cmp::max(2 * batch.storage.capacity(), item.len());
                    let mut new_batch = BytesBatch::with_capacities(item_cap, byte_cap);
                    assert!(new_batch.try_push(item));
                    self.batches.push(new_batch);
                }
            }
        }
    }

    /// A batch of slice storage.
    ///
    /// The backing storage for this batch will not be resized.
    pub struct BytesBatch {
        // Slice boundaries: item `i` occupies `storage[offsets[i]..offsets[i + 1]]`.
        offsets: crate::row_spine::OffsetOptimized,
        storage: Region<u8>,
        // Item count, cached so `len()` need not consult `offsets`.
        len: usize,
    }

    impl BytesBatch {
        /// Either accepts the slice and returns true,
        /// or does not and returns false.
        fn try_push(&mut self, slice: &[u8]) -> bool {
            // Accept only if the slice fits in the remaining (fixed) capacity.
            if self.storage.len() + slice.len() <= self.storage.capacity() {
                self.storage.extend_from_slice(slice);
                self.offsets.push_into(self.storage.len());
                self.len += 1;
                true
            } else {
                false
            }
        }
        #[inline]
        fn index(&self, index: usize) -> &[u8] {
            let lower = self.offsets.index(index);
            let upper = self.offsets.index(index + 1);
            &self.storage[lower..upper]
        }
        #[inline(always)]
        fn len(&self) -> usize {
            debug_assert_eq!(self.len, self.offsets.len() - 1);
            self.len
        }

        /// Allocate a batch able to hold `item_cap` items and `byte_cap` bytes.
        fn with_capacities(item_cap: usize, byte_cap: usize) -> Self {
            // TODO: be wary of `byte_cap` greater than 2^32.
            let mut offsets = crate::row_spine::OffsetOptimized::with_capacity(item_cap + 1);
            // Seed the leading zero so item `i` spans `offsets[i]..offsets[i + 1]`.
            offsets.push_into(0);
            Self {
                offsets,
                // Note: `0.next_power_of_two()` is 1, so the region is never zero-sized
                // in a surprising way; `Region::new_auto` picks the allocation strategy.
                storage: Region::new_auto(byte_cap.next_power_of_two()),
                len: 0,
            }
        }
    }
}
537
/// An offset container that stores regular offset patterns in O(1) space,
/// spilling to an explicit [`OffsetList`] only when the pattern breaks.
mod offset_opt {
    use differential_dataflow::trace::implementations::BatchContainer;
    use differential_dataflow::trace::implementations::OffsetList;
    use timely::container::PushInto;

    /// Compact representation of common offset sequences.
    ///
    /// The states encode, in order of generality:
    /// - `Empty`: no offsets stored.
    /// - `Zero`: exactly the sequence `[0]`.
    /// - `Striding(stride, count)`: the sequence `0, stride, 2*stride, ..` with
    ///   `count` elements in total.
    /// - `Saturated(stride, count, reps)`: a striding prefix of `count` elements
    ///   followed by `reps` repetitions of its last value.
    enum OffsetStride {
        Empty,
        Zero,
        Striding(usize, usize),
        Saturated(usize, usize, usize),
    }

    impl OffsetStride {
        /// Accepts or rejects a newly pushed element.
        #[inline]
        fn push(&mut self, item: usize) -> bool {
            match self {
                OffsetStride::Empty => {
                    // Only a leading zero can start a pattern.
                    if item == 0 {
                        *self = OffsetStride::Zero;
                        true
                    } else {
                        false
                    }
                }
                OffsetStride::Zero => {
                    // The second element fixes the stride; sequence is now `[0, item]`.
                    *self = OffsetStride::Striding(item, 2);
                    true
                }
                OffsetStride::Striding(stride, count) => {
                    if item == *stride * *count {
                        // Next multiple of the stride: extend the run.
                        *count += 1;
                        true
                    } else if item == *stride * (*count - 1) {
                        // Repeat of the last value (an empty slice): saturate.
                        *self = OffsetStride::Saturated(*stride, *count, 1);
                        true
                    } else {
                        false
                    }
                }
                OffsetStride::Saturated(stride, count, reps) => {
                    // Once saturated, only further repeats of the last value fit.
                    if item == *stride * (*count - 1) {
                        *reps += 1;
                        true
                    } else {
                        false
                    }
                }
            }
        }

        /// Reconstruct the offset at `index` from the pattern.
        #[inline]
        fn index(&self, index: usize) -> usize {
            match self {
                OffsetStride::Empty => {
                    panic!("Empty OffsetStride")
                }
                OffsetStride::Zero => 0,
                OffsetStride::Striding(stride, _steps) => *stride * index,
                OffsetStride::Saturated(stride, steps, _reps) => {
                    if index < *steps {
                        *stride * index
                    } else {
                        // All saturated positions share the last strided value.
                        *stride * (*steps - 1)
                    }
                }
            }
        }

        /// Number of offsets represented by the pattern.
        #[inline]
        fn len(&self) -> usize {
            match self {
                OffsetStride::Empty => 0,
                OffsetStride::Zero => 1,
                OffsetStride::Striding(_stride, steps) => *steps,
                OffsetStride::Saturated(_stride, steps, reps) => *steps + *reps,
            }
        }
    }

    /// Offset container: a strided prefix plus an explicit spill list.
    ///
    /// Logically the stored sequence is `strided` followed by `spilled`.
    pub struct OffsetOptimized {
        strided: OffsetStride,
        spilled: OffsetList,
    }

    impl BatchContainer for OffsetOptimized {
        type Owned = usize;
        type ReadItem<'a> = usize;

        #[inline]
        fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned {
            item
        }

        #[inline]
        fn push_ref(&mut self, item: Self::ReadItem<'_>) {
            self.push_into(item)
        }

        #[inline]
        fn push_own(&mut self, item: &Self::Owned) {
            self.push_into(*item)
        }

        fn clear(&mut self) {
            self.strided = OffsetStride::Empty;
            self.spilled.clear();
        }

        // Capacity hints are ignored: the strided form needs none, and the spill
        // list is grown on demand.
        fn with_capacity(_size: usize) -> Self {
            Self {
                strided: OffsetStride::Empty,
                spilled: OffsetList::with_capacity(0),
            }
        }

        fn merge_capacity(_cont1: &Self, _cont2: &Self) -> Self {
            Self {
                strided: OffsetStride::Empty,
                spilled: OffsetList::with_capacity(0),
            }
        }

        #[inline]
        fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> {
            item
        }

        /// Index into the strided prefix, then the spill list.
        #[inline]
        fn index(&self, index: usize) -> Self::ReadItem<'_> {
            if index < self.strided.len() {
                self.strided.index(index)
            } else {
                self.spilled.index(index - self.strided.len())
            }
        }

        #[inline]
        fn len(&self) -> usize {
            self.strided.len() + self.spilled.len()
        }
    }

    impl PushInto<usize> for OffsetOptimized {
        #[inline]
        fn push_into(&mut self, item: usize) {
            // Once anything has spilled, everything after must spill too, so that
            // indexes remain the strided prefix followed by the spill list.
            if !self.spilled.is_empty() {
                self.spilled.push(item);
            } else {
                let inserted = self.strided.push(item);
                if !inserted {
                    self.spilled.push(item);
                }
            }
        }
    }

    impl OffsetOptimized {
        /// Visit contained allocations; only the spill list owns heap memory.
        pub fn heap_size(&self, callback: impl FnMut(usize, usize)) {
            crate::row_spine::offset_list_size(&self.spilled, callback);
        }
    }
}
701
702/// Helper to compute the size of an [`OffsetList`] in memory.
703#[inline]
704pub(crate) fn offset_list_size(data: &OffsetList, mut callback: impl FnMut(usize, usize)) {
705    // Private `vec_size` because we should only use it where data isn't region-allocated.
706    // `T: Copy` makes sure the implementation is correct even if types change!
707    #[inline(always)]
708    fn vec_size<T: Copy>(data: &Vec<T>, mut callback: impl FnMut(usize, usize)) {
709        let size_of_t = std::mem::size_of::<T>();
710        callback(data.len() * size_of_t, data.capacity() * size_of_t);
711    }
712
713    vec_size(&data.smol, &mut callback);
714    vec_size(&data.chonk, callback);
715}