mz_compute/render/
threshold.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Threshold execution logic.
11//!
12//! Consult [ThresholdPlan] documentation for details.
13
14use differential_dataflow::Data;
15use differential_dataflow::IntoOwned;
16use differential_dataflow::containers::Columnation;
17use differential_dataflow::lattice::Lattice;
18use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
19use differential_dataflow::trace::{Batch, Builder, Trace, TraceReader};
20use mz_compute_types::plan::threshold::{BasicThresholdPlan, ThresholdPlan};
21use mz_expr::MirScalarExpr;
22use mz_repr::Diff;
23use timely::Container;
24use timely::container::PushInto;
25use timely::dataflow::Scope;
26use timely::progress::Timestamp;
27use timely::progress::timestamp::Refines;
28
29use crate::extensions::arrange::{ArrangementSize, KeyCollection, MzArrange};
30use crate::extensions::reduce::MzReduce;
31use crate::render::context::{ArrangementFlavor, CollectionBundle, Context};
32use crate::row_spine::RowRowBuilder;
33use crate::typedefs::{ErrBatcher, ErrBuilder};
34
35/// Shared function to compute an arrangement of values matching `logic`.
36fn threshold_arrangement<G, K, V, T1, Bu2, T2, L>(
37    arrangement: &Arranged<G, T1>,
38    name: &str,
39    logic: L,
40) -> Arranged<G, TraceAgent<T2>>
41where
42    G: Scope,
43    G::Timestamp: Lattice + Columnation,
44    V: Data + Columnation,
45    T1: TraceReader<Time = G::Timestamp, Diff = Diff> + Clone + 'static,
46    for<'a> T1::Key<'a>: IntoOwned<'a, Owned = K>,
47    for<'a> T1::Val<'a>: IntoOwned<'a, Owned = V>,
48    K: Columnation + Data,
49    Bu2: Builder<Time = G::Timestamp, Output = T2::Batch>,
50    Bu2::Input: Container + PushInto<((K, V), G::Timestamp, Diff)>,
51    T2: for<'a> Trace<
52            Key<'a> = T1::Key<'a>,
53            Val<'a> = T1::Val<'a>,
54            Time = G::Timestamp,
55            Diff = Diff,
56        > + 'static,
57    T2::Batch: Batch,
58    L: Fn(&Diff) -> bool + 'static,
59    Arranged<G, TraceAgent<T2>>: ArrangementSize,
60{
61    arrangement.mz_reduce_abelian::<_, _, _, Bu2, T2>(name, move |_key, s, t| {
62        for (record, count) in s.iter() {
63            if logic(count) {
64                t.push(((*record).into_owned(), *count));
65            }
66        }
67    })
68}
69
70/// Build a dataflow to threshold the input data.
71///
72/// This implementation maintains rows in the output, i.e. all rows that have a count greater than
73/// zero. It returns a [CollectionBundle] populated from a local arrangement.
74pub fn build_threshold_basic<G, T>(
75    input: CollectionBundle<G, T>,
76    key: Vec<MirScalarExpr>,
77) -> CollectionBundle<G, T>
78where
79    G: Scope,
80    G::Timestamp: Lattice + Refines<T> + Columnation,
81    T: Timestamp + Lattice + Columnation,
82{
83    let arrangement = input
84        .arrangement(&key)
85        .expect("Arrangement ensured to exist");
86    match arrangement {
87        ArrangementFlavor::Local(oks, errs) => {
88            let oks = threshold_arrangement::<_, _, _, _, RowRowBuilder<_, _>, _, _>(
89                &oks,
90                "Threshold local",
91                |count| count.is_positive(),
92            );
93            CollectionBundle::from_expressions(key, ArrangementFlavor::Local(oks, errs))
94        }
95        ArrangementFlavor::Trace(_, oks, errs) => {
96            let oks = threshold_arrangement::<_, _, _, _, RowRowBuilder<_, _>, _, _>(
97                &oks,
98                "Threshold trace",
99                |count| count.is_positive(),
100            );
101            let errs: KeyCollection<_, _, _> = errs.as_collection(|k, _| k.clone()).into();
102            let errs = errs
103                .mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, _>("Arrange threshold basic err");
104            CollectionBundle::from_expressions(key, ArrangementFlavor::Local(oks, errs))
105        }
106    }
107}
108
109impl<G, T> Context<G, T>
110where
111    G: Scope,
112    G::Timestamp: Lattice + Refines<T> + Columnation,
113    T: Timestamp + Lattice + Columnation,
114{
115    pub(crate) fn render_threshold(
116        &self,
117        input: CollectionBundle<G, T>,
118        threshold_plan: ThresholdPlan,
119    ) -> CollectionBundle<G, T> {
120        match threshold_plan {
121            ThresholdPlan::Basic(BasicThresholdPlan {
122                ensure_arrangement: (key, _, _),
123            }) => {
124                // We do not need to apply the permutation here,
125                // since threshold doesn't inspect the values, but only
126                // their counts.
127                build_threshold_basic(input, key)
128            }
129        }
130    }
131}