1use std::collections::{BTreeMap, BTreeSet};
13
14use anyhow::bail;
15use mz_proto::{IntoRustIfSome, RustType, TryFromProtoError};
16use proptest::prelude::any;
17use proptest_derive::Arbitrary;
18use serde::{Deserialize, Serialize};
19use tokio_postgres::types::Oid;
20use tracing::warn;
21
22include!(concat!(env!("OUT_DIR"), "/mz_postgres_util.desc.rs"));
23
24#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
28pub struct PostgresSchemaDesc {
29 pub oid: Oid,
31 pub name: String,
33 pub owner: Oid,
35}
36
37#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
39pub struct PostgresTableDesc {
40 pub oid: Oid,
42 pub namespace: String,
44 pub name: String,
46 #[proptest(strategy = "proptest::collection::vec(any::<PostgresColumnDesc>(), 1..4)")]
48 pub columns: Vec<PostgresColumnDesc>,
49 #[proptest(strategy = "proptest::collection::btree_set(any::<PostgresKeyDesc>(), 1..4)")]
52 pub keys: BTreeSet<PostgresKeyDesc>,
53}
54
55impl PostgresTableDesc {
56 pub fn determine_compatibility(
66 &self,
67 other: &PostgresTableDesc,
68 allow_type_to_change_by_col_num: &BTreeSet<u16>,
69 ) -> Result<(), anyhow::Error> {
70 if self == other {
71 return Ok(());
72 }
73
74 let PostgresTableDesc {
75 oid: other_oid,
76 namespace: other_namespace,
77 name: other_name,
78 columns: other_cols,
79 keys: other_keys,
80 } = other;
81
82 let other_cols_by_name = BTreeMap::from_iter(other_cols.iter().map(|c| (&c.name, c)));
83 let columns_compatible =
84 self.columns
85 .iter()
86 .all(|info| match other_cols_by_name.get(&info.name) {
87 Some(other_info) => {
88 let allow_type_change =
89 allow_type_to_change_by_col_num.contains(&info.col_num);
90 info.is_compatible(other_info, allow_type_change)
91 }
92 None => false,
93 });
94
95 if columns_compatible
96 && &self.name == other_name
97 && &self.oid == other_oid
98 && &self.namespace == other_namespace
99 && self.keys.difference(other_keys).next().is_none()
101 {
102 Ok(())
103 } else {
104 warn!(
105 "Error validating table in publication. Expected: {:?} Actual: {:?}",
106 &self, other
107 );
108 bail!(
109 "source table {} with oid {} has been altered",
110 self.name,
111 self.oid
112 )
113 }
114 }
115}
116
117impl RustType<ProtoPostgresTableDesc> for PostgresTableDesc {
118 fn into_proto(&self) -> ProtoPostgresTableDesc {
119 ProtoPostgresTableDesc {
120 oid: self.oid,
121 namespace: self.namespace.clone(),
122 name: self.name.clone(),
123 columns: self.columns.iter().map(|c| c.into_proto()).collect(),
124 keys: self.keys.iter().map(PostgresKeyDesc::into_proto).collect(),
125 }
126 }
127
128 fn from_proto(proto: ProtoPostgresTableDesc) -> Result<Self, TryFromProtoError> {
129 Ok(PostgresTableDesc {
130 oid: proto.oid,
131 namespace: proto.namespace.clone(),
132 name: proto.name.clone(),
133 columns: proto
134 .columns
135 .into_iter()
136 .map(PostgresColumnDesc::from_proto)
137 .collect::<Result<_, _>>()?,
138 keys: proto
139 .keys
140 .into_iter()
141 .map(PostgresKeyDesc::from_proto)
142 .collect::<Result<_, _>>()?,
143 })
144 }
145}
146
147#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Arbitrary)]
149pub struct PostgresColumnDesc {
150 pub name: String,
152 pub col_num: u16,
155 pub type_oid: Oid,
157 pub type_mod: i32,
159 pub nullable: bool,
161}
162
163impl PostgresColumnDesc {
164 fn is_compatible(&self, other: &PostgresColumnDesc, allow_type_change: bool) -> bool {
171 self.name == other.name
172 && self.col_num == other.col_num
173 && (self.type_oid == other.type_oid || allow_type_change)
174 && (self.type_mod == other.type_mod || allow_type_change)
175 && (self.nullable || self.nullable == other.nullable)
180 }
181}
182
183impl RustType<ProtoPostgresColumnDesc> for PostgresColumnDesc {
184 fn into_proto(&self) -> ProtoPostgresColumnDesc {
185 ProtoPostgresColumnDesc {
186 name: self.name.clone(),
187 col_num: Some(self.col_num.into()),
188 type_oid: self.type_oid,
189 type_mod: self.type_mod,
190 nullable: self.nullable,
191 }
192 }
193
194 fn from_proto(proto: ProtoPostgresColumnDesc) -> Result<Self, TryFromProtoError> {
195 Ok(PostgresColumnDesc {
196 name: proto.name,
197 col_num: {
198 let v: u32 = proto
199 .col_num
200 .into_rust_if_some("ProtoPostgresColumnDesc::col_num")?;
201 u16::try_from(v).expect("u16 must roundtrip")
202 },
203 type_oid: proto.type_oid,
204 type_mod: proto.type_mod,
205 nullable: proto.nullable,
206 })
207 }
208}
209
210#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, PartialOrd, Ord, Arbitrary)]
212pub struct PostgresKeyDesc {
213 pub oid: Oid,
215 pub name: String,
217 #[proptest(strategy = "proptest::collection::vec(any::<u16>(), 0..4)")]
220 pub cols: Vec<u16>,
221 pub is_primary: bool,
223 pub nulls_not_distinct: bool,
226}
227
228impl RustType<ProtoPostgresKeyDesc> for PostgresKeyDesc {
229 fn into_proto(&self) -> ProtoPostgresKeyDesc {
230 ProtoPostgresKeyDesc {
231 oid: self.oid,
232 name: self.name.clone(),
233 cols: self.cols.clone().into_iter().map(u32::from).collect(),
234 is_primary: self.is_primary,
235 nulls_not_distinct: self.nulls_not_distinct,
236 }
237 }
238
239 fn from_proto(proto: ProtoPostgresKeyDesc) -> Result<Self, TryFromProtoError> {
240 Ok(PostgresKeyDesc {
241 oid: proto.oid,
242 name: proto.name,
243 cols: proto
244 .cols
245 .into_iter()
246 .map(|c| c.try_into().expect("values roundtrip"))
247 .collect(),
248 is_primary: proto.is_primary,
249 nulls_not_distinct: proto.nulls_not_distinct,
250 })
251 }
252}