mz_compute/extensions/
reduce.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use differential_dataflow::Data;
17use differential_dataflow::difference::Abelian;
18use differential_dataflow::lattice::Lattice;
19use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
20use differential_dataflow::trace::implementations::merge_batcher::container::MergerChunk;
21use differential_dataflow::trace::{Builder, Trace, TraceReader};
22use timely::Container;
23use timely::container::PushInto;
24use timely::dataflow::Scope;
25
26use crate::extensions::arrange::ArrangementSize;
27
28/// Extension trait for the `reduce_abelian` differential dataflow method.
29pub(crate) trait MzReduce<G: Scope, T1: TraceReader<Time = G::Timestamp>>
30where
31    G::Timestamp: Lattice,
32{
33    /// Applies `reduce` to arranged data, and returns an arrangement of output data.
34    fn mz_reduce_abelian<L, Bu, T2>(&self, name: &str, logic: L) -> Arranged<G, TraceAgent<T2>>
35    where
36        T2: for<'a> Trace<
37                Key<'a> = T1::Key<'a>,
38                KeyOwn = T1::KeyOwn,
39                ValOwn: Data,
40                Time = G::Timestamp,
41                Diff: Abelian,
42            > + 'static,
43        Bu: Builder<Time = G::Timestamp, Output = T2::Batch>,
44        Bu::Input:
45            Container + MergerChunk + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>,
46        L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwn, T2::Diff)>)
47            + 'static,
48        Arranged<G, TraceAgent<T2>>: ArrangementSize;
49}
50
51impl<G, T1> MzReduce<G, T1> for Arranged<G, T1>
52where
53    G: Scope,
54    G::Timestamp: Lattice,
55    T1: TraceReader<Time = G::Timestamp, KeyOwn: Ord> + Clone + 'static,
56{
57    /// Applies `reduce` to arranged data, and returns an arrangement of output data.
58    fn mz_reduce_abelian<L, Bu, T2>(&self, name: &str, logic: L) -> Arranged<G, TraceAgent<T2>>
59    where
60        T2: for<'a> Trace<
61                Key<'a> = T1::Key<'a>,
62                KeyOwn = T1::KeyOwn,
63                ValOwn: Data,
64                Time = G::Timestamp,
65                Diff: Abelian,
66            > + 'static,
67        Bu: Builder<
68                Time = G::Timestamp,
69                Input: Container
70                           + MergerChunk
71                           + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>,
72                Output = T2::Batch,
73            >,
74        L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwn, T2::Diff)>)
75            + 'static,
76        Arranged<G, TraceAgent<T2>>: ArrangementSize,
77    {
78        // Allow access to `reduce_abelian` since we're within Mz's wrapper and force arrangement size logging.
79        #[allow(clippy::disallowed_methods)]
80        Arranged::<_, _>::reduce_abelian::<_, Bu, T2>(self, name, logic).log_arrangement_size()
81    }
82}
83
84/// Extension trait for `ReduceCore`, currently providing a reduction based
85/// on an operator-pair approach.
86pub trait ReduceExt<G, Tr>
87where
88    G: Scope,
89    G::Timestamp: Lattice,
90    Tr: TraceReader<Time = G::Timestamp>,
91{
92    /// This method produces a reduction pair based on the same input arrangement. Each reduction
93    /// in the pair operates with its own logic and the two output arrangements from the reductions
94    /// are produced as a result. The method is useful for reductions that need to present different
95    /// output views on the same input data. An example is producing an error-free reduction output
96    /// along with a separate error output indicating when the error-free output is valid.
97    fn reduce_pair<L1, Bu1, T1, L2, Bu2, T2>(
98        &self,
99        name1: &str,
100        name2: &str,
101        logic1: L1,
102        logic2: L2,
103    ) -> (Arranged<G, TraceAgent<T1>>, Arranged<G, TraceAgent<T2>>)
104    where
105        T1: for<'a> Trace<
106                Key<'a> = Tr::Key<'a>,
107                KeyOwn = Tr::KeyOwn,
108                ValOwn: Data,
109                Time = G::Timestamp,
110                Diff: Abelian,
111            > + 'static,
112        Bu1: Builder<
113                Time = G::Timestamp,
114                Input: Container
115                           + MergerChunk
116                           + PushInto<((T1::KeyOwn, T1::ValOwn), T1::Time, T1::Diff)>,
117                Output = T1::Batch,
118            >,
119        L1: FnMut(Tr::Key<'_>, &[(Tr::Val<'_>, Tr::Diff)], &mut Vec<(T1::ValOwn, T1::Diff)>)
120            + 'static,
121        T2: for<'a> Trace<
122                Key<'a> = Tr::Key<'a>,
123                KeyOwn = Tr::KeyOwn,
124                ValOwn: Data,
125                Time = G::Timestamp,
126                Diff: Abelian,
127            > + 'static,
128        Bu2: Builder<
129                Time = G::Timestamp,
130                Input: Container
131                           + MergerChunk
132                           + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>,
133                Output = T2::Batch,
134            >,
135        L2: FnMut(Tr::Key<'_>, &[(Tr::Val<'_>, Tr::Diff)], &mut Vec<(T2::ValOwn, T2::Diff)>)
136            + 'static,
137        Arranged<G, TraceAgent<T1>>: ArrangementSize,
138        Arranged<G, TraceAgent<T2>>: ArrangementSize;
139}
140
141impl<G, Tr> ReduceExt<G, Tr> for Arranged<G, Tr>
142where
143    G: Scope,
144    G::Timestamp: Lattice,
145    Tr: TraceReader<Time = G::Timestamp, KeyOwn: Ord> + Clone + 'static,
146{
147    fn reduce_pair<L1, Bu1, T1, L2, Bu2, T2>(
148        &self,
149        name1: &str,
150        name2: &str,
151        logic1: L1,
152        logic2: L2,
153    ) -> (Arranged<G, TraceAgent<T1>>, Arranged<G, TraceAgent<T2>>)
154    where
155        T1: for<'a> Trace<
156                Key<'a> = Tr::Key<'a>,
157                KeyOwn = Tr::KeyOwn,
158                ValOwn: Data,
159                Time = G::Timestamp,
160                Diff: Abelian,
161            > + 'static,
162        Bu1: Builder<
163                Time = G::Timestamp,
164                Input: Container
165                           + MergerChunk
166                           + PushInto<((T1::KeyOwn, T1::ValOwn), T1::Time, T1::Diff)>,
167                Output = T1::Batch,
168            >,
169        L1: FnMut(Tr::Key<'_>, &[(Tr::Val<'_>, Tr::Diff)], &mut Vec<(T1::ValOwn, T1::Diff)>)
170            + 'static,
171        T2: for<'a> Trace<
172                Key<'a> = Tr::Key<'a>,
173                KeyOwn = Tr::KeyOwn,
174                ValOwn: Data,
175                Time = G::Timestamp,
176                Diff: Abelian,
177            > + 'static,
178        Bu2: Builder<
179                Time = G::Timestamp,
180                Input: Container
181                           + MergerChunk
182                           + PushInto<((T1::KeyOwn, T2::ValOwn), T2::Time, T2::Diff)>,
183                Output = T2::Batch,
184            >,
185        L2: FnMut(Tr::Key<'_>, &[(Tr::Val<'_>, Tr::Diff)], &mut Vec<(T2::ValOwn, T2::Diff)>)
186            + 'static,
187        Arranged<G, TraceAgent<T1>>: ArrangementSize,
188        Arranged<G, TraceAgent<T2>>: ArrangementSize,
189    {
190        let arranged1 = self.mz_reduce_abelian::<L1, Bu1, T1>(name1, logic1);
191        let arranged2 = self.mz_reduce_abelian::<L2, Bu2, T2>(name2, logic2);
192        (arranged1, arranged2)
193    }
194}