mz_sql/pure/
sql_server.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::{BTreeMap, BTreeSet};
11use std::sync::Arc;
12use std::time::Duration;
13
14use mz_ore::collections::CollectionExt;
15use mz_proto::RustType;
16use mz_sql_parser::ast::display::AstDisplay;
17use mz_sql_parser::ast::{
18    ColumnDef, CreateSubsourceOption, CreateSubsourceOptionName, CreateSubsourceStatement,
19    ExternalReferences, Ident, SqlServerConfigOptionName, TableConstraint, UnresolvedItemName,
20    Value, WithOptionValue,
21};
22use mz_sql_server_util::desc::{SqlServerQualifiedTableName, SqlServerTableDesc};
23use mz_storage_types::sources::{SourceExportStatementDetails, SourceReferenceResolver};
24use prost::Message;
25
26use crate::names::{Aug, ResolvedItemName};
27use crate::plan::{PlanError, StatementContext};
28use crate::pure::{
29    PurifiedExportDetails, PurifiedSourceExport, RequestedSourceExport, RetrievedSourceReferences,
30    SourceReferencePolicy, SqlServerSourcePurificationError,
31};
32
33pub(super) struct PurifiedSourceExports {
34    /// Map of source export names to details of the export.
35    pub(super) source_exports: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
36    /// Normalized list of columns that we'll decode as text.
37    ///
38    /// Note: Each source export (i.e. subsource) also tracks which of its
39    /// columns we decode as text, but we also return this normalized list so
40    /// we can roundtrip a `CREATE SOURCE` statement.
41    pub(super) normalized_text_columns: Vec<WithOptionValue<Aug>>,
42    /// Normalized list of columns that we'll exclude.
43    ///
44    /// Note: Each source export (i.e. subsource) also tracks which of its
45    /// columns we should exlude, but we also return this normalized list so
46    /// we can roundtrip a `CREATE SOURCE` statement.
47    pub(super) normalized_excl_columns: Vec<WithOptionValue<Aug>>,
48}
49
50/// Purify the requested [`ExternalReferences`] from the provided
51/// [`RetrievedSourceReferences`].
52#[allow(clippy::unused_async)]
53pub(super) async fn purify_source_exports(
54    database: &str,
55    client: &mut mz_sql_server_util::Client,
56    retrieved_references: &RetrievedSourceReferences,
57    requested_references: &Option<ExternalReferences>,
58    text_columns: &[UnresolvedItemName],
59    excl_columns: &[UnresolvedItemName],
60    unresolved_source_name: &UnresolvedItemName,
61    timeout: Duration,
62    reference_policy: &SourceReferencePolicy,
63) -> Result<PurifiedSourceExports, PlanError> {
64    let requested_exports = match requested_references.as_ref() {
65        Some(requested) => {
66            if *reference_policy == SourceReferencePolicy::NotAllowed {
67                return Err(PlanError::UseTablesForSources(requested.to_string()));
68            } else {
69                let exports = retrieved_references
70                    .requested_source_exports(Some(requested), unresolved_source_name)?;
71                exports
72            }
73        }
74        None => {
75            if *reference_policy == SourceReferencePolicy::Required {
76                return Err(SqlServerSourcePurificationError::RequiresExternalReferences.into());
77            }
78
79            // If no external reference is specified, it does not make sense to include
80            // or text or exclude columns.
81            if !text_columns.is_empty() {
82                Err(
83                    SqlServerSourcePurificationError::UnnecessaryOptionsWithoutReferences(
84                        "TEXT COLUMNS".to_string(),
85                    ),
86                )?
87            }
88            if !excl_columns.is_empty() {
89                Err(
90                    SqlServerSourcePurificationError::UnnecessaryOptionsWithoutReferences(
91                        "EXCLUDE COLUMNS".to_string(),
92                    ),
93                )?
94            }
95
96            return Ok(PurifiedSourceExports {
97                source_exports: BTreeMap::default(),
98                normalized_text_columns: Vec::default(),
99                normalized_excl_columns: Vec::default(),
100            });
101        }
102    };
103
104    if requested_exports.is_empty() {
105        sql_bail!(
106            "SQL Server source must ingest at least one table, but {} matched none",
107            requested_references
108                .as_ref()
109                .expect("checked above")
110                .to_ast_string_simple()
111        )
112    }
113
114    super::validate_source_export_names(&requested_exports)?;
115
116    // TODO(sql_server2): Should we check if these have overlapping columns?
117    // What do our other sources do?
118    let text_cols_map = map_column_refs(text_columns, SqlServerConfigOptionName::TextColumns)?;
119    let excl_cols_map = map_column_refs(excl_columns, SqlServerConfigOptionName::ExcludeColumns)?;
120
121    // Ensure the columns we intend to include are supported.
122    //
123    // For example, we don't support `text` columns in SQL Server because they
124    // do not include the "before" value when an `UPDATE` occurs. When parsing
125    // the raw metadata for this type we mark the column as "unsupported" but
126    // don't return an error, in case later (here) they intend to exclude that
127    // column.
128    for export in &requested_exports {
129        let table = export.meta.sql_server_table().expect("sql server source");
130        let maybe_excl_cols = excl_cols_map.get(&table.qualified_name());
131        let is_excluded = |name| {
132            maybe_excl_cols
133                .as_ref()
134                .map(|cols| cols.contains(name))
135                .unwrap_or(false)
136        };
137
138        let maybe_bad_column = table
139            .columns
140            .iter()
141            .find(|col| !col.is_supported() && !is_excluded(col.name.as_ref()));
142        if let Some(bad_column) = maybe_bad_column {
143            Err(SqlServerSourcePurificationError::UnsupportedColumn {
144                schema_name: Arc::clone(&table.schema_name),
145                tbl_name: Arc::clone(&table.name),
146                col_name: Arc::clone(&bad_column.name),
147                col_type: Arc::clone(&bad_column.raw_type),
148            })?
149        }
150    }
151
152    let capture_instances: BTreeMap<_, _> = requested_exports
153        .iter()
154        .map(|requested| {
155            let table = requested
156                .meta
157                .sql_server_table()
158                .expect("sql server source");
159            let capture_instance = requested
160                .meta
161                .sql_server_capture_instance()
162                .expect("sql server source");
163
164            (table.qualified_name(), Arc::clone(capture_instance))
165        })
166        .collect();
167
168    mz_sql_server_util::inspect::validate_source_privileges(
169        client,
170        capture_instances.values().map(|instance| instance.as_ref()),
171    )
172    .await?;
173
174    // If CDC is freshly enabled for a table, it has been observed that
175    // the `start_lsn`` from `cdc.change_tables` can be ahead of the LSN
176    // returned by `sys.fn_cdc_get_max_lsn`.  Eventually, the LSN returned
177    // by `sys.fn_cdc_get_max_lsn` will surpass `start_lsn`. For this
178    // reason, we choose the initial_lsn to be
179    // `max(start_lsns, sys.fn_cdc_get_max_lsn())``.
180    let mut initial_lsns = mz_sql_server_util::inspect::get_min_lsns(
181        client,
182        capture_instances.values().map(|instance| instance.as_ref()),
183    )
184    .await?;
185
186    let max_lsn = mz_sql_server_util::inspect::get_max_lsn_retry(client, timeout).await?;
187    tracing::debug!(?initial_lsns, %max_lsn, "retrieved start LSNs");
188    for lsn in initial_lsns.values_mut() {
189        *lsn = std::cmp::max(*lsn, max_lsn);
190    }
191
192    let mut tables = vec![];
193    for requested in requested_exports {
194        let mut table = requested
195            .meta
196            .sql_server_table()
197            .expect("sql server source")
198            .clone();
199
200        let maybe_text_cols = text_cols_map.get(&table.qualified_name());
201        let maybe_excl_cols = excl_cols_map.get(&table.qualified_name());
202
203        if let Some(text_cols) = maybe_text_cols {
204            table.apply_text_columns(text_cols);
205        }
206
207        if let Some(excl_cols) = maybe_excl_cols {
208            table.apply_excl_columns(excl_cols);
209        }
210
211        if table.columns.iter().all(|c| c.is_excluded()) {
212            Err(SqlServerSourcePurificationError::AllColumnsExcluded {
213                tbl_name: Arc::clone(&table.name),
214            })?
215        }
216
217        tables.push(requested.change_meta(table));
218    }
219
220    if tables.is_empty() {
221        Err(SqlServerSourcePurificationError::NoTables)?;
222    }
223
224    let reference_resolver = SourceReferenceResolver::new(
225        database,
226        &tables.iter().map(|r| &r.meta).collect::<Vec<_>>(),
227    )?;
228
229    // Normalize column options and remove unused column references.
230    let normalized_text_columns = normalize_column_refs(
231        text_columns,
232        &reference_resolver,
233        &tables,
234        SqlServerConfigOptionName::TextColumns,
235    )?;
236    let normalized_excl_columns = normalize_column_refs(
237        excl_columns,
238        &reference_resolver,
239        &tables,
240        SqlServerConfigOptionName::ExcludeColumns,
241    )?;
242
243    let exports = tables
244        .into_iter()
245        .map(|reference| {
246            let table_reference = reference.meta.qualified_name();
247            let text_columns = text_cols_map.get(&table_reference).map(|cols| {
248                cols.iter()
249                    .map(|c| Ident::new(*c).expect("validated above"))
250                    .collect()
251            });
252            let excl_columns = excl_cols_map.get(&table_reference).map(|cols| {
253                cols.iter()
254                    .map(|c| Ident::new(*c).expect("validated above"))
255                    .collect()
256            });
257            let capture_instance = capture_instances
258                .get(&reference.meta.qualified_name())
259                .expect("capture instance should exist");
260
261            let initial_lsn = *initial_lsns.get(capture_instance).ok_or_else(|| {
262                SqlServerSourcePurificationError::NoStartLsn(capture_instance.to_string())
263            })?;
264
265            let export = PurifiedSourceExport {
266                external_reference: reference.external_reference,
267                details: PurifiedExportDetails::SqlServer {
268                    table: reference.meta,
269                    text_columns,
270                    excl_columns,
271                    capture_instance: Arc::clone(capture_instance),
272                    initial_lsn,
273                },
274            };
275
276            Ok::<_, SqlServerSourcePurificationError>((reference.name, export))
277        })
278        .collect::<Result<_, _>>()?;
279
280    Ok(PurifiedSourceExports {
281        source_exports: exports,
282        normalized_text_columns,
283        normalized_excl_columns,
284    })
285}
286
287pub fn generate_create_subsource_statements(
288    scx: &StatementContext,
289    source_name: ResolvedItemName,
290    requested_subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
291) -> Result<Vec<CreateSubsourceStatement<Aug>>, PlanError> {
292    let mut subsources = Vec::with_capacity(requested_subsources.len());
293
294    for (subsource_name, purified_export) in requested_subsources {
295        let SqlServerExportStatementValues {
296            columns,
297            constraints,
298            text_columns,
299            excl_columns,
300            details,
301            external_reference,
302        } = generate_source_export_statement_values(scx, purified_export)?;
303
304        let mut with_options = vec![
305            CreateSubsourceOption {
306                name: CreateSubsourceOptionName::ExternalReference,
307                value: Some(WithOptionValue::UnresolvedItemName(external_reference)),
308            },
309            CreateSubsourceOption {
310                name: CreateSubsourceOptionName::Details,
311                value: Some(WithOptionValue::Value(Value::String(hex::encode(
312                    details.into_proto().encode_to_vec(),
313                )))),
314            },
315        ];
316
317        if let Some(text_columns) = text_columns {
318            with_options.push(CreateSubsourceOption {
319                name: CreateSubsourceOptionName::TextColumns,
320                value: Some(WithOptionValue::Sequence(text_columns)),
321            });
322        }
323        if let Some(excl_columns) = excl_columns {
324            with_options.push(CreateSubsourceOption {
325                name: CreateSubsourceOptionName::ExcludeColumns,
326                value: Some(WithOptionValue::Sequence(excl_columns)),
327            });
328        }
329
330        // Create the subsource statement
331        let subsource = CreateSubsourceStatement {
332            name: subsource_name,
333            columns,
334            of_source: Some(source_name.clone()),
335            constraints,
336            if_not_exists: false,
337            with_options,
338        };
339        subsources.push(subsource);
340    }
341
342    Ok(subsources)
343}
344
345pub(super) struct SqlServerExportStatementValues {
346    pub(super) columns: Vec<ColumnDef<Aug>>,
347    pub(super) constraints: Vec<TableConstraint<Aug>>,
348    pub(super) text_columns: Option<Vec<WithOptionValue<Aug>>>,
349    pub(super) excl_columns: Option<Vec<WithOptionValue<Aug>>>,
350    pub(super) details: SourceExportStatementDetails,
351    pub(super) external_reference: UnresolvedItemName,
352}
353
354pub(super) fn generate_source_export_statement_values(
355    scx: &StatementContext,
356    purified_export: PurifiedSourceExport,
357) -> Result<SqlServerExportStatementValues, PlanError> {
358    let PurifiedExportDetails::SqlServer {
359        table,
360        text_columns,
361        excl_columns,
362        capture_instance,
363        initial_lsn,
364    } = purified_export.details
365    else {
366        unreachable!("purified export details must be SQL Server")
367    };
368
369    // Filter out columns that the user wanted to exclude.
370    let included_columns = table.columns.iter().filter_map(|c| {
371        c.column_type
372            .as_ref()
373            .map(|ct| (c.name.as_ref(), ct, c.primary_key_constraint.clone()))
374    });
375
376    let mut primary_keys: BTreeMap<Arc<str>, Vec<Ident>> = BTreeMap::new();
377    let mut column_defs = vec![];
378    let mut constraints = vec![];
379
380    for (col_name, col_type, col_primary_key_constraint) in included_columns {
381        let name = Ident::new(col_name)?;
382        let ty = mz_pgrepr::Type::from(&col_type.scalar_type);
383        let data_type = scx.resolve_type(ty)?;
384        let mut col_options = vec![];
385
386        if let Some(constraint) = col_primary_key_constraint {
387            let columns = primary_keys.entry(constraint).or_default();
388            columns.push(name.clone());
389        }
390        if !col_type.nullable {
391            col_options.push(mz_sql_parser::ast::ColumnOptionDef {
392                name: None,
393                option: mz_sql_parser::ast::ColumnOption::NotNull,
394            });
395        }
396
397        column_defs.push(ColumnDef {
398            name,
399            data_type,
400            collation: None,
401            options: col_options,
402        });
403    }
404
405    match primary_keys.len() {
406        // No primary key.
407        0 => (),
408        1 => {
409            let (constraint_name, columns) = primary_keys.into_element();
410            constraints.push(mz_sql_parser::ast::TableConstraint::Unique {
411                name: Some(Ident::new_lossy(&*constraint_name)),
412                columns,
413                is_primary: true,
414                nulls_not_distinct: false,
415            });
416        }
417        // Multiple primary keys..?
418        2.. => {
419            let constraint_names = primary_keys.into_keys().collect();
420            return Err(PlanError::SqlServerSourcePurificationError(
421                SqlServerSourcePurificationError::MultiplePrimaryKeys { constraint_names },
422            ));
423        }
424    }
425
426    let details = SourceExportStatementDetails::SqlServer {
427        table,
428        capture_instance,
429        initial_lsn,
430    };
431    let text_columns = text_columns.map(|mut columns| {
432        columns.sort();
433        columns
434            .into_iter()
435            .map(WithOptionValue::Ident::<Aug>)
436            .collect()
437    });
438    let excl_columns = excl_columns.map(|mut columns| {
439        columns.sort();
440        columns
441            .into_iter()
442            .map(WithOptionValue::Ident::<Aug>)
443            .collect()
444    });
445
446    let values = SqlServerExportStatementValues {
447        columns: column_defs,
448        constraints,
449        text_columns,
450        excl_columns,
451        details,
452        external_reference: purified_export.external_reference,
453    };
454    Ok(values)
455}
456
457/// Create a [`BTreeMap`] of [`SqlServerQualifiedTableName`] to a [`BTreeSet`] of column names.
458fn map_column_refs<'a>(
459    cols: &'a [UnresolvedItemName],
460    option_type: SqlServerConfigOptionName,
461) -> Result<BTreeMap<SqlServerQualifiedTableName, BTreeSet<&'a str>>, PlanError> {
462    let mut table_to_cols: BTreeMap<_, BTreeSet<_>> = BTreeMap::new();
463    for name in cols.iter() {
464        // We only support fully qualified references for now (e.g. `schema_name.table_name.column_name`)
465        if name.0.len() == 3 {
466            let key = mz_sql_server_util::desc::SqlServerQualifiedTableName {
467                schema_name: name.0[0].as_str().into(),
468                table_name: name.0[1].as_str().into(),
469            };
470            let new = table_to_cols
471                .entry(key)
472                .or_default()
473                .insert(name.0[2].as_str());
474            if !new {
475                return Err(PlanError::InvalidOptionValue {
476                    option_name: option_type.to_ast_string_simple(),
477                    err: Box::new(PlanError::UnexpectedDuplicateReference { name: name.clone() }),
478                });
479            }
480        } else {
481            return Err(PlanError::InvalidOptionValue {
482                option_name: option_type.to_ast_string_simple(),
483                err: Box::new(PlanError::UnderqualifiedColumnName(name.to_string())),
484            });
485        }
486    }
487    Ok(table_to_cols)
488}
489
490/// Normalize the provided `cols` references to a sorted and deduplicated list of
491/// [`WithOptionValue`]s.
492///
493/// Returns an error if any column reference in `cols` is not part of a table in the
494/// provided `tables`.
495fn normalize_column_refs(
496    cols: &[UnresolvedItemName],
497    reference_resolver: &SourceReferenceResolver,
498    tables: &[RequestedSourceExport<SqlServerTableDesc>],
499    option_name: SqlServerConfigOptionName,
500) -> Result<Vec<WithOptionValue<Aug>>, SqlServerSourcePurificationError> {
501    let (seq, unknown): (Vec<_>, Vec<_>) = cols.into_iter().partition(|name| {
502        let (column_name, qual) = name.0.split_last().expect("non-empty");
503        match reference_resolver.resolve_idx(qual) {
504            // TODO(sql_server3): This needs to also introduce the maximum qualification
505            // on  the columns, i.e. ensure they have the schema name.
506            Ok(idx) => tables[idx]
507                .meta
508                .columns
509                .iter()
510                .any(|n| &*n.name == column_name.as_str()),
511            Err(_) => false,
512        }
513    });
514
515    if !unknown.is_empty() {
516        return Err(SqlServerSourcePurificationError::DanglingColumns {
517            option_name: option_name.to_string(),
518            items: unknown.into_iter().cloned().collect(),
519        });
520    }
521
522    let mut seq: Vec<_> = seq
523        .into_iter()
524        .cloned()
525        .map(WithOptionValue::UnresolvedItemName)
526        .collect();
527
528    seq.sort();
529    seq.dedup();
530    Ok(seq)
531}