1use differential_dataflow::Data;
17use differential_dataflow::IntoOwned;
18use differential_dataflow::difference::{Abelian, Semigroup};
19use differential_dataflow::lattice::Lattice;
20use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
21use differential_dataflow::trace::{Batch, Builder, Trace, TraceReader};
22use timely::Container;
23use timely::container::PushInto;
24use timely::dataflow::Scope;
25
26use crate::extensions::arrange::ArrangementSize;
27
28pub(crate) trait MzReduce<G: Scope, T1: TraceReader<Time = G::Timestamp>>
30where
31 G::Timestamp: Lattice + Ord,
32{
33 fn mz_reduce_abelian<L, K, V, Bu, T2>(
35 &self,
36 name: &str,
37 logic: L,
38 ) -> Arranged<G, TraceAgent<T2>>
39 where
40 T2: for<'a> Trace<Key<'a> = T1::Key<'a>, Time = G::Timestamp> + 'static,
41 K: Data,
42 V: Data,
43 for<'a> T1::Key<'a>: IntoOwned<'a, Owned = K>,
44 for<'a> T2::Val<'a>: IntoOwned<'a, Owned = V>,
45 T2::Diff: Abelian,
46 T2::Batch: Batch,
47 Bu: Builder<Time = G::Timestamp, Output = T2::Batch>,
48 Bu::Input: Container + PushInto<((K, V), T2::Time, T2::Diff)>,
49 L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>) + 'static,
50 Arranged<G, TraceAgent<T2>>: ArrangementSize;
51}
52
53impl<G, T1> MzReduce<G, T1> for Arranged<G, T1>
54where
55 G::Timestamp: Lattice + Ord,
56 G: Scope,
57 T1: TraceReader<Time = G::Timestamp> + Clone + 'static,
58 T1::Diff: Semigroup,
59{
60 fn mz_reduce_abelian<L, K, V, Bu, T2>(
62 &self,
63 name: &str,
64 logic: L,
65 ) -> Arranged<G, TraceAgent<T2>>
66 where
67 T2: for<'a> Trace<Key<'a> = T1::Key<'a>, Time = G::Timestamp> + 'static,
68 K: Data,
69 V: Data,
70 for<'a> T1::Key<'a>: IntoOwned<'a, Owned = K>,
71 for<'a> T2::Val<'a>: IntoOwned<'a, Owned = V>,
72 T2::Diff: Abelian,
73 T2::Batch: Batch,
74 Bu: Builder<Time = G::Timestamp, Output = T2::Batch>,
75 Bu::Input: Container + PushInto<((K, V), T2::Time, T2::Diff)>,
76 L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>) + 'static,
77 Arranged<G, TraceAgent<T2>>: ArrangementSize,
78 {
79 #[allow(clippy::disallowed_methods)]
81 Arranged::<_, _>::reduce_abelian::<_, _, _, Bu, T2>(self, name, logic)
82 .log_arrangement_size()
83 }
84}
85
86pub trait ReduceExt<G: Scope, Tr: TraceReader<Time = G::Timestamp>>
89where
90 G::Timestamp: Lattice + Ord,
91{
92 fn reduce_pair<L1, K, V1, Bu1, T1, L2, V2, Bu2, T2>(
98 &self,
99 name1: &str,
100 name2: &str,
101 logic1: L1,
102 logic2: L2,
103 ) -> (Arranged<G, TraceAgent<T1>>, Arranged<G, TraceAgent<T2>>)
104 where
105 K: Data,
106 T1: Trace + for<'a> TraceReader<Key<'a> = Tr::Key<'a>, Time = G::Timestamp> + 'static,
107 for<'a> T1::Key<'a>: IntoOwned<'a, Owned = K>,
108 for<'a> T1::Val<'a>: IntoOwned<'a, Owned = V1>,
109 T1::Diff: Abelian,
110 T1::Batch: Batch,
111 Bu1: Builder<Time = G::Timestamp, Output = T1::Batch>,
112 Bu1::Input: Container + PushInto<((K, V1), T1::Time, T1::Diff)>,
113 L1: FnMut(Tr::Key<'_>, &[(Tr::Val<'_>, Tr::Diff)], &mut Vec<(V1, T1::Diff)>) + 'static,
114 V1: Data,
115 T2: Trace + for<'a> TraceReader<Key<'a> = Tr::Key<'a>, Time = G::Timestamp> + 'static,
116 for<'a> T2::Val<'a>: IntoOwned<'a, Owned = V2>,
117 for<'a> T2::Val<'a>: IntoOwned<'a, Owned = V2>,
118 T2::Diff: Abelian,
119 T2::Batch: Batch,
120 Bu2: Builder<Time = G::Timestamp, Output = T2::Batch>,
121 Bu2::Input: Container + PushInto<((K, V2), T2::Time, T2::Diff)>,
122 L2: FnMut(Tr::Key<'_>, &[(Tr::Val<'_>, Tr::Diff)], &mut Vec<(V2, T2::Diff)>) + 'static,
123 V2: Data,
124 Arranged<G, TraceAgent<T1>>: ArrangementSize,
125 Arranged<G, TraceAgent<T2>>: ArrangementSize;
126}
127
128impl<G: Scope, Tr> ReduceExt<G, Tr> for Arranged<G, Tr>
129where
130 G::Timestamp: Lattice + Ord,
131 Tr: TraceReader<Time = G::Timestamp> + Clone + 'static,
132 Tr::Diff: Semigroup,
133{
134 fn reduce_pair<L1, K, V1, Bu1, T1, L2, V2, Bu2, T2>(
135 &self,
136 name1: &str,
137 name2: &str,
138 logic1: L1,
139 logic2: L2,
140 ) -> (Arranged<G, TraceAgent<T1>>, Arranged<G, TraceAgent<T2>>)
141 where
142 K: Data,
143 T1: Trace + for<'a> TraceReader<Key<'a> = Tr::Key<'a>, Time = G::Timestamp> + 'static,
144 for<'a> T1::Key<'a>: IntoOwned<'a, Owned = K>,
145 for<'a> T1::Val<'a>: IntoOwned<'a, Owned = V1>,
146 T1::Diff: Abelian,
147 T1::Batch: Batch,
148 Bu1: Builder<Time = G::Timestamp, Output = T1::Batch>,
149 Bu1::Input: Container + PushInto<((K, V1), T1::Time, T1::Diff)>,
150 L1: FnMut(Tr::Key<'_>, &[(Tr::Val<'_>, Tr::Diff)], &mut Vec<(V1, T1::Diff)>) + 'static,
151 V1: Data,
152 T2: Trace + for<'a> TraceReader<Key<'a> = Tr::Key<'a>, Time = G::Timestamp> + 'static,
153 for<'a> T2::Val<'a>: IntoOwned<'a, Owned = V2>,
154 for<'a> T2::Val<'a>: IntoOwned<'a, Owned = V2>,
155 T2::Diff: Abelian,
156 T2::Batch: Batch,
157 Bu2: Builder<Time = G::Timestamp, Output = T2::Batch>,
158 Bu2::Input: Container + PushInto<((K, V2), T2::Time, T2::Diff)>,
159 L2: FnMut(Tr::Key<'_>, &[(Tr::Val<'_>, Tr::Diff)], &mut Vec<(V2, T2::Diff)>) + 'static,
160 V2: Data,
161 Arranged<G, TraceAgent<T1>>: ArrangementSize,
162 Arranged<G, TraceAgent<T2>>: ArrangementSize,
163 {
164 let arranged1 = self.mz_reduce_abelian::<L1, _, _, Bu1, T1>(name1, logic1);
165 let arranged2 = self.mz_reduce_abelian::<L2, _, _, Bu2, T2>(name2, logic2);
166 (arranged1, arranged2)
167 }
168}