// mz_compute/render/threshold.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Threshold execution logic.
//!
//! Consult the [`ThresholdPlan`] documentation for details.

14use differential_dataflow::Data;
15use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
16use differential_dataflow::trace::implementations::BatchContainer;
17use differential_dataflow::trace::{Builder, Trace, TraceReader};
18use mz_compute_types::plan::threshold::{BasicThresholdPlan, ThresholdPlan};
19use mz_expr::MirScalarExpr;
20use mz_repr::Diff;
21use mz_timely_util::columnation::ColumnationChunker;
22use timely::Container;
23use timely::container::PushInto;
24
25use crate::extensions::arrange::{ArrangementSize, KeyCollection, MzArrange};
26use crate::extensions::reduce::{ClearContainer, MzReduce};
27use crate::render::RenderTimestamp;
28use crate::render::context::{ArrangementFlavor, CollectionBundle, Context};
29use crate::typedefs::{ErrBatcher, ErrBuilder, MzData, MzTimestamp};
30use mz_row_spine::RowRowBuilder;
31
32/// Shared function to compute an arrangement of values matching `logic`.
33fn threshold_arrangement<'scope, Ts, T1, Bu2, T2, L>(
34    arrangement: Arranged<'scope, T1>,
35    name: &str,
36    logic: L,
37) -> Arranged<'scope, TraceAgent<T2>>
38where
39    Ts: MzTimestamp,
40    T1: TraceReader<
41            KeyContainer: BatchContainer<Owned: MzData + Data>,
42            ValOwn: MzData + Data,
43            Time = Ts,
44            Diff = Diff,
45        > + Clone
46        + 'static,
47    Bu2: Builder<
48            Time = Ts,
49            Input: Container
50                       + ClearContainer
51                       + PushInto<(
52                (<T1::KeyContainer as BatchContainer>::Owned, T1::ValOwn),
53                Ts,
54                Diff,
55            )>,
56            Output = T2::Batch,
57        >,
58    T2: for<'a> Trace<
59            Key<'a> = T1::Key<'a>,
60            Val<'a> = T1::Val<'a>,
61            KeyContainer: BatchContainer<Owned = <T1::KeyContainer as BatchContainer>::Owned>,
62            ValOwn = T1::ValOwn,
63            Time = Ts,
64            Diff = Diff,
65        > + 'static,
66    L: Fn(&Diff) -> bool + 'static,
67    Arranged<'scope, TraceAgent<T2>>: ArrangementSize,
68{
69    arrangement.mz_reduce_abelian::<_, Bu2, T2>(name, move |_key, s, t| {
70        for (record, count) in s.iter() {
71            if logic(count) {
72                t.push((T1::owned_val(*record), *count));
73            }
74        }
75    })
76}
77
78/// Build a dataflow to threshold the input data.
79///
80/// This implementation maintains rows in the output, i.e. all rows that have a count greater than
81/// zero. It returns a [CollectionBundle] populated from a local arrangement.
82pub fn build_threshold_basic<'scope, T: RenderTimestamp>(
83    input: CollectionBundle<'scope, T>,
84    key: Vec<MirScalarExpr>,
85) -> CollectionBundle<'scope, T> {
86    let arrangement = input
87        .arrangement(&key)
88        .expect("Arrangement ensured to exist");
89    match arrangement {
90        ArrangementFlavor::Local(oks, errs) => {
91            let oks = threshold_arrangement::<_, _, RowRowBuilder<_, _>, _, _>(
92                oks,
93                "Threshold local",
94                |count| count.is_positive(),
95            );
96            CollectionBundle::from_expressions(key, ArrangementFlavor::Local(oks, errs))
97        }
98        ArrangementFlavor::Trace(_, oks, errs) => {
99            let oks = threshold_arrangement::<_, _, RowRowBuilder<_, _>, _, _>(
100                oks,
101                "Threshold trace",
102                |count| count.is_positive(),
103            );
104            let errs: KeyCollection<_, _, _> = errs.as_collection(|k, _| k.clone()).into();
105            let errs = errs
106                .mz_arrange::<ColumnationChunker<_>, ErrBatcher<_, _>, ErrBuilder<_, _>, _>(
107                    "Arrange threshold basic err",
108                );
109            CollectionBundle::from_expressions(key, ArrangementFlavor::Local(oks, errs))
110        }
111    }
112}
113
114impl<'scope, T: RenderTimestamp> Context<'scope, T> {
115    pub(crate) fn render_threshold(
116        &self,
117        input: CollectionBundle<'scope, T>,
118        threshold_plan: ThresholdPlan,
119    ) -> CollectionBundle<'scope, T> {
120        match threshold_plan {
121            ThresholdPlan::Basic(BasicThresholdPlan {
122                ensure_arrangement: (key, _, _),
123            }) => {
124                // We do not need to apply the permutation here,
125                // since threshold doesn't inspect the values, but only
126                // their counts.
127                build_threshold_basic(input, key)
128            }
129        }
130    }
131}