mz_compute/extensions/
reduce.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License in the LICENSE file at the
// root of this repository, or online at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
15
16use differential_dataflow::Data;
17use differential_dataflow::IntoOwned;
18use differential_dataflow::difference::{Abelian, Semigroup};
19use differential_dataflow::lattice::Lattice;
20use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
21use differential_dataflow::trace::{Batch, Builder, Trace, TraceReader};
22use timely::Container;
23use timely::container::PushInto;
24use timely::dataflow::Scope;
25
26use crate::extensions::arrange::ArrangementSize;
27
28/// Extension trait for the `reduce_abelian` differential dataflow method.
29pub(crate) trait MzReduce<G: Scope, T1: TraceReader<Time = G::Timestamp>>
30where
31    G::Timestamp: Lattice + Ord,
32{
33    /// Applies `reduce` to arranged data, and returns an arrangement of output data.
34    fn mz_reduce_abelian<L, K, V, Bu, T2>(
35        &self,
36        name: &str,
37        logic: L,
38    ) -> Arranged<G, TraceAgent<T2>>
39    where
40        T2: for<'a> Trace<Key<'a> = T1::Key<'a>, Time = G::Timestamp> + 'static,
41        K: Data,
42        V: Data,
43        for<'a> T1::Key<'a>: IntoOwned<'a, Owned = K>,
44        for<'a> T2::Val<'a>: IntoOwned<'a, Owned = V>,
45        T2::Diff: Abelian,
46        T2::Batch: Batch,
47        Bu: Builder<Time = G::Timestamp, Output = T2::Batch>,
48        Bu::Input: Container + PushInto<((K, V), T2::Time, T2::Diff)>,
49        L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>) + 'static,
50        Arranged<G, TraceAgent<T2>>: ArrangementSize;
51}
52
53impl<G, T1> MzReduce<G, T1> for Arranged<G, T1>
54where
55    G::Timestamp: Lattice + Ord,
56    G: Scope,
57    T1: TraceReader<Time = G::Timestamp> + Clone + 'static,
58    T1::Diff: Semigroup,
59{
60    /// Applies `reduce` to arranged data, and returns an arrangement of output data.
61    fn mz_reduce_abelian<L, K, V, Bu, T2>(
62        &self,
63        name: &str,
64        logic: L,
65    ) -> Arranged<G, TraceAgent<T2>>
66    where
67        T2: for<'a> Trace<Key<'a> = T1::Key<'a>, Time = G::Timestamp> + 'static,
68        K: Data,
69        V: Data,
70        for<'a> T1::Key<'a>: IntoOwned<'a, Owned = K>,
71        for<'a> T2::Val<'a>: IntoOwned<'a, Owned = V>,
72        T2::Diff: Abelian,
73        T2::Batch: Batch,
74        Bu: Builder<Time = G::Timestamp, Output = T2::Batch>,
75        Bu::Input: Container + PushInto<((K, V), T2::Time, T2::Diff)>,
76        L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(V, T2::Diff)>) + 'static,
77        Arranged<G, TraceAgent<T2>>: ArrangementSize,
78    {
79        // Allow access to `reduce_abelian` since we're within Mz's wrapper and force arrangement size logging.
80        #[allow(clippy::disallowed_methods)]
81        Arranged::<_, _>::reduce_abelian::<_, _, _, Bu, T2>(self, name, logic)
82            .log_arrangement_size()
83    }
84}
85
86/// Extension trait for `ReduceCore`, currently providing a reduction based
87/// on an operator-pair approach.
88pub trait ReduceExt<G: Scope, Tr: TraceReader<Time = G::Timestamp>>
89where
90    G::Timestamp: Lattice + Ord,
91{
92    /// This method produces a reduction pair based on the same input arrangement. Each reduction
93    /// in the pair operates with its own logic and the two output arrangements from the reductions
94    /// are produced as a result. The method is useful for reductions that need to present different
95    /// output views on the same input data. An example is producing an error-free reduction output
96    /// along with a separate error output indicating when the error-free output is valid.
97    fn reduce_pair<L1, K, V1, Bu1, T1, L2, V2, Bu2, T2>(
98        &self,
99        name1: &str,
100        name2: &str,
101        logic1: L1,
102        logic2: L2,
103    ) -> (Arranged<G, TraceAgent<T1>>, Arranged<G, TraceAgent<T2>>)
104    where
105        K: Data,
106        T1: Trace + for<'a> TraceReader<Key<'a> = Tr::Key<'a>, Time = G::Timestamp> + 'static,
107        for<'a> T1::Key<'a>: IntoOwned<'a, Owned = K>,
108        for<'a> T1::Val<'a>: IntoOwned<'a, Owned = V1>,
109        T1::Diff: Abelian,
110        T1::Batch: Batch,
111        Bu1: Builder<Time = G::Timestamp, Output = T1::Batch>,
112        Bu1::Input: Container + PushInto<((K, V1), T1::Time, T1::Diff)>,
113        L1: FnMut(Tr::Key<'_>, &[(Tr::Val<'_>, Tr::Diff)], &mut Vec<(V1, T1::Diff)>) + 'static,
114        V1: Data,
115        T2: Trace + for<'a> TraceReader<Key<'a> = Tr::Key<'a>, Time = G::Timestamp> + 'static,
116        for<'a> T2::Val<'a>: IntoOwned<'a, Owned = V2>,
117        for<'a> T2::Val<'a>: IntoOwned<'a, Owned = V2>,
118        T2::Diff: Abelian,
119        T2::Batch: Batch,
120        Bu2: Builder<Time = G::Timestamp, Output = T2::Batch>,
121        Bu2::Input: Container + PushInto<((K, V2), T2::Time, T2::Diff)>,
122        L2: FnMut(Tr::Key<'_>, &[(Tr::Val<'_>, Tr::Diff)], &mut Vec<(V2, T2::Diff)>) + 'static,
123        V2: Data,
124        Arranged<G, TraceAgent<T1>>: ArrangementSize,
125        Arranged<G, TraceAgent<T2>>: ArrangementSize;
126}
127
128impl<G: Scope, Tr> ReduceExt<G, Tr> for Arranged<G, Tr>
129where
130    G::Timestamp: Lattice + Ord,
131    Tr: TraceReader<Time = G::Timestamp> + Clone + 'static,
132    Tr::Diff: Semigroup,
133{
134    fn reduce_pair<L1, K, V1, Bu1, T1, L2, V2, Bu2, T2>(
135        &self,
136        name1: &str,
137        name2: &str,
138        logic1: L1,
139        logic2: L2,
140    ) -> (Arranged<G, TraceAgent<T1>>, Arranged<G, TraceAgent<T2>>)
141    where
142        K: Data,
143        T1: Trace + for<'a> TraceReader<Key<'a> = Tr::Key<'a>, Time = G::Timestamp> + 'static,
144        for<'a> T1::Key<'a>: IntoOwned<'a, Owned = K>,
145        for<'a> T1::Val<'a>: IntoOwned<'a, Owned = V1>,
146        T1::Diff: Abelian,
147        T1::Batch: Batch,
148        Bu1: Builder<Time = G::Timestamp, Output = T1::Batch>,
149        Bu1::Input: Container + PushInto<((K, V1), T1::Time, T1::Diff)>,
150        L1: FnMut(Tr::Key<'_>, &[(Tr::Val<'_>, Tr::Diff)], &mut Vec<(V1, T1::Diff)>) + 'static,
151        V1: Data,
152        T2: Trace + for<'a> TraceReader<Key<'a> = Tr::Key<'a>, Time = G::Timestamp> + 'static,
153        for<'a> T2::Val<'a>: IntoOwned<'a, Owned = V2>,
154        for<'a> T2::Val<'a>: IntoOwned<'a, Owned = V2>,
155        T2::Diff: Abelian,
156        T2::Batch: Batch,
157        Bu2: Builder<Time = G::Timestamp, Output = T2::Batch>,
158        Bu2::Input: Container + PushInto<((K, V2), T2::Time, T2::Diff)>,
159        L2: FnMut(Tr::Key<'_>, &[(Tr::Val<'_>, Tr::Diff)], &mut Vec<(V2, T2::Diff)>) + 'static,
160        V2: Data,
161        Arranged<G, TraceAgent<T1>>: ArrangementSize,
162        Arranged<G, TraceAgent<T2>>: ArrangementSize,
163    {
164        let arranged1 = self.mz_reduce_abelian::<L1, _, _, Bu1, T1>(name1, logic1);
165        let arranged2 = self.mz_reduce_abelian::<L2, _, _, Bu2, T2>(name2, logic2);
166        (arranged1, arranged2)
167    }
168}