
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License in the LICENSE file at the
// root of this repository, or online at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! A `ContainerBuilder` that consolidates `(D, T, R)` updates and emits columnar containers.
//!
//! Two-level buffering:
//!
//! 1. AoS staging `Vec<(D, T, R)>` with a small cap so `consolidate_updates`' `n log n` cost
//!    stays bounded. Cancellations and same-key updates collapse here before reaching the
//!    column-shaped storage. A drain-multiple-of-half-cap trick keeps the leftover in staging,
//!    so cross-batch keys with the same `(D, T)` continue consolidating on the next sort.
//! 2. SoA accumulator: one sub-container per column (`D::Container`, `T::Container`,
//!    `R::Container`). Drains push in fixed-size chunks (`DRAIN_CHUNK_ROWS`) with three
//!    sequential per-column passes per chunk, so the inner pushes autovectorize. After each
//!    chunk, check the serialized size; once the accumulator reaches the flush threshold
//!    (90% of `OUTPUT_TARGET_WORDS`), serialize into an aligned `Vec<u64>` (no zero-fill —
//!    written by `indexed::encode`) and ship as `Column::Align`. Per-chunk granularity bounds
//!    overshoot to `K * row_words`. The trailing partial on `finish` ships as `Column::Typed`.
//!
//! Generic over `(D, T, R): Columnar` via the columnar tuple decomposition
//! `<(D, T, R) as Columnar>::Container = (D::Container, T::Container, R::Container)`.

35use std::collections::VecDeque;
36
37use columnar::bytes::indexed;
38use columnar::{Borrow, Columnar, Push};
39use differential_dataflow::Data;
40use differential_dataflow::consolidation::consolidate_updates;
41use differential_dataflow::difference::Semigroup;
42use timely::container::{ContainerBuilder, PushInto};
43
44use crate::columnar::Column;
45
/// Byte budget for the AoS staging buffer. Mirrors the 8 KiB basis that DD's
/// `ConsolidatingContainerBuilder` derives via `timely::container::buffer::default_capacity`.
const STAGING_BUFFER_BYTES: usize = 8 * 1024;

