mz_sql/pure/
sql_server.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::{BTreeMap, BTreeSet};
11use std::sync::Arc;
12
13use mz_proto::RustType;
14use mz_sql_parser::ast::display::AstDisplay;
15use mz_sql_parser::ast::{
16    ColumnDef, CreateSubsourceOption, CreateSubsourceOptionName, CreateSubsourceStatement,
17    ExternalReferences, Ident, SqlServerConfigOptionName, UnresolvedItemName, Value,
18    WithOptionValue,
19};
20use mz_sql_server_util::desc::{SqlServerQualifiedTableName, SqlServerTableDesc};
21use mz_storage_types::sources::{SourceExportStatementDetails, SourceReferenceResolver};
22use prost::Message;
23
24use crate::names::{Aug, ResolvedItemName};
25use crate::plan::{PlanError, StatementContext};
26use crate::pure::{
27    PurifiedExportDetails, PurifiedSourceExport, RequestedSourceExport, RetrievedSourceReferences,
28    SourceReferencePolicy, SqlServerSourcePurificationError,
29};
30
31pub(super) struct PurifiedSourceExports {
32    /// Map of source export names to details of the export.
33    pub(super) source_exports: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
34    /// Normalized list of columns that we'll decode as text.
35    ///
36    /// Note: Each source export (i.e. subsource) also tracks which of its
37    /// columns we decode as text, but we also return this normalized list so
38    /// we can roundtrip a `CREATE SOURCE` statement.
39    pub(super) normalized_text_columns: Vec<WithOptionValue<Aug>>,
40    /// Normalized list of columns that we'll exclude.
41    ///
42    /// Note: Each source export (i.e. subsource) also tracks which of its
43    /// columns we should exlude, but we also return this normalized list so
44    /// we can roundtrip a `CREATE SOURCE` statement.
45    pub(super) normalized_excl_columns: Vec<WithOptionValue<Aug>>,
46}
47
48/// Purify the requested [`ExternalReferences`] from the provided
49/// [`RetrievedSourceReferences`].
50#[allow(clippy::unused_async)]
51pub(super) async fn purify_source_exports(
52    database: &str,
53    _client: &mut mz_sql_server_util::Client,
54    retrieved_references: &RetrievedSourceReferences,
55    requested_references: &Option<ExternalReferences>,
56    text_columns: &[UnresolvedItemName],
57    excl_columns: &[UnresolvedItemName],
58    unresolved_source_name: &UnresolvedItemName,
59    reference_policy: &SourceReferencePolicy,
60) -> Result<PurifiedSourceExports, PlanError> {
61    let requested_exports = match requested_references.as_ref() {
62        Some(requested) => {
63            if *reference_policy == SourceReferencePolicy::NotAllowed {
64                return Err(PlanError::UseTablesForSources(requested.to_string()));
65            } else {
66                let exports = retrieved_references
67                    .requested_source_exports(Some(requested), unresolved_source_name)?;
68                exports
69            }
70        }
71        None => {
72            if *reference_policy == SourceReferencePolicy::Required {
73                return Err(SqlServerSourcePurificationError::RequiresExternalReferences.into());
74            }
75
76            // If no external reference is specified, it does not make sense to include
77            // or text or exclude columns.
78            if !text_columns.is_empty() {
79                Err(
80                    SqlServerSourcePurificationError::UnnecessaryOptionsWithoutReferences(
81                        "TEXT COLUMNS".to_string(),
82                    ),
83                )?
84            }
85            if !excl_columns.is_empty() {
86                Err(
87                    SqlServerSourcePurificationError::UnnecessaryOptionsWithoutReferences(
88                        "EXCLUDE COLUMNS".to_string(),
89                    ),
90                )?
91            }
92
93            return Ok(PurifiedSourceExports {
94                source_exports: BTreeMap::default(),
95                normalized_text_columns: Vec::default(),
96                normalized_excl_columns: Vec::default(),
97            });
98        }
99    };
100
101    // TODO(sql_server2): Should we check if these have overlapping columns?
102    // What do our other sources do?
103    let text_cols_map = map_column_refs(text_columns, SqlServerConfigOptionName::TextColumns)?;
104    let excl_cols_map = map_column_refs(excl_columns, SqlServerConfigOptionName::ExcludeColumns)?;
105
106    if requested_exports.is_empty() {
107        sql_bail!(
108            "SQL Server source must ingest at least one table, but {} matched none",
109            requested_references
110                .as_ref()
111                .expect("checked above")
112                .to_ast_string_simple()
113        )
114    }
115
116    // Ensure the columns we intend to include are supported.
117    //
118    // For example, we don't support `text` columns in SQL Server because they
119    // do not include the "before" value when an `UPDATE` occurs. When parsing
120    // the raw metadata for this type we mark the column as "unsupported" but
121    // don't return an error, in case later (here) they intend to exclude that
122    // column.
123    for export in &requested_exports {
124        let table = export.meta.sql_server_table().expect("sql server source");
125        let maybe_excl_cols = excl_cols_map.get(&table.qualified_name());
126        let is_excluded = |name| {
127            maybe_excl_cols
128                .as_ref()
129                .map(|cols| cols.contains(name))
130                .unwrap_or(false)
131        };
132
133        let maybe_bad_column = table
134            .columns
135            .iter()
136            .find(|col| !col.is_supported() && !is_excluded(col.name.as_ref()));
137        if let Some(bad_column) = maybe_bad_column {
138            Err(SqlServerSourcePurificationError::UnsupportedColumn {
139                schema_name: Arc::clone(&table.schema_name),
140                tbl_name: Arc::clone(&table.name),
141                col_name: Arc::clone(&bad_column.name),
142                col_type: Arc::clone(&bad_column.raw_type),
143            })?
144        }
145    }
146
147    // TODO(sql_server1): Validate permissions on upstream tables.
148
149    let capture_instances: BTreeMap<_, _> = requested_exports
150        .iter()
151        .map(|requested| {
152            let table = requested
153                .meta
154                .sql_server_table()
155                .expect("sql server source");
156            let capture_instance = requested
157                .meta
158                .sql_server_capture_instance()
159                .expect("sql server source");
160
161            (table.qualified_name(), Arc::clone(capture_instance))
162        })
163        .collect();
164
165    let tables: Vec<_> = requested_exports
166        .into_iter()
167        .map(|requested| {
168            let mut table = requested
169                .meta
170                .sql_server_table()
171                .expect("sql server source")
172                .clone();
173
174            let maybe_text_cols = text_cols_map.get(&table.qualified_name());
175            let maybe_excl_cols = excl_cols_map.get(&table.qualified_name());
176
177            if let Some(text_cols) = maybe_text_cols {
178                table.apply_text_columns(text_cols);
179            }
180            // TODO(sql_server2): Should we prevent excluding all columns? What do our other
181            // sources do?
182            if let Some(excl_cols) = maybe_excl_cols {
183                table.apply_excl_columns(excl_cols);
184            }
185
186            requested.change_meta(table)
187        })
188        .collect();
189
190    if tables.is_empty() {
191        Err(SqlServerSourcePurificationError::NoTables)?;
192    }
193
194    let reference_resolver = SourceReferenceResolver::new(
195        database,
196        &tables.iter().map(|r| &r.meta).collect::<Vec<_>>(),
197    )?;
198
199    // Normalize column options and remove unused column references.
200    let normalized_text_columns = normalize_column_refs(
201        text_columns,
202        &reference_resolver,
203        &tables,
204        SqlServerConfigOptionName::TextColumns,
205    )?;
206    let normalized_excl_columns = normalize_column_refs(
207        excl_columns,
208        &reference_resolver,
209        &tables,
210        SqlServerConfigOptionName::ExcludeColumns,
211    )?;
212
213    let exports = tables
214        .into_iter()
215        .map(|reference| {
216            let table_reference = reference.meta.qualified_name();
217            let text_columns = text_cols_map.get(&table_reference).map(|cols| {
218                cols.iter()
219                    .map(|c| Ident::new(*c).expect("validated above"))
220                    .collect()
221            });
222            let excl_columns = excl_cols_map.get(&table_reference).map(|cols| {
223                cols.iter()
224                    .map(|c| Ident::new(*c).expect("validated above"))
225                    .collect()
226            });
227            let capture_instance = capture_instances
228                .get(&reference.meta.qualified_name())
229                .expect("capture instance should exist");
230
231            let export = PurifiedSourceExport {
232                external_reference: reference.external_reference,
233                details: PurifiedExportDetails::SqlServer {
234                    table: reference.meta,
235                    text_columns,
236                    excl_columns,
237                    capture_instance: Arc::clone(capture_instance),
238                },
239            };
240
241            (reference.name, export)
242        })
243        .collect();
244
245    Ok(PurifiedSourceExports {
246        source_exports: exports,
247        normalized_text_columns,
248        normalized_excl_columns,
249    })
250}
251
252pub fn generate_create_subsource_statements(
253    scx: &StatementContext,
254    source_name: ResolvedItemName,
255    requested_subsources: BTreeMap<UnresolvedItemName, PurifiedSourceExport>,
256) -> Result<Vec<CreateSubsourceStatement<Aug>>, PlanError> {
257    let mut subsources = Vec::with_capacity(requested_subsources.len());
258
259    for (subsource_name, purified_export) in requested_subsources {
260        let SqlServerExportStatementValues {
261            columns,
262            text_columns,
263            excl_columns,
264            details,
265            external_reference,
266        } = generate_source_export_statement_values(scx, purified_export)?;
267
268        let mut with_options = vec![
269            CreateSubsourceOption {
270                name: CreateSubsourceOptionName::ExternalReference,
271                value: Some(WithOptionValue::UnresolvedItemName(external_reference)),
272            },
273            CreateSubsourceOption {
274                name: CreateSubsourceOptionName::Details,
275                value: Some(WithOptionValue::Value(Value::String(hex::encode(
276                    details.into_proto().encode_to_vec(),
277                )))),
278            },
279        ];
280
281        if let Some(text_columns) = text_columns {
282            with_options.push(CreateSubsourceOption {
283                name: CreateSubsourceOptionName::TextColumns,
284                value: Some(WithOptionValue::Sequence(text_columns)),
285            });
286        }
287        if let Some(excl_columns) = excl_columns {
288            with_options.push(CreateSubsourceOption {
289                name: CreateSubsourceOptionName::ExcludeColumns,
290                value: Some(WithOptionValue::Sequence(excl_columns)),
291            });
292        }
293
294        // Create the subsource statement
295        let subsource = CreateSubsourceStatement {
296            name: subsource_name,
297            columns,
298            of_source: Some(source_name.clone()),
299            // TODO(sql_server2): Support contraints from the upstream SQL Server instasnce.
300            constraints: vec![],
301            if_not_exists: false,
302            with_options,
303        };
304        subsources.push(subsource);
305    }
306
307    Ok(subsources)
308}
309
310struct SqlServerExportStatementValues {
311    pub columns: Vec<ColumnDef<Aug>>,
312    pub text_columns: Option<Vec<WithOptionValue<Aug>>>,
313    pub excl_columns: Option<Vec<WithOptionValue<Aug>>>,
314    pub details: SourceExportStatementDetails,
315    pub external_reference: UnresolvedItemName,
316}
317
318fn generate_source_export_statement_values(
319    scx: &StatementContext,
320    purified_export: PurifiedSourceExport,
321) -> Result<SqlServerExportStatementValues, PlanError> {
322    let PurifiedExportDetails::SqlServer {
323        table,
324        text_columns,
325        excl_columns,
326        capture_instance,
327    } = purified_export.details
328    else {
329        unreachable!("purified export details must be SQL Server")
330    };
331
332    // Filter out columns that the user wanted to exclude.
333    let included_columns = table
334        .columns
335        .iter()
336        .filter_map(|c| c.column_type.as_ref().map(|ct| (c.name.as_ref(), ct)));
337    let mut column_defs = vec![];
338
339    for (col_name, col_type) in included_columns {
340        let name = Ident::new(col_name)?;
341        let ty = mz_pgrepr::Type::from(&col_type.scalar_type);
342        let data_type = scx.resolve_type(ty)?;
343        let mut col_options = vec![];
344
345        if !col_type.nullable {
346            col_options.push(mz_sql_parser::ast::ColumnOptionDef {
347                name: None,
348                option: mz_sql_parser::ast::ColumnOption::NotNull,
349            });
350        }
351        column_defs.push(ColumnDef {
352            name,
353            data_type,
354            collation: None,
355            options: col_options,
356        });
357    }
358
359    // TODO(sql_server2): Support table contraints like primary key or unique.
360
361    let details = SourceExportStatementDetails::SqlServer {
362        table,
363        capture_instance,
364    };
365    let text_columns = text_columns.map(|mut columns| {
366        columns.sort();
367        columns
368            .into_iter()
369            .map(WithOptionValue::Ident::<Aug>)
370            .collect()
371    });
372    let excl_columns = excl_columns.map(|mut columns| {
373        columns.sort();
374        columns
375            .into_iter()
376            .map(WithOptionValue::Ident::<Aug>)
377            .collect()
378    });
379
380    let values = SqlServerExportStatementValues {
381        columns: column_defs,
382        text_columns,
383        excl_columns,
384        details,
385        external_reference: purified_export.external_reference,
386    };
387    Ok(values)
388}
389
390/// Create a [`BTreeMap`] of [`SqlServerQualifiedTableName`] to a [`BTreeSet`] of column names.
391fn map_column_refs<'a>(
392    cols: &'a [UnresolvedItemName],
393    option_type: SqlServerConfigOptionName,
394) -> Result<BTreeMap<SqlServerQualifiedTableName, BTreeSet<&'a str>>, PlanError> {
395    let mut table_to_cols: BTreeMap<_, BTreeSet<_>> = BTreeMap::new();
396    for name in cols.iter() {
397        // We only support fully qualified references for now (e.g. `schema_name.table_name.column_name`)
398        if name.0.len() == 3 {
399            let key = mz_sql_server_util::desc::SqlServerQualifiedTableName {
400                schema_name: name.0[0].as_str().into(),
401                table_name: name.0[1].as_str().into(),
402            };
403            let new = table_to_cols
404                .entry(key)
405                .or_default()
406                .insert(name.0[2].as_str());
407            if !new {
408                return Err(PlanError::InvalidOptionValue {
409                    option_name: option_type.to_ast_string_simple(),
410                    err: Box::new(PlanError::UnexpectedDuplicateReference { name: name.clone() }),
411                });
412            }
413        } else {
414            return Err(PlanError::InvalidOptionValue {
415                option_name: option_type.to_ast_string_simple(),
416                err: Box::new(PlanError::UnderqualifiedColumnName(name.to_string())),
417            });
418        }
419    }
420    Ok(table_to_cols)
421}
422
423/// Normalize the provided `cols` references to a sorted and deduplicated list of
424/// [`WithOptionValue`]s.
425///
426/// Returns an error if any column reference in `cols` is not part of a table in the
427/// provided `tables`.
428fn normalize_column_refs(
429    cols: &[UnresolvedItemName],
430    reference_resolver: &SourceReferenceResolver,
431    tables: &[RequestedSourceExport<SqlServerTableDesc>],
432    option_name: SqlServerConfigOptionName,
433) -> Result<Vec<WithOptionValue<Aug>>, SqlServerSourcePurificationError> {
434    let (seq, unknown): (Vec<_>, Vec<_>) = cols.into_iter().partition(|name| {
435        let (column_name, qual) = name.0.split_last().expect("non-empty");
436        match reference_resolver.resolve_idx(qual) {
437            // TODO(sql_server3): This needs to also introduce the maximum qualification
438            // on  the columns, i.e. ensure they have the schema name.
439            Ok(idx) => tables[idx]
440                .meta
441                .columns
442                .iter()
443                .any(|n| &*n.name == column_name.as_str()),
444            Err(_) => false,
445        }
446    });
447
448    if !unknown.is_empty() {
449        return Err(SqlServerSourcePurificationError::DanglingColumns {
450            option_name: option_name.to_string(),
451            items: unknown.into_iter().cloned().collect(),
452        });
453    }
454
455    let mut seq: Vec<_> = seq
456        .into_iter()
457        .cloned()
458        .map(WithOptionValue::UnresolvedItemName)
459        .collect();
460
461    seq.sort();
462    seq.dedup();
463    Ok(seq)
464}