mz_compute/extensions/
reduce.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use differential_dataflow::Data;
17use differential_dataflow::difference::Abelian;
18use differential_dataflow::lattice::Lattice;
19use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
20use differential_dataflow::trace::{Builder, Trace, TraceReader};
21use timely::Container;
22use timely::container::PushInto;
23use timely::dataflow::Scope;
24
25use crate::extensions::arrange::ArrangementSize;
26
27/// Extension trait for the `reduce_abelian` differential dataflow method.
28pub(crate) trait MzReduce<G: Scope, T1: TraceReader<Time = G::Timestamp>>
29where
30    G::Timestamp: Lattice,
31{
32    /// Applies `reduce` to arranged data, and returns an arrangement of output data.
33    fn mz_reduce_abelian<L, Bu, T2>(&self, name: &str, logic: L) -> Arranged<G, TraceAgent<T2>>
34    where
35        T2: for<'a> Trace<
36                Key<'a> = T1::Key<'a>,
37                KeyOwn = T1::KeyOwn,
38                ValOwn: Data,
39                Time = G::Timestamp,
40                Diff: Abelian,
41            > + 'static,
42        Bu: Builder<Time = G::Timestamp, Output = T2::Batch>,
43        Bu::Input: Container + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>,
44        L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwn, T2::Diff)>)
45            + 'static,
46        Arranged<G, TraceAgent<T2>>: ArrangementSize;
47}
48
49impl<G, T1> MzReduce<G, T1> for Arranged<G, T1>
50where
51    G: Scope,
52    G::Timestamp: Lattice,
53    T1: TraceReader<Time = G::Timestamp, KeyOwn: Ord> + Clone + 'static,
54{
55    /// Applies `reduce` to arranged data, and returns an arrangement of output data.
56    fn mz_reduce_abelian<L, Bu, T2>(&self, name: &str, logic: L) -> Arranged<G, TraceAgent<T2>>
57    where
58        T2: for<'a> Trace<
59                Key<'a> = T1::Key<'a>,
60                KeyOwn = T1::KeyOwn,
61                ValOwn: Data,
62                Time = G::Timestamp,
63                Diff: Abelian,
64            > + 'static,
65        Bu: Builder<
66                Time = G::Timestamp,
67                Input: Container + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>,
68                Output = T2::Batch,
69            >,
70        L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwn, T2::Diff)>)
71            + 'static,
72        Arranged<G, TraceAgent<T2>>: ArrangementSize,
73    {
74        // Allow access to `reduce_abelian` since we're within Mz's wrapper and force arrangement size logging.
75        #[allow(clippy::disallowed_methods)]
76        Arranged::<_, _>::reduce_abelian::<_, Bu, T2>(self, name, logic).log_arrangement_size()
77    }
78}
79
80/// Extension trait for `ReduceCore`, currently providing a reduction based
81/// on an operator-pair approach.
82pub trait ReduceExt<G, Tr>
83where
84    G: Scope,
85    G::Timestamp: Lattice,
86    Tr: TraceReader<Time = G::Timestamp>,
87{
88    /// This method produces a reduction pair based on the same input arrangement. Each reduction
89    /// in the pair operates with its own logic and the two output arrangements from the reductions
90    /// are produced as a result. The method is useful for reductions that need to present different
91    /// output views on the same input data. An example is producing an error-free reduction output
92    /// along with a separate error output indicating when the error-free output is valid.
93    fn reduce_pair<L1, Bu1, T1, L2, Bu2, T2>(
94        &self,
95        name1: &str,
96        name2: &str,
97        logic1: L1,
98        logic2: L2,
99    ) -> (Arranged<G, TraceAgent<T1>>, Arranged<G, TraceAgent<T2>>)
100    where
101        T1: for<'a> Trace<
102                Key<'a> = Tr::Key<'a>,
103                KeyOwn = Tr::KeyOwn,
104                ValOwn: Data,
105                Time = G::Timestamp,
106                Diff: Abelian,
107            > + 'static,
108        Bu1: Builder<
109                Time = G::Timestamp,
110                Input: Container + PushInto<((T1::KeyOwn, T1::ValOwn), T1::Time, T1::Diff)>,
111                Output = T1::Batch,
112            >,
113        L1: FnMut(Tr::Key<'_>, &[(Tr::Val<'_>, Tr::Diff)], &mut Vec<(T1::ValOwn, T1::Diff)>)
114            + 'static,
115        T2: for<'a> Trace<
116                Key<'a> = Tr::Key<'a>,
117                KeyOwn = Tr::KeyOwn,
118                ValOwn: Data,
119                Time = G::Timestamp,
120                Diff: Abelian,
121            > + 'static,
122        Bu2: Builder<
123                Time = G::Timestamp,
124                Input: Container + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>,
125                Output = T2::Batch,
126            >,
127        L2: FnMut(Tr::Key<'_>, &[(Tr::Val<'_>, Tr::Diff)], &mut Vec<(T2::ValOwn, T2::Diff)>)
128            + 'static,
129        Arranged<G, TraceAgent<T1>>: ArrangementSize,
130        Arranged<G, TraceAgent<T2>>: ArrangementSize;
131}
132
133impl<G, Tr> ReduceExt<G, Tr> for Arranged<G, Tr>
134where
135    G: Scope,
136    G::Timestamp: Lattice,
137    Tr: TraceReader<Time = G::Timestamp, KeyOwn: Ord> + Clone + 'static,
138{
139    fn reduce_pair<L1, Bu1, T1, L2, Bu2, T2>(
140        &self,
141        name1: &str,
142        name2: &str,
143        logic1: L1,
144        logic2: L2,
145    ) -> (Arranged<G, TraceAgent<T1>>, Arranged<G, TraceAgent<T2>>)
146    where
147        T1: for<'a> Trace<
148                Key<'a> = Tr::Key<'a>,
149                KeyOwn = Tr::KeyOwn,
150                ValOwn: Data,
151                Time = G::Timestamp,
152                Diff: Abelian,
153            > + 'static,
154        Bu1: Builder<
155                Time = G::Timestamp,
156                Input: Container + PushInto<((T1::KeyOwn, T1::ValOwn), T1::Time, T1::Diff)>,
157                Output = T1::Batch,
158            >,
159        L1: FnMut(Tr::Key<'_>, &[(Tr::Val<'_>, Tr::Diff)], &mut Vec<(T1::ValOwn, T1::Diff)>)
160            + 'static,
161        T2: for<'a> Trace<
162                Key<'a> = Tr::Key<'a>,
163                KeyOwn = Tr::KeyOwn,
164                ValOwn: Data,
165                Time = G::Timestamp,
166                Diff: Abelian,
167            > + 'static,
168        Bu2: Builder<
169                Time = G::Timestamp,
170                Input: Container + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>,
171                Output = T2::Batch,
172            >,
173        L2: FnMut(Tr::Key<'_>, &[(Tr::Val<'_>, Tr::Diff)], &mut Vec<(T2::ValOwn, T2::Diff)>)
174            + 'static,
175        Arranged<G, TraceAgent<T1>>: ArrangementSize,
176        Arranged<G, TraceAgent<T2>>: ArrangementSize,
177    {
178        let arranged1 = self.mz_reduce_abelian::<L1, Bu1, T1>(name1, logic1);
179        let arranged2 = self.mz_reduce_abelian::<L2, Bu2, T2>(name2, logic2);
180        (arranged1, arranged2)
181    }
182}