mz_timely_util/columnar/builder_input.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8// http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! `BuilderInput` impl for [`Column`] so DD `Builder`s can drain our paged
17//! batcher's output without an extra container conversion.
18//!
//! Mirrors the impl on [`ColumnationStack`](crate::columnation::ColumnationStack)
//! in `columnation.rs`, but the `Item<'a>` here is a tuple of columnar `Ref`s
//! rather than a reference to an owned tuple, so:
22//!
23//! - `Key<'a>` / `Val<'a>` are `Ref<'a, K>` / `Ref<'a, V>` — no `Owned`
24//! round-trip on the read side.
//! - `Time` / `Diff` are converted to owned values in `into_parts` (the trait
//!   contract requires owned values for these).
27//!
28//! Distinct-counts (`key_val_upd_counts`) tally per chunk and sum, accepting
29//! at most `chain.len()` over-counts at chunk boundaries. The downstream
30//! consumer uses these as capacity hints, so a small over-estimate is
31//! cheaper than the alternative (snapshotting `K::Owned` / `V::Owned`
32//! across chunk boundaries).
33
34use columnar::{Columnar, Index, Len};
35use differential_dataflow::difference::Semigroup;
36use differential_dataflow::lattice::Lattice;
37use differential_dataflow::trace::implementations::{BatchContainer, BuilderInput};
38use timely::progress::Timestamp;
39
40use crate::columnar::Column;
41
42impl<KBC, VBC, K, V, T, R> BuilderInput<KBC, VBC> for Column<((K, V), T, R)>
43where
44 K: Columnar,
45 V: Columnar,
46 T: Columnar + Timestamp + Lattice + Clone,
47 R: Columnar + Ord + Semigroup + Clone,
48 for<'a> columnar::Ref<'a, K>: Copy + Ord,
49 for<'a> columnar::Ref<'a, V>: Copy + Ord,
50 KBC: BatchContainer,
51 VBC: BatchContainer,
52 for<'a, 'b> KBC::ReadItem<'a>: PartialEq<columnar::Ref<'b, K>>,
53 for<'a, 'b> VBC::ReadItem<'a>: PartialEq<columnar::Ref<'b, V>>,
54{
55 type Key<'a> = columnar::Ref<'a, K>;
56 type Val<'a> = columnar::Ref<'a, V>;
57 type Time = T;
58 type Diff = R;
59
60 fn into_parts<'a>(
61 item: Self::Item<'a>,
62 ) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff) {
63 let ((key, val), time, diff) = item;
64 (key, val, T::into_owned(time), R::into_owned(diff))
65 }
66
67 fn key_eq(this: &columnar::Ref<'_, K>, other: KBC::ReadItem<'_>) -> bool {
68 KBC::reborrow(other) == *this
69 }
70
71 fn val_eq(this: &columnar::Ref<'_, V>, other: VBC::ReadItem<'_>) -> bool {
72 VBC::reborrow(other) == *this
73 }
74
75 fn key_val_upd_counts(chain: &[Self]) -> (usize, usize, usize) {
76 // Per-chunk dedup, summed. Skips cross-chunk equality checks; the
77 // counts may over-count by up to `chain.len()` (one boundary per
78 // chunk). Capacity-hint consumers tolerate over-estimates.
79 let mut keys = 0;
80 let mut vals = 0;
81 let mut upds = 0;
82 for col in chain.iter() {
83 let view = col.borrow();
84 let len = view.len();
85 if len == 0 {
86 continue;
87 }
88 let mut prev: Option<(columnar::Ref<'_, K>, columnar::Ref<'_, V>)> = None;
89 for i in 0..len {
90 let ((k, v), _, _) = view.get(i);
91 match prev {
92 None => {
93 keys += 1;
94 vals += 1;
95 }
96 Some((pk, pv)) => {
97 if pk != k {
98 keys += 1;
99 vals += 1;
100 } else if pv != v {
101 vals += 1;
102 }
103 }
104 }
105 upds += 1;
106 prev = Some((k, v));
107 }
108 }
109 (keys, vals, upds)
110 }
111}