mz_sql/pure/
sql_server.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::{BTreeMap, BTreeSet};
11use std::sync::Arc;
12use std::time::Duration;
13
14use mz_proto::RustType;
15use mz_sql_parser::ast::display::AstDisplay;
16use mz_sql_parser::ast::{
17    ColumnDef, CreateSubsourceOption, CreateSubsourceOptionName, CreateSubsourceStatement,
18    ExternalReferences, Ident, SqlServerConfigOptionName, TableConstraint, UnresolvedItemName,
19    Value, WithOptionValue,
20};
21use mz_sql_server_util::desc::{
22    SqlServerColumnDecodeType, SqlServerColumnDesc, SqlServerQualifiedTableName,
23    SqlServerTableConstraintType, SqlServerTableDesc,
24};
25use mz_storage_types::sources::{SourceExportStatementDetails, SourceReferenceResolver};
26use prost::Message;
27
28use crate::names::{Aug, ResolvedItemName};
29use crate::plan::{PlanError, StatementContext};
30use crate::pure::{
31    PurifiedExportDetails, PurifiedSourceExport, RequestedSourceExport, RetrievedSourceReferences,
32    SourceReferencePolicy, SqlServerSourcePurificationError,
33};
34
/// The result of purifying the external references of a SQL Server source.
pub(super) struct PurifiedSourceExports {
    /// Map of source export names to details of the export.
    pub(super) source_exports: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
    /// Normalized list of columns that we'll decode as text.
    ///
    /// Note: Each source export (i.e. subsource) also tracks which of its
    /// columns we decode as text, but we also return this normalized list so
    /// we can roundtrip a `CREATE SOURCE` statement.
    pub(super) normalized_text_columns: Vec<WithOptionValue<Aug>>,
    /// Normalized list of columns that we'll exclude.
    ///
    /// Note: Each source export (i.e. subsource) also tracks which of its
    /// columns we should exclude, but we also return this normalized list so
    /// we can roundtrip a `CREATE SOURCE` statement.
    pub(super) normalized_excl_columns: Vec<WithOptionValue<Aug>>,
}
51
52/// Purify the requested [`ExternalReferences`] from the provided
53/// [`RetrievedSourceReferences`].
54#[allow(clippy::unused_async)]
55pub(super) async fn purify_source_exports(
56    database: &str,
57    client: &mut mz_sql_server_util::Client,
58    retrieved_references: &RetrievedSourceReferences,
59    requested_references: &Option<ExternalReferences>,
60    text_columns: &[UnresolvedItemName],
61    excl_columns: &[UnresolvedItemName],
62    unresolved_source_name: &UnresolvedItemName,
63    timeout: Duration,
64    reference_policy: &SourceReferencePolicy,
65) -> Result<PurifiedSourceExports, PlanError> {
66    let requested_exports = match requested_references.as_ref() {
67        Some(requested) => {
68            if *reference_policy == SourceReferencePolicy::NotAllowed {
69                return Err(PlanError::UseTablesForSources(requested.to_string()));
70            } else {
71                let exports = retrieved_references
72                    .requested_source_exports(Some(requested), unresolved_source_name)?;
73                exports
74            }
75        }
76        None => {
77            if *reference_policy == SourceReferencePolicy::Required {
78                return Err(SqlServerSourcePurificationError::RequiresExternalReferences.into());
79            }
80
81            // If no external reference is specified, it does not make sense to include
82            // or text or exclude columns.
83            if !text_columns.is_empty() {
84                Err(
85                    SqlServerSourcePurificationError::UnnecessaryOptionsWithoutReferences(
86                        "TEXT COLUMNS".to_string(),
87                    ),
88                )?
89            }
90            if !excl_columns.is_empty() {
91                Err(
92                    SqlServerSourcePurificationError::UnnecessaryOptionsWithoutReferences(
93                        "EXCLUDE COLUMNS".to_string(),
94                    ),
95                )?
96            }
97
98            return Ok(PurifiedSourceExports {
99                source_exports: BTreeMap::default(),
100                normalized_text_columns: Vec::default(),
101                normalized_excl_columns: Vec::default(),
102            });
103        }
104    };
105
106    if requested_exports.is_empty() {
107        sql_bail!(
108            "SQL Server source must ingest at least one table, but {} matched none",
109            requested_references
110                .as_ref()
111                .expect("checked above")
112                .to_ast_string_simple()
113        )
114    }
115
116    super::validate_source_export_names(&requested_exports)?;
117
118    // TODO(sql_server2): Should we check if these have overlapping columns?
119    // What do our other sources do?
120    let text_cols_map = map_column_refs(text_columns, SqlServerConfigOptionName::TextColumns)?;
121    let excl_cols_map = map_column_refs(excl_columns, SqlServerConfigOptionName::ExcludeColumns)?;
122
123    // Ensure the columns we intend to include are supported.
124    //
125    // For example, we don't support `text` columns in SQL Server because they
126    // do not include the "before" value when an `UPDATE` occurs. When parsing
127    // the raw metadata for this type we mark the column as "unsupported" but
128    // don't return an error, in case later (here) they intend to exclude that
129    // column.
130    for export in &requested_exports {
131        let table = export.meta.sql_server_table().expect("sql server source");
132        let maybe_excl_cols = excl_cols_map.get(&table.qualified_name());
133        let is_excluded = |name| {
134            maybe_excl_cols
135                .as_ref()
136                .map(|cols| cols.contains(name))
137                .unwrap_or(false)
138        };
139
140        let capture_instance = export
141            .meta
142            .sql_server_capture_instance()
143            .expect("sql server capture instance");
144        let mut cdc_columns =
145            mz_sql_server_util::inspect::get_cdc_table_columns(client, capture_instance).await?;
146        let mut mismatched_columns = vec![];
147
148        for column in table.columns.iter() {
149            let col_excluded = is_excluded(column.name.as_ref());
150            if let SqlServerColumnDecodeType::Unsupported { context } = &column.decode_type
151                && !col_excluded
152            {
153                Err(SqlServerSourcePurificationError::UnsupportedColumn {
154                    schema_name: Arc::clone(&table.schema_name),
155                    tbl_name: Arc::clone(&table.name),
156                    col_name: Arc::clone(&column.name),
157                    col_type: Arc::clone(&column.raw_type),
158                    context: context.clone(),
159                })?
160            }
161
162            // Columns in the upstream table must be in the CDC table, unless excluded.
163            let cdc_column = cdc_columns.remove(&column.name);
164            if col_excluded {
165                continue;
166            }
167            let Some(cdc_column) = cdc_column else {
168                mismatched_columns.push(Arc::clone(&column.name));
169                continue;
170            };
171
172            let cdc_column = SqlServerColumnDesc::new(&cdc_column);
173            match (cdc_column.column_type.as_ref(), column.column_type.as_ref()) {
174                (None, None) => (),
175                (Some(cdc_type), Some(tbl_type)) => {
176                    // Nullability needs to be excluded in this check as the CDC table column
177                    // must be nullable to support deleted columns from the upstream table.
178                    if cdc_type.scalar_type != tbl_type.scalar_type {
179                        mismatched_columns.push(Arc::clone(&column.name));
180                    }
181                }
182                (Some(_), None) | (None, Some(_)) => {
183                    mismatched_columns.push(Arc::clone(&column.name));
184                }
185            }
186        }
187
188        // We only need to error for columns that exist in the upstream table and not in the CDC
189        // table.  SQL Server decodes based on column name, so any columns missing from the table
190        // description are ignored when reading from CDC.
191        if !mismatched_columns.is_empty() {
192            Err(SqlServerSourcePurificationError::CdcMissingColumns {
193                capture_instance: Arc::clone(capture_instance),
194                col_names: mismatched_columns,
195            })?
196        }
197    }
198
199    let capture_instances: BTreeMap<_, _> = requested_exports
200        .iter()
201        .map(|requested| {
202            let table = requested
203                .meta
204                .sql_server_table()
205                .expect("sql server source");
206            let capture_instance = requested
207                .meta
208                .sql_server_capture_instance()
209                .expect("sql server source");
210
211            (table.qualified_name(), Arc::clone(capture_instance))
212        })
213        .collect();
214
215    mz_sql_server_util::inspect::validate_source_privileges(
216        client,
217        capture_instances.values().map(|instance| instance.as_ref()),
218    )
219    .await?;
220
221    // If CDC is freshly enabled for a table, it has been observed that
222    // the `start_lsn`` from `cdc.change_tables` can be ahead of the LSN
223    // returned by `sys.fn_cdc_get_max_lsn`.  Eventually, the LSN returned
224    // by `sys.fn_cdc_get_max_lsn` will surpass `start_lsn`. For this
225    // reason, we choose the initial_lsn to be
226    // `max(start_lsns, sys.fn_cdc_get_max_lsn())``.
227    let mut initial_lsns = mz_sql_server_util::inspect::get_min_lsns(
228        client,
229        capture_instances.values().map(|instance| instance.as_ref()),
230    )
231    .await?;
232
233    let max_lsn = mz_sql_server_util::inspect::get_max_lsn_retry(client, timeout).await?;
234    tracing::debug!(?initial_lsns, %max_lsn, "retrieved start LSNs");
235    for lsn in initial_lsns.values_mut() {
236        *lsn = std::cmp::max(*lsn, max_lsn);
237    }
238
239    let mut tables = vec![];
240    for requested in requested_exports {
241        let mut table = requested
242            .meta
243            .sql_server_table()
244            .expect("sql server source")
245            .clone();
246
247        let maybe_text_cols = text_cols_map.get(&table.qualified_name());
248        let maybe_excl_cols = excl_cols_map.get(&table.qualified_name());
249
250        if let Some(text_cols) = maybe_text_cols {
251            table.apply_text_columns(text_cols);
252        }
253
254        if let Some(excl_cols) = maybe_excl_cols {
255            table.apply_excl_columns(excl_cols);
256        }
257
258        if table.columns.iter().all(|c| c.is_excluded()) {
259            Err(SqlServerSourcePurificationError::AllColumnsExcluded {
260                tbl_name: Arc::clone(&table.name),
261            })?
262        }
263
264        tables.push(requested.change_meta(table));
265    }
266
267    if tables.is_empty() {
268        Err(SqlServerSourcePurificationError::NoTables)?;
269    }
270
271    let reference_resolver = SourceReferenceResolver::new(
272        database,
273        &tables.iter().map(|r| &r.meta).collect::<Vec<_>>(),
274    )?;
275
276    // Normalize column options and remove unused column references.
277    let normalized_text_columns = normalize_column_refs(
278        text_columns,
279        &reference_resolver,
280        &tables,
281        SqlServerConfigOptionName::TextColumns,
282    )?;
283    let normalized_excl_columns = normalize_column_refs(
284        excl_columns,
285        &reference_resolver,
286        &tables,
287        SqlServerConfigOptionName::ExcludeColumns,
288    )?;
289
290    let exports = tables
291        .into_iter()
292        .map(|reference| {
293            let table_reference = reference.meta.qualified_name();
294            let text_columns = text_cols_map.get(&table_reference).map(|cols| {
295                cols.iter()
296                    .map(|c| Ident::new(*c).expect("validated above"))
297                    .collect()
298            });
299            let excl_columns = excl_cols_map.get(&table_reference).map(|cols| {
300                cols.iter()
301                    .map(|c| Ident::new(*c).expect("validated above"))
302                    .collect()
303            });
304            let capture_instance = capture_instances
305                .get(&reference.meta.qualified_name())
306                .expect("capture instance should exist");
307
308            let initial_lsn = *initial_lsns.get(capture_instance).ok_or_else(|| {
309                SqlServerSourcePurificationError::NoStartLsn(capture_instance.to_string())
310            })?;
311
312            let export = PurifiedSourceExport {
313                external_reference: reference.external_reference,
314                details: PurifiedExportDetails::SqlServer {
315                    table: reference.meta,
316                    text_columns,
317                    excl_columns,
318                    capture_instance: Arc::clone(capture_instance),
319                    initial_lsn,
320                },
321            };
322
323            Ok::<_, SqlServerSourcePurificationError>((reference.name, export))
324        })
325        .collect::<Result<_, _>>()?;
326
327    Ok(PurifiedSourceExports {
328        source_exports: exports,
329        normalized_text_columns,
330        normalized_excl_columns,
331    })
332}
333
334pub fn generate_create_subsource_statements(
335    scx: &StatementContext,
336    source_name: ResolvedItemName,
337    requested_subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
338) -> Result<Vec<CreateSubsourceStatement<Aug>>, PlanError> {
339    let mut subsources = Vec::with_capacity(requested_subsources.len());
340
341    for (subsource_name, purified_export) in requested_subsources {
342        let SqlServerExportStatementValues {
343            columns,
344            constraints,
345            text_columns,
346            excl_columns,
347            details,
348            external_reference,
349        } = generate_source_export_statement_values(scx, purified_export)?;
350
351        let mut with_options = vec![
352            CreateSubsourceOption {
353                name: CreateSubsourceOptionName::ExternalReference,
354                value: Some(WithOptionValue::UnresolvedItemName(external_reference)),
355            },
356            CreateSubsourceOption {
357                name: CreateSubsourceOptionName::Details,
358                value: Some(WithOptionValue::Value(Value::String(hex::encode(
359                    details.into_proto().encode_to_vec(),
360                )))),
361            },
362        ];
363
364        if let Some(text_columns) = text_columns {
365            with_options.push(CreateSubsourceOption {
366                name: CreateSubsourceOptionName::TextColumns,
367                value: Some(WithOptionValue::Sequence(text_columns)),
368            });
369        }
370        if let Some(excl_columns) = excl_columns {
371            with_options.push(CreateSubsourceOption {
372                name: CreateSubsourceOptionName::ExcludeColumns,
373                value: Some(WithOptionValue::Sequence(excl_columns)),
374            });
375        }
376
377        // Create the subsource statement
378        let subsource = CreateSubsourceStatement {
379            name: subsource_name,
380            columns,
381            of_source: Some(source_name.clone()),
382            constraints,
383            if_not_exists: false,
384            with_options,
385        };
386        subsources.push(subsource);
387    }
388
389    Ok(subsources)
390}
391
/// The pieces of a source export statement derived from a purified SQL Server
/// export, as produced by [`generate_source_export_statement_values`].
pub(super) struct SqlServerExportStatementValues {
    /// Column definitions for the table's columns that have a column type,
    /// i.e. the columns that were not excluded.
    pub(super) columns: Vec<ColumnDef<Aug>>,
    /// The table's constraints, minus any constraint that references an
    /// excluded column.
    pub(super) constraints: Vec<TableConstraint<Aug>>,
    /// Sorted `TEXT COLUMNS` option values, if any were specified for the table.
    pub(super) text_columns: Option<Vec<WithOptionValue<Aug>>>,
    /// Sorted `EXCLUDE COLUMNS` option values, if any were specified for the table.
    pub(super) excl_columns: Option<Vec<WithOptionValue<Aug>>>,
    /// The export details (table description, capture instance, initial LSN).
    pub(super) details: SourceExportStatementDetails,
    /// The external reference of the purified export.
    pub(super) external_reference: UnresolvedItemName,
}
400
401pub(super) fn generate_source_export_statement_values(
402    scx: &StatementContext,
403    purified_export: PurifiedSourceExport,
404) -> Result<SqlServerExportStatementValues, PlanError> {
405    let PurifiedExportDetails::SqlServer {
406        table,
407        text_columns,
408        excl_columns,
409        capture_instance,
410        initial_lsn,
411    } = purified_export.details
412    else {
413        unreachable!("purified export details must be SQL Server")
414    };
415
416    // Filter out columns that the user wanted to exclude.
417    let included_columns = table
418        .columns
419        .iter()
420        .filter_map(|c| c.column_type.as_ref().map(|ct| (c.name.as_ref(), ct)));
421
422    let mut column_defs = vec![];
423    let mut constraints = vec![];
424
425    // there are likely less excluded columns than included
426    let excluded_columns: BTreeSet<_> = table
427        .columns
428        .iter()
429        .filter_map(|c| match &c.column_type {
430            None => Some(c.name.as_ref()),
431            Some(_) => None,
432        })
433        .collect();
434    for constraint in table.constraints.iter() {
435        // if one or more columns of the constraint are excluded, do not record the constraint
436        // in materialize
437        if constraint
438            .column_names
439            .iter()
440            .any(|col| excluded_columns.contains(col.as_str()))
441        {
442            tracing::debug!(
443                "skipping constraint {name} due to excluded column",
444                name = &constraint.constraint_name
445            );
446            continue;
447        }
448
449        constraints.push(mz_sql_parser::ast::TableConstraint::Unique {
450            name: Some(Ident::new_lossy(constraint.constraint_name.clone())),
451            columns: constraint
452                .column_names
453                .iter()
454                .map(Ident::new)
455                .collect::<Result<_, _>>()?,
456            is_primary: matches!(
457                constraint.constraint_type,
458                SqlServerTableConstraintType::PrimaryKey
459            ),
460            // NULLs are not distinct for SQL Server UNIQUE constraint, and not allowed for PRIMARY KEY.
461            // See <https://learn.microsoft.com/en-us/sql/relational-databases/tables/unique-constraints-and-check-constraints?view=sql-server-ver17#unique-constraints>
462            nulls_not_distinct: matches!(
463                constraint.constraint_type,
464                SqlServerTableConstraintType::Unique
465            ),
466        });
467    }
468
469    tracing::debug!(
470        "source export constraints for {schema_name}.{table_name}: {constraints:?}",
471        schema_name = &table.schema_name,
472        table_name = &table.name
473    );
474
475    for (col_name, col_type) in included_columns {
476        let name = Ident::new(col_name)?;
477        let ty = mz_pgrepr::Type::from(&col_type.scalar_type);
478        let data_type = scx.resolve_type(ty)?;
479        let mut col_options = vec![];
480
481        if !col_type.nullable {
482            col_options.push(mz_sql_parser::ast::ColumnOptionDef {
483                name: None,
484                option: mz_sql_parser::ast::ColumnOption::NotNull,
485            });
486        }
487
488        column_defs.push(ColumnDef {
489            name,
490            data_type,
491            collation: None,
492            options: col_options,
493        });
494    }
495
496    let details = SourceExportStatementDetails::SqlServer {
497        table,
498        capture_instance,
499        initial_lsn,
500    };
501    let text_columns = text_columns.map(|mut columns| {
502        columns.sort();
503        columns
504            .into_iter()
505            .map(WithOptionValue::Ident::<Aug>)
506            .collect()
507    });
508    let excl_columns = excl_columns.map(|mut columns| {
509        columns.sort();
510        columns
511            .into_iter()
512            .map(WithOptionValue::Ident::<Aug>)
513            .collect()
514    });
515
516    let values = SqlServerExportStatementValues {
517        columns: column_defs,
518        constraints,
519        text_columns,
520        excl_columns,
521        details,
522        external_reference: purified_export.external_reference,
523    };
524    Ok(values)
525}
526
527/// Create a [`BTreeMap`] of [`SqlServerQualifiedTableName`] to a [`BTreeSet`] of column names.
528fn map_column_refs<'a>(
529    cols: &'a [UnresolvedItemName],
530    option_type: SqlServerConfigOptionName,
531) -> Result<BTreeMap<SqlServerQualifiedTableName, BTreeSet<&'a str>>, PlanError> {
532    let mut table_to_cols: BTreeMap<_, BTreeSet<_>> = BTreeMap::new();
533    for name in cols.iter() {
534        // We only support fully qualified references for now (e.g. `schema_name.table_name.column_name`)
535        if name.0.len() == 3 {
536            let key = mz_sql_server_util::desc::SqlServerQualifiedTableName {
537                schema_name: name.0[0].as_str().into(),
538                table_name: name.0[1].as_str().into(),
539            };
540            let new = table_to_cols
541                .entry(key)
542                .or_default()
543                .insert(name.0[2].as_str());
544            if !new {
545                return Err(PlanError::InvalidOptionValue {
546                    option_name: option_type.to_ast_string_simple(),
547                    err: Box::new(PlanError::UnexpectedDuplicateReference { name: name.clone() }),
548                });
549            }
550        } else {
551            return Err(PlanError::InvalidOptionValue {
552                option_name: option_type.to_ast_string_simple(),
553                err: Box::new(PlanError::UnderqualifiedColumnName(name.to_string())),
554            });
555        }
556    }
557    Ok(table_to_cols)
558}
559
560/// Normalize the provided `cols` references to a sorted and deduplicated list of
561/// [`WithOptionValue`]s.
562///
563/// Returns an error if any column reference in `cols` is not part of a table in the
564/// provided `tables`.
565fn normalize_column_refs(
566    cols: &[UnresolvedItemName],
567    reference_resolver: &SourceReferenceResolver,
568    tables: &[RequestedSourceExport<SqlServerTableDesc>],
569    option_name: SqlServerConfigOptionName,
570) -> Result<Vec<WithOptionValue<Aug>>, SqlServerSourcePurificationError> {
571    let (seq, unknown): (Vec<_>, Vec<_>) = cols.into_iter().partition(|name| {
572        let (column_name, qual) = name.0.split_last().expect("non-empty");
573        match reference_resolver.resolve_idx(qual) {
574            // TODO(sql_server3): This needs to also introduce the maximum qualification
575            // on  the columns, i.e. ensure they have the schema name.
576            Ok(idx) => tables[idx]
577                .meta
578                .columns
579                .iter()
580                .any(|n| &*n.name == column_name.as_str()),
581            Err(_) => false,
582        }
583    });
584
585    if !unknown.is_empty() {
586        return Err(SqlServerSourcePurificationError::DanglingColumns {
587            option_name: option_name.to_string(),
588            items: unknown.into_iter().cloned().collect(),
589        });
590    }
591
592    let mut seq: Vec<_> = seq
593        .into_iter()
594        .cloned()
595        .map(WithOptionValue::UnresolvedItemName)
596        .collect();
597
598    seq.sort();
599    seq.dedup();
600    Ok(seq)
601}