mz_compute/row_spine.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

pub use self::container::DatumContainer;
pub use self::container::DatumSeq;
pub use self::offset_opt::OffsetOptimized;
pub use self::spines::{
    RowBatcher, RowBuilder, RowRowBatcher, RowRowBuilder, RowRowSpine, RowSpine, RowValBatcher,
    RowValBuilder, RowValSpine,
};
use differential_dataflow::trace::implementations::OffsetList;

/// Spines specialized to contain `Row` types in keys and values.
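///
/// `RowRowSpine` stores `Row` keys with `Row` values, `RowValSpine` pairs `Row`
/// keys with another columnar value type, and `RowSpine` stores `Row` keys with
/// no values; each has a corresponding batcher and builder alias.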
mod spines {
    use std::rc::Rc;

    use differential_dataflow::containers::{Columnation, TimelyStack};
    use differential_dataflow::trace::implementations::Layout;
    use differential_dataflow::trace::implementations::Update;
    use differential_dataflow::trace::implementations::ord_neu::{OrdKeyBatch, OrdKeyBuilder};
    use differential_dataflow::trace::implementations::ord_neu::{OrdValBatch, OrdValBuilder};
    use differential_dataflow::trace::implementations::spine_fueled::Spine;
    use differential_dataflow::trace::rc_blanket_impls::RcBuilder;
    use mz_repr::Row;

    use crate::row_spine::{DatumContainer, OffsetOptimized};
    use crate::typedefs::{KeyBatcher, KeyValBatcher};

    pub type RowRowSpine<T, R> = Spine<Rc<OrdValBatch<RowRowLayout<((Row, Row), T, R)>>>>;
    pub type RowRowBatcher<T, R> = KeyValBatcher<Row, Row, T, R>;
    pub type RowRowBuilder<T, R> =
        RcBuilder<OrdValBuilder<RowRowLayout<((Row, Row), T, R)>, TimelyStack<((Row, Row), T, R)>>>;

    pub type RowValSpine<V, T, R> = Spine<Rc<OrdValBatch<RowValLayout<((Row, V), T, R)>>>>;
    pub type RowValBatcher<V, T, R> = KeyValBatcher<Row, V, T, R>;
    pub type RowValBuilder<V, T, R> =
        RcBuilder<OrdValBuilder<RowValLayout<((Row, V), T, R)>, TimelyStack<((Row, V), T, R)>>>;

    pub type RowSpine<T, R> = Spine<Rc<OrdKeyBatch<RowLayout<((Row, ()), T, R)>>>>;
    pub type RowBatcher<T, R> = KeyBatcher<Row, T, R>;
    pub type RowBuilder<T, R> =
        RcBuilder<OrdKeyBuilder<RowLayout<((Row, ()), T, R)>, TimelyStack<((Row, ()), T, R)>>>;

    /// A layout based on timely stacks
    pub struct RowRowLayout<U: Update<Key = Row, Val = Row>> {
        phantom: std::marker::PhantomData<U>,
    }
    pub struct RowValLayout<U: Update<Key = Row>> {
        phantom: std::marker::PhantomData<U>,
    }
    pub struct RowLayout<U: Update<Key = Row, Val = ()>> {
        phantom: std::marker::PhantomData<U>,
    }

    impl<U: Update<Key = Row, Val = Row>> Layout for RowRowLayout<U>
    where
        U::Time: Columnation,
        U::Diff: Columnation,
    {
        type KeyContainer = DatumContainer;
        type ValContainer = DatumContainer;
        type TimeContainer = TimelyStack<U::Time>;
        type DiffContainer = TimelyStack<U::Diff>;
        type OffsetContainer = OffsetOptimized;
    }
    impl<U: Update<Key = Row>> Layout for RowValLayout<U>
    where
        U::Val: Columnation,
        U::Time: Columnation,
        U::Diff: Columnation,
    {
        type KeyContainer = DatumContainer;
        type ValContainer = TimelyStack<U::Val>;
        type TimeContainer = TimelyStack<U::Time>;
        type DiffContainer = TimelyStack<U::Diff>;
        type OffsetContainer = OffsetOptimized;
    }
    impl<U: Update<Key = Row, Val = ()>> Layout for RowLayout<U>
    where
        U::Time: Columnation,
        U::Diff: Columnation,
    {
        type KeyContainer = DatumContainer;
        type ValContainer = TimelyStack<()>;
        type TimeContainer = TimelyStack<U::Time>;
        type DiffContainer = TimelyStack<U::Diff>;
        type OffsetContainer = OffsetOptimized;
    }
}

/// A `Row`-specialized container using dictionary compression.
mod container {

    use std::cmp::Ordering;

    use differential_dataflow::trace::implementations::BatchContainer;
    use timely::container::PushInto;

    use mz_repr::{Datum, Row, RowPacker, read_datum};

    use super::bytes_container::BytesContainer;

    /// Container wrapping `BytesContainer` that traffics only in `Row`-formatted bytes.
    ///
    /// This type accepts only `Row`-formatted bytes in its `Push` implementation, and
    /// in return provides a `DatumSeq` view of the bytes which can be decoded as `Datum`s.
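    ///
    /// A minimal usage sketch, assuming a populated `Row` binding named `row`:
    ///
    /// ```ignore
    /// let mut container = DatumContainer::with_capacity(row.byte_len());
    /// container.push_own(&row);
    /// let datums: Vec<Datum> = container.index(0).collect();
    /// ```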
    pub struct DatumContainer {
        bytes: BytesContainer,
    }

    impl DatumContainer {
        /// Visit contained allocations to determine their size and capacity.
        #[inline]
        pub fn heap_size(&self, callback: impl FnMut(usize, usize)) {
            self.bytes.heap_size(callback)
        }
    }

    impl BatchContainer for DatumContainer {
        type Owned = Row;
        type ReadItem<'a> = DatumSeq<'a>;

        #[inline(always)]
        fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned {
            item.to_row()
        }

        #[inline]
        fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) {
            let mut packer = other.packer();
            item.copy_into(&mut packer);
        }

        #[inline(always)]
        fn push_ref(&mut self, item: Self::ReadItem<'_>) {
            self.bytes.push_into(item.bytes);
        }

        #[inline(always)]
        fn push_own(&mut self, item: &Self::Owned) {
            self.bytes.push_into(item.data());
        }

        #[inline(always)]
        fn clear(&mut self) {
            self.bytes.clear();
        }

        #[inline(always)]
        fn with_capacity(size: usize) -> Self {
            Self {
                bytes: BytesContainer::with_capacity(size),
            }
        }

        #[inline(always)]
        fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
            Self {
                bytes: BytesContainer::merge_capacity(&cont1.bytes, &cont2.bytes),
            }
        }

        #[inline(always)]
        fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> {
            item
        }

        #[inline(always)]
        fn index(&self, index: usize) -> Self::ReadItem<'_> {
            DatumSeq {
                bytes: self.bytes.index(index),
            }
        }

        #[inline(always)]
        fn len(&self) -> usize {
            self.bytes.len()
        }
    }

    impl PushInto<Row> for DatumContainer {
        fn push_into(&mut self, item: Row) {
            self.push_into(&item);
        }
    }

    impl PushInto<&Row> for DatumContainer {
        fn push_into(&mut self, item: &Row) {
            self.push_own(item);
        }
    }

    impl PushInto<DatumSeq<'_>> for DatumContainer {
        fn push_into(&mut self, item: DatumSeq<'_>) {
            self.bytes.push_into(item.as_bytes())
        }
    }

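    /// A borrowed view of `Row`-formatted bytes that iterates as `Datum`s and can
    /// be copied into a `RowPacker` or converted into an owned `Row` without
    /// re-validating the encoding.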
    #[derive(Debug)]
    pub struct DatumSeq<'a> {
        bytes: &'a [u8],
    }

    impl<'a> DatumSeq<'a> {
        #[inline]
        pub fn copy_into(&self, row: &mut RowPacker) {
            // SAFETY: `self.bytes` is a correctly formatted row.
            unsafe { row.extend_by_slice_unchecked(self.bytes) }
        }
        #[inline]
        fn as_bytes(&self) -> &'a [u8] {
            self.bytes
        }
        #[inline]
        pub fn to_row(&self) -> Row {
            // SAFETY: `self.bytes` is a correctly formatted row.
            unsafe { Row::from_bytes_unchecked(self.bytes) }
        }
    }

    impl<'a> Copy for DatumSeq<'a> {}
    impl<'a> Clone for DatumSeq<'a> {
        #[inline(always)]
        fn clone(&self) -> Self {
            *self
        }
    }

    impl<'a, 'b> PartialEq<DatumSeq<'a>> for DatumSeq<'b> {
        #[inline]
        fn eq(&self, other: &DatumSeq<'a>) -> bool {
            self.bytes.eq(other.bytes)
        }
    }
    impl<'a> PartialEq<&Row> for DatumSeq<'a> {
        #[inline]
        fn eq(&self, other: &&Row) -> bool {
            self.bytes.eq(other.data())
        }
    }
    impl<'a> Eq for DatumSeq<'a> {}
    impl<'a, 'b> PartialOrd<DatumSeq<'a>> for DatumSeq<'b> {
        #[inline]
        fn partial_cmp(&self, other: &DatumSeq<'a>) -> Option<Ordering> {
            Some(self.cmp(other))
        }
    }
    impl<'a> Ord for DatumSeq<'a> {
        #[inline]
        fn cmp(&self, other: &Self) -> Ordering {
            match self.bytes.len().cmp(&other.bytes.len()) {
                std::cmp::Ordering::Less => std::cmp::Ordering::Less,
                std::cmp::Ordering::Greater => std::cmp::Ordering::Greater,
                std::cmp::Ordering::Equal => self.bytes.cmp(other.bytes),
            }
        }
    }
    impl<'a> Iterator for DatumSeq<'a> {
        type Item = Datum<'a>;
        #[inline]
        fn next(&mut self) -> Option<Self::Item> {
            if self.bytes.is_empty() {
                None
            } else {
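                // SAFETY: `self.bytes` is a correctly formatted sequence of datums,
                // so `read_datum` reads a valid `Datum` and advances the slice.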
                let result = unsafe { read_datum(&mut self.bytes) };
                Some(result)
            }
        }
    }

    use mz_repr::fixed_length::ToDatumIter;
    impl<'long> ToDatumIter for DatumSeq<'long> {
        type DatumIter<'short>
            = DatumSeq<'short>
        where
            Self: 'short;
        #[inline]
        fn to_datum_iter<'short>(&'short self) -> Self::DatumIter<'short> {
            *self
        }
    }

    #[cfg(test)]
    mod tests {
        use crate::row_spine::DatumContainer;
        use differential_dataflow::trace::implementations::BatchContainer;
        use mz_repr::adt::date::Date;
        use mz_repr::adt::interval::Interval;
        use mz_repr::{Datum, Row, SqlScalarType};

        #[mz_ore::test]
        #[cfg_attr(miri, ignore)] // unsupported operation: integer-to-pointer casts and `ptr::with_exposed_provenance` are not supported
        fn test_round_trip() {
            fn round_trip(datums: Vec<Datum>) {
                let row = Row::pack(datums.clone());

                let mut container = DatumContainer::with_capacity(row.byte_len());
                container.push_own(&row);

                // When run under Miri, this catches undefined bytes written to data,
                // e.g. by calling `push_copy!` on a type which contains undefined padding values.
                println!("{:?}", container.index(0).bytes);

                let datums2 = container.index(0).collect::<Vec<_>>();
                assert_eq!(datums, datums2);
            }

            round_trip(vec![]);
            round_trip(
                SqlScalarType::enumerate()
                    .iter()
                    .flat_map(|r#type| r#type.interesting_datums())
                    .collect(),
            );
            round_trip(vec![
                Datum::Null,
                Datum::Null,
                Datum::False,
                Datum::True,
                Datum::Int16(-21),
                Datum::Int32(-42),
                Datum::Int64(-2_147_483_648 - 42),
                Datum::UInt8(0),
                Datum::UInt8(1),
                Datum::UInt16(0),
                Datum::UInt16(1),
                Datum::UInt16(1 << 8),
                Datum::UInt32(0),
                Datum::UInt32(1),
                Datum::UInt32(1 << 8),
                Datum::UInt32(1 << 16),
                Datum::UInt32(1 << 24),
                Datum::UInt64(0),
                Datum::UInt64(1),
                Datum::UInt64(1 << 8),
                Datum::UInt64(1 << 16),
                Datum::UInt64(1 << 24),
                Datum::UInt64(1 << 32),
                Datum::UInt64(1 << 40),
                Datum::UInt64(1 << 48),
                Datum::UInt64(1 << 56),
                Datum::Date(Date::from_pg_epoch(365 * 45 + 21).unwrap()),
                Datum::Interval(Interval {
                    months: 312,
                    ..Default::default()
                }),
                Datum::Interval(Interval::new(0, 0, 1_012_312)),
                Datum::Bytes(&[]),
                Datum::Bytes(&[0, 2, 1, 255]),
                Datum::String(""),
                Datum::String("العَرَبِيَّة"),
            ]);
        }
    }
}

mod bytes_container {

    use differential_dataflow::trace::implementations::BatchContainer;
    use timely::container::PushInto;

    use mz_ore::region::Region;

    /// A slice container with four bytes overhead per slice.
    pub struct BytesContainer {
        /// Total length of `batches`, maintained because recomputation is expensive.
        length: usize,
        batches: Vec<BytesBatch>,
    }

    impl BytesContainer {
        /// Visit contained allocations to determine their size and capacity.
        #[inline]
        pub fn heap_size(&self, mut callback: impl FnMut(usize, usize)) {
            // Report the heap size of the `batches` vector itself, then of each batch's offsets and storage.
            callback(
                self.batches.len() * std::mem::size_of::<BytesBatch>(),
                self.batches.capacity() * std::mem::size_of::<BytesBatch>(),
            );
            for batch in self.batches.iter() {
                batch.offsets.heap_size(&mut callback);
                callback(batch.storage.len(), batch.storage.capacity());
            }
        }
    }

    impl BatchContainer for BytesContainer {
        type Owned = Vec<u8>;
        type ReadItem<'a> = &'a [u8];

        #[inline]
        fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned {
            item.to_vec()
        }

        #[inline]
        fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) {
            other.clear();
            other.extend_from_slice(item);
        }

        #[inline(always)]
        fn push_ref(&mut self, item: Self::ReadItem<'_>) {
            self.push_into(item);
        }

        #[inline(always)]
        fn push_own(&mut self, item: &Self::Owned) {
            self.push_into(item.as_slice())
        }

        fn clear(&mut self) {
            self.batches.clear();
            self.batches.push(BytesBatch::with_capacities(0, 0));
            self.length = 0;
        }

        fn with_capacity(size: usize) -> Self {
            Self {
                length: 0,
                batches: vec![BytesBatch::with_capacities(size, size)],
            }
        }

        fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
            let mut item_cap = 1;
            let mut byte_cap = 0;
            for batch in cont1.batches.iter() {
                item_cap += batch.offsets.len() - 1;
                byte_cap += batch.storage.len();
            }
            for batch in cont2.batches.iter() {
                item_cap += batch.offsets.len() - 1;
                byte_cap += batch.storage.len();
            }
            Self {
                length: 0,
                batches: vec![BytesBatch::with_capacities(item_cap, byte_cap)],
            }
        }

        #[inline(always)]
        fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> {
            item
        }

        #[inline]
        fn index(&self, mut index: usize) -> Self::ReadItem<'_> {
            for batch in self.batches.iter() {
                if index < batch.len() {
                    return batch.index(index);
                }
                index -= batch.len();
            }
            panic!("Index out of bounds");
        }

        #[inline(always)]
        fn len(&self) -> usize {
            self.length
        }
    }

    impl PushInto<&[u8]> for BytesContainer {
        #[inline]
        fn push_into(&mut self, item: &[u8]) {
            self.length += 1;
            if let Some(batch) = self.batches.last_mut() {
                let success = batch.try_push(item);
                if !success {
                    // double the lengths from `batch`.
                    let item_cap = 2 * batch.offsets.len();
                    let byte_cap = std::cmp::max(2 * batch.storage.capacity(), item.len());
                    let mut new_batch = BytesBatch::with_capacities(item_cap, byte_cap);
                    assert!(new_batch.try_push(item));
                    self.batches.push(new_batch);
                }
            }
        }
    }

    /// A batch of slice storage.
    ///
    /// The backing storage for this batch will not be resized.
    pub struct BytesBatch {
        offsets: crate::row_spine::OffsetOptimized,
        storage: Region<u8>,
        len: usize,
    }

    impl BytesBatch {
        /// Either accepts the slice and returns true,
        /// or does not and returns false.
        fn try_push(&mut self, slice: &[u8]) -> bool {
            if self.storage.len() + slice.len() <= self.storage.capacity() {
                self.storage.extend_from_slice(slice);
                self.offsets.push_into(self.storage.len());
                self.len += 1;
                true
            } else {
                false
            }
        }
        fn index(&self, index: usize) -> &[u8] {
            let lower = self.offsets.index(index);
            let upper = self.offsets.index(index + 1);
            &self.storage[lower..upper]
        }
        #[inline(always)]
        fn len(&self) -> usize {
            debug_assert_eq!(self.len, self.offsets.len() - 1);
            self.len
        }

        fn with_capacities(item_cap: usize, byte_cap: usize) -> Self {
            // TODO: be wary of `byte_cap` greater than 2^32.
            let mut offsets = crate::row_spine::OffsetOptimized::with_capacity(item_cap + 1);
            offsets.push_into(0);
            Self {
                offsets,
                storage: Region::new_auto(byte_cap.next_power_of_two()),
                len: 0,
            }
        }
    }
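
    #[cfg(test)]
    mod tests {
        use differential_dataflow::trace::implementations::BatchContainer;
        use timely::container::PushInto;

        use super::BytesContainer;

        /// A minimal sketch of the push/index round trip, with illustrative slices:
        /// items are appended to the current `BytesBatch`, and `index` locates them
        /// across batches by global position.
        #[mz_ore::test]
        #[cfg_attr(miri, ignore)] // `Region`-backed storage uses operations unsupported by Miri
        fn push_and_index_round_trip() {
            let mut container = BytesContainer::with_capacity(8);
            container.push_into(&b"abc"[..]);
            container.push_into(&b"defgh"[..]);
            container.push_into(&b""[..]);
            assert_eq!(container.len(), 3);
            assert_eq!(container.index(0), &b"abc"[..]);
            assert_eq!(container.index(1), &b"defgh"[..]);
            assert_eq!(container.index(2), &b""[..]);
        }
    }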
}

mod offset_opt {
    use differential_dataflow::trace::implementations::BatchContainer;
    use differential_dataflow::trace::implementations::OffsetList;
    use timely::container::PushInto;

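    /// A run-length description of offsets that follow a regular pattern: no
    /// offsets yet (`Empty`), a single zero (`Zero`), multiples of a fixed stride
    /// (`Striding`), or a stride whose final value then repeats (`Saturated`).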
    enum OffsetStride {
        Empty,
        Zero,
        Striding(usize, usize),
        Saturated(usize, usize, usize),
    }

    impl OffsetStride {
        /// Accepts or rejects a newly pushed element.
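        ///
        /// For example (illustrative values): pushing `0, 4, 8, 12` yields the
        /// compact `Striding(4, 4)` form, pushing `12` again transitions to
        /// `Saturated(4, 4, 1)`, and pushing any other value is rejected so the
        /// caller can record it elsewhere.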
        #[inline]
        fn push(&mut self, item: usize) -> bool {
            match self {
                OffsetStride::Empty => {
                    if item == 0 {
                        *self = OffsetStride::Zero;
                        true
                    } else {
                        false
                    }
                }
                OffsetStride::Zero => {
                    *self = OffsetStride::Striding(item, 2);
                    true
                }
                OffsetStride::Striding(stride, count) => {
                    if item == *stride * *count {
                        *count += 1;
                        true
                    } else if item == *stride * (*count - 1) {
                        *self = OffsetStride::Saturated(*stride, *count, 1);
                        true
                    } else {
                        false
                    }
                }
                OffsetStride::Saturated(stride, count, reps) => {
                    if item == *stride * (*count - 1) {
                        *reps += 1;
                        true
                    } else {
                        false
                    }
                }
            }
        }

        #[inline]
        fn index(&self, index: usize) -> usize {
            match self {
                OffsetStride::Empty => {
                    panic!("Empty OffsetStride")
                }
                OffsetStride::Zero => 0,
                OffsetStride::Striding(stride, _steps) => *stride * index,
                OffsetStride::Saturated(stride, steps, _reps) => {
                    if index < *steps {
                        *stride * index
                    } else {
                        *stride * (*steps - 1)
                    }
                }
            }
        }

        #[inline]
        fn len(&self) -> usize {
            match self {
                OffsetStride::Empty => 0,
                OffsetStride::Zero => 1,
                OffsetStride::Striding(_stride, steps) => *steps,
                OffsetStride::Saturated(_stride, steps, reps) => *steps + *reps,
            }
        }
    }

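    /// An offset container that keeps offsets in the compact `OffsetStride`
    /// representation for as long as they follow its pattern, and spills any
    /// offset that breaks the pattern (and everything after it) into `spilled`.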
    pub struct OffsetOptimized {
        strided: OffsetStride,
        spilled: OffsetList,
    }

    impl BatchContainer for OffsetOptimized {
        type Owned = usize;
        type ReadItem<'a> = usize;

        #[inline]
        fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned {
            item
        }

        #[inline]
        fn push_ref(&mut self, item: Self::ReadItem<'_>) {
            self.push_into(item)
        }

        #[inline]
        fn push_own(&mut self, item: &Self::Owned) {
            self.push_into(*item)
        }

        fn clear(&mut self) {
            self.strided = OffsetStride::Empty;
            self.spilled.clear();
        }

        fn with_capacity(_size: usize) -> Self {
            Self {
                strided: OffsetStride::Empty,
                spilled: OffsetList::with_capacity(0),
            }
        }

        fn merge_capacity(_cont1: &Self, _cont2: &Self) -> Self {
            Self {
                strided: OffsetStride::Empty,
                spilled: OffsetList::with_capacity(0),
            }
        }

        #[inline]
        fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> {
            item
        }

        #[inline]
        fn index(&self, index: usize) -> Self::ReadItem<'_> {
            if index < self.strided.len() {
                self.strided.index(index)
            } else {
                self.spilled.index(index - self.strided.len())
            }
        }

        #[inline]
        fn len(&self) -> usize {
            self.strided.len() + self.spilled.len()
        }
    }

    impl PushInto<usize> for OffsetOptimized {
        #[inline]
        fn push_into(&mut self, item: usize) {
            if !self.spilled.is_empty() {
                self.spilled.push(item);
            } else {
                let inserted = self.strided.push(item);
                if !inserted {
                    self.spilled.push(item);
                }
            }
        }
    }

    impl OffsetOptimized {
        pub fn heap_size(&self, callback: impl FnMut(usize, usize)) {
            crate::row_spine::offset_list_size(&self.spilled, callback);
        }
    }
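
    #[cfg(test)]
    mod tests {
        use differential_dataflow::trace::implementations::BatchContainer;
        use timely::container::PushInto;

        use super::OffsetOptimized;

        /// A minimal sketch of the intended behavior, with illustrative values:
        /// offsets that follow a regular stride stay in the compact strided form,
        /// while an offset that breaks the pattern spills into the `OffsetList`
        /// yet remains addressable through the same `index` method.
        #[mz_ore::test]
        fn stride_then_spill() {
            let mut offsets = OffsetOptimized::with_capacity(0);
            for i in 0..5 {
                offsets.push_into(i * 8);
            }
            assert_eq!(offsets.len(), 5);
            assert_eq!(offsets.index(3), 24);

            // 41 does not extend the stride of 8, so it lands in the spilled list.
            offsets.push_into(41);
            assert_eq!(offsets.len(), 6);
            assert_eq!(offsets.index(5), 41);
        }
    }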
}

/// Helper to compute the size of an [`OffsetList`] in memory.
#[inline]
pub(crate) fn offset_list_size(data: &OffsetList, mut callback: impl FnMut(usize, usize)) {
    // Private `vec_size` because we should only use it where data isn't region-allocated.
    // `T: Copy` makes sure the implementation is correct even if types change!
    #[inline(always)]
    fn vec_size<T: Copy>(data: &Vec<T>, mut callback: impl FnMut(usize, usize)) {
        let size_of_t = std::mem::size_of::<T>();
        callback(data.len() * size_of_t, data.capacity() * size_of_t);
    }

    vec_size(&data.smol, &mut callback);
    vec_size(&data.chonk, callback);
}