Skip to main content

mz_timely_util/
columnar.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Container for columnar data.
17
18#![deny(missing_docs)]
19
20pub mod batcher;
21pub mod builder;
22pub mod consolidate;
23
24use std::hash::Hash;
25
26use columnar::Borrow;
27use columnar::bytes::indexed;
28use columnar::common::IterOwn;
29use columnar::{Clear, FromBytes, Index, Len};
30use columnar::{Columnar, Ref};
31use differential_dataflow::Hashable;
32use differential_dataflow::trace::implementations::merge_batcher::MergeBatcher;
33use timely::Accountable;
34use timely::bytes::arc::Bytes;
35use timely::container::{DrainContainer, PushInto, SizableContainer};
36use timely::dataflow::channels::ContainerBytes;
37
38use crate::columnation::{ColInternalMerger, ColumnationStack};
39
/// A batcher for columnar storage.
///
/// A [`MergeBatcher`] whose input container is [`Column`] over `((K, V), T, R)` updates.
/// Its chunker is `batcher::Chunker` parameterized by `ColumnationStack<((K, V), T, R)>`,
/// and merging is done by `ColInternalMerger<(K, V), T, R>`.
pub type Col2ValBatcher<K, V, T, R> = MergeBatcher<
    Column<((K, V), T, R)>,
    batcher::Chunker<ColumnationStack<((K, V), T, R)>>,
    ColInternalMerger<(K, V), T, R>,
