mz_deploy/project/resolve/normalize/visitor.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! The NormalizingVisitor for traversing SQL AST and applying name transformations.
11//!
12//! This module contains the `NormalizingVisitor` struct which transforms object
13//! names in SQL statements using a configurable strategy (via the `NameTransformer`
14//! trait). Query-level traversal is delegated to mz-sql-parser's auto-generated
15//! [`VisitMut`] trait — the visitor overrides `visit_query_mut` (for CTE scope
16//! management) and `visit_table_factor_mut` (for name transformation and implicit
17//! aliasing). All other AST nodes (expressions, set operations, etc.) are handled
18//! by the default traversal.
19//!
20//! ## CTE Scoping
21//!
22//! Common Table Expressions (CTEs) introduce names that shadow real database
23//! objects. The visitor uses [`CteScope`]
24//! to track which names are currently in scope:
25//!
26//! - Simple CTE names are introduced incrementally: each CTE body is visited
27//! with only its *earlier* siblings in scope, then its own name is added.
28//! Mutually-recursive blocks push all names up front. This mirrors
29//! Materialize's resolver (`fold_query` in `src/sql/src/names.rs`), so a
30//! simple CTE whose name shadows a catalog object can still reference that
31//! object inside its own body.
32//! - Unqualified single-identifier references are checked against the scope
33//! stack — if a match is found, the reference is **not transformed** (it
34//! refers to a CTE, not a database object).
35//! - When leaving a `WITH` clause, the scope is popped.
36//!
37//! **Key Insight:** CTE names can only be referenced by their unqualified
38//! name. Any multi-part reference (e.g., `schema.name`) is always a database
39//! object reference and is always transformed.
40//!
41//! ## Implicit Aliasing
42//!
43//! When a table reference in a FROM clause is transformed (e.g., `sales` →
44//! `materialize.public.sales`), an implicit alias preserving the original
45//! table name is attached. This ensures that column references like
46//! `sales.column` continue to resolve correctly after transformation.
47
48use super::overlay_transformer::OverlayTransformer;
49use super::transformers::{
50 ClusterTransformer, ExplainTransformer, FlatteningTransformer, FullyQualifyingTransformer,
51 NameTransformer, StagingTransformer,
52};
53use crate::project::ir::compiled::FullyQualifiedName;
54use crate::project::ir::object_id::ObjectId;
55use crate::project::resolve::cte_scope::CteScope;
56use mz_sql_parser::ast::visit_mut::{self, VisitMut};
57use mz_sql_parser::ast::*;
58
59/// Visitor that traverses SQL AST and transforms names using a given strategy.
60///
61/// This struct is generic over the `NameTransformer` trait, allowing different
62/// transformation strategies to reuse the same traversal logic.
63///
64/// Implements [`VisitMut`] to delegate query-level traversal to mz-sql-parser's
65/// auto-generated visitor, overriding only `visit_query_mut` (CTE scope) and
66/// `visit_table_factor_mut` (name transformation + implicit aliasing).
67pub struct NormalizingVisitor<T: NameTransformer> {
68 transformer: T,
69 cte_scope: CteScope,
70}
71
72impl<T: NameTransformer> NormalizingVisitor<T> {
73 /// Create a new visitor with the given transformer.
74 pub fn new(transformer: T) -> Self {
75 Self {
76 transformer,
77 cte_scope: CteScope::new(),
78 }
79 }
80
81 /// Get a reference to the transformer.
82 pub fn transformer(&self) -> &T {
83 &self.transformer
84 }
85
86 /// Normalize a RawItemName to be transformed according to the strategy.
87 ///
88 /// Converts partially qualified or unqualified object references using
89 /// the current file's FQN context.
90 ///
91 /// CTEs (Common Table Expressions) are not transformed - they remain as-is.
92 pub fn normalize_raw_item_name(&self, name: &mut RawItemName) {
93 let unresolved = name.name_mut();
94
95 // Check if this is a CTE reference (unqualified single identifier)
96 // CTEs can only be referenced by their unqualified name
97 if unresolved.0.len() == 1 {
98 let name_str = unresolved.0[0].to_string();
99 if self.cte_scope.is_cte(&name_str) {
100 // This is a CTE reference - don't transform it
101 crate::verbose!("Skipping transform of CTE reference: {}", name_str);
102 return;
103 }
104 crate::verbose!("Transforming non-CTE reference: {}", name_str);
105 }
106
107 *unresolved = self.transformer.transform_name(unresolved);
108 }
109
110 /// Normalize an UnresolvedItemName to be transformed according to the strategy.
111 ///
112 /// Similar to normalize_raw_item_name, but works directly with UnresolvedItemName.
113 pub fn normalize_unresolved_item_name(&self, name: &mut UnresolvedItemName) {
114 *name = self.transformer.transform_name(name);
115 }
116
117 /// Normalize an UnresolvedSchemaName to be fully qualified (`database.schema`).
118 ///
119 /// Converts unqualified schema names (e.g., `public`) to fully qualified
120 /// names (e.g., `materialize.public`) using the current file's FQN context.
121 pub fn normalize_unresolved_schema_name(&self, name: &mut UnresolvedSchemaName) {
122 match name.0.len() {
123 1 => {
124 // Unqualified: schema only (e.g., "public")
125 // Prepend database to make database.schema
126 let schema = name.0[0].clone();
127 let database = Ident::new(self.transformer.database_name())
128 .expect("valid database identifier");
129 name.0 = vec![database, schema];
130 }
131 _ => {
132 // Already qualified or invalid - leave as-is
133 }
134 }
135 }
136
137 /// Normalize connection references in CREATE SINK statements.
138 ///
139 /// Handles both Kafka and Iceberg sink types, ensuring their connection
140 /// references are normalized.
141 pub fn normalize_sink_connection(&self, connection: &mut CreateSinkConnection<Raw>) {
142 match connection {
143 CreateSinkConnection::Kafka { connection, .. } => {
144 self.normalize_raw_item_name(connection);
145 }
146 CreateSinkConnection::Iceberg {
147 catalog_connection,
148 aws_connection,
149 ..
150 } => {
151 self.normalize_raw_item_name(catalog_connection);
152 if let Some(aws_connection) = aws_connection {
153 self.normalize_raw_item_name(aws_connection);
154 }
155 }
156 }
157 }
158
159 /// Normalize the connection reference in CREATE SOURCE statements.
160 ///
161 /// Sources reference a connection (Kafka, Postgres, etc.) that needs to be
162 /// normalized to a fully qualified name.
163 pub fn normalize_source_connection(&self, connection: &mut CreateSourceConnection<Raw>) {
164 match connection {
165 CreateSourceConnection::Kafka { connection, .. }
166 | CreateSourceConnection::Postgres { connection, .. }
167 | CreateSourceConnection::SqlServer { connection, .. }
168 | CreateSourceConnection::MySql { connection, .. } => {
169 self.normalize_raw_item_name(connection);
170 }
171 CreateSourceConnection::LoadGenerator { .. } => {}
172 }
173 }
174
175 /// Normalize connection option references in CREATE CONNECTION statements.
176 ///
177 /// Handles secret references, item references, AWS PrivateLink connections,
178 /// and Kafka broker tunnels within connection options.
179 pub fn normalize_connection_options(&self, options: &mut [ConnectionOption<Raw>]) {
180 for option in options {
181 if let Some(ref mut value) = option.value {
182 self.normalize_with_option_value(value);
183 }
184 }
185 }
186
187 /// Normalize a single WithOptionValue, recursing into nested structures.
188 fn normalize_with_option_value(&self, value: &mut WithOptionValue<Raw>) {
189 match value {
190 WithOptionValue::Secret(name) | WithOptionValue::Item(name) => {
191 self.normalize_raw_item_name(name);
192 }
193 WithOptionValue::ConnectionAwsPrivatelink(pl) => {
194 self.normalize_raw_item_name(&mut pl.connection);
195 }
196 WithOptionValue::ConnectionKafkaBroker(broker) => match &mut broker.tunnel {
197 KafkaBrokerTunnel::SshTunnel(name) => self.normalize_raw_item_name(name),
198 KafkaBrokerTunnel::AwsPrivatelink(aws) => {
199 self.normalize_raw_item_name(&mut aws.connection)
200 }
201 KafkaBrokerTunnel::Direct => {}
202 },
203 WithOptionValue::Sequence(items) => {
204 for item in items {
205 self.normalize_with_option_value(item);
206 }
207 }
208 _ => {}
209 }
210 }
211
212 /// Normalize all table references in a query (used for views and materialized views).
213 ///
214 /// Delegates to the [`VisitMut`] implementation which handles CTE scoping
215 /// and recursive traversal automatically.
216 pub fn normalize_query(&mut self, query: &mut Query<Raw>) {
217 self.visit_query_mut(query);
218 }
219
220 /// Normalize index references.
221 ///
222 /// Indexes reference the table/view they're created on, and this reference
223 /// needs to be normalized.
224 pub fn normalize_index_references(&self, indexes: &mut [CreateIndexStatement<Raw>]) {
225 for index in indexes {
226 self.normalize_raw_item_name(&mut index.on_name);
227 }
228 }
229
230 /// Normalize cluster references in indexes.
231 ///
232 /// Indexes can specify an IN CLUSTER clause, and these cluster references
233 /// need to be normalized for staging environments.
234 pub fn normalize_index_clusters(&self, indexes: &mut [CreateIndexStatement<Raw>])
235 where
236 T: ClusterTransformer,
237 {
238 for index in indexes {
239 self.normalize_cluster_name(&mut index.in_cluster);
240 }
241 }
242
243 /// Normalize grant target references.
244 ///
245 /// GRANT statements reference the object they grant permissions on, and these
246 /// references need to be normalized.
247 pub fn normalize_grant_references(&self, grants: &mut [GrantPrivilegesStatement<Raw>]) {
248 for grant in grants {
249 if let GrantTargetSpecification::Object {
250 object_spec_inner, ..
251 } = &mut grant.target
252 && let GrantTargetSpecificationInner::Objects { names } = object_spec_inner
253 {
254 for obj in names {
255 if let UnresolvedObjectName::Item(item_name) = obj {
256 self.normalize_unresolved_item_name(item_name);
257 }
258 }
259 }
260 }
261 }
262
263 /// Normalize comment object references.
264 ///
265 /// COMMENT statements reference the object they comment on, and these
266 /// references need to be normalized.
267 pub fn normalize_comment_references(&self, comments: &mut [CommentStatement<Raw>]) {
268 for comment in comments {
269 match &mut comment.object {
270 CommentObjectType::Table { name }
271 | CommentObjectType::View { name }
272 | CommentObjectType::MaterializedView { name }
273 | CommentObjectType::Source { name }
274 | CommentObjectType::Sink { name }
275 | CommentObjectType::Connection { name }
276 | CommentObjectType::Secret { name } => {
277 self.normalize_raw_item_name(name);
278 }
279 CommentObjectType::Column { name } => {
280 // For columns, normalize the table/view reference (the relation)
281 self.normalize_raw_item_name(&mut name.relation);
282 }
283 _ => {
284 // Other comment types don't need normalization
285 }
286 }
287 }
288 }
289
290 /// Normalize a cluster name using a ClusterTransformer.
291 ///
292 /// This method transforms cluster references in statements that support
293 /// the `IN CLUSTER` clause. It's primarily used by the StagingTransformer
294 /// to rename clusters for staging environments.
295 ///
296 /// # Type Parameter
297 /// `T` must implement `ClusterTransformer` for this method to be callable.
298 pub fn normalize_cluster_name(&self, cluster: &mut Option<RawClusterName>)
299 where
300 T: ClusterTransformer,
301 {
302 if let Some(cluster_name) = cluster {
303 match cluster_name {
304 RawClusterName::Unresolved(ident) => {
305 let transformed = self.transformer.transform_cluster(ident);
306 *cluster_name = RawClusterName::Unresolved(transformed);
307 }
308 RawClusterName::Resolved(_) => {
309 // Already resolved, leave as-is
310 }
311 }
312 }
313 }
314}
315
316impl<T: NameTransformer> VisitMut<'_, Raw> for NormalizingVisitor<T> {
317 fn visit_query_mut(&mut self, node: &mut Query<Raw>) {
318 // Mirror Materialize's name resolver (`fold_query` in
319 // `src/sql/src/names.rs`): a simple CTE's body is resolved with only its
320 // *earlier* siblings in scope, so a simple CTE whose name shadows a
321 // catalog object can still reference that object inside its own body.
322 // Mutually-recursive blocks make every name visible up front.
323 if matches!(node.ctes, CteBlock::Simple(_)) {
324 self.cte_scope.push(std::collections::BTreeSet::new());
325 if let CteBlock::Simple(ctes) = &mut node.ctes {
326 for cte in ctes.iter_mut() {
327 // Visit this body before its own name is in scope.
328 self.visit_query_mut(&mut cte.query);
329 self.cte_scope.insert_current(cte.alias.name.to_string());
330 }
331 }
332 // The main query body sees all simple CTE names. Replicate the rest
333 // of the default `Query` traversal (body, order_by, limit, offset).
334 self.visit_set_expr_mut(&mut node.body);
335 for order_by in &mut node.order_by {
336 self.visit_order_by_expr_mut(order_by);
337 }
338 if let Some(limit) = &mut node.limit {
339 self.visit_limit_mut(limit);
340 }
341 if let Some(offset) = &mut node.offset {
342 self.visit_expr_mut(offset);
343 }
344 self.cte_scope.pop();
345 } else {
346 let names = CteScope::collect_cte_names(&node.ctes);
347 self.cte_scope.push(names);
348 visit_mut::visit_query_mut(self, node);
349 self.cte_scope.pop();
350 }
351 }
352
353 fn visit_table_factor_mut(&mut self, node: &mut TableFactor<Raw>) {
354 match node {
355 TableFactor::Table { name, alias } => {
356 // Save the original table name (the last part) before transformation.
357 // This will be used as an implicit alias if one doesn't exist.
358 let original_table_name = match name.name().0.len() {
359 1 => {
360 let name_str = name.name().0[0].to_string();
361 // Don't create an alias if this is a CTE reference (it won't be transformed)
362 if !self.cte_scope.is_cte(&name_str) {
363 Some(name.name().0[0].clone())
364 } else {
365 None
366 }
367 }
368 2 | 3 => Some(name.name().0.last().unwrap().clone()),
369 _ => None,
370 };
371
372 // Normalize the table name (e.g., "sales" -> "materialize.public.sales")
373 self.normalize_raw_item_name(name);
374
375 // If there's no explicit alias and we have an original table name, create
376 // an implicit alias so that qualified column references like "sales.column"
377 // continue to work after transformation.
378 if alias.is_none() {
379 if let Some(original) = original_table_name {
380 *alias = Some(TableAlias {
381 name: original,
382 columns: vec![],
383 strict: false,
384 });
385 }
386 }
387 }
388 _ => visit_mut::visit_table_factor_mut(self, node),
389 }
390 }
391}
392
393// Convenience constructors for common use cases
394impl<'a> NormalizingVisitor<FullyQualifyingTransformer<'a>> {
395 /// Create a visitor that fully qualifies names (`database.schema.object`).
396 pub fn fully_qualifying(fqn: &'a FullyQualifiedName) -> Self {
397 Self::new(FullyQualifyingTransformer {
398 fqn,
399 database_name_map: None,
400 })
401 }
402
403 /// Create a visitor that fully qualifies names and optionally rewrites
404 /// cross-database references using a database name map.
405 pub fn fully_qualifying_with_db_map(
406 fqn: &'a FullyQualifiedName,
407 database_name_map: Option<&'a std::collections::BTreeMap<String, String>>,
408 ) -> Self {
409 Self::new(FullyQualifyingTransformer {
410 fqn,
411 database_name_map,
412 })
413 }
414}
415
416impl<'a> NormalizingVisitor<FlatteningTransformer<'a>> {
417 /// Create a visitor that flattens names (`database_schema_object`).
418 pub fn flattening(fqn: &'a FullyQualifiedName) -> Self {
419 Self::new(FlatteningTransformer { fqn })
420 }
421}
422
423impl<'a> NormalizingVisitor<ExplainTransformer<'a>> {
424 /// Create a visitor that transforms names for the explain command.
425 ///
426 /// All object references are rewritten to `<database>.<explain_schema>."db.schema.obj"`,
427 /// and all `IN CLUSTER` clauses are rewritten to `quickstart`.
428 pub fn explain(
429 fqn: &'a FullyQualifiedName,
430 explain_database: String,
431 explain_schema: String,
432 ) -> Self {
433 Self::new(ExplainTransformer::new(
434 fqn,
435 explain_database,
436 explain_schema,
437 ))
438 }
439}
440
441impl<'a> NormalizingVisitor<OverlayTransformer<'a>> {
442 /// Create a visitor that rewrites references for `mz-deploy dev` overlay
443 /// compilation.
444 ///
445 /// Applies the two-step schema-level resolution rule:
446 /// - External databases (not in `in_project_databases`) → emit verbatim.
447 /// - Dirty `(database, schema)` pairs → rewrite database component to
448 /// `<database>__<profile_name>`.
449 ///
450 /// Any configured `profile_suffix` is applied to in-project names by
451 /// the project planner before `dev` calls this constructor, so the
452 /// transformer sees already-suffixed names.
453 ///
454 /// # Arguments
455 /// * `fqn` - Context used to resolve 1- and 2-part names to fully
456 /// qualified form.
457 /// * `profile_name` - Developer profile name; becomes the `__<name>`
458 /// suffix on overlay databases.
459 /// * `in_project_databases` - Set of databases declared in the project's
460 /// `project.toml` (or equivalent ownership declaration). References
461 /// to databases outside this set are treated as external and emitted
462 /// verbatim.
463 /// * `dirty_schemas` - Dirty `(database, schema)` pairs.
464 /// * `target_cluster` - Cluster name to rewrite every `IN CLUSTER` clause
465 /// on overlay materialized views and indexes to.
466 pub fn overlay(
467 fqn: &'a FullyQualifiedName,
468 profile_name: &'a str,
469 in_project_databases: &'a std::collections::BTreeSet<String>,
470 dirty_schemas: &'a std::collections::BTreeSet<crate::project::SchemaQualifier>,
471 target_cluster: &'a str,
472 ) -> Self {
473 Self::new(OverlayTransformer {
474 fqn,
475 profile_name,
476 in_project_databases,
477 dirty_schemas,
478 target_cluster,
479 })
480 }
481}
482
483impl<'a> NormalizingVisitor<StagingTransformer<'a>> {
484 /// Create a visitor that transforms names for staging environments.
485 ///
486 /// This visitor appends a suffix to schema and cluster names to create
487 /// isolated staging environments. External dependencies and objects not
488 /// being deployed are NOT transformed.
489 ///
490 /// # Arguments
491 /// * `fqn` - The fully qualified name context
492 /// * `suffix` - The suffix to append (e.g., "_staging")
493 /// * `external_dependencies` - Set of external dependencies that should NOT be transformed
494 /// * `objects_to_deploy` - Optional set of objects being deployed; objects not in this set are treated as external
495 ///
496 /// # Example
497 /// ```rust,ignore
498 /// let visitor = NormalizingVisitor::staging(&fqn, "_staging".to_string(), &external_deps, Some(&objects));
499 /// // Transforms: public → public_staging, quickstart → quickstart_staging
500 /// // But leaves external dependencies and non-deployed objects unchanged
501 /// ```
502 pub fn staging(
503 fqn: &'a FullyQualifiedName,
504 suffix: String,
505 external_dependencies: &'a std::collections::BTreeSet<ObjectId>,
506 objects_to_deploy: Option<&'a std::collections::BTreeSet<ObjectId>>,
507 replacement_objects: &'a std::collections::BTreeSet<ObjectId>,
508 ) -> Self {
509 Self::new(StagingTransformer::new(
510 fqn,
511 suffix,
512 external_dependencies,
513 objects_to_deploy,
514 replacement_objects,
515 ))
516 }
517}
518
#[cfg(test)]
mod tests {
    use super::*;
    use crate::project::SchemaQualifier;
    use crate::project::ir::object_id::ObjectId;
    use mz_sql_parser::ast::Ident;
    use std::collections::BTreeSet;

    /// The `overlay` constructor yields a visitor whose transformer rewrites
    /// the database component of a reference into a dirty in-project schema.
    #[mz_ore::test]
    fn overlay_factory_produces_working_visitor() {
        let object_id = ObjectId::new(
            "app".to_string(),
            "public".to_string(),
            "my_view".to_string(),
        );
        let fqn: FullyQualifiedName = object_id.into();

        let in_project: BTreeSet<String> = BTreeSet::from(["app".to_string()]);
        let dirty: BTreeSet<SchemaQualifier> = BTreeSet::from([SchemaQualifier::new(
            "app".to_string(),
            "public".to_string(),
        )]);

        let visitor =
            NormalizingVisitor::overlay(&fqn, "alice", &in_project, &dirty, "quickstart_dev");

        let mut name = UnresolvedItemName(
            ["app", "public", "orders"]
                .into_iter()
                .map(|part| Ident::new(part).unwrap())
                .collect(),
        );
        visitor.normalize_unresolved_item_name(&mut name);

        // Only the database component is expected to change.
        for (index, expected) in ["app__alice", "public", "orders"].into_iter().enumerate() {
            assert_eq!(name.0[index].as_str(), expected);
        }
    }
}
557}