mz_mysql_util/
schemas.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::{BTreeMap, BTreeSet};
11use std::sync::LazyLock;
12
13use itertools::Itertools;
14use maplit::btreeset;
15use regex::Regex;
16
17use mysql_async::prelude::{FromRow, Queryable};
18use mysql_async::{FromRowError, Row};
19
20use mz_repr::adt::char::CharLength;
21use mz_repr::adt::numeric::{NUMERIC_DATUM_MAX_PRECISION, NumericMaxScale};
22use mz_repr::adt::timestamp::TimestampPrecision;
23use mz_repr::adt::varchar::VarCharMaxLength;
24use mz_repr::{ColumnType, ScalarType};
25
26use crate::desc::{
27    MySqlColumnDesc, MySqlColumnMeta, MySqlColumnMetaEnum, MySqlKeyDesc, MySqlTableDesc,
28};
29use crate::{MySqlError, UnsupportedDataType};
30
/// Built-in system schemas that should be ignored when querying for user-defined tables
/// since they contain dozens of built-in system tables that are likely not needed.
///
/// Used by [`schema_info`] to build the `table_schema NOT IN (...)` filter for
/// [`SchemaRequest::All`]; [`SchemaRequest::AllWithSystemSchemas`] skips that filter.
pub static SYSTEM_SCHEMAS: LazyLock<BTreeSet<&str>> = LazyLock::new(|| {
    btreeset! {
        "information_schema",
        "performance_schema",
        "mysql",
        "sys",
    }
});
41
/// Helper for querying information_schema.columns.
///
/// These names form the SELECT list of the per-table column query in [`schema_info`]
/// (each aliased to itself), and [`InfoSchema::from_row_opt`] asserts that the returned
/// row's column names equal this list, in this order.
// NOTE: The order of these names *must* match the order of fields of the [`InfoSchema`] struct.
const INFO_SCHEMA_COLS: &[&str] = &[
    "column_name",
    "data_type",
    "column_type",
    "is_nullable",
    "numeric_precision",
    "numeric_scale",
    "datetime_precision",
    "character_maximum_length",
];
54
// NOTE: The order of these fields *must* match the order of names of the [`INFO_SCHEMA_COLS`] list.
/// One row of `information_schema.columns` for a single column of a MySQL table.
#[derive(Debug, Clone)]
pub struct InfoSchema {
    /// The column's name.
    column_name: String,
    /// The bare type name (e.g. `int`, `varchar`); this is what the type parsers match on.
    data_type: String,
    /// The full type declaration (e.g. `int unsigned`, `enum('a','b')`); inspected for
    /// `unsigned` and used to extract enum values.
    column_type: String,
    /// Nullability flag; the column is treated as nullable only when this is exactly `"YES"`.
    is_nullable: String,
    /// Numeric precision (decimal digits for `decimal`/`numeric`, bit width for `bit`);
    /// `None` when the server reports NULL.
    numeric_precision: Option<i64>,
    /// Numeric scale, used as the max scale of `decimal`/`numeric` columns.
    numeric_scale: Option<i64>,
    /// Fractional-second precision of `datetime`/`timestamp` columns.
    datetime_precision: Option<i64>,
    /// Declared length of `char`/`varchar` columns.
    character_maximum_length: Option<i64>,
}
67
68impl FromRow for InfoSchema {
69    fn from_row_opt(row: Row) -> Result<Self, FromRowError> {
70        let actual = row.columns_ref().iter().map(|c| c.name_ref());
71        let expected = INFO_SCHEMA_COLS.iter().map(|c| c.as_bytes());
72        itertools::assert_equal(actual, expected);
73        let (a, b, c, d, e, f, g, h) = FromRow::from_row_opt(row)?;
74        Ok(Self {
75            column_name: a,
76            data_type: b,
77            column_type: c,
78            is_nullable: d,
79            numeric_precision: e,
80            numeric_scale: f,
81            datetime_precision: g,
82            character_maximum_length: h,
83        })
84    }
85}
86
87impl InfoSchema {
88    pub fn name(self) -> String {
89        self.column_name
90    }
91}
92
/// A representation of the raw schema info for a table from MySQL, as gathered by
/// [`schema_info`]. Convert it with [`MySqlTableSchema::to_desc`].
#[derive(Debug, Clone)]
pub struct MySqlTableSchema {
    /// The schema (database) containing the table.
    pub schema_name: String,
    /// The table's name.
    pub name: String,
    /// Raw column metadata, in `ordinal_position` order.
    pub columns: Vec<InfoSchema>,
    /// Primary-key and unique indexes that contain no functional key parts.
    pub keys: BTreeSet<MySqlKeyDesc>,
}
101
102impl MySqlTableSchema {
103    pub fn table_ref<'a>(&'a self) -> QualifiedTableRef<'a> {
104        QualifiedTableRef {
105            schema_name: &self.schema_name,
106            table_name: &self.name,
107        }
108    }
109
110    /// Convert the raw table schema to our MySqlTableDesc representation
111    /// using any provided text_columns and exclude_columns
112    pub fn to_desc(
113        self,
114        text_columns: Option<&BTreeSet<&str>>,
115        exclude_columns: Option<&BTreeSet<&str>>,
116    ) -> Result<MySqlTableDesc, MySqlError> {
117        // Verify there are no duplicates in text_columns and exclude_columns
118        match (&text_columns, &exclude_columns) {
119            (Some(text_cols), Some(ignore_cols)) => {
120                let intersection: Vec<_> = text_cols.intersection(ignore_cols).collect();
121                if !intersection.is_empty() {
122                    Err(MySqlError::DuplicatedColumnNames {
123                        qualified_table_name: format!("{:?}.{:?}", self.schema_name, self.name),
124                        columns: intersection.iter().map(|s| (*s).to_string()).collect(),
125                    })?;
126                }
127            }
128            _ => (),
129        };
130
131        let mut columns = Vec::with_capacity(self.columns.len());
132        let mut error_cols = vec![];
133        for info in self.columns {
134            // If this column is designated as a text column and of a supported text-column type
135            // treat it as a string and skip type parsing.
136            if let Some(text_columns) = &text_columns {
137                if text_columns.contains(&info.column_name.as_str()) {
138                    match parse_as_text_column(&info, &self.schema_name, &self.name) {
139                        Err(err) => error_cols.push(err),
140                        Ok((scalar_type, meta)) => columns.push(MySqlColumnDesc {
141                            name: info.column_name,
142                            column_type: Some(ColumnType {
143                                scalar_type,
144                                nullable: &info.is_nullable == "YES",
145                            }),
146                            meta,
147                        }),
148                    }
149                    continue;
150                }
151            }
152
153            // If this column is ignored, use None for the column type to signal that it should be.
154            if let Some(ignore_cols) = &exclude_columns {
155                if ignore_cols.contains(&info.column_name.as_str()) {
156                    columns.push(MySqlColumnDesc {
157                        name: info.column_name,
158                        column_type: None,
159                        meta: None,
160                    });
161                    continue;
162                }
163            }
164
165            // Collect the parsed data types or errors for later reporting.
166            match parse_data_type(&info, &self.schema_name, &self.name) {
167                Err(err) => error_cols.push(err),
168                Ok((scalar_type, meta)) => columns.push(MySqlColumnDesc {
169                    name: info.column_name,
170                    column_type: Some(ColumnType {
171                        scalar_type,
172                        nullable: &info.is_nullable == "YES",
173                    }),
174                    meta,
175                }),
176            }
177        }
178        if error_cols.len() > 0 {
179            Err(MySqlError::UnsupportedDataTypes {
180                columns: error_cols,
181            })?;
182        }
183
184        Ok(MySqlTableDesc {
185            schema_name: self.schema_name,
186            name: self.name,
187            columns,
188            keys: self.keys,
189        })
190    }
191}
192
/// Request for table schemas from MySQL.
///
/// Whichever variant is used, [`schema_info`] only returns tables whose `table_type` is
/// `'BASE TABLE'`.
pub enum SchemaRequest<'a> {
    /// Request schemas for all tables in the database, excluding tables in
    /// the built-in system schemas (see [`SYSTEM_SCHEMAS`]).
    All,
    /// Request schemas for all tables in the database, including tables from
    /// the built-in system schemas.
    AllWithSystemSchemas,
    /// Request schemas for all tables in the specified schemas/databases.
    /// An empty list yields no tables and issues no query.
    Schemas(Vec<&'a str>),
    /// Request schemas for all specified tables, specified as (schema_name, table_name).
    /// An empty list yields no tables and issues no query.
    Tables(Vec<(&'a str, &'a str)>),
}
206
/// A reference to a table in a schema/database.
///
/// The derived ordering compares `schema_name` first, then `table_name`.
#[derive(Debug, Hash, PartialEq, Eq, Clone, Ord, PartialOrd)]
pub struct QualifiedTableRef<'a> {
    /// The schema (database) containing the table.
    pub schema_name: &'a str,
    /// The table's name within `schema_name`.
    pub table_name: &'a str,
}
213
214/// Retrieve the tables and column descriptions for tables in the given schemas.
215pub async fn schema_info<'a, Q>(
216    conn: &mut Q,
217    schema_request: &SchemaRequest<'a>,
218) -> Result<Vec<MySqlTableSchema>, MySqlError>
219where
220    Q: Queryable,
221{
222    let table_rows: Vec<(String, String)> = match schema_request {
223        SchemaRequest::All => {
224            let table_q = format!(
225                "SELECT table_name, table_schema
226                FROM information_schema.tables
227                WHERE table_type = 'BASE TABLE'
228                AND table_schema NOT IN ({})",
229                SYSTEM_SCHEMAS.iter().map(|s| format!("'{}'", s)).join(", ")
230            );
231            conn.exec(table_q, ()).await?
232        }
233        SchemaRequest::AllWithSystemSchemas => {
234            let table_q = "SELECT table_name, table_schema
235                FROM information_schema.tables
236                WHERE table_type = 'BASE TABLE'";
237            conn.exec(table_q, ()).await?
238        }
239        SchemaRequest::Schemas(schemas) => {
240            // Get all tables of type 'Base Table' in specified schemas
241            if schemas.is_empty() {
242                return Ok(vec![]);
243            }
244            let table_q = format!(
245                "SELECT table_name, table_schema
246                FROM information_schema.tables
247                WHERE table_type = 'BASE TABLE'
248                AND table_schema IN ({})",
249                schemas.iter().map(|_| "?").join(", ")
250            );
251            conn.exec(table_q, schemas).await?
252        }
253        SchemaRequest::Tables(tables) => {
254            // Get all specified tables
255            if tables.is_empty() {
256                return Ok(vec![]);
257            }
258            let table_q = format!(
259                "SELECT table_name, table_schema
260                FROM information_schema.tables
261                WHERE table_type = 'BASE TABLE'
262                AND (table_schema, table_name) IN ({})",
263                tables.iter().map(|_| "(?, ?)").join(", ")
264            );
265            conn.exec(
266                table_q,
267                tables
268                    .iter()
269                    .flat_map(|(s, t)| [*s, *t])
270                    .collect::<Vec<_>>(),
271            )
272            .await?
273        }
274    };
275
276    let mut tables = vec![];
277    for (table_name, schema_name) in table_rows {
278        // NOTE: It's important that we order by ordinal_position ASC since we rely on this as
279        // the ordering in which columns are returned in a row.
280        let column_q = format!(
281            "SELECT {}
282             FROM information_schema.columns
283             WHERE table_name = ? AND table_schema = ?
284             ORDER BY ordinal_position ASC",
285            INFO_SCHEMA_COLS
286                .iter()
287                .map(|c| format!("{c} AS {c}"))
288                .join(", ")
289        );
290        let column_rows = conn
291            .exec::<InfoSchema, _, _>(column_q, (&table_name, &schema_name))
292            .await?;
293
294        // Query for primary key and unique constraints that do not contain expressions / functional key parts.
295        // When a constraint contains expressions, the column_name field is NULL.
296        let index_rows = conn
297            .exec::<(String, String), _, _>(
298                "SELECT
299                    index_name,
300                    column_name
301                FROM information_schema.statistics AS outt
302                WHERE
303                    table_schema NOT IN ('information_schema', 'performance_schema', 'mysql', 'sys')
304                    AND NOT EXISTS (
305                        SELECT 1
306                        FROM information_schema.statistics AS inn
307                        WHERE outt.index_name = inn.index_name AND inn.column_name IS NULL
308                    )
309                    AND non_unique = 0
310                    AND table_name = ?
311                    AND table_schema = ?
312                ORDER BY index_name, seq_in_index
313            ",
314                (&table_name, &schema_name),
315            )
316            .await?;
317
318        let mut indices = BTreeMap::new();
319        for (index_name, column) in index_rows {
320            indices
321                .entry(index_name)
322                .or_insert_with(Vec::new)
323                .push(column);
324        }
325        let mut keys = BTreeSet::new();
326        while let Some((index_name, columns)) = indices.pop_first() {
327            keys.insert(MySqlKeyDesc {
328                is_primary: &index_name == "PRIMARY",
329                name: index_name,
330                columns,
331            });
332        }
333
334        tables.push(MySqlTableSchema {
335            schema_name,
336            name: table_name,
337            columns: column_rows,
338            keys,
339        });
340    }
341
342    Ok(tables)
343}
344
345fn parse_data_type(
346    info: &InfoSchema,
347    schema_name: &str,
348    table_name: &str,
349) -> Result<(ScalarType, Option<MySqlColumnMeta>), UnsupportedDataType> {
350    let unsigned = info.column_type.contains("unsigned");
351
352    let scalar_type = match info.data_type.as_str() {
353        "tinyint" | "smallint" => {
354            if unsigned {
355                ScalarType::UInt16
356            } else {
357                ScalarType::Int16
358            }
359        }
360        "mediumint" | "int" => {
361            if unsigned {
362                ScalarType::UInt32
363            } else {
364                ScalarType::Int32
365            }
366        }
367        "bigint" => {
368            if unsigned {
369                ScalarType::UInt64
370            } else {
371                ScalarType::Int64
372            }
373        }
374        "float" => ScalarType::Float32,
375        "double" => ScalarType::Float64,
376        "date" => ScalarType::Date,
377        "datetime" | "timestamp" => ScalarType::Timestamp {
378            // both mysql and our scalar type use a max six-digit fractional-second precision
379            // this is bounds-checked in the TryFrom impl
380            precision: info
381                .datetime_precision
382                .map(TimestampPrecision::try_from)
383                .transpose()
384                .map_err(|_| UnsupportedDataType {
385                    column_type: info.column_type.clone(),
386                    qualified_table_name: format!("{:?}.{:?}", schema_name, table_name),
387                    column_name: info.column_name.clone(),
388                    intended_type: None,
389                })?,
390        },
391        "time" => ScalarType::Time,
392        "decimal" | "numeric" => {
393            // validate the precision is within the bounds of our numeric type
394            // here since we don't use this precision on the ScalarType itself
395            // whereas the scale will be bounds-checked in the TryFrom impl
396            if info.numeric_precision.unwrap_or_default() > NUMERIC_DATUM_MAX_PRECISION.into() {
397                Err(UnsupportedDataType {
398                    column_type: info.column_type.clone(),
399                    qualified_table_name: format!("{:?}.{:?}", schema_name, table_name),
400                    column_name: info.column_name.clone(),
401                    intended_type: None,
402                })?
403            }
404            ScalarType::Numeric {
405                max_scale: info
406                    .numeric_scale
407                    .map(NumericMaxScale::try_from)
408                    .transpose()
409                    .map_err(|_| UnsupportedDataType {
410                        column_type: info.column_type.clone(),
411                        qualified_table_name: format!("{:?}.{:?}", schema_name, table_name),
412                        column_name: info.column_name.clone(),
413                        intended_type: None,
414                    })?,
415            }
416        }
417        "char" => ScalarType::Char {
418            length: info
419                .character_maximum_length
420                .and_then(|f| Some(CharLength::try_from(f)))
421                .transpose()
422                .map_err(|_| UnsupportedDataType {
423                    column_type: info.column_type.clone(),
424                    qualified_table_name: format!("{:?}.{:?}", schema_name, table_name),
425                    column_name: info.column_name.clone(),
426                    intended_type: None,
427                })?,
428        },
429        "varchar" => ScalarType::VarChar {
430            max_length: info
431                .character_maximum_length
432                .and_then(|f| Some(VarCharMaxLength::try_from(f)))
433                .transpose()
434                .map_err(|_| UnsupportedDataType {
435                    column_type: info.column_type.clone(),
436                    qualified_table_name: format!("{:?}.{:?}", schema_name, table_name),
437                    column_name: info.column_name.clone(),
438                    intended_type: None,
439                })?,
440        },
441        "text" | "tinytext" | "mediumtext" | "longtext" => ScalarType::String,
442        "binary" | "varbinary" | "tinyblob" | "blob" | "mediumblob" | "longblob" => {
443            ScalarType::Bytes
444        }
445        "json" => ScalarType::Jsonb,
446        // TODO(mysql): Support the `bit` type natively in Materialize.
447        "bit" => {
448            let precision = match info.numeric_precision {
449                Some(x @ 0..=64) => u32::try_from(x).expect("known good value"),
450                prec => {
451                    mz_ore::soft_panic_or_log!(
452                        "found invalid bit precision, {prec:?}, falling back"
453                    );
454                    64u32
455                }
456            };
457            return Ok((ScalarType::UInt64, Some(MySqlColumnMeta::Bit(precision))));
458        }
459        typ => {
460            tracing::warn!(?typ, "found unsupported data type");
461            return Err(UnsupportedDataType {
462                column_type: info.column_type.clone(),
463                qualified_table_name: format!("{:?}.{:?}", schema_name, table_name),
464                column_name: info.column_name.clone(),
465                intended_type: None,
466            });
467        }
468    };
469
470    Ok((scalar_type, None))
471}
472
473/// Parse the specified column as a TEXT COLUMN. We only support the set of types that are
474/// represented as an encoded-string in both the mysql-common binary query response and binlog
475/// event representation, OR types that we've added explicit casting support for.
476fn parse_as_text_column(
477    info: &InfoSchema,
478    schema_name: &str,
479    table_name: &str,
480) -> Result<(ScalarType, Option<MySqlColumnMeta>), UnsupportedDataType> {
481    match info.data_type.as_str() {
482        "year" => Ok((ScalarType::String, Some(MySqlColumnMeta::Year))),
483        "json" => Ok((ScalarType::String, Some(MySqlColumnMeta::Json))),
484        "enum" => Ok((
485            ScalarType::String,
486            Some(MySqlColumnMeta::Enum(MySqlColumnMetaEnum {
487                values: enum_vals_from_column_type(info.column_type.as_str()).map_err(|_| {
488                    UnsupportedDataType {
489                        column_type: info.column_type.clone(),
490                        qualified_table_name: format!("{:?}.{:?}", schema_name, table_name),
491                        column_name: info.column_name.clone(),
492                        intended_type: Some("text".to_string()),
493                    }
494                })?,
495            })),
496        )),
497        "date" => Ok((ScalarType::String, Some(MySqlColumnMeta::Date))),
498        "datetime" | "timestamp" => Ok((
499            ScalarType::String,
500            Some(MySqlColumnMeta::Timestamp(
501                info.datetime_precision
502                    // Default precision is 0 in MySQL if not specified
503                    .unwrap_or_default()
504                    .try_into()
505                    .map_err(|_| UnsupportedDataType {
506                        column_type: info.column_type.clone(),
507                        qualified_table_name: format!("{:?}.{:?}", schema_name, table_name),
508                        column_name: info.column_name.clone(),
509                        intended_type: Some("text".to_string()),
510                    })?,
511            )),
512        )),
513        _ => Err(UnsupportedDataType {
514            column_type: info.column_type.clone(),
515            qualified_table_name: format!("{:?}.{:?}", schema_name, table_name),
516            column_name: info.column_name.clone(),
517            intended_type: Some("text".to_string()),
518        }),
519    }
520}
521
/// Matches one single-quoted value inside an enum column's type string.
///
/// Capture group 1 holds the value's text, still containing any doubled single quotes
/// (`''`) that escape a literal quote.
static ENUM_VAL_REGEX: LazyLock<Regex> =
    LazyLock::new(|| Regex::new(r"'((?:[^']|'')*)'").expect("valid regex"));
