mz_compute/extensions/
reduce.rs1use columnation::Columnation;
17use differential_dataflow::Data;
18use differential_dataflow::difference::Abelian;
19use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
20use differential_dataflow::trace::implementations::{BatchContainer, LayoutExt};
21use differential_dataflow::trace::{Builder, Trace, TraceReader};
22use mz_timely_util::columnation::ColumnationStack;
23use timely::Container;
24use timely::container::PushInto;
25
26use crate::extensions::arrange::ArrangementSize;
27
28type KeyOwn<Tr> = <<Tr as LayoutExt>::KeyContainer as BatchContainer>::Owned;
29
30pub trait ClearContainer {
31 fn clear(&mut self);
32}
33
34impl<T> ClearContainer for Vec<T> {
35 fn clear(&mut self) {
36 Vec::clear(self)
37 }
38}
39
40impl<D, T, R> ClearContainer for ColumnationStack<(D, T, R)>
41where
42 D: Columnation + Clone + 'static,
43 T: Columnation + Clone + 'static,
44 R: Columnation + Clone + 'static,
45{
46 fn clear(&mut self) {
47 ColumnationStack::clear(self)
48 }
49}
50
51pub(crate) trait MzReduce<'scope, T1: TraceReader> {
53 fn mz_reduce_abelian<L, Bu, T2>(self, name: &str, logic: L) -> Arranged<'scope, TraceAgent<T2>>
55 where
56 T2: for<'a> Trace<Key<'a> = T1::Key<'a>, ValOwn: Data, Time = T1::Time, Diff: Abelian>
57 + 'static,
58 Bu: Builder<Time = T1::Time, Output = T2::Batch>,
59 Bu::Input: Container
60 + Default
61 + ClearContainer
62 + PushInto<((KeyOwn<T1>, T2::ValOwn), T2::Time, T2::Diff)>,
63 L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwn, T2::Diff)>)
64 + 'static,
65 Arranged<'scope, TraceAgent<T2>>: ArrangementSize;
66}
67
68impl<'scope, T1> MzReduce<'scope, T1> for Arranged<'scope, T1>
69where
70 T1: TraceReader + Clone + 'static,
71{
72 fn mz_reduce_abelian<L, Bu, T2>(self, name: &str, logic: L) -> Arranged<'scope, TraceAgent<T2>>
74 where
75 T2: for<'a> Trace<Key<'a> = T1::Key<'a>, ValOwn: Data, Time = T1::Time, Diff: Abelian>
76 + 'static,
77 Bu: Builder<
78 Time = T1::Time,
79 Input: Container
80 + Default
81 + ClearContainer
82 + PushInto<((KeyOwn<T1>, T2::ValOwn), T2::Time, T2::Diff)>,
83 Output = T2::Batch,
84 >,
85 L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwn, T2::Diff)>)
86 + 'static,
87 Arranged<'scope, TraceAgent<T2>>: ArrangementSize,
88 {
89 use differential_dataflow::trace::implementations::BatchContainer;
91 let push_closure =
92 |buf: &mut Bu::Input,
93 key: T1::Key<'_>,
94 updates: &mut Vec<(T2::ValOwn, T2::Time, T2::Diff)>| {
95 buf.clear();
96 let key_owned = <T1::KeyContainer as BatchContainer>::into_owned(key);
97 for (val, time, diff) in updates.drain(..) {
98 buf.push_into(((key_owned.clone(), val), time, diff));
99 }
100 };
101
102 #[allow(clippy::disallowed_methods)]
104 Arranged::<_>::reduce_abelian::<_, Bu, T2, _>(self, name, logic, push_closure)
105 .log_arrangement_size()
106 }
107}