Skip to main content

mz_compute/extensions/
reduce.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use columnation::Columnation;
17use differential_dataflow::Data;
18use differential_dataflow::difference::Abelian;
19use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
20use differential_dataflow::trace::implementations::{BatchContainer, LayoutExt};
21use differential_dataflow::trace::{Builder, Trace, TraceReader};
22use mz_timely_util::columnation::ColumnationStack;
23use timely::Container;
24use timely::container::PushInto;
25
26use crate::extensions::arrange::ArrangementSize;
27
28type KeyOwn<Tr> = <<Tr as LayoutExt>::KeyContainer as BatchContainer>::Owned;
29
/// A container that can be emptied in place.
///
/// Used in this module as a bound on builder input buffers, which are cleared before being
/// refilled with a key's output updates.
pub trait ClearContainer {
    /// Removes every element from the container.
    fn clear(&mut self);
}
33
34impl<T> ClearContainer for Vec<T> {
35    fn clear(&mut self) {
36        Vec::clear(self)
37    }
38}
39
40impl<D, T, R> ClearContainer for ColumnationStack<(D, T, R)>
41where
42    D: Columnation + Clone + 'static,
43    T: Columnation + Clone + 'static,
44    R: Columnation + Clone + 'static,
45{
46    fn clear(&mut self) {
47        ColumnationStack::clear(self)
48    }
49}
50
51/// Extension trait for the `reduce_abelian` differential dataflow method.
52pub(crate) trait MzReduce<'scope, T1: TraceReader> {
53    /// Applies `reduce` to arranged data, and returns an arrangement of output data.
54    fn mz_reduce_abelian<L, Bu, T2>(self, name: &str, logic: L) -> Arranged<'scope, TraceAgent<T2>>
55    where
56        T2: for<'a> Trace<Key<'a> = T1::Key<'a>, ValOwn: Data, Time = T1::Time, Diff: Abelian>
57            + 'static,
58        Bu: Builder<Time = T1::Time, Output = T2::Batch>,
59        Bu::Input: Container
60            + Default
61            + ClearContainer
62            + PushInto<((KeyOwn<T1>, T2::ValOwn), T2::Time, T2::Diff)>,
63        L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwn, T2::Diff)>)
64            + 'static,
65        Arranged<'scope, TraceAgent<T2>>: ArrangementSize;
66}
67
68impl<'scope, T1> MzReduce<'scope, T1> for Arranged<'scope, T1>
69where
70    T1: TraceReader + Clone + 'static,
71{
72    /// Applies `reduce` to arranged data, and returns an arrangement of output data.
73    fn mz_reduce_abelian<L, Bu, T2>(self, name: &str, logic: L) -> Arranged<'scope, TraceAgent<T2>>
74    where
75        T2: for<'a> Trace<Key<'a> = T1::Key<'a>, ValOwn: Data, Time = T1::Time, Diff: Abelian>
76            + 'static,
77        Bu: Builder<
78                Time = T1::Time,
79                Input: Container
80                           + Default
81                           + ClearContainer
82                           + PushInto<((KeyOwn<T1>, T2::ValOwn), T2::Time, T2::Diff)>,
83                Output = T2::Batch,
84            >,
85        L: FnMut(T1::Key<'_>, &[(T1::Val<'_>, T1::Diff)], &mut Vec<(T2::ValOwn, T2::Diff)>)
86            + 'static,
87        Arranged<'scope, TraceAgent<T2>>: ArrangementSize,
88    {
89        // Construct a push closure for `reduce_abelian`.
90        use differential_dataflow::trace::implementations::BatchContainer;
91        let push_closure =
92            |buf: &mut Bu::Input,
93             key: T1::Key<'_>,
94             updates: &mut Vec<(T2::ValOwn, T2::Time, T2::Diff)>| {
95                buf.clear();
96                let key_owned = <T1::KeyContainer as BatchContainer>::into_owned(key);
97                for (val, time, diff) in updates.drain(..) {
98                    buf.push_into(((key_owned.clone(), val), time, diff));
99                }
100            };
101
102        // Allow access to `reduce_abelian` since we're within Mz's wrapper and force arrangement size logging.
103        #[allow(clippy::disallowed_methods)]
104        Arranged::<_>::reduce_abelian::<_, Bu, T2, _>(self, name, logic, push_closure)
105            .log_arrangement_size()
106    }
107}