524
525/// Parse the enum values from a column_type value on an enum column, which is a string like
526/// "enum('apple','banana','cher,ry','ora''nge')"
527/// We need to handle the case where the enum value itself contains a comma or a
528/// single quote (escaped with another quote), so we use a regex to do so
529fn enum_vals_from_column_type(s: &str) -> Result<Vec<String>, anyhow::Error> {
530    let vals_str = s
531        .strip_prefix("enum(")
532        .and_then(|s| s.strip_suffix(')'))
533        .ok_or_else(|| anyhow::format_err!("Unable to parse enum column type string"))?;
534
535    Ok(ENUM_VAL_REGEX
536        .captures_iter(vals_str)
537        .map(|s| s[1].replace("''", "'"))
538        .collect())
539}
540
#[cfg(test)]
mod tests {
    use super::*;

    #[mz_ore::test]
    fn test_enum_value_parsing() {
        let vals =
            enum_vals_from_column_type("enum('apple','banana','cher,ry','ora''nge')").unwrap();
        assert_eq!(vals, vec!["apple", "banana", "cher,ry", "ora'nge"]);
    }

    #[mz_ore::test]
    fn test_enum_value_parsing_edge_cases() {
        // Empty values are preserved, in position.
        let vals = enum_vals_from_column_type("enum('','a')").unwrap();
        assert_eq!(vals, vec!["", "a"]);

        // An escaped quote at the very end of a value.
        let vals = enum_vals_from_column_type("enum('a''','b')").unwrap();
        assert_eq!(vals, vec!["a'", "b"]);
    }

    #[mz_ore::test]
    fn test_enum_value_parsing_rejects_malformed() {
        // Not an enum type at all.
        assert!(enum_vals_from_column_type("set('a','b')").is_err());
        // Missing closing parenthesis.
        assert!(enum_vals_from_column_type("enum('a'").is_err());
    }
}