mz_timely_util/columnar/
batcher.rs1use std::collections::VecDeque;
19
20use columnar::Columnar;
21use differential_dataflow::difference::Semigroup;
22use timely::Container;
23use timely::container::{ContainerBuilder, PushInto};
24
25use crate::columnar::Column;
26
/// Builder that turns pushed columns of `(data, time, diff)` updates into
/// sorted, consolidated containers of type `C`.
#[derive(Default)]
pub struct Chunker<C> {
    /// Scratch container.
    ///
    /// `push_into` clears it and fills it with consolidated updates, and
    /// `extract` overwrites it with the container it lends to the caller.
    target: C,
    /// Finished containers, queued in the order they were produced and handed
    /// out from the front.
    ready: VecDeque<C>,
}
38
39impl<C: Container + Clone + 'static> ContainerBuilder for Chunker<C> {
40 type Container = C;
41
42 fn extract(&mut self) -> Option<&mut Self::Container> {
43 if let Some(ready) = self.ready.pop_front() {
44 self.target = ready;
45 Some(&mut self.target)
46 } else {
47 None
48 }
49 }
50
51 fn finish(&mut self) -> Option<&mut Self::Container> {
52 self.extract()
53 }
54}
55
56impl<'a, D, T, R, C2> PushInto<&'a mut Column<(D, T, R)>> for Chunker<C2>
57where
58 D: Columnar,
59 for<'b> columnar::Ref<'b, D>: Ord + Copy,
60 T: Columnar,
61 for<'b> columnar::Ref<'b, T>: Ord + Copy,
62 R: Columnar + Semigroup + for<'b> Semigroup<columnar::Ref<'b, R>>,
63 for<'b> columnar::Ref<'b, R>: Ord,
64 C2: Container + for<'b> PushInto<&'b (D, T, R)>,
65{
66 fn push_into(&mut self, container: &'a mut Column<(D, T, R)>) {
67 let mut permutation = Vec::with_capacity(container.len());
70 permutation.extend(container.drain());
71 permutation.sort();
72
73 self.target.clear();
74 let mut iter = permutation.drain(..);
76 if let Some((data, time, diff)) = iter.next() {
77 let mut owned_data = D::into_owned(data);
78 let mut owned_time = T::into_owned(time);
79
80 let mut prev_data = data;
81 let mut prev_time = time;
82 let mut prev_diff = <R as Columnar>::into_owned(diff);
83
84 for (data, time, diff) in iter {
85 if (&prev_data, &prev_time) == (&data, &time) {
86 prev_diff.plus_equals(&diff);
87 } else {
88 if !prev_diff.is_zero() {
89 D::copy_from(&mut owned_data, prev_data);
90 T::copy_from(&mut owned_time, prev_time);
91 let tuple = (owned_data, owned_time, prev_diff);
92 self.target.push_into(&tuple);
93 (owned_data, owned_time, prev_diff) = tuple;
94 }
95 prev_data = data;
96 prev_time = time;
97 R::copy_from(&mut prev_diff, diff);
98 }
99 }
100
101 if !prev_diff.is_zero() {
102 D::copy_from(&mut owned_data, prev_data);
103 T::copy_from(&mut owned_time, prev_time);
104 let tuple = (owned_data, owned_time, prev_diff);
105 self.target.push_into(&tuple);
106 }
107 }
108
109 if !self.target.is_empty() {
110 self.ready.push_back(std::mem::take(&mut self.target));
111 }
112 }
113}