Skip to main content

mz_deploy/project/resolve/normalize/
visitor.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! The NormalizingVisitor for traversing SQL AST and applying name transformations.
11//!
12//! This module contains the `NormalizingVisitor` struct which transforms object
13//! names in SQL statements using a configurable strategy (via the `NameTransformer`
14//! trait). Query-level traversal is delegated to mz-sql-parser's auto-generated
15//! [`VisitMut`] trait — the visitor overrides `visit_query_mut` (for CTE scope
16//! management) and `visit_table_factor_mut` (for name transformation and implicit
17//! aliasing). All other AST nodes (expressions, set operations, etc.) are handled
18//! by the default traversal.
19//!
20//! ## CTE Scoping
21//!
22//! Common Table Expressions (CTEs) introduce names that shadow real database
23//! objects. The visitor uses [`CteScope`]
24//! to track which names are currently in scope:
25//!
26//! - Simple CTE names are introduced incrementally: each CTE body is visited
27//!   with only its *earlier* siblings in scope, then its own name is added.
28//!   Mutually-recursive blocks push all names up front. This mirrors
29//!   Materialize's resolver (`fold_query` in `src/sql/src/names.rs`), so a
30//!   simple CTE whose name shadows a catalog object can still reference that
31//!   object inside its own body.
32//! - Unqualified single-identifier references are checked against the scope
33//!   stack — if a match is found, the reference is **not transformed** (it
34//!   refers to a CTE, not a database object).
35//! - When leaving a `WITH` clause, the scope is popped.
36//!
37//! **Key Insight:** CTE names can only be referenced by their unqualified
38//! name. Any multi-part reference (e.g., `schema.name`) is always a database
39//! object reference and is always transformed.
40//!
41//! ## Implicit Aliasing
42//!
43//! When a table reference in a FROM clause is transformed (e.g., `sales` →
44//! `materialize.public.sales`), an implicit alias preserving the original
45//! table name is attached. This ensures that column references like
46//! `sales.column` continue to resolve correctly after transformation.
47
48use super::overlay_transformer::OverlayTransformer;
49use super::transformers::{
50    ClusterTransformer, ExplainTransformer, FlatteningTransformer, FullyQualifyingTransformer,
51    NameTransformer, StagingTransformer,
52};
53use crate::project::ir::compiled::FullyQualifiedName;
54use crate::project::ir::object_id::ObjectId;
55use crate::project::resolve::cte_scope::CteScope;
56use mz_sql_parser::ast::visit_mut::{self, VisitMut};
57use mz_sql_parser::ast::*;
58
59/// Visitor that traverses SQL AST and transforms names using a given strategy.
60///
61/// This struct is generic over the `NameTransformer` trait, allowing different
62/// transformation strategies to reuse the same traversal logic.
63///
64/// Implements [`VisitMut`] to delegate query-level traversal to mz-sql-parser's
65/// auto-generated visitor, overriding only `visit_query_mut` (CTE scope) and
66/// `visit_table_factor_mut` (name transformation + implicit aliasing).
67pub struct NormalizingVisitor<T: NameTransformer> {
68    transformer: T,
69    cte_scope: CteScope,
70}
71
72impl<T: NameTransformer> NormalizingVisitor<T> {
73    /// Create a new visitor with the given transformer.
74    pub fn new(transformer: T) -> Self {
75        Self {
76            transformer,
77            cte_scope: CteScope::new(),
78        }
79    }
80
81    /// Get a reference to the transformer.
82    pub fn transformer(&self) -> &T {
83        &self.transformer
84    }
85
86    /// Normalize a RawItemName to be transformed according to the strategy.
87    ///
88    /// Converts partially qualified or unqualified object references using
89    /// the current file's FQN context.
90    ///
91    /// CTEs (Common Table Expressions) are not transformed - they remain as-is.
92    pub fn normalize_raw_item_name(&self, name: &mut RawItemName) {
93        let unresolved = name.name_mut();
94
95        // Check if this is a CTE reference (unqualified single identifier)
96        // CTEs can only be referenced by their unqualified name
97        if unresolved.0.len() == 1 {
98            let name_str = unresolved.0[0].to_string();
99            if self.cte_scope.is_cte(&name_str) {
100                // This is a CTE reference - don't transform it
101                crate::verbose!("Skipping transform of CTE reference: {}", name_str);
102                return;
103            }
104            crate::verbose!("Transforming non-CTE reference: {}", name_str);
105        }
106
107        *unresolved = self.transformer.transform_name(unresolved);
108    }
109
110    /// Normalize an UnresolvedItemName to be transformed according to the strategy.
111    ///
112    /// Similar to normalize_raw_item_name, but works directly with UnresolvedItemName.
113    pub fn normalize_unresolved_item_name(&self, name: &mut UnresolvedItemName) {
114        *name = self.transformer.transform_name(name);
115    }
116
117    /// Normalize an UnresolvedSchemaName to be fully qualified (`database.schema`).
118    ///
119    /// Converts unqualified schema names (e.g., `public`) to fully qualified
120    /// names (e.g., `materialize.public`) using the current file's FQN context.
121    pub fn normalize_unresolved_schema_name(&self, name: &mut UnresolvedSchemaName) {
122        match name.0.len() {
123            1 => {
124                // Unqualified: schema only (e.g., "public")
125                // Prepend database to make database.schema
126                let schema = name.0[0].clone();
127                let database = Ident::new(self.transformer.database_name())
128                    .expect("valid database identifier");
129                name.0 = vec![database, schema];
130            }
131            _ => {
132                // Already qualified or invalid - leave as-is
133            }
134        }
135    }
136
137    /// Normalize connection references in CREATE SINK statements.
138    ///
139    /// Handles both Kafka and Iceberg sink types, ensuring their connection
140    /// references are normalized.
141    pub fn normalize_sink_connection(&self, connection: &mut CreateSinkConnection<Raw>) {
142        match connection {
143            CreateSinkConnection::Kafka { connection, .. } => {
144                self.normalize_raw_item_name(connection);
145            }
146            CreateSinkConnection::Iceberg {
147                catalog_connection,
148                aws_connection,
149                ..
150            } => {
151                self.normalize_raw_item_name(catalog_connection);
152                if let Some(aws_connection) = aws_connection {
153                    self.normalize_raw_item_name(aws_connection);
154                }
155            }
156        }
157    }
158
159    /// Normalize the connection reference in CREATE SOURCE statements.
160    ///
161    /// Sources reference a connection (Kafka, Postgres, etc.) that needs to be
162    /// normalized to a fully qualified name.
163    pub fn normalize_source_connection(&self, connection: &mut CreateSourceConnection<Raw>) {
164        match connection {
165            CreateSourceConnection::Kafka { connection, .. }
166            | CreateSourceConnection::Postgres { connection, .. }
167            | CreateSourceConnection::SqlServer { connection, .. }
168            | CreateSourceConnection::MySql { connection, .. } => {
169                self.normalize_raw_item_name(connection);
170            }
171            CreateSourceConnection::LoadGenerator { .. } => {}
172        }
173    }
174
175    /// Normalize connection option references in CREATE CONNECTION statements.
176    ///
177    /// Handles secret references, item references, AWS PrivateLink connections,
178    /// and Kafka broker tunnels within connection options.
179    pub fn normalize_connection_options(&self, options: &mut [ConnectionOption<Raw>]) {
180        for option in options {
181            if let Some(ref mut value) = option.value {
182                self.normalize_with_option_value(value);
183            }
184        }
185    }
186
187    /// Normalize a single WithOptionValue, recursing into nested structures.
188    fn normalize_with_option_value(&self, value: &mut WithOptionValue<Raw>) {
189        match value {
190            WithOptionValue::Secret(name) | WithOptionValue::Item(name) => {
191                self.normalize_raw_item_name(name);
192            }
193            WithOptionValue::ConnectionAwsPrivatelink(pl) => {
194                self.normalize_raw_item_name(&mut pl.connection);
195            }
196            WithOptionValue::ConnectionKafkaBroker(broker) => match &mut broker.tunnel {
197                KafkaBrokerTunnel::SshTunnel(name) => self.normalize_raw_item_name(name),
198                KafkaBrokerTunnel::AwsPrivatelink(aws) => {
199                    self.normalize_raw_item_name(&mut aws.connection)
200                }
201                KafkaBrokerTunnel::Direct => {}
202            },
203            WithOptionValue::Sequence(items) => {
204                for item in items {
205                    self.normalize_with_option_value(item);
206                }
207            }
208            _ => {}
209        }
210    }
211
212    /// Normalize all table references in a query (used for views and materialized views).
213    ///
214    /// Delegates to the [`VisitMut`] implementation which handles CTE scoping
215    /// and recursive traversal automatically.
216    pub fn normalize_query(&mut self, query: &mut Query<Raw>) {
217        self.visit_query_mut(query);
218    }
219
220    /// Normalize index references.
221    ///
222    /// Indexes reference the table/view they're created on, and this reference
223    /// needs to be normalized.
224    pub fn normalize_index_references(&self, indexes: &mut [CreateIndexStatement<Raw>]) {
225        for index in indexes {
226            self.normalize_raw_item_name(&mut index.on_name);
227        }
228    }
229
230    /// Normalize cluster references in indexes.
231    ///
232    /// Indexes can specify an IN CLUSTER clause, and these cluster references
233    /// need to be normalized for staging environments.
234    pub fn normalize_index_clusters(&self, indexes: &mut [CreateIndexStatement<Raw>])
235    where
236        T: ClusterTransformer,
237    {
238        for index in indexes {
239            self.normalize_cluster_name(&mut index.in_cluster);
240        }
241    }
242
243    /// Normalize grant target references.
244    ///
245    /// GRANT statements reference the object they grant permissions on, and these
246    /// references need to be normalized.
247    pub fn normalize_grant_references(&self, grants: &mut [GrantPrivilegesStatement<Raw>]) {
248        for grant in grants {
249            if let GrantTargetSpecification::Object {
250                object_spec_inner, ..
251            } = &mut grant.target
252                && let GrantTargetSpecificationInner::Objects { names } = object_spec_inner
253            {
254                for obj in names {
255                    if let UnresolvedObjectName::Item(item_name) = obj {
256                        self.normalize_unresolved_item_name(item_name);
257                    }
258                }
259            }
260        }
261    }
262
263    /// Normalize comment object references.
264    ///
265    /// COMMENT statements reference the object they comment on, and these
266    /// references need to be normalized.
267    pub fn normalize_comment_references(&self, comments: &mut [CommentStatement<Raw>]) {
268        for comment in comments {
269            match &mut comment.object {
270                CommentObjectType::Table { name }
271                | CommentObjectType::View { name }
272                | CommentObjectType::MaterializedView { name }
273                | CommentObjectType::Source { name }
274                | CommentObjectType::Sink { name }
275                | CommentObjectType::Connection { name }
276                | CommentObjectType::Secret { name } => {
277                    self.normalize_raw_item_name(name);
278                }
279                CommentObjectType::Column { name } => {
280                    // For columns, normalize the table/view reference (the relation)
281                    self.normalize_raw_item_name(&mut name.relation);
282                }
283                _ => {
284                    // Other comment types don't need normalization
285                }
286            }
287        }
288    }
289
290    /// Normalize a cluster name using a ClusterTransformer.
291    ///
292    /// This method transforms cluster references in statements that support
293    /// the `IN CLUSTER` clause. It's primarily used by the StagingTransformer
294    /// to rename clusters for staging environments.
295    ///
296    /// # Type Parameter
297    /// `T` must implement `ClusterTransformer` for this method to be callable.
298    pub fn normalize_cluster_name(&self, cluster: &mut Option<RawClusterName>)
299    where
300        T: ClusterTransformer,
301    {
302        if let Some(cluster_name) = cluster {
303            match cluster_name {
304                RawClusterName::Unresolved(ident) => {
305                    let transformed = self.transformer.transform_cluster(ident);
306                    *cluster_name = RawClusterName::Unresolved(transformed);
307                }
308                RawClusterName::Resolved(_) => {
309                    // Already resolved, leave as-is
310                }
311            }
312        }
313    }
314}
315
316impl<T: NameTransformer> VisitMut<'_, Raw> for NormalizingVisitor<T> {
317    fn visit_query_mut(&mut self, node: &mut Query<Raw>) {
318        // Mirror Materialize's name resolver (`fold_query` in
319        // `src/sql/src/names.rs`): a simple CTE's body is resolved with only its
320        // *earlier* siblings in scope, so a simple CTE whose name shadows a
321        // catalog object can still reference that object inside its own body.
322        // Mutually-recursive blocks make every name visible up front.
323        if matches!(node.ctes, CteBlock::Simple(_)) {
324            self.cte_scope.push(std::collections::BTreeSet::new());
325            if let CteBlock::Simple(ctes) = &mut node.ctes {
326                for cte in ctes.iter_mut() {
327                    // Visit this body before its own name is in scope.
328                    self.visit_query_mut(&mut cte.query);
329                    self.cte_scope.insert_current(cte.alias.name.to_string());
330                }
331            }
332            // The main query body sees all simple CTE names. Replicate the rest
333            // of the default `Query` traversal (body, order_by, limit, offset).
334            self.visit_set_expr_mut(&mut node.body);
335            for order_by in &mut node.order_by {
336                self.visit_order_by_expr_mut(order_by);
337            }
338            if let Some(limit) = &mut node.limit {
339                self.visit_limit_mut(limit);
340            }
341            if let Some(offset) = &mut node.offset {
342                self.visit_expr_mut(offset);
343            }
344            self.cte_scope.pop();
345        } else {
346            let names = CteScope::collect_cte_names(&node.ctes);
347            self.cte_scope.push(names);
348            visit_mut::visit_query_mut(self, node);
349            self.cte_scope.pop();
350        }
351    }
352
353    fn visit_table_factor_mut(&mut self, node: &mut TableFactor<Raw>) {
354        match node {
355            TableFactor::Table { name, alias } => {
356                // Save the original table name (the last part) before transformation.
357                // This will be used as an implicit alias if one doesn't exist.
358                let original_table_name = match name.name().0.len() {
359                    1 => {
360                        let name_str = name.name().0[0].to_string();
361                        // Don't create an alias if this is a CTE reference (it won't be transformed)
362                        if !self.cte_scope.is_cte(&name_str) {
363                            Some(name.name().0[0].clone())
364                        } else {
365                            None
366                        }
367                    }
368                    2 | 3 => Some(name.name().0.last().unwrap().clone()),
369                    _ => None,
370                };
371
372                // Normalize the table name (e.g., "sales" -> "materialize.public.sales")
373                self.normalize_raw_item_name(name);
374
375                // If there's no explicit alias and we have an original table name, create
376                // an implicit alias so that qualified column references like "sales.column"
377                // continue to work after transformation.
378                if alias.is_none() {
379                    if let Some(original) = original_table_name {
380                        *alias = Some(TableAlias {
381                            name: original,
382                            columns: vec![],
383                            strict: false,
384                        });
385                    }
386                }
387            }
388            _ => visit_mut::visit_table_factor_mut(self, node),
389        }
390    }
391}
392
393// Convenience constructors for common use cases
394impl<'a> NormalizingVisitor<FullyQualifyingTransformer<'a>> {
395    /// Create a visitor that fully qualifies names (`database.schema.object`).
396    pub fn fully_qualifying(fqn: &'a FullyQualifiedName) -> Self {
397        Self::new(FullyQualifyingTransformer {
398            fqn,
399            database_name_map: None,
400        })
401    }
402
403    /// Create a visitor that fully qualifies names and optionally rewrites
404    /// cross-database references using a database name map.
405    pub fn fully_qualifying_with_db_map(
406        fqn: &'a FullyQualifiedName,
407        database_name_map: Option<&'a std::collections::BTreeMap<String, String>>,
408    ) -> Self {
409        Self::new(FullyQualifyingTransformer {
410            fqn,
411            database_name_map,
412        })
413    }
414}
415
416impl<'a> NormalizingVisitor<FlatteningTransformer<'a>> {
417    /// Create a visitor that flattens names (`database_schema_object`).
418    pub fn flattening(fqn: &'a FullyQualifiedName) -> Self {
419        Self::new(FlatteningTransformer { fqn })
420    }
421}
422
423impl<'a> NormalizingVisitor<ExplainTransformer<'a>> {
424    /// Create a visitor that transforms names for the explain command.
425    ///
426    /// All object references are rewritten to `<database>.<explain_schema>."db.schema.obj"`,
427    /// and all `IN CLUSTER` clauses are rewritten to `quickstart`.
428    pub fn explain(
429        fqn: &'a FullyQualifiedName,
430        explain_database: String,
431        explain_schema: String,
432    ) -> Self {
433        Self::new(ExplainTransformer::new(
434            fqn,
435            explain_database,
436            explain_schema,
437        ))
438    }
439}
440
441impl<'a> NormalizingVisitor<OverlayTransformer<'a>> {
442    /// Create a visitor that rewrites references for `mz-deploy dev` overlay
443    /// compilation.
444    ///
445    /// Applies the two-step schema-level resolution rule:
446    /// - External databases (not in `in_project_databases`) → emit verbatim.
447    /// - Dirty `(database, schema)` pairs → rewrite database component to
448    ///   `<database>__<profile_name>`.
449    ///
450    /// Any configured `profile_suffix` is applied to in-project names by
451    /// the project planner before `dev` calls this constructor, so the
452    /// transformer sees already-suffixed names.
453    ///
454    /// # Arguments
455    /// * `fqn` - Context used to resolve 1- and 2-part names to fully
456    ///   qualified form.
457    /// * `profile_name` - Developer profile name; becomes the `__<name>`
458    ///   suffix on overlay databases.
459    /// * `in_project_databases` - Set of databases declared in the project's
460    ///   `project.toml` (or equivalent ownership declaration). References
461    ///   to databases outside this set are treated as external and emitted
462    ///   verbatim.
463    /// * `dirty_schemas` - Dirty `(database, schema)` pairs.
464    /// * `target_cluster` - Cluster name to rewrite every `IN CLUSTER` clause
465    ///   on overlay materialized views and indexes to.
466    pub fn overlay(
467        fqn: &'a FullyQualifiedName,
468        profile_name: &'a str,
469        in_project_databases: &'a std::collections::BTreeSet<String>,
470        dirty_schemas: &'a std::collections::BTreeSet<crate::project::SchemaQualifier>,
471        target_cluster: &'a str,
472    ) -> Self {
473        Self::new(OverlayTransformer {
474            fqn,
475            profile_name,
476            in_project_databases,
477            dirty_schemas,
478            target_cluster,
479        })
480    }
481}
482
483impl<'a> NormalizingVisitor<StagingTransformer<'a>> {
484    /// Create a visitor that transforms names for staging environments.
485    ///
486    /// This visitor appends a suffix to schema and cluster names to create
487    /// isolated staging environments. External dependencies and objects not
488    /// being deployed are NOT transformed.
489    ///
490    /// # Arguments
491    /// * `fqn` - The fully qualified name context
492    /// * `suffix` - The suffix to append (e.g., "_staging")
493    /// * `external_dependencies` - Set of external dependencies that should NOT be transformed
494    /// * `objects_to_deploy` - Optional set of objects being deployed; objects not in this set are treated as external
495    ///
496    /// # Example
497    /// ```rust,ignore
498    /// let visitor = NormalizingVisitor::staging(&fqn, "_staging".to_string(), &external_deps, Some(&objects));
499    /// // Transforms: public → public_staging, quickstart → quickstart_staging
500    /// // But leaves external dependencies and non-deployed objects unchanged
501    /// ```
502    pub fn staging(
503        fqn: &'a FullyQualifiedName,
504        suffix: String,
505        external_dependencies: &'a std::collections::BTreeSet<ObjectId>,
506        objects_to_deploy: Option<&'a std::collections::BTreeSet<ObjectId>>,
507        replacement_objects: &'a std::collections::BTreeSet<ObjectId>,
508    ) -> Self {
509        Self::new(StagingTransformer::new(
510            fqn,
511            suffix,
512            external_dependencies,
513            objects_to_deploy,
514            replacement_objects,
515        ))
516    }
517}
518
#[cfg(test)]
mod tests {
    use super::*;
    use crate::project::SchemaQualifier;
    use crate::project::ir::object_id::ObjectId;
    use mz_sql_parser::ast::Ident;
    use std::collections::BTreeSet;

