Skip to main content

mz_deploy/project/resolve/normalize/
transformers.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Name transformation strategies for SQL AST normalization.
11//!
12//! This module provides different strategies for transforming object names in SQL statements.
13//! Each transformer implements the `NameTransformer` trait, allowing the `NormalizingVisitor`
14//! to apply different transformation strategies using the same traversal logic.
15//!
16//! ## Strategies
17//!
18//! | Transformer | When used | Transform example |
19//! |-------------|-----------|-------------------|
20//! | `FullyQualifyingTransformer` | Typed phase (default normalization) | `sales` → `materialize.public.sales` |
21//! | `FlatteningTransformer` | Type-checking (single-schema container) | `materialize.public.sales` → `"materialize.public.sales"` |
22//! | `StagingTransformer` | Blue/green staging | `materialize.public.sales` → `materialize.public_v1.sales` |
23//! | `ExplainTransformer` | Explain command (dedicated schema) | `materialize.public.sales` → `materialize._mz_explain."materialize.public.sales"` |
24//!
25//! ## StagingTransformer Rules
26//!
27//! The staging transformer appends a suffix to schema and cluster names. It
28//! has special handling for objects that should **not** be transformed:
29//!
30//! - **External dependencies** — objects not defined in the project are
31//!   referenced as-is (they exist in production schemas).
32//! - **Non-deployed objects** — when `objects_to_deploy` is set, objects
33//!   outside that set are treated as external.
34//! - **Replacement objects** — objects in replacement schemas are deployed
35//!   in-place, so references to them are not suffixed.
36//!
37//! **Key Insight:** `transform_own_name` always suffixes, even for replacement
38//! objects. The `is_external` exemption applies only to *references to other
39//! objects*, not to the object's own CREATE statement name.
40
41use crate::project::ir::compiled::FullyQualifiedName;
42use crate::project::ir::object_id::ObjectId;
43use mz_repr::namespaces::is_system_schema;
44use mz_sql_parser::ast::*;
45use std::collections::BTreeMap;
46
47/// Trait for transforming object names in SQL AST nodes.
48///
49/// Implementations of this trait define how names should be transformed
50/// (e.g., fully qualified, flattened, etc.).
51pub trait NameTransformer {
52    /// Transform a name using the implementing strategy.
53    ///
54    /// Takes an `UnresolvedItemName` and returns a transformed version according
55    /// to the strategy. The input may be partially qualified (1, 2, or 3 parts).
56    fn transform_name(&self, name: &UnresolvedItemName) -> UnresolvedItemName;
57
58    /// Transform the object's own name (the name in the CREATE statement).
59    ///
60    /// By default, delegates to `transform_name`. Override to apply different
61    /// rules for the object being created vs. references to other objects.
62    /// For example, the staging transformer always suffixes the object's own
63    /// schema even when it is a replacement object.
64    fn transform_own_name(&self, name: &UnresolvedItemName) -> UnresolvedItemName {
65        self.transform_name(name)
66    }
67
68    /// Get the database name from the transformer's FQN context.
69    fn database_name(&self) -> &str;
70}
71
/// Transforms names to be fully qualified (`database.schema.object`).
///
/// This is the default normalization strategy that ensures all object references
/// use the 3-part qualified format. Missing parts are filled in from `fqn`;
/// schema-qualified references to system schemas are left as 2-part names.
pub struct FullyQualifyingTransformer<'a> {
    /// Context supplying the default database and schema used to complete
    /// partially qualified names.
    pub(crate) fqn: &'a FullyQualifiedName,
    /// Optional mapping from original database names to suffixed names.
    /// When present, 3-part names with a database in this map get rewritten;
    /// 1- and 2-part names are not affected by the map.
    pub(crate) database_name_map: Option<&'a BTreeMap<String, String>>,
}
82
83impl<'a> NameTransformer for FullyQualifyingTransformer<'a> {
84    fn transform_name(&self, name: &UnresolvedItemName) -> UnresolvedItemName {
85        match name.0.len() {
86            1 => {
87                // Unqualified: object only
88                // Convert to database.schema.object
89                let object = name.0[0].clone();
90                let database = Ident::new(self.fqn.database()).expect("valid database identifier");
91                let schema = Ident::new(self.fqn.schema()).expect("valid schema identifier");
92                UnresolvedItemName(vec![database, schema, object])
93            }
94            2 => {
95                // Schema-qualified: schema.object. System catalogs are
96                // database-less, so leave them as 2-part. Otherwise prepend
97                // the file's default database.
98                if is_system_schema(name.0[0].as_str()) {
99                    return name.clone();
100                }
101                let schema = name.0[0].clone();
102                let object = name.0[1].clone();
103                let database = Ident::new(self.fqn.database()).expect("valid database identifier");
104                UnresolvedItemName(vec![database, schema, object])
105            }
106            3 => {
107                // Already fully qualified — optionally rewrite database name
108                if let Some(map) = &self.database_name_map {
109                    let db_str = name.0[0].to_string();
110                    if let Some(new_db) = map.get(&db_str) {
111                        let database = Ident::new(new_db).expect("valid identifier");
112                        return UnresolvedItemName(vec![
113                            database,
114                            name.0[1].clone(),
115                            name.0[2].clone(),
116                        ]);
117                    }
118                }
119                name.clone()
120            }
121            _ => {
122                // Invalid - return as-is
123                name.clone()
124            }
125        }
126    }
127
128    fn database_name(&self) -> &str {
129        self.fqn.database()
130    }
131}
132
/// Transforms names to be flattened into a single identifier
/// (`"database.schema.object"`).
///
/// This strategy creates a single unqualified identifier by joining the
/// database, schema, and object names with `.` separators. Useful for
/// temporary objects that need unqualified names.
pub struct FlatteningTransformer<'a> {
    /// Context supplying the default database and schema used to complete
    /// partially qualified names before flattening.
    pub(crate) fqn: &'a FullyQualifiedName,
}
141
142impl<'a> NameTransformer for FlatteningTransformer<'a> {
143    fn transform_name(&self, name: &UnresolvedItemName) -> UnresolvedItemName {
144        // System catalog references live outside the flattening container:
145        // the typecheck server resolves them natively. Leave them as-is.
146        if name.0.len() == 2 && is_system_schema(name.0[0].as_str()) {
147            return name.clone();
148        }
149
150        // First, fully qualify the name to ensure we have all parts
151        let fully_qualified = match name.0.len() {
152            1 => {
153                // Unqualified: object only - use FQN context
154                vec![
155                    self.fqn.database().to_string(),
156                    self.fqn.schema().to_string(),
157                    name.0[0].to_string(),
158                ]
159            }
160            2 => {
161                // Schema-qualified: schema.object - use FQN database
162                vec![
163                    self.fqn.database().to_string(),
164                    name.0[0].to_string(),
165                    name.0[1].to_string(),
166                ]
167            }
168            3 => {
169                // Already fully qualified
170                vec![
171                    name.0[0].to_string(),
172                    name.0[1].to_string(),
173                    name.0[2].to_string(),
174                ]
175            }
176            _ => {
177                // Invalid - return as-is
178                return name.clone();
179            }
180        };
181
182        // Flatten to single identifier: "database.schema.object"
183        let flattened = fully_qualified.join(".");
184        let flattened_ident = Ident::new(&flattened).expect("valid flattened identifier");
185        UnresolvedItemName(vec![flattened_ident])
186    }
187
188    fn database_name(&self) -> &str {
189        self.fqn.database()
190    }
191}
192
/// Transforms names for staging environments by appending a suffix to schema names.
///
/// This strategy is used to create isolated staging environments where all objects
/// are deployed to schema names with a suffix (e.g., `public_staging`), and all
/// clusters are renamed with the same suffix (e.g., `quickstart_staging`).
///
/// External dependencies (objects not defined in the project) are NOT transformed.
/// Objects not being deployed in this staging run are also treated as external.
pub struct StagingTransformer<'a> {
    /// Context supplying the default database and schema for partially
    /// qualified names.
    fqn: &'a FullyQualifiedName,
    /// Suffix appended to schema and cluster names.
    staging_suffix: String,
    /// Objects whose references are left untouched.
    external_dependencies: &'a std::collections::BTreeSet<ObjectId>,
    /// When set, references to objects outside this set are left untouched.
    objects_to_deploy: Option<&'a std::collections::BTreeSet<ObjectId>>,
    /// Objects deployed in place; references to them are left untouched.
    replacement_objects: &'a std::collections::BTreeSet<ObjectId>,
}
208
209impl<'a> StagingTransformer<'a> {
210    /// Create a new staging transformer with the given suffix.
211    ///
212    /// # Arguments
213    /// * `fqn` - The fully qualified name context
214    /// * `staging_suffix` - The suffix to append (e.g., "_staging")
215    /// * `external_dependencies` - Set of external dependencies that should NOT be transformed
216    /// * `objects_to_deploy` - Optional set of objects being deployed; objects not in this set are treated as external
217    pub fn new(
218        fqn: &'a FullyQualifiedName,
219        staging_suffix: String,
220        external_dependencies: &'a std::collections::BTreeSet<ObjectId>,
221        objects_to_deploy: Option<&'a std::collections::BTreeSet<ObjectId>>,
222        replacement_objects: &'a std::collections::BTreeSet<ObjectId>,
223    ) -> Self {
224        Self {
225            fqn,
226            staging_suffix,
227            external_dependencies,
228            objects_to_deploy,
229            replacement_objects,
230        }
231    }
232
233    /// Check if a name refers to an external dependency or an object not being deployed
234    pub(crate) fn is_external(&self, name: &UnresolvedItemName) -> bool {
235        use ObjectId;
236
237        if name.0.is_empty() || name.0.len() > 3 {
238            return false; // Invalid name, not external
239        }
240        let object_id = ObjectId::from_item_name(name, self.fqn.database(), self.fqn.schema());
241
242        // Check if it's in the external dependencies
243        if self.external_dependencies.contains(&object_id) {
244            return true;
245        }
246
247        // Replacement objects are deployed in-place (not to staging schemas),
248        // so references to them should never be suffixed.
249        if self.replacement_objects.contains(&object_id) {
250            return true;
251        }
252
253        // If objects_to_deploy is specified, check if this object is NOT in that set
254        // If not being deployed, treat as external
255        if let Some(objects_to_deploy) = self.objects_to_deploy
256            && !objects_to_deploy.contains(&object_id)
257        {
258            return true;
259        }
260
261        false
262    }
263}
264
265impl<'a> NameTransformer for StagingTransformer<'a> {
266    fn transform_name(&self, name: &UnresolvedItemName) -> UnresolvedItemName {
267        // Check if this is an external dependency - if so, don't transform it
268        if self.is_external(name) {
269            return name.clone();
270        }
271
272        match name.0.len() {
273            1 => {
274                // Unqualified: object only
275                // Add staging suffix to schema: database.schema_staging.object
276                let object = name.0[0].clone();
277                let database = Ident::new(self.fqn.database()).expect("valid database identifier");
278                let staging_schema = format!("{}{}", self.fqn.schema(), self.staging_suffix);
279                let schema = Ident::new(&staging_schema).expect("valid schema identifier");
280                UnresolvedItemName(vec![database, schema, object])
281            }
282            2 => {
283                // Schema-qualified: schema.object
284                // Add staging suffix to schema: database.schema_staging.object
285                let schema_name = format!("{}{}", name.0[0], self.staging_suffix);
286                let schema = Ident::new(&schema_name).expect("valid schema identifier");
287                let object = name.0[1].clone();
288                let database = Ident::new(self.fqn.database()).expect("valid database identifier");
289                UnresolvedItemName(vec![database, schema, object])
290            }
291            3 => {
292                // Fully qualified: database.schema.object
293                // Add staging suffix to schema: database.schema_staging.object
294                let database = name.0[0].clone();
295                let schema_name = format!("{}{}", name.0[1], self.staging_suffix);
296                let schema = Ident::new(&schema_name).expect("valid schema identifier");
297                let object = name.0[2].clone();
298                UnresolvedItemName(vec![database, schema, object])
299            }
300            _ => {
301                // Invalid - return as-is
302                name.clone()
303            }
304        }
305    }
306
307    /// Always suffix the object's own name, even for replacement objects.
308    ///
309    /// During staging, the object being created must always go into the staging
310    /// schema (e.g., `core_v1`). The `is_external` check for replacement objects
311    /// only applies to *references* to other objects — not to the object's own
312    /// CREATE statement name.
313    fn transform_own_name(&self, name: &UnresolvedItemName) -> UnresolvedItemName {
314        match name.0.len() {
315            1 => {
316                let object = name.0[0].clone();
317                let database = Ident::new(self.fqn.database()).expect("valid database identifier");
318                let staging_schema = format!("{}{}", self.fqn.schema(), self.staging_suffix);
319                let schema = Ident::new(&staging_schema).expect("valid schema identifier");
320                UnresolvedItemName(vec![database, schema, object])
321            }
322            2 => {
323                let schema_name = format!("{}{}", name.0[0], self.staging_suffix);
324                let schema = Ident::new(&schema_name).expect("valid schema identifier");
325                let object = name.0[1].clone();
326                let database = Ident::new(self.fqn.database()).expect("valid database identifier");
327                UnresolvedItemName(vec![database, schema, object])
328            }
329            3 => {
330                let database = name.0[0].clone();
331                let schema_name = format!("{}{}", name.0[1], self.staging_suffix);
332                let schema = Ident::new(&schema_name).expect("valid schema identifier");
333                let object = name.0[2].clone();
334                UnresolvedItemName(vec![database, schema, object])
335            }
336            _ => name.clone(),
337        }
338    }
339
340    fn database_name(&self) -> &str {
341        self.fqn.database()
342    }
343}
344
/// Extension trait for transformers that also transform cluster names.
///
/// This trait allows transformers to modify cluster references in addition to
/// object names. It is implemented by `StagingTransformer`, which renames
/// clusters with the staging suffix, and by `ExplainTransformer`, which maps
/// every cluster to `quickstart`.
pub trait ClusterTransformer: NameTransformer {
    /// Transform a cluster name according to the strategy.
    ///
    /// Returns the identifier that should replace `cluster_name`.
    fn transform_cluster(&self, cluster_name: &Ident) -> Ident;

