mz_mysql_util/
desc.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeSet;
11
12use anyhow::bail;
13use mz_proto::{ProtoType, RustType, TryFromProtoError};
14use mz_repr::SqlColumnType;
15use proptest::prelude::any;
16use proptest_derive::Arbitrary;
17use serde::{Deserialize, Serialize};
18
19use self::proto_my_sql_column_desc::Meta;
20
21include!(concat!(env!("OUT_DIR"), "/mz_mysql_util.rs"));
22
23#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
24pub struct MySqlTableDesc {
25    /// In MySQL the schema and database of a table are synonymous.
26    pub schema_name: String,
27    /// The name of the table.
28    pub name: String,
29    /// Columns for the table
30    ///
31    /// The index of each column is based on its `ordinal_position`
32    /// reported by the information_schema.columns table, which defines
33    /// the order of column values when received in a row.
34    #[proptest(strategy = "proptest::collection::vec(any::<MySqlColumnDesc>(), 0..4)")]
35    pub columns: Vec<MySqlColumnDesc>,
36    /// Applicable keys for this table (i.e. primary key and unique
37    /// constraints).
38    #[proptest(strategy = "proptest::collection::btree_set(any::<MySqlKeyDesc>(), 0..4)")]
39    pub keys: BTreeSet<MySqlKeyDesc>,
40}
41
42impl RustType<ProtoMySqlTableDesc> for MySqlTableDesc {
43    fn into_proto(&self) -> ProtoMySqlTableDesc {
44        ProtoMySqlTableDesc {
45            schema_name: self.schema_name.clone(),
46            name: self.name.clone(),
47            columns: self.columns.iter().map(|c| c.into_proto()).collect(),
48            keys: self.keys.iter().map(|c| c.into_proto()).collect(),
49        }
50    }
51
52    fn from_proto(proto: ProtoMySqlTableDesc) -> Result<Self, TryFromProtoError> {
53        Ok(Self {
54            schema_name: proto.schema_name,
55            name: proto.name,
56            columns: proto
57                .columns
58                .into_iter()
59                .map(MySqlColumnDesc::from_proto)
60                .collect::<Result<_, _>>()?,
61            keys: proto
62                .keys
63                .into_iter()
64                .map(MySqlKeyDesc::from_proto)
65                .collect::<Result<_, _>>()?,
66        })
67    }
68}
69
70impl MySqlTableDesc {
71    /// Determines if two `MySqlTableDesc` are compatible with one another in
72    /// a way that Materialize can handle.
73    ///
74    /// Currently this means that the values are equal except for the following
75    /// exceptions:
76    /// - `self`'s columns are a prefix of `other`'s columns.
77    /// - `self`'s keys are all present in `other`
78    pub fn determine_compatibility(&self, other: &MySqlTableDesc) -> Result<(), anyhow::Error> {
79        if self == other {
80            return Ok(());
81        }
82
83        if self.schema_name != other.schema_name || self.name != other.name {
84            bail!(
85                "table name mismatch: self: {}.{}, other: {}.{}",
86                self.schema_name,
87                self.name,
88                other.schema_name,
89                other.name
90            );
91        }
92
93        // `columns` is ordered by the ordinal_position of each column in the table,
94        // so as long as `self.columns` is a compatible prefix of `other.columns`, we can
95        // ignore extra columns from `other.columns`.
96        let mut other_columns = other.columns.iter();
97        for self_column in &self.columns {
98            let other_column = other_columns.next().ok_or_else(|| {
99                anyhow::anyhow!(
100                    "column {} no longer present in table {}",
101                    self_column.name,
102                    self.name
103                )
104            })?;
105            if !self_column.is_compatible(other_column) {
106                bail!(
107                    "column {} in table {} has been altered",
108                    self_column.name,
109                    self.name
110                );
111            }
112        }
113
114        // Our keys are all still present in exactly the same shape.
115        // TODO: Implement a more relaxed key compatibility check:
116        // We should check that for all keys that we know about there exists an upstream key whose
117        // set of columns is a subset of the set of columns of the key we know about. For example
118        // if we had previously discovered that the table had two compound unique keys, key1 made
119        // up of columns (a, b) and key2 made up of columns (a, c) but now the table only has a
120        // single unique key of just the column a then it's compatible because {a} ⊆ {a, b} and
121        // {a} ⊆ {a, c}.
122        if self.keys.difference(&other.keys).next().is_some() {
123            bail!(
124                "keys in table {} have been altered: self: {:?}, other: {:?}",
125                self.name,
126                self.keys,
127                other.keys
128            );
129        }
130
131        Ok(())
132    }
133}
134
135#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
136pub struct MySqlColumnMetaEnum {
137    #[proptest(strategy = "proptest::collection::vec(any::<String>(), 0..3)")]
138    pub values: Vec<String>,
139}
140
141impl RustType<ProtoMySqlColumnMetaEnum> for MySqlColumnMetaEnum {
142    fn into_proto(&self) -> ProtoMySqlColumnMetaEnum {
143        ProtoMySqlColumnMetaEnum {
144            values: self.values.clone(),
145        }
146    }
147
148    fn from_proto(proto: ProtoMySqlColumnMetaEnum) -> Result<Self, TryFromProtoError> {
149        Ok(Self {
150            values: proto.values,
151        })
152    }
153}
154
155trait IsCompatible {
156    fn is_compatible(&self, other: &Self) -> bool;
157}
158
159#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
160pub enum MySqlColumnMeta {
161    /// The described column is an enum, with the given possible values.
162    Enum(MySqlColumnMetaEnum),
163    /// The described column is a json value.
164    Json,
165    /// The described column is a year value.
166    Year,
167    /// The described column is a date value.
168    Date,
169    /// The described column is a timestamp value with a set precision.
170    Timestamp(u32),
171    /// The described column is a `bit` column, with the given possibly precision.
172    Bit(u32),
173}
174
175impl IsCompatible for Option<MySqlColumnMeta> {
176    fn is_compatible(&self, other: &Option<MySqlColumnMeta>) -> bool {
177        match (self, other) {
178            (None, None) => true,
179            (Some(_), None) => false,
180            (None, Some(_)) => false,
181            (Some(MySqlColumnMeta::Enum(self_enum)), Some(MySqlColumnMeta::Enum(other_enum))) => {
182                // so as long as `self.values` is a compatible prefix of `other.values`, we can
183                // ignore extra values from `other.values`.
184                match other_enum.values.get(0..self_enum.values.len()) {
185                    Some(prefix) => self_enum.values == prefix,
186                    None => false,
187                }
188            }
189            (Some(MySqlColumnMeta::Json), Some(MySqlColumnMeta::Json)) => true,
190            (Some(MySqlColumnMeta::Year), Some(MySqlColumnMeta::Year)) => true,
191            (Some(MySqlColumnMeta::Date), Some(MySqlColumnMeta::Date)) => true,
192            // Timestamps are compatible as long as we don't lose precision
193            (
194                Some(MySqlColumnMeta::Timestamp(precision)),
195                Some(MySqlColumnMeta::Timestamp(other_precision)),
196            ) => precision <= other_precision,
197            // We always cast bit columns to u64's and the max precision of a bit column
198            // is 64 bits, so any bit column is always compatible with another.
199            (Some(MySqlColumnMeta::Bit(_)), Some(MySqlColumnMeta::Bit(_))) => true,
200            _ => false,
201        }
202    }
203}
204
205#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
206pub struct MySqlColumnDesc {
207    /// The name of the column.
208    pub name: String,
209    /// The intended data type of this column within Materialize
210    /// If this is None, the column is intended to be skipped within Materialize
211    pub column_type: Option<SqlColumnType>,
212    /// Optional metadata about the column that may be necessary for decoding
213    pub meta: Option<MySqlColumnMeta>,
214}
215
216impl RustType<ProtoMySqlColumnDesc> for MySqlColumnDesc {
217    fn into_proto(&self) -> ProtoMySqlColumnDesc {
218        ProtoMySqlColumnDesc {
219            name: self.name.clone(),
220            column_type: self.column_type.into_proto(),
221            meta: self.meta.as_ref().and_then(|meta| match meta {
222                MySqlColumnMeta::Enum(e) => Some(Meta::Enum(e.into_proto())),
223                MySqlColumnMeta::Json => Some(Meta::Json(ProtoMySqlColumnMetaJson {})),
224                MySqlColumnMeta::Year => Some(Meta::Year(ProtoMySqlColumnMetaYear {})),
225                MySqlColumnMeta::Date => Some(Meta::Date(ProtoMySqlColumnMetaDate {})),
226                MySqlColumnMeta::Timestamp(precision) => {
227                    Some(Meta::Timestamp(ProtoMySqlColumnMetaTimestamp {
228                        precision: *precision,
229                    }))
230                }
231                MySqlColumnMeta::Bit(precision) => Some(Meta::Bit(ProtoMySqlColumnMetaBit {
232                    precision: *precision,
233                })),
234            }),
235        }
236    }
237
238    fn from_proto(proto: ProtoMySqlColumnDesc) -> Result<Self, TryFromProtoError> {
239        Ok(Self {
240            name: proto.name,
241            column_type: proto.column_type.into_rust()?,
242            meta: proto
243                .meta
244                .and_then(|meta| match meta {
245                    Meta::Enum(e) => Some(
246                        MySqlColumnMetaEnum::from_proto(e)
247                            .and_then(|e| Ok(MySqlColumnMeta::Enum(e))),
248                    ),
249                    Meta::Json(_) => Some(Ok(MySqlColumnMeta::Json)),
250                    Meta::Year(_) => Some(Ok(MySqlColumnMeta::Year)),
251                    Meta::Date(_) => Some(Ok(MySqlColumnMeta::Date)),
252                    Meta::Timestamp(e) => Some(Ok(MySqlColumnMeta::Timestamp(e.precision))),
253                    Meta::Bit(e) => Some(Ok(MySqlColumnMeta::Bit(e.precision))),
254                })
255                .transpose()?,
256        })
257    }
258}
259
260impl IsCompatible for MySqlColumnDesc {
261    /// Determines if two `MySqlColumnDesc` are compatible with one another in
262    /// a way that Materialize can handle.
263    fn is_compatible(&self, other: &MySqlColumnDesc) -> bool {
264        self.name == other.name
265            && match (&self.column_type, &other.column_type) {
266                (None, None) => true,
267                (Some(self_type), Some(other_type)) => {
268                    self_type.scalar_type == other_type.scalar_type
269                    // Columns are compatible if:
270                    // - self is nullable; introducing a not null constraint doesn't
271                    //   change this column's behavior.
272                    // - self and other are both not nullable
273                    && (self_type.nullable || self_type.nullable == other_type.nullable)
274                }
275                (Some(_), None) => false,
276                (None, Some(_)) => false,
277            }
278            // Ensure any column metadata is compatible
279            && self.meta.is_compatible(&other.meta)
280    }
281}
282
283#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Ord, PartialOrd, Arbitrary)]
284pub struct MySqlKeyDesc {
285    /// The name of the index.
286    pub name: String,
287    /// Whether or not this key is the primary key.
288    pub is_primary: bool,
289    /// The columns that make up the key.
290    #[proptest(strategy = "proptest::collection::vec(any::<String>(), 0..4)")]
291    pub columns: Vec<String>,
292}
293
294impl RustType<ProtoMySqlKeyDesc> for MySqlKeyDesc {
295    fn into_proto(&self) -> ProtoMySqlKeyDesc {
296        ProtoMySqlKeyDesc {
297            name: self.name.clone(),
298            is_primary: self.is_primary.clone(),
299            columns: self.columns.clone(),
300        }
301    }
302
303    fn from_proto(proto: ProtoMySqlKeyDesc) -> Result<Self, TryFromProtoError> {
304        Ok(Self {
305            name: proto.name,
306            is_primary: proto.is_primary,
307            columns: proto.columns,
308        })
309    }
310}