    /// The `overlay` constructor must yield a visitor whose transformer
    /// rewrites the database of a dirty in-project schema to
    /// `<database>__<profile>` while leaving schema and object untouched.
    #[mz_ore::test]
    fn overlay_factory_produces_working_visitor() {
        let object_id = ObjectId::new(
            "app".to_string(),
            "public".to_string(),
            "my_view".to_string(),
        );
        let fqn: FullyQualifiedName = object_id.into();

        let mut in_project: BTreeSet<String> = BTreeSet::new();
        in_project.insert("app".to_string());

        let mut dirty: BTreeSet<SchemaQualifier> = BTreeSet::new();
        dirty.insert(SchemaQualifier::new(
            "app".to_string(),
            "public".to_string(),
        ));

        let visitor =
            NormalizingVisitor::overlay(&fqn, "alice", &in_project, &dirty, "quickstart_dev");

        let mut name = UnresolvedItemName(
            ["app", "public", "orders"]
                .into_iter()
                .map(|part| Ident::new(part).unwrap())
                .collect(),
        );
        visitor.normalize_unresolved_item_name(&mut name);

        // Compare component by component: database is suffixed, the rest is
        // unchanged.
        for (idx, expected) in ["app__alice", "public", "orders"].into_iter().enumerate() {
            assert_eq!(name.0[idx].as_str(), expected);
        }
    }
}