// mz_timely_util/columnation.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License in the LICENSE file at the
// root of this repository, or online at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Columnation-based containers vendored from differential-dataflow.
//!
//! This module provides [`ColumnationStack`], a columnar container that stores data using the
//! columnation library, along with [`ColumnationChunker`] for organizing streams into sorted
//! chunks, and the [`InternalMerge`] implementation needed by the merge batcher.
22use std::collections::VecDeque;
23use std::iter::FromIterator;
24
25use columnation::{Columnation, Region};
26use differential_dataflow::consolidation::consolidate_updates;
27use differential_dataflow::difference::Semigroup;
28use differential_dataflow::lattice::Lattice;
29use differential_dataflow::trace::implementations::merge_batcher::container::InternalMerge;
30use differential_dataflow::trace::implementations::{BatchContainer, BuilderInput};
31use timely::container::{ContainerBuilder, DrainContainer, PushInto, SizableContainer};
32use timely::progress::Timestamp;
33use timely::progress::frontier::{Antichain, AntichainRef};
34use timely::{Accountable, PartialOrder};
35
// ---------------------------------------------------------------------------
// ColumnationStack
// ---------------------------------------------------------------------------
39
/// An append-only vector that stores records as columns.
///
/// This container maintains elements that might conventionally own
/// memory allocations, but instead the pointers to those allocations
/// reference larger regions of memory shared with multiple instances
/// of the type. Elements can be retrieved as references, and care is
/// taken when this type is dropped to ensure that the correct memory
/// is returned (rather than the incorrect memory, from running the
/// elements `Drop` implementations).
pub struct ColumnationStack<T: Columnation> {
    // Stack of elements whose owned allocations live in `inner`. These must
    // never be dropped individually; see `clear` and the `Drop` impl.
    local: Vec<T>,
    // Region that owns the memory backing the elements in `local`.
    inner: T::InnerRegion,
}
53
impl<T: Columnation> ColumnationStack<T> {
    /// Construct a [`ColumnationStack`], reserving space for `capacity` elements.
    ///
    /// Note that the associated region is not initialized to a specific capacity
    /// because we can't generally know how much space would be required.
    pub fn with_capacity(capacity: usize) -> Self {
        Self {
            local: Vec::with_capacity(capacity),
            inner: T::InnerRegion::default(),
        }
    }

    /// Ensures `Self` can absorb `items` without further allocations.
    ///
    /// The argument `items` may be cloned and iterated multiple times.
    /// Please be careful if it contains side effects.
    #[inline(always)]
    pub fn reserve_items<'a, I>(&mut self, items: I)
    where
        I: Iterator<Item = &'a T> + Clone,
        T: 'a,
    {
        // `items` is consumed twice: once to size `local`, once by the region.
        self.local.reserve(items.clone().count());
        self.inner.reserve_items(items);
    }

    /// Ensures `Self` can absorb `items` without further allocations.
    ///
    /// The argument `items` may be cloned and iterated multiple times.
    /// Please be careful if it contains side effects.
    #[inline(always)]
    pub fn reserve_regions<'a, I>(&mut self, regions: I)
    where
        Self: 'a,
        I: Iterator<Item = &'a Self> + Clone,
    {
        // Reserve `local` for the combined element count, and the region for
        // the combined backing memory of all inputs.
        self.local
            .reserve(regions.clone().map(|cs| cs.local.len()).sum());
        self.inner.reserve_regions(regions.map(|cs| &cs.inner));
    }

    /// Copies an element into the region.
    ///
    /// The element can be read by indexing.
    pub fn copy(&mut self, item: &T) {
        // SAFETY: the copied element's allocations are owned by `self.inner`,
        // which lives alongside `self.local` and is cleared together with it.
        unsafe {
            self.local.push(self.inner.copy(item));
        }
    }

    /// Empties the collection.
    pub fn clear(&mut self) {
        // SAFETY: forget the elements in `local` without running their `Drop`
        // impls (their allocations belong to the region, not to them), then
        // release the region's memory in one step.
        unsafe {
            self.local.set_len(0);
            self.inner.clear();
        }
    }

