1use std::collections::{BTreeMap, BTreeSet};
11use std::sync::Arc;
12use std::time::Duration;
13
14use mz_proto::RustType;
15use mz_sql_parser::ast::display::AstDisplay;
16use mz_sql_parser::ast::{
17 ColumnDef, CreateSubsourceOption, CreateSubsourceOptionName, CreateSubsourceStatement,
18 ExternalReferences, Ident, SqlServerConfigOptionName, TableConstraint, UnresolvedItemName,
19 Value, WithOptionValue,
20};
21use mz_sql_server_util::desc::{
22 SqlServerColumnDecodeType, SqlServerColumnDesc, SqlServerQualifiedTableName,
23 SqlServerTableConstraintType, SqlServerTableDesc,
24};
25use mz_storage_types::sources::{SourceExportStatementDetails, SourceReferenceResolver};
26use prost::Message;
27
28use crate::names::{Aug, ResolvedItemName};
29use crate::plan::{PlanError, StatementContext};
30use crate::pure::{
31 PurifiedExportDetails, PurifiedSourceExport, RequestedSourceExport, RetrievedSourceReferences,
32 SourceReferencePolicy, SqlServerSourcePurificationError,
33};
34
/// The outcome of purifying the external references of a SQL Server source,
/// as returned by `purify_source_exports`.
pub(super) struct PurifiedSourceExports {
    /// The purified exports, keyed by the name of each export.
    pub(super) source_exports: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
    /// The `TEXT COLUMNS` references in normalized form, i.e. the sorted and
    /// de-duplicated list produced by `normalize_column_refs`.
    ///
    /// Empty when no external references were requested.
    pub(super) normalized_text_columns: Vec<WithOptionValue<Aug>>,
    /// The `EXCLUDE COLUMNS` references in normalized form, i.e. the sorted
    /// and de-duplicated list produced by `normalize_column_refs`.
    ///
    /// Empty when no external references were requested.
    pub(super) normalized_excl_columns: Vec<WithOptionValue<Aug>>,
}
51
/// Purifies the external references requested for a SQL Server source into
/// the set of source exports that should be created for it.
///
/// The work happens in this order:
///
/// 1. `requested_references` is resolved against `retrieved_references`,
///    subject to `reference_policy`. When nothing was requested (and the
///    policy does not require references) an empty result is returned early.
/// 2. Every requested table is checked against the columns of its CDC capture
///    instance: each column that is not excluded must have a supported decode
///    type and must appear in the capture instance with the same scalar type.
/// 3. Source privileges are validated for all capture instances and an
///    initial LSN is determined for each of them.
/// 4. `text_columns` and `excl_columns` are applied to the table descriptions.
/// 5. The column options are normalized and the exports are assembled.
///
/// `database` is only used to build the `SourceReferenceResolver`, and
/// `timeout` is handed to `get_max_lsn_retry`.
///
/// # Errors
///
/// Returns an error when, among other things:
/// - references are given but `reference_policy` is `NotAllowed`, or are
///   missing while it is `Required`;
/// - `text_columns` or `excl_columns` are given without any references;
/// - the requested references match no table;
/// - a column option is not of the form `schema.table.column`, is duplicated,
///   or does not refer to a column of a requested table;
/// - a non-excluded column is unsupported, or is missing from / mismatched
///   with the CDC capture instance;
/// - every column of a table is excluded;
/// - any of the queries issued through `client` fails.
// NOTE(review): this function does `.await`, so the `unused_async` allowance
// below looks stale -- confirm before removing it.
#[allow(clippy::unused_async)]
pub(super) async fn purify_source_exports(
    database: &str,
    client: &mut mz_sql_server_util::Client,
    retrieved_references: &RetrievedSourceReferences,
    requested_references: &Option<ExternalReferences>,
    text_columns: &[UnresolvedItemName],
    excl_columns: &[UnresolvedItemName],
    unresolved_source_name: &UnresolvedItemName,
    timeout: Duration,
    reference_policy: &SourceReferencePolicy,
) -> Result<PurifiedSourceExports, PlanError> {
    let requested_exports = match requested_references.as_ref() {
        Some(requested) => {
            if *reference_policy == SourceReferencePolicy::NotAllowed {
                return Err(PlanError::UseTablesForSources(requested.to_string()));
            } else {
                let exports = retrieved_references
                    .requested_source_exports(Some(requested), unresolved_source_name)?;
                exports
            }
        }
        None => {
            if *reference_policy == SourceReferencePolicy::Required {
                return Err(SqlServerSourcePurificationError::RequiresExternalReferences.into());
            }

            // Without any external references there is no table the column
            // options could apply to, so specifying them is rejected.
            if !text_columns.is_empty() {
                Err(
                    SqlServerSourcePurificationError::UnnecessaryOptionsWithoutReferences(
                        "TEXT COLUMNS".to_string(),
                    ),
                )?
            }
            if !excl_columns.is_empty() {
                Err(
                    SqlServerSourcePurificationError::UnnecessaryOptionsWithoutReferences(
                        "EXCLUDE COLUMNS".to_string(),
                    ),
                )?
            }

            // Nothing was requested and nothing is required: no exports.
            return Ok(PurifiedSourceExports {
                source_exports: BTreeMap::default(),
                normalized_text_columns: Vec::default(),
                normalized_excl_columns: Vec::default(),
            });
        }
    };

    if requested_exports.is_empty() {
        sql_bail!(
            "SQL Server source must ingest at least one table, but {} matched none",
            requested_references
                .as_ref()
                // Only the `Some` arm of the match above falls through to here.
                .expect("checked above")
                .to_ast_string_simple()
        )
    }

    super::validate_source_export_names(&requested_exports)?;

    // Group the column options by the table they refer to. This also rejects
    // references that are not fully qualified or that are duplicated.
    let text_cols_map = map_column_refs(text_columns, SqlServerConfigOptionName::TextColumns)?;
    let excl_cols_map = map_column_refs(excl_columns, SqlServerConfigOptionName::ExcludeColumns)?;

    // Validate the columns of every requested table against the columns of
    // its CDC capture instance. Excluded columns are exempt from all checks.
    for export in &requested_exports {
        let table = export.meta.sql_server_table().expect("sql server source");
        let maybe_excl_cols = excl_cols_map.get(&table.qualified_name());
        let is_excluded = |name| {
            maybe_excl_cols
                .as_ref()
                .map(|cols| cols.contains(name))
                .unwrap_or(false)
        };

        let capture_instance = export
            .meta
            .sql_server_capture_instance()
            .expect("sql server capture instance");
        let mut cdc_columns =
            mz_sql_server_util::inspect::get_cdc_table_columns(client, capture_instance).await?;
        // Names of table columns that are absent from the capture instance or
        // whose type differs from the one in the capture instance.
        let mut mismatched_columns = vec![];

        for column in table.columns.iter() {
            let col_excluded = is_excluded(column.name.as_ref());
            // An unsupported column is only an error if it would be ingested.
            if let SqlServerColumnDecodeType::Unsupported { context } = &column.decode_type
                && !col_excluded
            {
                Err(SqlServerSourcePurificationError::UnsupportedColumn {
                    schema_name: Arc::clone(&table.schema_name),
                    tbl_name: Arc::clone(&table.name),
                    col_name: Arc::clone(&column.name),
                    col_type: Arc::clone(&column.raw_type),
                    context: context.clone(),
                })?
            }

            // The lookup happens before the exclusion check, so excluded
            // columns are taken out of `cdc_columns` as well.
            let cdc_column = cdc_columns.remove(&column.name);
            if col_excluded {
                continue;
            }
            let Some(cdc_column) = cdc_column else {
                mismatched_columns.push(Arc::clone(&column.name));
                continue;
            };

            let cdc_column = SqlServerColumnDesc::new(&cdc_column);
            match (cdc_column.column_type.as_ref(), column.column_type.as_ref()) {
                (None, None) => (),
                (Some(cdc_type), Some(tbl_type)) => {
                    // Only the scalar types are compared, so a difference in
                    // any other attribute of the column type (e.g.
                    // nullability) is not reported as a mismatch.
                    if cdc_type.scalar_type != tbl_type.scalar_type {
                        mismatched_columns.push(Arc::clone(&column.name));
                    }
                }
                (Some(_), None) | (None, Some(_)) => {
                    mismatched_columns.push(Arc::clone(&column.name));
                }
            }
        }

        // Report all offending columns of this table at once.
        if !mismatched_columns.is_empty() {
            Err(SqlServerSourcePurificationError::CdcMissingColumns {
                capture_instance: Arc::clone(capture_instance),
                col_names: mismatched_columns,
            })?
        }
    }

    // Qualified table name -> capture instance, for every requested table.
    let capture_instances: BTreeMap<_, _> = requested_exports
        .iter()
        .map(|requested| {
            let table = requested
                .meta
                .sql_server_table()
                .expect("sql server source");
            let capture_instance = requested
                .meta
                .sql_server_capture_instance()
                .expect("sql server source");

            (table.qualified_name(), Arc::clone(capture_instance))
        })
        .collect();

    mz_sql_server_util::inspect::validate_source_privileges(
        client,
        capture_instances.values().map(|instance| instance.as_ref()),
    )
    .await?;

    // The initial LSN of each capture instance is the greater of the value
    // reported by `get_min_lsns` and the value returned by
    // `get_max_lsn_retry`.
    let mut initial_lsns = mz_sql_server_util::inspect::get_min_lsns(
        client,
        capture_instances.values().map(|instance| instance.as_ref()),
    )
    .await?;

    let max_lsn = mz_sql_server_util::inspect::get_max_lsn_retry(client, timeout).await?;
    tracing::debug!(?initial_lsns, %max_lsn, "retrieved start LSNs");
    for lsn in initial_lsns.values_mut() {
        *lsn = std::cmp::max(*lsn, max_lsn);
    }

    // Apply the column options to an owned copy of each table description.
    let mut tables = vec![];
    for requested in requested_exports {
        let mut table = requested
            .meta
            .sql_server_table()
            .expect("sql server source")
            .clone();

        let maybe_text_cols = text_cols_map.get(&table.qualified_name());
        let maybe_excl_cols = excl_cols_map.get(&table.qualified_name());

        if let Some(text_cols) = maybe_text_cols {
            table.apply_text_columns(text_cols);
        }

        if let Some(excl_cols) = maybe_excl_cols {
            table.apply_excl_columns(excl_cols);
        }

        // A table with no remaining columns cannot be ingested.
        if table.columns.iter().all(|c| c.is_excluded()) {
            Err(SqlServerSourcePurificationError::AllColumnsExcluded {
                tbl_name: Arc::clone(&table.name),
            })?
        }

        tables.push(requested.change_meta(table));
    }

    // Defensive check: `requested_exports` was verified to be non-empty above
    // and every iteration of the loop either pushes a table or returns.
    if tables.is_empty() {
        Err(SqlServerSourcePurificationError::NoTables)?;
    }

    let reference_resolver = SourceReferenceResolver::new(
        database,
        &tables.iter().map(|r| &r.meta).collect::<Vec<_>>(),
    )?;

    // Check that every column option refers to an existing column of a
    // requested table and bring the options into sorted, de-duplicated form.
    let normalized_text_columns = normalize_column_refs(
        text_columns,
        &reference_resolver,
        &tables,
        SqlServerConfigOptionName::TextColumns,
    )?;
    let normalized_excl_columns = normalize_column_refs(
        excl_columns,
        &reference_resolver,
        &tables,
        SqlServerConfigOptionName::ExcludeColumns,
    )?;

    let exports = tables
        .into_iter()
        .map(|reference| {
            let table_reference = reference.meta.qualified_name();
            // The column names originate from identifiers in the column
            // options, hence turning them back into identifiers is expected
            // to succeed.
            let text_columns = text_cols_map.get(&table_reference).map(|cols| {
                cols.iter()
                    .map(|c| Ident::new(*c).expect("validated above"))
                    .collect()
            });
            let excl_columns = excl_cols_map.get(&table_reference).map(|cols| {
                cols.iter()
                    .map(|c| Ident::new(*c).expect("validated above"))
                    .collect()
            });
            // `capture_instances` was built from the same set of tables.
            let capture_instance = capture_instances
                .get(&reference.meta.qualified_name())
                .expect("capture instance should exist");

            let initial_lsn = *initial_lsns.get(capture_instance).ok_or_else(|| {
                SqlServerSourcePurificationError::NoStartLsn(capture_instance.to_string())
            })?;

            let export = PurifiedSourceExport {
                external_reference: reference.external_reference,
                details: PurifiedExportDetails::SqlServer {
                    table: reference.meta,
                    text_columns,
                    excl_columns,
                    capture_instance: Arc::clone(capture_instance),
                    initial_lsn,
                },
            };

            Ok::<_, SqlServerSourcePurificationError>((reference.name, export))
        })
        .collect::<Result<_, _>>()?;

    Ok(PurifiedSourceExports {
        source_exports: exports,
        normalized_text_columns,
        normalized_excl_columns,
    })
}
333
334pub fn generate_create_subsource_statements(
335 scx: &StatementContext,
336 source_name: ResolvedItemName,
337 requested_subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
338) -> Result<Vec<CreateSubsourceStatement<Aug>>, PlanError> {
339 let mut subsources = Vec::with_capacity(requested_subsources.len());
340
341 for (subsource_name, purified_export) in requested_subsources {
342 let SqlServerExportStatementValues {
343 columns,
344 constraints,
345 text_columns,
346 excl_columns,
347 details,
348 external_reference,
349 } = generate_source_export_statement_values(scx, purified_export)?;
350
351 let mut with_options = vec![
352 CreateSubsourceOption {
353 name: CreateSubsourceOptionName::ExternalReference,
354 value: Some(WithOptionValue::UnresolvedItemName(external_reference)),
355 },
356 CreateSubsourceOption {
357 name: CreateSubsourceOptionName::Details,
358 value: Some(WithOptionValue::Value(Value::String(hex::encode(
359 details.into_proto().encode_to_vec(),
360 )))),
361 },
362 ];
363
364 if let Some(text_columns) = text_columns {
365 with_options.push(CreateSubsourceOption {
366 name: CreateSubsourceOptionName::TextColumns,
367 value: Some(WithOptionValue::Sequence(text_columns)),
368 });
369 }
370 if let Some(excl_columns) = excl_columns {
371 with_options.push(CreateSubsourceOption {
372 name: CreateSubsourceOptionName::ExcludeColumns,
373 value: Some(WithOptionValue::Sequence(excl_columns)),
374 });
375 }
376
377 let subsource = CreateSubsourceStatement {
379 name: subsource_name,
380 columns,
381 of_source: Some(source_name.clone()),
382 constraints,
383 if_not_exists: false,
384 with_options,
385 };
386 subsources.push(subsource);
387 }
388
389 Ok(subsources)
390}
391
/// The pieces from which a statement creating a single SQL Server source
/// export is assembled, as produced by
/// `generate_source_export_statement_values`.
pub(super) struct SqlServerExportStatementValues {
    /// One definition per table column that has a column type, in table order.
    pub(super) columns: Vec<ColumnDef<Aug>>,
    /// The table constraints that do not reference a column lacking a column
    /// type, each expressed as a `TableConstraint::Unique`.
    pub(super) constraints: Vec<TableConstraint<Aug>>,
    /// The sorted text column identifiers of the export, if any were given.
    pub(super) text_columns: Option<Vec<WithOptionValue<Aug>>>,
    /// The sorted excluded column identifiers of the export, if any were given.
    pub(super) excl_columns: Option<Vec<WithOptionValue<Aug>>>,
    /// The table description, capture instance and initial LSN of the export.
    pub(super) details: SourceExportStatementDetails,
    /// The external reference taken over from the purified export.
    pub(super) external_reference: UnresolvedItemName,
}
400
401pub(super) fn generate_source_export_statement_values(
402 scx: &StatementContext,
403 purified_export: PurifiedSourceExport,
404) -> Result<SqlServerExportStatementValues, PlanError> {
405 let PurifiedExportDetails::SqlServer {
406 table,
407 text_columns,
408 excl_columns,
409 capture_instance,
410 initial_lsn,
411 } = purified_export.details
412 else {
413 unreachable!("purified export details must be SQL Server")
414 };
415
416 let included_columns = table
418 .columns
419 .iter()
420 .filter_map(|c| c.column_type.as_ref().map(|ct| (c.name.as_ref(), ct)));
421
422 let mut column_defs = vec![];
423 let mut constraints = vec![];
424
425 let excluded_columns: BTreeSet<_> = table
427 .columns
428 .iter()
429 .filter_map(|c| match &c.column_type {
430 None => Some(c.name.as_ref()),
431 Some(_) => None,
432 })
433 .collect();
434 for constraint in table.constraints.iter() {
435 if constraint
438 .column_names
439 .iter()
440 .any(|col| excluded_columns.contains(col.as_str()))
441 {
442 tracing::debug!(
443 "skipping constraint {name} due to excluded column",
444 name = &constraint.constraint_name
445 );
446 continue;
447 }
448
449 constraints.push(mz_sql_parser::ast::TableConstraint::Unique {
450 name: Some(Ident::new_lossy(constraint.constraint_name.clone())),
451 columns: constraint
452 .column_names
453 .iter()
454 .map(Ident::new)
455 .collect::<Result<_, _>>()?,
456 is_primary: matches!(
457 constraint.constraint_type,
458 SqlServerTableConstraintType::PrimaryKey
459 ),
460 nulls_not_distinct: matches!(
463 constraint.constraint_type,
464 SqlServerTableConstraintType::Unique
465 ),
466 });
467 }
468
469 tracing::debug!(
470 "source export constraints for {schema_name}.{table_name}: {constraints:?}",
471 schema_name = &table.schema_name,
472 table_name = &table.name
473 );
474
475 for (col_name, col_type) in included_columns {
476 let name = Ident::new(col_name)?;
477 let ty = mz_pgrepr::Type::from(&col_type.scalar_type);
478 let data_type = scx.resolve_type(ty)?;
479 let mut col_options = vec![];
480
481 if !col_type.nullable {
482 col_options.push(mz_sql_parser::ast::ColumnOptionDef {
483 name: None,
484 option: mz_sql_parser::ast::ColumnOption::NotNull,
485 });
486 }
487
488 column_defs.push(ColumnDef {
489 name,
490 data_type,
491 collation: None,
492 options: col_options,
493 });
494 }
495
496 let details = SourceExportStatementDetails::SqlServer {
497 table,
498 capture_instance,
499 initial_lsn,
500 };
501 let text_columns = text_columns.map(|mut columns| {
502 columns.sort();
503 columns
504 .into_iter()
505 .map(WithOptionValue::Ident::<Aug>)
506 .collect()
507 });
508 let excl_columns = excl_columns.map(|mut columns| {
509 columns.sort();
510 columns
511 .into_iter()
512 .map(WithOptionValue::Ident::<Aug>)
513 .collect()
514 });
515
516 let values = SqlServerExportStatementValues {
517 columns: column_defs,
518 constraints,
519 text_columns,
520 excl_columns,
521 details,
522 external_reference: purified_export.external_reference,
523 };
524 Ok(values)
525}
526
527fn map_column_refs<'a>(
529 cols: &'a [UnresolvedItemName],
530 option_type: SqlServerConfigOptionName,
531) -> Result<BTreeMap<SqlServerQualifiedTableName, BTreeSet<&'a str>>, PlanError> {
532 let mut table_to_cols: BTreeMap<_, BTreeSet<_>> = BTreeMap::new();
533 for name in cols.iter() {
534 if name.0.len() == 3 {
536 let key = mz_sql_server_util::desc::SqlServerQualifiedTableName {
537 schema_name: name.0[0].as_str().into(),
538 table_name: name.0[1].as_str().into(),
539 };
540 let new = table_to_cols
541 .entry(key)
542 .or_default()
543 .insert(name.0[2].as_str());
544 if !new {
545 return Err(PlanError::InvalidOptionValue {
546 option_name: option_type.to_ast_string_simple(),
547 err: Box::new(PlanError::UnexpectedDuplicateReference { name: name.clone() }),
548 });
549 }
550 } else {
551 return Err(PlanError::InvalidOptionValue {
552 option_name: option_type.to_ast_string_simple(),
553 err: Box::new(PlanError::UnderqualifiedColumnName(name.to_string())),
554 });
555 }
556 }
557 Ok(table_to_cols)
558}
559
560fn normalize_column_refs(
566 cols: &[UnresolvedItemName],
567 reference_resolver: &SourceReferenceResolver,
568 tables: &[RequestedSourceExport<SqlServerTableDesc>],
569 option_name: SqlServerConfigOptionName,
570) -> Result<Vec<WithOptionValue<Aug>>, SqlServerSourcePurificationError> {
571 let (seq, unknown): (Vec<_>, Vec<_>) = cols.into_iter().partition(|name| {
572 let (column_name, qual) = name.0.split_last().expect("non-empty");
573 match reference_resolver.resolve_idx(qual) {
574 Ok(idx) => tables[idx]
577 .meta
578 .columns
579 .iter()
580 .any(|n| &*n.name == column_name.as_str()),
581 Err(_) => false,
582 }
583 });
584
585 if !unknown.is_empty() {
586 return Err(SqlServerSourcePurificationError::DanglingColumns {
587 option_name: option_name.to_string(),
588 items: unknown.into_iter().cloned().collect(),
589 });
590 }
591
592 let mut seq: Vec<_> = seq
593 .into_iter()
594 .cloned()
595 .map(WithOptionValue::UnresolvedItemName)
596 .collect();
597
598 seq.sort();
599 seq.dedup();
600 Ok(seq)
601}