1use std::collections::{BTreeMap, BTreeSet};
13
14use anyhow::bail;
15use mz_proto::{IntoRustIfSome, RustType, TryFromProtoError};
16use proptest::prelude::any;
17use proptest_derive::Arbitrary;
18use serde::{Deserialize, Serialize};
19use tokio_postgres::types::Oid;
20use tracing::warn;
21
22include!(concat!(env!("OUT_DIR"), "/mz_postgres_util.desc.rs"));
23
24#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
28pub struct PostgresSchemaDesc {
29 pub oid: Oid,
31 pub name: String,
33 pub owner: Oid,
35}
36
37#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
39pub struct PostgresTableDesc {
40 pub oid: Oid,
42 pub namespace: String,
44 pub name: String,
46 #[proptest(strategy = "proptest::collection::vec(any::<PostgresColumnDesc>(), 1..4)")]
48 pub columns: Vec<PostgresColumnDesc>,
49 #[proptest(strategy = "proptest::collection::btree_set(any::<PostgresKeyDesc>(), 1..4)")]
52 pub keys: BTreeSet<PostgresKeyDesc>,
53}
54
55impl PostgresTableDesc {
56 pub fn determine_compatibility(
66 &self,
67 other: &PostgresTableDesc,
68 allow_type_to_change_by_col_num: &BTreeSet<u16>,
69 ) -> Result<(), anyhow::Error> {
70 if self == other {
71 return Ok(());
72 }
73
74 let PostgresTableDesc {
75 oid: other_oid,
76 namespace: other_namespace,
77 name: other_name,
78 columns: other_cols,
79 keys: other_keys,
80 } = other;
81
82 let other_cols_by_name = BTreeMap::from_iter(other_cols.iter().map(|c| (&c.name, c)));
83 let columns_compatible =
84 self.columns
85 .iter()
86 .all(|info| match other_cols_by_name.get(&info.name) {
87 Some(other_info) => {
88 let allow_type_change =
89 allow_type_to_change_by_col_num.contains(&info.col_num);
90 info.is_compatible(other_info, allow_type_change)
91 }
92 None => false,
93 });
94
95 if columns_compatible
96 && &self.name == other_name
97 && &self.oid == other_oid
98 && &self.namespace == other_namespace
99 && self.keys.difference(other_keys).next().is_none()
101 {
102 Ok(())
103 } else {
104 warn!(
105 "Error validating table in publication. Expected: {:?} Actual: {:?}",
106 &self, other
107 );
108 bail!(
109 "source table {} with oid {} has been altered",
110 self.name,
111 self.oid
112 )
113 }
114 }
115}
116
117impl RustType<ProtoPostgresTableDesc> for PostgresTableDesc {
118 fn into_proto(&self) -> ProtoPostgresTableDesc {
119 ProtoPostgresTableDesc {
120 oid: self.oid,
121 namespace: self.namespace.clone(),
122 name: self.name.clone(),
123 columns: self.columns.iter().map(|c| c.into_proto()).collect(),
124 keys: self.keys.iter().map(PostgresKeyDesc::into_proto).collect(),
125 }
126 }
127
128 fn from_proto(proto: ProtoPostgresTableDesc) -> Result<Self, TryFromProtoError> {
129 Ok(PostgresTableDesc {
130 oid: proto.oid,
131 namespace: proto.namespace.clone(),
132 name: proto.name.clone(),
133 columns: proto
134 .columns
135 .into_iter()
136 .map(PostgresColumnDesc::from_proto)
137 .collect::<Result<_, _>>()?,
138 keys: proto
139 .keys
140 .into_iter()
141 .map(PostgresKeyDesc::from_proto)
142 .collect::<Result<_, _>>()?,
143 })
144 }
145}
146
147#[derive(
149 Debug,
150 Clone,
151 Eq,
152 PartialEq,
153 Ord,
154 PartialOrd,
155 Serialize,
156 Deserialize,
157 Arbitrary
158)]
159pub struct PostgresColumnDesc {
160 pub name: String,
162 pub col_num: u16,
165 pub type_oid: Oid,
167 pub type_mod: i32,
169 pub nullable: bool,
171}
172
173impl PostgresColumnDesc {
174 fn is_compatible(&self, other: &PostgresColumnDesc, allow_type_change: bool) -> bool {
181 self.name == other.name
182 && self.col_num == other.col_num
183 && (self.type_oid == other.type_oid || allow_type_change)
184 && (self.type_mod == other.type_mod || allow_type_change)
185 && (self.nullable || self.nullable == other.nullable)
190 }
191}
192
193impl RustType<ProtoPostgresColumnDesc> for PostgresColumnDesc {
194 fn into_proto(&self) -> ProtoPostgresColumnDesc {
195 ProtoPostgresColumnDesc {
196 name: self.name.clone(),
197 col_num: Some(self.col_num.into()),
198 type_oid: self.type_oid,
199 type_mod: self.type_mod,
200 nullable: self.nullable,
201 }
202 }
203
204 fn from_proto(proto: ProtoPostgresColumnDesc) -> Result<Self, TryFromProtoError> {
205 Ok(PostgresColumnDesc {
206 name: proto.name,
207 col_num: {
208 let v: u32 = proto
209 .col_num
210 .into_rust_if_some("ProtoPostgresColumnDesc::col_num")?;
211 u16::try_from(v).expect("u16 must roundtrip")
212 },
213 type_oid: proto.type_oid,
214 type_mod: proto.type_mod,
215 nullable: proto.nullable,
216 })
217 }
218}
219
220#[derive(
222 Debug,
223 Clone,
224 Eq,
225 PartialEq,
226 Serialize,
227 Deserialize,
228 PartialOrd,
229 Ord,
230 Arbitrary
231)]
232pub struct PostgresKeyDesc {
233 pub oid: Oid,
235 pub name: String,
237 #[proptest(strategy = "proptest::collection::vec(any::<u16>(), 0..4)")]
240 pub cols: Vec<u16>,
241 pub is_primary: bool,
243 pub nulls_not_distinct: bool,
246}
247
248impl RustType<ProtoPostgresKeyDesc> for PostgresKeyDesc {
249 fn into_proto(&self) -> ProtoPostgresKeyDesc {
250 ProtoPostgresKeyDesc {
251 oid: self.oid,
252 name: self.name.clone(),
253 cols: self.cols.clone().into_iter().map(u32::from).collect(),
254 is_primary: self.is_primary,
255 nulls_not_distinct: self.nulls_not_distinct,
256 }
257 }
258
259 fn from_proto(proto: ProtoPostgresKeyDesc) -> Result<Self, TryFromProtoError> {
260 Ok(PostgresKeyDesc {
261 oid: proto.oid,
262 name: proto.name,
263 cols: proto
264 .cols
265 .into_iter()
266 .map(|c| c.try_into().expect("values roundtrip"))
267 .collect(),
268 is_primary: proto.is_primary,
269 nulls_not_distinct: proto.nulls_not_distinct,
270 })
271 }
272}