mz_timely_util/
containers.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Reusable containers.
17
18use std::hash::Hash;
19
20use columnar::Columnar;
21use differential_dataflow::Hashable;
22use differential_dataflow::containers::TimelyStack;
23use differential_dataflow::trace::implementations::merge_batcher::{ColMerger, MergeBatcher};
24
25pub mod stack;
26
27pub(crate) use alloc::alloc_aligned_zeroed;
28pub use alloc::{enable_columnar_lgalloc, set_enable_columnar_lgalloc};
29pub use builder::ColumnBuilder;
30pub use container::Column;
31pub use provided_builder::ProvidedBuilder;
32
33mod alloc {
34    use mz_ore::region::Region;
35
36    /// Allocate a region of memory with a capacity of at least `len` that is properly aligned
37    /// and zeroed. The memory in Regions is always aligned to its content type.
38    #[inline]
39    pub(crate) fn alloc_aligned_zeroed<T: bytemuck::AnyBitPattern>(len: usize) -> Region<T> {
40        if enable_columnar_lgalloc() {
41            Region::new_auto_zeroed(len)
42        } else {
43            Region::new_heap_zeroed(len)
44        }
45    }
46
47    thread_local! {
48        static ENABLE_COLUMNAR_LGALLOC: std::cell::Cell<bool> = const { std::cell::Cell::new(false) };
49    }
50
51    /// Returns `true` if columnar allocations should come from lgalloc.
52    #[inline]
53    pub fn enable_columnar_lgalloc() -> bool {
54        ENABLE_COLUMNAR_LGALLOC.get()
55    }
56
57    /// Set whether columnar allocations should come from lgalloc. Applies to future allocations.
58    pub fn set_enable_columnar_lgalloc(enabled: bool) {
59        ENABLE_COLUMNAR_LGALLOC.set(enabled);
60    }
61}
62
63mod container {
64    use columnar::Columnar;
65    use columnar::Container as _;
66    use columnar::bytes::{EncodeDecode, Sequence};
67    use columnar::common::IterOwn;
68    use columnar::{Clear, FromBytes, Index, Len};
69    use mz_ore::region::Region;
70    use timely::Container;
71    use timely::bytes::arc::Bytes;
72    use timely::container::PushInto;
73    use timely::dataflow::channels::ContainerBytes;
74
    /// A container based on a columnar store, encoded in aligned bytes.
    ///
    /// The type can represent typed data, bytes from Timely, or an aligned allocation. The name
    /// is singular to express that the preferred format is [`Column::Align`]. The [`Column::Typed`]
    /// variant is used to construct the container, and it owns potentially multiple columns of data.
    ///
    /// Only the [`Column::Typed`] variant accepts pushes; the binary variants must be cleared
    /// first, which resets them to an empty [`Column::Typed`].
    pub enum Column<C: Columnar> {
        /// The typed variant of the container.
        Typed(C::Container),
        /// The binary variant of the container.
        ///
        /// The bytes are reinterpreted as `u64` words when the container is read, so they
        /// must be `u64`-aligned and a multiple of eight bytes long.
        Bytes(Bytes),
        /// Relocated, aligned binary data, if `Bytes` doesn't work for some reason.
        ///
        /// Reasons could include misalignment, cloning of data, or wanting
        /// to release the `Bytes` as a scarce resource.
        Align(Region<u64>),
    }
91
92    impl<C: Columnar> Column<C> {
93        /// Borrows the container as a reference.
94        fn borrow(&self) -> <C::Container as columnar::Container<C>>::Borrowed<'_> {
95            match self {
96                Column::Typed(t) => t.borrow(),
97                Column::Bytes(b) => {
98                    <<C::Container as columnar::Container<C>>::Borrowed<'_>>::from_bytes(
99                        &mut Sequence::decode(bytemuck::cast_slice(b)),
100                    )
101                }
102                Column::Align(a) => {
103                    <<C::Container as columnar::Container<C>>::Borrowed<'_>>::from_bytes(
104                        &mut Sequence::decode(a),
105                    )
106                }
107            }
108        }
109    }
110
111    impl<C: Columnar> Default for Column<C> {
112        fn default() -> Self {
113            Self::Typed(Default::default())
114        }
115    }
116
117    impl<C: Columnar> Clone for Column<C>
118    where
119        C::Container: Clone,
120    {
121        fn clone(&self) -> Self {
122            match self {
123                // Typed stays typed, although we would have the option to move to aligned data.
124                // If we did it might be confusing why we couldn't push into a cloned column.
125                Column::Typed(t) => Column::Typed(t.clone()),
126                Column::Bytes(b) => {
127                    assert_eq!(b.len() % 8, 0);
128                    let mut alloc: Region<u64> = super::alloc_aligned_zeroed(b.len() / 8);
129                    let alloc_bytes = bytemuck::cast_slice_mut(&mut alloc);
130                    alloc_bytes[..b.len()].copy_from_slice(b);
131                    Self::Align(alloc)
132                }
133                Column::Align(a) => {
134                    let mut alloc = super::alloc_aligned_zeroed(a.len());
135                    alloc[..a.len()].copy_from_slice(a);
136                    Column::Align(alloc)
137                }
138            }
139        }
140    }
141
142    impl<C: Columnar> Container for Column<C> {
143        type ItemRef<'a> = C::Ref<'a>;
144        type Item<'a> = C::Ref<'a>;
145
146        fn len(&self) -> usize {
147            self.borrow().len()
148        }
149
150        // This sets the `Bytes` variant to be an empty `Typed` variant, appropriate for pushing into.
151        fn clear(&mut self) {
152            match self {
153                Column::Typed(t) => t.clear(),
154                Column::Bytes(_) | Column::Align(_) => *self = Column::Typed(Default::default()),
155            }
156        }
157
158        type Iter<'a> = IterOwn<<C::Container as columnar::Container<C>>::Borrowed<'a>>;
159
160        fn iter(&self) -> Self::Iter<'_> {
161            self.borrow().into_index_iter()
162        }
163
164        type DrainIter<'a> = IterOwn<<C::Container as columnar::Container<C>>::Borrowed<'a>>;
165
166        fn drain(&mut self) -> Self::DrainIter<'_> {
167            self.borrow().into_index_iter()
168        }
169    }
170
171    impl<C: Columnar, T> PushInto<T> for Column<C>
172    where
173        C::Container: columnar::Push<T>,
174    {
175        #[inline]
176        fn push_into(&mut self, item: T) {
177            use columnar::Push;
178            match self {
179                Column::Typed(t) => t.push(item),
180                Column::Align(_) | Column::Bytes(_) => {
181                    // We really oughtn't be calling this in this case.
182                    // We could convert to owned, but need more constraints on `C`.
183                    unimplemented!("Pushing into Column::Bytes without first clearing");
184                }
185            }
186        }
187    }
188
189    impl<C: Columnar> ContainerBytes for Column<C> {
190        fn from_bytes(bytes: Bytes) -> Self {
191            // Our expectation / hope is that `bytes` is `u64` aligned and sized.
192            // If the alignment is borked, we can relocate. If the size is borked,
193            // not sure what we do in that case. An incorrect size indicates a problem
194            // of `into_bytes`, or a failure of the communication layer, both of which
195            // are unrecoverable.
196            assert_eq!(bytes.len() % 8, 0);
197            if let Ok(_) = bytemuck::try_cast_slice::<_, u64>(&bytes) {
198                Self::Bytes(bytes)
199            } else {
200                // We failed to cast the slice, so we'll reallocate.
201                let mut alloc: Region<u64> = super::alloc_aligned_zeroed(bytes.len() / 8);
202                let alloc_bytes = bytemuck::cast_slice_mut(&mut alloc);
203                alloc_bytes[..bytes.len()].copy_from_slice(&bytes);
204                Self::Align(alloc)
205            }
206        }
207
208        fn length_in_bytes(&self) -> usize {
209            match self {
210                Column::Typed(t) => Sequence::length_in_bytes(&t.borrow()),
211                Column::Bytes(b) => b.len(),
212                Column::Align(a) => 8 * a.len(),
213            }
214        }
215
216        fn into_bytes<W: ::std::io::Write>(&self, writer: &mut W) {
217            match self {
218                Column::Typed(t) => Sequence::write(writer, &t.borrow()).unwrap(),
219                Column::Bytes(b) => writer.write_all(b).unwrap(),
220                Column::Align(a) => writer.write_all(bytemuck::cast_slice(a)).unwrap(),
221            }
222        }
223    }
224}
225
226mod builder {
227    use std::collections::VecDeque;
228
229    use columnar::bytes::{EncodeDecode, Sequence};
230    use columnar::{Clear, Columnar, Len, Push};
231    use timely::container::PushInto;
232    use timely::container::{ContainerBuilder, LengthPreservingContainerBuilder};
233
234    use crate::containers::Column;
235
    /// A container builder for `Column<C>`.
    ///
    /// Elements are pushed into a typed container. Completed containers are queued in
    /// `pending` and handed out one at a time through `extract` and `finish`.
    pub struct ColumnBuilder<C: Columnar> {
        /// Container that we're writing to.
        current: C::Container,
        /// Finished container that we presented to callers of extract/finish.
        ///
        /// We don't recycle the column because for extract, it's not typed, and after calls
        /// to finish it'll be `None`.
        finished: Option<Column<C>>,
        /// Completed containers pending to be sent, in the order they were completed.
        pending: VecDeque<Column<C>>,
    }
248
249    impl<C: Columnar, T> PushInto<T> for ColumnBuilder<C>
250    where
251        C::Container: Push<T>,
252    {
253        #[inline]
254        fn push_into(&mut self, item: T) {
255            self.current.push(item);
256            // If there is less than 10% slop with 2MB backing allocations, mint a container.
257            use columnar::Container;
258            let words = Sequence::length_in_words(&self.current.borrow());
259            let round = (words + ((1 << 18) - 1)) & !((1 << 18) - 1);
260            if round - words < round / 10 {
261                /// Move the contents from `current` to an aligned allocation, and push it to `pending`.
262                /// The contents must fit in `round` words (u64).
263                #[cold]
264                fn outlined_align<C>(
265                    current: &mut C::Container,
266                    round: usize,
267                    pending: &mut VecDeque<Column<C>>,
268                ) where
269                    C: Columnar,
270                {
271                    let mut alloc = super::alloc_aligned_zeroed(round);
272                    let writer = std::io::Cursor::new(bytemuck::cast_slice_mut(&mut alloc[..]));
273                    Sequence::write(writer, &current.borrow()).unwrap();
274                    pending.push_back(Column::Align(alloc));
275                    current.clear();
276                }
277
278                outlined_align(&mut self.current, round, &mut self.pending);
279            }
280        }
281    }
282
283    impl<C: Columnar> Default for ColumnBuilder<C> {
284        fn default() -> Self {
285            ColumnBuilder {
286                current: Default::default(),
287                finished: None,
288                pending: Default::default(),
289            }
290        }
291    }
292
293    impl<C: Columnar> ContainerBuilder for ColumnBuilder<C>
294    where
295        C::Container: Clone,
296    {
297        type Container = Column<C>;
298
299        #[inline]
300        fn extract(&mut self) -> Option<&mut Self::Container> {
301            if let Some(container) = self.pending.pop_front() {
302                self.finished = Some(container);
303                self.finished.as_mut()
304            } else {
305                None
306            }
307        }
308
309        #[inline]
310        fn finish(&mut self) -> Option<&mut Self::Container> {
311            if !self.current.is_empty() {
312                self.pending
313                    .push_back(Column::Typed(std::mem::take(&mut self.current)));
314            }
315            self.finished = self.pending.pop_front();
316            self.finished.as_mut()
317        }
318    }
319
320    impl<C: Columnar> LengthPreservingContainerBuilder for ColumnBuilder<C> where C::Container: Clone {}
321}
322
/// A batcher for columnar storage.
///
/// Accepts [`Column`] containers of `((K, V), T, R)` updates and chunks them into
/// [`TimelyStack`] containers using [`batcher::Chunker`].
pub type Col2ValBatcher<K, V, T, R> = MergeBatcher<
    Column<((K, V), T, R)>,
    batcher::Chunker<TimelyStack<((K, V), T, R)>>,
    ColMerger<(K, V), T, R>,
