mz_sql/
normalize.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! SQL normalization routines.
11//!
12//! Normalization is the process of taking relatively unstructured types from
13//! the [`ast`] module and converting them to more structured types.
14//!
15//! [`ast`]: crate::ast
16
17use std::fmt;
18
19use itertools::Itertools;
20use mz_repr::{ColumnName, GlobalId};
21use mz_sql_parser::ast::display::AstDisplay;
22use mz_sql_parser::ast::visit_mut::{self, VisitMut};
23use mz_sql_parser::ast::{
24    ContinualTaskStmt, CreateConnectionStatement, CreateContinualTaskStatement,
25    CreateContinualTaskSugar, CreateIndexStatement, CreateMaterializedViewStatement,
26    CreateSecretStatement, CreateSinkStatement, CreateSourceStatement, CreateSubsourceStatement,
27    CreateTableFromSourceStatement, CreateTableStatement, CreateTypeStatement, CreateViewStatement,
28    CreateWebhookSourceStatement, CteBlock, Function, FunctionArgs, Ident, IfExistsBehavior,
29    MutRecBlock, Op, Query, Statement, TableFactor, TableFromSourceColumns, UnresolvedItemName,
30    UnresolvedSchemaName, Value, ViewDefinition,
31};
32
33use crate::names::{Aug, FullItemName, PartialItemName, PartialSchemaName, RawDatabaseSpecifier};
34use crate::plan::error::PlanError;
35use crate::plan::statement::StatementContext;
36
37/// Normalizes a single identifier.
38pub fn ident(ident: Ident) -> String {
39    ident.into_string()
40}
41
42/// Normalizes a single identifier.
43pub fn ident_ref(ident: &Ident) -> &str {
44    ident.as_str()
45}
46
47/// Normalizes an identifier that represents a column name.
48pub fn column_name(id: Ident) -> ColumnName {
49    ColumnName::from(ident(id))
50}
51
52/// Normalizes an unresolved object name.
53pub fn unresolved_item_name(mut name: UnresolvedItemName) -> Result<PartialItemName, PlanError> {
54    if name.0.len() < 1 || name.0.len() > 3 {
55        return Err(PlanError::MisqualifiedName(name.to_string()));
56    }
57    let out = PartialItemName {
58        item: ident(
59            name.0
60                .pop()
61                .expect("name checked to have at least one component"),
62        ),
63        schema: name.0.pop().map(ident),
64        database: name.0.pop().map(ident),
65    };
66    assert!(name.0.is_empty());
67    Ok(out)
68}
69
70/// Normalizes an unresolved schema name.
71pub fn unresolved_schema_name(
72    mut name: UnresolvedSchemaName,
73) -> Result<PartialSchemaName, PlanError> {
74    if name.0.len() < 1 || name.0.len() > 2 {
75        return Err(PlanError::MisqualifiedName(name.to_string()));
76    }
77    let out = PartialSchemaName {
78        schema: ident(
79            name.0
80                .pop()
81                .expect("name checked to have at least one component"),
82        ),
83        database: name.0.pop().map(ident),
84    };
85    assert!(name.0.is_empty());
86    Ok(out)
87}
88
89/// Normalizes an operator reference.
90///
91/// Qualified operators outside of the pg_catalog schema are rejected.
92pub fn op(op: &Op) -> Result<&str, PlanError> {
93    if let Some(namespace) = &op.namespace {
94        if namespace.len() != 0
95            && (namespace.len() != 1
96                || namespace[0].as_str() != mz_repr::namespaces::PG_CATALOG_SCHEMA)
97        {
98            sql_bail!(
99                "operator does not exist: {}.{}",
100                namespace.iter().map(|n| n.to_string()).join("."),
101                op.op,
102            )
103        }
104    }
105    Ok(&op.op)
106}
107
/// Either an inline SQL [`Value`] or a reference to a secret, held as a
/// [`GlobalId`].
///
/// The `Display` impl in this file prints whichever payload is present, and
/// the conversion to `Option<Value>` yields `None` for the `Secret` variant.
#[derive(Debug, Clone)]
pub enum SqlValueOrSecret {
    /// A literal SQL value.
    Value(Value),
    /// The ID of a secret (presumably a catalog item ID; confirm against
    /// callers).
    Secret(GlobalId),
}
113
114impl fmt::Display for SqlValueOrSecret {
115    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
116        match self {
117            SqlValueOrSecret::Value(v) => write!(f, "{}", v),
118            SqlValueOrSecret::Secret(id) => write!(f, "{}", id),
119        }
120    }
121}
122
123impl From<SqlValueOrSecret> for Option<Value> {
124    fn from(s: SqlValueOrSecret) -> Self {
125        match s {
126            SqlValueOrSecret::Value(v) => Some(v),
127            SqlValueOrSecret::Secret(_id) => None,
128        }
129    }
130}
131
132/// Unnormalizes an item name.
133///
134/// This is the inverse of the [`unresolved_item_name`] function.
135pub fn unresolve(name: FullItemName) -> UnresolvedItemName {
136    // TODO(parkmycar): Refactor FullItemName to use `Ident`.
137    let mut out = vec![];
138    if let RawDatabaseSpecifier::Name(n) = name.database {
139        out.push(Ident::new_unchecked(n));
140    }
141    out.push(Ident::new_unchecked(name.schema));
142    out.push(Ident::new_unchecked(name.item));
143    UnresolvedItemName(out)
144}
145
146/// Converts an `UnresolvedItemName` to a `FullItemName` if the
147/// `UnresolvedItemName` is fully specified. Otherwise returns an error.
148pub fn full_name(mut raw_name: UnresolvedItemName) -> Result<FullItemName, PlanError> {
149    match raw_name.0.len() {
150        3 => Ok(FullItemName {
151            item: ident(raw_name.0.pop().unwrap()),
152            schema: ident(raw_name.0.pop().unwrap()),
153            database: RawDatabaseSpecifier::Name(ident(raw_name.0.pop().unwrap())),
154        }),
155        2 => Ok(FullItemName {
156            item: ident(raw_name.0.pop().unwrap()),
157            schema: ident(raw_name.0.pop().unwrap()),
158            database: RawDatabaseSpecifier::Ambient,
159        }),
160        _ => sql_bail!("unresolved name {} not fully qualified", raw_name),
161    }
162}
163
/// Normalizes a `CREATE` statement.
///
/// The resulting statement will not depend upon any session parameters, nor
/// specify any non-default options (like `TEMPORARY`, `IF NOT EXISTS`, etc).
///
/// The goal is to construct a backwards-compatible description of the item.
/// SQL is the most stable part of Materialize, so SQL is used to describe the
/// items that are persisted in the catalog.
///
/// On success the (mutated) statement is rendered with
/// `to_ast_string_stable` and returned as a `String`.
///
/// # Errors
///
/// Returns a [`PlanError`] if the created item's name cannot be normalized by
/// [`unresolved_item_name`], or if `scx.allocate_full_name` fails.
///
/// # Panics
///
/// Panics (via `unreachable!`) if `stmt` is not one of the `CREATE` statement
/// variants handled in the match below.
pub fn create_statement(
    scx: &StatementContext,
    mut stmt: Statement<Aug>,
) -> Result<String, PlanError> {
    // Turns the user-written name of the created item into a fully qualified
    // one: normalize it, let `scx` allocate the full name, then convert it
    // back to an `UnresolvedItemName` so it can be stored in the AST.
    let allocate_name = |name: &UnresolvedItemName| -> Result<_, PlanError> {
        Ok(unresolve(
            scx.allocate_full_name(unresolved_item_name(name.clone())?)?,
        ))
    };

    // Same as `allocate_name`, but uses `scx.allocate_temporary_full_name`,
    // which is infallible here (only the name normalization can fail).
    let allocate_temporary_name = |name: &UnresolvedItemName| -> Result<_, PlanError> {
        Ok(unresolve(scx.allocate_temporary_full_name(
            unresolved_item_name(name.clone())?,
        )))
    };

    // AST visitor applied to the parts of a statement that may contain
    // queries or expressions.
    //
    // NOTE(review): within this function `ctes` is only pushed to and
    // truncated (never read), and no visitor method assigns `err`, so every
    // `normalizer.err` check below can only observe `None`. Confirm whether
    // these fields are still needed.
    struct QueryNormalizer {
        // Names of the CTEs declared by the queries currently being visited.
        ctes: Vec<Ident>,
        // First error encountered while visiting, if any.
        err: Option<PlanError>,
    }

    impl QueryNormalizer {
        // Creates a normalizer with no CTEs in scope and no recorded error.
        fn new() -> QueryNormalizer {
            QueryNormalizer {
                ctes: vec![],
                err: None,
            }
        }
    }

    impl<'ast> VisitMut<'ast, Aug> for QueryNormalizer {
        // Brings the query's CTE names into scope for the duration of the
        // visit, then restores the previous scope.
        fn visit_query_mut(&mut self, query: &'ast mut Query<Aug>) {
            // Remember the current depth so the names pushed below can be
            // dropped again once this query has been visited.
            let n = self.ctes.len();
            match &query.ctes {
                CteBlock::Simple(ctes) => {
                    for cte in ctes.iter() {
                        self.ctes.push(cte.alias.name.clone());
                    }
                }
                CteBlock::MutuallyRecursive(MutRecBlock { options: _, ctes }) => {
                    for cte in ctes.iter() {
                        self.ctes.push(cte.name.clone());
                    }
                }
            }
            visit_mut::visit_query_mut(self, query);
            self.ctes.truncate(n);
        }

        // Visits only the arguments, the `ORDER BY` expressions, and the
        // `OVER` window specification of a function call; no other field of
        // `Function` is visited here.
        fn visit_function_mut(&mut self, func: &'ast mut Function<Aug>) {
            match &mut func.args {
                FunctionArgs::Star => (),
                FunctionArgs::Args { args, order_by } => {
                    for arg in args {
                        self.visit_expr_mut(arg);
                    }
                    for expr in order_by {
                        self.visit_order_by_expr_mut(expr);
                    }
                }
            }
            if let Some(over) = &mut func.over {
                self.visit_window_spec_mut(over);
            }
        }

        // For a plain table reference, visits just the item name and the
        // optional alias; every other table factor gets the default traversal.
        fn visit_table_factor_mut(&mut self, table_factor: &'ast mut TableFactor<Aug>) {
            match table_factor {
                TableFactor::Table { name, alias, .. } => {
                    self.visit_item_name_mut(name);
                    if let Some(alias) = alias {
                        self.visit_table_alias_mut(alias);
                    }
                }
                // We only need special behavior for `TableFactor::Table`.
                // Just visit the other types of table factors like normal.
                _ => visit_mut::visit_table_factor_mut(self, table_factor),
            }
        }
    }

    // Think very hard before changing any of the branches in this match
    // statement. All identifiers must be quoted. All object names must be
    // allocated or resolved, depending on whether they are the object created
    // by the statement or an object depended upon by the statement. All
    // non-default options must be disabled.
    //
    // Wildcard matches are explicitly avoided so that future additions to the
    // syntax cause compile errors here. Before you ignore a new field, triple
    // check that it does not need to be normalized according to the rules
    // above.
    match &mut stmt {
        // Sources: allocate the name and clear `IF NOT EXISTS`.
        Statement::CreateSource(CreateSourceStatement {
            name,
            in_cluster: _,
            col_names: _,
            connection: _,
            format: _,
            include_metadata: _,
            envelope: _,
            if_not_exists,
            key_constraint: _,
            with_options: _,
            external_references: _,
            progress_subsource: _,
        }) => {
            *name = allocate_name(name)?;
            *if_not_exists = false;
        }

        // Subsources: allocate the name, visit each column definition, and
        // clear `IF NOT EXISTS`.
        Statement::CreateSubsource(CreateSubsourceStatement {
            name,
            columns,
            constraints: _,
            of_source: _,
            if_not_exists,
            with_options: _,
        }) => {
            *name = allocate_name(name)?;
            let mut normalizer = QueryNormalizer::new();
            for c in columns {
                normalizer.visit_column_def_mut(c);
            }
            if let Some(err) = normalizer.err {
                return Err(err);
            }
            *if_not_exists = false;
        }

        // Tables created from a source: like subsources, but column
        // definitions are only present in the `Defined` case.
        Statement::CreateTableFromSource(CreateTableFromSourceStatement {
            name,
            columns,
            constraints: _,
            external_reference: _,
            source: _,
            if_not_exists,
            format: _,
            include_metadata: _,
            envelope: _,
            with_options: _,
        }) => {
            *name = allocate_name(name)?;
            let mut normalizer = QueryNormalizer::new();
            if let TableFromSourceColumns::Defined(columns) = columns {
                for c in columns {
                    normalizer.visit_column_def_mut(c);
                }
            }
            if let Some(err) = normalizer.err {
                return Err(err);
            }
            *if_not_exists = false;
        }

        // Tables: the `temporary` flag selects which allocator names the
        // table; the flag itself is left as written.
        Statement::CreateTable(CreateTableStatement {
            name,
            columns,
            constraints: _,
            if_not_exists,
            temporary,
            with_options: _,
        }) => {
            *name = if *temporary {
                allocate_temporary_name(name)?
            } else {
                allocate_name(name)?
            };
            let mut normalizer = QueryNormalizer::new();
            for c in columns {
                normalizer.visit_column_def_mut(c);
            }
            if let Some(err) = normalizer.err {
                return Err(err);
            }
            *if_not_exists = false;
        }

        // Webhook sources: allocate the name and clear `IF NOT EXISTS`.
        Statement::CreateWebhookSource(CreateWebhookSourceStatement {
            name,
            is_table: _,
            if_not_exists,
            include_headers: _,
            body_format: _,
            validate_using: _,
            in_cluster: _,
        }) => {
            *name = allocate_name(name)?;
            *if_not_exists = false;
        }

        // Sinks: the name is optional and is only allocated when present.
        // NOTE(review): this pattern uses `..`, so new fields on
        // `CreateSinkStatement` will not cause a compile error here.
        Statement::CreateSink(CreateSinkStatement {
            name,
            in_cluster: _,
            connection: _,
            format: _,
            envelope: _,
            if_not_exists,
            ..
        }) => {
            if let Some(name) = name {
                *name = allocate_name(name)?;
            }
            *if_not_exists = false;
        }

        // Views: allocate a (possibly temporary) name, visit the defining
        // query, and reset the if-exists behavior to `Error`.
        Statement::CreateView(CreateViewStatement {
            temporary,
            if_exists,
            definition:
                ViewDefinition {
                    name,
                    query,
                    columns: _,
                },
        }) => {
            *name = if *temporary {
                allocate_temporary_name(name)?
            } else {
                allocate_name(name)?
            };
            {
                let mut normalizer = QueryNormalizer::new();
                normalizer.visit_query_mut(query);
                if let Some(err) = normalizer.err {
                    return Err(err);
                }
            }
            *if_exists = IfExistsBehavior::Error;
        }

        // Materialized views: same as views, minus the temporary case.
        Statement::CreateMaterializedView(CreateMaterializedViewStatement {
            if_exists,
            name,
            columns: _,
            replacing: _,
            in_cluster: _,
            query,
            with_options: _,
            as_of: _,
        }) => {
            *name = allocate_name(name)?;
            {
                let mut normalizer = QueryNormalizer::new();
                normalizer.visit_query_mut(query);
                if let Some(err) = normalizer.err {
                    return Err(err);
                }
            }
            *if_exists = IfExistsBehavior::Error;
        }

        // Continual tasks: unlike the branches above, the task's own name is
        // not passed through `allocate_name`; both `name` and `input` are
        // handed to the visitor instead, followed by every contained
        // statement and the optional sugar (transform query or retain
        // expression).
        Statement::CreateContinualTask(CreateContinualTaskStatement {
            name,
            columns: _,
            input,
            with_options: _,
            stmts,
            in_cluster: _,
            as_of: _,
            sugar,
        }) => {
            let mut normalizer = QueryNormalizer::new();
            normalizer.visit_item_name_mut(name);
            normalizer.visit_item_name_mut(input);
            for stmt in stmts {
                match stmt {
                    ContinualTaskStmt::Delete(stmt) => normalizer.visit_delete_statement_mut(stmt),
                    ContinualTaskStmt::Insert(stmt) => normalizer.visit_insert_statement_mut(stmt),
                }
            }
            match sugar {
                Some(CreateContinualTaskSugar::Transform { transform }) => {
                    normalizer.visit_query_mut(transform)
                }
                Some(CreateContinualTaskSugar::Retain { retain }) => {
                    normalizer.visit_expr_mut(retain)
                }
                None => {}
            }
            if let Some(err) = normalizer.err {
                return Err(err);
            }
        }

        // Indexes: the index name is left untouched; only the key
        // expressions are visited. The error check runs after each key part
        // rather than once at the end.
        Statement::CreateIndex(CreateIndexStatement {
            name: _,
            in_cluster: _,
            key_parts,
            with_options: _,
            if_not_exists,
            ..
        }) => {
            let mut normalizer = QueryNormalizer::new();
            if let Some(key_parts) = key_parts {
                for key_part in key_parts {
                    normalizer.visit_expr_mut(key_part);
                    if let Some(err) = normalizer.err {
                        return Err(err);
                    }
                }
            }
            *if_not_exists = false;
        }

        // Types: allocate the name and visit the type definition.
        Statement::CreateType(CreateTypeStatement { name, as_type }) => {
            *name = allocate_name(name)?;
            let mut normalizer = QueryNormalizer::new();
            normalizer.visit_create_type_as_mut(as_type);
            if let Some(err) = normalizer.err {
                return Err(err);
            }
        }
        // Secrets: allocate the name and clear `IF NOT EXISTS`; the value is
        // left as written.
        Statement::CreateSecret(CreateSecretStatement {
            name,
            if_not_exists,
            value: _,
        }) => {
            *name = allocate_name(name)?;
            *if_not_exists = false;
        }
        // Connections: allocate the name, clear `IF NOT EXISTS`, sort the
        // option values, and drop the `VALIDATE` option.
        Statement::CreateConnection(CreateConnectionStatement {
            name,
            connection_type: _,
            values,
            with_options,
            if_not_exists,
        }) => {
            *name = allocate_name(name)?;
            *if_not_exists = false;

            // Presumably so the output does not depend on the order in which
            // the values were written; confirm.
            values.sort();

            // Validation only occurs once during planning and should not be
            // considered part of the statement's AST/canonical representation.
            with_options
                .retain(|o| o.name != mz_sql_parser::ast::CreateConnectionOptionName::Validate);
        }

        // Any other statement kind is a caller bug.
        _ => unreachable!(),
    }

    Ok(stmt.to_ast_string_stable())
}
515
/// Generates a struct capable of taking a `Vec` of types commonly used to
/// represent `WITH` options into useful data types, such as strings.
/// Additionally, it is able to convert the useful data types back to the `Vec`
/// of options.
///
/// # Parameters
/// - `$option_ty`: Accepts a struct representing a set of `WITH` options, which
///     must contain the fields `name` and `value`.
///     - `name` must be of type `$option_tyName`, e.g. if `$option_ty` is
///       `FooOption`, then `name` must be of type `FooOptionName`.
///       `$option_tyName` must be an enum representing `WITH` option keys.
///     - `TryFromValue<value>` must be implemented for the type you want to
///       take the option to. The `sql::plan::with_option` module contains these
///       implementations.
/// - `$option_name` must be an element of `$option_tyName`
/// - `$t` is the type you want to convert the option's value to. If the
///   option's value is absent (i.e. the user only entered the option's key),
///   you can also define a default value.
/// - `Default($v)` is an optional parameter that sets the default value of the
///   field to `$v`. `$v` must be convertible to `$t` using `.into`. This also
///   converts the struct's type from `Option<$t>` to `<$t>`.
/// - `AllowMultiple` is an optional parameter that, when specified, allows
///   the given option to appear multiple times in the `WITH` clause. This
///   also converts the struct's type from `$t` to `Vec<$t>`.
///
/// # Generated items
/// For an `$option_ty` of `FooOption` the macro emits:
/// - `pub struct FooOptionExtracted` with a `seen` set of option names plus
///   one field per option, named after the option in snake case.
/// - `impl Default for FooOptionExtracted`, filling each field from its
///   default (`None`, the `Default($v)` value, or an empty `Vec`).
/// - `impl TryFrom<Vec<FooOption<Aug>>> for FooOptionExtracted`, which rejects
///   a repeated option unless it was declared `AllowMultiple`.
/// - a private `into_values` method that converts the struct back into a
///   `Vec<FooOption<Aug>>`.
///
/// The expansion uses `paste::paste!`, `sql_bail!`, `sql_err!`, `Aug`, and
/// `to_ast_string_simple`, which must be available at the call site.
macro_rules! generate_extracted_config {
    // The rules below consume one `(name, type[, modifier])` tuple at a time
    // and append a normalized `(name, field_type, default, allow_multiple)`
    // tuple to the `[...]` accumulator in second position. Rules are tried in
    // order, so these accumulator rules take precedence over the public entry
    // rule further down.
    //
    // No default specified, have remaining options.
    ($option_ty:ty, [$($processed:tt)*], ($option_name:path, $t:ty), $($tail:tt),*) => {
        generate_extracted_config!($option_ty, [$($processed)* ($option_name, Option::<$t>, None, false)], $(
            $tail
        ),*);
    };
    // No default specified, no remaining options.
    ($option_ty:ty, [$($processed:tt)*], ($option_name:path, $t:ty)) => {
        generate_extracted_config!($option_ty, [$($processed)* ($option_name, Option::<$t>, None, false)]);
    };
    // Default specified, have remaining options.
    ($option_ty:ty, [$($processed:tt)*], ($option_name:path, $t:ty, Default($v:expr)), $($tail:tt),*) => {
        generate_extracted_config!($option_ty, [$($processed)* ($option_name, $t, $v, false)], $(
            $tail
        ),*);
    };
    // Default specified, no remaining options.
    ($option_ty:ty, [$($processed:tt)*], ($option_name:path, $t:ty, Default($v:expr))) => {
        generate_extracted_config!($option_ty, [$($processed)* ($option_name, $t, $v, false)]);
    };
    // AllowMultiple specified, have remaining options.
    ($option_ty:ty, [$($processed:tt)*], ($option_name:path, $t:ty, AllowMultiple), $($tail:tt),*) => {
        generate_extracted_config!($option_ty, [$($processed)* ($option_name, $t, vec![], true)], $(
            $tail
        ),*);
    };
    // AllowMultiple specified, no remaining options.
    ($option_ty:ty, [$($processed:tt)*], ($option_name:path, $t:ty, AllowMultiple)) => {
        generate_extracted_config!($option_ty, [$($processed)* ($option_name, $t, vec![], true)]);
    };
    // Terminal rule: every option has been normalized into the accumulator,
    // so emit the struct and its impls.
    ($option_ty:ty, [$(($option_name:path, $t:ty, $v:expr, $allow_multiple:literal))+]) => {
        paste::paste! {
            #[derive(Debug)]
            pub struct [<$option_ty Extracted>] {
                // Names of the options encountered by `try_from`. Options
                // declared `AllowMultiple` are never recorded here (see the
                // short-circuit in `try_from`).
                pub(crate) seen: ::std::collections::BTreeSet::<[<$option_ty Name>]>,
                $(
                    // `Vec<$t>` for `AllowMultiple` options, `$t` otherwise.
                    pub [<$option_name:snake>]: generate_extracted_config!(
                        @ifty $allow_multiple,
                        Vec::<$t>,
                        $t
                    ),
                )*
            }

            impl std::default::Default for [<$option_ty Extracted>] {
                fn default() -> Self {
                    [<$option_ty Extracted>] {
                        seen: ::std::collections::BTreeSet::<[<$option_ty Name>]>::new(),
                        $(
                            // Convert the declared default into the field's
                            // type via `From`.
                            [<$option_name:snake>]: <generate_extracted_config!(
                                @ifty $allow_multiple,
                                Vec::<$t>,
                                $t
                            )>::from($v),
                        )*
                    }
                }
            }

            impl std::convert::TryFrom<Vec<$option_ty<Aug>>> for [<$option_ty Extracted>] {
                type Error = $crate::plan::PlanError;
                fn try_from(v: Vec<$option_ty<Aug>>) -> Result<[<$option_ty Extracted>], Self::Error> {
                    use [<$option_ty Name>]::*;
                    // Start from the defaults and overwrite (or append to)
                    // each field as its option is encountered.
                    let mut extracted = [<$option_ty Extracted>]::default();
                    for option in v {
                        match option.name {
                            $(
                                $option_name => {
                                    // `&&` short-circuits: for `AllowMultiple`
                                    // options the name is not inserted into
                                    // `seen` and repeats are accepted.
                                    if !$allow_multiple && !extracted.seen.insert(option.name.clone()) {
                                        sql_bail!("{} specified more than once", option.name.to_ast_string_simple());
                                    }
                                    let val: $t = $crate::plan::with_options::TryFromValue::try_from_value(option.value)
                                        .map_err(|e| sql_err!("invalid {}: {}", option.name.to_ast_string_simple(), e))?;
                                    // Append for `AllowMultiple`, assign otherwise.
                                    generate_extracted_config!(
                                        @ifexpr $allow_multiple,
                                        extracted.[<$option_name:snake>].push(val),
                                        extracted.[<$option_name:snake>] = val
                                    );
                                }
                            )*
                        }
                    }
                    Ok(extracted)
                }
            }

            impl [<$option_ty Extracted>] {
                // Converts the extracted fields back into a list of options,
                // in the order the options were declared to the macro. The
                // `seen` set is not consulted.
                #[allow(unused)]
                fn into_values(self, catalog: &dyn crate::catalog::SessionCatalog) -> Vec<$option_ty<Aug>> {
                    use [<$option_ty Name>]::*;
                    let mut options = Vec::new();
                    $(
                        let value = self.[<$option_name:snake>];
                        // Treat single-valued fields as a one-element list so
                        // both kinds share the loop below.
                        let values: Vec<_> = generate_extracted_config!(
                            @ifexpr $allow_multiple,
                            value,
                            Vec::from([value])
                        );
                        for value in values {
                            // If `try_into_value` returns `None`, then there was no option that
                            // generated this value. For example, this can happen when `value` is
                            // `None`.
                            let maybe_value = <$t as $crate::plan::with_options::TryFromValue<
                                Option<mz_sql_parser::ast::WithOptionValue<$crate::names::Aug>>
                            >>::try_into_value(value, catalog);
                            match maybe_value {
                                Some(value) => {
                                    let option = $option_ty {name: $option_name, value};
                                    options.push(option);
                                },
                                None => (),
                            }
                        }
                    )*
                    options
                }
            }
        }
    };
    // Public entry point: start the recursion with an empty accumulator.
    ($option_ty:ty, $($h:tt),+) => {
        generate_extracted_config!{$option_ty, [], $($h),+}
    };
    // Helper `if` constructs to conditionally generate expressions and types
    // based on the value of $allow_multiple.
    (@ifexpr false, $lhs:expr, $rhs:expr) => {
        $rhs
    };
    (@ifexpr true, $lhs:expr, $rhs:expr) => {
        $lhs
    };
    (@ifty false, $lhs:ty, $rhs:ty) => {
        $rhs
    };
    (@ifty true, $lhs:ty, $rhs:ty) => {
        $lhs
    };
}
678
679pub(crate) use generate_extracted_config;