mz_timely_util/columnar/
batcher.rs1use std::collections::VecDeque;
19
20use columnar::{Columnar, Index, Len};
21use columnation::Columnation;
22use differential_dataflow::containers::TimelyStack;
23use differential_dataflow::difference::Semigroup;
24use timely::Container;
25use timely::container::{ContainerBuilder, PushInto};
26
27use crate::columnar::Column;
28
29#[derive(Default)]
31pub struct Chunker<C> {
32 target: C,
37 ready: VecDeque<C>,
39}
40
41impl<C: Container + Clone + 'static> ContainerBuilder for Chunker<C> {
42 type Container = C;
43
44 fn extract(&mut self) -> Option<&mut Self::Container> {
45 if let Some(ready) = self.ready.pop_front() {
46 self.target = ready;
47 Some(&mut self.target)
48 } else {
49 None
50 }
51 }
52
53 fn finish(&mut self) -> Option<&mut Self::Container> {
54 self.extract()
55 }
56}
57
58impl<'a, D, T, R> PushInto<&'a mut Column<(D, T, R)>> for Chunker<TimelyStack<(D, T, R)>>
59where
60 D: Columnar + Columnation,
61 for<'b> columnar::Ref<'b, D>: Ord + Copy,
62 T: Columnar + Columnation,
63 for<'b> columnar::Ref<'b, T>: Ord + Copy,
64 R: Columnar + Columnation + Semigroup + for<'b> Semigroup<columnar::Ref<'b, R>>,
65 for<'b> columnar::Ref<'b, R>: Ord,
66 {
68 fn push_into(&mut self, container: &'a mut Column<(D, T, R)>) {
69 let borrowed = container.borrow();
72 let mut permutation = Vec::with_capacity(borrowed.len());
73 permutation.extend(borrowed.into_index_iter());
74 permutation.sort();
75
76 self.target.clear();
77 let mut iter = permutation.drain(..);
79 if let Some((data, time, diff)) = iter.next() {
80 let mut owned_data = D::into_owned(data);
81 let mut owned_time = T::into_owned(time);
82
83 let mut prev_data = data;
84 let mut prev_time = time;
85 let mut prev_diff = <R as Columnar>::into_owned(diff);
86
87 for (data, time, diff) in iter {
88 if (&prev_data, &prev_time) == (&data, &time) {
89 prev_diff.plus_equals(&diff);
90 } else {
91 if !prev_diff.is_zero() {
92 D::copy_from(&mut owned_data, prev_data);
93 T::copy_from(&mut owned_time, prev_time);
94 let tuple = (owned_data, owned_time, prev_diff);
95 self.target.push_into(&tuple);
96 (owned_data, owned_time, prev_diff) = tuple;
97 }
98 prev_data = data;
99 prev_time = time;
100 R::copy_from(&mut prev_diff, diff);
101 }
102 }
103
104 if !prev_diff.is_zero() {
105 D::copy_from(&mut owned_data, prev_data);
106 T::copy_from(&mut owned_time, prev_time);
107 let tuple = (owned_data, owned_time, prev_diff);
108 self.target.push_into(&tuple);
109 }
110 }
111
112 if !self.target.is_empty() {
113 self.ready.push_back(std::mem::take(&mut self.target));
114 }
115 }
116}