1use differential_dataflow::Data;
17use differential_dataflow::difference::Abelian;
18use differential_dataflow::lattice::Lattice;
19use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
20use differential_dataflow::trace::implementations::merge_batcher::container::MergerChunk;
21use differential_dataflow::trace::{Builder, Trace, TraceReader};
22use timely::Container;
23use timely::container::PushInto;
24use timely::dataflow::Scope;
25
26use crate::extensions::arrange::ArrangementSize;
27
28pub(crate) trait MzReduce<G: Scope, T1: TraceReader<Time = G::Timestamp>>
30where
31 G::Timestamp: Lattice,
32{
33 fn mz_reduce_abelian<L, Bu, T2>(&self, name: &str, logic: L) -> Arranged<G, TraceAgent<T2>>
35 where
36 T2: for<'a> Trace<
37 Key<'a> = T1::Key<'a>,
38 KeyOwn = T1::KeyOwn,
39 ValOwn: Data,
40 Time = G::Timestamp,
41 Diff: Abelian,
42 > + 'static,
43 Bu: Builder<Time = G::Timestamp, Output = T2::Batch>,
44 Bu::Input:
45 Container + MergerChunk + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>,
46 L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwn, T2::Diff)>)
47 + 'static,
48 Arranged<G, TraceAgent<T2>>: ArrangementSize;
49}
50
51impl<G, T1> MzReduce<G, T1> for Arranged<G, T1>
52where
53 G: Scope,
54 G::Timestamp: Lattice,
55 T1: TraceReader<Time = G::Timestamp, KeyOwn: Ord> + Clone + 'static,
56{
57 fn mz_reduce_abelian<L, Bu, T2>(&self, name: &str, logic: L) -> Arranged<G, TraceAgent<T2>>
59 where
60 T2: for<'a> Trace<
61 Key<'a> = T1::Key<'a>,
62 KeyOwn = T1::KeyOwn,
63 ValOwn: Data,
64 Time = G::Timestamp,
65 Diff: Abelian,
66 > + 'static,
67 Bu: Builder<
68 Time = G::Timestamp,
69 Input: Container
70 + MergerChunk
71 + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>,
72 Output = T2::Batch,
73 >,
74 L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwn, T2::Diff)>)
75 + 'static,
76 Arranged<G, TraceAgent<T2>>: ArrangementSize,
77 {
78 #[allow(clippy::disallowed_methods)]
80 Arranged::<_, _>::reduce_abelian::<_, Bu, T2>(self, name, logic).log_arrangement_size()
81 }
82}
83
84pub trait ReduceExt<G, Tr>
87where
88 G: Scope,
89 G::Timestamp: Lattice,
90 Tr: TraceReader<Time = G::Timestamp>,
91{
92 fn reduce_pair<L1, Bu1, T1, L2, Bu2, T2>(
98 &self,
99 name1: &str,
100 name2: &str,
101 logic1: L1,
102 logic2: L2,
103 ) -> (Arranged<G, TraceAgent<T1>>, Arranged<G, TraceAgent<T2>>)
104 where
105 T1: for<'a> Trace<
106 Key<'a> = Tr::Key<'a>,
107 KeyOwn = Tr::KeyOwn,
108 ValOwn: Data,
109 Time = G::Timestamp,
110 Diff: Abelian,
111 > + 'static,
112 Bu1: Builder<
113 Time = G::Timestamp,
114 Input: Container
115 + MergerChunk
116 + PushInto<((T1::KeyOwn, T1::ValOwn), T1::Time, T1::Diff)>,
117 Output = T1::Batch,
118 >,
119 L1: FnMut(Tr::Key<'_>, &[(Tr::Val<'_>, Tr::Diff)], &mut Vec<(T1::ValOwn, T1::Diff)>)
120 + 'static,
121 T2: for<'a> Trace<
122 Key<'a> = Tr::Key<'a>,
123 KeyOwn = Tr::KeyOwn,
124 ValOwn: Data,
125 Time = G::Timestamp,
126 Diff: Abelian,
127 > + 'static,
128 Bu2: Builder<
129 Time = G::Timestamp,
130 Input: Container
131 + MergerChunk
132 + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>,
133 Output = T2::Batch,
134 >,
135 L2: FnMut(Tr::Key<'_>, &[(Tr::Val<'_>, Tr::Diff)], &mut Vec<(T2::ValOwn, T2::Diff)>)
136 + 'static,
137 Arranged<G, TraceAgent<T1>>: ArrangementSize,
138 Arranged<G, TraceAgent<T2>>: ArrangementSize;
139}
140
141impl<G, Tr> ReduceExt<G, Tr> for Arranged<G, Tr>
142where
143 G: Scope,
144 G::Timestamp: Lattice,
145 Tr: TraceReader<Time = G::Timestamp, KeyOwn: Ord> + Clone + 'static,
146{
147 fn reduce_pair<L1, Bu1, T1, L2, Bu2, T2>(
148 &self,
149 name1: &str,
150 name2: &str,
151 logic1: L1,
152 logic2: L2,
153 ) -> (Arranged<G, TraceAgent<T1>>, Arranged<G, TraceAgent<T2>>)
154 where
155 T1: for<'a> Trace<
156 Key<'a> = Tr::Key<'a>,
157 KeyOwn = Tr::KeyOwn,
158 ValOwn: Data,
159 Time = G::Timestamp,
160 Diff: Abelian,
161 > + 'static,
162 Bu1: Builder<
163 Time = G::Timestamp,
164 Input: Container
165 + MergerChunk
166 + PushInto<((T1::KeyOwn, T1::ValOwn), T1::Time, T1::Diff)>,
167 Output = T1::Batch,
168 >,
169 L1: FnMut(Tr::Key<'_>, &[(Tr::Val<'_>, Tr::Diff)], &mut Vec<(T1::ValOwn, T1::Diff)>)
170 + 'static,
171 T2: for<'a> Trace<
172 Key<'a> = Tr::Key<'a>,
173 KeyOwn = Tr::KeyOwn,
174 ValOwn: Data,
175 Time = G::Timestamp,
176 Diff: Abelian,
177 > + 'static,
178 Bu2: Builder<
179 Time = G::Timestamp,
180 Input: Container
181 + MergerChunk
182 + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>,
183 Output = T2::Batch,
184 >,
185 L2: FnMut(Tr::Key<'_>, &[(Tr::Val<'_>, Tr::Diff)], &mut Vec<(T2::ValOwn, T2::Diff)>)
186 + 'static,
187 Arranged<G, TraceAgent<T1>>: ArrangementSize,
188 Arranged<G, TraceAgent<T2>>: ArrangementSize,
189 {
190 let arranged1 = self.mz_reduce_abelian::<L1, Bu1, T1>(name1, logic1);
191 let arranged2 = self.mz_reduce_abelian::<L2, Bu2, T2>(name2, logic2);
192 (arranged1, arranged2)
193 }
194}