mz_compute/render/
threshold.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Threshold execution logic.
11//!
12//! Consult [ThresholdPlan] documentation for details.
13
14use differential_dataflow::Data;
15use differential_dataflow::operators::arrange::{Arranged, TraceAgent};
16use differential_dataflow::trace::{Builder, Trace, TraceReader};
17use mz_compute_types::plan::threshold::{BasicThresholdPlan, ThresholdPlan};
18use mz_expr::MirScalarExpr;
19use mz_repr::Diff;
20use timely::Container;
21use timely::container::PushInto;
22use timely::dataflow::Scope;
23use timely::progress::timestamp::Refines;
24
25use crate::extensions::arrange::{ArrangementSize, KeyCollection, MzArrange};
26use crate::extensions::reduce::MzReduce;
27use crate::render::context::{ArrangementFlavor, CollectionBundle, Context};
28use crate::row_spine::RowRowBuilder;
29use crate::typedefs::{ErrBatcher, ErrBuilder, MzData, MzTimestamp};
30
31/// Shared function to compute an arrangement of values matching `logic`.
32fn threshold_arrangement<G, T1, Bu2, T2, L>(
33    arrangement: &Arranged<G, T1>,
34    name: &str,
35    logic: L,
36) -> Arranged<G, TraceAgent<T2>>
37where
38    G: Scope,
39    G::Timestamp: MzTimestamp,
40    T1: TraceReader<KeyOwn: MzData + Data, ValOwn: MzData + Data, Time = G::Timestamp, Diff = Diff>
41        + Clone
42        + 'static,
43    Bu2: Builder<
44            Time = G::Timestamp,
45            Input: Container + PushInto<((T1::KeyOwn, T1::ValOwn), G::Timestamp, Diff)>,
46            Output = T2::Batch,
47        >,
48    T2: for<'a> Trace<
49            Key<'a> = T1::Key<'a>,
50            Val<'a> = T1::Val<'a>,
51            KeyOwn = T1::KeyOwn,
52            ValOwn = T1::ValOwn,
53            Time = G::Timestamp,
54            Diff = Diff,
55        > + 'static,
56    L: Fn(&Diff) -> bool + 'static,
57    Arranged<G, TraceAgent<T2>>: ArrangementSize,
58{
59    arrangement.mz_reduce_abelian::<_, Bu2, T2>(name, move |_key, s, t| {
60        for (record, count) in s.iter() {
61            if logic(count) {
62                t.push((T1::owned_val(*record), *count));
63            }
64        }
65    })
66}
67
68/// Build a dataflow to threshold the input data.
69///
70/// This implementation maintains rows in the output, i.e. all rows that have a count greater than
71/// zero. It returns a [CollectionBundle] populated from a local arrangement.
72pub fn build_threshold_basic<G, T>(
73    input: CollectionBundle<G, T>,
74    key: Vec<MirScalarExpr>,
75) -> CollectionBundle<G, T>
76where
77    G: Scope,
78    G::Timestamp: MzTimestamp + Refines<T>,
79    T: MzTimestamp,
80{
81    let arrangement = input
82        .arrangement(&key)
83        .expect("Arrangement ensured to exist");
84    match arrangement {
85        ArrangementFlavor::Local(oks, errs) => {
86            let oks = threshold_arrangement::<_, _, RowRowBuilder<_, _>, _, _>(
87                &oks,
88                "Threshold local",
89                |count| count.is_positive(),
90            );
91            CollectionBundle::from_expressions(key, ArrangementFlavor::Local(oks, errs))
92        }
93        ArrangementFlavor::Trace(_, oks, errs) => {
94            let oks = threshold_arrangement::<_, _, RowRowBuilder<_, _>, _, _>(
95                &oks,
96                "Threshold trace",
97                |count| count.is_positive(),
98            );
99            let errs: KeyCollection<_, _, _> = errs.as_collection(|k, _| k.clone()).into();
100            let errs = errs
101                .mz_arrange::<ErrBatcher<_, _>, ErrBuilder<_, _>, _>("Arrange threshold basic err");
102            CollectionBundle::from_expressions(key, ArrangementFlavor::Local(oks, errs))
103        }
104    }
105}
106
107impl<G, T> Context<G, T>
108where
109    G: Scope,
110    G::Timestamp: MzTimestamp + Refines<T>,
111    T: MzTimestamp,
112{
113    pub(crate) fn render_threshold(
114        &self,
115        input: CollectionBundle<G, T>,
116        threshold_plan: ThresholdPlan,
117    ) -> CollectionBundle<G, T> {
118        match threshold_plan {
119            ThresholdPlan::Basic(BasicThresholdPlan {
120                ensure_arrangement: (key, _, _),
121            }) => {
122                // We do not need to apply the permutation here,
123                // since threshold doesn't inspect the values, but only
124                // their counts.
125                build_threshold_basic(input, key)
126            }
127        }
128    }
129}