Skip to main content

mz_sql/
normalize.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! SQL normalization routines.
11//!
12//! Normalization is the process of taking relatively unstructured types from
13//! the [`ast`] module and converting them to more structured types.
14//!
15//! [`ast`]: crate::ast
16
17use std::fmt;
18
19use itertools::Itertools;
20use mz_repr::{ColumnName, GlobalId};
21use mz_sql_parser::ast::display::AstDisplay;
22use mz_sql_parser::ast::visit_mut::{self, VisitMut};
23use mz_sql_parser::ast::{
24    ContinualTaskStmt, CreateConnectionStatement, CreateContinualTaskStatement,
25    CreateContinualTaskSugar, CreateIndexStatement, CreateMaterializedViewStatement,
26    CreateSecretStatement, CreateSinkStatement, CreateSourceStatement, CreateSubsourceStatement,
27    CreateTableFromSourceStatement, CreateTableStatement, CreateTypeStatement, CreateViewStatement,
28    CreateWebhookSourceStatement, CteBlock, Function, FunctionArgs, Ident, IfExistsBehavior,
29    MutRecBlock, Op, Query, Statement, TableFactor, TableFromSourceColumns, UnresolvedItemName,
30    UnresolvedSchemaName, Value, ViewDefinition,
31};
32
33use crate::names::{Aug, FullItemName, PartialItemName, PartialSchemaName, RawDatabaseSpecifier};
34use crate::plan::error::PlanError;
35use crate::plan::statement::StatementContext;
36
37/// Normalizes a single identifier.
38pub fn ident(ident: Ident) -> String {
39    ident.into_string()
40}
41
42/// Normalizes a single identifier.
43pub fn ident_ref(ident: &Ident) -> &str {
44    ident.as_str()
45}
46
47/// Normalizes an identifier that represents a column name.
48pub fn column_name(id: Ident) -> ColumnName {
49    ColumnName::from(ident(id))
50}
51
52/// Normalizes an unresolved object name.
53pub fn unresolved_item_name(mut name: UnresolvedItemName) -> Result<PartialItemName, PlanError> {
54    if name.0.len() < 1 || name.0.len() > 3 {
55        return Err(PlanError::MisqualifiedName(name.to_string()));
56    }
57    let out = PartialItemName {
58        item: ident(
59            name.0
60                .pop()
61                .expect("name checked to have at least one component"),
62        ),
63        schema: name.0.pop().map(ident),
64        database: name.0.pop().map(ident),
65    };
66    assert!(name.0.is_empty());
67    Ok(out)
68}
69
70/// Normalizes an unresolved schema name.
71pub fn unresolved_schema_name(
72    mut name: UnresolvedSchemaName,
73) -> Result<PartialSchemaName, PlanError> {
74    if name.0.len() < 1 || name.0.len() > 2 {
75        return Err(PlanError::MisqualifiedName(name.to_string()));
76    }
77    let out = PartialSchemaName {
78        schema: ident(
79            name.0
80                .pop()
81                .expect("name checked to have at least one component"),
82        ),
83        database: name.0.pop().map(ident),
84    };
85    assert!(name.0.is_empty());
86    Ok(out)
87}
88
89/// Normalizes an operator reference.
90///
91/// Qualified operators outside of the pg_catalog schema are rejected.
92pub fn op(op: &Op) -> Result<&str, PlanError> {
93    if let Some(namespace) = &op.namespace {
94        if namespace.len() != 0
95            && (namespace.len() != 1
96                || namespace[0].as_str() != mz_repr::namespaces::PG_CATALOG_SCHEMA)
97        {
98            sql_bail!(
99                "operator does not exist: {}.{}",
100                namespace.iter().map(|n| n.to_string()).join("."),
101                op.op,
102            )
103        }
104    }
105    Ok(&op.op)
106}
107
108#[derive(Debug, Clone)]
109pub enum SqlValueOrSecret {
110    Value(Value),
111    Secret(GlobalId),
112}
113
114impl fmt::Display for SqlValueOrSecret {
115    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
116        match self {
117            SqlValueOrSecret::Value(v) => write!(f, "{}", v),
118            SqlValueOrSecret::Secret(id) => write!(f, "{}", id),
119        }
120    }
121}
122
123impl From<SqlValueOrSecret> for Option<Value> {
124    fn from(s: SqlValueOrSecret) -> Self {
125        match s {
126            SqlValueOrSecret::Value(v) => Some(v),
127            SqlValueOrSecret::Secret(_id) => None,
128        }
129    }
130}
131
132/// Unnormalizes an item name.
133///
134/// This is the inverse of the [`unresolved_item_name`] function.
135pub fn unresolve(name: FullItemName) -> UnresolvedItemName {
136    // TODO(parkmycar): Refactor FullItemName to use `Ident`.
137    let mut out = vec![];
138    if let RawDatabaseSpecifier::Name(n) = name.database {
139        out.push(Ident::new_unchecked(n));
140    }
141    out.push(Ident::new_unchecked(name.schema));
142    out.push(Ident::new_unchecked(name.item));
143    UnresolvedItemName(out)
144}
145
146/// Converts an `UnresolvedItemName` to a `FullItemName` if the
147/// `UnresolvedItemName` is fully specified. Otherwise returns an error.
148pub fn full_name(mut raw_name: UnresolvedItemName) -> Result<FullItemName, PlanError> {
149    match raw_name.0.len() {
150        3 => Ok(FullItemName {
151            item: ident(raw_name.0.pop().unwrap()),
152            schema: ident(raw_name.0.pop().unwrap()),
153            database: RawDatabaseSpecifier::Name(ident(raw_name.0.pop().unwrap())),
154        }),
155        2 => Ok(FullItemName {
156            item: ident(raw_name.0.pop().unwrap()),
157            schema: ident(raw_name.0.pop().unwrap()),
158            database: RawDatabaseSpecifier::Ambient,
159        }),
160        _ => sql_bail!("unresolved name {} not fully qualified", raw_name),
161    }
162}
163
164/// Normalizes a `CREATE` statement.
165///
166/// The resulting statement will not depend upon any session parameters, nor
167/// specify any non-default options (like `TEMPORARY`, `IF NOT EXISTS`, etc).
168///
169/// The goal is to construct a backwards-compatible description of the item.
170/// SQL is the most stable part of Materialize, so SQL is used to describe the
171/// items that are persisted in the catalog.
172pub fn create_statement(
173    scx: &StatementContext,
174    mut stmt: Statement<Aug>,
175) -> Result<String, PlanError> {
176    let allocate_name = |name: &UnresolvedItemName| -> Result<_, PlanError> {
177        Ok(unresolve(
178            scx.allocate_full_name(unresolved_item_name(name.clone())?)?,
179        ))
180    };
181
182    let allocate_temporary_name = |name: &UnresolvedItemName| -> Result<_, PlanError> {
183        Ok(unresolve(scx.allocate_temporary_full_name(
184            unresolved_item_name(name.clone())?,
185        )))
186    };
187
188    struct QueryNormalizer {
189        ctes: Vec<Ident>,
190        err: Option<PlanError>,
191    }
192
193    impl QueryNormalizer {
194        fn new() -> QueryNormalizer {
195            QueryNormalizer {
196                ctes: vec![],
197                err: None,
198            }
199        }
200    }
201
202    impl<'ast> VisitMut<'ast, Aug> for QueryNormalizer {
203        fn visit_query_mut(&mut self, query: &'ast mut Query<Aug>) {
204            let n = self.ctes.len();
205            match &query.ctes {
206                CteBlock::Simple(ctes) => {
207                    for cte in ctes.iter() {
208                        self.ctes.push(cte.alias.name.clone());
209                    }
210                }
211                CteBlock::MutuallyRecursive(MutRecBlock { options: _, ctes }) => {
212                    for cte in ctes.iter() {
213                        self.ctes.push(cte.name.clone());
214                    }
215                }
216            }
217            visit_mut::visit_query_mut(self, query);
218            self.ctes.truncate(n);
219        }
220
221        fn visit_function_mut(&mut self, func: &'ast mut Function<Aug>) {
222            match &mut func.args {
223                FunctionArgs::Star => (),
224                FunctionArgs::Args { args, order_by } => {
225                    for arg in args {
226                        self.visit_expr_mut(arg);
227                    }
228                    for expr in order_by {
229                        self.visit_order_by_expr_mut(expr);
230                    }
231                }
232            }
233            if let Some(over) = &mut func.over {
234                self.visit_window_spec_mut(over);
235            }
236        }
237
238        fn visit_table_factor_mut(&mut self, table_factor: &'ast mut TableFactor<Aug>) {
239            match table_factor {
240                TableFactor::Table { name, alias, .. } => {
241                    self.visit_item_name_mut(name);
242                    if let Some(alias) = alias {
243                        self.visit_table_alias_mut(alias);
244                    }
245                }
246                // We only need special behavior for `TableFactor::Table`.
247                // Just visit the other types of table factors like normal.
248                _ => visit_mut::visit_table_factor_mut(self, table_factor),
249            }
250        }
251    }
252
253    // Think very hard before changing any of the branches in this match
254    // statement. All identifiers must be quoted. All object names must be
255    // allocated or resolved, depending on whether they are the object created
256    // by the statement or an object depended upon by the statement. All
257    // non-default options must be disabled.
258    //
259    // Wildcard matches are explicitly avoided so that future additions to the
260    // syntax cause compile errors here. Before you ignore a new field, triple
261    // check that it does not need to be normalized according to the rules
262    // above.
263    match &mut stmt {
264        Statement::CreateSource(CreateSourceStatement {
265            name,
266            in_cluster: _,
267            col_names: _,
268            connection: _,
269            format: _,
270            include_metadata: _,
271            envelope: _,
272            if_not_exists,
273            key_constraint: _,
274            with_options: _,
275            external_references: _,
276            progress_subsource: _,
277        }) => {
278            *name = allocate_name(name)?;
279            *if_not_exists = false;
280        }
281
282        Statement::CreateSubsource(CreateSubsourceStatement {
283            name,
284            columns,
285            constraints: _,
286            of_source: _,
287            if_not_exists,
288            with_options: _,
289        }) => {
290            *name = allocate_name(name)?;
291            let mut normalizer = QueryNormalizer::new();
292            for c in columns {
293                normalizer.visit_column_def_mut(c);
294            }
295            if let Some(err) = normalizer.err {
296                return Err(err);
297            }
298            *if_not_exists = false;
299        }
300
301        Statement::CreateTableFromSource(CreateTableFromSourceStatement {
302            name,
303            columns,
304            constraints: _,
305            external_reference: _,
306            source: _,
307            if_not_exists,
308            format: _,
309            include_metadata: _,
310            envelope: _,
311            with_options: _,
312        }) => {
313            *name = allocate_name(name)?;
314            let mut normalizer = QueryNormalizer::new();
315            if let TableFromSourceColumns::Defined(columns) = columns {
316                for c in columns {
317                    normalizer.visit_column_def_mut(c);
318                }
319            }
320            if let Some(err) = normalizer.err {
321                return Err(err);
322            }
323            *if_not_exists = false;
324        }
325
326        Statement::CreateTable(CreateTableStatement {
327            name,
328            columns,
329            constraints: _,
330            if_not_exists,
331            temporary,
332            with_options: _,
333        }) => {
334            *name = if *temporary {
335                allocate_temporary_name(name)?
336            } else {
337                allocate_name(name)?
338            };
339            let mut normalizer = QueryNormalizer::new();
340            for c in columns {
341                normalizer.visit_column_def_mut(c);
342            }
343            if let Some(err) = normalizer.err {
344                return Err(err);
345            }
346            *if_not_exists = false;
347        }
348
349        Statement::CreateWebhookSource(CreateWebhookSourceStatement {
350            name,
351            is_table: _,
352            if_not_exists,
353            include_headers: _,
354            body_format: _,
355            validate_using: _,
356            in_cluster: _,
357        }) => {
358            *name = allocate_name(name)?;
359            *if_not_exists = false;
360        }
361
362        Statement::CreateSink(CreateSinkStatement {
363            name,
364            in_cluster: _,
365            connection: _,
366            format: _,
367            envelope: _,
368            if_not_exists,
369            ..
370        }) => {
371            if let Some(name) = name {
372                *name = allocate_name(name)?;
373            }
374            *if_not_exists = false;
375        }
376
377        Statement::CreateView(CreateViewStatement {
378            temporary,
379            if_exists,
380            definition:
381                ViewDefinition {
382                    name,
383                    query,
384                    columns: _,
385                },
386        }) => {
387            *name = if *temporary {
388                allocate_temporary_name(name)?
389            } else {
390                allocate_name(name)?
391            };
392            {
393                let mut normalizer = QueryNormalizer::new();
394                normalizer.visit_query_mut(query);
395                if let Some(err) = normalizer.err {
396                    return Err(err);
397                }
398            }
399            *if_exists = IfExistsBehavior::Error;
400        }
401
402        Statement::CreateMaterializedView(CreateMaterializedViewStatement {
403            if_exists,
404            name,
405            columns: _,
406            replacement_for: _,
407            in_cluster: _,
408            in_cluster_replica: _,
409            query,
410            with_options: _,
411            as_of: _,
412        }) => {
413            *name = allocate_name(name)?;
414            {
415                let mut normalizer = QueryNormalizer::new();
416                normalizer.visit_query_mut(query);
417                if let Some(err) = normalizer.err {
418                    return Err(err);
419                }
420            }
421            *if_exists = IfExistsBehavior::Error;
422        }
423
424        Statement::CreateContinualTask(CreateContinualTaskStatement {
425            name,
426            columns: _,
427            input,
428            with_options: _,
429            stmts,
430            in_cluster: _,
431            as_of: _,
432            sugar,
433        }) => {
434            let mut normalizer = QueryNormalizer::new();
435            normalizer.visit_item_name_mut(name);
436            normalizer.visit_item_name_mut(input);
437            for stmt in stmts {
438                match stmt {
439                    ContinualTaskStmt::Delete(stmt) => normalizer.visit_delete_statement_mut(stmt),
440                    ContinualTaskStmt::Insert(stmt) => normalizer.visit_insert_statement_mut(stmt),
441                }
442            }
443            match sugar {
444                Some(CreateContinualTaskSugar::Transform { transform }) => {
445                    normalizer.visit_query_mut(transform)
446                }
447                Some(CreateContinualTaskSugar::Retain { retain }) => {
448                    normalizer.visit_expr_mut(retain)
449                }
450                None => {}
451            }
452            if let Some(err) = normalizer.err {
453                return Err(err);
454            }
455        }
456
457        Statement::CreateIndex(CreateIndexStatement {
458            name: _,
459            in_cluster: _,
460            key_parts,
461            with_options: _,
462            if_not_exists,
463            ..
464        }) => {
465            let mut normalizer = QueryNormalizer::new();
466            if let Some(key_parts) = key_parts {
467                for key_part in key_parts {
468                    normalizer.visit_expr_mut(key_part);
469                    if let Some(err) = normalizer.err {
470                        return Err(err);
471                    }
472                }
473            }
474            *if_not_exists = false;
475        }
476
477        Statement::CreateType(CreateTypeStatement { name, as_type }) => {
478            *name = allocate_name(name)?;
479            let mut normalizer = QueryNormalizer::new();
480            normalizer.visit_create_type_as_mut(as_type);
481            if let Some(err) = normalizer.err {
482                return Err(err);
483            }
484        }
485        Statement::CreateSecret(CreateSecretStatement {
486            name,
487            if_not_exists,
488            value: _,
489        }) => {
490            *name = allocate_name(name)?;
491            *if_not_exists = false;
492        }
493        Statement::CreateConnection(CreateConnectionStatement {
494            name,
495            connection_type: _,
496            values,
497            with_options,
498            if_not_exists,
499        }) => {
500            *name = allocate_name(name)?;
501            *if_not_exists = false;
502
503            values.sort();
504
505            // Validation only occurs once during planning and should not be
506            // considered part of the statement's AST/canonical representation.
507            with_options
508                .retain(|o| o.name != mz_sql_parser::ast::CreateConnectionOptionName::Validate);
509        }
510
511        _ => unreachable!(),
512    }
513
514    Ok(stmt.to_ast_string_stable())
515}
516
/// Generates an `<OptionTy>Extracted` struct that converts a `Vec` of the
/// types commonly used to represent `WITH` options into useful data types,
/// such as strings, and that can convert those data types back into the
/// `Vec` of options.
///
/// # Parameters
/// - `$option_ty`: a struct representing a set of `WITH` options, which must
///   contain the fields `name` and `value`.
///     - `name` must be of type `$option_tyName`, e.g. if `$option_ty` is
///       `FooOption`, then `name` must be of type `FooOptionName`.
///       `$option_tyName` must be an enum representing `WITH` option keys.
///     - `TryFromValue` must be implemented, for the option's value type, on
///       the type you want to convert the option to. The
///       `sql::plan::with_option` module contains these implementations.
/// - `$option_name` must be an element of `$option_tyName`.
/// - `$t` is the type you want to convert the option's value to. If the
///   option's value is absent (i.e. the user only entered the option's key),
///   you can also define a default value.
/// - `Default($v)` is an optional parameter that sets the default value of the
///   field to `$v`. `$v` must be convertible to `$t` using `.into`. This also
///   converts the field's type from `Option<$t>` to `$t`.
/// - `AllowMultiple` is an optional parameter that, when specified, allows
///   the given option to appear multiple times in the `WITH` clause. This
///   also converts the field's type from `$t` to `Vec<$t>`.
///
/// # Implementation
/// Each option spec is rewritten, one at a time, into the uniform tuple
/// `(name, field type, default expression, allow-multiple flag)` and appended
/// to the bracketed accumulator. Once no specs remain, the accumulator is
/// expanded into the struct and its impls.
macro_rules! generate_extracted_config {
    // Plain option, more specs follow.
    (
        $option_ty:ty, [$($done:tt)*],
        ($option_name:path, $t:ty), $($rest:tt),*
    ) => {
        generate_extracted_config!(
            $option_ty,
            [$($done)* ($option_name, Option::<$t>, None, false)],
            $($rest),*
        );
    };
    // Plain option, last spec.
    (
        $option_ty:ty, [$($done:tt)*],
        ($option_name:path, $t:ty)
    ) => {
        generate_extracted_config!(
            $option_ty,
            [$($done)* ($option_name, Option::<$t>, None, false)]
        );
    };
    // Option with a default, more specs follow.
    (
        $option_ty:ty, [$($done:tt)*],
        ($option_name:path, $t:ty, Default($default:expr)), $($rest:tt),*
    ) => {
        generate_extracted_config!(
            $option_ty,
            [$($done)* ($option_name, $t, $default, false)],
            $($rest),*
        );
    };
    // Option with a default, last spec.
    (
        $option_ty:ty, [$($done:tt)*],
        ($option_name:path, $t:ty, Default($default:expr))
    ) => {
        generate_extracted_config!(
            $option_ty,
            [$($done)* ($option_name, $t, $default, false)]
        );
    };
    // Repeatable option, more specs follow.
    (
        $option_ty:ty, [$($done:tt)*],
        ($option_name:path, $t:ty, AllowMultiple), $($rest:tt),*
    ) => {
        generate_extracted_config!(
            $option_ty,
            [$($done)* ($option_name, $t, vec![], true)],
            $($rest),*
        );
    };
    // Repeatable option, last spec.
    (
        $option_ty:ty, [$($done:tt)*],
        ($option_name:path, $t:ty, AllowMultiple)
    ) => {
        generate_extracted_config!(
            $option_ty,
            [$($done)* ($option_name, $t, vec![], true)]
        );
    };
    // Every spec has been rewritten: emit the struct and its impls.
    ($option_ty:ty, [$(($option_name:path, $t:ty, $default:expr, $allow_multiple:literal))+]) => {
        paste::paste! {
            #[derive(Debug)]
            pub struct [<$option_ty Extracted>] {
                // Names of the non-repeatable options encountered during
                // extraction.
                pub(crate) seen: ::std::collections::BTreeSet::<[<$option_ty Name>]>,
                $(
                    pub [<$option_name:snake>]: generate_extracted_config!(
                        @ifty $allow_multiple,
                        Vec::<$t>,
                        $t
                    ),
                )*
            }

            impl std::default::Default for [<$option_ty Extracted>] {
                fn default() -> Self {
                    Self {
                        seen: ::std::collections::BTreeSet::new(),
                        $(
                            [<$option_name:snake>]: <generate_extracted_config!(
                                @ifty $allow_multiple,
                                Vec::<$t>,
                                $t
                            )>::from($default),
                        )*
                    }
                }
            }

            impl std::convert::TryFrom<Vec<$option_ty<Aug>>>
                for [<$option_ty Extracted>]
            {
                type Error = $crate::plan::PlanError;
                fn try_from(
                    v: Vec<$option_ty<Aug>>,
                ) -> Result<Self, Self::Error> {
                    use [<$option_ty Name>]::*;
                    let mut config = Self::default();
                    for option in v {
                        match option.name {
                            $(
                                $option_name => {
                                    // Repeatable options skip the duplicate
                                    // check (and are not recorded in `seen`).
                                    let first_occurrence = $allow_multiple
                                        || config.seen.insert(option.name.clone());
                                    if !first_occurrence {
                                        sql_bail!(
                                            "{} specified more than once",
                                            option.name.to_ast_string_simple(),
                                        );
                                    }
                                    let parsed: $t = $crate::plan::with_options
                                        ::TryFromValue::try_from_value(option.value)
                                        .map_err(|e| sql_err!(
                                            "invalid {}: {}",
                                            option.name.to_ast_string_simple(),
                                            e,
                                        ))?;
                                    generate_extracted_config!(
                                        @ifexpr $allow_multiple,
                                        config.[<$option_name:snake>].push(parsed),
                                        config.[<$option_name:snake>] = parsed
                                    );
                                }
                            )*
                        }
                    }
                    Ok(config)
                }
            }

            impl [<$option_ty Extracted>] {
                #[allow(unused)]
                fn into_values(
                    self,
                    catalog: &dyn crate::catalog::SessionCatalog,
                ) -> Vec<$option_ty<Aug>> {
                    use [<$option_ty Name>]::*;
                    let mut rendered = Vec::new();
                    $(
                        let field = self.[<$option_name:snake>];
                        // Treat single-valued fields as a one-element list so
                        // both kinds share the loop below.
                        let entries: Vec<_> = generate_extracted_config!(
                            @ifexpr $allow_multiple,
                            field,
                            vec![field]
                        );
                        for entry in entries {
                            // If `try_into_value` returns `None`, then there was no option that
                            // generated this value. For example, this can happen when the entry
                            // is `None`.
                            let maybe_value = <$t as $crate::plan::with_options::TryFromValue<
                                Option<mz_sql_parser::ast::WithOptionValue<$crate::names::Aug>>
                            >>::try_into_value(entry, catalog);
                            if let Some(value) = maybe_value {
                                rendered.push($option_ty {name: $option_name, value: value});
                            }
                        }
                    )*
                    rendered
                }
            }
        }
    };
    // Entry point: start rewriting with an empty accumulator.
    ($option_ty:ty, $($spec:tt),+) => {
        generate_extracted_config!{$option_ty, [], $($spec),+}
    };
    // Helper `if` constructs to conditionally generate expressions and types
    // based on the value of $allow_multiple.
    (@ifexpr false, $lhs:expr, $rhs:expr) => {
        $rhs
    };
    (@ifexpr true, $lhs:expr, $rhs:expr) => {
        $lhs
    };
    (@ifty false, $lhs:ty, $rhs:ty) => {
        $rhs
    };
    (@ifty true, $lhs:ty, $rhs:ty) => {
        $lhs
    };
}
729
730pub(crate) use generate_extracted_config;