1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
//! Threshold planning logic.
//!
//! The threshold operator produces only rows with a positive cardinality, as required, for
//! example, to provide SQL `EXCEPT` and `INTERSECT` semantics.
//!
//! We build a plan ([ThresholdPlan]) encapsulating all decisions and requirements on the specific
//! threshold implementation. The idea is to decouple the logic deciding which plan to select from
//! the actual implementation of each variant available.
//!
//! Currently, we provide two variants:
//! * The [BasicThresholdPlan] maintains all its outputs as an arrangement. It is beneficial if the
//!   threshold is the final operation, or a downstream operator expects arranged inputs.
//! * The [RetractionsThresholdPlan] maintains retractions, i.e. rows that are not in the output. It
//!   is beneficial to use this operator if the number of retractions is expected to be small, and
//!   if a potential downstream operator does not expect its input to be arranged.
2526use mz_expr::{MirScalarExpr, permutation_for_arrangement};
27use mz_proto::{ProtoType, RustType, TryFromProtoError};
28use mz_repr::ColumnType;
29use proptest_derive::Arbitrary;
30use serde::{Deserialize, Serialize};
3132use crate::plan::{AvailableCollections, any_arranged_thin};
3334include!(concat!(
35env!("OUT_DIR"),
36"/mz_compute_types.plan.threshold.rs"
37));
3839/// A plan describing how to compute a threshold operation.
40#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
41pub enum ThresholdPlan {
42/// Basic threshold maintains all positive inputs.
43Basic(BasicThresholdPlan),
44}
4546impl RustType<ProtoThresholdPlan> for ThresholdPlan {
47fn into_proto(&self) -> ProtoThresholdPlan {
48use proto_threshold_plan::Kind::*;
49 ProtoThresholdPlan {
50 kind: Some(match self {
51 ThresholdPlan::Basic(p) => Basic(p.ensure_arrangement.into_proto()),
52 }),
53 }
54 }
5556fn from_proto(proto: ProtoThresholdPlan) -> Result<Self, TryFromProtoError> {
57use proto_threshold_plan::Kind::*;
58let kind = proto
59 .kind
60 .ok_or_else(|| TryFromProtoError::missing_field("ProtoThresholdPlan::kind"))?;
61Ok(match kind {
62 Basic(p) => ThresholdPlan::Basic(BasicThresholdPlan {
63 ensure_arrangement: p.into_rust()?,
64 }),
65 })
66 }
67}
6869impl RustType<ProtoArrangement> for (Vec<MirScalarExpr>, Vec<usize>, Vec<usize>) {
70fn into_proto(&self) -> ProtoArrangement {
71 ProtoArrangement {
72 all_columns: self.0.into_proto(),
73 permutation: self.1.iter().map(|x| x.into_proto()).collect(),
74 thinning: self.2.iter().map(|x| x.into_proto()).collect(),
75 }
76 }
7778fn from_proto(proto: ProtoArrangement) -> Result<Self, TryFromProtoError> {
79Ok((
80 proto.all_columns.into_rust()?,
81 proto.permutation.into_rust()?,
82 proto.thinning.into_rust()?,
83 ))
84 }
85}
8687impl ThresholdPlan {
88/// Reports all keys of produced arrangements, with optionally
89 /// given types describing the rows that would be in the raw
90 /// form of the collection.
91 ///
92 /// This is likely either an empty vector, for no arrangement,
93 /// or a singleton vector containing the list of expressions
94 /// that key a single arrangement.
95pub fn keys(&self, types: Option<Vec<ColumnType>>) -> AvailableCollections {
96match self {
97 ThresholdPlan::Basic(plan) => {
98 AvailableCollections::new_arranged(vec![plan.ensure_arrangement.clone()], types)
99 }
100 }
101 }
102}
103104/// A plan to maintain all inputs with positive counts.
105#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
106pub struct BasicThresholdPlan {
107/// Description of how the input has been arranged, and how to arrange the output
108#[proptest(strategy = "any_arranged_thin()")]
109pub ensure_arrangement: (Vec<MirScalarExpr>, Vec<usize>, Vec<usize>),
110}
111112/// A plan to maintain all inputs with negative counts, which are subtracted from the output
113/// in order to maintain an equivalent collection compared to [BasicThresholdPlan].
114#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
115pub struct RetractionsThresholdPlan {
116/// Description of how the input has been arranged
117#[proptest(strategy = "any_arranged_thin()")]
118pub ensure_arrangement: (Vec<MirScalarExpr>, Vec<usize>, Vec<usize>),
119}
120121impl ThresholdPlan {
122/// Construct the plan from the number of columns (`arity`).
123 ///
124 /// Also returns the arrangement and thinning required for the input.
125pub fn create_from(arity: usize) -> (Self, (Vec<MirScalarExpr>, Vec<usize>, Vec<usize>)) {
126// Arrange the input by all columns in order.
127let mut all_columns = Vec::new();
128for column in 0..arity {
129 all_columns.push(mz_expr::MirScalarExpr::Column(column));
130 }
131let (permutation, thinning) = permutation_for_arrangement(&all_columns, arity);
132let ensure_arrangement = (all_columns, permutation, thinning);
133let plan = ThresholdPlan::Basic(BasicThresholdPlan {
134 ensure_arrangement: ensure_arrangement.clone(),
135 });
136 (plan, ensure_arrangement)
137 }
138}
#[cfg(test)]
mod tests {
    use mz_ore::assert_ok;
    use mz_proto::protobuf_roundtrip;
    use proptest::prelude::*;

    use super::*;

    proptest! {
        /// An arbitrary `ThresholdPlan` must survive an encode/decode cycle through
        /// `ProtoThresholdPlan` unchanged.
        #[mz_ore::test]
        fn threshold_plan_protobuf_roundtrip(plan in any::<ThresholdPlan>()) {
            let roundtripped = protobuf_roundtrip::<_, ProtoThresholdPlan>(&plan);
            // Check for success first so a decoding error is reported as such.
            assert_ok!(roundtripped);
            assert_eq!(roundtripped.unwrap(), plan);
        }
    }
}