mz_timely_util/columnar/consolidate.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8// http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! A `ContainerBuilder` that consolidates `(D, T, R)` updates and emits columnar containers.
17//!
18//! Two-level buffering:
19//!
20//! 1. AoS staging `Vec<(D, T, R)>` with a small cap so `consolidate_updates`' `n log n` cost
21//! stays bounded. Cancellations and same-key updates collapse here before reaching the
22//! column-shaped storage. A drain-multiple-of-half-cap trick keeps the leftover in staging,
23//! so cross-batch keys with the same `(D, T)` continue consolidating on the next sort.
24//! 2. SoA accumulator: one sub-container per column (`D::Container`, `T::Container`,
25//! `R::Container`). Drains push in fixed-size chunks (`DRAIN_CHUNK_ROWS`) with three
26//! sequential per-column passes per chunk, so the inner pushes autovectorize. After each
27//! chunk, check the serialized size; once the accumulator reaches the flush threshold
28//! (90% of `OUTPUT_TARGET_WORDS`), serialize into an aligned `Vec<u64>` (no zero-fill —
29//! written by `indexed::encode`) and ship as `Column::Align`. Per-chunk granularity bounds
30//! overshoot to `K * row_words`. The trailing partial on `finish` ships as `Column::Typed`.
31//!
32//! Generic over `(D, T, R): Columnar` via the columnar tuple decomposition
33//! `<(D, T, R) as Columnar>::Container = (D::Container, T::Container, R::Container)`.
34
35use std::collections::VecDeque;
36
37use columnar::bytes::indexed;
38use columnar::{Borrow, Columnar, Push};
39use differential_dataflow::Data;
40use differential_dataflow::consolidation::consolidate_updates;
41use differential_dataflow::difference::Semigroup;
42use timely::container::{ContainerBuilder, PushInto};
43
44use crate::columnar::Column;
45
/// Byte budget behind a single staging buffer. Matches the 8 KiB basis DD's
/// `ConsolidatingContainerBuilder` uses via `timely::container::buffer::default_capacity`.
const STAGING_BUFFER_BYTES: usize = 8 << 10;

