mz_compute/render/
threshold.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
9
//! Threshold execution logic.
//!
//! Consult the [`ThresholdPlan`] documentation for details.
13
14use differential_dataflow::Data;
15use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
16use differential_dataflow::trace::implementations::merge_batcher::container::MergerChunk;
17use differential_dataflow::trace::{Builder, Trace, TraceReader};
18use mz_compute_types::plan::threshold::{BasicThresholdPlan, ThresholdPlan};
19use mz_expr::MirScalarExpr;
20use mz_repr::Diff;
21use timely::Container;
22use timely::container::PushInto;
23use timely::dataflow::Scope;
24use timely::progress::timestamp::Refines;
25
26use crate::extensions::arrange::{ArrangementSize, KeyCollection, MzArrange};
27use crate::extensions::reduce::MzReduce;
28use crate::render::context::{ArrangementFlavor, CollectionBundle, Context};
29use crate::row_spine::RowRowBuilder;
30use crate::typedefs::{ErrBatcher, ErrBuilder, MzData, MzTimestamp};
31
32/// Shared function to compute an arrangement of values matching `logic`.
33fn threshold_arrangement<G, T1, Bu2, T2, L>(
34    arrangement: &Arranged<G, T1>,
35    name: &str,
36    logic: L,
37) -> Arranged<G, TraceAgent<T2>>
38where
39    G: Scope,
40    G::Timestamp: MzTimestamp,
41    T1: TraceReader<KeyOwn: MzData + Data, ValOwn: MzData + Data, Time = G::Timestamp, Diff = Diff>
42        + Clone
43        + 'static,
44    Bu2: Builder<
45            Time = G::Timestamp,
46            Input: Container
47                       + MergerChunk
48                       + PushInto<((T1::KeyOwn, T1::ValOwn), G::Timestamp, Diff)>,
49            Output = T2::Batch,
50        >,
51    T2: for<'a> Trace<
52            Key<'a> = T1::Key<'a>,
53            Val<'a> = T1::Val<'a>,
54            KeyOwn = T1::KeyOwn,
55            ValOwn = T1::ValOwn,
56            Time = G::Timestamp,
57            Diff = Diff,
58        > + 'static,
59    L: Fn(&Diff) -> bool + 'static,
60    Arranged<G, TraceAgent<T2>>: ArrangementSize,
61{
62    arrangement.mz_reduce_abelian::<_, Bu2, T2>(name, move |_key, s, t| {
63        for (record, count) in s.iter() {
64            if logic(count) {
65                t.push((T1::owned_val(*record), *count));
66            }
67        }
68    })
69}
70
71/// Build a dataflow to threshold the input data.
72///
73/// This implementation maintains rows in the output, i.e. all rows that have a count greater than
74/// zero. It returns a [CollectionBundle] populated from a local arrangement.
75pub fn build_threshold_basic<G, T>(
76    input: CollectionBundle<G, T>,
77    key: Vec<MirScalarExpr>,
78) -> CollectionBundle<G, T>
79where
80    G: Scope,
81    G::Timestamp: MzTimestamp + Refines<T>,
82    T: MzTimestamp,
83{
84    let arrangement = input
85        .arrangement(&key)
86        .expect("Arrangement ensured to exist");
87    match arrangement {
88        ArrangementFlavor::Local(oks, errs) => {
89            let oks = threshold_arrangement::<_, _, RowRowBuilder<_, _>, _, _>(
90                &oks,
91                "Threshold local",
92                |count| count.is_positive(),
93            );
94            CollectionBundle::from_expressions(key, ArrangementFlavor::Local(oks, errs))
95        }
96        ArrangementFlavor::Trace(_, oks, errs) => {
97            let oks = threshold_arrangement::<_, _, RowRowBuilder<_, _>, _, _>(
98                &oks,
99                "Threshold trace",
100                |count| count.is_positive(),
101            );
102            let errs: KeyCollection<_, _, _> = errs.as_collection(|k, _| k.clone()).into();
103            let errs = errs
104                .mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, _>("Arrange threshold basic err");
105            CollectionBundle::from_expressions(key, ArrangementFlavor::Local(oks, errs))
106        }
107    }
108}
109
110impl<G, T> Context<G, T>
111where
112    G: Scope,
113    G::Timestamp: MzTimestamp + Refines<T>,
114    T: MzTimestamp,
115{
116    pub(crate) fn render_threshold(
117        &self,
118        input: CollectionBundle<G, T>,
119        threshold_plan: ThresholdPlan,
120    ) -> CollectionBundle<G, T> {
121        match threshold_plan {
122            ThresholdPlan::Basic(BasicThresholdPlan {
123                ensure_arrangement: (key, _, _),
124            }) => {
125                // We do not need to apply the permutation here,
126                // since threshold doesn't inspect the values, but only
127                // their counts.
128                build_threshold_basic(input, key)
129            }
130        }
131    }
132}