/// Number of staged items per buffer: `2 * STAGING_BUFFER_BYTES / size_of::<(D, T, R)>()`,
/// the same formula DD's `ConsolidatingContainerBuilder` uses. Small enough that the
/// O(n log n) consolidation sort stays cheap, large enough to amortize the sort + drain
/// overhead across many pushes.
fn default_staging_cap<D, T, R>() -> usize {
    // Guard against zero-sized `(D, T, R)` tuples so the division is well-defined.
    let elem_bytes = std::mem::size_of::<(D, T, R)>().max(1);
    let cap = 2 * STAGING_BUFFER_BYTES / elem_bytes;
    // Never drop below 2, so the half-cap drain grain is at least 1.
    cap.max(2)
}
/// Target size of a serialized output chunk, in u64 words (2 MiB). Keeping the slop bounded
/// matters once these buffers sit behind huge pages.
const OUTPUT_TARGET_WORDS: usize = 1 << 18;
/// Flush once the accumulator is within 10% of `OUTPUT_TARGET_WORDS`, the same slop heuristic
/// as `ColumnBuilder`. Folded into a constant so the hot-loop comparison is a single
/// `cmp`/`jae` rather than a per-row round-up and divide-by-10.
const FLUSH_THRESHOLD_WORDS: usize = OUTPUT_TARGET_WORDS - OUTPUT_TARGET_WORDS / 10;
/// Rows moved from staging to the accumulator per chunk. Each chunk runs three sequential
/// per-column passes — long enough for autovectorization (4–8 vector iterations on NEON /
/// SVE2 128-bit / AVX2 / SVE 256-bit) — while the per-chunk size check caps threshold
/// overshoot at `K * row_words`. With a 1 KiB row (128 words) and K=16 the worst-case
/// overshoot is 2 KiB, comfortably inside the 10% slop on a 2 MiB target.
const DRAIN_CHUNK_ROWS: usize = 16;
71
/// A container builder that consolidates `(D, T, R)` updates and emits `Column<(D, T, R)>`.
///
/// Stages updates in an AoS `Vec` for in-place consolidation, then drains consolidated rows
/// in `DRAIN_CHUNK_ROWS`-sized chunks (three sequential per-column passes per chunk) into SoA
/// sub-containers, flushing whenever the accumulator hits the flush threshold (90% of
/// `OUTPUT_TARGET_WORDS`). Flushed accumulators are written into aligned `Vec<u64>` (via
/// `indexed::encode`, no zero-fill) and queued as `Column::Align`. The trailing partial on
/// `finish` ships as `Column::Typed`.
///
/// Does **not** maintain FIFO ordering (consolidation reorders updates).
pub struct ConsolidatingColumnBuilder<D, T, R>
where
    D: Columnar,
    T: Columnar,
    R: Columnar,
{
    /// AoS staging buffer for in-place consolidation. Cap = [`Self::staging_cap`].
    staging: Vec<(D, T, R)>,
    /// Capacity of `staging`. Drain triggers when `staging.len()` hits this.
    staging_cap: usize,
    /// SoA accumulator, data column. One sub-container per column so each drain pass
    /// streams into a single container.
    cur_d: D::Container,
    /// SoA accumulator, time column.
    cur_t: T::Container,
    /// SoA accumulator, diff column.
    cur_r: R::Container,
    /// Number of `(D, T, R)` tuples currently in `cur_*`.
    cur_len: usize,
    /// Finished columns ready to ship.
    pending: VecDeque<Column<(D, T, R)>>,
    /// The currently extracted/finished column. Held here so `extract`/`finish` can hand
    /// out a `&mut` borrow while the builder retains ownership.
    finished: Option<Column<(D, T, R)>>,
}
103
104impl<D, T, R> Default for ConsolidatingColumnBuilder<D, T, R>
105where
106    D: Columnar,
107    T: Columnar,
108    R: Columnar,
109{
110    fn default() -> Self {
111        let cap = default_staging_cap::<D, T, R>();
112        Self {
113            // Pre-allocate so `push` is unconditional (no per-push capacity check or lazy
114            // reserve branch).
115            staging: Vec::with_capacity(cap),
116            staging_cap: cap,
117            cur_d: D::Container::default(),
118            cur_t: T::Container::default(),
119            cur_r: R::Container::default(),
120            cur_len: 0,
121            pending: VecDeque::new(),
122            finished: None,
123        }
124    }
125}
126
impl<D, T, R> ConsolidatingColumnBuilder<D, T, R>
where
    D: Data + Columnar,
    T: Data + Columnar,
    R: Semigroup + Columnar + 'static,
    (D, T, R): Columnar<Container = (D::Container, T::Container, R::Container)>,
{
    /// Sort and consolidate `staging`, then drain a multiple-of-`grain` prefix into the SoA
    /// accumulator. Pass `1` to drain everything (used by `finish`). Pushes in chunks of
    /// `DRAIN_CHUNK_ROWS` and flushes mid-drain whenever the accumulator hits
    /// `FLUSH_THRESHOLD_WORDS`, so a single drain can mint several aligned containers when the
    /// prefix is large.
    ///
    /// `#[cold]` keeps this out of the inlined `push_into` fast path.
    #[cold]
    fn consolidate_and_drain(&mut self, grain: usize) {
        consolidate_updates(&mut self.staging);
        // Round down to a multiple of `grain`: the sub-`grain` remainder stays in staging so
        // later pushes with the same `(D, T)` can still merge into it on the next sort.
        let drain_n = (self.staging.len() / grain) * grain;
        if drain_n == 0 {
            return;
        }

        // Drain in chunks of `DRAIN_CHUNK_ROWS` rows. Three sequential per-column passes inside
        // the chunk give the compiler streamable loops it can autovectorize for primitive
        // containers; the per-chunk size check bounds overshoot of the 90%-of-2-MiB threshold to
        // ~`K * row_words`. After each chunk, check the serialized size (via `length_in_words`,
        // not item count, so output Columns stay bounded for variable-width `D` like `Row`).
        let mut consumed = 0;
        while consumed < drain_n {
            let take = (drain_n - consumed).min(DRAIN_CHUNK_ROWS);
            let head = &self.staging[consumed..consumed + take];
            for (d, _, _) in head {
                self.cur_d.push(d);
            }
            for (_, t, _) in head {
                self.cur_t.push(t);
            }
            for (_, _, r) in head {
                self.cur_r.push(r);
            }
            self.cur_len += take;
            consumed += take;

            // Serialized size of the whole accumulator, computed on a borrowed tuple view so
            // nothing is moved or copied.
            let words = {
                let view = (
                    self.cur_d.borrow(),
                    self.cur_t.borrow(),
                    self.cur_r.borrow(),
                );
                indexed::length_in_words(&view)
            };
            if words >= FLUSH_THRESHOLD_WORDS {
                self.flush_aligned();
            }
        }
        // Single shift of the surviving suffix, rather than per-chunk removals.
        self.staging.drain(..consumed);
    }

    /// Serialize the SoA accumulator into a `Column::Align` via `indexed::encode`, which
    /// builds the buffer with `Vec::push`/`extend_from_slice` so no memory is initialized
    /// twice. No-op when the accumulator is empty.
    #[cold]
    fn flush_aligned(&mut self) {
        if self.cur_len == 0 {
            return;
        }
        // Take the three sub-containers, leaving fresh defaults behind for the next fill.
        let cur: <(D, T, R) as Columnar>::Container = (
            std::mem::take(&mut self.cur_d),
            std::mem::take(&mut self.cur_t),
            std::mem::take(&mut self.cur_r),
        );
        self.cur_len = 0;

        // Exact pre-size from `length_in_words`, so `encode` never reallocates.
        let mut buf: Vec<u64> = Vec::with_capacity(indexed::length_in_words(&cur.borrow()));
        indexed::encode(&mut buf, &cur.borrow());
        self.pending.push_back(Column::Align(buf));
    }
}
203
204impl<D, T, R> PushInto<(D, T, R)> for ConsolidatingColumnBuilder<D, T, R>
205where
206    D: Data + Columnar,
207    T: Data + Columnar,
208    R: Semigroup + Columnar + 'static,
209    (D, T, R): Columnar<Container = (D::Container, T::Container, R::Container)>,
210{
211    /// Push an element into the staging buffer; consolidate + drain when full.
212    #[inline]
213    fn push_into(&mut self, item: (D, T, R)) {
214        self.staging.push(item);
215        if self.staging.len() == self.staging_cap {
216            self.consolidate_and_drain(self.staging_cap / 2);
217        }
218    }
219}
220
221impl<D, T, R> ContainerBuilder for ConsolidatingColumnBuilder<D, T, R>
222where
223    D: Data + Columnar,
224    T: Data + Columnar,
225    R: Semigroup + Columnar + 'static,
226    (D, T, R): Columnar<Container = (D::Container, T::Container, R::Container)>,
227    <(D, T, R) as Columnar>::Container: Clone,
228{
229    type Container = Column<(D, T, R)>;
230
231    #[inline]
232    fn extract(&mut self) -> Option<&mut Self::Container> {
233        if let Some(c) = self.pending.pop_front() {
234            self.finished = Some(c);
235            self.finished.as_mut()
236        } else {
237            None
238        }
239    }
240
241    #[inline]
242    fn finish(&mut self) -> Option<&mut Self::Container> {
243        if !self.staging.is_empty() {
244            // `multiple = 1` so any remainder also leaves staging.
245            self.consolidate_and_drain(1);
246        }
247        // Trailing partial: ship as `Column::Typed` (no extra serialize copy).
248        if self.cur_len > 0 {
249            let cur: <(D, T, R) as Columnar>::Container = (
250                std::mem::take(&mut self.cur_d),
251                std::mem::take(&mut self.cur_t),
252                std::mem::take(&mut self.cur_r),
253            );
254            self.cur_len = 0;
255            self.pending.push_back(Column::Typed(cur));
256        }
257        self.extract()
258    }
259}
260
#[cfg(test)]
mod tests {
    use columnar::Index;
    use columnar::Len;
    use timely::container::{ContainerBuilder, PushInto};

