1use std::collections::{BTreeMap, BTreeSet};
11use std::sync::Arc;
12
13use mz_proto::RustType;
14use mz_sql_parser::ast::display::AstDisplay;
15use mz_sql_parser::ast::{
16 ColumnDef, CreateSubsourceOption, CreateSubsourceOptionName, CreateSubsourceStatement,
17 ExternalReferences, Ident, SqlServerConfigOptionName, UnresolvedItemName, Value,
18 WithOptionValue,
19};
20use mz_sql_server_util::desc::{SqlServerQualifiedTableName, SqlServerTableDesc};
21use mz_storage_types::sources::{SourceExportStatementDetails, SourceReferenceResolver};
22use prost::Message;
23
24use crate::names::{Aug, ResolvedItemName};
25use crate::plan::{PlanError, StatementContext};
26use crate::pure::{
27 PurifiedExportDetails, PurifiedSourceExport, RequestedSourceExport, RetrievedSourceReferences,
28 SourceReferencePolicy, SqlServerSourcePurificationError,
29};
30
/// The result of purifying the table references of a SQL Server source.
pub(super) struct PurifiedSourceExports {
    /// One purified export per requested upstream table, keyed by the
    /// (unresolved) name of the item that will ingest it.
    pub(super) source_exports: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
    /// The `TEXT COLUMNS` references, validated against the requested tables
    /// and returned sorted and deduplicated.
    pub(super) normalized_text_columns: Vec<WithOptionValue<Aug>>,
    /// The `EXCLUDE COLUMNS` references, validated against the requested
    /// tables and returned sorted and deduplicated.
    pub(super) normalized_excl_columns: Vec<WithOptionValue<Aug>>,
}
47
48#[allow(clippy::unused_async)]
51pub(super) async fn purify_source_exports(
52 database: &str,
53 _client: &mut mz_sql_server_util::Client,
54 retrieved_references: &RetrievedSourceReferences,
55 requested_references: &Option<ExternalReferences>,
56 text_columns: &[UnresolvedItemName],
57 excl_columns: &[UnresolvedItemName],
58 unresolved_source_name: &UnresolvedItemName,
59 reference_policy: &SourceReferencePolicy,
60) -> Result<PurifiedSourceExports, PlanError> {
61 let requested_exports = match requested_references.as_ref() {
62 Some(requested) => {
63 if *reference_policy == SourceReferencePolicy::NotAllowed {
64 return Err(PlanError::UseTablesForSources(requested.to_string()));
65 } else {
66 let exports = retrieved_references
67 .requested_source_exports(Some(requested), unresolved_source_name)?;
68 exports
69 }
70 }
71 None => {
72 if *reference_policy == SourceReferencePolicy::Required {
73 return Err(SqlServerSourcePurificationError::RequiresExternalReferences.into());
74 }
75
76 if !text_columns.is_empty() {
79 Err(
80 SqlServerSourcePurificationError::UnnecessaryOptionsWithoutReferences(
81 "TEXT COLUMNS".to_string(),
82 ),
83 )?
84 }
85 if !excl_columns.is_empty() {
86 Err(
87 SqlServerSourcePurificationError::UnnecessaryOptionsWithoutReferences(
88 "EXCLUDE COLUMNS".to_string(),
89 ),
90 )?
91 }
92
93 return Ok(PurifiedSourceExports {
94 source_exports: BTreeMap::default(),
95 normalized_text_columns: Vec::default(),
96 normalized_excl_columns: Vec::default(),
97 });
98 }
99 };
100
101 let text_cols_map = map_column_refs(text_columns, SqlServerConfigOptionName::TextColumns)?;
104 let excl_cols_map = map_column_refs(excl_columns, SqlServerConfigOptionName::ExcludeColumns)?;
105
106 if requested_exports.is_empty() {
107 sql_bail!(
108 "SQL Server source must ingest at least one table, but {} matched none",
109 requested_references
110 .as_ref()
111 .expect("checked above")
112 .to_ast_string_simple()
113 )
114 }
115
116 for export in &requested_exports {
124 let table = export.meta.sql_server_table().expect("sql server source");
125 let maybe_excl_cols = excl_cols_map.get(&table.qualified_name());
126 let is_excluded = |name| {
127 maybe_excl_cols
128 .as_ref()
129 .map(|cols| cols.contains(name))
130 .unwrap_or(false)
131 };
132
133 let maybe_bad_column = table
134 .columns
135 .iter()
136 .find(|col| !col.is_supported() && !is_excluded(col.name.as_ref()));
137 if let Some(bad_column) = maybe_bad_column {
138 Err(SqlServerSourcePurificationError::UnsupportedColumn {
139 schema_name: Arc::clone(&table.schema_name),
140 tbl_name: Arc::clone(&table.name),
141 col_name: Arc::clone(&bad_column.name),
142 col_type: Arc::clone(&bad_column.raw_type),
143 })?
144 }
145 }
146
147 let capture_instances: BTreeMap<_, _> = requested_exports
150 .iter()
151 .map(|requested| {
152 let table = requested
153 .meta
154 .sql_server_table()
155 .expect("sql server source");
156 let capture_instance = requested
157 .meta
158 .sql_server_capture_instance()
159 .expect("sql server source");
160
161 (table.qualified_name(), Arc::clone(capture_instance))
162 })
163 .collect();
164
165 let tables: Vec<_> = requested_exports
166 .into_iter()
167 .map(|requested| {
168 let mut table = requested
169 .meta
170 .sql_server_table()
171 .expect("sql server source")
172 .clone();
173
174 let maybe_text_cols = text_cols_map.get(&table.qualified_name());
175 let maybe_excl_cols = excl_cols_map.get(&table.qualified_name());
176
177 if let Some(text_cols) = maybe_text_cols {
178 table.apply_text_columns(text_cols);
179 }
180 if let Some(excl_cols) = maybe_excl_cols {
183 table.apply_excl_columns(excl_cols);
184 }
185
186 requested.change_meta(table)
187 })
188 .collect();
189
190 if tables.is_empty() {
191 Err(SqlServerSourcePurificationError::NoTables)?;
192 }
193
194 let reference_resolver = SourceReferenceResolver::new(
195 database,
196 &tables.iter().map(|r| &r.meta).collect::<Vec<_>>(),
197 )?;
198
199 let normalized_text_columns = normalize_column_refs(
201 text_columns,
202 &reference_resolver,
203 &tables,
204 SqlServerConfigOptionName::TextColumns,
205 )?;
206 let normalized_excl_columns = normalize_column_refs(
207 excl_columns,
208 &reference_resolver,
209 &tables,
210 SqlServerConfigOptionName::ExcludeColumns,
211 )?;
212
213 let exports = tables
214 .into_iter()
215 .map(|reference| {
216 let table_reference = reference.meta.qualified_name();
217 let text_columns = text_cols_map.get(&table_reference).map(|cols| {
218 cols.iter()
219 .map(|c| Ident::new(*c).expect("validated above"))
220 .collect()
221 });
222 let excl_columns = excl_cols_map.get(&table_reference).map(|cols| {
223 cols.iter()
224 .map(|c| Ident::new(*c).expect("validated above"))
225 .collect()
226 });
227 let capture_instance = capture_instances
228 .get(&reference.meta.qualified_name())
229 .expect("capture instance should exist");
230
231 let export = PurifiedSourceExport {
232 external_reference: reference.external_reference,
233 details: PurifiedExportDetails::SqlServer {
234 table: reference.meta,
235 text_columns,
236 excl_columns,
237 capture_instance: Arc::clone(capture_instance),
238 },
239 };
240
241 (reference.name, export)
242 })
243 .collect();
244
245 Ok(PurifiedSourceExports {
246 source_exports: exports,
247 normalized_text_columns,
248 normalized_excl_columns,
249 })
250}
251
252pub fn generate_create_subsource_statements(
253 scx: &StatementContext,
254 source_name: ResolvedItemName,
255 requested_subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
256) -> Result<Vec<CreateSubsourceStatement<Aug>>, PlanError> {
257 let mut subsources = Vec::with_capacity(requested_subsources.len());
258
259 for (subsource_name, purified_export) in requested_subsources {
260 let SqlServerExportStatementValues {
261 columns,
262 text_columns,
263 excl_columns,
264 details,
265 external_reference,
266 } = generate_source_export_statement_values(scx, purified_export)?;
267
268 let mut with_options = vec![
269 CreateSubsourceOption {
270 name: CreateSubsourceOptionName::ExternalReference,
271 value: Some(WithOptionValue::UnresolvedItemName(external_reference)),
272 },
273 CreateSubsourceOption {
274 name: CreateSubsourceOptionName::Details,
275 value: Some(WithOptionValue::Value(Value::String(hex::encode(
276 details.into_proto().encode_to_vec(),
277 )))),
278 },
279 ];
280
281 if let Some(text_columns) = text_columns {
282 with_options.push(CreateSubsourceOption {
283 name: CreateSubsourceOptionName::TextColumns,
284 value: Some(WithOptionValue::Sequence(text_columns)),
285 });
286 }
287 if let Some(excl_columns) = excl_columns {
288 with_options.push(CreateSubsourceOption {
289 name: CreateSubsourceOptionName::ExcludeColumns,
290 value: Some(WithOptionValue::Sequence(excl_columns)),
291 });
292 }
293
294 let subsource = CreateSubsourceStatement {
296 name: subsource_name,
297 columns,
298 of_source: Some(source_name.clone()),
299 constraints: vec![],
301 if_not_exists: false,
302 with_options,
303 };
304 subsources.push(subsource);
305 }
306
307 Ok(subsources)
308}
309
/// The parts of a `CREATE SUBSOURCE` statement derived from a single purified
/// SQL Server export.
struct SqlServerExportStatementValues {
    /// Column definitions for every upstream column that has a column type.
    /// Columns without one are omitted (presumably the excluded columns;
    /// confirm against `apply_excl_columns`).
    pub columns: Vec<ColumnDef<Aug>>,
    /// Sorted `TEXT COLUMNS` option values, if the export specified any.
    pub text_columns: Option<Vec<WithOptionValue<Aug>>>,
    /// Sorted `EXCLUDE COLUMNS` option values, if the export specified any.
    pub excl_columns: Option<Vec<WithOptionValue<Aug>>>,
    /// The export details; encoded into the statement's `DETAILS` option.
    pub details: SourceExportStatementDetails,
    /// The upstream table reference; used as the `EXTERNAL REFERENCE` option.
    pub external_reference: UnresolvedItemName,
}
317
318fn generate_source_export_statement_values(
319 scx: &StatementContext,
320 purified_export: PurifiedSourceExport,
321) -> Result<SqlServerExportStatementValues, PlanError> {
322 let PurifiedExportDetails::SqlServer {
323 table,
324 text_columns,
325 excl_columns,
326 capture_instance,
327 } = purified_export.details
328 else {
329 unreachable!("purified export details must be SQL Server")
330 };
331
332 let included_columns = table
334 .columns
335 .iter()
336 .filter_map(|c| c.column_type.as_ref().map(|ct| (c.name.as_ref(), ct)));
337 let mut column_defs = vec![];
338
339 for (col_name, col_type) in included_columns {
340 let name = Ident::new(col_name)?;
341 let ty = mz_pgrepr::Type::from(&col_type.scalar_type);
342 let data_type = scx.resolve_type(ty)?;
343 let mut col_options = vec![];
344
345 if !col_type.nullable {
346 col_options.push(mz_sql_parser::ast::ColumnOptionDef {
347 name: None,
348 option: mz_sql_parser::ast::ColumnOption::NotNull,
349 });
350 }
351 column_defs.push(ColumnDef {
352 name,
353 data_type,
354 collation: None,
355 options: col_options,
356 });
357 }
358
359 let details = SourceExportStatementDetails::SqlServer {
362 table,
363 capture_instance,
364 };
365 let text_columns = text_columns.map(|mut columns| {
366 columns.sort();
367 columns
368 .into_iter()
369 .map(WithOptionValue::Ident::<Aug>)
370 .collect()
371 });
372 let excl_columns = excl_columns.map(|mut columns| {
373 columns.sort();
374 columns
375 .into_iter()
376 .map(WithOptionValue::Ident::<Aug>)
377 .collect()
378 });
379
380 let values = SqlServerExportStatementValues {
381 columns: column_defs,
382 text_columns,
383 excl_columns,
384 details,
385 external_reference: purified_export.external_reference,
386 };
387 Ok(values)
388}
389
390fn map_column_refs<'a>(
392 cols: &'a [UnresolvedItemName],
393 option_type: SqlServerConfigOptionName,
394) -> Result<BTreeMap<SqlServerQualifiedTableName, BTreeSet<&'a str>>, PlanError> {
395 let mut table_to_cols: BTreeMap<_, BTreeSet<_>> = BTreeMap::new();
396 for name in cols.iter() {
397 if name.0.len() == 3 {
399 let key = mz_sql_server_util::desc::SqlServerQualifiedTableName {
400 schema_name: name.0[0].as_str().into(),
401 table_name: name.0[1].as_str().into(),
402 };
403 let new = table_to_cols
404 .entry(key)
405 .or_default()
406 .insert(name.0[2].as_str());
407 if !new {
408 return Err(PlanError::InvalidOptionValue {
409 option_name: option_type.to_ast_string_simple(),
410 err: Box::new(PlanError::UnexpectedDuplicateReference { name: name.clone() }),
411 });
412 }
413 } else {
414 return Err(PlanError::InvalidOptionValue {
415 option_name: option_type.to_ast_string_simple(),
416 err: Box::new(PlanError::UnderqualifiedColumnName(name.to_string())),
417 });
418 }
419 }
420 Ok(table_to_cols)
421}
422
423fn normalize_column_refs(
429 cols: &[UnresolvedItemName],
430 reference_resolver: &SourceReferenceResolver,
431 tables: &[RequestedSourceExport<SqlServerTableDesc>],
432 option_name: SqlServerConfigOptionName,
433) -> Result<Vec<WithOptionValue<Aug>>, SqlServerSourcePurificationError> {
434 let (seq, unknown): (Vec<_>, Vec<_>) = cols.into_iter().partition(|name| {
435 let (column_name, qual) = name.0.split_last().expect("non-empty");
436 match reference_resolver.resolve_idx(qual) {
437 Ok(idx) => tables[idx]
440 .meta
441 .columns
442 .iter()
443 .any(|n| &*n.name == column_name.as_str()),
444 Err(_) => false,
445 }
446 });
447
448 if !unknown.is_empty() {
449 return Err(SqlServerSourcePurificationError::DanglingColumns {
450 option_name: option_name.to_string(),
451 items: unknown.into_iter().cloned().collect(),
452 });
453 }
454
455 let mut seq: Vec<_> = seq
456 .into_iter()
457 .cloned()
458 .map(WithOptionValue::UnresolvedItemName)
459 .collect();
460
461 seq.sort();
462 seq.dedup();
463 Ok(seq)
464}