// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

pub use self::container::DatumContainer;
pub use self::container::DatumSeq;
pub use self::offset_opt::OffsetOptimized;
pub use self::spines::{
    RowBatcher, RowBuilder, RowRowBatcher, RowRowBuilder, RowRowSpine, RowSpine, RowValBatcher,
    RowValBuilder, RowValSpine,
};
use differential_dataflow::trace::implementations::OffsetList;

/// Spines specialized to contain `Row` types in keys and values.
mod spines {
    use std::rc::Rc;

    use differential_dataflow::containers::{Columnation, TimelyStack};
    use differential_dataflow::trace::implementations::Layout;
    use differential_dataflow::trace::implementations::Update;
    use differential_dataflow::trace::implementations::ord_neu::{OrdKeyBatch, OrdKeyBuilder};
    use differential_dataflow::trace::implementations::ord_neu::{OrdValBatch, OrdValBuilder};
    use differential_dataflow::trace::implementations::spine_fueled::Spine;
    use differential_dataflow::trace::rc_blanket_impls::RcBuilder;
    use mz_repr::Row;

    use crate::row_spine::{DatumContainer, OffsetOptimized};
    use crate::typedefs::{KeyBatcher, KeyValBatcher};

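    // Each spine type pairs with a batcher, which consolidates loose updates,
    // and a builder, which assembles sorted updates into batches.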
    pub type RowRowSpine<T, R> = Spine<Rc<OrdValBatch<RowRowLayout<((Row, Row), T, R)>>>>;
    pub type RowRowBatcher<T, R> = KeyValBatcher<Row, Row, T, R>;
    pub type RowRowBuilder<T, R> =
        RcBuilder<OrdValBuilder<RowRowLayout<((Row, Row), T, R)>, TimelyStack<((Row, Row), T, R)>>>;

    pub type RowValSpine<V, T, R> = Spine<Rc<OrdValBatch<RowValLayout<((Row, V), T, R)>>>>;
    pub type RowValBatcher<V, T, R> = KeyValBatcher<Row, V, T, R>;
    pub type RowValBuilder<V, T, R> =
        RcBuilder<OrdValBuilder<RowValLayout<((Row, V), T, R)>, TimelyStack<((Row, V), T, R)>>>;

    pub type RowSpine<T, R> = Spine<Rc<OrdKeyBatch<RowLayout<((Row, ()), T, R)>>>>;
    pub type RowBatcher<T, R> = KeyBatcher<Row, T, R>;
    pub type RowBuilder<T, R> =
        RcBuilder<OrdKeyBuilder<RowLayout<((Row, ()), T, R)>, TimelyStack<((Row, ()), T, R)>>>;

    /// A layout with `Row` keys and `Row` values, based on timely stacks.
    pub struct RowRowLayout<U: Update<Key = Row, Val = Row>> {
        phantom: std::marker::PhantomData<U>,
    }
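    /// A layout with `Row` keys and arbitrary columnated values.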
    pub struct RowValLayout<U: Update<Key = Row>> {
        phantom: std::marker::PhantomData<U>,
    }
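    /// A layout with `Row` keys and `()` values.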
    pub struct RowLayout<U: Update<Key = Row, Val = ()>> {
        phantom: std::marker::PhantomData<U>,
    }

    impl<U: Update<Key = Row, Val = Row>> Layout for RowRowLayout<U>
    where
        U::Time: Columnation,
        U::Diff: Columnation,
    {
        type Target = U;
        type KeyContainer = DatumContainer;
        type ValContainer = DatumContainer;
        type TimeContainer = TimelyStack<U::Time>;
        type DiffContainer = TimelyStack<U::Diff>;
        type OffsetContainer = OffsetOptimized;
    }
    impl<U: Update<Key = Row>> Layout for RowValLayout<U>
    where
        U::Val: Columnation,
        U::Time: Columnation,
        U::Diff: Columnation,
    {
        type Target = U;
        type KeyContainer = DatumContainer;
        type ValContainer = TimelyStack<U::Val>;
        type TimeContainer = TimelyStack<U::Time>;
        type DiffContainer = TimelyStack<U::Diff>;
        type OffsetContainer = OffsetOptimized;
    }
    impl<U: Update<Key = Row, Val = ()>> Layout for RowLayout<U>
    where
        U::Time: Columnation,
        U::Diff: Columnation,
    {
        type Target = U;
        type KeyContainer = DatumContainer;
        type ValContainer = TimelyStack<()>;
        type TimeContainer = TimelyStack<U::Time>;
        type DiffContainer = TimelyStack<U::Diff>;
        type OffsetContainer = OffsetOptimized;
    }
}

/// A `Row`-specialized container that stores row-formatted bytes.
mod container {

    use std::cmp::Ordering;

    use differential_dataflow::IntoOwned;
    use differential_dataflow::trace::implementations::BatchContainer;
    use timely::container::PushInto;

    use mz_repr::{Datum, Row, RowPacker, read_datum};

    use super::bytes_container::BytesContainer;

    /// Container wrapping `BytesContainer` that traffics only in `Row`-formatted bytes.
    ///
    /// This type accepts only `Row`-formatted bytes in its `PushInto` implementations, and
    /// in return provides a `DatumSeq` view of the bytes which can be decoded as `Datum`s.
    pub struct DatumContainer {
        bytes: BytesContainer,
    }

    impl DatumContainer {
        /// Visit contained allocations to determine their size and capacity.
        #[inline]
        pub fn heap_size(&self, callback: impl FnMut(usize, usize)) {
            self.bytes.heap_size(callback)
        }
    }

    impl BatchContainer for DatumContainer {
        type Owned = Row;
        type ReadItem<'a> = DatumSeq<'a>;

        fn with_capacity(size: usize) -> Self {
            Self {
                bytes: BytesContainer::with_capacity(size),
            }
        }

        fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
            Self {
                bytes: BytesContainer::merge_capacity(&cont1.bytes, &cont2.bytes),
            }
        }

        fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> {
            item
        }

        fn index(&self, index: usize) -> Self::ReadItem<'_> {
            DatumSeq {
                bytes: self.bytes.index(index),
            }
        }

        fn len(&self) -> usize {
            self.bytes.len()
        }
    }

    impl PushInto<Row> for DatumContainer {
        fn push_into(&mut self, item: Row) {
            self.push_into(&item);
        }
    }

    impl PushInto<&Row> for DatumContainer {
        fn push_into(&mut self, item: &Row) {
            let item: DatumSeq<'_> = IntoOwned::borrow_as(item);
            self.push_into(item);
        }
    }

    impl PushInto<DatumSeq<'_>> for DatumContainer {
        fn push_into(&mut self, item: DatumSeq<'_>) {
            self.bytes.push_into(item.as_bytes())
        }
    }

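    /// A borrowed view of `Row`-formatted bytes, iterable as a sequence of `Datum`s.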
    #[derive(Debug)]
    pub struct DatumSeq<'a> {
        bytes: &'a [u8],
    }

    impl<'a> DatumSeq<'a> {
        pub fn copy_into(&self, row: &mut RowPacker) {
            // SAFETY: `self.bytes` is a correctly formatted row.
            unsafe { row.extend_by_slice_unchecked(self.bytes) }
        }
        fn as_bytes(&self) -> &'a [u8] {
            self.bytes
        }
    }

    impl<'a> Copy for DatumSeq<'a> {}
    impl<'a> Clone for DatumSeq<'a> {
        fn clone(&self) -> Self {
            *self
        }
    }

    impl<'a, 'b> PartialEq<DatumSeq<'a>> for DatumSeq<'b> {
        fn eq(&self, other: &DatumSeq<'a>) -> bool {
            self.bytes.eq(other.bytes)
        }
    }
    impl<'a> PartialEq<&Row> for DatumSeq<'a> {
        fn eq(&self, other: &&Row) -> bool {
            self.bytes.eq(other.data())
        }
    }
    impl<'a> Eq for DatumSeq<'a> {}
    impl<'a, 'b> PartialOrd<DatumSeq<'a>> for DatumSeq<'b> {
        fn partial_cmp(&self, other: &DatumSeq<'a>) -> Option<Ordering> {
            Some(self.cmp(other))
        }
    }
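    // Length-first comparison, intended to match `Row`'s `Ord` so that owned
    // and borrowed values sort identically.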
    impl<'a> Ord for DatumSeq<'a> {
        fn cmp(&self, other: &Self) -> Ordering {
            match self.bytes.len().cmp(&other.bytes.len()) {
                Ordering::Less => Ordering::Less,
                Ordering::Greater => Ordering::Greater,
                Ordering::Equal => self.bytes.cmp(other.bytes),
            }
        }
    }
    impl<'a> IntoOwned<'a> for DatumSeq<'a> {
        type Owned = Row;
        fn into_owned(self) -> Self::Owned {
            // SAFETY: `bytes` contains a valid row.
            unsafe { Row::from_bytes_unchecked(self.bytes) }
        }
        fn clone_onto(self, other: &mut Self::Owned) {
            let mut packer = other.packer();
            self.copy_into(&mut packer);
        }
        fn borrow_as(other: &'a Self::Owned) -> Self {
            Self {
                bytes: other.data(),
            }
        }
    }

    impl<'a> Iterator for DatumSeq<'a> {
        type Item = Datum<'a>;
        fn next(&mut self) -> Option<Self::Item> {
            if self.bytes.is_empty() {
                None
            } else {
                // SAFETY: `self.bytes` is a valid sequence of encoded datums,
                // and `read_datum` advances it past the datum it decodes.
                let result = unsafe { read_datum(&mut self.bytes) };
                Some(result)
            }
        }
    }

    use mz_repr::fixed_length::ToDatumIter;
    impl<'long> ToDatumIter for DatumSeq<'long> {
        type DatumIter<'short>
            = DatumSeq<'short>
        where
            Self: 'short;
        fn to_datum_iter<'short>(&'short self) -> Self::DatumIter<'short> {
            *self
        }
    }

    #[cfg(test)]
    mod tests {
        use crate::row_spine::DatumContainer;
        use differential_dataflow::trace::implementations::BatchContainer;
        use mz_repr::adt::date::Date;
        use mz_repr::adt::interval::Interval;
        use mz_repr::{Datum, Row, ScalarType};

        #[mz_ore::test]
        #[cfg_attr(miri, ignore)] // unsupported operation: integer-to-pointer casts and `ptr::with_exposed_provenance` are not supported
        fn test_round_trip() {
            fn round_trip(datums: Vec<Datum>) {
                let row = Row::pack(datums.clone());

                let mut container = DatumContainer::with_capacity(row.byte_len());
                container.push(&row);

                // When run under miri this catches undefined bytes written to data,
                // e.g. by calling push_copy! on a type which contains undefined padding values.
                println!("{:?}", container.index(0).bytes);

                let datums2 = container.index(0).collect::<Vec<_>>();
                assert_eq!(datums, datums2);
            }

            round_trip(vec![]);
            round_trip(
                ScalarType::enumerate()
                    .iter()
                    .flat_map(|r#type| r#type.interesting_datums())
                    .collect(),
            );
            round_trip(vec![
                Datum::Null,
                Datum::Null,
                Datum::False,
                Datum::True,
                Datum::Int16(-21),
                Datum::Int32(-42),
                Datum::Int64(-2_147_483_648 - 42),
                Datum::UInt8(0),
                Datum::UInt8(1),
                Datum::UInt16(0),
                Datum::UInt16(1),
                Datum::UInt16(1 << 8),
                Datum::UInt32(0),
                Datum::UInt32(1),
                Datum::UInt32(1 << 8),
                Datum::UInt32(1 << 16),
                Datum::UInt32(1 << 24),
                Datum::UInt64(0),
                Datum::UInt64(1),
                Datum::UInt64(1 << 8),
                Datum::UInt64(1 << 16),
                Datum::UInt64(1 << 24),
                Datum::UInt64(1 << 32),
                Datum::UInt64(1 << 40),
                Datum::UInt64(1 << 48),
                Datum::UInt64(1 << 56),
                Datum::Date(Date::from_pg_epoch(365 * 45 + 21).unwrap()),
                Datum::Interval(Interval {
                    months: 312,
                    ..Default::default()
                }),
                Datum::Interval(Interval::new(0, 0, 1_012_312)),
                Datum::Bytes(&[]),
                Datum::Bytes(&[0, 2, 1, 255]),
                Datum::String(""),
                Datum::String("العَرَبِيَّة"),
            ]);
        }
    }
}

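/// A container for byte slices, stored in large, infrequently allocated regions.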
mod bytes_container {

    use differential_dataflow::trace::implementations::BatchContainer;
    use timely::container::PushInto;

    use mz_ore::region::Region;

    /// A slice container with four bytes overhead per slice.
    pub struct BytesContainer {
        /// Total number of slices across `batches`, maintained because recomputation is expensive.
        length: usize,
        batches: Vec<BytesBatch>,
    }

    impl BytesContainer {
        /// Visit contained allocations to determine their size and capacity.
        #[inline]
        pub fn heap_size(&self, mut callback: impl FnMut(usize, usize)) {
            // Account for the batch headers themselves, then each batch's offsets and storage.
            callback(
                self.batches.len() * std::mem::size_of::<BytesBatch>(),
                self.batches.capacity() * std::mem::size_of::<BytesBatch>(),
            );
            for batch in self.batches.iter() {
                batch.offsets.heap_size(&mut callback);
                callback(batch.storage.len(), batch.storage.capacity());
            }
        }
    }

    impl BatchContainer for BytesContainer {
        type Owned = Vec<u8>;
        type ReadItem<'a> = &'a [u8];

        fn with_capacity(size: usize) -> Self {
            Self {
                length: 0,
                batches: vec![BytesBatch::with_capacities(size, size)],
            }
        }

        fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
            // Each batch's offsets include a leading zero entry, so a batch
            // holds `offsets.len() - 1` slices.
            let mut item_cap = 1;
            let mut byte_cap = 0;
            for batch in cont1.batches.iter() {
                item_cap += batch.offsets.len() - 1;
                byte_cap += batch.storage.len();
            }
            for batch in cont2.batches.iter() {
                item_cap += batch.offsets.len() - 1;
                byte_cap += batch.storage.len();
            }
            Self {
                length: 0,
                batches: vec![BytesBatch::with_capacities(item_cap, byte_cap)],
            }
        }

        fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> {
            item
        }

        fn index(&self, mut index: usize) -> Self::ReadItem<'_> {
            // Locate the batch containing `index` by walking the batches in order.
            for batch in self.batches.iter() {
                if index < batch.len() {
                    return batch.index(index);
                }
                index -= batch.len();
            }
            panic!("Index out of bounds");
        }

        #[inline(always)]
        fn len(&self) -> usize {
            self.length
        }
    }

    impl PushInto<&[u8]> for BytesContainer {
        fn push_into(&mut self, item: &[u8]) {
            self.length += 1;
            if let Some(batch) = self.batches.last_mut() {
                let success = batch.try_push(item);
                if !success {
                    // Start a new batch with doubled capacities, large enough to hold `item`.
                    let item_cap = 2 * batch.offsets.len();
                    let byte_cap = std::cmp::max(2 * batch.storage.capacity(), item.len());
                    let mut new_batch = BytesBatch::with_capacities(item_cap, byte_cap);
                    assert!(new_batch.try_push(item));
                    self.batches.push(new_batch);
                }
            }
        }
    }

    /// A batch of slice storage.
    ///
    /// The backing storage for this batch will not be resized.
    pub struct BytesBatch {
        offsets: crate::row_spine::OffsetOptimized,
        storage: Region<u8>,
        len: usize,
    }

    impl BytesBatch {
        /// Appends the slice and returns true if it fits within the remaining
        /// capacity, and otherwise returns false.
        fn try_push(&mut self, slice: &[u8]) -> bool {
            if self.storage.len() + slice.len() <= self.storage.capacity() {
                self.storage.extend_from_slice(slice);
                self.offsets.push(self.storage.len());
                self.len += 1;
                true
            } else {
                false
            }
        }
        fn index(&self, index: usize) -> &[u8] {
            let lower = self.offsets.index(index);
            let upper = self.offsets.index(index + 1);
            &self.storage[lower..upper]
        }
        #[inline(always)]
        fn len(&self) -> usize {
            debug_assert_eq!(self.len, self.offsets.len() - 1);
            self.len
        }

        fn with_capacities(item_cap: usize, byte_cap: usize) -> Self {
            // TODO: be wary of `byte_cap` greater than 2^32.
            let mut offsets = crate::row_spine::OffsetOptimized::with_capacity(item_cap + 1);
            offsets.push(0);
            Self {
                offsets,
                storage: Region::new_auto(byte_cap.next_power_of_two()),
                len: 0,
            }
        }
    }
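
    /// A minimal sketch of the expected spill behavior: a slice that does not
    /// fit the current batch's fixed capacity should land in a new, larger
    /// batch, with `index` remaining consistent across batch boundaries.
    #[cfg(test)]
    mod tests {
        use super::BytesContainer;
        use differential_dataflow::trace::implementations::BatchContainer;

        #[mz_ore::test]
        #[cfg_attr(miri, ignore)] // `Region` allocations may be unsupported under miri
        fn push_and_index_across_batches() {
            let mut container = BytesContainer::with_capacity(4);
            container.push(&[1u8, 2, 3][..]);
            // This push may exceed the first batch's byte capacity, forcing a
            // new batch; either way the observable behavior is the same.
            container.push(&[4u8, 5, 6, 7, 8][..]);
            assert_eq!(container.len(), 2);
            assert_eq!(container.index(0), &[1, 2, 3]);
            assert_eq!(container.index(1), &[4, 5, 6, 7, 8]);
        }
    }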
}

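/// An offset container that compactly represents strided offset sequences.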
mod offset_opt {
    use differential_dataflow::trace::implementations::BatchContainer;
    use differential_dataflow::trace::implementations::OffsetList;
    use timely::container::PushInto;

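    /// A compact representation of an offset sequence of the form `0, stride,
    /// 2 * stride, ..`, optionally followed by repetitions of the final
    /// offset. `push` rejects items that do not extend this pattern.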
    enum OffsetStride {
        Empty,
        Zero,
        Striding(usize, usize),
        Saturated(usize, usize, usize),
    }

    impl OffsetStride {
        /// Accepts or rejects a newly pushed element.
        fn push(&mut self, item: usize) -> bool {
            match self {
                OffsetStride::Empty => {
                    if item == 0 {
                        *self = OffsetStride::Zero;
                        true
                    } else {
                        false
                    }
                }
                OffsetStride::Zero => {
                    *self = OffsetStride::Striding(item, 2);
                    true
                }
                OffsetStride::Striding(stride, count) => {
                    if item == *stride * *count {
                        *count += 1;
                        true
                    } else if item == *stride * (*count - 1) {
                        *self = OffsetStride::Saturated(*stride, *count, 1);
                        true
                    } else {
                        false
                    }
                }
                OffsetStride::Saturated(stride, count, reps) => {
                    if item == *stride * (*count - 1) {
                        *reps += 1;
                        true
                    } else {
                        false
                    }
                }
            }
        }

        fn index(&self, index: usize) -> usize {
            match self {
                OffsetStride::Empty => {
                    panic!("Empty OffsetStride")
                }
                OffsetStride::Zero => 0,
                OffsetStride::Striding(stride, _steps) => *stride * index,
                OffsetStride::Saturated(stride, steps, _reps) => {
                    if index < *steps {
                        *stride * index
                    } else {
                        *stride * (*steps - 1)
                    }
                }
            }
        }

        fn len(&self) -> usize {
            match self {
                OffsetStride::Empty => 0,
                OffsetStride::Zero => 1,
                OffsetStride::Striding(_stride, steps) => *steps,
                OffsetStride::Saturated(_stride, steps, reps) => *steps + *reps,
            }
        }
    }

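    /// An offset container that records a strided prefix compactly in an
    /// `OffsetStride`, and spills all subsequent offsets to an `OffsetList`.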
    pub struct OffsetOptimized {
        strided: OffsetStride,
        spilled: OffsetList,
    }

    impl BatchContainer for OffsetOptimized {
        type Owned = usize;
        type ReadItem<'a> = usize;

        fn with_capacity(_size: usize) -> Self {
            Self {
                strided: OffsetStride::Empty,
                spilled: OffsetList::with_capacity(0),
            }
        }

        fn merge_capacity(_cont1: &Self, _cont2: &Self) -> Self {
            Self {
                strided: OffsetStride::Empty,
                spilled: OffsetList::with_capacity(0),
            }
        }

        fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> {
            item
        }

        fn index(&self, index: usize) -> Self::ReadItem<'_> {
            if index < self.strided.len() {
                self.strided.index(index)
            } else {
                self.spilled.index(index - self.strided.len())
            }
        }

        fn len(&self) -> usize {
            self.strided.len() + self.spilled.len()
        }
    }

    impl PushInto<usize> for OffsetOptimized {
        fn push_into(&mut self, item: usize) {
            if !self.spilled.is_empty() {
                self.spilled.push(item);
            } else {
                let inserted = self.strided.push(item);
                if !inserted {
                    self.spilled.push(item);
                }
            }
        }
    }

    impl OffsetOptimized {
        pub fn heap_size(&self, callback: impl FnMut(usize, usize)) {
            crate::row_spine::offset_list_size(&self.spilled, callback);
        }
    }
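
    /// A minimal sketch of the expected behavior: a regular (strided) prefix
    /// is absorbed by `OffsetStride`, and the first irregular offset spills
    /// into the backing `OffsetList`.
    #[cfg(test)]
    mod tests {
        use super::OffsetOptimized;
        use differential_dataflow::trace::implementations::BatchContainer;

        #[mz_ore::test]
        fn strided_prefix_then_spill() {
            let mut offsets = OffsetOptimized::with_capacity(0);
            // `0, 3, 6, 9, 12` extends the strided pattern; `7` does not.
            let items = [0usize, 3, 6, 9, 12, 7];
            for item in items {
                offsets.push(item);
            }
            assert_eq!(offsets.len(), items.len());
            for (index, expected) in items.into_iter().enumerate() {
                assert_eq!(offsets.index(index), expected);
            }
        }
    }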
}

/// Helper to compute the size of an [`OffsetList`] in memory.
#[inline]
pub(crate) fn offset_list_size(data: &OffsetList, mut callback: impl FnMut(usize, usize)) {
    // Private `vec_size` because we should only use it where data isn't region-allocated.
    // `T: Copy` makes sure the implementation is correct even if types change!
    #[inline(always)]
    fn vec_size<T: Copy>(data: &Vec<T>, mut callback: impl FnMut(usize, usize)) {
        let size_of_t = std::mem::size_of::<T>();
        callback(data.len() * size_of_t, data.capacity() * size_of_t);
    }

    vec_size(&data.smol, &mut callback);
    vec_size(&data.chonk, callback);
}