    /// Get the original cluster name from a transformed name.
    ///
    /// This is used to look up production cluster configurations when creating
    /// staging clusters. `staged_name` is the plain (unquoted) cluster name.
    fn get_original_cluster_name(&self, staged_name: &str) -> String;
}
360
361impl<'a> ClusterTransformer for StagingTransformer<'a> {
362    fn transform_cluster(&self, cluster_name: &Ident) -> Ident {
363        // Transform: quickstart → quickstart_staging
364        let staging_name = format!("{}{}", cluster_name, self.staging_suffix);
365        Ident::new(&staging_name).expect("valid cluster identifier")
366    }
367
368    fn get_original_cluster_name(&self, staged_name: &str) -> String {
369        // Reverse transform: quickstart_staging → quickstart
370        staged_name
371            .strip_suffix(&self.staging_suffix)
372            .unwrap_or(staged_name)
373            .to_string()
374    }
375}
376
/// Transforms names for the explain command by placing all objects into a
/// dedicated schema with flattened identifiers.
///
/// This strategy rewrites every object reference to live in a single explain
/// schema using the `"db.schema.obj"` flattened naming convention (same as
/// [`FlatteningTransformer`]) but qualified with `<database>.<explain_schema>`.
/// This avoids name collisions between objects from different schemas while
/// keeping everything in a real (non-temporary) schema that can be dropped
/// after the explain completes.
///
/// Also implements [`ClusterTransformer`] to rewrite all `IN CLUSTER` clauses
/// to `quickstart`, since explain always runs against that cluster.
pub struct ExplainTransformer<'a> {
    /// Context supplying the default database and schema used to complete
    /// partially qualified names before flattening.
    fqn: &'a FullyQualifiedName,
    /// Database part of every rewritten name.
    explain_database: String,
    /// Schema part of every rewritten name.
    explain_schema: String,
}
394
395impl<'a> ExplainTransformer<'a> {
396    /// Create a new explain transformer.
397    ///
398    /// # Arguments
399    /// * `fqn` - The fully qualified name context for resolving partial names
400    /// * `explain_database` - The database where the explain schema lives
401    /// * `explain_schema` - The dedicated schema name (e.g., `_mz_explain_<uuid>`)
402    pub fn new(
403        fqn: &'a FullyQualifiedName,
404        explain_database: String,
405        explain_schema: String,
406    ) -> Self {
407        Self {
408            fqn,
409            explain_database,
410            explain_schema,
411        }
412    }
413
414    /// Flatten a fully qualified name into a single quoted identifier.
415    fn flatten(&self, parts: &[String; 3]) -> UnresolvedItemName {
416        let flattened = parts.join(".");
417        let flattened_ident = Ident::new(&flattened).expect("valid flattened identifier");
418        let db_ident = Ident::new(&self.explain_database).expect("valid database identifier");
419        let schema_ident = Ident::new(&self.explain_schema).expect("valid schema identifier");
420        UnresolvedItemName(vec![db_ident, schema_ident, flattened_ident])
421    }
422}
423
424impl<'a> NameTransformer for ExplainTransformer<'a> {
425    fn transform_name(&self, name: &UnresolvedItemName) -> UnresolvedItemName {
426        // System catalog references aren't flattened into the explain schema;
427        // the server resolves them natively.
428        if name.0.len() == 2 && is_system_schema(name.0[0].as_str()) {
429            return name.clone();
430        }
431        let parts = match name.0.len() {
432            1 => [
433                self.fqn.database().to_string(),
434                self.fqn.schema().to_string(),
435                name.0[0].to_string(),
436            ],
437            2 => [
438                self.fqn.database().to_string(),
439                name.0[0].to_string(),
440                name.0[1].to_string(),
441            ],
442            3 => [
443                name.0[0].to_string(),
444                name.0[1].to_string(),
445                name.0[2].to_string(),
446            ],
447            _ => return name.clone(),
448        };
449        self.flatten(&parts)
450    }
451
452    fn database_name(&self) -> &str {
453        self.fqn.database()
454    }
455}
456
457impl<'a> ClusterTransformer for ExplainTransformer<'a> {
458    fn transform_cluster(&self, _cluster_name: &Ident) -> Ident {
459        Ident::new("quickstart").expect("valid cluster identifier")
460    }
461
462    fn get_original_cluster_name(&self, staged_name: &str) -> String {
463        staged_name.to_string()
464    }
465}