mz_compute_types/plan/
threshold.rs1use mz_expr::{MirScalarExpr, permutation_for_arrangement};
27use mz_proto::{ProtoType, RustType, TryFromProtoError};
28use proptest_derive::Arbitrary;
29use serde::{Deserialize, Serialize};
30
31use crate::plan::{AvailableCollections, any_arranged_thin};
32
33include!(concat!(
34 env!("OUT_DIR"),
35 "/mz_compute_types.plan.threshold.rs"
36));
37
38#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
40pub enum ThresholdPlan {
41 Basic(BasicThresholdPlan),
43}
44
45impl RustType<ProtoThresholdPlan> for ThresholdPlan {
46 fn into_proto(&self) -> ProtoThresholdPlan {
47 use proto_threshold_plan::Kind::*;
48 ProtoThresholdPlan {
49 kind: Some(match self {
50 ThresholdPlan::Basic(p) => Basic(p.ensure_arrangement.into_proto()),
51 }),
52 }
53 }
54
55 fn from_proto(proto: ProtoThresholdPlan) -> Result<Self, TryFromProtoError> {
56 use proto_threshold_plan::Kind::*;
57 let kind = proto
58 .kind
59 .ok_or_else(|| TryFromProtoError::missing_field("ProtoThresholdPlan::kind"))?;
60 Ok(match kind {
61 Basic(p) => ThresholdPlan::Basic(BasicThresholdPlan {
62 ensure_arrangement: p.into_rust()?,
63 }),
64 })
65 }
66}
67
68impl RustType<ProtoArrangement> for (Vec<MirScalarExpr>, Vec<usize>, Vec<usize>) {
69 fn into_proto(&self) -> ProtoArrangement {
70 ProtoArrangement {
71 all_columns: self.0.into_proto(),
72 permutation: self.1.iter().map(|x| x.into_proto()).collect(),
73 thinning: self.2.iter().map(|x| x.into_proto()).collect(),
74 }
75 }
76
77 fn from_proto(proto: ProtoArrangement) -> Result<Self, TryFromProtoError> {
78 Ok((
79 proto.all_columns.into_rust()?,
80 proto.permutation.into_rust()?,
81 proto.thinning.into_rust()?,
82 ))
83 }
84}
85
86impl ThresholdPlan {
87 pub fn keys(&self) -> AvailableCollections {
95 match self {
96 ThresholdPlan::Basic(plan) => {
97 AvailableCollections::new_arranged(vec![plan.ensure_arrangement.clone()])
98 }
99 }
100 }
101}
102
103#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
105pub struct BasicThresholdPlan {
106 #[proptest(strategy = "any_arranged_thin()")]
108 pub ensure_arrangement: (Vec<MirScalarExpr>, Vec<usize>, Vec<usize>),
109}
110
111#[derive(Arbitrary, Clone, Debug, Serialize, Deserialize, Eq, PartialEq, Ord, PartialOrd)]
114pub struct RetractionsThresholdPlan {
115 #[proptest(strategy = "any_arranged_thin()")]
117 pub ensure_arrangement: (Vec<MirScalarExpr>, Vec<usize>, Vec<usize>),
118}
119
120impl ThresholdPlan {
121 pub fn create_from(arity: usize) -> (Self, (Vec<MirScalarExpr>, Vec<usize>, Vec<usize>)) {
125 let mut all_columns = Vec::new();
127 for column in 0..arity {
128 all_columns.push(mz_expr::MirScalarExpr::column(column));
129 }
130 let (permutation, thinning) = permutation_for_arrangement(&all_columns, arity);
131 let ensure_arrangement = (all_columns, permutation, thinning);
132 let plan = ThresholdPlan::Basic(BasicThresholdPlan {
133 ensure_arrangement: ensure_arrangement.clone(),
134 });
135 (plan, ensure_arrangement)
136 }
137}
138
#[cfg(test)]
mod tests {
    use mz_ore::assert_ok;
    use mz_proto::protobuf_roundtrip;
    use proptest::prelude::*;

    use super::*;

    proptest! {
        // Any generated plan must come back unchanged from a protobuf encode/decode cycle.
        #[mz_ore::test]
        fn threshold_plan_protobuf_roundtrip(expect in any::<ThresholdPlan>()) {
            let roundtripped = protobuf_roundtrip::<_, ProtoThresholdPlan>(&expect);
            assert_ok!(roundtripped);
            assert_eq!(roundtripped.unwrap(), expect);
        }
    }
}