    use super::*;

    /// Collect every `(D, T, R)` row from a `Column<(u64, u64, i64)>`.
    fn rows(column: &Column<(u64, u64, i64)>) -> Vec<(u64, u64, i64)> {
        let borrow = column.borrow();
        (0..borrow.len())
            .map(|i| {
                let r = borrow.get(i);
                (*r.0, *r.1, *r.2)
            })
            .collect()
    }

    /// Drain a builder by repeatedly calling `extract` then `finish`.
    fn drain(mut builder: ConsolidatingColumnBuilder<u64, u64, i64>) -> Vec<(u64, u64, i64)> {
        let mut out: Vec<(u64, u64, i64)> = Vec::new();
        while let Some(c) = builder.extract() {
            for r in rows(c) {
                out.push(r);
            }
        }
        if let Some(c) = builder.finish() {
            for r in rows(c) {
                out.push(r);
            }
        }
        out
    }

    #[mz_ore::test]
    fn empty_finish_yields_none() {
        let mut builder: ConsolidatingColumnBuilder<u64, u64, i64> = Default::default();
        assert!(builder.extract().is_none());
        assert!(builder.finish().is_none());
    }

    #[mz_ore::test]
    fn single_push_finish_yields_one() {
        let mut builder: ConsolidatingColumnBuilder<u64, u64, i64> = Default::default();
        builder.push_into((1u64, 0u64, 1i64));
        let column = builder.finish().expect("one container");
        assert_eq!(rows(column), vec![(1, 0, 1)]);
        // A second `finish` with nothing buffered must yield nothing.
        assert!(builder.finish().is_none());
    }

