mz_compute_types/plan/
threshold.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Threshold planning logic.
11//!
//! The threshold operator produces only rows with a positive cardinality, as required, for
//! example, to provide SQL `EXCEPT` and `INTERSECT` semantics.
14//!
15//! We build a plan ([ThresholdPlan]) encapsulating all decisions and requirements on the specific
16//! threshold implementation. The idea is to decouple the logic deciding which plan to select from
17//! the actual implementation of each variant available.
18//!
//! Two plan shapes are defined:
//! * The [BasicThresholdPlan] maintains all its outputs as an arrangement. It is beneficial if the
//!     threshold is the final operation, or if a downstream operator expects arranged inputs.
//! * The [RetractionsThresholdPlan] maintains retractions, i.e. rows that are not in the output. It
//!     is beneficial to use this operator if the number of retractions is expected to be small, and
//!     if a potential downstream operator does not expect its input to be arranged.
//!
//! Note that [ThresholdPlan] currently only has a `Basic` variant, so only [BasicThresholdPlan]
//! can be selected.
25
26use mz_expr::{MirScalarExpr, permutation_for_arrangement};
27use mz_proto::{ProtoType, RustType, TryFromProtoError};
28use proptest_derive::Arbitrary;
29use serde::{Deserialize, Serialize};
30
31use crate::plan::{AvailableCollections, any_arranged_thin};
32
33include!(concat!(
34    env!("OUT_DIR"),
35    "/mz_compute_types.plan.threshold.rs"
36));
37
38/// A plan describing how to compute a threshold operation.
39#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
40pub enum ThresholdPlan {
41    /// Basic threshold maintains all positive inputs.
42    Basic(BasicThresholdPlan),
43}
44
45impl RustType<ProtoThresholdPlan> for ThresholdPlan {
46    fn into_proto(&self) -> ProtoThresholdPlan {
47        use proto_threshold_plan::Kind::*;
48        ProtoThresholdPlan {
49            kind: Some(match self {
50                ThresholdPlan::Basic(p) => Basic(p.ensure_arrangement.into_proto()),
51            }),
52        }
53    }
54
55    fn from_proto(proto: ProtoThresholdPlan) -> Result<Self, TryFromProtoError> {
56        use proto_threshold_plan::Kind::*;
57        let kind = proto
58            .kind
59            .ok_or_else(|| TryFromProtoError::missing_field("ProtoThresholdPlan::kind"))?;
60        Ok(match kind {
61            Basic(p) => ThresholdPlan::Basic(BasicThresholdPlan {
62                ensure_arrangement: p.into_rust()?,
63            }),
64        })
65    }
66}
67
68impl RustType<ProtoArrangement> for (Vec<MirScalarExpr>, Vec<usize>, Vec<usize>) {
69    fn into_proto(&self) -> ProtoArrangement {
70        ProtoArrangement {
71            all_columns: self.0.into_proto(),
72            permutation: self.1.iter().map(|x| x.into_proto()).collect(),
73            thinning: self.2.iter().map(|x| x.into_proto()).collect(),
74        }
75    }
76
77    fn from_proto(proto: ProtoArrangement) -> Result<Self, TryFromProtoError> {
78        Ok((
79            proto.all_columns.into_rust()?,
80            proto.permutation.into_rust()?,
81            proto.thinning.into_rust()?,
82        ))
83    }
84}
85
86impl ThresholdPlan {
87    /// Reports all keys of produced arrangements, with optionally
88    /// given types describing the rows that would be in the raw
89    /// form of the collection.
90    ///
91    /// This is likely either an empty vector, for no arrangement,
92    /// or a singleton vector containing the list of expressions
93    /// that key a single arrangement.
94    pub fn keys(&self) -> AvailableCollections {
95        match self {
96            ThresholdPlan::Basic(plan) => {
97                AvailableCollections::new_arranged(vec![plan.ensure_arrangement.clone()])
98            }
99        }
100    }
101}
102
103/// A plan to maintain all inputs with positive counts.
104#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
105pub struct BasicThresholdPlan {
106    /// Description of how the input has been arranged, and how to arrange the output
107    #[proptest(strategy = "any_arranged_thin()")]
108    pub ensure_arrangement: (Vec<MirScalarExpr>, Vec<usize>, Vec<usize>),
109}
110
111/// A plan to maintain all inputs with negative counts, which are subtracted from the output
112/// in order to maintain an equivalent collection compared to [BasicThresholdPlan].
113#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
114pub struct RetractionsThresholdPlan {
115    /// Description of how the input has been arranged
116    #[proptest(strategy = "any_arranged_thin()")]
117    pub ensure_arrangement: (Vec<MirScalarExpr>, Vec<usize>, Vec<usize>),
118}
119
120impl ThresholdPlan {
121    /// Construct the plan from the number of columns (`arity`).
122    ///
123    /// Also returns the arrangement and thinning required for the input.
124    pub fn create_from(arity: usize) -> (Self, (Vec<MirScalarExpr>, Vec<usize>, Vec<usize>)) {
125        // Arrange the input by all columns in order.
126        let mut all_columns = Vec::new();
127        for column in 0..arity {
128            all_columns.push(mz_expr::MirScalarExpr::column(column));
129        }
130        let (permutation, thinning) = permutation_for_arrangement(&all_columns, arity);
131        let ensure_arrangement = (all_columns, permutation, thinning);
132        let plan = ThresholdPlan::Basic(BasicThresholdPlan {
133            ensure_arrangement: ensure_arrangement.clone(),
134        });
135        (plan, ensure_arrangement)
136    }
137}
138
#[cfg(test)]
mod tests {
    use mz_ore::assert_ok;
    use mz_proto::protobuf_roundtrip;
    use proptest::prelude::*;

    use super::*;

    proptest! {
        /// Every generated plan must survive a protobuf encode/decode cycle unchanged.
        #[mz_ore::test]
        fn threshold_plan_protobuf_roundtrip(expect in any::<ThresholdPlan>()) {
            let decoded = protobuf_roundtrip::<_, ProtoThresholdPlan>(&expect);
            assert_ok!(decoded);
            let decoded = decoded.unwrap();
            assert_eq!(decoded, expect);
        }
    }
}