Skip to main content

mz_timely_util/
columnar.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Container for columnar data.
17
18#![deny(missing_docs)]
19
20pub mod batcher;
21pub mod builder;
22pub mod builder_input;
23pub mod consolidate;
24pub mod merge_batcher;
25
26use std::hash::Hash;
27
28use columnar::Borrow;
29use columnar::bytes::indexed;
30use columnar::common::IterOwn;
31use columnar::{Clear, FromBytes, Index, Len};
32use columnar::{Columnar, Ref};
33use differential_dataflow::Hashable;
34use differential_dataflow::trace::implementations::merge_batcher::MergeBatcher;
35use timely::Accountable;
36use timely::bytes::arc::Bytes;
37use timely::container::{DrainContainer, PushInto, SizableContainer};
38use timely::dataflow::channels::ContainerBytes;
39
40use crate::columnation::ColInternalMerger;
41
42/// A batcher for columnar storage.
43///
44/// The chunker is supplied to the arrange operator separately. Callers pass
45/// it explicitly: [`ColumnationChunker`](crate::columnation::ColumnationChunker)
46/// for `Vec<_>` input, or [`batcher::Chunker`] (over a `ColumnationStack<_>`) for
47/// [`Column`] input.
48pub type Col2ValBatcher<K, V, T, R> = MergeBatcher<ColInternalMerger<(K, V), T, R>>;
49/// A batcher for columnar storage with unit values.
50pub type Col2KeyBatcher<K, T, R> = Col2ValBatcher<K, (), T, R>;
51
52/// Pageable counterpart to [`Col2ValBatcher`]. Routes every chunk produced
53/// by chunking, merging, or extract through a [`crate::column_pager::ColumnPager`],
54/// so memory pressure can spill chains to a backing store without touching
55/// the merge / extract bodies.
56///
57/// Drop-in shape at the type level: both aliases take `(K, V, T, R)` and
58/// produce a `Batcher<Input = Column<((K, V), T, R)>, Output = Column<((K,
59/// V), T, R)>>`. Call sites can swap with `cargo fix`–style renaming once
60/// downstream `Trace`/`Builder` impls have been wired up. The pager itself
61/// defaults to [`crate::column_pager::ColumnPager::disabled`]; inject a
62/// real one via [`merge_batcher::ColumnMergeBatcher::set_pager`].
63pub type Col2ValPagedBatcher<K, V, T, R> = merge_batcher::ColumnMergeBatcher<(K, V), T, R>;
64
65/// A container based on a columnar store, encoded in aligned bytes.
66///
67/// The type can represent typed data, bytes from Timely, or an aligned allocation. The name
68/// is singular to express that the preferred format is [`Column::Align`]. The [`Column::Typed`]
69/// variant is used to construct the container, and it owns potentially multiple columns of data.
70pub enum Column<C: Columnar> {
71    /// The typed variant of the container.
72    Typed(C::Container),
73    /// The binary variant of the container.
74    Bytes(Bytes),
75    /// Relocated, aligned binary data, if `Bytes` doesn't work for some reason.
76    ///
77    /// Reasons could include misalignment, cloning of data, or wanting
78    /// to release the `Bytes` as a scarce resource.
79    ///
80    /// `Vec<u64>` guarantees `u64` alignment for the contained bytes.
81    Align(Vec<u64>),
82}
83
84impl<C: Columnar> Column<C> {
85    /// Empties the column, retaining the `Typed` variant's allocation so the
86    /// caller can refill it.
87    ///
88    /// [`columnar::Clear`] clears the typed container in place without
89    /// releasing its capacity. The serialized variants (`Bytes`/`Align`) own
90    /// no reusable typed buffer, so they are reset to an empty `Typed`.
91    #[inline]
92    pub fn clear(&mut self) {
93        match self {
94            Column::Typed(t) => t.clear(),
95            Column::Bytes(_) | Column::Align(_) => *self = Default::default(),
96        }
97    }
98
99    /// Borrows the container as a reference.
100    #[inline]
101    pub fn borrow(&self) -> <C::Container as Borrow>::Borrowed<'_> {
102        match self {
103            Column::Typed(t) => t.borrow(),
104            Column::Bytes(b) => <<C::Container as Borrow>::Borrowed<'_>>::from_bytes(
105                &mut indexed::decode(bytemuck::cast_slice(b)),
106            ),
107            Column::Align(a) => {
108                <<C::Container as Borrow>::Borrowed<'_>>::from_bytes(&mut indexed::decode(a))
109            }
110        }
111    }
112}
113
114impl<C: Columnar> Default for Column<C> {
115    fn default() -> Self {
116        Self::Typed(Default::default())
117    }
118}
119
120impl<C: Columnar> Clone for Column<C>
121where
122    C::Container: Clone,
123{
124    fn clone(&self) -> Self {
125        match self {
126            // Typed stays typed, although we would have the option to move to aligned data.
127            // If we did it might be confusing why we couldn't push into a cloned column.
128            Column::Typed(t) => Column::Typed(t.clone()),
129            Column::Bytes(b) => {
130                assert_eq!(b.len() % 8, 0);
131                Self::Align(bytemuck::allocation::pod_collect_to_vec(b))
132            }
133            Column::Align(a) => Column::Align(a.clone()),
134        }
135    }
136}
137
138impl<C: Columnar> Accountable for Column<C> {
139    #[inline]
140    fn record_count(&self) -> i64 {
141        self.borrow().len().try_into().expect("Must fit")
142    }
143}
144impl<C: Columnar> DrainContainer for Column<C> {
145    type Item<'a> = Ref<'a, C>;
146    type DrainIter<'a> = IterOwn<<C::Container as Borrow>::Borrowed<'a>>;
147    #[inline]
148    fn drain(&mut self) -> Self::DrainIter<'_> {
149        self.borrow().into_index_iter()
150    }
151}
152
153impl<C: Columnar, T> PushInto<T> for Column<C>
154where
155    C::Container: columnar::Push<T>,
156{
157    #[inline]
158    fn push_into(&mut self, item: T) {
159        use columnar::Push;
160        match self {
161            Column::Typed(t) => t.push(item),
162            Column::Align(_) | Column::Bytes(_) => {
163                // We really oughtn't be calling this in this case.
164                // We could convert to owned, but need more constraints on `C`.
165                unimplemented!("Pushing into Column::Bytes without first clearing");
166            }
167        }
168    }
169}
170
171/// Words per 2 MiB. `length_in_words` returns serialized size in `u64` units,
172/// so this is the page count we round up to. Picked to match
173/// [`builder::ColumnBuilder`]'s output granularity so chunks shipped from the
174/// merger and chunks shipped from the builder are sized comparably.
175const SHIP_WORDS: usize = 1 << 18;
176
177/// Returns true once the serialized size of `borrow` is within 10% of the next
178/// `SHIP_WORDS` boundary.
179///
180/// Same heuristic as `ColumnBuilder::push_into`; lifted out so the merger and
181/// the `SizableContainer` impl agree on the ship signal.
182#[inline]
183pub(crate) fn at_serialized_capacity<'a, A>(borrow: &A) -> bool
184where
185    A: columnar::AsBytes<'a>,
186{
187    let words = indexed::length_in_words(borrow);
188    let round = (words + (SHIP_WORDS - 1)) & !(SHIP_WORDS - 1);
189    round - words < round / 10
190}
191
192impl<C: Columnar> SizableContainer for Column<C> {
193    fn at_capacity(&self) -> bool {
194        // Match `ColumnBuilder`'s ship heuristic: serialized size within 10%
195        // of the next 2 MiB. Aligns chunk-size choices across the two paths
196        // and keeps recipients dealing with a single granularity.
197        //
198        // Serialized chunks (`Bytes` / `Align`) have no typed builder to push
199        // into, so they're trivially "at capacity" — there's no further work
200        // they can absorb.
201        match self {
202            Column::Typed(c) => at_serialized_capacity(&c.borrow()),
203            Column::Bytes(_) | Column::Align(_) => true,
204        }
205    }
206
207    fn ensure_capacity(&mut self, _stash: &mut Option<Self>) {
208        // No pre-reservation: chunks are recycled by the merge framework, so
209        // leaf capacities settle to steady-state after the first round and
210        // there is nothing useful to reserve up front. The `SizableContainer`
211        // impl exists so `at_capacity` is callable on result chunks during
212        // `Merger::merge` orchestration; `ensure_capacity` is a required
213        // method on the trait but has no work to do here.
214    }
215}
216
217impl<C: Columnar> ContainerBytes for Column<C> {
218    #[inline]
219    fn from_bytes(bytes: Bytes) -> Self {
220        // Our expectation / hope is that `bytes` is `u64` aligned and sized.
221        // If the alignment is borked, we can relocate. If the size is borked,
222        // not sure what we do in that case. An incorrect size indicates a problem
223        // of `into_bytes`, or a failure of the communication layer, both of which
224        // are unrecoverable.
225        assert_eq!(bytes.len() % 8, 0);
226        if let Ok(_) = bytemuck::try_cast_slice::<_, u64>(&bytes) {
227            Self::Bytes(bytes)
228        } else {
229            // We failed to cast the slice, so we'll reallocate. `Vec<u64>`
230            // is u64-aligned by construction.
231            Self::Align(bytemuck::allocation::pod_collect_to_vec(&bytes[..]))
232        }
233    }
234
235    #[inline]
236    fn length_in_bytes(&self) -> usize {
237        match self {
238            Column::Typed(t) => indexed::length_in_bytes(&t.borrow()),
239            Column::Bytes(b) => b.len(),
240            Column::Align(a) => 8 * a.len(),
241        }
242    }
243
244    #[inline]
245    fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
246        match self {
247            Column::Typed(t) => indexed::write(writer, &t.borrow()).unwrap(),
248            Column::Bytes(b) => writer.write_all(b).unwrap(),
249            Column::Align(a) => writer.write_all(bytemuck::cast_slice(a)).unwrap(),
250        }
251    }
252}
253
254/// An exchange function for columnar tuples of the form `((K, V), T, D)`. Rust has a hard
255/// time to figure out the lifetimes of the elements when specified as a closure, so we rather
256/// specify it as a function.
257#[inline(always)]
258pub fn columnar_exchange<K, V, T, D>(((k, _), _, _): &Ref<'_, ((K, V), T, D)>) -> u64
259where
260    K: Columnar,
261    for<'a> Ref<'a, K>: Hash,
262    V: Columnar,
263    D: Columnar,
264    T: Columnar,
265{
266    k.hashed()
267}
268
#[cfg(test)]
mod tests {
    use timely::bytes::arc::BytesMut;
    use timely::container::PushInto;
    use timely::dataflow::channels::ContainerBytes;

    use super::*;

    /// Produces the serialized form of the `i32`s `[1, 2, 3]`: exactly what
    /// `Column::<i32>::into_bytes` writes for those values (checked by
    /// `test_column_known_bytes`).
    fn raw_columnar_bytes() -> Vec<u8> {
        let mut raw = Vec::new();
        raw.extend(16_u64.to_le_bytes()); // offsets: 16 = the two header words * 8 bytes
        raw.extend(28_u64.to_le_bytes()); // length: 28 = 16 + 3 * 4 bytes of `i32` data
        raw.extend(1_i32.to_le_bytes());
        raw.extend(2_i32.to_le_bytes());
        raw.extend(3_i32.to_le_bytes());
        raw.extend([0, 0, 0, 0]); // padding to a whole `u64` word
        raw
    }

    #[mz_ore::test]
    fn test_column_clone() {
        let columns = Columnar::as_columns([1, 2, 3].iter());
        let column_typed: Column<i32> = Column::Typed(columns);
        let column_typed2 = column_typed.clone();

        assert_eq!(
            column_typed2.borrow().into_index_iter().collect::<Vec<_>>(),
            vec![&1, &2, &3]
        );

        let bytes = BytesMut::from(raw_columnar_bytes()).freeze();
        let column_bytes: Column<i32> = Column::Bytes(bytes);
        let column_bytes2 = column_bytes.clone();

        assert_eq!(
            column_bytes2.borrow().into_index_iter().collect::<Vec<_>>(),
            vec![&1, &2, &3]
        );

        let raw = raw_columnar_bytes();
        let mut region: Vec<u64> = vec![0; raw.len() / 8];
        let region_bytes = bytemuck::cast_slice_mut(&mut region[..]);
        region_bytes[..raw.len()].copy_from_slice(&raw);
        let column_align: Column<i32> = Column::Align(region);
        let column_align2 = column_align.clone();

        assert_eq!(
            column_align2.borrow().into_index_iter().collect::<Vec<_>>(),
            vec![&1, &2, &3]
        );
    }

    /// Assert the desired contents of raw_columnar_bytes so that diagnosing test failures is
    /// easier.
    #[mz_ore::test]
    fn test_column_known_bytes() {
        let mut column: Column<i32> = Default::default();
        column.push_into(1);
        column.push_into(2);
        column.push_into(3);
        let mut data = Vec::new();
        column.into_bytes(&mut std::io::Cursor::new(&mut data));
        assert_eq!(data, raw_columnar_bytes());
    }

    #[mz_ore::test]
    fn test_column_from_bytes() {
        let raw = raw_columnar_bytes();

        // Carve an 8-byte-aligned window out of an oversized buffer.
        let buf = vec![0; raw.len() + 8];
        let align = buf.as_ptr().align_offset(std::mem::size_of::<u64>());
        let mut bytes_mut = BytesMut::from(buf);
        let _ = bytes_mut.extract_to(align);
        bytes_mut[..raw.len()].copy_from_slice(&raw);
        let aligned_bytes = bytes_mut.extract_to(raw.len());

        let column: Column<i32> = Column::from_bytes(aligned_bytes);
        assert!(matches!(column, Column::Bytes(_)));
        assert_eq!(
            column.borrow().into_index_iter().collect::<Vec<_>>(),
            vec![&1, &2, &3]
        );

        // Same, but shifted one byte past alignment to force relocation.
        let buf = vec![0; raw.len() + 8];
        let align = buf.as_ptr().align_offset(std::mem::size_of::<u64>());
        let mut bytes_mut = BytesMut::from(buf);
        let _ = bytes_mut.extract_to(align + 1);
        bytes_mut[..raw.len()].copy_from_slice(&raw);
        let unaligned_bytes = bytes_mut.extract_to(raw.len());

        let column: Column<i32> = Column::from_bytes(unaligned_bytes);
        assert!(matches!(column, Column::Align(_)));
        assert_eq!(
            column.borrow().into_index_iter().collect::<Vec<_>>(),
            vec![&1, &2, &3]
        );
    }

    /// `clear` empties a typed column in place, and turns a serialized column
    /// back into an empty, pushable `Typed` one.
    #[mz_ore::test]
    fn test_column_clear() {
        let mut column: Column<i32> = Default::default();
        column.push_into(1);
        column.push_into(2);
        column.clear();
        assert!(matches!(column, Column::Typed(_)));
        assert_eq!(column.borrow().len(), 0);
        column.push_into(3);
        assert_eq!(
            column.borrow().into_index_iter().collect::<Vec<_>>(),
            vec![&3]
        );

        // No borrow of the `Bytes` variant happens here, so the buffer's
        // alignment does not matter.
        let bytes = BytesMut::from(raw_columnar_bytes()).freeze();
        let mut column: Column<i32> = Column::Bytes(bytes);
        column.clear();
        assert!(matches!(column, Column::Typed(_)));
        column.push_into(4);
        assert_eq!(
            column.borrow().into_index_iter().collect::<Vec<_>>(),
            vec![&4]
        );
    }

    /// A small typed column is far from the ship boundary, while serialized
    /// columns always report full.
    #[mz_ore::test]
    fn test_column_at_capacity() {
        let mut column: Column<i32> = Default::default();
        column.push_into(1);
        assert!(!column.at_capacity());

        let bytes = BytesMut::from(raw_columnar_bytes()).freeze();
        let column: Column<i32> = Column::Bytes(bytes);
        assert!(column.at_capacity());
    }
}