    #[mz_ore::test]
    fn consolidates_on_threshold() {
        let mut builder: ConsolidatingColumnBuilder<u64, u64, i64> = Default::default();
        // Push enough +1/-1 pairs to exceed the staging cap several times over and trigger
        // multiple consolidation cycles. Everything cancels in the staging buffer before
        // ever reaching the SoA accumulator.
        let cap = default_staging_cap::<u64, u64, i64>();
        for _ in 0..(cap * 4) {
            builder.push_into((7u64, 0u64, 1i64));
            builder.push_into((7u64, 0u64, -1i64));
        }
        assert!(drain(builder).is_empty());
    }

    #[mz_ore::test]
    fn cross_batch_consolidation() {
        // A single key pushed many times must collapse to one row even across many staging
        // refills. The drain-multiple-of-grain trick keeps the in-progress consolidated row
        // in staging so subsequent pushes merge into it.
        let mut builder: ConsolidatingColumnBuilder<u64, u64, i64> = Default::default();
        let n: i64 = 100_000;
        for _ in 0..n {
            builder.push_into((42u64, 0u64, 1i64));
        }
        let out = drain(builder);
        assert_eq!(out, vec![(42, 0, n)]);
    }

    #[mz_ore::test]
    fn multiple_distinct_keys() {
        let mut builder: ConsolidatingColumnBuilder<u64, u64, i64> = Default::default();
        builder.push_into((1u64, 0u64, 1i64));
        builder.push_into((2u64, 0u64, 1i64));
        builder.push_into((1u64, 0u64, 1i64));
        // Consolidation reorders updates, so sort before comparing.
        let mut out = drain(builder);
        out.sort();
        assert_eq!(out, vec![(1, 0, 2), (2, 0, 1)]);
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] // too slow
    fn emits_multiple_containers() {
        let mut builder: ConsolidatingColumnBuilder<u64, u64, i64> = Default::default();
        // Enough distinct rows to fill the SoA accumulator past the output target multiple
        // times. Each row is 24 bytes; ~87k rows ≈ 2 MiB, so 300k pushes should mint at least
        // 2-3 aligned containers plus a typed partial on `finish`.
        let n: u64 = 300_000;
        for d in 0..n {
            builder.push_into((d, 0u64, 1i64));
        }

        let mut containers = 0;
        let mut out: Vec<(u64, u64, i64)> = Vec::new();
        while let Some(c) = builder.extract() {
            containers += 1;
            for r in rows(c) {
                out.push(r);
            }
        }
        if let Some(c) = builder.finish() {
            containers += 1;
            for r in rows(c) {
                out.push(r);
            }
        }
        assert!(
            containers > 1,
            "expected multiple containers, got {containers}"
        );
        out.sort();
        let expected: Vec<(u64, u64, i64)> = (0..n).map(|d| (d, 0u64, 1i64)).collect();
        assert_eq!(out, expected);
    }
}
385}