/// Default number of `(D, T, R)` tuples the staging buffer holds:
/// `2 * STAGING_BUFFER_BYTES / size_of::<(D, T, R)>()`, matching DD's
/// `ConsolidatingContainerBuilder`. Small enough that the O(n log n) sort stays cheap, large
/// enough to amortize the sort + drain overhead across many pushes.
///
/// The result is never smaller than 2, even for tuples larger than the whole budget.
fn default_staging_cap<D, T, R>() -> usize {
    // Zero-sized tuples are treated as one byte wide so the division below is well defined.
    let tuple_bytes = match std::mem::size_of::<(D, T, R)>() {
        0 => 1,
        bytes => bytes,
    };
    let budget_bytes = 2 * STAGING_BUFFER_BYTES;
    // Clamp to at least 2 so that half of the cap (the drain grain) is at least 1.
    std::cmp::max(budget_bytes / tuple_bytes, 2)
}
/// Target serialized chunk size in `u64` words (`1 << 18` words = 2 MiB). Bounded slop
/// matters once we put these behind huge pages.
const OUTPUT_TARGET_WORDS: usize = 1 << 18;
/// Flush when within 10% of `OUTPUT_TARGET_WORDS` — matches `ColumnBuilder`'s slop heuristic.
/// Computed at compile time so the per-chunk check in the drain loop is a single comparison
/// against a constant instead of a round-up + divide-by-10 every time.
const FLUSH_THRESHOLD_WORDS: usize = OUTPUT_TARGET_WORDS - OUTPUT_TARGET_WORDS / 10;
/// Drain rows from staging in chunks of this size. Inside each chunk we do three sequential
/// per-column passes — long enough for autovectorization (4–8 vector iterations on NEON / SVE2
/// 128-bit / AVX2 / SVE 256-bit) — while the outer per-chunk size check bounds overshoot to
/// `K * row_words`. With a 1 KiB row (128 words) and K=16, worst-case overshoot is
/// 16 * 128 = 2048 words (16 KiB), well under the 10% slop budget (~205 KiB) on a 2 MiB
/// target.
const DRAIN_CHUNK_ROWS: usize = 16;
71
72/// A container builder that consolidates `(D, T, R)` updates and emits `Column<(D, T, R)>`.
73///
74/// Stages updates in an AoS `Vec` for in-place consolidation, then drains consolidated rows
75/// in `DRAIN_CHUNK_ROWS`-sized chunks (three sequential per-column passes per chunk) into SoA
76/// sub-containers, flushing whenever the accumulator hits the flush threshold (90% of
77/// `OUTPUT_TARGET_WORDS`). Flushed accumulators are written into aligned `Vec<u64>` (via
78/// `indexed::encode`, no zero-fill) and queued as `Column::Align`. The trailing partial on
79/// `finish` ships as `Column::Typed`.
80///
81/// Does **not** maintain FIFO ordering (consolidation reorders updates).
82pub struct ConsolidatingColumnBuilder<D, T, R>
83where
84 D: Columnar,
85 T: Columnar,
86 R: Columnar,
87{
88 /// AoS staging buffer for in-place consolidation. Cap = [`Self::staging_cap`].
89 staging: Vec<(D, T, R)>,
90 /// Capacity of `staging`. Drain triggers when `staging.len()` hits this.
91 staging_cap: usize,
92 /// SoA accumulator, one sub-container per column.
93 cur_d: D::Container,
94 cur_t: T::Container,
95 cur_r: R::Container,
96 /// Number of `(D, T, R)` tuples currently in `cur_*`.
97 cur_len: usize,
98 /// Finished columns ready to ship.
99 pending: VecDeque<Column<(D, T, R)>>,
100 /// The currently extracted/finished column.
101 finished: Option<Column<(D, T, R)>>,
102}
103
104impl<D, T, R> Default for ConsolidatingColumnBuilder<D, T, R>
105where
106 D: Columnar,
107 T: Columnar,
108 R: Columnar,
109{
110 fn default() -> Self {
111 let cap = default_staging_cap::<D, T, R>();
112 Self {
113 // Pre-allocate so `push` is unconditional (no per-push capacity check or lazy
114 // reserve branch).
115 staging: Vec::with_capacity(cap),
116 staging_cap: cap,
117 cur_d: D::Container::default(),
118 cur_t: T::Container::default(),
119 cur_r: R::Container::default(),
120 cur_len: 0,
121 pending: VecDeque::new(),
122 finished: None,
123 }
124 }
125}
126
127impl<D, T, R> ConsolidatingColumnBuilder<D, T, R>
128where
129 D: Data + Columnar,
130 T: Data + Columnar,
131 R: Semigroup + Columnar + 'static,
132 (D, T, R): Columnar<Container = (D::Container, T::Container, R::Container)>,
133{
134 /// Sort and consolidate `staging`, then drain a multiple-of-`grain` prefix into the SoA
135 /// accumulator. Pass `1` to drain everything (used by `finish`). Pushes in chunks of
136 /// `DRAIN_CHUNK_ROWS` and flushes mid-drain whenever the accumulator hits
137 /// `FLUSH_THRESHOLD_WORDS`, so a single drain can mint several aligned containers when the
138 /// prefix is large.
139 #[cold]
140 fn consolidate_and_drain(&mut self, grain: usize) {
141 consolidate_updates(&mut self.staging);
142 let drain_n = (self.staging.len() / grain) * grain;
143 if drain_n == 0 {
144 return;
145 }
146
147 // Drain in chunks of `DRAIN_CHUNK_ROWS` rows. Three sequential per-column passes inside
148 // the chunk give the compiler streamable loops it can autovectorize for primitive
149 // containers; the per-chunk size check bounds overshoot of the 90%-of-2-MiB threshold to
150 // ~`K * row_words`. After each chunk, check the serialized size (via `length_in_words`,
151 // not item count, so output Columns stay bounded for variable-width `D` like `Row`).
152 let mut consumed = 0;
153 while consumed < drain_n {
154 let take = (drain_n - consumed).min(DRAIN_CHUNK_ROWS);
155 let head = &self.staging[consumed..consumed + take];
156 for (d, _, _) in head {
157 self.cur_d.push(d);
158 }
159 for (_, t, _) in head {
160 self.cur_t.push(t);
161 }
162 for (_, _, r) in head {
163 self.cur_r.push(r);
164 }
165 self.cur_len += take;
166 consumed += take;
167
168 let words = {
169 let view = (
170 self.cur_d.borrow(),
171 self.cur_t.borrow(),
172 self.cur_r.borrow(),
173 );
174 indexed::length_in_words(&view)
175 };
176 if words >= FLUSH_THRESHOLD_WORDS {
177 self.flush_aligned();
178 }
179 }
180 self.staging.drain(..consumed);
181 }
182
183 /// Serialize the SoA accumulator into a `Column::Align` via `indexed::encode`, which
184 /// builds the buffer with `Vec::push`/`extend_from_slice` so no memory is initialized
185 /// twice.
186 #[cold]
187 fn flush_aligned(&mut self) {
188 if self.cur_len == 0 {
189 return;
190 }
191 let cur: <(D, T, R) as Columnar>::Container = (
192 std::mem::take(&mut self.cur_d),
193 std::mem::take(&mut self.cur_t),
194 std::mem::take(&mut self.cur_r),
195 );
196 self.cur_len = 0;
197
198 let mut buf: Vec<u64> = Vec::with_capacity(indexed::length_in_words(&cur.borrow()));
199 indexed::encode(&mut buf, &cur.borrow());
200 self.pending.push_back(Column::Align(buf));
201 }
202}
203
204impl<D, T, R> PushInto<(D, T, R)> for ConsolidatingColumnBuilder<D, T, R>
205where
206 D: Data + Columnar,
207 T: Data + Columnar,
208 R: Semigroup + Columnar + 'static,
209 (D, T, R): Columnar<Container = (D::Container, T::Container, R::Container)>,
210{
211 /// Push an element into the staging buffer; consolidate + drain when full.
212 #[inline]
213 fn push_into(&mut self, item: (D, T, R)) {
214 self.staging.push(item);
215 if self.staging.len() == self.staging_cap {
216 self.consolidate_and_drain(self.staging_cap / 2);
217 }
218 }
219}
220
221impl<D, T, R> ContainerBuilder for ConsolidatingColumnBuilder<D, T, R>
222where
223 D: Data + Columnar,
224 T: Data + Columnar,
225 R: Semigroup + Columnar + 'static,
226 (D, T, R): Columnar<Container = (D::Container, T::Container, R::Container)>,
227 <(D, T, R) as Columnar>::Container: Clone,
228{
229 type Container = Column<(D, T, R)>;
230
231 #[inline]
232 fn extract(&mut self) -> Option<&mut Self::Container> {
233 if let Some(c) = self.pending.pop_front() {
234 self.finished = Some(c);
235 self.finished.as_mut()
236 } else {
237 None
238 }
239 }
240
241 #[inline]
242 fn finish(&mut self) -> Option<&mut Self::Container> {
243 if !self.staging.is_empty() {
244 // `multiple = 1` so any remainder also leaves staging.
245 self.consolidate_and_drain(1);
246 }
247 // Trailing partial: ship as `Column::Typed` (no extra serialize copy).
248 if self.cur_len > 0 {
249 let cur: <(D, T, R) as Columnar>::Container = (
250 std::mem::take(&mut self.cur_d),
251 std::mem::take(&mut self.cur_t),
252 std::mem::take(&mut self.cur_r),
253 );
254 self.cur_len = 0;
255 self.pending.push_back(Column::Typed(cur));
256 }
257 self.extract()
258 }
259}
260
#[cfg(test)]
mod tests {
    use columnar::Index;
    use columnar::Len;
    use timely::container::{ContainerBuilder, PushInto};

