mz_sql/pure/
mysql.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! MySQL utilities for SQL purification.
11
12use std::collections::{BTreeMap, BTreeSet};
13
14use mz_mysql_util::{
15    MySqlError, MySqlTableDesc, QualifiedTableRef, SYSTEM_SCHEMAS, validate_source_privileges,
16};
17use mz_proto::RustType;
18use mz_sql_parser::ast::display::AstDisplay;
19use mz_sql_parser::ast::{
20    ColumnDef, CreateSubsourceOption, CreateSubsourceOptionName, CreateSubsourceStatement,
21    ExternalReferences, Ident, MySqlConfigOptionName, TableConstraint, WithOptionValue,
22};
23use mz_sql_parser::ast::{UnresolvedItemName, Value};
24use mz_storage_types::sources::{SourceExportStatementDetails, SourceReferenceResolver};
25use prost::Message;
26
27use crate::names::Aug;
28use crate::plan::{PlanError, StatementContext};
29use crate::pure::{MySqlSourcePurificationError, ResolvedItemName};
30
31use super::references::RetrievedSourceReferences;
32use super::{
33    PurifiedExportDetails, PurifiedSourceExport, RequestedSourceExport, SourceReferencePolicy,
34};
35
/// Placeholder database name under which MySQL sources are exposed.
///
/// Our catalog model has three layers, whereas MySQL has no concept of
/// databases AND schemas: it treats both as the same thing. This fixed name
/// stands in for the layer that MySQL lacks.
pub(crate) static MYSQL_DATABASE_FAKE_NAME: &str = "mysql";
40
41/// Convert an unresolved item name to a qualified table reference.
42pub(super) fn external_reference_to_table(
43    name: &UnresolvedItemName,
44) -> Result<QualifiedTableRef, MySqlSourcePurificationError> {
45    if name.0.len() != 2 {
46        Err(MySqlSourcePurificationError::InvalidTableReference(
47            name.to_string(),
48        ))?
49    }
50    Ok(QualifiedTableRef {
51        schema_name: name.0[0].as_str(),
52        table_name: name.0[1].as_str(),
53    })
54}
55
56pub fn generate_create_subsource_statements(
57    scx: &StatementContext,
58    source_name: ResolvedItemName,
59    requested_subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
60) -> Result<Vec<CreateSubsourceStatement<Aug>>, PlanError> {
61    let mut subsources = Vec::with_capacity(requested_subsources.len());
62
63    for (subsource_name, purified_export) in requested_subsources {
64        let MySqlExportStatementValues {
65            columns,
66            constraints,
67            text_columns,
68            exclude_columns,
69            details,
70            external_reference,
71        } = generate_source_export_statement_values(scx, purified_export)?;
72
73        let mut with_options = vec![
74            CreateSubsourceOption {
75                name: CreateSubsourceOptionName::ExternalReference,
76                value: Some(WithOptionValue::UnresolvedItemName(external_reference)),
77            },
78            CreateSubsourceOption {
79                name: CreateSubsourceOptionName::Details,
80                value: Some(WithOptionValue::Value(Value::String(hex::encode(
81                    details.into_proto().encode_to_vec(),
82                )))),
83            },
84        ];
85
86        if let Some(text_columns) = text_columns {
87            with_options.push(CreateSubsourceOption {
88                name: CreateSubsourceOptionName::TextColumns,
89                value: Some(WithOptionValue::Sequence(text_columns)),
90            });
91        }
92
93        if let Some(exclude_columns) = exclude_columns {
94            with_options.push(CreateSubsourceOption {
95                name: CreateSubsourceOptionName::ExcludeColumns,
96                value: Some(WithOptionValue::Sequence(exclude_columns)),
97            });
98        }
99
100        // Create the subsource statement
101        let subsource = CreateSubsourceStatement {
102            name: subsource_name,
103            columns,
104            of_source: Some(source_name.clone()),
105            constraints,
106            if_not_exists: false,
107            with_options,
108        };
109        subsources.push(subsource);
110    }
111
112    Ok(subsources)
113}
114
115pub(super) struct MySqlExportStatementValues {
116    pub(super) columns: Vec<ColumnDef<Aug>>,
117    pub(super) constraints: Vec<TableConstraint<Aug>>,
118    pub(super) text_columns: Option<Vec<WithOptionValue<Aug>>>,
119    pub(super) exclude_columns: Option<Vec<WithOptionValue<Aug>>>,
120    pub(super) details: SourceExportStatementDetails,
121    pub(super) external_reference: UnresolvedItemName,
122}
123
124pub(super) fn generate_source_export_statement_values(
125    scx: &StatementContext,
126    purified_export: PurifiedSourceExport,
127) -> Result<MySqlExportStatementValues, PlanError> {
128    let PurifiedExportDetails::MySql {
129        table,
130        text_columns,
131        exclude_columns,
132        initial_gtid_set,
133    } = purified_export.details
134    else {
135        unreachable!("purified export details must be mysql")
136    };
137
138    // Figure out the schema of the subsource
139    let mut columns = vec![];
140    for c in table.columns.iter() {
141        match c.column_type {
142            // This column is intentionally ignored, so we don't generate a column for it in
143            // the subsource.
144            None => {}
145            Some(ref column_type) => {
146                let name = Ident::new(&c.name)?;
147
148                let ty = mz_pgrepr::Type::from(&column_type.scalar_type);
149                let data_type = scx.resolve_type(ty)?;
150                let mut col_options = vec![];
151
152                if !column_type.nullable {
153                    col_options.push(mz_sql_parser::ast::ColumnOptionDef {
154                        name: None,
155                        option: mz_sql_parser::ast::ColumnOption::NotNull,
156                    });
157                }
158                columns.push(ColumnDef {
159                    name,
160                    data_type,
161                    collation: None,
162                    options: col_options,
163                });
164            }
165        }
166    }
167
168    let mut constraints = vec![];
169    for key in table.keys.iter() {
170        let columns: Result<Vec<Ident>, _> = key.columns.iter().map(Ident::new).collect();
171
172        let constraint = mz_sql_parser::ast::TableConstraint::Unique {
173            name: Some(Ident::new(&key.name)?),
174            columns: columns?,
175            is_primary: key.is_primary,
176            // MySQL always permits multiple nulls values in unique indexes.
177            nulls_not_distinct: false,
178        };
179
180        // We take the first constraint available to be the primary key.
181        if key.is_primary {
182            constraints.insert(0, constraint);
183        } else {
184            constraints.push(constraint);
185        }
186    }
187
188    let details = SourceExportStatementDetails::MySql {
189        table,
190        initial_gtid_set,
191    };
192
193    let text_columns = text_columns.map(|mut columns| {
194        columns.sort();
195        columns
196            .into_iter()
197            .map(WithOptionValue::Ident::<Aug>)
198            .collect()
199    });
200
201    let exclude_columns = exclude_columns.map(|mut columns| {
202        columns.sort();
203        columns
204            .into_iter()
205            .map(WithOptionValue::Ident::<Aug>)
206            .collect()
207    });
208    Ok(MySqlExportStatementValues {
209        columns,
210        constraints,
211        text_columns,
212        exclude_columns,
213        details,
214        external_reference: purified_export.external_reference,
215    })
216}
217
218/// Map a list of column references to a map of table references to column names.
219pub(super) fn map_column_refs<'a>(
220    cols: &'a [UnresolvedItemName],
221    option_type: MySqlConfigOptionName,
222) -> Result<BTreeMap<QualifiedTableRef<'a>, BTreeSet<&'a str>>, PlanError> {
223    let mut table_to_cols = BTreeMap::new();
224    for name in cols.iter() {
225        // We only support fully qualified references for now (e.g. `schema_name.table_name.column_name`)
226        if name.0.len() == 3 {
227            let key = mz_mysql_util::QualifiedTableRef {
228                schema_name: name.0[0].as_str(),
229                table_name: name.0[1].as_str(),
230            };
231            let new = table_to_cols
232                .entry(key)
233                .or_insert_with(BTreeSet::new)
234                .insert(name.0[2].as_str());
235            if !new {
236                return Err(PlanError::InvalidOptionValue {
237                    option_name: option_type.to_ast_string_simple(),
238                    err: Box::new(PlanError::UnexpectedDuplicateReference { name: name.clone() }),
239                });
240            }
241        } else {
242            return Err(PlanError::InvalidOptionValue {
243                option_name: option_type.to_ast_string_simple(),
244                err: Box::new(PlanError::UnderqualifiedColumnName(name.to_string())),
245            });
246        }
247    }
248    Ok(table_to_cols)
249}
250
251/// Normalize column references to a sorted, deduplicated options list of column names.
252pub(super) fn normalize_column_refs(
253    cols: Vec<UnresolvedItemName>,
254    reference_resolver: &SourceReferenceResolver,
255    tables: &[RequestedSourceExport<MySqlTableDesc>],
256    option_name: &str,
257) -> Result<Vec<WithOptionValue<Aug>>, MySqlSourcePurificationError> {
258    let (seq, unknown): (Vec<_>, Vec<_>) = cols.into_iter().partition(|name| {
259        let (column_name, qual) = name.0.split_last().expect("non-empty");
260        match reference_resolver.resolve_idx(qual) {
261            // TODO: this needs to also introduce the maximum qualification on
262            // the columns, i.e. ensure they have the schema name.
263            Ok(idx) => tables[idx]
264                .meta
265                .columns
266                .iter()
267                .any(|n| &n.name == column_name.as_str()),
268            Err(_) => false,
269        }
270    });
271
272    if !unknown.is_empty() {
273        return Err(MySqlSourcePurificationError::DanglingColumns {
274            option_name: option_name.to_string(),
275            items: unknown,
276        });
277    }
278
279    let mut seq: Vec<_> = seq
280        .into_iter()
281        .map(WithOptionValue::UnresolvedItemName)
282        .collect();
283    seq.sort();
284    seq.dedup();
285    Ok(seq)
286}
287
288pub(super) async fn validate_requested_references_privileges(
289    requested_external_references: impl Iterator<Item = &UnresolvedItemName>,
290    conn: &mut mz_mysql_util::MySqlConn,
291) -> Result<(), PlanError> {
292    // Ensure that we have correct privileges on all tables; we have to do this before we
293    // start snapshotting because if we discover we cannot `SELECT` from a table while
294    // snapshotting, we break the entire source.
295    let tables_to_check_permissions = requested_external_references
296        .map(external_reference_to_table)
297        .collect::<Result<Vec<_>, _>>()?;
298
299    validate_source_privileges(&mut *conn, &tables_to_check_permissions)
300        .await
301        .map_err(|err| match err {
302            MySqlError::MissingPrivileges(missing_privileges) => {
303                MySqlSourcePurificationError::UserLacksPrivileges(
304                    missing_privileges
305                        .into_iter()
306                        .map(|mp| (mp.privilege, mp.qualified_table_name))
307                        .collect(),
308                )
309                .into()
310            }
311            _ => PlanError::MySqlConnectionErr { cause: err.into() },
312        })?;
313
314    Ok(())
315}
316
317pub(super) struct PurifiedSourceExports {
318    /// map of source export names to the details of the export
319    pub(super) source_exports: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
320    // NOTE(roshan): The text columns and exclude columns are already part of their
321    // appropriate `source_exports` above, but these are returned to allow
322    // round-tripping a `CREATE SOURCE` statement while we still allow creating
323    // implicit subsources from `CREATE SOURCE`. Remove once
324    // fully deprecating that feature and forcing users to use explicit
325    // `CREATE TABLE .. FROM SOURCE` statements.
326    pub(super) normalized_text_columns: Vec<WithOptionValue<Aug>>,
327    pub(super) normalized_exclude_columns: Vec<WithOptionValue<Aug>>,
328}
329
330// Purify the requested external references, returning a set of purified
331// source exports corresponding to external tables, and and additional
332// fields necessary to generate relevant statements and update statement options
333pub(super) async fn purify_source_exports(
334    conn: &mut mz_mysql_util::MySqlConn,
335    retrieved_references: &RetrievedSourceReferences,
336    requested_references: &Option<ExternalReferences>,
337    text_columns: Vec<UnresolvedItemName>,
338    exclude_columns: Vec<UnresolvedItemName>,
339    unresolved_source_name: &UnresolvedItemName,
340    initial_gtid_set: String,
341    reference_policy: &SourceReferencePolicy,
342) -> Result<PurifiedSourceExports, PlanError> {
343    let requested_exports = match requested_references.as_ref() {
344        Some(requested) if matches!(reference_policy, SourceReferencePolicy::NotAllowed) => {
345            Err(PlanError::UseTablesForSources(requested.to_string()))?
346        }
347        Some(requested) => retrieved_references
348            .requested_source_exports(Some(requested), unresolved_source_name)?,
349        None => {
350            if matches!(reference_policy, SourceReferencePolicy::Required) {
351                Err(MySqlSourcePurificationError::RequiresExternalReferences)?
352            }
353
354            // If no external reference is specified, it does not make sense to include
355            // or text or exclude columns.
356            if !text_columns.is_empty() {
357                Err(
358                    MySqlSourcePurificationError::UnnecessaryOptionsWithoutReferences(
359                        "TEXT COLUMNS".to_string(),
360                    ),
361                )?
362            }
363            if !exclude_columns.is_empty() {
364                Err(
365                    MySqlSourcePurificationError::UnnecessaryOptionsWithoutReferences(
366                        "EXCLUDE COLUMNS".to_string(),
367                    ),
368                )?
369            }
370
371            return Ok(PurifiedSourceExports {
372                source_exports: BTreeMap::new(),
373                normalized_text_columns: vec![],
374                normalized_exclude_columns: vec![],
375            });
376        }
377    };
378
379    if requested_exports.is_empty() {
380        sql_bail!(
381            "MySQL source must ingest at least one table, but {} matched none",
382            requested_references
383                .as_ref()
384                .unwrap()
385                .to_ast_string_simple()
386        );
387    }
388
389    super::validate_source_export_names(&requested_exports)?;
390
391    let text_cols_map = map_column_refs(&text_columns, MySqlConfigOptionName::TextColumns)?;
392    let exclude_columns_map =
393        map_column_refs(&exclude_columns, MySqlConfigOptionName::ExcludeColumns)?;
394
395    // Convert the raw tables into a format that we can use to generate source exports
396    // using any applicable text_columns and exclude_columns.
397    let tables = requested_exports
398        .into_iter()
399        .map(|requested_export| {
400            let table = requested_export.meta.mysql_table().expect("is mysql");
401            let table_ref = table.table_ref();
402            // we are cloning the BTreeSet<&str> so we can avoid a borrow on `table` here
403            let text_cols = text_cols_map.get(&table_ref).map(|s| s.clone());
404            let exclude_columns = exclude_columns_map.get(&table_ref).map(|s| s.clone());
405            let parsed_table = table
406                .clone()
407                .to_desc(text_cols.as_ref(), exclude_columns.as_ref())
408                .map_err(|err| match err {
409                    mz_mysql_util::MySqlError::UnsupportedDataTypes { columns } => {
410                        PlanError::from(MySqlSourcePurificationError::UnrecognizedTypes {
411                            cols: columns
412                                .into_iter()
413                                .map(|c| (c.qualified_table_name, c.column_name, c.column_type))
414                                .collect(),
415                        })
416                    }
417                    mz_mysql_util::MySqlError::DuplicatedColumnNames {
418                        qualified_table_name,
419                        columns,
420                    } => PlanError::from(MySqlSourcePurificationError::DuplicatedColumnNames(
421                        qualified_table_name,
422                        columns,
423                    )),
424                    _ => err.into(),
425                })?;
426            Ok(requested_export.change_meta(parsed_table))
427        })
428        .collect::<Result<Vec<_>, PlanError>>()?;
429
430    if tables.is_empty() {
431        Err(MySqlSourcePurificationError::EmptyDatabase)?;
432    }
433
434    validate_requested_references_privileges(tables.iter().map(|t| &t.external_reference), conn)
435        .await?;
436
437    let reference_resolver = SourceReferenceResolver::new(
438        MYSQL_DATABASE_FAKE_NAME,
439        &tables.iter().map(|r| &r.meta).collect::<Vec<_>>(),
440    )?;
441
442    // Normalize column options and remove unused column references.
443    let normalized_text_columns = normalize_column_refs(
444        text_columns.clone(),
445        &reference_resolver,
446        &tables,
447        "TEXT COLUMNS",
448    )?;
449    let normalized_exclude_columns = normalize_column_refs(
450        exclude_columns.clone(),
451        &reference_resolver,
452        &tables,
453        "EXCLUDE COLUMNS",
454    )?;
455
456    let source_exports = tables
457        .into_iter()
458        .map(|r| {
459            let table_ref = mz_mysql_util::QualifiedTableRef {
460                schema_name: r.meta.schema_name.as_str(),
461                table_name: r.meta.name.as_str(),
462            };
463            (
464                r.name,
465                PurifiedSourceExport {
466                    external_reference: r.external_reference,
467                    details: PurifiedExportDetails::MySql {
468                        table: r.meta.clone(),
469                        text_columns: text_cols_map.get(&table_ref).map(|cols| {
470                            cols.iter()
471                                .map(|c| Ident::new(*c).expect("validated above"))
472                                .collect()
473                        }),
474                        exclude_columns: exclude_columns_map.get(&table_ref).map(|cols| {
475                            cols.iter()
476                                .map(|c| Ident::new(*c).expect("validated above"))
477                                .collect()
478                        }),
479                        initial_gtid_set: initial_gtid_set.clone(),
480                    },
481                },
482            )
483        })
484        .collect();
485
486    Ok(PurifiedSourceExports {
487        source_exports,
488        normalized_text_columns,
489        normalized_exclude_columns,
490    })
491}
492
493/// Checks if the requested external references contain any explicit references
494/// to system schemas, since these are otherwise default excluded when retrieving
495/// tables from MySQL.
496pub(super) fn references_system_schemas(requested_references: &Option<ExternalReferences>) -> bool {
497    match requested_references {
498        Some(requested) => match requested {
499            ExternalReferences::All => false,
500            ExternalReferences::SubsetSchemas(schemas) => schemas
501                .iter()
502                .any(|schema| SYSTEM_SCHEMAS.contains(&schema.as_str())),
503            ExternalReferences::SubsetTables(tables) => tables.iter().any(|table| {
504                SYSTEM_SCHEMAS.contains(
505                    &external_reference_to_table(&table.reference)
506                        .map(|t| t.schema_name)
507                        .unwrap_or_default(),
508                )
509            }),
510        },
511        None => false,
512    }
513}