    /// Retain elements that pass a predicate, from a specified offset.
    ///
    /// This method may or may not reclaim memory in the inner region.
    pub fn retain_from<P: FnMut(&T) -> bool>(&mut self, index: usize, mut predicate: P) {
        // Compact retained elements toward `index` by swapping; rejected
        // elements end up past `write_position` and are truncated below
        // (not dropped -- their allocations belong to the region).
        let mut write_position = index;
        for position in index..self.local.len() {
            if predicate(&self[position]) {
                self.local.swap(position, write_position);
                write_position += 1;
            }
        }
        unsafe {
            // Unsafety justified in that `write_position` is no greater than
            // `self.local.len()` and so this exposes no invalid data.
            self.local.set_len(write_position);
        }
    }

    /// Unsafe access to `local` data. The slices store data that is backed by a region
    /// allocation. Therefore, it is undefined behavior to mutate elements of the `local` slice.
    ///
    /// # Safety
    /// Elements within `local` can be reordered, but not mutated, removed and/or dropped.
    pub unsafe fn local(&mut self) -> &mut [T] {
        &mut self.local[..]
    }

    /// Estimate the memory capacity in bytes.
    ///
    /// The callback receives `(used, capacity)` byte counts: once for the
    /// local vector, and then whatever the region reports for its allocations.
    #[inline]
    pub fn heap_size(&self, mut callback: impl FnMut(usize, usize)) {
        let size_of = std::mem::size_of::<T>();
        callback(self.local.len() * size_of, self.local.capacity() * size_of);
        self.inner.heap_size(callback);
    }

    /// Estimate the consumed memory capacity in bytes, summing both used and total capacity.
    #[inline]
    pub fn summed_heap_size(&self) -> (usize, usize) {
        let (mut length, mut capacity) = (0, 0);
        self.heap_size(|len, cap| {
            length += len;
            capacity += cap
        });
        (length, capacity)
    }

    /// The length in items.
    #[inline]
    pub fn len(&self) -> usize {
        self.local.len()
    }

    /// Returns `true` if the stack is empty.
    pub fn is_empty(&self) -> bool {
        self.local.is_empty()
    }

    /// The capacity of the local vector.
    ///
    /// Region capacity is not reflected here.
    #[inline]
    pub fn capacity(&self) -> usize {
        self.local.capacity()
    }

