mz_timely_util/
columnar.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Container for columnar data.
17
18#![deny(missing_docs)]
19
20pub mod batcher;
21pub mod builder;
22
23use std::hash::Hash;
24
25use columnar::Container as _;
26use columnar::bytes::{EncodeDecode, Indexed};
27use columnar::common::IterOwn;
28use columnar::{Clear, FromBytes, Index, Len};
29use columnar::{Columnar, Ref};
30use differential_dataflow::Hashable;
31use differential_dataflow::containers::TimelyStack;
32use differential_dataflow::trace::implementations::merge_batcher::{ColMerger, MergeBatcher};
33use mz_ore::region::Region;
34use timely::Container;
35use timely::bytes::arc::Bytes;
36use timely::container::PushInto;
37use timely::dataflow::channels::ContainerBytes;
38
39/// A batcher for columnar storage.
40pub type Col2ValBatcher<K, V, T, R> = MergeBatcher<
41    Column<((K, V), T, R)>,
42    batcher::Chunker<TimelyStack<((K, V), T, R)>>,
43    ColMerger<(K, V), T, R>,
44>;
45/// A batcher for columnar storage with unit values.
46pub type Col2KeyBatcher<K, T, R> = Col2ValBatcher<K, (), T, R>;
47
48/// A container based on a columnar store, encoded in aligned bytes.
49///
50/// The type can represent typed data, bytes from Timely, or an aligned allocation. The name
51/// is singular to express that the preferred format is [`Column::Align`]. The [`Column::Typed`]
52/// variant is used to construct the container, and it owns potentially multiple columns of data.
53pub enum Column<C: Columnar> {
54    /// The typed variant of the container.
55    Typed(C::Container),
56    /// The binary variant of the container.
57    Bytes(Bytes),
58    /// Relocated, aligned binary data, if `Bytes` doesn't work for some reason.
59    ///
60    /// Reasons could include misalignment, cloning of data, or wanting
61    /// to release the `Bytes` as a scarce resource.
62    Align(Region<u64>),
63}
64
65impl<C: Columnar> Column<C> {
66    /// Borrows the container as a reference.
67    #[inline]
68    fn borrow(&self) -> <C::Container as columnar::Container>::Borrowed<'_> {
69        match self {
70            Column::Typed(t) => t.borrow(),
71            Column::Bytes(b) => <<C::Container as columnar::Container>::Borrowed<'_>>::from_bytes(
72                &mut Indexed::decode(bytemuck::cast_slice(b)),
73            ),
74            Column::Align(a) => <<C::Container as columnar::Container>::Borrowed<'_>>::from_bytes(
75                &mut Indexed::decode(a),
76            ),
77        }
78    }
79}
80
81impl<C: Columnar> Default for Column<C> {
82    fn default() -> Self {
83        Self::Typed(Default::default())
84    }
85}
86
87impl<C: Columnar> Clone for Column<C>
88where
89    C::Container: Clone,
90{
91    fn clone(&self) -> Self {
92        match self {
93            // Typed stays typed, although we would have the option to move to aligned data.
94            // If we did it might be confusing why we couldn't push into a cloned column.
95            Column::Typed(t) => Column::Typed(t.clone()),
96            Column::Bytes(b) => {
97                assert_eq!(b.len() % 8, 0);
98                let mut alloc: Region<u64> = crate::containers::alloc_aligned_zeroed(b.len() / 8);
99                let alloc_bytes = bytemuck::cast_slice_mut(&mut alloc);
100                alloc_bytes[..b.len()].copy_from_slice(b);
101                Self::Align(alloc)
102            }
103            Column::Align(a) => {
104                let mut alloc = crate::containers::alloc_aligned_zeroed(a.len());
105                alloc[..a.len()].copy_from_slice(a);
106                Column::Align(alloc)
107            }
108        }
109    }
110}
111
112impl<C: Columnar> Container for Column<C> {
113    type ItemRef<'a> = columnar::Ref<'a, C>;
114    type Item<'a> = columnar::Ref<'a, C>;
115
116    #[inline]
117    fn len(&self) -> usize {
118        self.borrow().len()
119    }
120
121    // This sets the `Bytes` variant to be an empty `Typed` variant, appropriate for pushing into.
122    #[inline]
123    fn clear(&mut self) {
124        match self {
125            Column::Typed(t) => t.clear(),
126            Column::Bytes(_) | Column::Align(_) => *self = Column::Typed(Default::default()),
127        }
128    }
129
130    type Iter<'a> = IterOwn<<C::Container as columnar::Container>::Borrowed<'a>>;
131
132    #[inline]
133    fn iter(&self) -> Self::Iter<'_> {
134        self.borrow().into_index_iter()
135    }
136
137    type DrainIter<'a> = IterOwn<<C::Container as columnar::Container>::Borrowed<'a>>;
138
139    #[inline]
140    fn drain(&mut self) -> Self::DrainIter<'_> {
141        self.borrow().into_index_iter()
142    }
143}
144
145impl<C: Columnar, T> PushInto<T> for Column<C>
146where
147    C::Container: columnar::Push<T>,
148{
149    #[inline]
150    fn push_into(&mut self, item: T) {
151        use columnar::Push;
152        match self {
153            Column::Typed(t) => t.push(item),
154            Column::Align(_) | Column::Bytes(_) => {
155                // We really oughtn't be calling this in this case.
156                // We could convert to owned, but need more constraints on `C`.
157                unimplemented!("Pushing into Column::Bytes without first clearing");
158            }
159        }
160    }
161}
162
163impl<C: Columnar> ContainerBytes for Column<C> {
164    #[inline]
165    fn from_bytes(bytes: Bytes) -> Self {
166        // Our expectation / hope is that `bytes` is `u64` aligned and sized.
167        // If the alignment is borked, we can relocate. If the size is borked,
168        // not sure what we do in that case. An incorrect size indicates a problem
169        // of `into_bytes`, or a failure of the communication layer, both of which
170        // are unrecoverable.
171        assert_eq!(bytes.len() % 8, 0);
172        if let Ok(_) = bytemuck::try_cast_slice::<_, u64>(&bytes) {
173            Self::Bytes(bytes)
174        } else {
175            // We failed to cast the slice, so we'll reallocate.
176            let mut alloc: Region<u64> = crate::containers::alloc_aligned_zeroed(bytes.len() / 8);
177            let alloc_bytes = bytemuck::cast_slice_mut(&mut alloc);
178            alloc_bytes[..bytes.len()].copy_from_slice(&bytes);
179            Self::Align(alloc)
180        }
181    }
182
183    #[inline]
184    fn length_in_bytes(&self) -> usize {
185        match self {
186            Column::Typed(t) => Indexed::length_in_bytes(&t.borrow()),
187            Column::Bytes(b) => b.len(),
188            Column::Align(a) => 8 * a.len(),
189        }
190    }
191
192    #[inline]
193    fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
194        match self {
195            Column::Typed(t) => Indexed::write(writer, &t.borrow()).unwrap(),
196            Column::Bytes(b) => writer.write_all(b).unwrap(),
197            Column::Align(a) => writer.write_all(bytemuck::cast_slice(a)).unwrap(),
198        }
199    }
200}
201
202/// An exchange function for columnar tuples of the form `((K, V), T, D)`. Rust has a hard
203/// time to figure out the lifetimes of the elements when specified as a closure, so we rather
204/// specify it as a function.
205#[inline(always)]
206pub fn columnar_exchange<K, V, T, D>(((k, _), _, _): &Ref<'_, ((K, V), T, D)>) -> u64
207where
208    K: Columnar,
209    for<'a> Ref<'a, K>: Hash,
210    V: Columnar,
211    D: Columnar,
212    T: Columnar,
213{
214    k.hashed()
215}
216
217#[cfg(test)]
218mod tests {
219    use mz_ore::region::Region;
220    use timely::Container;
221    use timely::bytes::arc::BytesMut;
222    use timely::container::PushInto;
223    use timely::dataflow::channels::ContainerBytes;
224
225    use super::*;
226
227    /// Produce some bytes that are in columnar format.
228    fn raw_columnar_bytes() -> Vec<u8> {
229        let mut raw = Vec::new();
230        raw.extend(16_u64.to_le_bytes()); // offsets
231        raw.extend(28_u64.to_le_bytes()); // length
232        raw.extend(1_i32.to_le_bytes());
233        raw.extend(2_i32.to_le_bytes());
234        raw.extend(3_i32.to_le_bytes());
235        raw.extend([0, 0, 0, 0]); // padding
236        raw
237    }
238
239    #[mz_ore::test]
240    fn test_column_clone() {
241        let columns = Columnar::as_columns([1, 2, 3].iter());
242        let column_typed: Column<i32> = Column::Typed(columns);
243        let column_typed2 = column_typed.clone();
244
245        assert_eq!(column_typed2.iter().collect::<Vec<_>>(), vec![&1, &2, &3]);
246
247        let bytes = BytesMut::from(raw_columnar_bytes()).freeze();
248        let column_bytes: Column<i32> = Column::Bytes(bytes);
249        let column_bytes2 = column_bytes.clone();
250
251        assert_eq!(column_bytes2.iter().collect::<Vec<_>>(), vec![&1, &2, &3]);
252
253        let raw = raw_columnar_bytes();
254        let mut region: Region<u64> = crate::containers::alloc_aligned_zeroed(raw.len() / 8);
255        let region_bytes = bytemuck::cast_slice_mut(&mut region);
256        region_bytes[..raw.len()].copy_from_slice(&raw);
257        let column_align: Column<i32> = Column::Align(region);
258        let column_align2 = column_align.clone();
259
260        assert_eq!(column_align2.iter().collect::<Vec<_>>(), vec![&1, &2, &3]);
261    }
262
263    /// Assert the desired contents of raw_columnar_bytes so that diagnosing test failures is
264    /// easier.
265    #[mz_ore::test]
266    fn test_column_known_bytes() {
267        let mut column: Column<i32> = Default::default();
268        column.push_into(1);
269        column.push_into(2);
270        column.push_into(3);
271        let mut data = Vec::new();
272        column.into_bytes(&mut std::io::Cursor::new(&mut data));
273        assert_eq!(data, raw_columnar_bytes());
274    }
275
276    #[mz_ore::test]
277    fn test_column_from_bytes() {
278        let raw = raw_columnar_bytes();
279
280        let buf = vec![0; raw.len() + 8];
281        let align = buf.as_ptr().align_offset(std::mem::size_of::<u64>());
282        let mut bytes_mut = BytesMut::from(buf);
283        let _ = bytes_mut.extract_to(align);
284        bytes_mut[..raw.len()].copy_from_slice(&raw);
285        let aligned_bytes = bytes_mut.extract_to(raw.len());
286
287        let column: Column<i32> = Column::from_bytes(aligned_bytes);
288        assert!(matches!(column, Column::Bytes(_)));
289        assert_eq!(column.iter().collect::<Vec<_>>(), vec![&1, &2, &3]);
290
291        let buf = vec![0; raw.len() + 8];
292        let align = buf.as_ptr().align_offset(std::mem::size_of::<u64>());
293        let mut bytes_mut = BytesMut::from(buf);
294        let _ = bytes_mut.extract_to(align + 1);
295        bytes_mut[..raw.len()].copy_from_slice(&raw);
296        let unaligned_bytes = bytes_mut.extract_to(raw.len());
297
298        let column: Column<i32> = Column::from_bytes(unaligned_bytes);
299        assert!(matches!(column, Column::Align(_)));
300        assert_eq!(column.iter().collect::<Vec<_>>(), vec![&1, &2, &3]);
301    }
302}