mz_compute/
row_spine.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10pub use self::container::DatumContainer;
11pub use self::container::DatumSeq;
12pub use self::offset_opt::OffsetOptimized;
13pub use self::spines::{
14    RowBatcher, RowBuilder, RowRowBatcher, RowRowBuilder, RowRowSpine, RowSpine, RowValBatcher,
15    RowValBuilder, RowValSpine,
16};
17use differential_dataflow::trace::implementations::OffsetList;
18
19/// Spines specialized to contain `Row` types in keys and values.
20mod spines {
21    use std::rc::Rc;
22
23    use differential_dataflow::containers::{Columnation, TimelyStack};
24    use differential_dataflow::trace::implementations::Layout;
25    use differential_dataflow::trace::implementations::Update;
26    use differential_dataflow::trace::implementations::ord_neu::{OrdKeyBatch, OrdKeyBuilder};
27    use differential_dataflow::trace::implementations::ord_neu::{OrdValBatch, OrdValBuilder};
28    use differential_dataflow::trace::implementations::spine_fueled::Spine;
29    use differential_dataflow::trace::rc_blanket_impls::RcBuilder;
30    use mz_repr::Row;
31
32    use crate::row_spine::{DatumContainer, OffsetOptimized};
33    use crate::typedefs::{KeyBatcher, KeyValBatcher};
34
    /// Spine of reference-counted `OrdValBatch`es using `RowRowLayout`, for updates whose key
    /// and value are both `Row`s.
    pub type RowRowSpine<T, R> = Spine<Rc<OrdValBatch<RowRowLayout<((Row, Row), T, R)>>>>;
    /// `KeyValBatcher` specialized to `Row` keys and `Row` values.
    pub type RowRowBatcher<T, R> = KeyValBatcher<Row, Row, T, R>;
    /// `RcBuilder`-wrapped `OrdValBuilder` using `RowRowLayout`, parameterized by a
    /// `TimelyStack` of the same update type.
    pub type RowRowBuilder<T, R> =
        RcBuilder<OrdValBuilder<RowRowLayout<((Row, Row), T, R)>, TimelyStack<((Row, Row), T, R)>>>;

    /// Spine of reference-counted `OrdValBatch`es using `RowValLayout`, for updates with `Row`
    /// keys and values of type `V`.
    pub type RowValSpine<V, T, R> = Spine<Rc<OrdValBatch<RowValLayout<((Row, V), T, R)>>>>;
    /// `KeyValBatcher` specialized to `Row` keys and values of type `V`.
    pub type RowValBatcher<V, T, R> = KeyValBatcher<Row, V, T, R>;
    /// `RcBuilder`-wrapped `OrdValBuilder` using `RowValLayout`, parameterized by a
    /// `TimelyStack` of the same update type.
    pub type RowValBuilder<V, T, R> =
        RcBuilder<OrdValBuilder<RowValLayout<((Row, V), T, R)>, TimelyStack<((Row, V), T, R)>>>;

    /// Spine of reference-counted `OrdKeyBatch`es using `RowLayout`, for updates with `Row`
    /// keys and unit values.
    pub type RowSpine<T, R> = Spine<Rc<OrdKeyBatch<RowLayout<((Row, ()), T, R)>>>>;
    /// `KeyBatcher` specialized to `Row` keys.
    pub type RowBatcher<T, R> = KeyBatcher<Row, T, R>;
    /// `RcBuilder`-wrapped `OrdKeyBuilder` using `RowLayout`, parameterized by a `TimelyStack`
    /// of the same update type.
    pub type RowBuilder<T, R> =
        RcBuilder<OrdKeyBuilder<RowLayout<((Row, ()), T, R)>, TimelyStack<((Row, ()), T, R)>>>;
49
    /// A layout based on timely stacks, for updates with `Row` keys and `Row` values.
    ///
    /// This is a type-level marker: its only field is a `PhantomData` tying it to `U`.
    pub struct RowRowLayout<U: Update<Key = Row, Val = Row>> {
        phantom: std::marker::PhantomData<U>,
    }
    /// Layout marker for updates with `Row` keys; the value type is left to `U`.
    pub struct RowValLayout<U: Update<Key = Row>> {
        phantom: std::marker::PhantomData<U>,
    }
    /// Layout marker for updates with `Row` keys and unit (`()`) values.
    pub struct RowLayout<U: Update<Key = Row, Val = ()>> {
        phantom: std::marker::PhantomData<U>,
    }
60
61    impl<U: Update<Key = Row, Val = Row>> Layout for RowRowLayout<U>
62    where
63        U::Time: Columnation,
64        U::Diff: Columnation,
65    {
66        type Target = U;
67        type KeyContainer = DatumContainer;
68        type ValContainer = DatumContainer;
69        type TimeContainer = TimelyStack<U::Time>;
70        type DiffContainer = TimelyStack<U::Diff>;
71        type OffsetContainer = OffsetOptimized;
72    }
73    impl<U: Update<Key = Row>> Layout for RowValLayout<U>
74    where
75        U::Val: Columnation,
76        U::Time: Columnation,
77        U::Diff: Columnation,
78    {
79        type Target = U;
80        type KeyContainer = DatumContainer;
81        type ValContainer = TimelyStack<U::Val>;
82        type TimeContainer = TimelyStack<U::Time>;
83        type DiffContainer = TimelyStack<U::Diff>;
84        type OffsetContainer = OffsetOptimized;
85    }
86    impl<U: Update<Key = Row, Val = ()>> Layout for RowLayout<U>
87    where
88        U::Time: Columnation,
89        U::Diff: Columnation,
90    {
91        type Target = U;
92        type KeyContainer = DatumContainer;
93        type ValContainer = TimelyStack<()>;
94        type TimeContainer = TimelyStack<U::Time>;
95        type DiffContainer = TimelyStack<U::Diff>;
96        type OffsetContainer = OffsetOptimized;
97    }
98}
99
100/// A `Row`-specialized container using dictionary compression.
101mod container {
102
103    use std::cmp::Ordering;
104
105    use differential_dataflow::IntoOwned;
106    use differential_dataflow::trace::implementations::BatchContainer;
107    use timely::container::PushInto;
108
109    use mz_repr::{Datum, Row, RowPacker, read_datum};
110
111    use super::bytes_container::BytesContainer;
112
113    /// Container wrapping `BytesContainer` that traffics only in `Row`-formatted bytes.
114    ///
115    /// This type accepts only `Row`-formatted bytes in its `Push` implementation, and
116    /// in return provides a `DatumSeq` view of the bytes which can be decoded as `Datum`s.
117    pub struct DatumContainer {
118        bytes: BytesContainer,
119    }
120
121    impl DatumContainer {
122        /// Visit contained allocations to determine their size and capacity.
123        #[inline]
124        pub fn heap_size(&self, callback: impl FnMut(usize, usize)) {
125            self.bytes.heap_size(callback)
126        }
127    }
128
129    impl BatchContainer for DatumContainer {
130        type Owned = Row;
131        type ReadItem<'a> = DatumSeq<'a>;
132
133        fn with_capacity(size: usize) -> Self {
134            Self {
135                bytes: BytesContainer::with_capacity(size),
136            }
137        }
138
139        fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
140            Self {
141                bytes: BytesContainer::merge_capacity(&cont1.bytes, &cont2.bytes),
142            }
143        }
144
145        fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> {
146            item
147        }
148
149        fn index(&self, index: usize) -> Self::ReadItem<'_> {
150            DatumSeq {
151                bytes: self.bytes.index(index),
152            }
153        }
154
155        fn len(&self) -> usize {
156            self.bytes.len()
157        }
158    }
159
160    impl PushInto<Row> for DatumContainer {
161        fn push_into(&mut self, item: Row) {
162            self.push_into(&item);
163        }
164    }
165
166    impl PushInto<&Row> for DatumContainer {
167        fn push_into(&mut self, item: &Row) {
168            let item: DatumSeq<'_> = IntoOwned::borrow_as(item);
169            self.push_into(item);
170        }
171    }
172
173    impl PushInto<DatumSeq<'_>> for DatumContainer {
174        fn push_into(&mut self, item: DatumSeq<'_>) {
175            self.bytes.push_into(item.as_bytes())
176        }
177    }
178
179    #[derive(Debug)]
180    pub struct DatumSeq<'a> {
181        bytes: &'a [u8],
182    }
183
184    impl<'a> DatumSeq<'a> {
185        pub fn copy_into(&self, row: &mut RowPacker) {
186            // SAFETY: `self.bytes` is a correctly formatted row.
187            unsafe { row.extend_by_slice_unchecked(self.bytes) }
188        }
189        fn as_bytes(&self) -> &'a [u8] {
190            self.bytes
191        }
192    }
193
194    impl<'a> Copy for DatumSeq<'a> {}
195    impl<'a> Clone for DatumSeq<'a> {
196        fn clone(&self) -> Self {
197            *self
198        }
199    }
200
201    impl<'a, 'b> PartialEq<DatumSeq<'a>> for DatumSeq<'b> {
202        fn eq(&self, other: &DatumSeq<'a>) -> bool {
203            self.bytes.eq(other.bytes)
204        }
205    }
206    impl<'a> PartialEq<&Row> for DatumSeq<'a> {
207        fn eq(&self, other: &&Row) -> bool {
208            self.bytes.eq(other.data())
209        }
210    }
211    impl<'a> Eq for DatumSeq<'a> {}
212    impl<'a, 'b> PartialOrd<DatumSeq<'a>> for DatumSeq<'b> {
213        fn partial_cmp(&self, other: &DatumSeq<'a>) -> Option<Ordering> {
214            Some(self.cmp(other))
215        }
216    }
217    impl<'a> Ord for DatumSeq<'a> {
218        fn cmp(&self, other: &Self) -> Ordering {
219            match self.bytes.len().cmp(&other.bytes.len()) {
220                std::cmp::Ordering::Less => std::cmp::Ordering::Less,
221                std::cmp::Ordering::Greater => std::cmp::Ordering::Greater,
222                std::cmp::Ordering::Equal => self.bytes.cmp(other.bytes),
223            }
224        }
225    }
226    impl<'a> IntoOwned<'a> for DatumSeq<'a> {
227        type Owned = Row;
228        fn into_owned(self) -> Self::Owned {
229            // SAFETY: `bytes` contains a valid row.
230            unsafe { Row::from_bytes_unchecked(self.bytes) }
231        }
232        fn clone_onto(self, other: &mut Self::Owned) {
233            let mut packer = other.packer();
234            self.copy_into(&mut packer);
235        }
236        fn borrow_as(other: &'a Self::Owned) -> Self {
237            Self {
238                bytes: other.data(),
239            }
240        }
241    }
242
243    impl<'a> Iterator for DatumSeq<'a> {
244        type Item = Datum<'a>;
245        fn next(&mut self) -> Option<Self::Item> {
246            if self.bytes.is_empty() {
247                None
248            } else {
249                let result = unsafe { read_datum(&mut self.bytes) };
250                Some(result)
251            }
252        }
253    }
254
255    use mz_repr::fixed_length::ToDatumIter;
256    impl<'long> ToDatumIter for DatumSeq<'long> {
257        type DatumIter<'short>
258            = DatumSeq<'short>
259        where
260            Self: 'short;
261        fn to_datum_iter<'short>(&'short self) -> Self::DatumIter<'short> {
262            *self
263        }
264    }
265
    #[cfg(test)]
    mod tests {
        use crate::row_spine::DatumContainer;
        use differential_dataflow::trace::implementations::BatchContainer;
        use mz_repr::adt::date::Date;
        use mz_repr::adt::interval::Interval;
        use mz_repr::{Datum, Row, ScalarType};

        /// Checks that datums packed into a `Row` and pushed into a `DatumContainer` are read
        /// back unchanged through the container's `DatumSeq` iterator.
        #[mz_ore::test]
        #[cfg_attr(miri, ignore)] // unsupported operation: integer-to-pointer casts and `ptr::with_exposed_provenance` are not supported
        fn test_round_trip() {
            // Packs `datums` into a row, stores it as the container's only item, and compares
            // what is decoded from index 0 with the input.
            fn round_trip(datums: Vec<Datum>) {
                let row = Row::pack(datums.clone());

                let mut container = DatumContainer::with_capacity(row.byte_len());
                container.push(&row);

                // When run under miri this catches undefined bytes written to data
                // eg by calling push_copy! on a type which contains undefined padding values
                println!("{:?}", container.index(0).bytes);

                let datums2 = container.index(0).collect::<Vec<_>>();
                assert_eq!(datums, datums2);
            }

            // The empty row.
            round_trip(vec![]);
            // Every datum reported by `interesting_datums` for every enumerated scalar type.
            round_trip(
                ScalarType::enumerate()
                    .iter()
                    .flat_map(|r#type| r#type.interesting_datums())
                    .collect(),
            );
            // A hand-picked mix: nulls, booleans, signed and unsigned integers at byte-width
            // boundaries, a date, intervals, byte strings, and strings (including non-ASCII).
            round_trip(vec![
                Datum::Null,
                Datum::Null,
                Datum::False,
                Datum::True,
                Datum::Int16(-21),
                Datum::Int32(-42),
                Datum::Int64(-2_147_483_648 - 42),
                Datum::UInt8(0),
                Datum::UInt8(1),
                Datum::UInt16(0),
                Datum::UInt16(1),
                Datum::UInt16(1 << 8),
                Datum::UInt32(0),
                Datum::UInt32(1),
                Datum::UInt32(1 << 8),
                Datum::UInt32(1 << 16),
                Datum::UInt32(1 << 24),
                Datum::UInt64(0),
                Datum::UInt64(1),
                Datum::UInt64(1 << 8),
                Datum::UInt64(1 << 16),
                Datum::UInt64(1 << 24),
                Datum::UInt64(1 << 32),
                Datum::UInt64(1 << 40),
                Datum::UInt64(1 << 48),
                Datum::UInt64(1 << 56),
                Datum::Date(Date::from_pg_epoch(365 * 45 + 21).unwrap()),
                Datum::Interval(Interval {
                    months: 312,
                    ..Default::default()
                }),
                Datum::Interval(Interval::new(0, 0, 1_012_312)),
                Datum::Bytes(&[]),
                Datum::Bytes(&[0, 2, 1, 255]),
                Datum::String(""),
                Datum::String("العَرَبِيَّة"),
            ]);
        }
    }
338}
339
340mod bytes_container {
341
342    use differential_dataflow::trace::implementations::BatchContainer;
343    use timely::container::PushInto;
344
345    use mz_ore::region::Region;
346
347    /// A slice container with four bytes overhead per slice.
348    pub struct BytesContainer {
349        batches: Vec<BytesBatch>,
350    }
351
352    impl BytesContainer {
353        /// Visit contained allocations to determine their size and capacity.
354        #[inline]
355        pub fn heap_size(&self, mut callback: impl FnMut(usize, usize)) {
356            // Calculate heap size for local, stash, and stash entries
357            callback(
358                self.batches.len() * std::mem::size_of::<BytesBatch>(),
359                self.batches.capacity() * std::mem::size_of::<BytesBatch>(),
360            );
361            for batch in self.batches.iter() {
362                batch.offsets.heap_size(&mut callback);
363                callback(batch.storage.len(), batch.storage.capacity());
364            }
365        }
366    }
367
368    impl BatchContainer for BytesContainer {
369        type Owned = Vec<u8>;
370        type ReadItem<'a> = &'a [u8];
371
372        fn with_capacity(size: usize) -> Self {
373            Self {
374                batches: vec![BytesBatch::with_capacities(size, size)],
375            }
376        }
377
378        fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
379            let mut item_cap = 1;
380            let mut byte_cap = 0;
381            for batch in cont1.batches.iter() {
382                item_cap += batch.offsets.len() - 1;
383                byte_cap += batch.storage.len();
384            }
385            for batch in cont2.batches.iter() {
386                item_cap += batch.offsets.len() - 1;
387                byte_cap += batch.storage.len();
388            }
389            Self {
390                batches: vec![BytesBatch::with_capacities(item_cap, byte_cap)],
391            }
392        }
393
394        fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> {
395            item
396        }
397
398        fn index(&self, mut index: usize) -> Self::ReadItem<'_> {
399            for batch in self.batches.iter() {
400                if index < batch.len() {
401                    return batch.index(index);
402                }
403                index -= batch.len();
404            }
405            panic!("Index out of bounds");
406        }
407
408        fn len(&self) -> usize {
409            let mut result = 0;
410            for batch in self.batches.iter() {
411                result += batch.len();
412            }
413            result
414        }
415    }
416
417    impl PushInto<&[u8]> for BytesContainer {
418        fn push_into(&mut self, item: &[u8]) {
419            if let Some(batch) = self.batches.last_mut() {
420                let success = batch.try_push(item);
421                if !success {
422                    // double the lengths from `batch`.
423                    let item_cap = 2 * batch.offsets.len();
424                    let byte_cap = std::cmp::max(2 * batch.storage.capacity(), item.len());
425                    let mut new_batch = BytesBatch::with_capacities(item_cap, byte_cap);
426                    assert!(new_batch.try_push(item));
427                    self.batches.push(new_batch);
428                }
429            }
430        }
431    }
432
433    /// A batch of slice storage.
434    ///
435    /// The backing storage for this batch will not be resized.
436    pub struct BytesBatch {
437        offsets: crate::row_spine::OffsetOptimized,
438        storage: Region<u8>,
439    }
440
441    impl BytesBatch {
442        /// Either accepts the slice and returns true,
443        /// or does not and returns false.
444        fn try_push(&mut self, slice: &[u8]) -> bool {
445            if self.storage.len() + slice.len() <= self.storage.capacity() {
446                self.storage.extend_from_slice(slice);
447                self.offsets.push(self.storage.len());
448                true
449            } else {
450                false
451            }
452        }
453        fn index(&self, index: usize) -> &[u8] {
454            let lower = self.offsets.index(index);
455            let upper = self.offsets.index(index + 1);
456            &self.storage[lower..upper]
457        }
458        fn len(&self) -> usize {
459            self.offsets.len() - 1
460        }
461
462        fn with_capacities(item_cap: usize, byte_cap: usize) -> Self {
463            // TODO: be wary of `byte_cap` greater than 2^32.
464            let mut offsets = crate::row_spine::OffsetOptimized::with_capacity(item_cap + 1);
465            offsets.push(0);
466            Self {
467                offsets,
468                storage: Region::new_auto(byte_cap.next_power_of_two()),
469            }
470        }
471    }
472}
473
474mod offset_opt {
475    use differential_dataflow::trace::implementations::BatchContainer;
476    use differential_dataflow::trace::implementations::OffsetList;
477    use timely::container::PushInto;
478
479    enum OffsetStride {
480        Empty,
481        Zero,
482        Striding(usize, usize),
483        Saturated(usize, usize, usize),
484    }
485
486    impl OffsetStride {
487        /// Accepts or rejects a newly pushed element.
488        fn push(&mut self, item: usize) -> bool {
489            match self {
490                OffsetStride::Empty => {
491                    if item == 0 {
492                        *self = OffsetStride::Zero;
493                        true
494                    } else {
495                        false
496                    }
497                }
498                OffsetStride::Zero => {
499                    *self = OffsetStride::Striding(item, 2);
500                    true
501                }
502                OffsetStride::Striding(stride, count) => {
503                    if item == *stride * *count {
504                        *count += 1;
505                        true
506                    } else if item == *stride * (*count - 1) {
507                        *self = OffsetStride::Saturated(*stride, *count, 1);
508                        true
509                    } else {
510                        false
511                    }
512                }
513                OffsetStride::Saturated(stride, count, reps) => {
514                    if item == *stride * (*count - 1) {
515                        *reps += 1;
516                        true
517                    } else {
518                        false
519                    }
520                }
521            }
522        }
523
524        fn index(&self, index: usize) -> usize {
525            match self {
526                OffsetStride::Empty => {
527                    panic!("Empty OffsetStride")
528                }
529                OffsetStride::Zero => 0,
530                OffsetStride::Striding(stride, _steps) => *stride * index,
531                OffsetStride::Saturated(stride, steps, _reps) => {
532                    if index < *steps {
533                        *stride * index
534                    } else {
535                        *stride * (*steps - 1)
536                    }
537                }
538            }
539        }
540
541        fn len(&self) -> usize {
542            match self {
543                OffsetStride::Empty => 0,
544                OffsetStride::Zero => 1,
545                OffsetStride::Striding(_stride, steps) => *steps,
546                OffsetStride::Saturated(_stride, steps, reps) => *steps + *reps,
547            }
548        }
549    }
550
551    pub struct OffsetOptimized {
552        strided: OffsetStride,
553        spilled: OffsetList,
554    }
555
556    impl BatchContainer for OffsetOptimized {
557        type Owned = usize;
558        type ReadItem<'a> = usize;
559
560        fn with_capacity(_size: usize) -> Self {
561            Self {
562                strided: OffsetStride::Empty,
563                spilled: OffsetList::with_capacity(0),
564            }
565        }
566
567        fn merge_capacity(_cont1: &Self, _cont2: &Self) -> Self {
568            Self {
569                strided: OffsetStride::Empty,
570                spilled: OffsetList::with_capacity(0),
571            }
572        }
573
574        fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> {
575            item
576        }
577
578        fn index(&self, index: usize) -> Self::ReadItem<'_> {
579            if index < self.strided.len() {
580                self.strided.index(index)
581            } else {
582                self.spilled.index(index - self.strided.len())
583            }
584        }
585
586        fn len(&self) -> usize {
587            self.strided.len() + self.spilled.len()
588        }
589    }
590
591    impl PushInto<usize> for OffsetOptimized {
592        fn push_into(&mut self, item: usize) {
593            if !self.spilled.is_empty() {
594                self.spilled.push(item);
595            } else {
596                let inserted = self.strided.push(item);
597                if !inserted {
598                    self.spilled.push(item);
599                }
600            }
601        }
602    }
603
604    impl OffsetOptimized {
605        pub fn heap_size(&self, callback: impl FnMut(usize, usize)) {
606            crate::row_spine::offset_list_size(&self.spilled, callback);
607        }
608    }
609}
610
611/// Helper to compute the size of an [`OffsetList`] in memory.
612#[inline]
613pub(crate) fn offset_list_size(data: &OffsetList, mut callback: impl FnMut(usize, usize)) {
614    // Private `vec_size` because we should only use it where data isn't region-allocated.
615    // `T: Copy` makes sure the implementation is correct even if types change!
616    #[inline(always)]
617    fn vec_size<T: Copy>(data: &Vec<T>, mut callback: impl FnMut(usize, usize)) {
618        let size_of_t = std::mem::size_of::<T>();
619        callback(data.len() * size_of_t, data.capacity() * size_of_t);
620    }
621
622    vec_size(&data.smol, &mut callback);
623    vec_size(&data.chonk, callback);
624}