1use std::collections::{BTreeMap, BTreeSet};
11use std::sync::Arc;
12use std::time::Duration;
13
14use mz_ore::collections::CollectionExt;
15use mz_proto::RustType;
16use mz_sql_parser::ast::display::AstDisplay;
17use mz_sql_parser::ast::{
18 ColumnDef, CreateSubsourceOption, CreateSubsourceOptionName, CreateSubsourceStatement,
19 ExternalReferences, Ident, SqlServerConfigOptionName, TableConstraint, UnresolvedItemName,
20 Value, WithOptionValue,
21};
22use mz_sql_server_util::desc::{SqlServerQualifiedTableName, SqlServerTableDesc};
23use mz_storage_types::sources::{SourceExportStatementDetails, SourceReferenceResolver};
24use prost::Message;
25
26use crate::names::{Aug, ResolvedItemName};
27use crate::plan::{PlanError, StatementContext};
28use crate::pure::{
29 PurifiedExportDetails, PurifiedSourceExport, RequestedSourceExport, RetrievedSourceReferences,
30 SourceReferencePolicy, SqlServerSourcePurificationError,
31};
32
/// The outcome of purifying the source exports requested for a SQL Server source.
pub(super) struct PurifiedSourceExports {
    /// The purified exports, keyed by each requested export's name
    /// (`RequestedSourceExport::name`). Empty when no external references were requested.
    pub(super) source_exports: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
    /// The `TEXT COLUMNS` references after validation against the requested tables, sorted and
    /// deduplicated. Empty when no external references were requested.
    pub(super) normalized_text_columns: Vec<WithOptionValue<Aug>>,
    /// The `EXCLUDE COLUMNS` references after validation against the requested tables, sorted
    /// and deduplicated. Empty when no external references were requested.
    pub(super) normalized_excl_columns: Vec<WithOptionValue<Aug>>,
}
49
50#[allow(clippy::unused_async)]
53pub(super) async fn purify_source_exports(
54 database: &str,
55 client: &mut mz_sql_server_util::Client,
56 retrieved_references: &RetrievedSourceReferences,
57 requested_references: &Option<ExternalReferences>,
58 text_columns: &[UnresolvedItemName],
59 excl_columns: &[UnresolvedItemName],
60 unresolved_source_name: &UnresolvedItemName,
61 timeout: Duration,
62 reference_policy: &SourceReferencePolicy,
63) -> Result<PurifiedSourceExports, PlanError> {
64 let requested_exports = match requested_references.as_ref() {
65 Some(requested) => {
66 if *reference_policy == SourceReferencePolicy::NotAllowed {
67 return Err(PlanError::UseTablesForSources(requested.to_string()));
68 } else {
69 let exports = retrieved_references
70 .requested_source_exports(Some(requested), unresolved_source_name)?;
71 exports
72 }
73 }
74 None => {
75 if *reference_policy == SourceReferencePolicy::Required {
76 return Err(SqlServerSourcePurificationError::RequiresExternalReferences.into());
77 }
78
79 if !text_columns.is_empty() {
82 Err(
83 SqlServerSourcePurificationError::UnnecessaryOptionsWithoutReferences(
84 "TEXT COLUMNS".to_string(),
85 ),
86 )?
87 }
88 if !excl_columns.is_empty() {
89 Err(
90 SqlServerSourcePurificationError::UnnecessaryOptionsWithoutReferences(
91 "EXCLUDE COLUMNS".to_string(),
92 ),
93 )?
94 }
95
96 return Ok(PurifiedSourceExports {
97 source_exports: BTreeMap::default(),
98 normalized_text_columns: Vec::default(),
99 normalized_excl_columns: Vec::default(),
100 });
101 }
102 };
103
104 if requested_exports.is_empty() {
105 sql_bail!(
106 "SQL Server source must ingest at least one table, but {} matched none",
107 requested_references
108 .as_ref()
109 .expect("checked above")
110 .to_ast_string_simple()
111 )
112 }
113
114 super::validate_source_export_names(&requested_exports)?;
115
116 let text_cols_map = map_column_refs(text_columns, SqlServerConfigOptionName::TextColumns)?;
119 let excl_cols_map = map_column_refs(excl_columns, SqlServerConfigOptionName::ExcludeColumns)?;
120
121 for export in &requested_exports {
129 let table = export.meta.sql_server_table().expect("sql server source");
130 let maybe_excl_cols = excl_cols_map.get(&table.qualified_name());
131 let is_excluded = |name| {
132 maybe_excl_cols
133 .as_ref()
134 .map(|cols| cols.contains(name))
135 .unwrap_or(false)
136 };
137
138 let maybe_bad_column = table
139 .columns
140 .iter()
141 .find(|col| !col.is_supported() && !is_excluded(col.name.as_ref()));
142 if let Some(bad_column) = maybe_bad_column {
143 Err(SqlServerSourcePurificationError::UnsupportedColumn {
144 schema_name: Arc::clone(&table.schema_name),
145 tbl_name: Arc::clone(&table.name),
146 col_name: Arc::clone(&bad_column.name),
147 col_type: Arc::clone(&bad_column.raw_type),
148 })?
149 }
150 }
151
152 let capture_instances: BTreeMap<_, _> = requested_exports
153 .iter()
154 .map(|requested| {
155 let table = requested
156 .meta
157 .sql_server_table()
158 .expect("sql server source");
159 let capture_instance = requested
160 .meta
161 .sql_server_capture_instance()
162 .expect("sql server source");
163
164 (table.qualified_name(), Arc::clone(capture_instance))
165 })
166 .collect();
167
168 mz_sql_server_util::inspect::validate_source_privileges(
169 client,
170 capture_instances.values().map(|instance| instance.as_ref()),
171 )
172 .await?;
173
174 let mut initial_lsns = mz_sql_server_util::inspect::get_min_lsns(
181 client,
182 capture_instances.values().map(|instance| instance.as_ref()),
183 )
184 .await?;
185
186 let max_lsn = mz_sql_server_util::inspect::get_max_lsn_retry(client, timeout).await?;
187 tracing::debug!(?initial_lsns, %max_lsn, "retrieved start LSNs");
188 for lsn in initial_lsns.values_mut() {
189 *lsn = std::cmp::max(*lsn, max_lsn);
190 }
191
192 let mut tables = vec![];
193 for requested in requested_exports {
194 let mut table = requested
195 .meta
196 .sql_server_table()
197 .expect("sql server source")
198 .clone();
199
200 let maybe_text_cols = text_cols_map.get(&table.qualified_name());
201 let maybe_excl_cols = excl_cols_map.get(&table.qualified_name());
202
203 if let Some(text_cols) = maybe_text_cols {
204 table.apply_text_columns(text_cols);
205 }
206
207 if let Some(excl_cols) = maybe_excl_cols {
208 table.apply_excl_columns(excl_cols);
209 }
210
211 if table.columns.iter().all(|c| c.is_excluded()) {
212 Err(SqlServerSourcePurificationError::AllColumnsExcluded {
213 tbl_name: Arc::clone(&table.name),
214 })?
215 }
216
217 tables.push(requested.change_meta(table));
218 }
219
220 if tables.is_empty() {
221 Err(SqlServerSourcePurificationError::NoTables)?;
222 }
223
224 let reference_resolver = SourceReferenceResolver::new(
225 database,
226 &tables.iter().map(|r| &r.meta).collect::<Vec<_>>(),
227 )?;
228
229 let normalized_text_columns = normalize_column_refs(
231 text_columns,
232 &reference_resolver,
233 &tables,
234 SqlServerConfigOptionName::TextColumns,
235 )?;
236 let normalized_excl_columns = normalize_column_refs(
237 excl_columns,
238 &reference_resolver,
239 &tables,
240 SqlServerConfigOptionName::ExcludeColumns,
241 )?;
242
243 let exports = tables
244 .into_iter()
245 .map(|reference| {
246 let table_reference = reference.meta.qualified_name();
247 let text_columns = text_cols_map.get(&table_reference).map(|cols| {
248 cols.iter()
249 .map(|c| Ident::new(*c).expect("validated above"))
250 .collect()
251 });
252 let excl_columns = excl_cols_map.get(&table_reference).map(|cols| {
253 cols.iter()
254 .map(|c| Ident::new(*c).expect("validated above"))
255 .collect()
256 });
257 let capture_instance = capture_instances
258 .get(&reference.meta.qualified_name())
259 .expect("capture instance should exist");
260
261 let initial_lsn = *initial_lsns.get(capture_instance).ok_or_else(|| {
262 SqlServerSourcePurificationError::NoStartLsn(capture_instance.to_string())
263 })?;
264
265 let export = PurifiedSourceExport {
266 external_reference: reference.external_reference,
267 details: PurifiedExportDetails::SqlServer {
268 table: reference.meta,
269 text_columns,
270 excl_columns,
271 capture_instance: Arc::clone(capture_instance),
272 initial_lsn,
273 },
274 };
275
276 Ok::<_, SqlServerSourcePurificationError>((reference.name, export))
277 })
278 .collect::<Result<_, _>>()?;
279
280 Ok(PurifiedSourceExports {
281 source_exports: exports,
282 normalized_text_columns,
283 normalized_excl_columns,
284 })
285}
286
287pub fn generate_create_subsource_statements(
288 scx: &StatementContext,
289 source_name: ResolvedItemName,
290 requested_subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
291) -> Result<Vec<CreateSubsourceStatement<Aug>>, PlanError> {
292 let mut subsources = Vec::with_capacity(requested_subsources.len());
293
294 for (subsource_name, purified_export) in requested_subsources {
295 let SqlServerExportStatementValues {
296 columns,
297 constraints,
298 text_columns,
299 excl_columns,
300 details,
301 external_reference,
302 } = generate_source_export_statement_values(scx, purified_export)?;
303
304 let mut with_options = vec![
305 CreateSubsourceOption {
306 name: CreateSubsourceOptionName::ExternalReference,
307 value: Some(WithOptionValue::UnresolvedItemName(external_reference)),
308 },
309 CreateSubsourceOption {
310 name: CreateSubsourceOptionName::Details,
311 value: Some(WithOptionValue::Value(Value::String(hex::encode(
312 details.into_proto().encode_to_vec(),
313 )))),
314 },
315 ];
316
317 if let Some(text_columns) = text_columns {
318 with_options.push(CreateSubsourceOption {
319 name: CreateSubsourceOptionName::TextColumns,
320 value: Some(WithOptionValue::Sequence(text_columns)),
321 });
322 }
323 if let Some(excl_columns) = excl_columns {
324 with_options.push(CreateSubsourceOption {
325 name: CreateSubsourceOptionName::ExcludeColumns,
326 value: Some(WithOptionValue::Sequence(excl_columns)),
327 });
328 }
329
330 let subsource = CreateSubsourceStatement {
332 name: subsource_name,
333 columns,
334 of_source: Some(source_name.clone()),
335 constraints,
336 if_not_exists: false,
337 with_options,
338 };
339 subsources.push(subsource);
340 }
341
342 Ok(subsources)
343}
344
/// The statement components produced for a single SQL Server source export by
/// `generate_source_export_statement_values`.
pub(super) struct SqlServerExportStatementValues {
    /// Column definitions for the table's columns that have a resolved column type.
    pub(super) columns: Vec<ColumnDef<Aug>>,
    /// Table constraints; holds the primary key when exactly one primary-key constraint is
    /// found among the included columns, and is empty otherwise.
    pub(super) constraints: Vec<TableConstraint<Aug>>,
    /// Sorted `TEXT COLUMNS` identifiers for this table, if any were specified.
    pub(super) text_columns: Option<Vec<WithOptionValue<Aug>>>,
    /// Sorted `EXCLUDE COLUMNS` identifiers for this table, if any were specified.
    pub(super) excl_columns: Option<Vec<WithOptionValue<Aug>>>,
    /// Export details: the table description, capture instance, and initial LSN.
    pub(super) details: SourceExportStatementDetails,
    /// The upstream reference of the exported table.
    pub(super) external_reference: UnresolvedItemName,
}
353
354pub(super) fn generate_source_export_statement_values(
355 scx: &StatementContext,
356 purified_export: PurifiedSourceExport,
357) -> Result<SqlServerExportStatementValues, PlanError> {
358 let PurifiedExportDetails::SqlServer {
359 table,
360 text_columns,
361 excl_columns,
362 capture_instance,
363 initial_lsn,
364 } = purified_export.details
365 else {
366 unreachable!("purified export details must be SQL Server")
367 };
368
369 let included_columns = table.columns.iter().filter_map(|c| {
371 c.column_type
372 .as_ref()
373 .map(|ct| (c.name.as_ref(), ct, c.primary_key_constraint.clone()))
374 });
375
376 let mut primary_keys: BTreeMap<Arc<str>, Vec<Ident>> = BTreeMap::new();
377 let mut column_defs = vec![];
378 let mut constraints = vec![];
379
380 for (col_name, col_type, col_primary_key_constraint) in included_columns {
381 let name = Ident::new(col_name)?;
382 let ty = mz_pgrepr::Type::from(&col_type.scalar_type);
383 let data_type = scx.resolve_type(ty)?;
384 let mut col_options = vec![];
385
386 if let Some(constraint) = col_primary_key_constraint {
387 let columns = primary_keys.entry(constraint).or_default();
388 columns.push(name.clone());
389 }
390 if !col_type.nullable {
391 col_options.push(mz_sql_parser::ast::ColumnOptionDef {
392 name: None,
393 option: mz_sql_parser::ast::ColumnOption::NotNull,
394 });
395 }
396
397 column_defs.push(ColumnDef {
398 name,
399 data_type,
400 collation: None,
401 options: col_options,
402 });
403 }
404
405 match primary_keys.len() {
406 0 => (),
408 1 => {
409 let (constraint_name, columns) = primary_keys.into_element();
410 constraints.push(mz_sql_parser::ast::TableConstraint::Unique {
411 name: Some(Ident::new_lossy(&*constraint_name)),
412 columns,
413 is_primary: true,
414 nulls_not_distinct: false,
415 });
416 }
417 2.. => {
419 let constraint_names = primary_keys.into_keys().collect();
420 return Err(PlanError::SqlServerSourcePurificationError(
421 SqlServerSourcePurificationError::MultiplePrimaryKeys { constraint_names },
422 ));
423 }
424 }
425
426 let details = SourceExportStatementDetails::SqlServer {
427 table,
428 capture_instance,
429 initial_lsn,
430 };
431 let text_columns = text_columns.map(|mut columns| {
432 columns.sort();
433 columns
434 .into_iter()
435 .map(WithOptionValue::Ident::<Aug>)
436 .collect()
437 });
438 let excl_columns = excl_columns.map(|mut columns| {
439 columns.sort();
440 columns
441 .into_iter()
442 .map(WithOptionValue::Ident::<Aug>)
443 .collect()
444 });
445
446 let values = SqlServerExportStatementValues {
447 columns: column_defs,
448 constraints,
449 text_columns,
450 excl_columns,
451 details,
452 external_reference: purified_export.external_reference,
453 };
454 Ok(values)
455}
456
457fn map_column_refs<'a>(
459 cols: &'a [UnresolvedItemName],
460 option_type: SqlServerConfigOptionName,
461) -> Result<BTreeMap<SqlServerQualifiedTableName, BTreeSet<&'a str>>, PlanError> {
462 let mut table_to_cols: BTreeMap<_, BTreeSet<_>> = BTreeMap::new();
463 for name in cols.iter() {
464 if name.0.len() == 3 {
466 let key = mz_sql_server_util::desc::SqlServerQualifiedTableName {
467 schema_name: name.0[0].as_str().into(),
468 table_name: name.0[1].as_str().into(),
469 };
470 let new = table_to_cols
471 .entry(key)
472 .or_default()
473 .insert(name.0[2].as_str());
474 if !new {
475 return Err(PlanError::InvalidOptionValue {
476 option_name: option_type.to_ast_string_simple(),
477 err: Box::new(PlanError::UnexpectedDuplicateReference { name: name.clone() }),
478 });
479 }
480 } else {
481 return Err(PlanError::InvalidOptionValue {
482 option_name: option_type.to_ast_string_simple(),
483 err: Box::new(PlanError::UnderqualifiedColumnName(name.to_string())),
484 });
485 }
486 }
487 Ok(table_to_cols)
488}
489
490fn normalize_column_refs(
496 cols: &[UnresolvedItemName],
497 reference_resolver: &SourceReferenceResolver,
498 tables: &[RequestedSourceExport<SqlServerTableDesc>],
499 option_name: SqlServerConfigOptionName,
500) -> Result<Vec<WithOptionValue<Aug>>, SqlServerSourcePurificationError> {
501 let (seq, unknown): (Vec<_>, Vec<_>) = cols.into_iter().partition(|name| {
502 let (column_name, qual) = name.0.split_last().expect("non-empty");
503 match reference_resolver.resolve_idx(qual) {
504 Ok(idx) => tables[idx]
507 .meta
508 .columns
509 .iter()
510 .any(|n| &*n.name == column_name.as_str()),
511 Err(_) => false,
512 }
513 });
514
515 if !unknown.is_empty() {
516 return Err(SqlServerSourcePurificationError::DanglingColumns {
517 option_name: option_name.to_string(),
518 items: unknown.into_iter().cloned().collect(),
519 });
520 }
521
522 let mut seq: Vec<_> = seq
523 .into_iter()
524 .cloned()
525 .map(WithOptionValue::UnresolvedItemName)
526 .collect();
527
528 seq.sort();
529 seq.dedup();
530 Ok(seq)
531}