    use super::*;

    /// The concrete update type exercised by these tests.
    type Update = (u64, u64, i64);
    /// The concrete builder exercised by these tests.
    type Builder = ConsolidatingColumnBuilder<u64, u64, i64>;

    /// Collect every `(D, T, R)` row from a `Column<(u64, u64, i64)>`.
    fn rows(column: &Column<Update>) -> Vec<Update> {
        let view = column.borrow();
        let mut out = Vec::with_capacity(view.len());
        for idx in 0..view.len() {
            let item = view.get(idx);
            out.push((*item.0, *item.1, *item.2));
        }
        out
    }

    /// Drain a builder: call `extract` until it returns `None`, then call `finish` until it
    /// returns `None`.
    ///
    /// `finish` has to be looped as well: it can queue an aligned container in addition to the
    /// typed tail, and each call only hands out one container.
    fn drain(mut builder: Builder) -> Vec<Update> {
        let mut out: Vec<Update> = Vec::new();
        while let Some(column) = builder.extract() {
            out.extend(rows(column));
        }
        while let Some(column) = builder.finish() {
            out.extend(rows(column));
        }
        out
    }

    #[mz_ore::test]
    fn empty_finish_yields_none() {
        let mut builder = Builder::default();
        assert!(builder.extract().is_none());
        assert!(builder.finish().is_none());
    }

