1use differential_dataflow::Data;
17use differential_dataflow::difference::Abelian;
18use differential_dataflow::lattice::Lattice;
19use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
20use differential_dataflow::trace::{Builder, Trace, TraceReader};
21use timely::Container;
22use timely::container::PushInto;
23use timely::dataflow::Scope;
24
25use crate::extensions::arrange::ArrangementSize;
26
27pub(crate) trait MzReduce<G: Scope, T1: TraceReader<Time = G::Timestamp>>
29where
30 G::Timestamp: Lattice,
31{
32 fn mz_reduce_abelian<L, Bu, T2>(&self, name: &str, logic: L) -> Arranged<G, TraceAgent<T2>>
34 where
35 T2: for<'a> Trace<
36 Key<'a> = T1::Key<'a>,
37 KeyOwn = T1::KeyOwn,
38 ValOwn: Data,
39 Time = G::Timestamp,
40 Diff: Abelian,
41 > + 'static,
42 Bu: Builder<Time = G::Timestamp, Output = T2::Batch>,
43 Bu::Input: Container + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>,
44 L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwn, T2::Diff)>)
45 + 'static,
46 Arranged<G, TraceAgent<T2>>: ArrangementSize;
47}
48
49impl<G, T1> MzReduce<G, T1> for Arranged<G, T1>
50where
51 G: Scope,
52 G::Timestamp: Lattice,
53 T1: TraceReader<Time = G::Timestamp, KeyOwn: Ord> + Clone + 'static,
54{
55 fn mz_reduce_abelian<L, Bu, T2>(&self, name: &str, logic: L) -> Arranged<G, TraceAgent<T2>>
57 where
58 T2: for<'a> Trace<
59 Key<'a> = T1::Key<'a>,
60 KeyOwn = T1::KeyOwn,
61 ValOwn: Data,
62 Time = G::Timestamp,
63 Diff: Abelian,
64 > + 'static,
65 Bu: Builder<
66 Time = G::Timestamp,
67 Input: Container + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>,
68 Output = T2::Batch,
69 >,
70 L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwn, T2::Diff)>)
71 + 'static,
72 Arranged<G, TraceAgent<T2>>: ArrangementSize,
73 {
74 #[allow(clippy::disallowed_methods)]
76 Arranged::<_, _>::reduce_abelian::<_, Bu, T2>(self, name, logic).log_arrangement_size()
77 }
78}
79
80pub trait ReduceExt<G, Tr>
83where
84 G: Scope,
85 G::Timestamp: Lattice,
86 Tr: TraceReader<Time = G::Timestamp>,
87{
88 fn reduce_pair<L1, Bu1, T1, L2, Bu2, T2>(
94 &self,
95 name1: &str,
96 name2: &str,
97 logic1: L1,
98 logic2: L2,
99 ) -> (Arranged<G, TraceAgent<T1>>, Arranged<G, TraceAgent<T2>>)
100 where
101 T1: for<'a> Trace<
102 Key<'a> = Tr::Key<'a>,
103 KeyOwn = Tr::KeyOwn,
104 ValOwn: Data,
105 Time = G::Timestamp,
106 Diff: Abelian,
107 > + 'static,
108 Bu1: Builder<
109 Time = G::Timestamp,
110 Input: Container + PushInto<((T1::KeyOwn, T1::ValOwn), T1::Time, T1::Diff)>,
111 Output = T1::Batch,
112 >,
113 L1: FnMut(Tr::Key<'_>, &[(Tr::Val<'_>, Tr::Diff)], &mut Vec<(T1::ValOwn, T1::Diff)>)
114 + 'static,
115 T2: for<'a> Trace<
116 Key<'a> = Tr::Key<'a>,
117 KeyOwn = Tr::KeyOwn,
118 ValOwn: Data,
119 Time = G::Timestamp,
120 Diff: Abelian,
121 > + 'static,
122 Bu2: Builder<
123 Time = G::Timestamp,
124 Input: Container + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>,
125 Output = T2::Batch,
126 >,
127 L2: FnMut(Tr::Key<'_>, &[(Tr::Val<'_>, Tr::Diff)], &mut Vec<(T2::ValOwn, T2::Diff)>)
128 + 'static,
129 Arranged<G, TraceAgent<T1>>: ArrangementSize,
130 Arranged<G, TraceAgent<T2>>: ArrangementSize;
131}
132
133impl<G, Tr> ReduceExt<G, Tr> for Arranged<G, Tr>
134where
135 G: Scope,
136 G::Timestamp: Lattice,
137 Tr: TraceReader<Time = G::Timestamp, KeyOwn: Ord> + Clone + 'static,
138{
139 fn reduce_pair<L1, Bu1, T1, L2, Bu2, T2>(
140 &self,
141 name1: &str,
142 name2: &str,
143 logic1: L1,
144 logic2: L2,
145 ) -> (Arranged<G, TraceAgent<T1>>, Arranged<G, TraceAgent<T2>>)
146 where
147 T1: for<'a> Trace<
148 Key<'a> = Tr::Key<'a>,
149 KeyOwn = Tr::KeyOwn,
150 ValOwn: Data,
151 Time = G::Timestamp,
152 Diff: Abelian,
153 > + 'static,
154 Bu1: Builder<
155 Time = G::Timestamp,
156 Input: Container + PushInto<((T1::KeyOwn, T1::ValOwn), T1::Time, T1::Diff)>,
157 Output = T1::Batch,
158 >,
159 L1: FnMut(Tr::Key<'_>, &[(Tr::Val<'_>, Tr::Diff)], &mut Vec<(T1::ValOwn, T1::Diff)>)
160 + 'static,
161 T2: for<'a> Trace<
162 Key<'a> = Tr::Key<'a>,
163 KeyOwn = Tr::KeyOwn,
164 ValOwn: Data,
165 Time = G::Timestamp,
166 Diff: Abelian,
167 > + 'static,
168 Bu2: Builder<
169 Time = G::Timestamp,
170 Input: Container + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>,
171 Output = T2::Batch,
172 >,
173 L2: FnMut(Tr::Key<'_>, &[(Tr::Val<'_>, Tr::Diff)], &mut Vec<(T2::ValOwn, T2::Diff)>)
174 + 'static,
175 Arranged<G, TraceAgent<T1>>: ArrangementSize,
176 Arranged<G, TraceAgent<T2>>: ArrangementSize,
177 {
178 let arranged1 = self.mz_reduce_abelian::<L1, Bu1, T1>(name1, logic1);
179 let arranged2 = self.mz_reduce_abelian::<L2, Bu2, T2>(name2, logic2);
180 (arranged1, arranged2)
181 }
182}