Skip to main content

mz_sql/
normalize.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! SQL normalization routines.
11//!
12//! Normalization is the process of taking relatively unstructured types from
13//! the [`ast`] module and converting them to more structured types.
14//!
15//! [`ast`]: crate::ast
16
17use std::fmt;
18
19use itertools::Itertools;
20use mz_repr::{ColumnName, GlobalId};
21use mz_sql_parser::ast::display::AstDisplay;
22use mz_sql_parser::ast::visit_mut::{self, VisitMut};
23use mz_sql_parser::ast::{
24    CreateConnectionStatement, CreateIndexStatement, CreateMaterializedViewStatement,
25    CreateSecretStatement, CreateSinkStatement, CreateSourceStatement, CreateSubsourceStatement,
26    CreateTableFromSourceStatement, CreateTableStatement, CreateTypeStatement, CreateViewStatement,
27    CreateWebhookSourceStatement, CteBlock, Function, FunctionArgs, Ident, IfExistsBehavior,
28    MutRecBlock, Op, Query, Statement, TableFactor, TableFromSourceColumns, UnresolvedItemName,
29    UnresolvedSchemaName, Value, ViewDefinition,
30};
31
32use crate::names::{Aug, FullItemName, PartialItemName, PartialSchemaName, RawDatabaseSpecifier};
33use crate::plan::error::PlanError;
34use crate::plan::statement::StatementContext;
35
36/// Normalizes a single identifier.
37pub fn ident(ident: Ident) -> String {
38    ident.into_string()
39}
40
41/// Normalizes a single identifier.
42pub fn ident_ref(ident: &Ident) -> &str {
43    ident.as_str()
44}
45
46/// Normalizes an identifier that represents a column name.
47pub fn column_name(id: Ident) -> ColumnName {
48    ColumnName::from(ident(id))
49}
50
51/// Normalizes an unresolved object name.
52pub fn unresolved_item_name(mut name: UnresolvedItemName) -> Result<PartialItemName, PlanError> {
53    if name.0.len() < 1 || name.0.len() > 3 {
54        return Err(PlanError::MisqualifiedName(name.to_string()));
55    }
56    let out = PartialItemName {
57        item: ident(
58            name.0
59                .pop()
60                .expect("name checked to have at least one component"),
61        ),
62        schema: name.0.pop().map(ident),
63        database: name.0.pop().map(ident),
64    };
65    assert!(name.0.is_empty());
66    Ok(out)
67}
68
69/// Normalizes an unresolved schema name.
70pub fn unresolved_schema_name(
71    mut name: UnresolvedSchemaName,
72) -> Result<PartialSchemaName, PlanError> {
73    if name.0.len() < 1 || name.0.len() > 2 {
74        return Err(PlanError::MisqualifiedName(name.to_string()));
75    }
76    let out = PartialSchemaName {
77        schema: ident(
78            name.0
79                .pop()
80                .expect("name checked to have at least one component"),
81        ),
82        database: name.0.pop().map(ident),
83    };
84    assert!(name.0.is_empty());
85    Ok(out)
86}
87
88/// Normalizes an operator reference.
89///
90/// Qualified operators outside of the pg_catalog schema are rejected.
91pub fn op(op: &Op) -> Result<&str, PlanError> {
92    if let Some(namespace) = &op.namespace {
93        if namespace.len() != 0
94            && (namespace.len() != 1
95                || namespace[0].as_str() != mz_repr::namespaces::PG_CATALOG_SCHEMA)
96        {
97            sql_bail!(
98                "operator does not exist: {}.{}",
99                namespace.iter().map(|n| n.to_string()).join("."),
100                op.op,
101            )
102        }
103    }
104    Ok(&op.op)
105}
106
107#[derive(Debug, Clone)]
108pub enum SqlValueOrSecret {
109    Value(Value),
110    Secret(GlobalId),
111}
112
113impl fmt::Display for SqlValueOrSecret {
114    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
115        match self {
116            SqlValueOrSecret::Value(v) => write!(f, "{}", v),
117            SqlValueOrSecret::Secret(id) => write!(f, "{}", id),
118        }
119    }
120}
121
122impl From<SqlValueOrSecret> for Option<Value> {
123    fn from(s: SqlValueOrSecret) -> Self {
124        match s {
125            SqlValueOrSecret::Value(v) => Some(v),
126            SqlValueOrSecret::Secret(_id) => None,
127        }
128    }
129}
130
131/// Unnormalizes an item name.
132///
133/// This is the inverse of the [`unresolved_item_name`] function.
134pub fn unresolve(name: FullItemName) -> UnresolvedItemName {
135    // TODO(parkmycar): Refactor FullItemName to use `Ident`.
136    let mut out = vec![];
137    if let RawDatabaseSpecifier::Name(n) = name.database {
138        out.push(Ident::new_unchecked(n));
139    }
140    out.push(Ident::new_unchecked(name.schema));
141    out.push(Ident::new_unchecked(name.item));
142    UnresolvedItemName(out)
143}
144
145/// Converts an `UnresolvedItemName` to a `FullItemName` if the
146/// `UnresolvedItemName` is fully specified. Otherwise returns an error.
147pub fn full_name(mut raw_name: UnresolvedItemName) -> Result<FullItemName, PlanError> {
148    match raw_name.0.len() {
149        3 => Ok(FullItemName {
150            item: ident(raw_name.0.pop().unwrap()),
151            schema: ident(raw_name.0.pop().unwrap()),
152            database: RawDatabaseSpecifier::Name(ident(raw_name.0.pop().unwrap())),
153        }),
154        2 => Ok(FullItemName {
155            item: ident(raw_name.0.pop().unwrap()),
156            schema: ident(raw_name.0.pop().unwrap()),
157            database: RawDatabaseSpecifier::Ambient,
158        }),
159        _ => sql_bail!("unresolved name {} not fully qualified", raw_name),
160    }
161}
162
163/// Normalizes a `CREATE` statement.
164///
165/// The resulting statement will not depend upon any session parameters, nor
166/// specify any non-default options (like `TEMPORARY`, `IF NOT EXISTS`, etc).
167///
168/// The goal is to construct a backwards-compatible description of the item.
169/// SQL is the most stable part of Materialize, so SQL is used to describe the
170/// items that are persisted in the catalog.
///
/// On success, returns the normalized statement rendered with
/// `to_ast_string_stable`.
///
/// # Errors
///
/// Returns an error if the name of the created item cannot be normalized or
/// allocated through `scx`, or if `stmt` is not one of the statement kinds
/// handled by the match in the body.
171pub fn create_statement(
172    scx: &StatementContext,
173    mut stmt: Statement<Aug>,
174) -> Result<String, PlanError> {
    // Normalizes `name`, asks `scx` to allocate the full name for it, and
    // converts the result back into an `UnresolvedItemName` so that it can be
    // written back into the AST.
175    let allocate_name = |name: &UnresolvedItemName| -> Result<_, PlanError> {
176        Ok(unresolve(
177            scx.allocate_full_name(unresolved_item_name(name.clone())?)?,
178        ))
179    };
180
    // Same as `allocate_name`, but uses `scx`'s temporary-name allocation.
181    let allocate_temporary_name = |name: &UnresolvedItemName| -> Result<_, PlanError> {
182        Ok(unresolve(scx.allocate_temporary_full_name(
183            unresolved_item_name(name.clone())?,
184        )))
185    };
186
    // AST visitor used to walk the parts of the statement that can contain
    // queries, expressions, or column definitions.
    //
    // `ctes` is a stack of the CTE names declared by the queries currently
    // being visited. `err` is checked by the callers below after each visit.
    //
    // NOTE(review): none of the visitor methods visible in this function
    // assign `err` or read `ctes`, so both appear to be vestigial here --
    // confirm before relying on or removing them.
187    struct QueryNormalizer {
188        ctes: Vec<Ident>,
189        err: Option<PlanError>,
190    }
191
192    impl QueryNormalizer {
193        fn new() -> QueryNormalizer {
194            QueryNormalizer {
195                ctes: vec![],
196                err: None,
197            }
198        }
199    }
200
201    impl<'ast> VisitMut<'ast, Aug> for QueryNormalizer {
        // Pushes the names of the CTEs declared by `query` for the duration
        // of the visit, then restores the stack to its previous length.
202        fn visit_query_mut(&mut self, query: &'ast mut Query<Aug>) {
203            let n = self.ctes.len();
204            match &query.ctes {
205                CteBlock::Simple(ctes) => {
206                    for cte in ctes.iter() {
207                        self.ctes.push(cte.alias.name.clone());
208                    }
209                }
210                CteBlock::MutuallyRecursive(MutRecBlock { options: _, ctes }) => {
211                    for cte in ctes.iter() {
212                        self.ctes.push(cte.name.clone());
213                    }
214                }
215            }
216            visit_mut::visit_query_mut(self, query);
217            self.ctes.truncate(n);
218        }
219
        // Visits only the function's arguments, its `ORDER BY` expressions,
        // and its window specification (if any), rather than delegating to
        // the default traversal.
220        fn visit_function_mut(&mut self, func: &'ast mut Function<Aug>) {
221            match &mut func.args {
222                FunctionArgs::Star => (),
223                FunctionArgs::Args { args, order_by } => {
224                    for arg in args {
225                        self.visit_expr_mut(arg);
226                    }
227                    for expr in order_by {
228                        self.visit_order_by_expr_mut(expr);
229                    }
230                }
231            }
232            if let Some(over) = &mut func.over {
233                self.visit_window_spec_mut(over);
234            }
235        }
236
        // For a plain table reference, visits only the item name and the
        // alias (if any); every other table factor uses the default traversal.
237        fn visit_table_factor_mut(&mut self, table_factor: &'ast mut TableFactor<Aug>) {
238            match table_factor {
239                TableFactor::Table { name, alias, .. } => {
240                    self.visit_item_name_mut(name);
241                    if let Some(alias) = alias {
242                        self.visit_table_alias_mut(alias);
243                    }
244                }
245                // We only need special behavior for `TableFactor::Table`.
246                // Just visit the other types of table factors like normal.
247                _ => visit_mut::visit_table_factor_mut(self, table_factor),
248            }
249        }
250    }
251
252    // Think very hard before changing any of the branches in this match
253    // statement. All identifiers must be quoted. All object names must be
254    // allocated or resolved, depending on whether they are the object created
255    // by the statement or an object depended upon by the statement. All
256    // non-default options must be disabled.
257    //
258    // Wildcard matches are explicitly avoided so that future additions to the
259    // syntax cause compile errors here. Before you ignore a new field, triple
260    // check that it does not need to be normalized according to the rules
261    // above.
    //
    // NOTE(review): the `CreateSink` and `CreateIndex` patterns below use `..`
    // and the match ends with a `_ =>` arm, which is at odds with the rule
    // stated above -- confirm whether that is intentional.
262    match &mut stmt {
263        Statement::CreateSource(CreateSourceStatement {
264            name,
265            in_cluster: _,
266            col_names: _,
267            connection: _,
268            format: _,
269            include_metadata: _,
270            envelope: _,
271            if_not_exists,
272            key_constraint: _,
273            with_options: _,
274            external_references: _,
275            progress_subsource: _,
276        }) => {
277            *name = allocate_name(name)?;
278            *if_not_exists = false;
279        }
280
281        Statement::CreateSubsource(CreateSubsourceStatement {
282            name,
283            columns,
284            constraints: _,
285            of_source: _,
286            if_not_exists,
287            with_options: _,
288        }) => {
289            *name = allocate_name(name)?;
290            let mut normalizer = QueryNormalizer::new();
291            for c in columns {
292                normalizer.visit_column_def_mut(c);
293            }
294            if let Some(err) = normalizer.err {
295                return Err(err);
296            }
297            *if_not_exists = false;
298        }
299
300        Statement::CreateTableFromSource(CreateTableFromSourceStatement {
301            name,
302            columns,
303            constraints: _,
304            external_reference: _,
305            source: _,
306            if_not_exists,
307            format: _,
308            include_metadata: _,
309            envelope: _,
310            with_options: _,
311        }) => {
312            *name = allocate_name(name)?;
313            let mut normalizer = QueryNormalizer::new();
            // Only explicitly defined columns carry column definitions to
            // visit.
314            if let TableFromSourceColumns::Defined(columns) = columns {
315                for c in columns {
316                    normalizer.visit_column_def_mut(c);
317                }
318            }
319            if let Some(err) = normalizer.err {
320                return Err(err);
321            }
322            *if_not_exists = false;
323        }
324
325        Statement::CreateTable(CreateTableStatement {
326            name,
327            columns,
328            constraints: _,
329            if_not_exists,
330            temporary,
331            with_options: _,
332        }) => {
            // Temporary tables get their name from the temporary allocator.
333            *name = if *temporary {
334                allocate_temporary_name(name)?
335            } else {
336                allocate_name(name)?
337            };
338            let mut normalizer = QueryNormalizer::new();
339            for c in columns {
340                normalizer.visit_column_def_mut(c);
341            }
342            if let Some(err) = normalizer.err {
343                return Err(err);
344            }
345            *if_not_exists = false;
346        }
347
348        Statement::CreateWebhookSource(CreateWebhookSourceStatement {
349            name,
350            is_table: _,
351            if_not_exists,
352            include_headers: _,
353            body_format: _,
354            validate_using: _,
355            in_cluster: _,
356        }) => {
357            *name = allocate_name(name)?;
358            *if_not_exists = false;
359        }
360
361        Statement::CreateSink(CreateSinkStatement {
362            name,
363            in_cluster: _,
364            connection: _,
365            format: _,
366            envelope: _,
367            if_not_exists,
368            ..
369        }) => {
            // The sink name is optional in the AST; only a name that is
            // present is allocated.
370            if let Some(name) = name {
371                *name = allocate_name(name)?;
372            }
373            *if_not_exists = false;
374        }
375
376        Statement::CreateView(CreateViewStatement {
377            temporary,
378            if_exists,
379            definition:
380                ViewDefinition {
381                    name,
382                    query,
383                    columns: _,
384                },
385        }) => {
            // Temporary views get their name from the temporary allocator.
386            *name = if *temporary {
387                allocate_temporary_name(name)?
388            } else {
389                allocate_name(name)?
390            };
391            {
392                let mut normalizer = QueryNormalizer::new();
393                normalizer.visit_query_mut(query);
394                if let Some(err) = normalizer.err {
395                    return Err(err);
396                }
397            }
398            *if_exists = IfExistsBehavior::Error;
399        }
400
401        Statement::CreateMaterializedView(CreateMaterializedViewStatement {
402            if_exists,
403            name,
404            columns: _,
405            replacement_for: _,
406            in_cluster: _,
407            in_cluster_replica: _,
408            query,
409            with_options: _,
410            as_of: _,
411        }) => {
412            *name = allocate_name(name)?;
413            {
414                let mut normalizer = QueryNormalizer::new();
415                normalizer.visit_query_mut(query);
416                if let Some(err) = normalizer.err {
417                    return Err(err);
418                }
419            }
420            *if_exists = IfExistsBehavior::Error;
421        }
422
423        Statement::CreateIndex(CreateIndexStatement {
424            name: _,
425            in_cluster: _,
426            key_parts,
427            with_options: _,
428            if_not_exists,
429            ..
430        }) => {
            // The index name is left untouched here; only the key
            // expressions are visited.
431            let mut normalizer = QueryNormalizer::new();
432            if let Some(key_parts) = key_parts {
433                for key_part in key_parts {
434                    normalizer.visit_expr_mut(key_part);
435                    if let Some(err) = normalizer.err {
436                        return Err(err);
437                    }
438                }
439            }
440            *if_not_exists = false;
441        }
442
443        Statement::CreateType(CreateTypeStatement { name, as_type }) => {
444            *name = allocate_name(name)?;
445            let mut normalizer = QueryNormalizer::new();
446            normalizer.visit_create_type_as_mut(as_type);
447            if let Some(err) = normalizer.err {
448                return Err(err);
449            }
450        }
451        Statement::CreateSecret(CreateSecretStatement {
452            name,
453            if_not_exists,
454            value: _,
455        }) => {
456            *name = allocate_name(name)?;
457            *if_not_exists = false;
458        }
459        Statement::CreateConnection(CreateConnectionStatement {
460            name,
461            connection_type: _,
462            values,
463            with_options,
464            if_not_exists,
465        }) => {
466            *name = allocate_name(name)?;
467            *if_not_exists = false;
468
            // Sort the connection's values so that the rendered statement
            // does not depend on the order in which they were written.
469            values.sort();
470
471            // Validation only occurs once during planning and should not be
472            // considered part of the statement's AST/canonical representation.
473            with_options
474                .retain(|o| o.name != mz_sql_parser::ast::CreateConnectionOptionName::Validate);
475        }
476
        // Any other statement kind is not expected to be normalized here and
        // is reported as an internal error.
477        _ => bail_internal!("unexpected statement type for normalization"),
478    }
479
480    Ok(stmt.to_ast_string_stable())
481}
482
483/// Generates a struct capable of taking a `Vec` of types commonly used to
484/// represent `WITH` options into useful data types, such as strings.
485/// Additionally, it is able to convert the useful data types back to the `Vec`
486/// of options.
487///
488/// # Parameters
489/// - `$option_ty`: Accepts a struct representing a set of `WITH` options, which
490///     must contain the fields `name` and `value`.
491///     - `name` must be of type `$option_tyName`, e.g. if `$option_ty` is
492///       `FooOption`, then `name` must be of type `FooOptionName`.
493///       `$option_tyName` must be an enum representing `WITH` option keys.
494///     - `TryFromValue<value>` must be implemented for the type you want to
495///       take the option to. The `sql::plan::with_option` module contains these
496///       implementations.
497/// - `$option_name` must be an element of `$option_tyName`
498/// - `$t` is the type you want to convert the option's value to. If the
499///   option's value is absent (i.e. the user only entered the option's key),
500///   you can also define a default value.
501/// - `Default($v)` is an optional parameter that sets the default value of the
502///   field to `$v`. `$v` must be convertible to `$t` using `.into`. This also
503///   converts the struct's type from `Option<$t>` to `<$t>`.
504/// - `AllowMultiple` is an optional parameter that, when specified, allows
505///   the given option to appear multiple times in the `WITH` clause. This
506///   also converts the struct's type from `$t` to `Vec<$t>`.
///
/// # Generated items
/// For an `$option_ty` of `FooOption`, the macro emits:
/// - `pub struct FooOptionExtracted`, with a `pub(crate) seen` set of
///   `FooOptionName`s and one public field per option, named after the
///   option in snake case.
/// - `impl Default for FooOptionExtracted`, which initializes every field
///   from its default (`None`, `$v`, or an empty `Vec`).
/// - `impl TryFrom<Vec<FooOption<Aug>>> for FooOptionExtracted`, which fails
///   when an option that does not allow multiple occurrences is specified
///   more than once, or when an option's value cannot be converted to `$t`.
/// - A private `into_values` method that converts the extracted fields back
///   into a `Vec<FooOption<Aug>>`.
507macro_rules! generate_extracted_config {
    // The rules below that carry a `[$($processed)*]` list consume one
    // `(name, type[, modifier])` option at a time and append it to that list
    // as a uniform `(name, field type, default, allow_multiple)` tuple.
508    // No default specified, have remaining options.
509    (
510        $option_ty:ty, [$($processed:tt)*],
511        ($option_name:path, $t:ty), $($tail:tt),*
512    ) => {
513        generate_extracted_config!(
514            $option_ty,
515            [$($processed)* ($option_name, Option::<$t>, None, false)],
516            $($tail),*
517        );
518    };
519    // No default specified, no remaining options.
520    (
521        $option_ty:ty, [$($processed:tt)*],
522        ($option_name:path, $t:ty)
523    ) => {
524        generate_extracted_config!(
525            $option_ty,
526            [$($processed)* ($option_name, Option::<$t>, None, false)]
527        );
528    };
529    // Default specified, have remaining options.
530    (
531        $option_ty:ty, [$($processed:tt)*],
532        ($option_name:path, $t:ty, Default($v:expr)), $($tail:tt),*
533    ) => {
534        generate_extracted_config!(
535            $option_ty,
536            [$($processed)* ($option_name, $t, $v, false)],
537            $($tail),*
538        );
539    };
540    // Default specified, no remaining options.
541    (
542        $option_ty:ty, [$($processed:tt)*],
543        ($option_name:path, $t:ty, Default($v:expr))
544    ) => {
545        generate_extracted_config!(
546            $option_ty,
547            [$($processed)* ($option_name, $t, $v, false)]
548        );
549    };
550    // AllowMultiple specified, have remaining options.
551    (
552        $option_ty:ty, [$($processed:tt)*],
553        ($option_name:path, $t:ty, AllowMultiple), $($tail:tt),*
554    ) => {
555        generate_extracted_config!(
556            $option_ty,
557            [$($processed)* ($option_name, $t, vec![], true)],
558            $($tail),*
559        );
560    };
561    // AllowMultiple specified, no remaining options.
562    (
563        $option_ty:ty, [$($processed:tt)*],
564        ($option_name:path, $t:ty, AllowMultiple)
565    ) => {
566        generate_extracted_config!(
567            $option_ty,
568            [$($processed)* ($option_name, $t, vec![], true)]
569        );
570    };
    // Terminal rule: every option has been rewritten into its processed
    // tuple, so emit the `Extracted` struct and its impls.
571    ($option_ty:ty, [$(($option_name:path, $t:ty, $v:expr, $allow_multiple:literal))+]) => {
572        paste::paste! {
573            #[derive(Debug)]
574            pub struct [<$option_ty Extracted>] {
575                pub(crate) seen: ::std::collections::BTreeSet::<[<$option_ty Name>]>,
576                $(
577                    pub [<$option_name:snake>]: generate_extracted_config!(
578                        @ifty $allow_multiple,
579                        Vec::<$t>,
580                        $t
581                    ),
582                )*
583            }
584
585            impl std::default::Default for [<$option_ty Extracted>] {
586                fn default() -> Self {
587                    [<$option_ty Extracted>] {
588                        seen: ::std::collections::BTreeSet::<[<$option_ty Name>]>::new(),
589                        $(
590                            [<$option_name:snake>]: <generate_extracted_config!(
591                                @ifty $allow_multiple,
592                                Vec::<$t>,
593                                $t
594                            )>::from($v),
595                        )*
596                    }
597                }
598            }
599
600            impl std::convert::TryFrom<Vec<$option_ty<Aug>>>
601                for [<$option_ty Extracted>]
602            {
603                type Error = $crate::plan::PlanError;
604                fn try_from(
605                    v: Vec<$option_ty<Aug>>,
606                ) -> Result<[<$option_ty Extracted>], Self::Error> {
607                    use [<$option_ty Name>]::*;
608                    let mut extracted = [<$option_ty Extracted>]::default();
609                    for option in v {
610                        match option.name {
611                            $(
612                                $option_name => {
                                    // Because `&&` short-circuits, options
                                    // that allow multiple occurrences are
                                    // never inserted into `seen`.
613                                    if !$allow_multiple
614                                        && !extracted.seen.insert(option.name.clone())
615                                    {
616                                        sql_bail!(
617                                            "{} specified more than once",
618                                            option.name.to_ast_string_simple(),
619                                        );
620                                    }
621                                    let val: $t = $crate::plan::with_options
622                                        ::TryFromValue::try_from_value(option.value)
623                                        .map_err(|e| sql_err!(
624                                            "invalid {}: {}",
625                                            option.name.to_ast_string_simple(),
626                                            e,
627                                        ))?;
                                    // Append to the `Vec` for options that
                                    // allow multiple occurrences; otherwise
                                    // overwrite the field.
628                                    generate_extracted_config!(
629                                        @ifexpr $allow_multiple,
630                                        extracted.[<$option_name:snake>].push(val),
631                                        extracted.[<$option_name:snake>] = val
632                                    );
633                                }
634                            )*
635                        }
636                    }
637                    Ok(extracted)
638                }
639            }
640
641            impl [<$option_ty Extracted>] {
642                #[allow(unused)]
643                fn into_values(
644                    self,
645                    catalog: &dyn crate::catalog::SessionCatalog,
646                ) -> Vec<$option_ty<Aug>> {
647                    use [<$option_ty Name>]::*;
648                    let mut options = Vec::new();
649                    $(
650                        let value = self.[<$option_name:snake>];
                        // Treat every field as a list of values so that both
                        // kinds of option share the loop below.
651                        let values: Vec<_> = generate_extracted_config!(
652                            @ifexpr $allow_multiple,
653                            value,
654                            Vec::from([value])
655                        );
656                        for value in values {
657                            // If `try_into_value` returns `None`, then there was no option that
658                            // generated this value. For example, this can happen when `value` is
659                            // `None`.
660                            let maybe_value = <$t as $crate::plan::with_options::TryFromValue<
661                                Option<mz_sql_parser::ast::WithOptionValue<$crate::names::Aug>>
662                            >>::try_into_value(value, catalog);
663                            match maybe_value {
664                                Some(value) => {
665                                    let option = $option_ty {name: $option_name, value};
666                                    options.push(option);
667                                },
668                                None => (),
669                            }
670                        }
671                    )*
672                    options
673                }
674            }
675        }
676    };
    // Entry point: start processing with an empty list of processed options.
677    ($option_ty:ty, $($h:tt),+) => {
678        generate_extracted_config!{$option_ty, [], $($h),+}
679    };
680    // Helper `if` constructs to conditionally generate expressions and types
681    // based on the value of $allow_multiple.
    // In each helper, `$lhs` is selected for `true` and `$rhs` for `false`.
682    (@ifexpr false, $lhs:expr, $rhs:expr) => {
683        $rhs
684    };
685    (@ifexpr true, $lhs:expr, $rhs:expr) => {
686        $lhs
687    };
688    (@ifty false, $lhs:ty, $rhs:ty) => {
689        $rhs
690    };
691    (@ifty true, $lhs:ty, $rhs:ty) => {
692        $lhs
693    };
694}
695
696pub(crate) use generate_extracted_config;