mz_compute_types/plan/
threshold.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Threshold planning logic.
11//!
//! The threshold operator produces only rows with a positive cardinality, as required, for
//! example, to provide SQL `EXCEPT` and `INTERSECT` semantics.
14//!
15//! We build a plan ([ThresholdPlan]) encapsulating all decisions and requirements on the specific
16//! threshold implementation. The idea is to decouple the logic deciding which plan to select from
17//! the actual implementation of each variant available.
18//!
//! Currently, two plan structs are defined:
//! * The [BasicThresholdPlan] maintains all its outputs as an arrangement. It is beneficial if the
//!     threshold is the final operation, or a downstream operator expects arranged inputs.
//! * The [RetractionsThresholdPlan] maintains retractions, i.e. rows that are not in the output. It
//!     is beneficial to use this operator if the number of retractions is expected to be small, and
//!     if a potential downstream operator does not expect its input to be arranged.
//!
//! Note that [ThresholdPlan] currently only has a variant for the [BasicThresholdPlan].
25
26use mz_expr::{MirScalarExpr, permutation_for_arrangement};
27use mz_proto::{ProtoType, RustType, TryFromProtoError};
28use mz_repr::ColumnType;
29use proptest_derive::Arbitrary;
30use serde::{Deserialize, Serialize};
31
32use crate::plan::{AvailableCollections, any_arranged_thin};
33
34include!(concat!(
35    env!("OUT_DIR"),
36    "/mz_compute_types.plan.threshold.rs"
37));
38
39/// A plan describing how to compute a threshold operation.
40#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
41pub enum ThresholdPlan {
42    /// Basic threshold maintains all positive inputs.
43    Basic(BasicThresholdPlan),
44}
45
46impl RustType<ProtoThresholdPlan> for ThresholdPlan {
47    fn into_proto(&self) -> ProtoThresholdPlan {
48        use proto_threshold_plan::Kind::*;
49        ProtoThresholdPlan {
50            kind: Some(match self {
51                ThresholdPlan::Basic(p) => Basic(p.ensure_arrangement.into_proto()),
52            }),
53        }
54    }
55
56    fn from_proto(proto: ProtoThresholdPlan) -> Result<Self, TryFromProtoError> {
57        use proto_threshold_plan::Kind::*;
58        let kind = proto
59            .kind
60            .ok_or_else(|| TryFromProtoError::missing_field("ProtoThresholdPlan::kind"))?;
61        Ok(match kind {
62            Basic(p) => ThresholdPlan::Basic(BasicThresholdPlan {
63                ensure_arrangement: p.into_rust()?,
64            }),
65        })
66    }
67}
68
69impl RustType<ProtoArrangement> for (Vec<MirScalarExpr>, Vec<usize>, Vec<usize>) {
70    fn into_proto(&self) -> ProtoArrangement {
71        ProtoArrangement {
72            all_columns: self.0.into_proto(),
73            permutation: self.1.iter().map(|x| x.into_proto()).collect(),
74            thinning: self.2.iter().map(|x| x.into_proto()).collect(),
75        }
76    }
77
78    fn from_proto(proto: ProtoArrangement) -> Result<Self, TryFromProtoError> {
79        Ok((
80            proto.all_columns.into_rust()?,
81            proto.permutation.into_rust()?,
82            proto.thinning.into_rust()?,
83        ))
84    }
85}
86
87impl ThresholdPlan {
88    /// Reports all keys of produced arrangements, with optionally
89    /// given types describing the rows that would be in the raw
90    /// form of the collection.
91    ///
92    /// This is likely either an empty vector, for no arrangement,
93    /// or a singleton vector containing the list of expressions
94    /// that key a single arrangement.
95    pub fn keys(&self, types: Option<Vec<ColumnType>>) -> AvailableCollections {
96        match self {
97            ThresholdPlan::Basic(plan) => {
98                AvailableCollections::new_arranged(vec![plan.ensure_arrangement.clone()], types)
99            }
100        }
101    }
102}
103
104/// A plan to maintain all inputs with positive counts.
105#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
106pub struct BasicThresholdPlan {
107    /// Description of how the input has been arranged, and how to arrange the output
108    #[proptest(strategy = "any_arranged_thin()")]
109    pub ensure_arrangement: (Vec<MirScalarExpr>, Vec<usize>, Vec<usize>),
110}
111
112/// A plan to maintain all inputs with negative counts, which are subtracted from the output
113/// in order to maintain an equivalent collection compared to [BasicThresholdPlan].
114#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
115pub struct RetractionsThresholdPlan {
116    /// Description of how the input has been arranged
117    #[proptest(strategy = "any_arranged_thin()")]
118    pub ensure_arrangement: (Vec<MirScalarExpr>, Vec<usize>, Vec<usize>),
119}
120
121impl ThresholdPlan {
122    /// Construct the plan from the number of columns (`arity`).
123    ///
124    /// Also returns the arrangement and thinning required for the input.
125    pub fn create_from(arity: usize) -> (Self, (Vec<MirScalarExpr>, Vec<usize>, Vec<usize>)) {
126        // Arrange the input by all columns in order.
127        let mut all_columns = Vec::new();
128        for column in 0..arity {
129            all_columns.push(mz_expr::MirScalarExpr::Column(column));
130        }
131        let (permutation, thinning) = permutation_for_arrangement(&all_columns, arity);
132        let ensure_arrangement = (all_columns, permutation, thinning);
133        let plan = ThresholdPlan::Basic(BasicThresholdPlan {
134            ensure_arrangement: ensure_arrangement.clone(),
135        });
136        (plan, ensure_arrangement)
137    }
138}
139
#[cfg(test)]
mod tests {
    use mz_ore::assert_ok;
    use mz_proto::protobuf_roundtrip;
    use proptest::prelude::*;

    use super::*;

    proptest! {
        // Property: encoding an arbitrary `ThresholdPlan` to `ProtoThresholdPlan` and decoding
        // it again succeeds and yields a plan equal to the one we started with.
        #[mz_ore::test]
        fn threshold_plan_protobuf_roundtrip(expect in any::<ThresholdPlan>()) {
            let roundtripped = protobuf_roundtrip::<_, ProtoThresholdPlan>(&expect);
            // Check for success first so a decoding failure is reported as such.
            assert_ok!(roundtripped);
            assert_eq!(roundtripped.unwrap(), expect);
        }
    }
}