>;
/// A batcher for columnar storage with unit values.
///
/// Shorthand for [`Col2ValBatcher`] with `V = ()`, i.e. for key-only collections.
pub type Col2KeyBatcher<K, T, R> = Col2ValBatcher<K, (), T, R>;
48
/// A container based on a columnar store, encoded in aligned bytes.
///
/// The type can represent typed data, bytes from Timely, or an aligned allocation. The name
/// is singular to express that the preferred format is [`Column::Align`]. The [`Column::Typed`]
/// variant is used to construct the container, and it owns potentially multiple columns of data.
///
/// Every variant can be read through [`Column::borrow`], but only `Typed` accepts pushes.
pub enum Column<C: Columnar> {
    /// The typed variant of the container.
    ///
    /// This is the only variant that `push_into` appends to. It is also what `Default`
    /// produces and what `Column::clear` leaves behind.
    Typed(C::Container),
    /// The binary variant of the container.
    ///
    /// `ContainerBytes::from_bytes` produces this variant only when the received bytes are
    /// already `u64`-aligned; otherwise it relocates them into `Align`.
    Bytes(Bytes),
    /// Relocated, aligned binary data, if `Bytes` doesn't work for some reason.
    ///
    /// Reasons could include misalignment, cloning of data, or wanting
    /// to release the `Bytes` as a scarce resource.
    ///
    /// `Vec<u64>` guarantees `u64` alignment for the contained bytes.
    Align(Vec<u64>),
}
67
68impl<C: Columnar> Column<C> {
69    /// Empties the column, retaining the `Typed` variant's allocation so the
70    /// caller can refill it.
71    ///
72    /// [`columnar::Clear`] clears the typed container in place without
73    /// releasing its capacity. The serialized variants (`Bytes`/`Align`) own
74    /// no reusable typed buffer, so they are reset to an empty `Typed`.
75    #[inline]
76    pub fn clear(&mut self) {
77        match self {
78            Column::Typed(t) => t.clear(),
79            Column::Bytes(_) | Column::Align(_) => *self = Default::default(),
80        }
81    }
82
83    /// Borrows the container as a reference.
84    #[inline]
85    pub fn borrow(&self) -> <C::Container as Borrow>::Borrowed<'_> {
86        match self {
87            Column::Typed(t) => t.borrow(),
88            Column::Bytes(b) => <<C::Container as Borrow>::Borrowed<'_>>::from_bytes(
89                &mut indexed::decode(bytemuck::cast_slice(b)),
90            ),
91            Column::Align(a) => {
92                <<C::Container as Borrow>::Borrowed<'_>>::from_bytes(&mut indexed::decode(a))
93            }
94        }
95    }
96}
97
98impl<C: Columnar> Default for Column<C> {
99    fn default() -> Self {
100        Self::Typed(Default::default())
101    }
102}
103
104impl<C: Columnar> Clone for Column<C>
105where
106    C::Container: Clone,
107{
108    fn clone(&self) -> Self {
109        match self {
110            // Typed stays typed, although we would have the option to move to aligned data.
111            // If we did it might be confusing why we couldn't push into a cloned column.
112            Column::Typed(t) => Column::Typed(t.clone()),
113            Column::Bytes(b) => {
114                assert_eq!(b.len() % 8, 0);
115                Self::Align(bytemuck::allocation::pod_collect_to_vec(b))
116            }
117            Column::Align(a) => Column::Align(a.clone()),
118        }
119    }
120}
121
122impl<C: Columnar> Accountable for Column<C> {
123    #[inline]
124    fn record_count(&self) -> i64 {
125        self.borrow().len().try_into().expect("Must fit")
126    }
127}
128impl<C: Columnar> DrainContainer for Column<C> {
129    type Item<'a> = Ref<'a, C>;
130    type DrainIter<'a> = IterOwn<<C::Container as Borrow>::Borrowed<'a>>;
131    #[inline]
132    fn drain(&mut self) -> Self::DrainIter<'_> {
133        self.borrow().into_index_iter()
134    }
135}
136
137impl<C: Columnar, T> PushInto<T> for Column<C>
138where
139    C::Container: columnar::Push<T>,
140{
141    #[inline]
142    fn push_into(&mut self, item: T) {
143        use columnar::Push;
144        match self {
145            Column::Typed(t) => t.push(item),
146            Column::Align(_) | Column::Bytes(_) => {
147                // We really oughtn't be calling this in this case.
148                // We could convert to owned, but need more constraints on `C`.
149                unimplemented!("Pushing into Column::Bytes without first clearing");
150            }
151        }
152    }
153}
154
/// Words per 2 MiB. `length_in_words` returns serialized size in `u64` units,
/// so this is the page count we round up to. Picked to match
/// [`builder::ColumnBuilder`]'s output granularity so chunks shipped from the
/// merger and chunks shipped from the builder are sized comparably.
const SHIP_WORDS: usize = 1 << 18;

