1use std::collections::{BTreeMap, BTreeSet};
11use std::sync::Arc;
12use std::time::Duration;
13
14use mz_ore::collections::CollectionExt;
15use mz_proto::RustType;
16use mz_sql_parser::ast::display::AstDisplay;
17use mz_sql_parser::ast::{
18 ColumnDef, CreateSubsourceOption, CreateSubsourceOptionName, CreateSubsourceStatement,
19 ExternalReferences, Ident, SqlServerConfigOptionName, TableConstraint, UnresolvedItemName,
20 Value, WithOptionValue,
21};
22use mz_sql_server_util::desc::{
23 SqlServerColumnDesc, SqlServerQualifiedTableName, SqlServerTableDesc,
24};
25use mz_storage_types::sources::{SourceExportStatementDetails, SourceReferenceResolver};
26use prost::Message;
27
28use crate::names::{Aug, ResolvedItemName};
29use crate::plan::{PlanError, StatementContext};
30use crate::pure::{
31 PurifiedExportDetails, PurifiedSourceExport, RequestedSourceExport, RetrievedSourceReferences,
32 SourceReferencePolicy, SqlServerSourcePurificationError,
33};
34
/// The outcome of purifying the exports requested from a SQL Server source.
pub(super) struct PurifiedSourceExports {
    /// Purified exports, keyed by the name of each requested export.
    pub(super) source_exports: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
    /// `TEXT COLUMNS` references after validation against the ingested tables,
    /// sorted and deduplicated.
    pub(super) normalized_text_columns: Vec<WithOptionValue<Aug>>,
    /// `EXCLUDE COLUMNS` references after validation against the ingested
    /// tables, sorted and deduplicated.
    pub(super) normalized_excl_columns: Vec<WithOptionValue<Aug>>,
}
51
52#[allow(clippy::unused_async)]
55pub(super) async fn purify_source_exports(
56 database: &str,
57 client: &mut mz_sql_server_util::Client,
58 retrieved_references: &RetrievedSourceReferences,
59 requested_references: &Option<ExternalReferences>,
60 text_columns: &[UnresolvedItemName],
61 excl_columns: &[UnresolvedItemName],
62 unresolved_source_name: &UnresolvedItemName,
63 timeout: Duration,
64 reference_policy: &SourceReferencePolicy,
65) -> Result<PurifiedSourceExports, PlanError> {
66 let requested_exports = match requested_references.as_ref() {
67 Some(requested) => {
68 if *reference_policy == SourceReferencePolicy::NotAllowed {
69 return Err(PlanError::UseTablesForSources(requested.to_string()));
70 } else {
71 let exports = retrieved_references
72 .requested_source_exports(Some(requested), unresolved_source_name)?;
73 exports
74 }
75 }
76 None => {
77 if *reference_policy == SourceReferencePolicy::Required {
78 return Err(SqlServerSourcePurificationError::RequiresExternalReferences.into());
79 }
80
81 if !text_columns.is_empty() {
84 Err(
85 SqlServerSourcePurificationError::UnnecessaryOptionsWithoutReferences(
86 "TEXT COLUMNS".to_string(),
87 ),
88 )?
89 }
90 if !excl_columns.is_empty() {
91 Err(
92 SqlServerSourcePurificationError::UnnecessaryOptionsWithoutReferences(
93 "EXCLUDE COLUMNS".to_string(),
94 ),
95 )?
96 }
97
98 return Ok(PurifiedSourceExports {
99 source_exports: BTreeMap::default(),
100 normalized_text_columns: Vec::default(),
101 normalized_excl_columns: Vec::default(),
102 });
103 }
104 };
105
106 if requested_exports.is_empty() {
107 sql_bail!(
108 "SQL Server source must ingest at least one table, but {} matched none",
109 requested_references
110 .as_ref()
111 .expect("checked above")
112 .to_ast_string_simple()
113 )
114 }
115
116 super::validate_source_export_names(&requested_exports)?;
117
118 let text_cols_map = map_column_refs(text_columns, SqlServerConfigOptionName::TextColumns)?;
121 let excl_cols_map = map_column_refs(excl_columns, SqlServerConfigOptionName::ExcludeColumns)?;
122
123 for export in &requested_exports {
131 let table = export.meta.sql_server_table().expect("sql server source");
132 let maybe_excl_cols = excl_cols_map.get(&table.qualified_name());
133 let is_excluded = |name| {
134 maybe_excl_cols
135 .as_ref()
136 .map(|cols| cols.contains(name))
137 .unwrap_or(false)
138 };
139
140 let capture_instance = export
141 .meta
142 .sql_server_capture_instance()
143 .expect("sql server capture instance");
144 let mut cdc_columns =
145 mz_sql_server_util::inspect::get_cdc_table_columns(client, capture_instance).await?;
146 let mut mismatched_columns = vec![];
147
148 for column in table.columns.iter() {
149 let col_excluded = is_excluded(column.name.as_ref());
150 if !column.is_supported() && !col_excluded {
151 Err(SqlServerSourcePurificationError::UnsupportedColumn {
152 schema_name: Arc::clone(&table.schema_name),
153 tbl_name: Arc::clone(&table.name),
154 col_name: Arc::clone(&column.name),
155 col_type: Arc::clone(&column.raw_type),
156 })?
157 }
158
159 let cdc_column = cdc_columns.remove(&column.name);
161 if col_excluded {
162 continue;
163 }
164 let Some(cdc_column) = cdc_column else {
165 mismatched_columns.push(Arc::clone(&column.name));
166 continue;
167 };
168
169 let cdc_column = SqlServerColumnDesc::new(&cdc_column);
170 match (cdc_column.column_type.as_ref(), column.column_type.as_ref()) {
171 (None, None) => (),
172 (Some(cdc_type), Some(tbl_type)) => {
173 if cdc_type.scalar_type != tbl_type.scalar_type {
176 mismatched_columns.push(Arc::clone(&column.name));
177 }
178 }
179 (Some(_), None) | (None, Some(_)) => {
180 mismatched_columns.push(Arc::clone(&column.name));
181 }
182 }
183 }
184
185 if !mismatched_columns.is_empty() {
189 Err(SqlServerSourcePurificationError::CdcMissingColumns {
190 capture_instance: Arc::clone(capture_instance),
191 col_names: mismatched_columns,
192 })?
193 }
194 }
195
196 let capture_instances: BTreeMap<_, _> = requested_exports
197 .iter()
198 .map(|requested| {
199 let table = requested
200 .meta
201 .sql_server_table()
202 .expect("sql server source");
203 let capture_instance = requested
204 .meta
205 .sql_server_capture_instance()
206 .expect("sql server source");
207
208 (table.qualified_name(), Arc::clone(capture_instance))
209 })
210 .collect();
211
212 mz_sql_server_util::inspect::validate_source_privileges(
213 client,
214 capture_instances.values().map(|instance| instance.as_ref()),
215 )
216 .await?;
217
218 let mut initial_lsns = mz_sql_server_util::inspect::get_min_lsns(
225 client,
226 capture_instances.values().map(|instance| instance.as_ref()),
227 )
228 .await?;
229
230 let max_lsn = mz_sql_server_util::inspect::get_max_lsn_retry(client, timeout).await?;
231 tracing::debug!(?initial_lsns, %max_lsn, "retrieved start LSNs");
232 for lsn in initial_lsns.values_mut() {
233 *lsn = std::cmp::max(*lsn, max_lsn);
234 }
235
236 let mut tables = vec![];
237 for requested in requested_exports {
238 let mut table = requested
239 .meta
240 .sql_server_table()
241 .expect("sql server source")
242 .clone();
243
244 let maybe_text_cols = text_cols_map.get(&table.qualified_name());
245 let maybe_excl_cols = excl_cols_map.get(&table.qualified_name());
246
247 if let Some(text_cols) = maybe_text_cols {
248 table.apply_text_columns(text_cols);
249 }
250
251 if let Some(excl_cols) = maybe_excl_cols {
252 table.apply_excl_columns(excl_cols);
253 }
254
255 if table.columns.iter().all(|c| c.is_excluded()) {
256 Err(SqlServerSourcePurificationError::AllColumnsExcluded {
257 tbl_name: Arc::clone(&table.name),
258 })?
259 }
260
261 tables.push(requested.change_meta(table));
262 }
263
264 if tables.is_empty() {
265 Err(SqlServerSourcePurificationError::NoTables)?;
266 }
267
268 let reference_resolver = SourceReferenceResolver::new(
269 database,
270 &tables.iter().map(|r| &r.meta).collect::<Vec<_>>(),
271 )?;
272
273 let normalized_text_columns = normalize_column_refs(
275 text_columns,
276 &reference_resolver,
277 &tables,
278 SqlServerConfigOptionName::TextColumns,
279 )?;
280 let normalized_excl_columns = normalize_column_refs(
281 excl_columns,
282 &reference_resolver,
283 &tables,
284 SqlServerConfigOptionName::ExcludeColumns,
285 )?;
286
287 let exports = tables
288 .into_iter()
289 .map(|reference| {
290 let table_reference = reference.meta.qualified_name();
291 let text_columns = text_cols_map.get(&table_reference).map(|cols| {
292 cols.iter()
293 .map(|c| Ident::new(*c).expect("validated above"))
294 .collect()
295 });
296 let excl_columns = excl_cols_map.get(&table_reference).map(|cols| {
297 cols.iter()
298 .map(|c| Ident::new(*c).expect("validated above"))
299 .collect()
300 });
301 let capture_instance = capture_instances
302 .get(&reference.meta.qualified_name())
303 .expect("capture instance should exist");
304
305 let initial_lsn = *initial_lsns.get(capture_instance).ok_or_else(|| {
306 SqlServerSourcePurificationError::NoStartLsn(capture_instance.to_string())
307 })?;
308
309 let export = PurifiedSourceExport {
310 external_reference: reference.external_reference,
311 details: PurifiedExportDetails::SqlServer {
312 table: reference.meta,
313 text_columns,
314 excl_columns,
315 capture_instance: Arc::clone(capture_instance),
316 initial_lsn,
317 },
318 };
319
320 Ok::<_, SqlServerSourcePurificationError>((reference.name, export))
321 })
322 .collect::<Result<_, _>>()?;
323
324 Ok(PurifiedSourceExports {
325 source_exports: exports,
326 normalized_text_columns,
327 normalized_excl_columns,
328 })
329}
330
331pub fn generate_create_subsource_statements(
332 scx: &StatementContext,
333 source_name: ResolvedItemName,
334 requested_subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
335) -> Result<Vec<CreateSubsourceStatement<Aug>>, PlanError> {
336 let mut subsources = Vec::with_capacity(requested_subsources.len());
337
338 for (subsource_name, purified_export) in requested_subsources {
339 let SqlServerExportStatementValues {
340 columns,
341 constraints,
342 text_columns,
343 excl_columns,
344 details,
345 external_reference,
346 } = generate_source_export_statement_values(scx, purified_export)?;
347
348 let mut with_options = vec![
349 CreateSubsourceOption {
350 name: CreateSubsourceOptionName::ExternalReference,
351 value: Some(WithOptionValue::UnresolvedItemName(external_reference)),
352 },
353 CreateSubsourceOption {
354 name: CreateSubsourceOptionName::Details,
355 value: Some(WithOptionValue::Value(Value::String(hex::encode(
356 details.into_proto().encode_to_vec(),
357 )))),
358 },
359 ];
360
361 if let Some(text_columns) = text_columns {
362 with_options.push(CreateSubsourceOption {
363 name: CreateSubsourceOptionName::TextColumns,
364 value: Some(WithOptionValue::Sequence(text_columns)),
365 });
366 }
367 if let Some(excl_columns) = excl_columns {
368 with_options.push(CreateSubsourceOption {
369 name: CreateSubsourceOptionName::ExcludeColumns,
370 value: Some(WithOptionValue::Sequence(excl_columns)),
371 });
372 }
373
374 let subsource = CreateSubsourceStatement {
376 name: subsource_name,
377 columns,
378 of_source: Some(source_name.clone()),
379 constraints,
380 if_not_exists: false,
381 with_options,
382 };
383 subsources.push(subsource);
384 }
385
386 Ok(subsources)
387}
388
/// The statement components generated for a single purified SQL Server export.
pub(super) struct SqlServerExportStatementValues {
    /// Column definitions for the export's columns that have a column type.
    pub(super) columns: Vec<ColumnDef<Aug>>,
    /// Table constraints; contains at most one primary-key constraint.
    pub(super) constraints: Vec<TableConstraint<Aug>>,
    /// Sorted `TEXT COLUMNS` option values, if the export specified any.
    pub(super) text_columns: Option<Vec<WithOptionValue<Aug>>>,
    /// Sorted `EXCLUDE COLUMNS` option values, if the export specified any.
    pub(super) excl_columns: Option<Vec<WithOptionValue<Aug>>>,
    /// Table description, capture instance and initial LSN; encoded into the
    /// statement's `DETAILS` option by `generate_create_subsource_statements`.
    pub(super) details: SourceExportStatementDetails,
    /// Reference to the upstream object, emitted as `EXTERNAL REFERENCE`.
    pub(super) external_reference: UnresolvedItemName,
}
397
398pub(super) fn generate_source_export_statement_values(
399 scx: &StatementContext,
400 purified_export: PurifiedSourceExport,
401) -> Result<SqlServerExportStatementValues, PlanError> {
402 let PurifiedExportDetails::SqlServer {
403 table,
404 text_columns,
405 excl_columns,
406 capture_instance,
407 initial_lsn,
408 } = purified_export.details
409 else {
410 unreachable!("purified export details must be SQL Server")
411 };
412
413 let included_columns = table.columns.iter().filter_map(|c| {
415 c.column_type
416 .as_ref()
417 .map(|ct| (c.name.as_ref(), ct, c.primary_key_constraint.clone()))
418 });
419
420 let mut primary_keys: BTreeMap<Arc<str>, Vec<Ident>> = BTreeMap::new();
421 let mut column_defs = vec![];
422 let mut constraints = vec![];
423
424 for (col_name, col_type, col_primary_key_constraint) in included_columns {
425 let name = Ident::new(col_name)?;
426 let ty = mz_pgrepr::Type::from(&col_type.scalar_type);
427 let data_type = scx.resolve_type(ty)?;
428 let mut col_options = vec![];
429
430 if let Some(constraint) = col_primary_key_constraint {
431 let columns = primary_keys.entry(constraint).or_default();
432 columns.push(name.clone());
433 }
434 if !col_type.nullable {
435 col_options.push(mz_sql_parser::ast::ColumnOptionDef {
436 name: None,
437 option: mz_sql_parser::ast::ColumnOption::NotNull,
438 });
439 }
440
441 column_defs.push(ColumnDef {
442 name,
443 data_type,
444 collation: None,
445 options: col_options,
446 });
447 }
448
449 match primary_keys.len() {
450 0 => (),
452 1 => {
453 let (constraint_name, columns) = primary_keys.into_element();
454 constraints.push(mz_sql_parser::ast::TableConstraint::Unique {
455 name: Some(Ident::new_lossy(&*constraint_name)),
456 columns,
457 is_primary: true,
458 nulls_not_distinct: false,
459 });
460 }
461 2.. => {
463 let constraint_names = primary_keys.into_keys().collect();
464 return Err(PlanError::SqlServerSourcePurificationError(
465 SqlServerSourcePurificationError::MultiplePrimaryKeys { constraint_names },
466 ));
467 }
468 }
469
470 let details = SourceExportStatementDetails::SqlServer {
471 table,
472 capture_instance,
473 initial_lsn,
474 };
475 let text_columns = text_columns.map(|mut columns| {
476 columns.sort();
477 columns
478 .into_iter()
479 .map(WithOptionValue::Ident::<Aug>)
480 .collect()
481 });
482 let excl_columns = excl_columns.map(|mut columns| {
483 columns.sort();
484 columns
485 .into_iter()
486 .map(WithOptionValue::Ident::<Aug>)
487 .collect()
488 });
489
490 let values = SqlServerExportStatementValues {
491 columns: column_defs,
492 constraints,
493 text_columns,
494 excl_columns,
495 details,
496 external_reference: purified_export.external_reference,
497 };
498 Ok(values)
499}
500
501fn map_column_refs<'a>(
503 cols: &'a [UnresolvedItemName],
504 option_type: SqlServerConfigOptionName,
505) -> Result<BTreeMap<SqlServerQualifiedTableName, BTreeSet<&'a str>>, PlanError> {
506 let mut table_to_cols: BTreeMap<_, BTreeSet<_>> = BTreeMap::new();
507 for name in cols.iter() {
508 if name.0.len() == 3 {
510 let key = mz_sql_server_util::desc::SqlServerQualifiedTableName {
511 schema_name: name.0[0].as_str().into(),
512 table_name: name.0[1].as_str().into(),
513 };
514 let new = table_to_cols
515 .entry(key)
516 .or_default()
517 .insert(name.0[2].as_str());
518 if !new {
519 return Err(PlanError::InvalidOptionValue {
520 option_name: option_type.to_ast_string_simple(),
521 err: Box::new(PlanError::UnexpectedDuplicateReference { name: name.clone() }),
522 });
523 }
524 } else {
525 return Err(PlanError::InvalidOptionValue {
526 option_name: option_type.to_ast_string_simple(),
527 err: Box::new(PlanError::UnderqualifiedColumnName(name.to_string())),
528 });
529 }
530 }
531 Ok(table_to_cols)
532}
533
534fn normalize_column_refs(
540 cols: &[UnresolvedItemName],
541 reference_resolver: &SourceReferenceResolver,
542 tables: &[RequestedSourceExport<SqlServerTableDesc>],
543 option_name: SqlServerConfigOptionName,
544) -> Result<Vec<WithOptionValue<Aug>>, SqlServerSourcePurificationError> {
545 let (seq, unknown): (Vec<_>, Vec<_>) = cols.into_iter().partition(|name| {
546 let (column_name, qual) = name.0.split_last().expect("non-empty");
547 match reference_resolver.resolve_idx(qual) {
548 Ok(idx) => tables[idx]
551 .meta
552 .columns
553 .iter()
554 .any(|n| &*n.name == column_name.as_str()),
555 Err(_) => false,
556 }
557 });
558
559 if !unknown.is_empty() {
560 return Err(SqlServerSourcePurificationError::DanglingColumns {
561 option_name: option_name.to_string(),
562 items: unknown.into_iter().cloned().collect(),
563 });
564 }
565
566 let mut seq: Vec<_> = seq
567 .into_iter()
568 .cloned()
569 .map(WithOptionValue::UnresolvedItemName)
570 .collect();
571
572 seq.sort();
573 seq.dedup();
574 Ok(seq)
575}