mz_mysql_util/
desc.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeSet;
11
12use anyhow::bail;
13use proptest::prelude::any;
14use proptest_derive::Arbitrary;
15use serde::{Deserialize, Serialize};
16
17use mz_proto::{ProtoType, RustType, TryFromProtoError};
18use mz_repr::ColumnType;
19
20use self::proto_my_sql_column_desc::Meta;
21
22include!(concat!(env!("OUT_DIR"), "/mz_mysql_util.rs"));
23
24#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
25pub struct MySqlTableDesc {
26    /// In MySQL the schema and database of a table are synonymous.
27    pub schema_name: String,
28    /// The name of the table.
29    pub name: String,
30    /// Columns for the table
31    ///
32    /// The index of each column is based on its `ordinal_position`
33    /// reported by the information_schema.columns table, which defines
34    /// the order of column values when received in a row.
35    #[proptest(strategy = "proptest::collection::vec(any::<MySqlColumnDesc>(), 0..4)")]
36    pub columns: Vec<MySqlColumnDesc>,
37    /// Applicable keys for this table (i.e. primary key and unique
38    /// constraints).
39    #[proptest(strategy = "proptest::collection::btree_set(any::<MySqlKeyDesc>(), 0..4)")]
40    pub keys: BTreeSet<MySqlKeyDesc>,
41}
42
43impl RustType<ProtoMySqlTableDesc> for MySqlTableDesc {
44    fn into_proto(&self) -> ProtoMySqlTableDesc {
45        ProtoMySqlTableDesc {
46            schema_name: self.schema_name.clone(),
47            name: self.name.clone(),
48            columns: self.columns.iter().map(|c| c.into_proto()).collect(),
49            keys: self.keys.iter().map(|c| c.into_proto()).collect(),
50        }
51    }
52
53    fn from_proto(proto: ProtoMySqlTableDesc) -> Result<Self, TryFromProtoError> {
54        Ok(Self {
55            schema_name: proto.schema_name,
56            name: proto.name,
57            columns: proto
58                .columns
59                .into_iter()
60                .map(MySqlColumnDesc::from_proto)
61                .collect::<Result<_, _>>()?,
62            keys: proto
63                .keys
64                .into_iter()
65                .map(MySqlKeyDesc::from_proto)
66                .collect::<Result<_, _>>()?,
67        })
68    }
69}
70
71impl MySqlTableDesc {
72    /// Determines if two `MySqlTableDesc` are compatible with one another in
73    /// a way that Materialize can handle.
74    ///
75    /// Currently this means that the values are equal except for the following
76    /// exceptions:
77    /// - `self`'s columns are a prefix of `other`'s columns.
78    /// - `self`'s keys are all present in `other`
79    pub fn determine_compatibility(&self, other: &MySqlTableDesc) -> Result<(), anyhow::Error> {
80        if self == other {
81            return Ok(());
82        }
83
84        if self.schema_name != other.schema_name || self.name != other.name {
85            bail!(
86                "table name mismatch: self: {}.{}, other: {}.{}",
87                self.schema_name,
88                self.name,
89                other.schema_name,
90                other.name
91            );
92        }
93
94        // `columns` is ordered by the ordinal_position of each column in the table,
95        // so as long as `self.columns` is a compatible prefix of `other.columns`, we can
96        // ignore extra columns from `other.columns`.
97        let mut other_columns = other.columns.iter();
98        for self_column in &self.columns {
99            let other_column = other_columns.next().ok_or_else(|| {
100                anyhow::anyhow!(
101                    "column {} no longer present in table {}",
102                    self_column.name,
103                    self.name
104                )
105            })?;
106            if !self_column.is_compatible(other_column) {
107                bail!(
108                    "column {} in table {} has been altered",
109                    self_column.name,
110                    self.name
111                );
112            }
113        }
114
115        // Our keys are all still present in exactly the same shape.
116        // TODO: Implement a more relaxed key compatibility check:
117        // We should check that for all keys that we know about there exists an upstream key whose
118        // set of columns is a subset of the set of columns of the key we know about. For example
119        // if we had previously discovered that the table had two compound unique keys, key1 made
120        // up of columns (a, b) and key2 made up of columns (a, c) but now the table only has a
121        // single unique key of just the column a then it's compatible because {a} ⊆ {a, b} and
122        // {a} ⊆ {a, c}.
123        if self.keys.difference(&other.keys).next().is_some() {
124            bail!(
125                "keys in table {} have been altered: self: {:?}, other: {:?}",
126                self.name,
127                self.keys,
128                other.keys
129            );
130        }
131
132        Ok(())
133    }
134}
135
136#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
137pub struct MySqlColumnMetaEnum {
138    #[proptest(strategy = "proptest::collection::vec(any::<String>(), 0..3)")]
139    pub values: Vec<String>,
140}
141
142impl RustType<ProtoMySqlColumnMetaEnum> for MySqlColumnMetaEnum {
143    fn into_proto(&self) -> ProtoMySqlColumnMetaEnum {
144        ProtoMySqlColumnMetaEnum {
145            values: self.values.clone(),
146        }
147    }
148
149    fn from_proto(proto: ProtoMySqlColumnMetaEnum) -> Result<Self, TryFromProtoError> {
150        Ok(Self {
151            values: proto.values,
152        })
153    }
154}
155
/// A directional compatibility check between two values of the same type.
trait IsCompatible {
    /// Returns `true` when `other` is compatible with `self`.
    ///
    /// The relation does not have to be symmetric: `a.is_compatible(&b)` may
    /// hold while `b.is_compatible(&a)` does not.
    fn is_compatible(&self, other: &Self) -> bool;
}
159
160#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
161pub enum MySqlColumnMeta {
162    /// The described column is an enum, with the given possible values.
163    Enum(MySqlColumnMetaEnum),
164    /// The described column is a json value.
165    Json,
166    /// The described column is a year value.
167    Year,
168    /// The described column is a date value.
169    Date,
170    /// The described column is a timestamp value with a set precision.
171    Timestamp(u32),
172    /// The described column is a `bit` column, with the given possibly precision.
173    Bit(u32),
174}
175
176impl IsCompatible for Option<MySqlColumnMeta> {
177    fn is_compatible(&self, other: &Option<MySqlColumnMeta>) -> bool {
178        match (self, other) {
179            (None, None) => true,
180            (Some(_), None) => false,
181            (None, Some(_)) => false,
182            (Some(MySqlColumnMeta::Enum(self_enum)), Some(MySqlColumnMeta::Enum(other_enum))) => {
183                // so as long as `self.values` is a compatible prefix of `other.values`, we can
184                // ignore extra values from `other.values`.
185                self_enum.values.len() <= other_enum.values.len()
186                    && self_enum
187                        .values
188                        .iter()
189                        .zip(other_enum.values.iter())
190                        .all(|(self_val, other_val)| self_val == other_val)
191            }
192            (Some(MySqlColumnMeta::Json), Some(MySqlColumnMeta::Json)) => true,
193            (Some(MySqlColumnMeta::Year), Some(MySqlColumnMeta::Year)) => true,
194            (Some(MySqlColumnMeta::Date), Some(MySqlColumnMeta::Date)) => true,
195            // Timestamps are compatible as long as we don't lose precision
196            (
197                Some(MySqlColumnMeta::Timestamp(precision)),
198                Some(MySqlColumnMeta::Timestamp(other_precision)),
199            ) => precision <= other_precision,
200            // We always cast bit columns to u64's and the max precision of a bit column
201            // is 64 bits, so any bit column is always compatible with another.
202            (Some(MySqlColumnMeta::Bit(_)), Some(MySqlColumnMeta::Bit(_))) => true,
203            _ => false,
204        }
205    }
206}
207
208#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
209pub struct MySqlColumnDesc {
210    /// The name of the column.
211    pub name: String,
212    /// The intended data type of this column within Materialize
213    /// If this is None, the column is intended to be skipped within Materialize
214    pub column_type: Option<ColumnType>,
215    /// Optional metadata about the column that may be necessary for decoding
216    pub meta: Option<MySqlColumnMeta>,
217}
218
219impl RustType<ProtoMySqlColumnDesc> for MySqlColumnDesc {
220    fn into_proto(&self) -> ProtoMySqlColumnDesc {
221        ProtoMySqlColumnDesc {
222            name: self.name.clone(),
223            column_type: self.column_type.into_proto(),
224            meta: self.meta.as_ref().and_then(|meta| match meta {
225                MySqlColumnMeta::Enum(e) => Some(Meta::Enum(e.into_proto())),
226                MySqlColumnMeta::Json => Some(Meta::Json(ProtoMySqlColumnMetaJson {})),
227                MySqlColumnMeta::Year => Some(Meta::Year(ProtoMySqlColumnMetaYear {})),
228                MySqlColumnMeta::Date => Some(Meta::Date(ProtoMySqlColumnMetaDate {})),
229                MySqlColumnMeta::Timestamp(precision) => {
230                    Some(Meta::Timestamp(ProtoMySqlColumnMetaTimestamp {
231                        precision: *precision,
232                    }))
233                }
234                MySqlColumnMeta::Bit(precision) => Some(Meta::Bit(ProtoMySqlColumnMetaBit {
235                    precision: *precision,
236                })),
237            }),
238        }
239    }
240
241    fn from_proto(proto: ProtoMySqlColumnDesc) -> Result<Self, TryFromProtoError> {
242        Ok(Self {
243            name: proto.name,
244            column_type: proto.column_type.into_rust()?,
245            meta: proto
246                .meta
247                .and_then(|meta| match meta {
248                    Meta::Enum(e) => Some(
249                        MySqlColumnMetaEnum::from_proto(e)
250                            .and_then(|e| Ok(MySqlColumnMeta::Enum(e))),
251                    ),
252                    Meta::Json(_) => Some(Ok(MySqlColumnMeta::Json)),
253                    Meta::Year(_) => Some(Ok(MySqlColumnMeta::Year)),
254                    Meta::Date(_) => Some(Ok(MySqlColumnMeta::Date)),
255                    Meta::Timestamp(e) => Some(Ok(MySqlColumnMeta::Timestamp(e.precision))),
256                    Meta::Bit(e) => Some(Ok(MySqlColumnMeta::Bit(e.precision))),
257                })
258                .transpose()?,
259        })
260    }
261}
262
263impl IsCompatible for MySqlColumnDesc {
264    /// Determines if two `MySqlColumnDesc` are compatible with one another in
265    /// a way that Materialize can handle.
266    fn is_compatible(&self, other: &MySqlColumnDesc) -> bool {
267        self.name == other.name
268            && match (&self.column_type, &other.column_type) {
269                (None, None) => true,
270                (Some(self_type), Some(other_type)) => {
271                    self_type.scalar_type == other_type.scalar_type
272                    // Columns are compatible if:
273                    // - self is nullable; introducing a not null constraint doesn't
274                    //   change this column's behavior.
275                    // - self and other are both not nullable
276                    && (self_type.nullable || self_type.nullable == other_type.nullable)
277                }
278                (Some(_), None) => false,
279                (None, Some(_)) => false,
280            }
281            // Ensure any column metadata is compatible
282            && self.meta.is_compatible(&other.meta)
283    }
284}
285
286#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Ord, PartialOrd, Arbitrary)]
287pub struct MySqlKeyDesc {
288    /// The name of the index.
289    pub name: String,
290    /// Whether or not this key is the primary key.
291    pub is_primary: bool,
292    /// The columns that make up the key.
293    #[proptest(strategy = "proptest::collection::vec(any::<String>(), 0..4)")]
294    pub columns: Vec<String>,
295}
296
297impl RustType<ProtoMySqlKeyDesc> for MySqlKeyDesc {
298    fn into_proto(&self) -> ProtoMySqlKeyDesc {
299        ProtoMySqlKeyDesc {
300            name: self.name.clone(),
301            is_primary: self.is_primary.clone(),
302            columns: self.columns.clone(),
303        }
304    }
305
306    fn from_proto(proto: ProtoMySqlKeyDesc) -> Result<Self, TryFromProtoError> {
307        Ok(Self {
308            name: proto.name,
309            is_primary: proto.is_primary,
310            columns: proto.columns,
311        })
312    }
313}