Skip to main content

mz_timely_util/columnar/
builder_input.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! `BuilderInput` impl for [`Column`] so DD `Builder`s can drain our paged
17//! batcher's output without an extra container conversion.
18//!
//! Mirrors the impl on [`ColumnationStack`](crate::columnation::ColumnationStack)
//! in `columnation.rs`, but the `Item<'a>` here is a columnar `Ref` tuple
//! rather than a reference to an owned tuple, so:
22//!
23//! - `Key<'a>` / `Val<'a>` are `Ref<'a, K>` / `Ref<'a, V>` — no `Owned`
24//!   round-trip on the read side.
25//! - `Time` / `Diff` materialize as owned on `into_parts` (the trait
26//!   contract requires owned for these).
27//!
//! Distinct counts (`key_val_upd_counts`) are tallied per chunk and summed,
//! so keys and values may each be over-counted by at most one per chunk
//! boundary. The downstream consumer uses these as capacity hints, so a small
//! over-estimate is cheaper than the alternative (snapshotting `K::Owned` /
//! `V::Owned` across chunk boundaries).
33
34use columnar::{Columnar, Index, Len};
35use differential_dataflow::difference::Semigroup;
36use differential_dataflow::lattice::Lattice;
37use differential_dataflow::trace::implementations::{BatchContainer, BuilderInput};
38use timely::progress::Timestamp;
39
40use crate::columnar::Column;
41
42impl<KBC, VBC, K, V, T, R> BuilderInput<KBC, VBC> for Column<((K, V), T, R)>
43where
44    K: Columnar,
45    V: Columnar,
46    T: Columnar + Timestamp + Lattice + Clone,
47    R: Columnar + Ord + Semigroup + Clone,
48    for<'a> columnar::Ref<'a, K>: Copy + Ord,
49    for<'a> columnar::Ref<'a, V>: Copy + Ord,
50    KBC: BatchContainer,
51    VBC: BatchContainer,
52    for<'a, 'b> KBC::ReadItem<'a>: PartialEq<columnar::Ref<'b, K>>,
53    for<'a, 'b> VBC::ReadItem<'a>: PartialEq<columnar::Ref<'b, V>>,
54{
55    type Key<'a> = columnar::Ref<'a, K>;
56    type Val<'a> = columnar::Ref<'a, V>;
57    type Time = T;
58    type Diff = R;
59
60    fn into_parts<'a>(
61        item: Self::Item<'a>,
62    ) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff) {
63        let ((key, val), time, diff) = item;
64        (key, val, T::into_owned(time), R::into_owned(diff))
65    }
66
67    fn key_eq(this: &columnar::Ref<'_, K>, other: KBC::ReadItem<'_>) -> bool {
68        KBC::reborrow(other) == *this
69    }
70
71    fn val_eq(this: &columnar::Ref<'_, V>, other: VBC::ReadItem<'_>) -> bool {
72        VBC::reborrow(other) == *this
73    }
74
75    fn key_val_upd_counts(chain: &[Self]) -> (usize, usize, usize) {
76        // Per-chunk dedup, summed. Skips cross-chunk equality checks; the
77        // counts may over-count by up to `chain.len()` (one boundary per
78        // chunk). Capacity-hint consumers tolerate over-estimates.
79        let mut keys = 0;
80        let mut vals = 0;
81        let mut upds = 0;
82        for col in chain.iter() {
83            let view = col.borrow();
84            let len = view.len();
85            if len == 0 {
86                continue;
87            }
88            let mut prev: Option<(columnar::Ref<'_, K>, columnar::Ref<'_, V>)> = None;
89            for i in 0..len {
90                let ((k, v), _, _) = view.get(i);
91                match prev {
92                    None => {
93                        keys += 1;
94                        vals += 1;
95                    }
96                    Some((pk, pv)) => {
97                        if pk != k {
98                            keys += 1;
99                            vals += 1;
100                        } else if pv != v {
101                            vals += 1;
102                        }
103                    }
104                }
105                upds += 1;
106                prev = Some((k, v));
107            }
108        }
109        (keys, vals, upds)
110    }
111}