1use std::collections::{BTreeMap, BTreeSet};
13
14use mz_mysql_util::{
15 MySqlError, MySqlTableDesc, QualifiedTableRef, SYSTEM_SCHEMAS, validate_source_privileges,
16};
17use mz_proto::RustType;
18use mz_sql_parser::ast::display::AstDisplay;
19use mz_sql_parser::ast::{
20 ColumnDef, CreateSubsourceOption, CreateSubsourceOptionName, CreateSubsourceStatement,
21 ExternalReferences, Ident, MySqlConfigOptionName, TableConstraint, WithOptionValue,
22};
23use mz_sql_parser::ast::{UnresolvedItemName, Value};
24use mz_storage_types::sources::{SourceExportStatementDetails, SourceReferenceResolver};
25use prost::Message;
26
27use crate::names::Aug;
28use crate::plan::{PlanError, StatementContext};
29use crate::pure::{MySqlSourcePurificationError, ResolvedItemName};
30
31use super::references::RetrievedSourceReferences;
32use super::{
33 PurifiedExportDetails, PurifiedSourceExport, RequestedSourceExport, SourceReferencePolicy,
34};
35
/// Placeholder database name used for MySQL sources.
///
/// Within this file it is handed to `SourceReferenceResolver::new` as its first
/// argument when building the resolver over the requested tables.
// NOTE(review): presumably this exists because MySQL references are only
// `schema.table` while the resolver expects an extra leading level — confirm.
pub(crate) static MYSQL_DATABASE_FAKE_NAME: &str = "mysql";
40
41pub(super) fn external_reference_to_table(
43 name: &UnresolvedItemName,
44) -> Result<QualifiedTableRef<'_>, MySqlSourcePurificationError> {
45 if name.0.len() != 2 {
46 Err(MySqlSourcePurificationError::InvalidTableReference(
47 name.to_string(),
48 ))?
49 }
50 Ok(QualifiedTableRef {
51 schema_name: name.0[0].as_str(),
52 table_name: name.0[1].as_str(),
53 })
54}
55
56pub fn generate_create_subsource_statements(
57 scx: &StatementContext,
58 source_name: ResolvedItemName,
59 requested_subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
60) -> Result<Vec<CreateSubsourceStatement<Aug>>, PlanError> {
61 let mut subsources = Vec::with_capacity(requested_subsources.len());
62
63 for (subsource_name, purified_export) in requested_subsources {
64 let MySqlExportStatementValues {
65 columns,
66 constraints,
67 text_columns,
68 exclude_columns,
69 details,
70 external_reference,
71 } = generate_source_export_statement_values(scx, purified_export)?;
72
73 let mut with_options = vec![
74 CreateSubsourceOption {
75 name: CreateSubsourceOptionName::ExternalReference,
76 value: Some(WithOptionValue::UnresolvedItemName(external_reference)),
77 },
78 CreateSubsourceOption {
79 name: CreateSubsourceOptionName::Details,
80 value: Some(WithOptionValue::Value(Value::String(hex::encode(
81 details.into_proto().encode_to_vec(),
82 )))),
83 },
84 ];
85
86 if let Some(text_columns) = text_columns {
87 with_options.push(CreateSubsourceOption {
88 name: CreateSubsourceOptionName::TextColumns,
89 value: Some(WithOptionValue::Sequence(text_columns)),
90 });
91 }
92
93 if let Some(exclude_columns) = exclude_columns {
94 with_options.push(CreateSubsourceOption {
95 name: CreateSubsourceOptionName::ExcludeColumns,
96 value: Some(WithOptionValue::Sequence(exclude_columns)),
97 });
98 }
99
100 let subsource = CreateSubsourceStatement {
102 name: subsource_name,
103 columns,
104 of_source: Some(source_name.clone()),
105 constraints,
106 if_not_exists: false,
107 with_options,
108 };
109 subsources.push(subsource);
110 }
111
112 Ok(subsources)
113}
114
/// The pieces of a MySQL source export needed to build a statement for it, as
/// produced by `generate_source_export_statement_values`.
pub(super) struct MySqlExportStatementValues {
    /// Column definitions, one per upstream column that has a column type.
    pub(super) columns: Vec<ColumnDef<Aug>>,
    /// Unique constraints derived from the table's keys; a primary key is
    /// inserted at the front.
    pub(super) constraints: Vec<TableConstraint<Aug>>,
    /// Sorted text-column identifiers, if any were specified for this export.
    pub(super) text_columns: Option<Vec<WithOptionValue<Aug>>>,
    /// Sorted exclude-column identifiers, if any were specified for this export.
    pub(super) exclude_columns: Option<Vec<WithOptionValue<Aug>>>,
    /// MySQL export details (table description, initial GTID set, and the
    /// binlog-full-metadata flag).
    pub(super) details: SourceExportStatementDetails,
    /// The external reference carried over from the purified export.
    pub(super) external_reference: UnresolvedItemName,
}
123
124pub(super) fn generate_source_export_statement_values(
125 scx: &StatementContext,
126 purified_export: PurifiedSourceExport,
127) -> Result<MySqlExportStatementValues, PlanError> {
128 let PurifiedExportDetails::MySql {
129 table,
130 text_columns,
131 exclude_columns,
132 initial_gtid_set,
133 binlog_full_metadata,
134 } = purified_export.details
135 else {
136 bail_internal!("purified export details must be mysql");
137 };
138
139 let mut columns = vec![];
141 for c in table.columns.iter() {
142 match c.column_type {
143 None => {}
146 Some(ref column_type) => {
147 let name = Ident::new(&c.name)?;
148
149 let ty = mz_pgrepr::Type::from(&column_type.scalar_type);
150 let data_type = scx.resolve_type(ty)?;
151 let mut col_options = vec![];
152
153 if !column_type.nullable {
154 col_options.push(mz_sql_parser::ast::ColumnOptionDef {
155 name: None,
156 option: mz_sql_parser::ast::ColumnOption::NotNull,
157 });
158 }
159 columns.push(ColumnDef {
160 name,
161 data_type,
162 collation: None,
163 options: col_options,
164 });
165 }
166 }
167 }
168
169 let mut constraints = vec![];
170 for key in table.keys.iter() {
171 let columns: Result<Vec<Ident>, _> = key.columns.iter().map(Ident::new).collect();
172
173 let constraint = mz_sql_parser::ast::TableConstraint::Unique {
174 name: Some(Ident::new(&key.name)?),
175 columns: columns?,
176 is_primary: key.is_primary,
177 nulls_not_distinct: false,
179 };
180
181 if key.is_primary {
183 constraints.insert(0, constraint);
184 } else {
185 constraints.push(constraint);
186 }
187 }
188
189 let details = SourceExportStatementDetails::MySql {
190 table,
191 initial_gtid_set,
192 binlog_full_metadata,
193 };
194
195 let text_columns = text_columns.map(|mut columns| {
196 columns.sort();
197 columns
198 .into_iter()
199 .map(WithOptionValue::Ident::<Aug>)
200 .collect()
201 });
202
203 let exclude_columns = exclude_columns.map(|mut columns| {
204 columns.sort();
205 columns
206 .into_iter()
207 .map(WithOptionValue::Ident::<Aug>)
208 .collect()
209 });
210 Ok(MySqlExportStatementValues {
211 columns,
212 constraints,
213 text_columns,
214 exclude_columns,
215 details,
216 external_reference: purified_export.external_reference,
217 })
218}
219
220pub(super) fn map_column_refs<'a>(
222 cols: &'a [UnresolvedItemName],
223 option_type: MySqlConfigOptionName,
224) -> Result<BTreeMap<QualifiedTableRef<'a>, BTreeSet<&'a str>>, PlanError> {
225 let mut table_to_cols = BTreeMap::new();
226 for name in cols.iter() {
227 if name.0.len() == 3 {
229 let key = mz_mysql_util::QualifiedTableRef {
230 schema_name: name.0[0].as_str(),
231 table_name: name.0[1].as_str(),
232 };
233 let new = table_to_cols
234 .entry(key)
235 .or_insert_with(BTreeSet::new)
236 .insert(name.0[2].as_str());
237 if !new {
238 return Err(PlanError::InvalidOptionValue {
239 option_name: option_type.to_ast_string_simple(),
240 err: Box::new(PlanError::UnexpectedDuplicateReference { name: name.clone() }),
241 });
242 }
243 } else {
244 return Err(PlanError::InvalidOptionValue {
245 option_name: option_type.to_ast_string_simple(),
246 err: Box::new(PlanError::UnderqualifiedColumnName(name.to_string())),
247 });
248 }
249 }
250 Ok(table_to_cols)
251}
252
253pub(super) fn normalize_column_refs(
255 cols: Vec<UnresolvedItemName>,
256 reference_resolver: &SourceReferenceResolver,
257 tables: &[RequestedSourceExport<MySqlTableDesc>],
258 option_name: &str,
259) -> Result<Vec<WithOptionValue<Aug>>, MySqlSourcePurificationError> {
260 let (seq, unknown): (Vec<_>, Vec<_>) = cols.into_iter().partition(|name| {
261 let (column_name, qual) = name.0.split_last().expect("non-empty");
262 match reference_resolver.resolve_idx(qual) {
263 Ok(idx) => tables[idx]
266 .meta
267 .columns
268 .iter()
269 .any(|n| &n.name == column_name.as_str()),
270 Err(_) => false,
271 }
272 });
273
274 if !unknown.is_empty() {
275 return Err(MySqlSourcePurificationError::DanglingColumns {
276 option_name: option_name.to_string(),
277 items: unknown,
278 });
279 }
280
281 let mut seq: Vec<_> = seq
282 .into_iter()
283 .map(WithOptionValue::UnresolvedItemName)
284 .collect();
285 seq.sort();
286 seq.dedup();
287 Ok(seq)
288}
289
290pub(super) async fn validate_requested_references_privileges(
291 requested_external_references: impl Iterator<Item = &UnresolvedItemName>,
292 conn: &mut mz_mysql_util::MySqlConn,
293) -> Result<(), PlanError> {
294 let tables_to_check_permissions = requested_external_references
298 .map(external_reference_to_table)
299 .collect::<Result<Vec<_>, _>>()?;
300
301 validate_source_privileges(&mut *conn, &tables_to_check_permissions)
302 .await
303 .map_err(|err| match err {
304 MySqlError::MissingPrivileges(missing_privileges) => {
305 MySqlSourcePurificationError::UserLacksPrivileges(
306 missing_privileges
307 .into_iter()
308 .map(|mp| (mp.privilege, mp.qualified_table_name))
309 .collect(),
310 )
311 .into()
312 }
313 _ => PlanError::MySqlConnectionErr { cause: err.into() },
314 })?;
315
316 Ok(())
317}
318
/// The result of `purify_source_exports`.
pub(super) struct PurifiedSourceExports {
    /// The purified exports, keyed by the name of each export.
    pub(super) source_exports: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
    /// The text-column references after validation, sorting and de-duplication
    /// by `normalize_column_refs`; empty when no references were requested.
    pub(super) normalized_text_columns: Vec<WithOptionValue<Aug>>,
    /// The exclude-column references after validation, sorting and
    /// de-duplication by `normalize_column_refs`; empty when no references were
    /// requested.
    pub(super) normalized_exclude_columns: Vec<WithOptionValue<Aug>>,
}
331
332pub(super) async fn purify_source_exports(
336 conn: &mut mz_mysql_util::MySqlConn,
337 retrieved_references: &RetrievedSourceReferences,
338 requested_references: &Option<ExternalReferences>,
339 text_columns: Vec<UnresolvedItemName>,
340 exclude_columns: Vec<UnresolvedItemName>,
341 unresolved_source_name: &UnresolvedItemName,
342 initial_gtid_set: String,
343 reference_policy: &SourceReferencePolicy,
344 binlog_full_metadata: bool,
345) -> Result<PurifiedSourceExports, PlanError> {
346 let requested_exports = match requested_references.as_ref() {
347 Some(requested) if matches!(reference_policy, SourceReferencePolicy::NotAllowed) => {
348 Err(PlanError::UseTablesForSources(requested.to_string()))?
349 }
350 Some(requested) => retrieved_references
351 .requested_source_exports(Some(requested), unresolved_source_name)?,
352 None => {
353 if matches!(reference_policy, SourceReferencePolicy::Required) {
354 Err(MySqlSourcePurificationError::RequiresExternalReferences)?
355 }
356
357 if !text_columns.is_empty() {
360 Err(
361 MySqlSourcePurificationError::UnnecessaryOptionsWithoutReferences(
362 "TEXT COLUMNS".to_string(),
363 ),
364 )?
365 }
366 if !exclude_columns.is_empty() {
367 Err(
368 MySqlSourcePurificationError::UnnecessaryOptionsWithoutReferences(
369 "EXCLUDE COLUMNS".to_string(),
370 ),
371 )?
372 }
373
374 return Ok(PurifiedSourceExports {
375 source_exports: BTreeMap::new(),
376 normalized_text_columns: vec![],
377 normalized_exclude_columns: vec![],
378 });
379 }
380 };
381
382 if requested_exports.is_empty() {
383 sql_bail!(
384 "MySQL source must ingest at least one table, but {} matched none",
385 requested_references
386 .as_ref()
387 .unwrap()
388 .to_ast_string_simple()
389 );
390 }
391
392 super::validate_source_export_names(&requested_exports)?;
393
394 let text_cols_map = map_column_refs(&text_columns, MySqlConfigOptionName::TextColumns)?;
395 let exclude_columns_map =
396 map_column_refs(&exclude_columns, MySqlConfigOptionName::ExcludeColumns)?;
397
398 let tables = requested_exports
401 .into_iter()
402 .map(|requested_export| {
403 let table = requested_export.meta.mysql_table().expect("is mysql");
404 let table_ref = table.table_ref();
405 let text_cols = text_cols_map.get(&table_ref).map(|s| s.clone());
407 let exclude_columns = exclude_columns_map.get(&table_ref).map(|s| s.clone());
408 let parsed_table = table
409 .clone()
410 .to_desc(text_cols.as_ref(), exclude_columns.as_ref())
411 .map_err(|err| match err {
412 mz_mysql_util::MySqlError::UnsupportedDataTypes { columns } => {
413 PlanError::from(MySqlSourcePurificationError::UnrecognizedTypes {
414 cols: columns
415 .into_iter()
416 .map(|c| (c.qualified_table_name, c.column_name, c.column_type))
417 .collect(),
418 })
419 }
420 mz_mysql_util::MySqlError::DuplicatedColumnNames {
421 qualified_table_name,
422 columns,
423 } => PlanError::from(MySqlSourcePurificationError::DuplicatedColumnNames(
424 qualified_table_name,
425 columns,
426 )),
427 _ => err.into(),
428 })?;
429 Ok(requested_export.change_meta(parsed_table))
430 })
431 .collect::<Result<Vec<_>, PlanError>>()?;
432
433 if tables.is_empty() {
434 Err(MySqlSourcePurificationError::EmptyDatabase)?;
435 }
436
437 validate_requested_references_privileges(tables.iter().map(|t| &t.external_reference), conn)
438 .await?;
439
440 let reference_resolver = SourceReferenceResolver::new(
441 MYSQL_DATABASE_FAKE_NAME,
442 &tables.iter().map(|r| &r.meta).collect::<Vec<_>>(),
443 )?;
444
445 let normalized_text_columns = normalize_column_refs(
447 text_columns.clone(),
448 &reference_resolver,
449 &tables,
450 "TEXT COLUMNS",
451 )?;
452 let normalized_exclude_columns = normalize_column_refs(
453 exclude_columns.clone(),
454 &reference_resolver,
455 &tables,
456 "EXCLUDE COLUMNS",
457 )?;
458
459 let source_exports = tables
460 .into_iter()
461 .map(|r| {
462 let table_ref = mz_mysql_util::QualifiedTableRef {
463 schema_name: r.meta.schema_name.as_str(),
464 table_name: r.meta.name.as_str(),
465 };
466 (
467 r.name,
468 PurifiedSourceExport {
469 external_reference: r.external_reference,
470 details: PurifiedExportDetails::MySql {
471 table: r.meta.clone(),
472 text_columns: text_cols_map.get(&table_ref).map(|cols| {
473 cols.iter()
474 .map(|c| Ident::new(*c).expect("validated above"))
475 .collect()
476 }),
477 exclude_columns: exclude_columns_map.get(&table_ref).map(|cols| {
478 cols.iter()
479 .map(|c| Ident::new(*c).expect("validated above"))
480 .collect()
481 }),
482 initial_gtid_set: initial_gtid_set.clone(),
483 binlog_full_metadata,
484 },
485 },
486 )
487 })
488 .collect();
489
490 Ok(PurifiedSourceExports {
491 source_exports,
492 normalized_text_columns,
493 normalized_exclude_columns,
494 })
495}
496
497pub(super) fn references_system_schemas(requested_references: &Option<ExternalReferences>) -> bool {
501 match requested_references {
502 Some(requested) => match requested {
503 ExternalReferences::All => false,
504 ExternalReferences::SubsetSchemas(schemas) => schemas
505 .iter()
506 .any(|schema| SYSTEM_SCHEMAS.contains(&schema.as_str())),
507 ExternalReferences::SubsetTables(tables) => tables.iter().any(|table| {
508 SYSTEM_SCHEMAS.contains(
509 &external_reference_to_table(&table.reference)
510 .map(|t| t.schema_name)
511 .unwrap_or_default(),
512 )
513 }),
514 },
515 None => false,
516 }
517}
518
519pub async fn ensure_binlog_full_metadata(
520 conn: &mut mz_mysql_util::MySqlConn,
521) -> Result<(), MySqlSourcePurificationError> {
522 let version = mz_mysql_util::query_sys_var(conn, "version")
523 .await
524 .map_err(|err| MySqlSourcePurificationError::InvalidConnection(err.into()))?;
525 if version_compare::compare_to(&version, "8.0.1", version_compare::Cmp::Lt).map_err(|_| {
526 MySqlSourcePurificationError::UnsupportedMySqlVersion {
527 version: version.clone(),
528 }
529 })? {
530 Err(MySqlSourcePurificationError::UnsupportedMySqlVersion { version })?;
531 }
532 let binlog_metadata_setting = mz_mysql_util::query_sys_var(conn, "binlog_row_metadata")
533 .await
534 .map_err(|err| MySqlSourcePurificationError::InvalidConnection(err.into()))?;
535 let binlog_full_metadata = binlog_metadata_setting.eq_ignore_ascii_case("FULL");
536 if !binlog_full_metadata {
537 Err(
538 MySqlSourcePurificationError::UnsupportedBinlogMetadataSetting {
539 setting: binlog_metadata_setting,
540 },
541 )
542 } else {
543 Ok(())
544 }
545}
546
547pub async fn is_binlog_full_metadata(
548 conn: &mut mz_mysql_util::MySqlConn,
549) -> Result<bool, MySqlSourcePurificationError> {
550 match ensure_binlog_full_metadata(conn).await {
551 Ok(_) => Ok(true),
552 Err(MySqlSourcePurificationError::InvalidConnection(err)) => {
556 Err(MySqlSourcePurificationError::InvalidConnection(err))
557 }
558 Err(err) => {
559 tracing::info!(
560 error = ?err,
561 "unable to verify MySQL binlog format, proceeding without full metadata"
562 );
563 Ok(false)
564 }
565 }
566}