Skip to main content

mz_sql/pure/
mysql.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! MySQL utilities for SQL purification.
11
12use std::collections::{BTreeMap, BTreeSet};
13
14use mz_mysql_util::{
15    MySqlError, MySqlTableDesc, QualifiedTableRef, SYSTEM_SCHEMAS, validate_source_privileges,
16};
17use mz_proto::RustType;
18use mz_sql_parser::ast::display::AstDisplay;
19use mz_sql_parser::ast::{
20    ColumnDef, CreateSubsourceOption, CreateSubsourceOptionName, CreateSubsourceStatement,
21    ExternalReferences, Ident, MySqlConfigOptionName, TableConstraint, WithOptionValue,
22};
23use mz_sql_parser::ast::{UnresolvedItemName, Value};
24use mz_storage_types::sources::{SourceExportStatementDetails, SourceReferenceResolver};
25use prost::Message;
26
27use crate::names::Aug;
28use crate::plan::{PlanError, StatementContext};
29use crate::pure::{MySqlSourcePurificationError, ResolvedItemName};
30
31use super::references::RetrievedSourceReferences;
32use super::{
33    PurifiedExportDetails, PurifiedSourceExport, RequestedSourceExport, SourceReferencePolicy,
34};
35
/// Placeholder database name used for MySQL sources so that they fit our
/// 3-layer catalog model.
///
/// MySQL has no separate concepts of databases and schemas (it treats both as
/// the same thing), so this fixed name stands in for the database layer.
pub(crate) static MYSQL_DATABASE_FAKE_NAME: &str = "mysql";
40
41/// Convert an unresolved item name to a qualified table reference.
42pub(super) fn external_reference_to_table(
43    name: &UnresolvedItemName,
44) -> Result<QualifiedTableRef<'_>, MySqlSourcePurificationError> {
45    if name.0.len() != 2 {
46        Err(MySqlSourcePurificationError::InvalidTableReference(
47            name.to_string(),
48        ))?
49    }
50    Ok(QualifiedTableRef {
51        schema_name: name.0[0].as_str(),
52        table_name: name.0[1].as_str(),
53    })
54}
55
56pub fn generate_create_subsource_statements(
57    scx: &StatementContext,
58    source_name: ResolvedItemName,
59    requested_subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
60) -> Result<Vec<CreateSubsourceStatement<Aug>>, PlanError> {
61    let mut subsources = Vec::with_capacity(requested_subsources.len());
62
63    for (subsource_name, purified_export) in requested_subsources {
64        let MySqlExportStatementValues {
65            columns,
66            constraints,
67            text_columns,
68            exclude_columns,
69            details,
70            external_reference,
71        } = generate_source_export_statement_values(scx, purified_export)?;
72
73        let mut with_options = vec![
74            CreateSubsourceOption {
75                name: CreateSubsourceOptionName::ExternalReference,
76                value: Some(WithOptionValue::UnresolvedItemName(external_reference)),
77            },
78            CreateSubsourceOption {
79                name: CreateSubsourceOptionName::Details,
80                value: Some(WithOptionValue::Value(Value::String(hex::encode(
81                    details.into_proto().encode_to_vec(),
82                )))),
83            },
84        ];
85
86        if let Some(text_columns) = text_columns {
87            with_options.push(CreateSubsourceOption {
88                name: CreateSubsourceOptionName::TextColumns,
89                value: Some(WithOptionValue::Sequence(text_columns)),
90            });
91        }
92
93        if let Some(exclude_columns) = exclude_columns {
94            with_options.push(CreateSubsourceOption {
95                name: CreateSubsourceOptionName::ExcludeColumns,
96                value: Some(WithOptionValue::Sequence(exclude_columns)),
97            });
98        }
99
100        // Create the subsource statement
101        let subsource = CreateSubsourceStatement {
102            name: subsource_name,
103            columns,
104            of_source: Some(source_name.clone()),
105            constraints,
106            if_not_exists: false,
107            with_options,
108        };
109        subsources.push(subsource);
110    }
111
112    Ok(subsources)
113}
114
115pub(super) struct MySqlExportStatementValues {
116    pub(super) columns: Vec<ColumnDef<Aug>>,
117    pub(super) constraints: Vec<TableConstraint<Aug>>,
118    pub(super) text_columns: Option<Vec<WithOptionValue<Aug>>>,
119    pub(super) exclude_columns: Option<Vec<WithOptionValue<Aug>>>,
120    pub(super) details: SourceExportStatementDetails,
121    pub(super) external_reference: UnresolvedItemName,
122}
123
124pub(super) fn generate_source_export_statement_values(
125    scx: &StatementContext,
126    purified_export: PurifiedSourceExport,
127) -> Result<MySqlExportStatementValues, PlanError> {
128    let PurifiedExportDetails::MySql {
129        table,
130        text_columns,
131        exclude_columns,
132        initial_gtid_set,
133        binlog_full_metadata,
134    } = purified_export.details
135    else {
136        bail_internal!("purified export details must be mysql");
137    };
138
139    // Figure out the schema of the subsource
140    let mut columns = vec![];
141    for c in table.columns.iter() {
142        match c.column_type {
143            // This column is intentionally ignored, so we don't generate a column for it in
144            // the subsource.
145            None => {}
146            Some(ref column_type) => {
147                let name = Ident::new(&c.name)?;
148
149                let ty = mz_pgrepr::Type::from(&column_type.scalar_type);
150                let data_type = scx.resolve_type(ty)?;
151                let mut col_options = vec![];
152
153                if !column_type.nullable {
154                    col_options.push(mz_sql_parser::ast::ColumnOptionDef {
155                        name: None,
156                        option: mz_sql_parser::ast::ColumnOption::NotNull,
157                    });
158                }
159                columns.push(ColumnDef {
160                    name,
161                    data_type,
162                    collation: None,
163                    options: col_options,
164                });
165            }
166        }
167    }
168
169    let mut constraints = vec![];
170    for key in table.keys.iter() {
171        let columns: Result<Vec<Ident>, _> = key.columns.iter().map(Ident::new).collect();
172
173        let constraint = mz_sql_parser::ast::TableConstraint::Unique {
174            name: Some(Ident::new(&key.name)?),
175            columns: columns?,
176            is_primary: key.is_primary,
177            // MySQL always permits multiple nulls values in unique indexes.
178            nulls_not_distinct: false,
179        };
180
181        // We take the first constraint available to be the primary key.
182        if key.is_primary {
183            constraints.insert(0, constraint);
184        } else {
185            constraints.push(constraint);
186        }
187    }
188
189    let details = SourceExportStatementDetails::MySql {
190        table,
191        initial_gtid_set,
192        binlog_full_metadata,
193    };
194
195    let text_columns = text_columns.map(|mut columns| {
196        columns.sort();
197        columns
198            .into_iter()
199            .map(WithOptionValue::Ident::<Aug>)
200            .collect()
201    });
202
203    let exclude_columns = exclude_columns.map(|mut columns| {
204        columns.sort();
205        columns
206            .into_iter()
207            .map(WithOptionValue::Ident::<Aug>)
208            .collect()
209    });
210    Ok(MySqlExportStatementValues {
211        columns,
212        constraints,
213        text_columns,
214        exclude_columns,
215        details,
216        external_reference: purified_export.external_reference,
217    })
218}
219
220/// Map a list of column references to a map of table references to column names.
221pub(super) fn map_column_refs<'a>(
222    cols: &'a [UnresolvedItemName],
223    option_type: MySqlConfigOptionName,
224) -> Result<BTreeMap<QualifiedTableRef<'a>, BTreeSet<&'a str>>, PlanError> {
225    let mut table_to_cols = BTreeMap::new();
226    for name in cols.iter() {
227        // We only support fully qualified references for now (e.g. `schema_name.table_name.column_name`)
228        if name.0.len() == 3 {
229            let key = mz_mysql_util::QualifiedTableRef {
230                schema_name: name.0[0].as_str(),
231                table_name: name.0[1].as_str(),
232            };
233            let new = table_to_cols
234                .entry(key)
235                .or_insert_with(BTreeSet::new)
236                .insert(name.0[2].as_str());
237            if !new {
238                return Err(PlanError::InvalidOptionValue {
239                    option_name: option_type.to_ast_string_simple(),
240                    err: Box::new(PlanError::UnexpectedDuplicateReference { name: name.clone() }),
241                });
242            }
243        } else {
244            return Err(PlanError::InvalidOptionValue {
245                option_name: option_type.to_ast_string_simple(),
246                err: Box::new(PlanError::UnderqualifiedColumnName(name.to_string())),
247            });
248        }
249    }
250    Ok(table_to_cols)
251}
252
253/// Normalize column references to a sorted, deduplicated options list of column names.
254pub(super) fn normalize_column_refs(
255    cols: Vec<UnresolvedItemName>,
256    reference_resolver: &SourceReferenceResolver,
257    tables: &[RequestedSourceExport<MySqlTableDesc>],
258    option_name: &str,
259) -> Result<Vec<WithOptionValue<Aug>>, MySqlSourcePurificationError> {
260    let (seq, unknown): (Vec<_>, Vec<_>) = cols.into_iter().partition(|name| {
261        let (column_name, qual) = name.0.split_last().expect("non-empty");
262        match reference_resolver.resolve_idx(qual) {
263            // TODO: this needs to also introduce the maximum qualification on
264            // the columns, i.e. ensure they have the schema name.
265            Ok(idx) => tables[idx]
266                .meta
267                .columns
268                .iter()
269                .any(|n| &n.name == column_name.as_str()),
270            Err(_) => false,
271        }
272    });
273
274    if !unknown.is_empty() {
275        return Err(MySqlSourcePurificationError::DanglingColumns {
276            option_name: option_name.to_string(),
277            items: unknown,
278        });
279    }
280
281    let mut seq: Vec<_> = seq
282        .into_iter()
283        .map(WithOptionValue::UnresolvedItemName)
284        .collect();
285    seq.sort();
286    seq.dedup();
287    Ok(seq)
288}
289
290pub(super) async fn validate_requested_references_privileges(
291    requested_external_references: impl Iterator<Item = &UnresolvedItemName>,
292    conn: &mut mz_mysql_util::MySqlConn,
293) -> Result<(), PlanError> {
294    // Ensure that we have correct privileges on all tables; we have to do this before we
295    // start snapshotting because if we discover we cannot `SELECT` from a table while
296    // snapshotting, we break the entire source.
297    let tables_to_check_permissions = requested_external_references
298        .map(external_reference_to_table)
299        .collect::<Result<Vec<_>, _>>()?;
300
301    validate_source_privileges(&mut *conn, &tables_to_check_permissions)
302        .await
303        .map_err(|err| match err {
304            MySqlError::MissingPrivileges(missing_privileges) => {
305                MySqlSourcePurificationError::UserLacksPrivileges(
306                    missing_privileges
307                        .into_iter()
308                        .map(|mp| (mp.privilege, mp.qualified_table_name))
309                        .collect(),
310                )
311                .into()
312            }
313            _ => PlanError::MySqlConnectionErr { cause: err.into() },
314        })?;
315
316    Ok(())
317}
318
319pub(super) struct PurifiedSourceExports {
320    /// map of source export names to the details of the export
321    pub(super) source_exports: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
322    // NOTE(roshan): The text columns and exclude columns are already part of their
323    // appropriate `source_exports` above, but these are returned to allow
324    // round-tripping a `CREATE SOURCE` statement while we still allow creating
325    // implicit subsources from `CREATE SOURCE`. Remove once
326    // fully deprecating that feature and forcing users to use explicit
327    // `CREATE TABLE .. FROM SOURCE` statements.
328    pub(super) normalized_text_columns: Vec<WithOptionValue<Aug>>,
329    pub(super) normalized_exclude_columns: Vec<WithOptionValue<Aug>>,
330}
331
332// Purify the requested external references, returning a set of purified
333// source exports corresponding to external tables, and and additional
334// fields necessary to generate relevant statements and update statement options
335pub(super) async fn purify_source_exports(
336    conn: &mut mz_mysql_util::MySqlConn,
337    retrieved_references: &RetrievedSourceReferences,
338    requested_references: &Option<ExternalReferences>,
339    text_columns: Vec<UnresolvedItemName>,
340    exclude_columns: Vec<UnresolvedItemName>,
341    unresolved_source_name: &UnresolvedItemName,
342    initial_gtid_set: String,
343    reference_policy: &SourceReferencePolicy,
344    binlog_full_metadata: bool,
345) -> Result<PurifiedSourceExports, PlanError> {
346    let requested_exports = match requested_references.as_ref() {
347        Some(requested) if matches!(reference_policy, SourceReferencePolicy::NotAllowed) => {
348            Err(PlanError::UseTablesForSources(requested.to_string()))?
349        }
350        Some(requested) => retrieved_references
351            .requested_source_exports(Some(requested), unresolved_source_name)?,
352        None => {
353            if matches!(reference_policy, SourceReferencePolicy::Required) {
354                Err(MySqlSourcePurificationError::RequiresExternalReferences)?
355            }
356
357            // If no external reference is specified, it does not make sense to include
358            // or text or exclude columns.
359            if !text_columns.is_empty() {
360                Err(
361                    MySqlSourcePurificationError::UnnecessaryOptionsWithoutReferences(
362                        "TEXT COLUMNS".to_string(),
363                    ),
364                )?
365            }
366            if !exclude_columns.is_empty() {
367                Err(
368                    MySqlSourcePurificationError::UnnecessaryOptionsWithoutReferences(
369                        "EXCLUDE COLUMNS".to_string(),
370                    ),
371                )?
372            }
373
374            return Ok(PurifiedSourceExports {
375                source_exports: BTreeMap::new(),
376                normalized_text_columns: vec![],
377                normalized_exclude_columns: vec![],
378            });
379        }
380    };
381
382    if requested_exports.is_empty() {
383        sql_bail!(
384            "MySQL source must ingest at least one table, but {} matched none",
385            requested_references
386                .as_ref()
387                .unwrap()
388                .to_ast_string_simple()
389        );
390    }
391
392    super::validate_source_export_names(&requested_exports)?;
393
394    let text_cols_map = map_column_refs(&text_columns, MySqlConfigOptionName::TextColumns)?;
395    let exclude_columns_map =
396        map_column_refs(&exclude_columns, MySqlConfigOptionName::ExcludeColumns)?;
397
398    // Convert the raw tables into a format that we can use to generate source exports
399    // using any applicable text_columns and exclude_columns.
400    let tables = requested_exports
401        .into_iter()
402        .map(|requested_export| {
403            let table = requested_export.meta.mysql_table().expect("is mysql");
404            let table_ref = table.table_ref();
405            // we are cloning the BTreeSet<&str> so we can avoid a borrow on `table` here
406            let text_cols = text_cols_map.get(&table_ref).map(|s| s.clone());
407            let exclude_columns = exclude_columns_map.get(&table_ref).map(|s| s.clone());
408            let parsed_table = table
409                .clone()
410                .to_desc(text_cols.as_ref(), exclude_columns.as_ref())
411                .map_err(|err| match err {
412                    mz_mysql_util::MySqlError::UnsupportedDataTypes { columns } => {
413                        PlanError::from(MySqlSourcePurificationError::UnrecognizedTypes {
414                            cols: columns
415                                .into_iter()
416                                .map(|c| (c.qualified_table_name, c.column_name, c.column_type))
417                                .collect(),
418                        })
419                    }
420                    mz_mysql_util::MySqlError::DuplicatedColumnNames {
421                        qualified_table_name,
422                        columns,
423                    } => PlanError::from(MySqlSourcePurificationError::DuplicatedColumnNames(
424                        qualified_table_name,
425                        columns,
426                    )),
427                    _ => err.into(),
428                })?;
429            Ok(requested_export.change_meta(parsed_table))
430        })
431        .collect::<Result<Vec<_>, PlanError>>()?;
432
433    if tables.is_empty() {
434        Err(MySqlSourcePurificationError::EmptyDatabase)?;
435    }
436
437    validate_requested_references_privileges(tables.iter().map(|t| &t.external_reference), conn)
438        .await?;
439
440    let reference_resolver = SourceReferenceResolver::new(
441        MYSQL_DATABASE_FAKE_NAME,
442        &tables.iter().map(|r| &r.meta).collect::<Vec<_>>(),
443    )?;
444
445    // Normalize column options and remove unused column references.
446    let normalized_text_columns = normalize_column_refs(
447        text_columns.clone(),
448        &reference_resolver,
449        &tables,
450        "TEXT COLUMNS",
451    )?;
452    let normalized_exclude_columns = normalize_column_refs(
453        exclude_columns.clone(),
454        &reference_resolver,
455        &tables,
456        "EXCLUDE COLUMNS",
457    )?;
458
459    let source_exports = tables
460        .into_iter()
461        .map(|r| {
462            let table_ref = mz_mysql_util::QualifiedTableRef {
463                schema_name: r.meta.schema_name.as_str(),
464                table_name: r.meta.name.as_str(),
465            };
466            (
467                r.name,
468                PurifiedSourceExport {
469                    external_reference: r.external_reference,
470                    details: PurifiedExportDetails::MySql {
471                        table: r.meta.clone(),
472                        text_columns: text_cols_map.get(&table_ref).map(|cols| {
473                            cols.iter()
474                                .map(|c| Ident::new(*c).expect("validated above"))
475                                .collect()
476                        }),
477                        exclude_columns: exclude_columns_map.get(&table_ref).map(|cols| {
478                            cols.iter()
479                                .map(|c| Ident::new(*c).expect("validated above"))
480                                .collect()
481                        }),
482                        initial_gtid_set: initial_gtid_set.clone(),
483                        binlog_full_metadata,
484                    },
485                },
486            )
487        })
488        .collect();
489
490    Ok(PurifiedSourceExports {
491        source_exports,
492        normalized_text_columns,
493        normalized_exclude_columns,
494    })
495}
496
497/// Checks if the requested external references contain any explicit references
498/// to system schemas, since these are otherwise default excluded when retrieving
499/// tables from MySQL.
500pub(super) fn references_system_schemas(requested_references: &Option<ExternalReferences>) -> bool {
501    match requested_references {
502        Some(requested) => match requested {
503            ExternalReferences::All => false,
504            ExternalReferences::SubsetSchemas(schemas) => schemas
505                .iter()
506                .any(|schema| SYSTEM_SCHEMAS.contains(&schema.as_str())),
507            ExternalReferences::SubsetTables(tables) => tables.iter().any(|table| {
508                SYSTEM_SCHEMAS.contains(
509                    &external_reference_to_table(&table.reference)
510                        .map(|t| t.schema_name)
511                        .unwrap_or_default(),
512                )
513            }),
514        },
515        None => false,
516    }
517}
518
519pub async fn ensure_binlog_full_metadata(
520    conn: &mut mz_mysql_util::MySqlConn,
521) -> Result<(), MySqlSourcePurificationError> {
522    let version = mz_mysql_util::query_sys_var(conn, "version")
523        .await
524        .map_err(|err| MySqlSourcePurificationError::InvalidConnection(err.into()))?;
525    if version_compare::compare_to(&version, "8.0.1", version_compare::Cmp::Lt).map_err(|_| {
526        MySqlSourcePurificationError::UnsupportedMySqlVersion {
527            version: version.clone(),
528        }
529    })? {
530        Err(MySqlSourcePurificationError::UnsupportedMySqlVersion { version })?;
531    }
532    let binlog_metadata_setting = mz_mysql_util::query_sys_var(conn, "binlog_row_metadata")
533        .await
534        .map_err(|err| MySqlSourcePurificationError::InvalidConnection(err.into()))?;
535    let binlog_full_metadata = binlog_metadata_setting.eq_ignore_ascii_case("FULL");
536    if !binlog_full_metadata {
537        Err(
538            MySqlSourcePurificationError::UnsupportedBinlogMetadataSetting {
539                setting: binlog_metadata_setting,
540            },
541        )
542    } else {
543        Ok(())
544    }
545}
546
547pub async fn is_binlog_full_metadata(
548    conn: &mut mz_mysql_util::MySqlConn,
549) -> Result<bool, MySqlSourcePurificationError> {
550    match ensure_binlog_full_metadata(conn).await {
551        Ok(_) => Ok(true),
552        // If we get an InvalidConnection error, we have failed to query the upstream for
553        // version or binlog format information, which is fatal to purification.
554        // Other types of errors simply indicate that binlow_row_metadata is not full for some reason.
555        Err(MySqlSourcePurificationError::InvalidConnection(err)) => {
556            Err(MySqlSourcePurificationError::InvalidConnection(err))
557        }
558        Err(err) => {
559            tracing::info!(
560                error = ?err,
561                "unable to verify MySQL binlog format, proceeding without full metadata"
562            );
563            Ok(false)
564        }
565    }
566}