Skip to main content

mz_mysql_util/
desc.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
9
10use std::collections::BTreeSet;
11
12use anyhow::bail;
13use mz_proto::{ProtoType, RustType, TryFromProtoError};
14use mz_repr::SqlColumnType;
15#[cfg(any(test, feature = "proptest"))]
16use proptest::prelude::any;
17#[cfg(any(test, feature = "proptest"))]
18use proptest_derive::Arbitrary;
19use serde::{Deserialize, Serialize};
20
21use self::proto_my_sql_column_desc::Meta;
22
23include!(concat!(env!("OUT_DIR"), "/mz_mysql_util.rs"));
24
25#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
26#[cfg_attr(any(test, feature = "proptest"), derive(Arbitrary))]
27pub struct MySqlTableDesc {
28    /// In MySQL the schema and database of a table are synonymous.
29    pub schema_name: String,
30    /// The name of the table.
31    pub name: String,
32    /// Columns for the table
33    ///
34    /// The index of each column is based on its `ordinal_position`
35    /// reported by the information_schema.columns table, which defines
36    /// the order of column values when received in a row.
37    #[cfg_attr(
38        any(test, feature = "proptest"),
39        proptest(strategy = "proptest::collection::vec(any::<MySqlColumnDesc>(), 0..4)")
40    )]
41    pub columns: Vec<MySqlColumnDesc>,
42    /// Applicable keys for this table (i.e. primary key and unique
43    /// constraints).
44    #[cfg_attr(
45        any(test, feature = "proptest"),
46        proptest(strategy = "proptest::collection::btree_set(any::<MySqlKeyDesc>(), 0..4)")
47    )]
48    pub keys: BTreeSet<MySqlKeyDesc>,
49}
50
51impl RustType<ProtoMySqlTableDesc> for MySqlTableDesc {
52    fn into_proto(&self) -> ProtoMySqlTableDesc {
53        ProtoMySqlTableDesc {
54            schema_name: self.schema_name.clone(),
55            name: self.name.clone(),
56            columns: self.columns.iter().map(|c| c.into_proto()).collect(),
57            keys: self.keys.iter().map(|c| c.into_proto()).collect(),
58        }
59    }
60
61    fn from_proto(proto: ProtoMySqlTableDesc) -> Result<Self, TryFromProtoError> {
62        Ok(Self {
63            schema_name: proto.schema_name,
64            name: proto.name,
65            columns: proto
66                .columns
67                .into_iter()
68                .map(MySqlColumnDesc::from_proto)
69                .collect::<Result<_, _>>()?,
70            keys: proto
71                .keys
72                .into_iter()
73                .map(MySqlKeyDesc::from_proto)
74                .collect::<Result<_, _>>()?,
75        })
76    }
77}
78
79impl MySqlTableDesc {
80    /// Determines if two `MySqlTableDesc` are compatible with one another in
81    /// a way that Materialize can handle.
82    ///
83    /// Currently this means that the values are equal except for the following
84    /// exceptions:
85    /// - `self`'s columns are a prefix of `other`'s columns.
86    /// - `self`'s keys are all present in `other`
87    pub fn determine_compatibility(
88        &self,
89        other: &MySqlTableDesc,
90        binlog_full_metadata: bool,
91    ) -> Result<(), anyhow::Error> {
92        if self == other {
93            return Ok(());
94        }
95
96        if self.schema_name != other.schema_name || self.name != other.name {
97            bail!(
98                "table name mismatch: self: {}.{}, other: {}.{}",
99                self.schema_name,
100                self.name,
101                other.schema_name,
102                other.name
103            );
104        }
105
106        // In the case that we don't have full binlog row metadata, `columns` is ordered by the
107        // ordinal position of each column in the table, so as long as `self.columns` is a
108        // compatible prefix of `other.columns`, we can ignore extra columns from `other.columns`.
109        //
110        // If we do have full metadata, then we can match columns by name and just check that all
111        // columns in `self.columns` are present and compatible with columns in `other.columns`.
112        for (i, self_column) in self.columns.iter().enumerate() {
113            if self_column.column_type.is_none() {
114                // This is an excluded column and can be ignored.
115                continue;
116            }
117            let wire_idx = if !binlog_full_metadata {
118                // No column name metadata, so we match by index.
119                (i < other.columns.len()).then_some(i)
120            } else {
121                // This means the row from the binlog has column name included in the metadata,
122                // so we can match on that instead of position.
123                other
124                    .columns
125                    .iter()
126                    .position(|oc| oc.name.as_str() == self_column.name.as_str())
127            };
128
129            let wire_idx = match wire_idx {
130                Some(idx) => idx,
131                None => {
132                    // We could not find a column in the incoming row that matches this descriptor column.
133                    // This is an error as the column is not ignored (ignored columns have already been skipped).
134                    return Err(anyhow::anyhow!(
135                        "column {} no longer present in table {}",
136                        self_column.name,
137                        self.name
138                    ));
139                }
140            };
141            let other_column = other.columns.get(wire_idx).ok_or_else(|| {
142                anyhow::anyhow!(
143                    "column {} no longer present in table {}",
144                    self_column.name,
145                    self.name
146                )
147            })?;
148            if !self_column.is_compatible(other_column) {
149                bail!(
150                    "column {} in table {} has been altered",
151                    self_column.name,
152                    self.name
153                );
154            }
155        }
156        // Our keys are all still present in exactly the same shape.
157        // TODO: Implement a more relaxed key compatibility check:
158        // We should check that for all keys that we know about there exists an upstream key whose
159        // set of columns is a subset of the set of columns of the key we know about. For example
160        // if we had previously discovered that the table had two compound unique keys, key1 made
161        // up of columns (a, b) and key2 made up of columns (a, c) but now the table only has a
162        // single unique key of just the column a then it's compatible because {a} ⊆ {a, b} and
163        // {a} ⊆ {a, c}.
164        if self.keys.difference(&other.keys).next().is_some() {
165            bail!(
166                "keys in table {} have been altered: self: {:?}, other: {:?}",
167                self.name,
168                self.keys,
169                other.keys
170            );
171        }
172
173        Ok(())
174    }
175}
176
177#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
178#[cfg_attr(any(test, feature = "proptest"), derive(Arbitrary))]
179pub struct MySqlColumnMetaEnum {
180    #[cfg_attr(
181        any(test, feature = "proptest"),
182        proptest(strategy = "proptest::collection::vec(any::<String>(), 0..3)")
183    )]
184    pub values: Vec<String>,
185}
186
187impl RustType<ProtoMySqlColumnMetaEnum> for MySqlColumnMetaEnum {
188    fn into_proto(&self) -> ProtoMySqlColumnMetaEnum {
189        ProtoMySqlColumnMetaEnum {
190            values: self.values.clone(),
191        }
192    }
193
194    fn from_proto(proto: ProtoMySqlColumnMetaEnum) -> Result<Self, TryFromProtoError> {
195        Ok(Self {
196            values: proto.values,
197        })
198    }
199}
200
/// Compatibility check between two descriptions of the same thing.
trait IsCompatible {
    /// Returns `true` when `self` remains compatible with `other`.
    fn is_compatible(&self, other: &Self) -> bool;
}
204
205#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
206#[cfg_attr(any(test, feature = "proptest"), derive(Arbitrary))]
207pub enum MySqlColumnMeta {
208    /// The described column is an enum, with the given possible values.
209    Enum(MySqlColumnMetaEnum),
210    /// The described column is a json value.
211    Json,
212    /// The described column is a year value.
213    Year,
214    /// The described column is a date value.
215    Date,
216    /// The described column is a timestamp value with a set precision.
217    Timestamp(u32),
218    /// The described column is a `bit` column, with the given possibly precision.
219    Bit(u32),
220}
221
222impl IsCompatible for Option<MySqlColumnMeta> {
223    fn is_compatible(&self, other: &Option<MySqlColumnMeta>) -> bool {
224        match (self, other) {
225            (None, None) => true,
226            (Some(_), None) => false,
227            (None, Some(_)) => false,
228            (Some(MySqlColumnMeta::Enum(self_enum)), Some(MySqlColumnMeta::Enum(other_enum))) => {
229                // so as long as `self.values` is a compatible prefix of `other.values`, we can
230                // ignore extra values from `other.values`.
231                match other_enum.values.get(0..self_enum.values.len()) {
232                    Some(prefix) => self_enum.values == prefix,
233                    None => false,
234                }
235            }
236            (Some(MySqlColumnMeta::Json), Some(MySqlColumnMeta::Json)) => true,
237            (Some(MySqlColumnMeta::Year), Some(MySqlColumnMeta::Year)) => true,
238            (Some(MySqlColumnMeta::Date), Some(MySqlColumnMeta::Date)) => true,
239            // Timestamps are compatible as long as we don't lose precision
240            (
241                Some(MySqlColumnMeta::Timestamp(precision)),
242                Some(MySqlColumnMeta::Timestamp(other_precision)),
243            ) => precision <= other_precision,
244            // We always cast bit columns to u64's and the max precision of a bit column
245            // is 64 bits, so any bit column is always compatible with another.
246            (Some(MySqlColumnMeta::Bit(_)), Some(MySqlColumnMeta::Bit(_))) => true,
247            _ => false,
248        }
249    }
250}
251
252#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
253#[cfg_attr(any(test, feature = "proptest"), derive(Arbitrary))]
254pub struct MySqlColumnDesc {
255    /// The name of the column.
256    pub name: String,
257    /// The intended data type of this column within Materialize
258    /// If this is None, the column is intended to be skipped within Materialize
259    pub column_type: Option<SqlColumnType>,
260    /// Optional metadata about the column that may be necessary for decoding
261    pub meta: Option<MySqlColumnMeta>,
262}
263
264impl RustType<ProtoMySqlColumnDesc> for MySqlColumnDesc {
265    fn into_proto(&self) -> ProtoMySqlColumnDesc {
266        ProtoMySqlColumnDesc {
267            name: self.name.clone(),
268            column_type: self.column_type.into_proto(),
269            meta: self.meta.as_ref().and_then(|meta| match meta {
270                MySqlColumnMeta::Enum(e) => Some(Meta::Enum(e.into_proto())),
271                MySqlColumnMeta::Json => Some(Meta::Json(ProtoMySqlColumnMetaJson {})),
272                MySqlColumnMeta::Year => Some(Meta::Year(ProtoMySqlColumnMetaYear {})),
273                MySqlColumnMeta::Date => Some(Meta::Date(ProtoMySqlColumnMetaDate {})),
274                MySqlColumnMeta::Timestamp(precision) => {
275                    Some(Meta::Timestamp(ProtoMySqlColumnMetaTimestamp {
276                        precision: *precision,
277                    }))
278                }
279                MySqlColumnMeta::Bit(precision) => Some(Meta::Bit(ProtoMySqlColumnMetaBit {
280                    precision: *precision,
281                })),
282            }),
283        }
284    }
285
286    fn from_proto(proto: ProtoMySqlColumnDesc) -> Result<Self, TryFromProtoError> {
287        Ok(Self {
288            name: proto.name,
289            column_type: proto.column_type.into_rust()?,
290            meta: proto
291                .meta
292                .and_then(|meta| match meta {
293                    Meta::Enum(e) => Some(
294                        MySqlColumnMetaEnum::from_proto(e)
295                            .and_then(|e| Ok(MySqlColumnMeta::Enum(e))),
296                    ),
297                    Meta::Json(_) => Some(Ok(MySqlColumnMeta::Json)),
298                    Meta::Year(_) => Some(Ok(MySqlColumnMeta::Year)),
299                    Meta::Date(_) => Some(Ok(MySqlColumnMeta::Date)),
300                    Meta::Timestamp(e) => Some(Ok(MySqlColumnMeta::Timestamp(e.precision))),
301                    Meta::Bit(e) => Some(Ok(MySqlColumnMeta::Bit(e.precision))),
302                })
303                .transpose()?,
304        })
305    }
306}
307
308impl IsCompatible for MySqlColumnDesc {
309    /// Determines if two `MySqlColumnDesc` are compatible with one another in
310    /// a way that Materialize can handle.
311    fn is_compatible(&self, other: &MySqlColumnDesc) -> bool {
312        self.name == other.name
313            && match (&self.column_type, &other.column_type) {
314                (None, None) => true,
315                (Some(self_type), Some(other_type)) => {
316                    self_type.scalar_type == other_type.scalar_type
317                    // Columns are compatible if:
318                    // - self is nullable; introducing a not null constraint doesn't
319                    //   change this column's behavior.
320                    // - self and other are both not nullable
321                    && (self_type.nullable || self_type.nullable == other_type.nullable)
322                }
323                (Some(_), None) => false,
324                (None, Some(_)) => false,
325            }
326            // Ensure any column metadata is compatible
327            && self.meta.is_compatible(&other.meta)
328    }
329}
330
331#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Ord, PartialOrd)]
332#[cfg_attr(any(test, feature = "proptest"), derive(Arbitrary))]
333pub struct MySqlKeyDesc {
334    /// The name of the index.
335    pub name: String,
336    /// Whether or not this key is the primary key.
337    pub is_primary: bool,
338    /// The columns that make up the key.
339    #[cfg_attr(
340        any(test, feature = "proptest"),
341        proptest(strategy = "proptest::collection::vec(any::<String>(), 0..4)")
342    )]
343    pub columns: Vec<String>,
344}
345
346impl RustType<ProtoMySqlKeyDesc> for MySqlKeyDesc {
347    fn into_proto(&self) -> ProtoMySqlKeyDesc {
348        ProtoMySqlKeyDesc {
349            name: self.name.clone(),
350            is_primary: self.is_primary.clone(),
351            columns: self.columns.clone(),
352        }
353    }
354
355    fn from_proto(proto: ProtoMySqlKeyDesc) -> Result<Self, TryFromProtoError> {
356        Ok(Self {
357            name: proto.name,
358            is_primary: proto.is_primary,
359            columns: proto.columns,
360        })
361    }
362}