1use std::collections::BTreeSet;
13
14use anyhow::bail;
15use mz_proto::{IntoRustIfSome, RustType, TryFromProtoError};
16use proptest::prelude::any;
17use proptest_derive::Arbitrary;
18use serde::{Deserialize, Serialize};
19use tokio_postgres::types::Oid;
20use tracing::warn;
21
22include!(concat!(env!("OUT_DIR"), "/mz_postgres_util.desc.rs"));
23
24#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
28pub struct PostgresSchemaDesc {
29 pub oid: Oid,
31 pub name: String,
33 pub owner: Oid,
35}
36
37#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
39pub struct PostgresTableDesc {
40 pub oid: Oid,
42 pub namespace: String,
44 pub name: String,
46 #[proptest(strategy = "proptest::collection::vec(any::<PostgresColumnDesc>(), 1..4)")]
48 pub columns: Vec<PostgresColumnDesc>,
49 #[proptest(strategy = "proptest::collection::btree_set(any::<PostgresKeyDesc>(), 1..4)")]
52 pub keys: BTreeSet<PostgresKeyDesc>,
53}
54
55impl PostgresTableDesc {
56 pub fn determine_compatibility(
66 &self,
67 other: &PostgresTableDesc,
68 allow_type_to_change_by_col_num: &BTreeSet<u16>,
69 ) -> Result<(), anyhow::Error> {
70 if self == other {
71 return Ok(());
72 }
73
74 let PostgresTableDesc {
75 oid: other_oid,
76 namespace: other_namespace,
77 name: other_name,
78 columns: other_cols,
79 keys: other_keys,
80 } = other;
81
82 if self.columns.len() <= other_cols.len()
85 && self.columns.iter().zip(other_cols.iter()).all(|(s, o)| s.is_compatible(o, allow_type_to_change_by_col_num))
86 && &self.name == other_name
87 && &self.oid == other_oid
88 && &self.namespace == other_namespace
89 && self.keys.difference(other_keys).next().is_none()
91 {
92 Ok(())
93 } else {
94 warn!(
95 "Error validating table in publication. Expected: {:?} Actual: {:?}",
96 &self, other
97 );
98 bail!(
99 "source table {} with oid {} has been altered",
100 self.name,
101 self.oid
102 )
103 }
104 }
105}
106
107impl RustType<ProtoPostgresTableDesc> for PostgresTableDesc {
108 fn into_proto(&self) -> ProtoPostgresTableDesc {
109 ProtoPostgresTableDesc {
110 oid: self.oid,
111 namespace: self.namespace.clone(),
112 name: self.name.clone(),
113 columns: self.columns.iter().map(|c| c.into_proto()).collect(),
114 keys: self.keys.iter().map(PostgresKeyDesc::into_proto).collect(),
115 }
116 }
117
118 fn from_proto(proto: ProtoPostgresTableDesc) -> Result<Self, TryFromProtoError> {
119 Ok(PostgresTableDesc {
120 oid: proto.oid,
121 namespace: proto.namespace.clone(),
122 name: proto.name.clone(),
123 columns: proto
124 .columns
125 .into_iter()
126 .map(PostgresColumnDesc::from_proto)
127 .collect::<Result<_, _>>()?,
128 keys: proto
129 .keys
130 .into_iter()
131 .map(PostgresKeyDesc::from_proto)
132 .collect::<Result<_, _>>()?,
133 })
134 }
135}
136
137#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
139pub struct PostgresColumnDesc {
140 pub name: String,
142 pub col_num: u16,
145 pub type_oid: Oid,
147 pub type_mod: i32,
149 pub nullable: bool,
151}
152
153impl PostgresColumnDesc {
154 fn is_compatible(
161 &self,
162 other: &PostgresColumnDesc,
163 allow_type_to_change_by_col_num: &BTreeSet<u16>,
164 ) -> bool {
165 let allow_type_change = allow_type_to_change_by_col_num.contains(&self.col_num);
166
167 self.name == other.name
168 && self.col_num == other.col_num
169 && (self.type_oid == other.type_oid || allow_type_change)
170 && (self.type_mod == other.type_mod || allow_type_change)
171 && (self.nullable || self.nullable == other.nullable)
176 }
177}
178
179impl RustType<ProtoPostgresColumnDesc> for PostgresColumnDesc {
180 fn into_proto(&self) -> ProtoPostgresColumnDesc {
181 ProtoPostgresColumnDesc {
182 name: self.name.clone(),
183 col_num: Some(self.col_num.into()),
184 type_oid: self.type_oid,
185 type_mod: self.type_mod,
186 nullable: self.nullable,
187 }
188 }
189
190 fn from_proto(proto: ProtoPostgresColumnDesc) -> Result<Self, TryFromProtoError> {
191 Ok(PostgresColumnDesc {
192 name: proto.name,
193 col_num: {
194 let v: u32 = proto
195 .col_num
196 .into_rust_if_some("ProtoPostgresColumnDesc::col_num")?;
197 u16::try_from(v).expect("u16 must roundtrip")
198 },
199 type_oid: proto.type_oid,
200 type_mod: proto.type_mod,
201 nullable: proto.nullable,
202 })
203 }
204}
205
206#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, PartialOrd, Ord, Arbitrary)]
208pub struct PostgresKeyDesc {
209 pub oid: Oid,
211 pub name: String,
213 #[proptest(strategy = "proptest::collection::vec(any::<u16>(), 0..4)")]
216 pub cols: Vec<u16>,
217 pub is_primary: bool,
219 pub nulls_not_distinct: bool,
222}
223
224impl RustType<ProtoPostgresKeyDesc> for PostgresKeyDesc {
225 fn into_proto(&self) -> ProtoPostgresKeyDesc {
226 ProtoPostgresKeyDesc {
227 oid: self.oid,
228 name: self.name.clone(),
229 cols: self.cols.clone().into_iter().map(u32::from).collect(),
230 is_primary: self.is_primary,
231 nulls_not_distinct: self.nulls_not_distinct,
232 }
233 }
234
235 fn from_proto(proto: ProtoPostgresKeyDesc) -> Result<Self, TryFromProtoError> {
236 Ok(PostgresKeyDesc {
237 oid: proto.oid,
238 name: proto.name,
239 cols: proto
240 .cols
241 .into_iter()
242 .map(|c| c.try_into().expect("values roundtrip"))
243 .collect(),
244 is_primary: proto.is_primary,
245 nulls_not_distinct: proto.nulls_not_distinct,
246 })
247 }
248}