    /// Reserve space for `additional` elements.
    ///
    /// Only grows the local vector; the region grows on demand during `copy`.
    #[inline]
    pub fn reserve(&mut self, additional: usize) {
        self.local.reserve(additional)
    }
}
181
182impl<A: Columnation, B: Columnation> ColumnationStack<(A, B)> {
183    /// Copies a destructured tuple `(A, B)` into this column stack.
184    pub fn copy_destructured(&mut self, t1: &A, t2: &B) {
185        unsafe {
186            self.local.push(self.inner.copy_destructured(t1, t2));
187        }
188    }
189}
190
191impl<A: Columnation, B: Columnation, C: Columnation> ColumnationStack<(A, B, C)> {
192    /// Copies a destructured tuple `(A, B, C)` into this column stack.
193    pub fn copy_destructured(&mut self, r0: &A, r1: &B, r2: &C) {
194        unsafe {
195            self.local.push(self.inner.copy_destructured(r0, r1, r2));
196        }
197    }
198}
199
200impl<T: Columnation> std::ops::Deref for ColumnationStack<T> {
201    type Target = [T];
202    #[inline(always)]
203    fn deref(&self) -> &Self::Target {
204        &self.local[..]
205    }
206}
207
impl<T: Columnation> Drop for ColumnationStack<T> {
    fn drop(&mut self) {
        // `clear` forgets the elements in `local` (so their `Drop` impls never
        // run -- their allocations belong to the region) and then resets the
        // region, which owns the actual backing memory.
        self.clear();
    }
}
213
214impl<T: Columnation> Default for ColumnationStack<T> {
215    fn default() -> Self {
216        Self {
217            local: Vec::new(),
218            inner: T::InnerRegion::default(),
219        }
220    }
221}
222
223impl<'a, A: 'a + Columnation> FromIterator<&'a A> for ColumnationStack<A> {
224    fn from_iter<I: IntoIterator<Item = &'a A>>(iter: I) -> Self {
225        let iter = iter.into_iter();
226        let mut c = ColumnationStack::<A>::with_capacity(iter.size_hint().0);
227        for element in iter {
228            c.copy(element);
229        }
230        c
231    }
232}
233
234impl<T: Columnation + PartialEq> PartialEq for ColumnationStack<T> {
235    fn eq(&self, other: &Self) -> bool {
236        PartialEq::eq(&self[..], &other[..])
237    }
238}
239
240impl<T: Columnation + Eq> Eq for ColumnationStack<T> {}
241
242impl<T: Columnation + std::fmt::Debug> std::fmt::Debug for ColumnationStack<T> {
243    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
244        self[..].fmt(f)
245    }
246}
247
248impl<T: Columnation> Clone for ColumnationStack<T> {
249    fn clone(&self) -> Self {
250        let mut new: Self = Default::default();
251        for item in &self[..] {
252            new.copy(item);
253        }
254        new
255    }
256
257    fn clone_from(&mut self, source: &Self) {
258        self.clear();
259        for item in &source[..] {
260            self.copy(item);
261        }
262    }
263}
264
265impl<T: Columnation> PushInto<T> for ColumnationStack<T> {
266    #[inline]
267    fn push_into(&mut self, item: T) {
268        self.copy(&item);
269    }
270}
271
272impl<T: Columnation> PushInto<&T> for ColumnationStack<T> {
273    #[inline]
274    fn push_into(&mut self, item: &T) {
275        self.copy(item);
276    }
277}
278
279impl<T: Columnation> PushInto<&&T> for ColumnationStack<T> {
280    #[inline]
281    fn push_into(&mut self, item: &&T) {
282        self.copy(*item);
283    }
284}
285
// Container trait impls
287
288impl<T: Columnation> Accountable for ColumnationStack<T> {
289    #[inline]
290    fn record_count(&self) -> i64 {
291        i64::try_from(self.local.len()).unwrap()
292    }
293    #[inline]
294    fn is_empty(&self) -> bool {
295        self.local.is_empty()
296    }
297}
298
impl<T: Columnation> DrainContainer for ColumnationStack<T> {
    // Items are handed out by reference; the data stays region-allocated.
    type Item<'a>
        = &'a T
    where
        Self: 'a;
    type DrainIter<'a>
        = std::slice::Iter<'a, T>
    where
        Self: 'a;
    /// Returns an iterator over the contents by reference. Note that this
    /// does not remove the elements; emptying happens via `clear`.
    #[inline]
    fn drain(&mut self) -> Self::DrainIter<'_> {
        (*self).iter()
    }
}
313
314impl<T: Columnation> SizableContainer for ColumnationStack<T> {
315    fn at_capacity(&self) -> bool {
316        self.len() == self.capacity()
317    }
318    fn ensure_capacity(&mut self, stash: &mut Option<Self>) {
319        if self.capacity() == 0 {
320            *self = stash.take().unwrap_or_default();
321            self.clear();
322        }
323        let preferred = timely::container::buffer::default_capacity::<T>();
324        if self.capacity() < preferred {
325            self.reserve(preferred - self.capacity());
326        }
327    }
328}
329
330impl<T: Clone + Ord + Columnation + 'static> BatchContainer for ColumnationStack<T> {
331    type Owned = T;
332    type ReadItem<'a> = &'a T;
333
334    #[inline(always)]
335    fn into_owned<'a>(item: Self::ReadItem<'a>) -> Self::Owned {
336        item.clone()
337    }
338    #[inline(always)]
339    fn clone_onto<'a>(item: Self::ReadItem<'a>, other: &mut Self::Owned) {
340        other.clone_from(item);
341    }
342
343    fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> {
344        item
345    }
346
347    fn push_ref(&mut self, item: Self::ReadItem<'_>) {
348        self.push_into(item)
349    }
350    fn push_own(&mut self, item: &Self::Owned) {
351        self.push_into(item)
352    }
353
354    fn clear(&mut self) {
355        self.clear()
356    }
357
358    fn with_capacity(size: usize) -> Self {
359        Self::with_capacity(size)
360    }
361    fn merge_capacity(cont1: &Self, cont2: &Self) -> Self {
362        let mut new = Self::default();
363        new.reserve_regions(std::iter::once(cont1).chain(std::iter::once(cont2)));
364        new
365    }
366    fn index(&self, index: usize) -> Self::ReadItem<'_> {
367        &self[index]
368    }
369    fn len(&self) -> usize {
370        self[..].len()
371    }
372}
373
374impl<K, V, T, R> BuilderInput<K, V> for ColumnationStack<((K::Owned, V::Owned), T, R)>
375where
376    K: for<'a> BatchContainer<
377            ReadItem<'a>: PartialEq<&'a K::Owned>,
378            Owned: Ord + Columnation + Clone + 'static,
379        >,
380    V: for<'a> BatchContainer<
381            ReadItem<'a>: PartialEq<&'a V::Owned>,
382            Owned: Ord + Columnation + Clone + 'static,
383        >,
384    T: Timestamp + Lattice + Columnation + Clone + 'static,
385    R: Ord + Clone + Semigroup + Columnation + 'static,
386{
387    type Key<'a> = &'a K::Owned;
388    type Val<'a> = &'a V::Owned;
389    type Time = T;
390    type Diff = R;
391
392    fn into_parts<'a>(
393        ((key, val), time, diff): Self::Item<'a>,
394    ) -> (Self::Key<'a>, Self::Val<'a>, Self::Time, Self::Diff) {
395        (key, val, time.clone(), diff.clone())
396    }
397
398    fn key_eq(this: &&K::Owned, other: K::ReadItem<'_>) -> bool {
399        K::reborrow(other) == *this
400    }
401
402    fn val_eq(this: &&V::Owned, other: V::ReadItem<'_>) -> bool {
403        V::reborrow(other) == *this
404    }
405
406    fn key_val_upd_counts(chain: &[Self]) -> (usize, usize, usize) {
407        let mut keys = 0;
408        let mut vals = 0;
409        let mut upds = 0;
410        let mut prev_keyval = None;
411        for link in chain.iter() {
412            for ((key, val), _, _) in link.iter() {
413                if let Some((p_key, p_val)) = prev_keyval {
414                    if p_key != key {
415                        keys += 1;
416                        vals += 1;
417                    } else if p_val != val {
418                        vals += 1;
419                    }
420                } else {
421                    keys += 1;
422                    vals += 1;
423                }
424                upds += 1;
425                prev_keyval = Some((key, val));
426            }
427        }
428        (keys, vals, upds)
429    }
430}
431
// ---------------------------------------------------------------------------
// ColumnationChunker
// ---------------------------------------------------------------------------
435
/// Chunk a stream of vectors into chains of columnation stacks.
///
/// This chunker accumulates into a `Vec` (not a `ColumnationStack`) for efficient
/// in-place sorting and consolidation, then copies the consolidated results
/// into `ColumnationStack` chunks. This avoids the cost of sorting through
/// columnation indirection.
pub struct ColumnationChunker<T: Columnation> {
    // Updates not yet formed into a chunk; consolidated in place.
    pending: Vec<T>,
    // Finished chunks awaiting extraction.
    ready: VecDeque<ColumnationStack<T>>,
    // Parking spot for the chunk most recently handed out by
    // `extract`/`finish`, so the caller can borrow it mutably.
    empty: Option<ColumnationStack<T>>,
}
447
448impl<T: Columnation> Default for ColumnationChunker<T> {
449    fn default() -> Self {
450        Self {
451            pending: Vec::default(),
452            ready: VecDeque::default(),
453            empty: None,
454        }
455    }
456}
457
458impl<D, T, R> ColumnationChunker<(D, T, R)>
459where
460    D: Columnation + Ord,
461    T: Columnation + Ord,
462    R: Columnation + Semigroup,
463{
464    const BUFFER_SIZE_BYTES: usize = 64 << 10;
465
466    fn chunk_capacity() -> usize {
467        let size = std::mem::size_of::<(D, T, R)>();
468        if size == 0 {
469            Self::BUFFER_SIZE_BYTES
470        } else if size <= Self::BUFFER_SIZE_BYTES {
471            Self::BUFFER_SIZE_BYTES / size
472        } else {
473            1
474        }
475    }
476
477    fn form_chunk(&mut self) {
478        consolidate_updates(&mut self.pending);
479        if self.pending.len() >= Self::chunk_capacity() {
480            while self.pending.len() > Self::chunk_capacity() {
481                let mut chunk = ColumnationStack::with_capacity(Self::chunk_capacity());
482                for item in self.pending.drain(..chunk.capacity()) {
483                    chunk.copy(&item);
484                }
485                self.ready.push_back(chunk);
486            }
487        }
488    }
489}
490
impl<'a, D, T, R> PushInto<&'a mut Vec<(D, T, R)>> for ColumnationChunker<(D, T, R)>
where
    D: Columnation + Ord + Clone,
    T: Columnation + Ord + Clone,
    R: Columnation + Semigroup + Clone,
{
    /// Drains `container` into the pending buffer, forming chunks whenever
    /// the buffer fills up.
    fn push_into(&mut self, container: &'a mut Vec<(D, T, R)>) {
        // Ensure `pending` can hold two chunks' worth of updates, so that a
        // full buffer can consolidate down and still leave room to absorb more.
        if self.pending.capacity() < Self::chunk_capacity() * 2 {
            self.pending
                .reserve(Self::chunk_capacity() * 2 - self.pending.len());
        }

        let mut drain = container.drain(..).peekable();
        while drain.peek().is_some() {
            // Top up `pending` to capacity, then consolidate and extract full
            // chunks. `form_chunk` leaves at most one chunk's worth behind,
            // which is below capacity, so each iteration makes progress.
            self.pending
                .extend((&mut drain).take(self.pending.capacity() - self.pending.len()));
            if self.pending.len() == self.pending.capacity() {
                self.form_chunk();
            }
        }
    }
}
513
514impl<D, T, R> ContainerBuilder for ColumnationChunker<(D, T, R)>
515where
516    D: Columnation + Ord + Clone + 'static,
517    T: Columnation + Ord + Clone + 'static,
518    R: Columnation + Semigroup + Clone + 'static,
519{
520    type Container = ColumnationStack<(D, T, R)>;
521
522    fn extract(&mut self) -> Option<&mut Self::Container> {
523        if let Some(ready) = self.ready.pop_front() {
524            self.empty = Some(ready);
525            self.empty.as_mut()
526        } else {
527            None
528        }
529    }
530
531    fn finish(&mut self) -> Option<&mut Self::Container> {
532        consolidate_updates(&mut self.pending);
533        while !self.pending.is_empty() {
534            let mut chunk = ColumnationStack::with_capacity(Self::chunk_capacity());
535            for item in self
536                .pending
537                .drain(..std::cmp::min(self.pending.len(), chunk.capacity()))
538            {
539                chunk.copy(&item);
540            }
541            self.ready.push_back(chunk);
542        }
543        self.empty = self.ready.pop_front();
544        self.empty.as_mut()
545    }
546}
547
// ---------------------------------------------------------------------------
// InternalMerge impl for ColumnationStack
// ---------------------------------------------------------------------------
551
impl<D, T, R> InternalMerge for ColumnationStack<(D, T, R)>
where
    D: Ord + Columnation + Clone + 'static,
    T: Ord + Columnation + Clone + PartialOrder + 'static,
    R: Default + Semigroup + Columnation + Clone + 'static,
{
    type TimeOwned = T;

    fn len(&self) -> usize {
        self[..].len()
    }

    fn clear(&mut self) {
        // Disambiguate to the inherent `clear`, which resets both the local
        // vector and the backing region.
        ColumnationStack::clear(self)
    }

    /// Report `(len, size, capacity, allocations)`, where the latter three
    /// sum the per-allocation numbers reported by `heap_size`.
    fn account(&self) -> (usize, usize, usize, usize) {
        let (mut size, mut capacity, mut allocations) = (0, 0, 0);
        let cb = |siz, cap| {
            size += siz;
            capacity += cap;
            allocations += 1;
        };
        self.heap_size(cb);
        (self.len(), size, capacity, allocations)
    }

    /// Merge the contents of `others` into `self`, starting from the given
    /// `positions` and advancing them past everything consumed.
    ///
    /// With one input, the remainder is moved (or copied) wholesale. With two
    /// inputs, a sorted merge on `(data, time)` is performed: ties have their
    /// diffs added and zero sums are dropped. The two-way merge stops once
    /// `self` reaches capacity; any unconsumed suffixes remain, indicated by
    /// the final `positions`.
    fn merge_from(&mut self, others: &mut [Self], positions: &mut [usize]) {
        use std::cmp::Ordering;
        match others.len() {
            0 => {}
            1 => {
                let other = &mut others[0];
                let pos = &mut positions[0];
                // Fast path: if `self` is empty and the input untouched,
                // steal the whole container instead of copying per element.
                if self[..].is_empty() && *pos == 0 {
                    std::mem::swap(self, other);
                    return;
                }
                for i in *pos..other[..].len() {
                    self.copy(&other[i]);
                }
                *pos = other[..].len();
            }
            2 => {
                let (left, right) = others.split_at_mut(1);
                let other1 = &left[0];
                let other2 = &right[0];

                // Scratch diff, reused across ties to avoid repeated allocation.
                let mut stash = R::default();

                while positions[0] < other1[..].len()
                    && positions[1] < other2[..].len()
                    && !self.at_capacity()
                {
                    let (d1, t1, _) = &other1[positions[0]];
                    let (d2, t2, _) = &other2[positions[1]];
                    match (d1, t1).cmp(&(d2, t2)) {
                        Ordering::Less => {
                            self.copy(&other1[positions[0]]);
                            positions[0] += 1;
                        }
                        Ordering::Greater => {
                            self.copy(&other2[positions[1]]);
                            positions[1] += 1;
                        }
                        Ordering::Equal => {
                            // Same (data, time): consolidate by summing diffs,
                            // emitting the update only if the sum is non-zero.
                            let (_, _, r1) = &other1[positions[0]];
                            let (_, _, r2) = &other2[positions[1]];
                            stash.clone_from(r1);
                            stash.plus_equals(r2);
                            if !stash.is_zero() {
                                let (d, t, _) = &other1[positions[0]];
                                self.copy_destructured(d, t, &stash);
                            }
                            positions[0] += 1;
                            positions[1] += 1;
                        }
                    }
                }
            }
            n => unimplemented!("{n}-way merge not yet supported"),
        }
    }

    /// Partition updates from `*position` onward: updates whose time is not
    /// before `upper` go to `keep` (and their times advance `frontier`);
    /// updates before `upper` go to `ship`. Stops early if either output
    /// reaches capacity, leaving `position` at the first unprocessed update.
    fn extract(
        &mut self,
        position: &mut usize,
        upper: AntichainRef<T>,
        frontier: &mut Antichain<T>,
        keep: &mut Self,
        ship: &mut Self,
    ) {
        let len = self[..].len();
        while *position < len && !keep.at_capacity() && !ship.at_capacity() {
            let (data, time, diff) = &self[*position];
            if upper.less_equal(time) {
                // Not yet shippable: retain it and track its time.
                frontier.insert_with(time, |time| time.clone());
                keep.copy_destructured(data, time, diff);
            } else {
                ship.copy_destructured(data, time, diff);
            }
            *position += 1;
        }
    }
}
657
// ---------------------------------------------------------------------------
// ColInternalMerger type alias
// ---------------------------------------------------------------------------
661
/// A `Merger` using internal iteration for `ColumnationStack` containers.
///
/// Pairs the [`InternalMerge`] implementation above with differential's
/// generic `InternalMerger` driver over `(data, time, diff)` update triples.
pub type ColInternalMerger<D, T, R> =
    differential_dataflow::trace::implementations::merge_batcher::InternalMerger<
        ColumnationStack<(D, T, R)>,
    >;