mz_sql/
normalize.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! SQL normalization routines.
11//!
12//! Normalization is the process of taking relatively unstructured types from
13//! the [`ast`] module and converting them to more structured types.
14//!
15//! [`ast`]: crate::ast
16
17use std::fmt;
18
19use itertools::Itertools;
20use mz_repr::{ColumnName, GlobalId};
21use mz_sql_parser::ast::display::AstDisplay;
22use mz_sql_parser::ast::visit_mut::{self, VisitMut};
23use mz_sql_parser::ast::{
24    ContinualTaskStmt, CreateConnectionStatement, CreateContinualTaskStatement,
25    CreateContinualTaskSugar, CreateIndexStatement, CreateMaterializedViewStatement,
26    CreateSecretStatement, CreateSinkStatement, CreateSourceStatement, CreateSubsourceStatement,
27    CreateTableFromSourceStatement, CreateTableStatement, CreateTypeStatement, CreateViewStatement,
28    CreateWebhookSourceStatement, CteBlock, Function, FunctionArgs, Ident, IfExistsBehavior,
29    MutRecBlock, Op, Query, Statement, TableFactor, TableFromSourceColumns, UnresolvedItemName,
30    UnresolvedSchemaName, Value, ViewDefinition,
31};
32
33use crate::names::{Aug, FullItemName, PartialItemName, PartialSchemaName, RawDatabaseSpecifier};
34use crate::plan::error::PlanError;
35use crate::plan::statement::StatementContext;
36
37/// Normalizes a single identifier.
38pub fn ident(ident: Ident) -> String {
39    ident.into_string()
40}
41
42/// Normalizes a single identifier.
43pub fn ident_ref(ident: &Ident) -> &str {
44    ident.as_str()
45}
46
47/// Normalizes an identifier that represents a column name.
48pub fn column_name(id: Ident) -> ColumnName {
49    ColumnName::from(ident(id))
50}
51
52/// Normalizes an unresolved object name.
53pub fn unresolved_item_name(mut name: UnresolvedItemName) -> Result<PartialItemName, PlanError> {
54    if name.0.len() < 1 || name.0.len() > 3 {
55        return Err(PlanError::MisqualifiedName(name.to_string()));
56    }
57    let out = PartialItemName {
58        item: ident(
59            name.0
60                .pop()
61                .expect("name checked to have at least one component"),
62        ),
63        schema: name.0.pop().map(ident),
64        database: name.0.pop().map(ident),
65    };
66    assert!(name.0.is_empty());
67    Ok(out)
68}
69
70/// Normalizes an unresolved schema name.
71pub fn unresolved_schema_name(
72    mut name: UnresolvedSchemaName,
73) -> Result<PartialSchemaName, PlanError> {
74    if name.0.len() < 1 || name.0.len() > 2 {
75        return Err(PlanError::MisqualifiedName(name.to_string()));
76    }
77    let out = PartialSchemaName {
78        schema: ident(
79            name.0
80                .pop()
81                .expect("name checked to have at least one component"),
82        ),
83        database: name.0.pop().map(ident),
84    };
85    assert!(name.0.is_empty());
86    Ok(out)
87}
88
89/// Normalizes an operator reference.
90///
91/// Qualified operators outside of the pg_catalog schema are rejected.
92pub fn op(op: &Op) -> Result<&str, PlanError> {
93    if let Some(namespace) = &op.namespace {
94        if namespace.len() != 0
95            && (namespace.len() != 1
96                || namespace[0].as_str() != mz_repr::namespaces::PG_CATALOG_SCHEMA)
97        {
98            sql_bail!(
99                "operator does not exist: {}.{}",
100                namespace.iter().map(|n| n.to_string()).join("."),
101                op.op,
102            )
103        }
104    }
105    Ok(&op.op)
106}
107
108#[derive(Debug, Clone)]
109pub enum SqlValueOrSecret {
110    Value(Value),
111    Secret(GlobalId),
112}
113
114impl fmt::Display for SqlValueOrSecret {
115    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
116        match self {
117            SqlValueOrSecret::Value(v) => write!(f, "{}", v),
118            SqlValueOrSecret::Secret(id) => write!(f, "{}", id),
119        }
120    }
121}
122
123impl From<SqlValueOrSecret> for Option<Value> {
124    fn from(s: SqlValueOrSecret) -> Self {
125        match s {
126            SqlValueOrSecret::Value(v) => Some(v),
127            SqlValueOrSecret::Secret(_id) => None,
128        }
129    }
130}
131
132/// Unnormalizes an item name.
133///
134/// This is the inverse of the [`unresolved_item_name`] function.
135pub fn unresolve(name: FullItemName) -> UnresolvedItemName {
136    // TODO(parkmycar): Refactor FullItemName to use `Ident`.
137    let mut out = vec![];
138    if let RawDatabaseSpecifier::Name(n) = name.database {
139        out.push(Ident::new_unchecked(n));
140    }
141    out.push(Ident::new_unchecked(name.schema));
142    out.push(Ident::new_unchecked(name.item));
143    UnresolvedItemName(out)
144}
145
146/// Converts an `UnresolvedItemName` to a `FullItemName` if the
147/// `UnresolvedItemName` is fully specified. Otherwise returns an error.
148pub fn full_name(mut raw_name: UnresolvedItemName) -> Result<FullItemName, PlanError> {
149    match raw_name.0.len() {
150        3 => Ok(FullItemName {
151            item: ident(raw_name.0.pop().unwrap()),
152            schema: ident(raw_name.0.pop().unwrap()),
153            database: RawDatabaseSpecifier::Name(ident(raw_name.0.pop().unwrap())),
154        }),
155        2 => Ok(FullItemName {
156            item: ident(raw_name.0.pop().unwrap()),
157            schema: ident(raw_name.0.pop().unwrap()),
158            database: RawDatabaseSpecifier::Ambient,
159        }),
160        _ => sql_bail!("unresolved name {} not fully qualified", raw_name),
161    }
162}
163
164/// Normalizes a `CREATE` statement.
165///
166/// The resulting statement will not depend upon any session parameters, nor
167/// specify any non-default options (like `TEMPORARY`, `IF NOT EXISTS`, etc).
168///
169/// The goal is to construct a backwards-compatible description of the item.
170/// SQL is the most stable part of Materialize, so SQL is used to describe the
171/// items that are persisted in the catalog.
172pub fn create_statement(
173    scx: &StatementContext,
174    mut stmt: Statement<Aug>,
175) -> Result<String, PlanError> {
176    let allocate_name = |name: &UnresolvedItemName| -> Result<_, PlanError> {
177        Ok(unresolve(
178            scx.allocate_full_name(unresolved_item_name(name.clone())?)?,
179        ))
180    };
181
182    let allocate_temporary_name = |name: &UnresolvedItemName| -> Result<_, PlanError> {
183        Ok(unresolve(scx.allocate_temporary_full_name(
184            unresolved_item_name(name.clone())?,
185        )))
186    };
187
188    struct QueryNormalizer {
189        ctes: Vec<Ident>,
190        err: Option<PlanError>,
191    }
192
193    impl QueryNormalizer {
194        fn new() -> QueryNormalizer {
195            QueryNormalizer {
196                ctes: vec![],
197                err: None,
198            }
199        }
200    }
201
202    impl<'ast> VisitMut<'ast, Aug> for QueryNormalizer {
203        fn visit_query_mut(&mut self, query: &'ast mut Query<Aug>) {
204            let n = self.ctes.len();
205            match &query.ctes {
206                CteBlock::Simple(ctes) => {
207                    for cte in ctes.iter() {
208                        self.ctes.push(cte.alias.name.clone());
209                    }
210                }
211                CteBlock::MutuallyRecursive(MutRecBlock { options: _, ctes }) => {
212                    for cte in ctes.iter() {
213                        self.ctes.push(cte.name.clone());
214                    }
215                }
216            }
217            visit_mut::visit_query_mut(self, query);
218            self.ctes.truncate(n);
219        }
220
221        fn visit_function_mut(&mut self, func: &'ast mut Function<Aug>) {
222            match &mut func.args {
223                FunctionArgs::Star => (),
224                FunctionArgs::Args { args, order_by } => {
225                    for arg in args {
226                        self.visit_expr_mut(arg);
227                    }
228                    for expr in order_by {
229                        self.visit_order_by_expr_mut(expr);
230                    }
231                }
232            }
233            if let Some(over) = &mut func.over {
234                self.visit_window_spec_mut(over);
235            }
236        }
237
238        fn visit_table_factor_mut(&mut self, table_factor: &'ast mut TableFactor<Aug>) {
239            match table_factor {
240                TableFactor::Table { name, alias, .. } => {
241                    self.visit_item_name_mut(name);
242                    if let Some(alias) = alias {
243                        self.visit_table_alias_mut(alias);
244                    }
245                }
246                // We only need special behavior for `TableFactor::Table`.
247                // Just visit the other types of table factors like normal.
248                _ => visit_mut::visit_table_factor_mut(self, table_factor),
249            }
250        }
251    }
252
253    // Think very hard before changing any of the branches in this match
254    // statement. All identifiers must be quoted. All object names must be
255    // allocated or resolved, depending on whether they are the object created
256    // by the statement or an object depended upon by the statement. All
257    // non-default options must be disabled.
258    //
259    // Wildcard matches are explicitly avoided so that future additions to the
260    // syntax cause compile errors here. Before you ignore a new field, triple
261    // check that it does not need to be normalized according to the rules
262    // above.
263    match &mut stmt {
264        Statement::CreateSource(CreateSourceStatement {
265            name,
266            in_cluster: _,
267            col_names: _,
268            connection: _,
269            format: _,
270            include_metadata: _,
271            envelope: _,
272            if_not_exists,
273            key_constraint: _,
274            with_options: _,
275            external_references: _,
276            progress_subsource: _,
277        }) => {
278            *name = allocate_name(name)?;
279            *if_not_exists = false;
280        }
281
282        Statement::CreateSubsource(CreateSubsourceStatement {
283            name,
284            columns,
285            constraints: _,
286            of_source: _,
287            if_not_exists,
288            with_options: _,
289        }) => {
290            *name = allocate_name(name)?;
291            let mut normalizer = QueryNormalizer::new();
292            for c in columns {
293                normalizer.visit_column_def_mut(c);
294            }
295            if let Some(err) = normalizer.err {
296                return Err(err);
297            }
298            *if_not_exists = false;
299        }
300
301        Statement::CreateTableFromSource(CreateTableFromSourceStatement {
302            name,
303            columns,
304            constraints: _,
305            external_reference: _,
306            source: _,
307            if_not_exists,
308            format: _,
309            include_metadata: _,
310            envelope: _,
311            with_options: _,
312        }) => {
313            *name = allocate_name(name)?;
314            let mut normalizer = QueryNormalizer::new();
315            if let TableFromSourceColumns::Defined(columns) = columns {
316                for c in columns {
317                    normalizer.visit_column_def_mut(c);
318                }
319            }
320            if let Some(err) = normalizer.err {
321                return Err(err);
322            }
323            *if_not_exists = false;
324        }
325
326        Statement::CreateTable(CreateTableStatement {
327            name,
328            columns,
329            constraints: _,
330            if_not_exists,
331            temporary,
332            with_options: _,
333        }) => {
334            *name = if *temporary {
335                allocate_temporary_name(name)?
336            } else {
337                allocate_name(name)?
338            };
339            let mut normalizer = QueryNormalizer::new();
340            for c in columns {
341                normalizer.visit_column_def_mut(c);
342            }
343            if let Some(err) = normalizer.err {
344                return Err(err);
345            }
346            *if_not_exists = false;
347        }
348
349        Statement::CreateWebhookSource(CreateWebhookSourceStatement {
350            name,
351            is_table: _,
352            if_not_exists,
353            include_headers: _,
354            body_format: _,
355            validate_using: _,
356            in_cluster: _,
357        }) => {
358            *name = allocate_name(name)?;
359            *if_not_exists = false;
360        }
361
362        Statement::CreateSink(CreateSinkStatement {
363            name,
364            in_cluster: _,
365            connection: _,
366            format: _,
367            envelope: _,
368            if_not_exists,
369            ..
370        }) => {
371            if let Some(name) = name {
372                *name = allocate_name(name)?;
373            }
374            *if_not_exists = false;
375        }
376
377        Statement::CreateView(CreateViewStatement {
378            temporary,
379            if_exists,
380            definition:
381                ViewDefinition {
382                    name,
383                    query,
384                    columns: _,
385                },
386        }) => {
387            *name = if *temporary {
388                allocate_temporary_name(name)?
389            } else {
390                allocate_name(name)?
391            };
392            {
393                let mut normalizer = QueryNormalizer::new();
394                normalizer.visit_query_mut(query);
395                if let Some(err) = normalizer.err {
396                    return Err(err);
397                }
398            }
399            *if_exists = IfExistsBehavior::Error;
400        }
401
402        Statement::CreateMaterializedView(CreateMaterializedViewStatement {
403            if_exists,
404            name,
405            columns: _,
406            in_cluster: _,
407            query,
408            with_options: _,
409            as_of: _,
410        }) => {
411            *name = allocate_name(name)?;
412            {
413                let mut normalizer = QueryNormalizer::new();
414                normalizer.visit_query_mut(query);
415                if let Some(err) = normalizer.err {
416                    return Err(err);
417                }
418            }
419            *if_exists = IfExistsBehavior::Error;
420        }
421
422        Statement::CreateContinualTask(CreateContinualTaskStatement {
423            name,
424            columns: _,
425            input,
426            with_options: _,
427            stmts,
428            in_cluster: _,
429            as_of: _,
430            sugar,
431        }) => {
432            let mut normalizer = QueryNormalizer::new();
433            normalizer.visit_item_name_mut(name);
434            normalizer.visit_item_name_mut(input);
435            for stmt in stmts {
436                match stmt {
437                    ContinualTaskStmt::Delete(stmt) => normalizer.visit_delete_statement_mut(stmt),
438                    ContinualTaskStmt::Insert(stmt) => normalizer.visit_insert_statement_mut(stmt),
439                }
440            }
441            match sugar {
442                Some(CreateContinualTaskSugar::Transform { transform }) => {
443                    normalizer.visit_query_mut(transform)
444                }
445                Some(CreateContinualTaskSugar::Retain { retain }) => {
446                    normalizer.visit_expr_mut(retain)
447                }
448                None => {}
449            }
450            if let Some(err) = normalizer.err {
451                return Err(err);
452            }
453        }
454
455        Statement::CreateIndex(CreateIndexStatement {
456            name: _,
457            in_cluster: _,
458            key_parts,
459            with_options: _,
460            if_not_exists,
461            ..
462        }) => {
463            let mut normalizer = QueryNormalizer::new();
464            if let Some(key_parts) = key_parts {
465                for key_part in key_parts {
466                    normalizer.visit_expr_mut(key_part);
467                    if let Some(err) = normalizer.err {
468                        return Err(err);
469                    }
470                }
471            }
472            *if_not_exists = false;
473        }
474
475        Statement::CreateType(CreateTypeStatement { name, as_type }) => {
476            *name = allocate_name(name)?;
477            let mut normalizer = QueryNormalizer::new();
478            normalizer.visit_create_type_as_mut(as_type);
479            if let Some(err) = normalizer.err {
480                return Err(err);
481            }
482        }
483        Statement::CreateSecret(CreateSecretStatement {
484            name,
485            if_not_exists,
486            value: _,
487        }) => {
488            *name = allocate_name(name)?;
489            *if_not_exists = false;
490        }
491        Statement::CreateConnection(CreateConnectionStatement {
492            name,
493            connection_type: _,
494            values,
495            with_options,
496            if_not_exists,
497        }) => {
498            *name = allocate_name(name)?;
499            *if_not_exists = false;
500
501            values.sort();
502
503            // Validation only occurs once during planning and should not be
504            // considered part of the statement's AST/canonical representation.
505            with_options
506                .retain(|o| o.name != mz_sql_parser::ast::CreateConnectionOptionName::Validate);
507        }
508
509        _ => unreachable!(),
510    }
511
512    Ok(stmt.to_ast_string_stable())
513}
514
515/// Generates a struct capable of taking a `Vec` of types commonly used to
516/// represent `WITH` options into useful data types, such as strings.
517/// Additionally, it is able to convert the useful data types back to the `Vec`
518/// of options.
519///
520/// # Parameters
521/// - `$option_ty`: Accepts a struct representing a set of `WITH` options, which
522///     must contain the fields `name` and `value`.
523///     - `name` must be of type `$option_tyName`, e.g. if `$option_ty` is
524///       `FooOption`, then `name` must be of type `FooOptionName`.
525///       `$option_tyName` must be an enum representing `WITH` option keys.
526///     - `TryFromValue<value>` must be implemented for the type you want to
527///       take the option to. The `sql::plan::with_option` module contains these
528///       implementations.
529/// - `$option_name` must be an element of `$option_tyName`
530/// - `$t` is the type you want to convert the option's value to. If the
531///   option's value is absent (i.e. the user only entered the option's key),
532///   you can also define a default value.
533/// - `Default($v)` is an optional parameter that sets the default value of the
534///   field to `$v`. `$v` must be convertible to `$t` using `.into`. This also
535///   converts the struct's type from `Option<$t>` to `<$t>`.
536/// - `AllowMultiple` is an optional parameter that, when specified, allows
537///   the given option to appear multiple times in the `WITH` clause. This
538///   also converts the struct's type from `$t` to `Vec<$t>`.
539macro_rules! generate_extracted_config {
540    // No default specified, have remaining options.
541    ($option_ty:ty, [$($processed:tt)*], ($option_name:path, $t:ty), $($tail:tt),*) => {
542        generate_extracted_config!($option_ty, [$($processed)* ($option_name, Option::<$t>, None, false)], $(
543            $tail
544        ),*);
545    };
546    // No default specified, no remaining options.
547    ($option_ty:ty, [$($processed:tt)*], ($option_name:path, $t:ty)) => {
548        generate_extracted_config!($option_ty, [$($processed)* ($option_name, Option::<$t>, None, false)]);
549    };
550    // Default specified, have remaining options.
551    ($option_ty:ty, [$($processed:tt)*], ($option_name:path, $t:ty, Default($v:expr)), $($tail:tt),*) => {
552        generate_extracted_config!($option_ty, [$($processed)* ($option_name, $t, $v, false)], $(
553            $tail
554        ),*);
555    };
556    // Default specified, no remaining options.
557    ($option_ty:ty, [$($processed:tt)*], ($option_name:path, $t:ty, Default($v:expr))) => {
558        generate_extracted_config!($option_ty, [$($processed)* ($option_name, $t, $v, false)]);
559    };
560    // AllowMultiple specified, have remaining options.
561    ($option_ty:ty, [$($processed:tt)*], ($option_name:path, $t:ty, AllowMultiple), $($tail:tt),*) => {
562        generate_extracted_config!($option_ty, [$($processed)* ($option_name, $t, vec![], true)], $(
563            $tail
564        ),*);
565    };
566    // AllowMultiple specified, no remaining options.
567    ($option_ty:ty, [$($processed:tt)*], ($option_name:path, $t:ty, AllowMultiple)) => {
568        generate_extracted_config!($option_ty, [$($processed)* ($option_name, $t, vec![], true)]);
569    };
570    ($option_ty:ty, [$(($option_name:path, $t:ty, $v:expr, $allow_multiple:literal))+]) => {
571        paste::paste! {
572            #[derive(Debug)]
573            pub struct [<$option_ty Extracted>] {
574                pub(crate) seen: ::std::collections::BTreeSet::<[<$option_ty Name>]>,
575                $(
576                    pub [<$option_name:snake>]: generate_extracted_config!(
577                        @ifty $allow_multiple,
578                        Vec::<$t>,
579                        $t
580                    ),
581                )*
582            }
583
584            impl std::default::Default for [<$option_ty Extracted>] {
585                fn default() -> Self {
586                    [<$option_ty Extracted>] {
587                        seen: ::std::collections::BTreeSet::<[<$option_ty Name>]>::new(),
588                        $(
589                            [<$option_name:snake>]: <generate_extracted_config!(
590                                @ifty $allow_multiple,
591                                Vec::<$t>,
592                                $t
593                            )>::from($v),
594                        )*
595                    }
596                }
597            }
598
599            impl std::convert::TryFrom<Vec<$option_ty<Aug>>> for [<$option_ty Extracted>] {
600                type Error = $crate::plan::PlanError;
601                fn try_from(v: Vec<$option_ty<Aug>>) -> Result<[<$option_ty Extracted>], Self::Error> {
602                    use [<$option_ty Name>]::*;
603                    let mut extracted = [<$option_ty Extracted>]::default();
604                    for option in v {
605                        match option.name {
606                            $(
607                                $option_name => {
608                                    if !$allow_multiple && !extracted.seen.insert(option.name.clone()) {
609                                        sql_bail!("{} specified more than once", option.name.to_ast_string_simple());
610                                    }
611                                    let val: $t = $crate::plan::with_options::TryFromValue::try_from_value(option.value)
612                                        .map_err(|e| sql_err!("invalid {}: {}", option.name.to_ast_string_simple(), e))?;
613                                    generate_extracted_config!(
614                                        @ifexpr $allow_multiple,
615                                        extracted.[<$option_name:snake>].push(val),
616                                        extracted.[<$option_name:snake>] = val
617                                    );
618                                }
619                            )*
620                        }
621                    }
622                    Ok(extracted)
623                }
624            }
625
626            impl [<$option_ty Extracted>] {
627                #[allow(unused)]
628                fn into_values(self, catalog: &dyn crate::catalog::SessionCatalog) -> Vec<$option_ty<Aug>> {
629                    use [<$option_ty Name>]::*;
630                    let mut options = Vec::new();
631                    $(
632                        let value = self.[<$option_name:snake>];
633                        let values: Vec<_> = generate_extracted_config!(
634                            @ifexpr $allow_multiple,
635                            value,
636                            Vec::from([value])
637                        );
638                        for value in values {
639                            // If `try_into_value` returns `None`, then there was no option that
640                            // generated this value. For example, this can happen when `value` is
641                            // `None`.
642                            let maybe_value = <$t as $crate::plan::with_options::TryFromValue<
643                                Option<mz_sql_parser::ast::WithOptionValue<$crate::names::Aug>>
644                            >>::try_into_value(value, catalog);
645                            match maybe_value {
646                                Some(value) => {
647                                    let option = $option_ty {name: $option_name, value};
648                                    options.push(option);
649                                },
650                                None => (),
651                            }
652                        }
653                    )*
654                    options
655                }
656            }
657        }
658    };
659    ($option_ty:ty, $($h:tt),+) => {
660        generate_extracted_config!{$option_ty, [], $($h),+}
661    };
662    // Helper `if` constructs to conditionally generate expressions and types
663    // based on the value of $allow_multiple.
664    (@ifexpr false, $lhs:expr, $rhs:expr) => {
665        $rhs
666    };
667    (@ifexpr true, $lhs:expr, $rhs:expr) => {
668        $lhs
669    };
670    (@ifty false, $lhs:ty, $rhs:ty) => {
671        $rhs
672    };
673    (@ifty true, $lhs:ty, $rhs:ty) => {
674        $lhs
675    };
676}
677
678pub(crate) use generate_extracted_config;