>;
/// A batcher for columnar storage of keys only: a [`Col2ValBatcher`] whose value type is `()`.
pub type Col2KeyBatcher<K, T, R> = Col2ValBatcher<K, (), T, R>;
330
331/// An exchange function for columnar tuples of the form `((K, V), T, D)`. Rust has a hard
332/// time to figure out the lifetimes of the elements when specified as a closure, so we rather
333/// specify it as a function.
334#[inline(always)]
335pub fn columnar_exchange<K, V, T, D>(((k, _), _, _): &<((K, V), T, D) as Columnar>::Ref<'_>) -> u64
336where
337    K: Columnar,
338    for<'a> K::Ref<'a>: Hash,
339    V: Columnar,
340    D: Columnar,
341    T: Columnar,
342{
343    k.hashed()
344}
345
346/// Types for consolidating, merging, and extracting columnar update collections.
347pub mod batcher {
348    use std::collections::VecDeque;
349
350    use columnar::Columnar;
351    use differential_dataflow::difference::Semigroup;
352    use timely::Container;
353    use timely::container::{ContainerBuilder, PushInto};
354
355    use crate::containers::Column;
356
    /// A container builder that consolidates pushed containers of updates into containers
    /// of type `C`.
    ///
    /// Containers that are non-empty after consolidation are queued in `ready` and handed
    /// out one at a time through `extract` and `finish`.
    #[derive(Default)]
    pub struct Chunker<C> {
        /// Buffer into which we'll consolidate.
        ///
        /// Also the buffer where we'll stage responses to `extract` and `finish`.
        /// When these calls return, the buffer is available for reuse.
        target: C,
        /// Consolidated buffers ready to go.
        ready: VecDeque<C>,
    }
367
368    impl<C: Container + Clone + 'static> ContainerBuilder for Chunker<C> {
369        type Container = C;
370
371        fn extract(&mut self) -> Option<&mut Self::Container> {
372            if let Some(ready) = self.ready.pop_front() {
373                self.target = ready;
374                Some(&mut self.target)
375            } else {
376                None
377            }
378        }
379
380        fn finish(&mut self) -> Option<&mut Self::Container> {
381            self.extract()
382        }
383    }
384
    impl<'a, D, T, R, C2> PushInto<&'a mut Column<(D, T, R)>> for Chunker<C2>
    where
        D: Columnar,
        for<'b> D::Ref<'b>: Ord + Copy,
        T: Columnar,
        for<'b> T::Ref<'b>: Ord + Copy,
        R: Columnar + Semigroup + for<'b> Semigroup<R::Ref<'b>>,
        for<'b> R::Ref<'b>: Ord,
        C2: Container + for<'b> PushInto<&'b (D, T, R)>,
    {
        /// Consolidates the updates in `container` and queues the result.
        ///
        /// The updates are sorted, diffs of updates with equal data and time are accumulated,
        /// and every update whose accumulated diff is non-zero is pushed into `target`. If
        /// `target` ends up non-empty, it is moved to the back of `ready`.
        fn push_into(&mut self, container: &'a mut Column<(D, T, R)>) {
            // Sort input data
            // TODO: consider `Vec<usize>` that we retain, containing indexes.
            let mut permutation = Vec::with_capacity(container.len());
            permutation.extend(container.drain());
            permutation.sort();

            self.target.clear();
            // Iterate over the data, accumulating diffs for like keys.
            let mut iter = permutation.drain(..);
            if let Some((data, time, diff)) = iter.next() {
                // Owned scratch values, overwritten via `copy_from` before each push.
                let mut owned_data = D::into_owned(data);
                let mut owned_time = T::into_owned(time);

                // The update currently being accumulated.
                let mut prev_data = data;
                let mut prev_time = time;
                let mut prev_diff = <R as Columnar>::into_owned(diff);

                for (data, time, diff) in iter {
                    if (&prev_data, &prev_time) == (&data, &time) {
                        prev_diff.plus_equals(&diff);
                    } else {
                        // A new (data, time) pair starts: emit the previous one unless its
                        // diffs cancelled out.
                        if !prev_diff.is_zero() {
                            D::copy_from(&mut owned_data, prev_data);
                            T::copy_from(&mut owned_time, prev_time);
                            let tuple = (owned_data, owned_time, prev_diff);
                            self.target.push_into(&tuple);
                            // Take the scratch values back out of the tuple for reuse.
                            (owned_data, owned_time, prev_diff) = tuple;
                        }
                        prev_data = data;
                        prev_time = time;
                        R::copy_from(&mut prev_diff, diff);
                    }
                }

                // Emit the last accumulated update.
                if !prev_diff.is_zero() {
                    D::copy_from(&mut owned_data, prev_data);
                    T::copy_from(&mut owned_time, prev_time);
                    let tuple = (owned_data, owned_time, prev_diff);
                    self.target.push_into(&tuple);
                }
            }

            // Hand over the consolidated buffer, leaving a default `target` behind.
            if !self.target.is_empty() {
                self.ready.push_back(std::mem::take(&mut self.target));
            }
        }
    }
443}
444
445mod provided_builder {
446    use timely::Container;
447    use timely::container::ContainerBuilder;
448
449    /// A container builder that doesn't support pushing elements, and is only suitable for pushing
450    /// whole containers at Timely sessions. See [`give_container`] for more information.
451    ///
452    ///  [`give_container`]: timely::dataflow::channels::pushers::buffer::Session::give_container
453    pub struct ProvidedBuilder<C> {
454        _marker: std::marker::PhantomData<C>,
455    }
456
457    impl<C> Default for ProvidedBuilder<C> {
458        fn default() -> Self {
459            Self {
460                _marker: std::marker::PhantomData,
461            }
462        }
463    }
464
465    impl<C: Container + Clone + 'static> ContainerBuilder for ProvidedBuilder<C> {
466        type Container = C;
467
468        #[inline(always)]
469        fn extract(&mut self) -> Option<&mut Self::Container> {
470            None
471        }
472
473        #[inline(always)]
474        fn finish(&mut self) -> Option<&mut Self::Container> {
475            None
476        }
477    }
478}
479
480#[cfg(test)]
481mod tests {
482    use mz_ore::region::Region;
483    use timely::Container;
484    use timely::bytes::arc::BytesMut;
485    use timely::dataflow::channels::ContainerBytes;
486
487    use super::*;
488
489    /// Produce some bytes that are in columnar format.
490    fn raw_columnar_bytes() -> Vec<u8> {
491        let mut raw = Vec::new();
492        raw.extend(12_u64.to_le_bytes()); // length
493        raw.extend(1_i32.to_le_bytes());
494        raw.extend(2_i32.to_le_bytes());
495        raw.extend(3_i32.to_le_bytes());
496        raw.extend([0, 0, 0, 0]); // padding
497        raw
498    }
499
500    #[mz_ore::test]
501    fn test_column_clone() {
502        let columns = Columnar::as_columns([1, 2, 3].iter());
503        let column_typed: Column<i32> = Column::Typed(columns);
504        let column_typed2 = column_typed.clone();
505
506        assert_eq!(column_typed2.iter().collect::<Vec<_>>(), vec![&1, &2, &3]);
507
508        let bytes = BytesMut::from(raw_columnar_bytes()).freeze();
509        let column_bytes: Column<i32> = Column::Bytes(bytes);
510        let column_bytes2 = column_bytes.clone();
511
512        assert_eq!(column_bytes2.iter().collect::<Vec<_>>(), vec![&1, &2, &3]);
513
514        let raw = raw_columnar_bytes();
515        let mut region: Region<u64> = alloc_aligned_zeroed(raw.len() / 8);
516        let region_bytes = bytemuck::cast_slice_mut(&mut region);
517        region_bytes[..raw.len()].copy_from_slice(&raw);
518        let column_align: Column<i32> = Column::Align(region);
519        let column_align2 = column_align.clone();
520
521        assert_eq!(column_align2.iter().collect::<Vec<_>>(), vec![&1, &2, &3]);
522    }
523
524    #[mz_ore::test]
525    fn test_column_from_bytes() {
526        let raw = raw_columnar_bytes();
527
528        let buf = vec![0; raw.len() + 8];
529        let align = buf.as_ptr().align_offset(std::mem::size_of::<u64>());
530        let mut bytes_mut = BytesMut::from(buf);
531        let _ = bytes_mut.extract_to(align);
532        bytes_mut[..raw.len()].copy_from_slice(&raw);
533        let aligned_bytes = bytes_mut.extract_to(raw.len());
534
535        let column: Column<i32> = Column::from_bytes(aligned_bytes);
536        assert!(matches!(column, Column::Bytes(_)));
537        assert_eq!(column.iter().collect::<Vec<_>>(), vec![&1, &2, &3]);
538
539        let buf = vec![0; raw.len() + 8];
540        let align = buf.as_ptr().align_offset(std::mem::size_of::<u64>());
541        let mut bytes_mut = BytesMut::from(buf);
542        let _ = bytes_mut.extract_to(align + 1);
543        bytes_mut[..raw.len()].copy_from_slice(&raw);
544        let unaligned_bytes = bytes_mut.extract_to(raw.len());
545
546        let column: Column<i32> = Column::from_bytes(unaligned_bytes);
547        assert!(matches!(column, Column::Align(_)));
548        assert_eq!(column.iter().collect::<Vec<_>>(), vec![&1, &2, &3]);
549    }
550}