    #[mz_ore::test]
    fn single_push_finish_yields_one() {
        let mut builder = Builder::default();
        builder.push_into((1u64, 0u64, 1i64));
        let column = builder.finish().expect("one container");
        assert_eq!(rows(column), vec![(1, 0, 1)]);
        assert!(builder.finish().is_none());
    }

    #[mz_ore::test]
    fn consolidates_on_threshold() {
        let mut builder = Builder::default();
        // Push enough +1/-1 pairs to exceed the staging cap several times over and trigger
        // multiple consolidation cycles. Everything cancels in the staging buffer before
        // ever reaching the SoA accumulator.
        let cap = default_staging_cap::<u64, u64, i64>();
        for _ in 0..(cap * 4) {
            builder.push_into((7u64, 0u64, 1i64));
            builder.push_into((7u64, 0u64, -1i64));
        }
        assert!(drain(builder).is_empty());
    }

    #[mz_ore::test]
    fn cross_batch_consolidation() {
        // A single key pushed many times must collapse to one row even across many staging
        // refills. Draining only multiples of the grain keeps the in-progress consolidated
        // row in staging so subsequent pushes merge into it.
        let mut builder = Builder::default();
        let n: i64 = 100_000;
        for _ in 0..n {
            builder.push_into((42u64, 0u64, 1i64));
        }
        let out = drain(builder);
        assert_eq!(out, vec![(42, 0, n)]);
    }

    #[mz_ore::test]
    fn multiple_distinct_keys() {
        let mut builder = Builder::default();
        builder.push_into((1u64, 0u64, 1i64));
        builder.push_into((2u64, 0u64, 1i64));
        builder.push_into((1u64, 0u64, 1i64));
        let mut out = drain(builder);
        out.sort();
        assert_eq!(out, vec![(1, 0, 2), (2, 0, 1)]);
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // too slow
    fn emits_multiple_containers() {
        let mut builder = Builder::default();
        // Enough distinct rows to fill the SoA accumulator past the output target multiple
        // times. Each row is 24 bytes; ~87k rows ≈ 2 MiB, so 300k pushes should mint at least
        // 2-3 aligned containers plus a typed partial on `finish`.
        let n: u64 = 300_000;
        for d in 0..n {
            builder.push_into((d, 0u64, 1i64));
        }

        let mut containers = 0;
        let mut out: Vec<Update> = Vec::new();
        while let Some(column) = builder.extract() {
            containers += 1;
            out.extend(rows(column));
        }
        // Loop `finish` too, so no container queued by the final drain is left behind.
        while let Some(column) = builder.finish() {
            containers += 1;
            out.extend(rows(column));
        }
        assert!(
            containers > 1,
            "expected multiple containers, got {containers}"
        );
        out.sort();
        let expected: Vec<Update> = (0..n).map(|d| (d, 0u64, 1i64)).collect();
        assert_eq!(out, expected);
    }
}