1use std::collections::BTreeSet;
13
14use anyhow::bail;
15use itertools::Itertools;
16use mz_proto::{IntoRustIfSome, RustType, TryFromProtoError};
17use proptest::prelude::any;
18use proptest_derive::Arbitrary;
19use serde::{Deserialize, Serialize};
20use tokio_postgres::types::Oid;
21use tracing::warn;
22
23include!(concat!(env!("OUT_DIR"), "/mz_postgres_util.desc.rs"));
24
25#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
29pub struct PostgresSchemaDesc {
30 pub oid: Oid,
32 pub name: String,
34 pub owner: Oid,
36}
37
38#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
40pub struct PostgresTableDesc {
41 pub oid: Oid,
43 pub namespace: String,
45 pub name: String,
47 #[proptest(strategy = "proptest::collection::vec(any::<PostgresColumnDesc>(), 1..4)")]
49 pub columns: Vec<PostgresColumnDesc>,
50 #[proptest(strategy = "proptest::collection::btree_set(any::<PostgresKeyDesc>(), 1..4)")]
53 pub keys: BTreeSet<PostgresKeyDesc>,
54}
55
56impl PostgresTableDesc {
57 pub fn determine_compatibility(
67 &self,
68 other: &PostgresTableDesc,
69 allow_type_to_change_by_col_num: &BTreeSet<u16>,
70 ) -> Result<(), anyhow::Error> {
71 if self == other {
72 return Ok(());
73 }
74
75 let PostgresTableDesc {
76 oid: other_oid,
77 namespace: other_namespace,
78 name: other_name,
79 columns: other_cols,
80 keys: other_keys,
81 } = other;
82
83 if let Some(prefix) = other_cols.get(0..self.columns.len())
86 && self.columns.iter().zip_eq(prefix.iter()).all(|(s, o)| s.is_compatible(o, allow_type_to_change_by_col_num))
87 && &self.name == other_name
88 && &self.oid == other_oid
89 && &self.namespace == other_namespace
90 && self.keys.difference(other_keys).next().is_none()
92 {
93 Ok(())
94 } else {
95 warn!(
96 "Error validating table in publication. Expected: {:?} Actual: {:?}",
97 &self, other
98 );
99 bail!(
100 "source table {} with oid {} has been altered",
101 self.name,
102 self.oid
103 )
104 }
105 }
106}
107
108impl RustType<ProtoPostgresTableDesc> for PostgresTableDesc {
109 fn into_proto(&self) -> ProtoPostgresTableDesc {
110 ProtoPostgresTableDesc {
111 oid: self.oid,
112 namespace: self.namespace.clone(),
113 name: self.name.clone(),
114 columns: self.columns.iter().map(|c| c.into_proto()).collect(),
115 keys: self.keys.iter().map(PostgresKeyDesc::into_proto).collect(),
116 }
117 }
118
119 fn from_proto(proto: ProtoPostgresTableDesc) -> Result<Self, TryFromProtoError> {
120 Ok(PostgresTableDesc {
121 oid: proto.oid,
122 namespace: proto.namespace.clone(),
123 name: proto.name.clone(),
124 columns: proto
125 .columns
126 .into_iter()
127 .map(PostgresColumnDesc::from_proto)
128 .collect::<Result<_, _>>()?,
129 keys: proto
130 .keys
131 .into_iter()
132 .map(PostgresKeyDesc::from_proto)
133 .collect::<Result<_, _>>()?,
134 })
135 }
136}
137
138#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
140pub struct PostgresColumnDesc {
141 pub name: String,
143 pub col_num: u16,
146 pub type_oid: Oid,
148 pub type_mod: i32,
150 pub nullable: bool,
152}
153
154impl PostgresColumnDesc {
155 fn is_compatible(
162 &self,
163 other: &PostgresColumnDesc,
164 allow_type_to_change_by_col_num: &BTreeSet<u16>,
165 ) -> bool {
166 let allow_type_change = allow_type_to_change_by_col_num.contains(&self.col_num);
167
168 self.name == other.name
169 && self.col_num == other.col_num
170 && (self.type_oid == other.type_oid || allow_type_change)
171 && (self.type_mod == other.type_mod || allow_type_change)
172 && (self.nullable || self.nullable == other.nullable)
177 }
178}
179
180impl RustType<ProtoPostgresColumnDesc> for PostgresColumnDesc {
181 fn into_proto(&self) -> ProtoPostgresColumnDesc {
182 ProtoPostgresColumnDesc {
183 name: self.name.clone(),
184 col_num: Some(self.col_num.into()),
185 type_oid: self.type_oid,
186 type_mod: self.type_mod,
187 nullable: self.nullable,
188 }
189 }
190
191 fn from_proto(proto: ProtoPostgresColumnDesc) -> Result<Self, TryFromProtoError> {
192 Ok(PostgresColumnDesc {
193 name: proto.name,
194 col_num: {
195 let v: u32 = proto
196 .col_num
197 .into_rust_if_some("ProtoPostgresColumnDesc::col_num")?;
198 u16::try_from(v).expect("u16 must roundtrip")
199 },
200 type_oid: proto.type_oid,
201 type_mod: proto.type_mod,
202 nullable: proto.nullable,
203 })
204 }
205}
206
207#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, PartialOrd, Ord, Arbitrary)]
209pub struct PostgresKeyDesc {
210 pub oid: Oid,
212 pub name: String,
214 #[proptest(strategy = "proptest::collection::vec(any::<u16>(), 0..4)")]
217 pub cols: Vec<u16>,
218 pub is_primary: bool,
220 pub nulls_not_distinct: bool,
223}
224
225impl RustType<ProtoPostgresKeyDesc> for PostgresKeyDesc {
226 fn into_proto(&self) -> ProtoPostgresKeyDesc {
227 ProtoPostgresKeyDesc {
228 oid: self.oid,
229 name: self.name.clone(),
230 cols: self.cols.clone().into_iter().map(u32::from).collect(),
231 is_primary: self.is_primary,
232 nulls_not_distinct: self.nulls_not_distinct,
233 }
234 }
235
236 fn from_proto(proto: ProtoPostgresKeyDesc) -> Result<Self, TryFromProtoError> {
237 Ok(PostgresKeyDesc {
238 oid: proto.oid,
239 name: proto.name,
240 cols: proto
241 .cols
242 .into_iter()
243 .map(|c| c.try_into().expect("values roundtrip"))
244 .collect(),
245 is_primary: proto.is_primary,
246 nulls_not_distinct: proto.nulls_not_distinct,
247 })
248 }
249}