// `at_serialized_capacity` rounds up with a bit mask (`& !(SHIP_WORDS - 1)`), which is only a
// correct round-up when `SHIP_WORDS` is a power of two. Fail the build if that ever changes.
const _: () = assert!(SHIP_WORDS.is_power_of_two());
160
161/// Returns true once the serialized size of `borrow` is within 10% of the next
162/// `SHIP_WORDS` boundary.
163///
164/// Same heuristic as `ColumnBuilder::push_into`; lifted out so the merger and
165/// the `SizableContainer` impl agree on the ship signal.
166#[inline]
167pub(crate) fn at_serialized_capacity<'a, A>(borrow: &A) -> bool
168where
169    A: columnar::AsBytes<'a>,
170{
171    let words = indexed::length_in_words(borrow);
172    let round = (words + (SHIP_WORDS - 1)) & !(SHIP_WORDS - 1);
173    round - words < round / 10
174}
175
176impl<C: Columnar> SizableContainer for Column<C> {
177    fn at_capacity(&self) -> bool {
178        // Match `ColumnBuilder`'s ship heuristic: serialized size within 10%
179        // of the next 2 MiB. Aligns chunk-size choices across the two paths
180        // and keeps recipients dealing with a single granularity.
181        //
182        // Serialized chunks (`Bytes` / `Align`) have no typed builder to push
183        // into, so they're trivially "at capacity" — there's no further work
184        // they can absorb.
185        match self {
186            Column::Typed(c) => at_serialized_capacity(&c.borrow()),
187            Column::Bytes(_) | Column::Align(_) => true,
188        }
189    }
190
191    fn ensure_capacity(&mut self, _stash: &mut Option<Self>) {
192        // No pre-reservation: chunks are recycled by the merge framework, so
193        // leaf capacities settle to steady-state after the first round and
194        // there is nothing useful to reserve up front. The `SizableContainer`
195        // impl exists so `at_capacity` is callable on result chunks during
196        // `Merger::merge` orchestration; `ensure_capacity` is a required
197        // method on the trait but has no work to do here.
198    }
199}
200
201impl<C: Columnar> ContainerBytes for Column<C> {
202    #[inline]
203    fn from_bytes(bytes: Bytes) -> Self {
204        // Our expectation / hope is that `bytes` is `u64` aligned and sized.
205        // If the alignment is borked, we can relocate. If the size is borked,
206        // not sure what we do in that case. An incorrect size indicates a problem
207        // of `into_bytes`, or a failure of the communication layer, both of which
208        // are unrecoverable.
209        assert_eq!(bytes.len() % 8, 0);
210        if let Ok(_) = bytemuck::try_cast_slice::<_, u64>(&bytes) {
211            Self::Bytes(bytes)
212        } else {
213            // We failed to cast the slice, so we'll reallocate. `Vec<u64>`
214            // is u64-aligned by construction.
215            Self::Align(bytemuck::allocation::pod_collect_to_vec(&bytes[..]))
216        }
217    }
218
219    #[inline]
220    fn length_in_bytes(&self) -> usize {
221        match self {
222            Column::Typed(t) => indexed::length_in_bytes(&t.borrow()),
223            Column::Bytes(b) => b.len(),
224            Column::Align(a) => 8 * a.len(),
225        }
226    }
227
228    #[inline]
229    fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
230        match self {
231            Column::Typed(t) => indexed::write(writer, &t.borrow()).unwrap(),
232            Column::Bytes(b) => writer.write_all(b).unwrap(),
233            Column::Align(a) => writer.write_all(bytemuck::cast_slice(a)).unwrap(),
234        }
235    }
236}
237
238/// An exchange function for columnar tuples of the form `((K, V), T, D)`. Rust has a hard
239/// time to figure out the lifetimes of the elements when specified as a closure, so we rather
240/// specify it as a function.
241#[inline(always)]
242pub fn columnar_exchange<K, V, T, D>(((k, _), _, _): &Ref<'_, ((K, V), T, D)>) -> u64
243where
244    K: Columnar,
245    for<'a> Ref<'a, K>: Hash,
246    V: Columnar,
247    D: Columnar,
248    T: Columnar,
249{
250    k.hashed()
251}
252
#[cfg(test)]
mod tests {
    use timely::bytes::arc::BytesMut;
    use timely::container::PushInto;
    use timely::dataflow::channels::ContainerBytes;

    use super::*;

    /// Produce some bytes that are in columnar format.
    ///
    /// This is the serialized `i32` column `[1, 2, 3]`: two `u64` header words, then three
    /// little-endian `i32` values, then zero padding to a multiple of 8 bytes (32 bytes
    /// total). `test_column_known_bytes` pins that `into_bytes` produces exactly this.
    fn raw_columnar_bytes() -> Vec<u8> {
        let mut raw = Vec::new();
        raw.extend(16_u64.to_le_bytes()); // offsets (16: data starts after the two header words)
        raw.extend(28_u64.to_le_bytes()); // length (28 = 16 + 3 * 4: end of the i32 data)
        raw.extend(1_i32.to_le_bytes());
        raw.extend(2_i32.to_le_bytes());
        raw.extend(3_i32.to_le_bytes());
        raw.extend([0, 0, 0, 0]); // padding
        raw
    }

    /// Cloning each variant must preserve the readable contents.
    #[mz_ore::test]
    fn test_column_clone() {
        let columns = Columnar::as_columns([1, 2, 3].iter());
        let column_typed: Column<i32> = Column::Typed(columns);
        let column_typed2 = column_typed.clone();

        assert_eq!(
            column_typed2.borrow().into_index_iter().collect::<Vec<_>>(),
            vec![&1, &2, &3]
        );

        // A plain `Vec<u8>` allocation need not be `u64`-aligned, so only the clone is
        // borrowed here: `Clone` relocates `Bytes` into an aligned `Align`.
        let bytes = BytesMut::from(raw_columnar_bytes()).freeze();
        let column_bytes: Column<i32> = Column::Bytes(bytes);
        let column_bytes2 = column_bytes.clone();

        assert_eq!(
            column_bytes2.borrow().into_index_iter().collect::<Vec<_>>(),
            vec![&1, &2, &3]
        );

        // Copy the raw bytes into a `Vec<u64>` so the `Align` variant starts out aligned.
        let raw = raw_columnar_bytes();
        let mut region: Vec<u64> = vec![0; raw.len() / 8];
        let region_bytes = bytemuck::cast_slice_mut(&mut region[..]);
        region_bytes[..raw.len()].copy_from_slice(&raw);
        let column_align: Column<i32> = Column::Align(region);
        let column_align2 = column_align.clone();

        assert_eq!(
            column_align2.borrow().into_index_iter().collect::<Vec<_>>(),
            vec![&1, &2, &3]
        );
    }

    /// Assert the desired contents of `raw_columnar_bytes` so that diagnosing test failures
    /// is easier.
    #[mz_ore::test]
    fn test_column_known_bytes() {
        let mut column: Column<i32> = Default::default();
        column.push_into(1);
        column.push_into(2);
        column.push_into(3);
        let mut data = Vec::new();
        column.into_bytes(&mut std::io::Cursor::new(&mut data));
        assert_eq!(data, raw_columnar_bytes());
    }

    /// `from_bytes` keeps aligned input as `Bytes` and relocates misaligned input to `Align`.
    #[mz_ore::test]
    fn test_column_from_bytes() {
        let raw = raw_columnar_bytes();

        // Over-allocate by 8 bytes, then skip `align` bytes so the remaining buffer starts at
        // a `u64`-aligned address. Assumes `BytesMut::from` keeps the `Vec`'s heap allocation,
        // so the offset computed on `buf` still applies — TODO confirm.
        let buf = vec![0; raw.len() + 8];
        let align = buf.as_ptr().align_offset(std::mem::size_of::<u64>());
        let mut bytes_mut = BytesMut::from(buf);
        let _ = bytes_mut.extract_to(align);
        bytes_mut[..raw.len()].copy_from_slice(&raw);
        let aligned_bytes = bytes_mut.extract_to(raw.len());

        let column: Column<i32> = Column::from_bytes(aligned_bytes);
        assert!(matches!(column, Column::Bytes(_)));
        assert_eq!(
            column.borrow().into_index_iter().collect::<Vec<_>>(),
            vec![&1, &2, &3]
        );

        // Same setup, but skip one extra byte so the data is deliberately misaligned.
        let buf = vec![0; raw.len() + 8];
        let align = buf.as_ptr().align_offset(std::mem::size_of::<u64>());
        let mut bytes_mut = BytesMut::from(buf);
        let _ = bytes_mut.extract_to(align + 1);
        bytes_mut[..raw.len()].copy_from_slice(&raw);
        let unaligned_bytes = bytes_mut.extract_to(raw.len());

        let column: Column<i32> = Column::from_bytes(unaligned_bytes);
        assert!(matches!(column, Column::Align(_)));
        assert_eq!(
            column.borrow().into_index_iter().collect::<Vec<_>>(),
            vec![&1, &2, &3]
        );
    }
}