1use std::collections::BTreeSet;
11
12use anyhow::bail;
13use mz_proto::{ProtoType, RustType, TryFromProtoError};
14use mz_repr::SqlColumnType;
15#[cfg(any(test, feature = "proptest"))]
16use proptest::prelude::any;
17#[cfg(any(test, feature = "proptest"))]
18use proptest_derive::Arbitrary;
19use serde::{Deserialize, Serialize};
20
21use self::proto_my_sql_column_desc::Meta;
22
23include!(concat!(env!("OUT_DIR"), "/mz_mysql_util.rs"));
24
25#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
26#[cfg_attr(any(test, feature = "proptest"), derive(Arbitrary))]
27pub struct MySqlTableDesc {
28 pub schema_name: String,
30 pub name: String,
32 #[cfg_attr(
38 any(test, feature = "proptest"),
39 proptest(strategy = "proptest::collection::vec(any::<MySqlColumnDesc>(), 0..4)")
40 )]
41 pub columns: Vec<MySqlColumnDesc>,
42 #[cfg_attr(
45 any(test, feature = "proptest"),
46 proptest(strategy = "proptest::collection::btree_set(any::<MySqlKeyDesc>(), 0..4)")
47 )]
48 pub keys: BTreeSet<MySqlKeyDesc>,
49}
50
51impl RustType<ProtoMySqlTableDesc> for MySqlTableDesc {
52 fn into_proto(&self) -> ProtoMySqlTableDesc {
53 ProtoMySqlTableDesc {
54 schema_name: self.schema_name.clone(),
55 name: self.name.clone(),
56 columns: self.columns.iter().map(|c| c.into_proto()).collect(),
57 keys: self.keys.iter().map(|c| c.into_proto()).collect(),
58 }
59 }
60
61 fn from_proto(proto: ProtoMySqlTableDesc) -> Result<Self, TryFromProtoError> {
62 Ok(Self {
63 schema_name: proto.schema_name,
64 name: proto.name,
65 columns: proto
66 .columns
67 .into_iter()
68 .map(MySqlColumnDesc::from_proto)
69 .collect::<Result<_, _>>()?,
70 keys: proto
71 .keys
72 .into_iter()
73 .map(MySqlKeyDesc::from_proto)
74 .collect::<Result<_, _>>()?,
75 })
76 }
77}
78
79impl MySqlTableDesc {
80 pub fn determine_compatibility(
88 &self,
89 other: &MySqlTableDesc,
90 binlog_full_metadata: bool,
91 ) -> Result<(), anyhow::Error> {
92 if self == other {
93 return Ok(());
94 }
95
96 if self.schema_name != other.schema_name || self.name != other.name {
97 bail!(
98 "table name mismatch: self: {}.{}, other: {}.{}",
99 self.schema_name,
100 self.name,
101 other.schema_name,
102 other.name
103 );
104 }
105
106 for (i, self_column) in self.columns.iter().enumerate() {
113 if self_column.column_type.is_none() {
114 continue;
116 }
117 let wire_idx = if !binlog_full_metadata {
118 (i < other.columns.len()).then_some(i)
120 } else {
121 other
124 .columns
125 .iter()
126 .position(|oc| oc.name.as_str() == self_column.name.as_str())
127 };
128
129 let wire_idx = match wire_idx {
130 Some(idx) => idx,
131 None => {
132 return Err(anyhow::anyhow!(
135 "column {} no longer present in table {}",
136 self_column.name,
137 self.name
138 ));
139 }
140 };
141 let other_column = other.columns.get(wire_idx).ok_or_else(|| {
142 anyhow::anyhow!(
143 "column {} no longer present in table {}",
144 self_column.name,
145 self.name
146 )
147 })?;
148 if !self_column.is_compatible(other_column) {
149 bail!(
150 "column {} in table {} has been altered",
151 self_column.name,
152 self.name
153 );
154 }
155 }
156 if self.keys.difference(&other.keys).next().is_some() {
165 bail!(
166 "keys in table {} have been altered: self: {:?}, other: {:?}",
167 self.name,
168 self.keys,
169 other.keys
170 );
171 }
172
173 Ok(())
174 }
175}
176
177#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
178#[cfg_attr(any(test, feature = "proptest"), derive(Arbitrary))]
179pub struct MySqlColumnMetaEnum {
180 #[cfg_attr(
181 any(test, feature = "proptest"),
182 proptest(strategy = "proptest::collection::vec(any::<String>(), 0..3)")
183 )]
184 pub values: Vec<String>,
185}
186
187impl RustType<ProtoMySqlColumnMetaEnum> for MySqlColumnMetaEnum {
188 fn into_proto(&self) -> ProtoMySqlColumnMetaEnum {
189 ProtoMySqlColumnMetaEnum {
190 values: self.values.clone(),
191 }
192 }
193
194 fn from_proto(proto: ProtoMySqlColumnMetaEnum) -> Result<Self, TryFromProtoError> {
195 Ok(Self {
196 values: proto.values,
197 })
198 }
199}
200
201trait IsCompatible {
202 fn is_compatible(&self, other: &Self) -> bool;
203}
204
205#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
206#[cfg_attr(any(test, feature = "proptest"), derive(Arbitrary))]
207pub enum MySqlColumnMeta {
208 Enum(MySqlColumnMetaEnum),
210 Json,
212 Year,
214 Date,
216 Timestamp(u32),
218 Bit(u32),
220}
221
222impl IsCompatible for Option<MySqlColumnMeta> {
223 fn is_compatible(&self, other: &Option<MySqlColumnMeta>) -> bool {
224 match (self, other) {
225 (None, None) => true,
226 (Some(_), None) => false,
227 (None, Some(_)) => false,
228 (Some(MySqlColumnMeta::Enum(self_enum)), Some(MySqlColumnMeta::Enum(other_enum))) => {
229 match other_enum.values.get(0..self_enum.values.len()) {
232 Some(prefix) => self_enum.values == prefix,
233 None => false,
234 }
235 }
236 (Some(MySqlColumnMeta::Json), Some(MySqlColumnMeta::Json)) => true,
237 (Some(MySqlColumnMeta::Year), Some(MySqlColumnMeta::Year)) => true,
238 (Some(MySqlColumnMeta::Date), Some(MySqlColumnMeta::Date)) => true,
239 (
241 Some(MySqlColumnMeta::Timestamp(precision)),
242 Some(MySqlColumnMeta::Timestamp(other_precision)),
243 ) => precision <= other_precision,
244 (Some(MySqlColumnMeta::Bit(_)), Some(MySqlColumnMeta::Bit(_))) => true,
247 _ => false,
248 }
249 }
250}
251
252#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
253#[cfg_attr(any(test, feature = "proptest"), derive(Arbitrary))]
254pub struct MySqlColumnDesc {
255 pub name: String,
257 pub column_type: Option<SqlColumnType>,
260 pub meta: Option<MySqlColumnMeta>,
262}
263
264impl RustType<ProtoMySqlColumnDesc> for MySqlColumnDesc {
265 fn into_proto(&self) -> ProtoMySqlColumnDesc {
266 ProtoMySqlColumnDesc {
267 name: self.name.clone(),
268 column_type: self.column_type.into_proto(),
269 meta: self.meta.as_ref().and_then(|meta| match meta {
270 MySqlColumnMeta::Enum(e) => Some(Meta::Enum(e.into_proto())),
271 MySqlColumnMeta::Json => Some(Meta::Json(ProtoMySqlColumnMetaJson {})),
272 MySqlColumnMeta::Year => Some(Meta::Year(ProtoMySqlColumnMetaYear {})),
273 MySqlColumnMeta::Date => Some(Meta::Date(ProtoMySqlColumnMetaDate {})),
274 MySqlColumnMeta::Timestamp(precision) => {
275 Some(Meta::Timestamp(ProtoMySqlColumnMetaTimestamp {
276 precision: *precision,
277 }))
278 }
279 MySqlColumnMeta::Bit(precision) => Some(Meta::Bit(ProtoMySqlColumnMetaBit {
280 precision: *precision,
281 })),
282 }),
283 }
284 }
285
286 fn from_proto(proto: ProtoMySqlColumnDesc) -> Result<Self, TryFromProtoError> {
287 Ok(Self {
288 name: proto.name,
289 column_type: proto.column_type.into_rust()?,
290 meta: proto
291 .meta
292 .and_then(|meta| match meta {
293 Meta::Enum(e) => Some(
294 MySqlColumnMetaEnum::from_proto(e)
295 .and_then(|e| Ok(MySqlColumnMeta::Enum(e))),
296 ),
297 Meta::Json(_) => Some(Ok(MySqlColumnMeta::Json)),
298 Meta::Year(_) => Some(Ok(MySqlColumnMeta::Year)),
299 Meta::Date(_) => Some(Ok(MySqlColumnMeta::Date)),
300 Meta::Timestamp(e) => Some(Ok(MySqlColumnMeta::Timestamp(e.precision))),
301 Meta::Bit(e) => Some(Ok(MySqlColumnMeta::Bit(e.precision))),
302 })
303 .transpose()?,
304 })
305 }
306}
307
308impl IsCompatible for MySqlColumnDesc {
309 fn is_compatible(&self, other: &MySqlColumnDesc) -> bool {
312 self.name == other.name
313 && match (&self.column_type, &other.column_type) {
314 (None, None) => true,
315 (Some(self_type), Some(other_type)) => {
316 self_type.scalar_type == other_type.scalar_type
317 && (self_type.nullable || self_type.nullable == other_type.nullable)
322 }
323 (Some(_), None) => false,
324 (None, Some(_)) => false,
325 }
326 && self.meta.is_compatible(&other.meta)
328 }
329}
330
331#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Ord, PartialOrd)]
332#[cfg_attr(any(test, feature = "proptest"), derive(Arbitrary))]
333pub struct MySqlKeyDesc {
334 pub name: String,
336 pub is_primary: bool,
338 #[cfg_attr(
340 any(test, feature = "proptest"),
341 proptest(strategy = "proptest::collection::vec(any::<String>(), 0..4)")
342 )]
343 pub columns: Vec<String>,
344}
345
346impl RustType<ProtoMySqlKeyDesc> for MySqlKeyDesc {
347 fn into_proto(&self) -> ProtoMySqlKeyDesc {
348 ProtoMySqlKeyDesc {
349 name: self.name.clone(),
350 is_primary: self.is_primary.clone(),
351 columns: self.columns.clone(),
352 }
353 }
354
355 fn from_proto(proto: ProtoMySqlKeyDesc) -> Result<Self, TryFromProtoError> {
356 Ok(Self {
357 name: proto.name,
358 is_primary: proto.is_primary,
359 columns: proto.columns,
360 })
361 }
362}