Skip to main content

mz_timely_util/
columnar.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Container for columnar data.
17
18#![deny(missing_docs)]
19
20pub mod batcher;
21pub mod builder;
22
23use std::hash::Hash;
24
25use columnar::Borrow;
26use columnar::bytes::indexed;
27use columnar::common::IterOwn;
28use columnar::{Columnar, Ref};
29use columnar::{FromBytes, Index, Len};
30use differential_dataflow::Hashable;
31use differential_dataflow::containers::TimelyStack;
32use differential_dataflow::trace::implementations::merge_batcher::MergeBatcher;
33use differential_dataflow::trace::implementations::merge_batcher::container::ColInternalMerger;
34use mz_ore::region::Region;
35use timely::Accountable;
36use timely::bytes::arc::Bytes;
37use timely::container::{DrainContainer, PushInto};
38use timely::dataflow::channels::ContainerBytes;
39
40/// A batcher for columnar storage.
41pub type Col2ValBatcher<K, V, T, R> = MergeBatcher<
42    Column<((K, V), T, R)>,
43    batcher::Chunker<TimelyStack<((K, V), T, R)>>,
44    ColInternalMerger<(K, V), T, R>,
45>;
46/// A batcher for columnar storage with unit values.
47pub type Col2KeyBatcher<K, T, R> = Col2ValBatcher<K, (), T, R>;
48
49/// A container based on a columnar store, encoded in aligned bytes.
50///
51/// The type can represent typed data, bytes from Timely, or an aligned allocation. The name
52/// is singular to express that the preferred format is [`Column::Align`]. The [`Column::Typed`]
53/// variant is used to construct the container, and it owns potentially multiple columns of data.
54pub enum Column<C: Columnar> {
55    /// The typed variant of the container.
56    Typed(C::Container),
57    /// The binary variant of the container.
58    Bytes(Bytes),
59    /// Relocated, aligned binary data, if `Bytes` doesn't work for some reason.
60    ///
61    /// Reasons could include misalignment, cloning of data, or wanting
62    /// to release the `Bytes` as a scarce resource.
63    Align(Region<u64>),
64}
65
66impl<C: Columnar> Column<C> {
67    /// Borrows the container as a reference.
68    #[inline]
69    pub fn borrow(&self) -> <C::Container as Borrow>::Borrowed<'_> {
70        match self {
71            Column::Typed(t) => t.borrow(),
72            Column::Bytes(b) => <<C::Container as Borrow>::Borrowed<'_>>::from_bytes(
73                &mut indexed::decode(bytemuck::cast_slice(b)),
74            ),
75            Column::Align(a) => {
76                <<C::Container as Borrow>::Borrowed<'_>>::from_bytes(&mut indexed::decode(a))
77            }
78        }
79    }
80}
81
82impl<C: Columnar> Default for Column<C> {
83    fn default() -> Self {
84        Self::Typed(Default::default())
85    }
86}
87
88impl<C: Columnar> Clone for Column<C>
89where
90    C::Container: Clone,
91{
92    fn clone(&self) -> Self {
93        match self {
94            // Typed stays typed, although we would have the option to move to aligned data.
95            // If we did it might be confusing why we couldn't push into a cloned column.
96            Column::Typed(t) => Column::Typed(t.clone()),
97            Column::Bytes(b) => {
98                assert_eq!(b.len() % 8, 0);
99                let mut alloc: Region<u64> = crate::containers::alloc_aligned_zeroed(b.len() / 8);
100                let alloc_bytes = bytemuck::cast_slice_mut(&mut alloc);
101                alloc_bytes[..b.len()].copy_from_slice(b);
102                Self::Align(alloc)
103            }
104            Column::Align(a) => {
105                let mut alloc = crate::containers::alloc_aligned_zeroed(a.len());
106                alloc[..a.len()].copy_from_slice(a);
107                Column::Align(alloc)
108            }
109        }
110    }
111}
112
113impl<C: Columnar> Accountable for Column<C> {
114    #[inline]
115    fn record_count(&self) -> i64 {
116        self.borrow().len().try_into().expect("Must fit")
117    }
118}
119impl<C: Columnar> DrainContainer for Column<C> {
120    type Item<'a> = Ref<'a, C>;
121    type DrainIter<'a> = IterOwn<<C::Container as Borrow>::Borrowed<'a>>;
122    #[inline]
123    fn drain(&mut self) -> Self::DrainIter<'_> {
124        self.borrow().into_index_iter()
125    }
126}
127
128impl<C: Columnar, T> PushInto<T> for Column<C>
129where
130    C::Container: columnar::Push<T>,
131{
132    #[inline]
133    fn push_into(&mut self, item: T) {
134        use columnar::Push;
135        match self {
136            Column::Typed(t) => t.push(item),
137            Column::Align(_) | Column::Bytes(_) => {
138                // We really oughtn't be calling this in this case.
139                // We could convert to owned, but need more constraints on `C`.
140                unimplemented!("Pushing into Column::Bytes without first clearing");
141            }
142        }
143    }
144}
145
146impl<C: Columnar> ContainerBytes for Column<C> {
147    #[inline]
148    fn from_bytes(bytes: Bytes) -> Self {
149        // Our expectation / hope is that `bytes` is `u64` aligned and sized.
150        // If the alignment is borked, we can relocate. If the size is borked,
151        // not sure what we do in that case. An incorrect size indicates a problem
152        // of `into_bytes`, or a failure of the communication layer, both of which
153        // are unrecoverable.
154        assert_eq!(bytes.len() % 8, 0);
155        if let Ok(_) = bytemuck::try_cast_slice::<_, u64>(&bytes) {
156            Self::Bytes(bytes)
157        } else {
158            // We failed to cast the slice, so we'll reallocate.
159            let mut alloc: Region<u64> = crate::containers::alloc_aligned_zeroed(bytes.len() / 8);
160            let alloc_bytes = bytemuck::cast_slice_mut(&mut alloc);
161            alloc_bytes[..bytes.len()].copy_from_slice(&bytes);
162            Self::Align(alloc)
163        }
164    }
165
166    #[inline]
167    fn length_in_bytes(&self) -> usize {
168        match self {
169            Column::Typed(t) => indexed::length_in_bytes(&t.borrow()),
170            Column::Bytes(b) => b.len(),
171            Column::Align(a) => 8 * a.len(),
172        }
173    }
174
175    #[inline]
176    fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
177        match self {
178            Column::Typed(t) => indexed::write(writer, &t.borrow()).unwrap(),
179            Column::Bytes(b) => writer.write_all(b).unwrap(),
180            Column::Align(a) => writer.write_all(bytemuck::cast_slice(a)).unwrap(),
181        }
182    }
183}
184
185/// An exchange function for columnar tuples of the form `((K, V), T, D)`. Rust has a hard
186/// time to figure out the lifetimes of the elements when specified as a closure, so we rather
187/// specify it as a function.
188#[inline(always)]
189pub fn columnar_exchange<K, V, T, D>(((k, _), _, _): &Ref<'_, ((K, V), T, D)>) -> u64
190where
191    K: Columnar,
192    for<'a> Ref<'a, K>: Hash,
193    V: Columnar,
194    D: Columnar,
195    T: Columnar,
196{
197    k.hashed()
198}
199
#[cfg(test)]
mod tests {
    use mz_ore::region::Region;
    use timely::bytes::arc::BytesMut;
    use timely::container::PushInto;
    use timely::dataflow::channels::ContainerBytes;

    use super::*;

    /// Produce some bytes that are in columnar format, holding the `i32` values 1, 2, 3.
    fn raw_columnar_bytes() -> Vec<u8> {
        let header = [16_u64, 28_u64]; // offsets, length
        let values = [1_i32, 2, 3];
        let padding = [0_u8; 4];

        let mut raw = Vec::new();
        for word in header {
            raw.extend(word.to_le_bytes());
        }
        for value in values {
            raw.extend(value.to_le_bytes());
        }
        raw.extend(padding);
        raw
    }

    /// Asserts that `column` holds exactly the values 1, 2, 3, in that order.
    #[track_caller]
    fn assert_one_two_three(column: &Column<i32>) {
        let items = column.borrow().into_index_iter().collect::<Vec<_>>();
        assert_eq!(items, vec![&1, &2, &3]);
    }

    /// Builds a column via `from_bytes` from a copy of `raw` that starts `skew` bytes past
    /// a `u64`-aligned address.
    fn column_from_skewed_bytes(raw: &[u8], skew: usize) -> Column<i32> {
        // Over-allocate by one word so there is room to skip to the desired offset.
        let backing = vec![0_u8; raw.len() + 8];
        let to_aligned = backing.as_ptr().align_offset(std::mem::size_of::<u64>());
        let mut bytes_mut = BytesMut::from(backing);
        let _ = bytes_mut.extract_to(to_aligned + skew);
        bytes_mut[..raw.len()].copy_from_slice(raw);
        Column::from_bytes(bytes_mut.extract_to(raw.len()))
    }

    #[mz_ore::test]
    fn test_column_clone() {
        // Typed columns clone their container.
        let typed: Column<i32> = Column::Typed(Columnar::as_columns([1, 2, 3].iter()));
        assert_one_two_three(&typed.clone());

        // Byte-backed columns clone their contents.
        let frozen = BytesMut::from(raw_columnar_bytes()).freeze();
        let from_bytes: Column<i32> = Column::Bytes(frozen);
        assert_one_two_three(&from_bytes.clone());

        // Aligned columns clone their contents.
        let raw = raw_columnar_bytes();
        let mut region: Region<u64> = crate::containers::alloc_aligned_zeroed(raw.len() / 8);
        let region_bytes: &mut [u8] = bytemuck::cast_slice_mut(&mut region);
        region_bytes[..raw.len()].copy_from_slice(&raw);
        let aligned: Column<i32> = Column::Align(region);
        assert_one_two_three(&aligned.clone());
    }

    /// Assert the desired contents of raw_columnar_bytes so that diagnosing test failures is
    /// easier.
    #[mz_ore::test]
    fn test_column_known_bytes() {
        let mut column: Column<i32> = Default::default();
        for value in [1, 2, 3] {
            column.push_into(value);
        }
        let mut data = Vec::new();
        let mut cursor = std::io::Cursor::new(&mut data);
        column.into_bytes(&mut cursor);
        assert_eq!(data, raw_columnar_bytes());
    }

    #[mz_ore::test]
    fn test_column_from_bytes() {
        let raw = raw_columnar_bytes();

        // Aligned input is kept as `Bytes`.
        let column = column_from_skewed_bytes(&raw, 0);
        assert!(matches!(column, Column::Bytes(_)));
        assert_one_two_three(&column);

        // Input that is off by one byte must be relocated into an aligned region.
        let column = column_from_skewed_bytes(&raw, 1);
        assert!(matches!(column, Column::Align(_)));
        assert_one_two_three(&column);
    }
}