1use std::collections::{BTreeMap, BTreeSet};
13
14use mz_mysql_util::{
15 MySqlError, MySqlTableDesc, QualifiedTableRef, SYSTEM_SCHEMAS, validate_source_privileges,
16};
17use mz_proto::RustType;
18use mz_sql_parser::ast::display::AstDisplay;
19use mz_sql_parser::ast::{
20 ColumnDef, CreateSubsourceOption, CreateSubsourceOptionName, CreateSubsourceStatement,
21 ExternalReferences, Ident, MySqlConfigOptionName, TableConstraint, WithOptionValue,
22};
23use mz_sql_parser::ast::{UnresolvedItemName, Value};
24use mz_storage_types::sources::{SourceExportStatementDetails, SourceReferenceResolver};
25use prost::Message;
26
27use crate::names::Aug;
28use crate::plan::{PlanError, StatementContext};
29use crate::pure::{MySqlSourcePurificationError, ResolvedItemName};
30
31use super::references::RetrievedSourceReferences;
32use super::{
33 PurifiedExportDetails, PurifiedSourceExport, RequestedSourceExport, SourceReferencePolicy,
34};
35
/// Placeholder database name handed to [`SourceReferenceResolver::new`] when
/// resolving MySQL external references in `purify_source_exports`.
// NOTE(review): presumably needed because MySQL references are two-part
// (`schema.table`) while the resolver expects a database level — confirm.
pub(crate) static MYSQL_DATABASE_FAKE_NAME: &str = "mysql";
40
41pub(super) fn external_reference_to_table(
43 name: &UnresolvedItemName,
44) -> Result<QualifiedTableRef, MySqlSourcePurificationError> {
45 if name.0.len() != 2 {
46 Err(MySqlSourcePurificationError::InvalidTableReference(
47 name.to_string(),
48 ))?
49 }
50 Ok(QualifiedTableRef {
51 schema_name: name.0[0].as_str(),
52 table_name: name.0[1].as_str(),
53 })
54}
55
56pub fn generate_create_subsource_statements(
57 scx: &StatementContext,
58 source_name: ResolvedItemName,
59 requested_subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
60) -> Result<Vec<CreateSubsourceStatement<Aug>>, PlanError> {
61 let mut subsources = Vec::with_capacity(requested_subsources.len());
62
63 for (subsource_name, purified_export) in requested_subsources {
64 let MySqlExportStatementValues {
65 columns,
66 constraints,
67 text_columns,
68 exclude_columns,
69 details,
70 external_reference,
71 } = generate_source_export_statement_values(scx, purified_export)?;
72
73 let mut with_options = vec![
74 CreateSubsourceOption {
75 name: CreateSubsourceOptionName::ExternalReference,
76 value: Some(WithOptionValue::UnresolvedItemName(external_reference)),
77 },
78 CreateSubsourceOption {
79 name: CreateSubsourceOptionName::Details,
80 value: Some(WithOptionValue::Value(Value::String(hex::encode(
81 details.into_proto().encode_to_vec(),
82 )))),
83 },
84 ];
85
86 if let Some(text_columns) = text_columns {
87 with_options.push(CreateSubsourceOption {
88 name: CreateSubsourceOptionName::TextColumns,
89 value: Some(WithOptionValue::Sequence(text_columns)),
90 });
91 }
92
93 if let Some(exclude_columns) = exclude_columns {
94 with_options.push(CreateSubsourceOption {
95 name: CreateSubsourceOptionName::ExcludeColumns,
96 value: Some(WithOptionValue::Sequence(exclude_columns)),
97 });
98 }
99
100 let subsource = CreateSubsourceStatement {
102 name: subsource_name,
103 columns,
104 of_source: Some(source_name.clone()),
105 constraints,
106 if_not_exists: false,
107 with_options,
108 };
109 subsources.push(subsource);
110 }
111
112 Ok(subsources)
113}
114
115pub(super) struct MySqlExportStatementValues {
116 pub(super) columns: Vec<ColumnDef<Aug>>,
117 pub(super) constraints: Vec<TableConstraint<Aug>>,
118 pub(super) text_columns: Option<Vec<WithOptionValue<Aug>>>,
119 pub(super) exclude_columns: Option<Vec<WithOptionValue<Aug>>>,
120 pub(super) details: SourceExportStatementDetails,
121 pub(super) external_reference: UnresolvedItemName,
122}
123
124pub(super) fn generate_source_export_statement_values(
125 scx: &StatementContext,
126 purified_export: PurifiedSourceExport,
127) -> Result<MySqlExportStatementValues, PlanError> {
128 let PurifiedExportDetails::MySql {
129 table,
130 text_columns,
131 exclude_columns,
132 initial_gtid_set,
133 } = purified_export.details
134 else {
135 unreachable!("purified export details must be mysql")
136 };
137
138 let mut columns = vec![];
140 for c in table.columns.iter() {
141 match c.column_type {
142 None => {}
145 Some(ref column_type) => {
146 let name = Ident::new(&c.name)?;
147
148 let ty = mz_pgrepr::Type::from(&column_type.scalar_type);
149 let data_type = scx.resolve_type(ty)?;
150 let mut col_options = vec![];
151
152 if !column_type.nullable {
153 col_options.push(mz_sql_parser::ast::ColumnOptionDef {
154 name: None,
155 option: mz_sql_parser::ast::ColumnOption::NotNull,
156 });
157 }
158 columns.push(ColumnDef {
159 name,
160 data_type,
161 collation: None,
162 options: col_options,
163 });
164 }
165 }
166 }
167
168 let mut constraints = vec![];
169 for key in table.keys.iter() {
170 let columns: Result<Vec<Ident>, _> = key.columns.iter().map(Ident::new).collect();
171
172 let constraint = mz_sql_parser::ast::TableConstraint::Unique {
173 name: Some(Ident::new(&key.name)?),
174 columns: columns?,
175 is_primary: key.is_primary,
176 nulls_not_distinct: false,
178 };
179
180 if key.is_primary {
182 constraints.insert(0, constraint);
183 } else {
184 constraints.push(constraint);
185 }
186 }
187
188 let details = SourceExportStatementDetails::MySql {
189 table,
190 initial_gtid_set,
191 };
192
193 let text_columns = text_columns.map(|mut columns| {
194 columns.sort();
195 columns
196 .into_iter()
197 .map(WithOptionValue::Ident::<Aug>)
198 .collect()
199 });
200
201 let exclude_columns = exclude_columns.map(|mut columns| {
202 columns.sort();
203 columns
204 .into_iter()
205 .map(WithOptionValue::Ident::<Aug>)
206 .collect()
207 });
208 Ok(MySqlExportStatementValues {
209 columns,
210 constraints,
211 text_columns,
212 exclude_columns,
213 details,
214 external_reference: purified_export.external_reference,
215 })
216}
217
218pub(super) fn map_column_refs<'a>(
220 cols: &'a [UnresolvedItemName],
221 option_type: MySqlConfigOptionName,
222) -> Result<BTreeMap<QualifiedTableRef<'a>, BTreeSet<&'a str>>, PlanError> {
223 let mut table_to_cols = BTreeMap::new();
224 for name in cols.iter() {
225 if name.0.len() == 3 {
227 let key = mz_mysql_util::QualifiedTableRef {
228 schema_name: name.0[0].as_str(),
229 table_name: name.0[1].as_str(),
230 };
231 let new = table_to_cols
232 .entry(key)
233 .or_insert_with(BTreeSet::new)
234 .insert(name.0[2].as_str());
235 if !new {
236 return Err(PlanError::InvalidOptionValue {
237 option_name: option_type.to_ast_string_simple(),
238 err: Box::new(PlanError::UnexpectedDuplicateReference { name: name.clone() }),
239 });
240 }
241 } else {
242 return Err(PlanError::InvalidOptionValue {
243 option_name: option_type.to_ast_string_simple(),
244 err: Box::new(PlanError::UnderqualifiedColumnName(name.to_string())),
245 });
246 }
247 }
248 Ok(table_to_cols)
249}
250
251pub(super) fn normalize_column_refs(
253 cols: Vec<UnresolvedItemName>,
254 reference_resolver: &SourceReferenceResolver,
255 tables: &[RequestedSourceExport<MySqlTableDesc>],
256 option_name: &str,
257) -> Result<Vec<WithOptionValue<Aug>>, MySqlSourcePurificationError> {
258 let (seq, unknown): (Vec<_>, Vec<_>) = cols.into_iter().partition(|name| {
259 let (column_name, qual) = name.0.split_last().expect("non-empty");
260 match reference_resolver.resolve_idx(qual) {
261 Ok(idx) => tables[idx]
264 .meta
265 .columns
266 .iter()
267 .any(|n| &n.name == column_name.as_str()),
268 Err(_) => false,
269 }
270 });
271
272 if !unknown.is_empty() {
273 return Err(MySqlSourcePurificationError::DanglingColumns {
274 option_name: option_name.to_string(),
275 items: unknown,
276 });
277 }
278
279 let mut seq: Vec<_> = seq
280 .into_iter()
281 .map(WithOptionValue::UnresolvedItemName)
282 .collect();
283 seq.sort();
284 seq.dedup();
285 Ok(seq)
286}
287
288pub(super) async fn validate_requested_references_privileges(
289 requested_external_references: impl Iterator<Item = &UnresolvedItemName>,
290 conn: &mut mz_mysql_util::MySqlConn,
291) -> Result<(), PlanError> {
292 let tables_to_check_permissions = requested_external_references
296 .map(external_reference_to_table)
297 .collect::<Result<Vec<_>, _>>()?;
298
299 validate_source_privileges(&mut *conn, &tables_to_check_permissions)
300 .await
301 .map_err(|err| match err {
302 MySqlError::MissingPrivileges(missing_privileges) => {
303 MySqlSourcePurificationError::UserLacksPrivileges(
304 missing_privileges
305 .into_iter()
306 .map(|mp| (mp.privilege, mp.qualified_table_name))
307 .collect(),
308 )
309 .into()
310 }
311 _ => PlanError::MySqlConnectionErr { cause: err.into() },
312 })?;
313
314 Ok(())
315}
316
317pub(super) struct PurifiedSourceExports {
318 pub(super) source_exports: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
320 pub(super) normalized_text_columns: Vec<WithOptionValue<Aug>>,
327 pub(super) normalized_exclude_columns: Vec<WithOptionValue<Aug>>,
328}
329
330pub(super) async fn purify_source_exports(
334 conn: &mut mz_mysql_util::MySqlConn,
335 retrieved_references: &RetrievedSourceReferences,
336 requested_references: &Option<ExternalReferences>,
337 text_columns: Vec<UnresolvedItemName>,
338 exclude_columns: Vec<UnresolvedItemName>,
339 unresolved_source_name: &UnresolvedItemName,
340 initial_gtid_set: String,
341 reference_policy: &SourceReferencePolicy,
342) -> Result<PurifiedSourceExports, PlanError> {
343 let requested_exports = match requested_references.as_ref() {
344 Some(requested) if matches!(reference_policy, SourceReferencePolicy::NotAllowed) => {
345 Err(PlanError::UseTablesForSources(requested.to_string()))?
346 }
347 Some(requested) => retrieved_references
348 .requested_source_exports(Some(requested), unresolved_source_name)?,
349 None => {
350 if matches!(reference_policy, SourceReferencePolicy::Required) {
351 Err(MySqlSourcePurificationError::RequiresExternalReferences)?
352 }
353
354 if !text_columns.is_empty() {
357 Err(
358 MySqlSourcePurificationError::UnnecessaryOptionsWithoutReferences(
359 "TEXT COLUMNS".to_string(),
360 ),
361 )?
362 }
363 if !exclude_columns.is_empty() {
364 Err(
365 MySqlSourcePurificationError::UnnecessaryOptionsWithoutReferences(
366 "EXCLUDE COLUMNS".to_string(),
367 ),
368 )?
369 }
370
371 return Ok(PurifiedSourceExports {
372 source_exports: BTreeMap::new(),
373 normalized_text_columns: vec![],
374 normalized_exclude_columns: vec![],
375 });
376 }
377 };
378
379 if requested_exports.is_empty() {
380 sql_bail!(
381 "MySQL source must ingest at least one table, but {} matched none",
382 requested_references
383 .as_ref()
384 .unwrap()
385 .to_ast_string_simple()
386 );
387 }
388
389 super::validate_source_export_names(&requested_exports)?;
390
391 let text_cols_map = map_column_refs(&text_columns, MySqlConfigOptionName::TextColumns)?;
392 let exclude_columns_map =
393 map_column_refs(&exclude_columns, MySqlConfigOptionName::ExcludeColumns)?;
394
395 let tables = requested_exports
398 .into_iter()
399 .map(|requested_export| {
400 let table = requested_export.meta.mysql_table().expect("is mysql");
401 let table_ref = table.table_ref();
402 let text_cols = text_cols_map.get(&table_ref).map(|s| s.clone());
404 let exclude_columns = exclude_columns_map.get(&table_ref).map(|s| s.clone());
405 let parsed_table = table
406 .clone()
407 .to_desc(text_cols.as_ref(), exclude_columns.as_ref())
408 .map_err(|err| match err {
409 mz_mysql_util::MySqlError::UnsupportedDataTypes { columns } => {
410 PlanError::from(MySqlSourcePurificationError::UnrecognizedTypes {
411 cols: columns
412 .into_iter()
413 .map(|c| (c.qualified_table_name, c.column_name, c.column_type))
414 .collect(),
415 })
416 }
417 mz_mysql_util::MySqlError::DuplicatedColumnNames {
418 qualified_table_name,
419 columns,
420 } => PlanError::from(MySqlSourcePurificationError::DuplicatedColumnNames(
421 qualified_table_name,
422 columns,
423 )),
424 _ => err.into(),
425 })?;
426 Ok(requested_export.change_meta(parsed_table))
427 })
428 .collect::<Result<Vec<_>, PlanError>>()?;
429
430 if tables.is_empty() {
431 Err(MySqlSourcePurificationError::EmptyDatabase)?;
432 }
433
434 validate_requested_references_privileges(tables.iter().map(|t| &t.external_reference), conn)
435 .await?;
436
437 let reference_resolver = SourceReferenceResolver::new(
438 MYSQL_DATABASE_FAKE_NAME,
439 &tables.iter().map(|r| &r.meta).collect::<Vec<_>>(),
440 )?;
441
442 let normalized_text_columns = normalize_column_refs(
444 text_columns.clone(),
445 &reference_resolver,
446 &tables,
447 "TEXT COLUMNS",
448 )?;
449 let normalized_exclude_columns = normalize_column_refs(
450 exclude_columns.clone(),
451 &reference_resolver,
452 &tables,
453 "EXCLUDE COLUMNS",
454 )?;
455
456 let source_exports = tables
457 .into_iter()
458 .map(|r| {
459 let table_ref = mz_mysql_util::QualifiedTableRef {
460 schema_name: r.meta.schema_name.as_str(),
461 table_name: r.meta.name.as_str(),
462 };
463 (
464 r.name,
465 PurifiedSourceExport {
466 external_reference: r.external_reference,
467 details: PurifiedExportDetails::MySql {
468 table: r.meta.clone(),
469 text_columns: text_cols_map.get(&table_ref).map(|cols| {
470 cols.iter()
471 .map(|c| Ident::new(*c).expect("validated above"))
472 .collect()
473 }),
474 exclude_columns: exclude_columns_map.get(&table_ref).map(|cols| {
475 cols.iter()
476 .map(|c| Ident::new(*c).expect("validated above"))
477 .collect()
478 }),
479 initial_gtid_set: initial_gtid_set.clone(),
480 },
481 },
482 )
483 })
484 .collect();
485
486 Ok(PurifiedSourceExports {
487 source_exports,
488 normalized_text_columns,
489 normalized_exclude_columns,
490 })
491}
492
493pub(super) fn references_system_schemas(requested_references: &Option<ExternalReferences>) -> bool {
497 match requested_references {
498 Some(requested) => match requested {
499 ExternalReferences::All => false,
500 ExternalReferences::SubsetSchemas(schemas) => schemas
501 .iter()
502 .any(|schema| SYSTEM_SCHEMAS.contains(&schema.as_str())),
503 ExternalReferences::SubsetTables(tables) => tables.iter().any(|table| {
504 SYSTEM_SCHEMAS.contains(
505 &external_reference_to_table(&table.reference)
506 .map(|t| t.schema_name)
507 .unwrap_or_default(),
508 )
509 }),
510 },
511 None => false,
512 }
513}