mz_deploy/project/resolve/normalize/transformers.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Name transformation strategies for SQL AST normalization.
11//!
12//! This module provides different strategies for transforming object names in SQL statements.
13//! Each transformer implements the `NameTransformer` trait, allowing the `NormalizingVisitor`
14//! to apply different transformation strategies using the same traversal logic.
15//!
16//! ## Strategies
17//!
18//! | Transformer | When used | Transform example |
19//! |-------------|-----------|-------------------|
20//! | `FullyQualifyingTransformer` | Typed phase (default normalization) | `sales` → `materialize.public.sales` |
21//! | `FlatteningTransformer` | Type-checking (single-schema container) | `materialize.public.sales` → `"materialize.public.sales"` |
22//! | `StagingTransformer` | Blue/green staging | `materialize.public.sales` → `materialize.public_v1.sales` |
23//! | `ExplainTransformer` | Explain command (dedicated schema) | `materialize.public.sales` → `materialize._mz_explain."materialize.public.sales"` |
24//!
25//! ## StagingTransformer Rules
26//!
27//! The staging transformer appends a suffix to schema and cluster names. It
28//! has special handling for objects that should **not** be transformed:
29//!
30//! - **External dependencies** — objects not defined in the project are
31//! referenced as-is (they exist in production schemas).
32//! - **Non-deployed objects** — when `objects_to_deploy` is set, objects
33//! outside that set are treated as external.
34//! - **Replacement objects** — objects in replacement schemas are deployed
35//! in-place, so references to them are not suffixed.
36//!
37//! **Key Insight:** `transform_own_name` always suffixes, even for replacement
38//! objects. The `is_external` exemption applies only to *references to other
39//! objects*, not to the object's own CREATE statement name.
40
41use crate::project::ir::compiled::FullyQualifiedName;
42use crate::project::ir::object_id::ObjectId;
43use mz_repr::namespaces::is_system_schema;
44use mz_sql_parser::ast::*;
45use std::collections::BTreeMap;
46
/// Trait for transforming object names in SQL AST nodes.
///
/// Implementations of this trait define how names should be transformed
/// (e.g., fully qualified, flattened, etc.). The trait separates two cases:
/// references to other objects (`transform_name`) and the name of the object
/// being created (`transform_own_name`).
pub trait NameTransformer {
    /// Transform a name using the implementing strategy.
    ///
    /// Takes an `UnresolvedItemName` and returns a transformed version according
    /// to the strategy. The input may be partially qualified (1, 2, or 3 parts).
    /// The input is only borrowed; a new name is always returned.
    fn transform_name(&self, name: &UnresolvedItemName) -> UnresolvedItemName;

    /// Transform the object's own name (the name in the CREATE statement).
    ///
    /// By default, delegates to `transform_name`. Override to apply different
    /// rules for the object being created vs. references to other objects.
    /// For example, the staging transformer always suffixes the object's own
    /// schema even when it is a replacement object.
    fn transform_own_name(&self, name: &UnresolvedItemName) -> UnresolvedItemName {
        // Default: an object's own name follows the same rules as a reference.
        self.transform_name(name)
    }

    /// Get the database name from the transformer's FQN context.
    fn database_name(&self) -> &str;
}
71
/// Transforms names to be fully qualified (`database.schema.object`).
///
/// This is the default normalization strategy. One- and two-part names are
/// completed from `fqn`. The one exception is a two-part name whose schema is
/// a system schema: it is left two-part. Names that are already three-part
/// are kept, optionally with their database renamed via `database_name_map`.
pub struct FullyQualifyingTransformer<'a> {
    /// Supplies the default database and schema for partially qualified names.
    pub(crate) fqn: &'a FullyQualifiedName,
    /// Optional mapping from original database names to suffixed names.
    /// When present, 3-part names with a database in this map get rewritten.
    pub(crate) database_name_map: Option<&'a BTreeMap<String, String>>,
}
82
83impl<'a> NameTransformer for FullyQualifyingTransformer<'a> {
84 fn transform_name(&self, name: &UnresolvedItemName) -> UnresolvedItemName {
85 match name.0.len() {
86 1 => {
87 // Unqualified: object only
88 // Convert to database.schema.object
89 let object = name.0[0].clone();
90 let database = Ident::new(self.fqn.database()).expect("valid database identifier");
91 let schema = Ident::new(self.fqn.schema()).expect("valid schema identifier");
92 UnresolvedItemName(vec![database, schema, object])
93 }
94 2 => {
95 // Schema-qualified: schema.object. System catalogs are
96 // database-less, so leave them as 2-part. Otherwise prepend
97 // the file's default database.
98 if is_system_schema(name.0[0].as_str()) {
99 return name.clone();
100 }
101 let schema = name.0[0].clone();
102 let object = name.0[1].clone();
103 let database = Ident::new(self.fqn.database()).expect("valid database identifier");
104 UnresolvedItemName(vec![database, schema, object])
105 }
106 3 => {
107 // Already fully qualified — optionally rewrite database name
108 if let Some(map) = &self.database_name_map {
109 let db_str = name.0[0].to_string();
110 if let Some(new_db) = map.get(&db_str) {
111 let database = Ident::new(new_db).expect("valid identifier");
112 return UnresolvedItemName(vec![
113 database,
114 name.0[1].clone(),
115 name.0[2].clone(),
116 ]);
117 }
118 }
119 name.clone()
120 }
121 _ => {
122 // Invalid - return as-is
123 name.clone()
124 }
125 }
126 }
127
128 fn database_name(&self) -> &str {
129 self.fqn.database()
130 }
131}
132
/// Transforms names to be flattened into one identifier
/// (`"database.schema.object"`).
///
/// This strategy creates a single unqualified identifier by joining the
/// database, schema, and object names with `.` (dots, not underscores).
/// Useful for temporary objects that need unqualified names. Two-part names
/// in a system schema are not flattened; they are returned unchanged.
pub struct FlatteningTransformer<'a> {
    /// Supplies the database and schema for parts the input name leaves out.
    pub(crate) fqn: &'a FullyQualifiedName,
}
141
142impl<'a> NameTransformer for FlatteningTransformer<'a> {
143 fn transform_name(&self, name: &UnresolvedItemName) -> UnresolvedItemName {
144 // System catalog references live outside the flattening container:
145 // the typecheck server resolves them natively. Leave them as-is.
146 if name.0.len() == 2 && is_system_schema(name.0[0].as_str()) {
147 return name.clone();
148 }
149
150 // First, fully qualify the name to ensure we have all parts
151 let fully_qualified = match name.0.len() {
152 1 => {
153 // Unqualified: object only - use FQN context
154 vec![
155 self.fqn.database().to_string(),
156 self.fqn.schema().to_string(),
157 name.0[0].to_string(),
158 ]
159 }
160 2 => {
161 // Schema-qualified: schema.object - use FQN database
162 vec![
163 self.fqn.database().to_string(),
164 name.0[0].to_string(),
165 name.0[1].to_string(),
166 ]
167 }
168 3 => {
169 // Already fully qualified
170 vec![
171 name.0[0].to_string(),
172 name.0[1].to_string(),
173 name.0[2].to_string(),
174 ]
175 }
176 _ => {
177 // Invalid - return as-is
178 return name.clone();
179 }
180 };
181
182 // Flatten to single identifier: "database.schema.object"
183 let flattened = fully_qualified.join(".");
184 let flattened_ident = Ident::new(&flattened).expect("valid flattened identifier");
185 UnresolvedItemName(vec![flattened_ident])
186 }
187
188 fn database_name(&self) -> &str {
189 self.fqn.database()
190 }
191}
192
/// Transforms names for staging environments by appending a suffix to schema names.
///
/// This strategy is used to create isolated staging environments where all objects
/// are deployed to schema names with a suffix (e.g., `public_staging`), and all
/// clusters are renamed with the same suffix (e.g., `quickstart_staging`).
///
/// External dependencies (objects not defined in the project) are NOT transformed.
/// Objects not being deployed in this staging run are also treated as external.
pub struct StagingTransformer<'a> {
    /// Supplies the default database and schema for partially qualified names.
    fqn: &'a FullyQualifiedName,
    /// Text appended verbatim to schema names and cluster names.
    staging_suffix: String,
    /// Objects whose references are left untouched.
    external_dependencies: &'a std::collections::BTreeSet<ObjectId>,
    /// When `Some`, references to any object outside this set are left untouched.
    objects_to_deploy: Option<&'a std::collections::BTreeSet<ObjectId>>,
    /// Objects whose references are left untouched; their own CREATE name is
    /// still suffixed by `transform_own_name`.
    replacement_objects: &'a std::collections::BTreeSet<ObjectId>,
}
208
209impl<'a> StagingTransformer<'a> {
210 /// Create a new staging transformer with the given suffix.
211 ///
212 /// # Arguments
213 /// * `fqn` - The fully qualified name context
214 /// * `staging_suffix` - The suffix to append (e.g., "_staging")
215 /// * `external_dependencies` - Set of external dependencies that should NOT be transformed
216 /// * `objects_to_deploy` - Optional set of objects being deployed; objects not in this set are treated as external
217 pub fn new(
218 fqn: &'a FullyQualifiedName,
219 staging_suffix: String,
220 external_dependencies: &'a std::collections::BTreeSet<ObjectId>,
221 objects_to_deploy: Option<&'a std::collections::BTreeSet<ObjectId>>,
222 replacement_objects: &'a std::collections::BTreeSet<ObjectId>,
223 ) -> Self {
224 Self {
225 fqn,
226 staging_suffix,
227 external_dependencies,
228 objects_to_deploy,
229 replacement_objects,
230 }
231 }
232
233 /// Check if a name refers to an external dependency or an object not being deployed
234 pub(crate) fn is_external(&self, name: &UnresolvedItemName) -> bool {
235 use ObjectId;
236
237 if name.0.is_empty() || name.0.len() > 3 {
238 return false; // Invalid name, not external
239 }
240 let object_id = ObjectId::from_item_name(name, self.fqn.database(), self.fqn.schema());
241
242 // Check if it's in the external dependencies
243 if self.external_dependencies.contains(&object_id) {
244 return true;
245 }
246
247 // Replacement objects are deployed in-place (not to staging schemas),
248 // so references to them should never be suffixed.
249 if self.replacement_objects.contains(&object_id) {
250 return true;
251 }
252
253 // If objects_to_deploy is specified, check if this object is NOT in that set
254 // If not being deployed, treat as external
255 if let Some(objects_to_deploy) = self.objects_to_deploy
256 && !objects_to_deploy.contains(&object_id)
257 {
258 return true;
259 }
260
261 false
262 }
263}
264
265impl<'a> NameTransformer for StagingTransformer<'a> {
266 fn transform_name(&self, name: &UnresolvedItemName) -> UnresolvedItemName {
267 // Check if this is an external dependency - if so, don't transform it
268 if self.is_external(name) {
269 return name.clone();
270 }
271
272 match name.0.len() {
273 1 => {
274 // Unqualified: object only
275 // Add staging suffix to schema: database.schema_staging.object
276 let object = name.0[0].clone();
277 let database = Ident::new(self.fqn.database()).expect("valid database identifier");
278 let staging_schema = format!("{}{}", self.fqn.schema(), self.staging_suffix);
279 let schema = Ident::new(&staging_schema).expect("valid schema identifier");
280 UnresolvedItemName(vec![database, schema, object])
281 }
282 2 => {
283 // Schema-qualified: schema.object
284 // Add staging suffix to schema: database.schema_staging.object
285 let schema_name = format!("{}{}", name.0[0], self.staging_suffix);
286 let schema = Ident::new(&schema_name).expect("valid schema identifier");
287 let object = name.0[1].clone();
288 let database = Ident::new(self.fqn.database()).expect("valid database identifier");
289 UnresolvedItemName(vec![database, schema, object])
290 }
291 3 => {
292 // Fully qualified: database.schema.object
293 // Add staging suffix to schema: database.schema_staging.object
294 let database = name.0[0].clone();
295 let schema_name = format!("{}{}", name.0[1], self.staging_suffix);
296 let schema = Ident::new(&schema_name).expect("valid schema identifier");
297 let object = name.0[2].clone();
298 UnresolvedItemName(vec![database, schema, object])
299 }
300 _ => {
301 // Invalid - return as-is
302 name.clone()
303 }
304 }
305 }
306
307 /// Always suffix the object's own name, even for replacement objects.
308 ///
309 /// During staging, the object being created must always go into the staging
310 /// schema (e.g., `core_v1`). The `is_external` check for replacement objects
311 /// only applies to *references* to other objects — not to the object's own
312 /// CREATE statement name.
313 fn transform_own_name(&self, name: &UnresolvedItemName) -> UnresolvedItemName {
314 match name.0.len() {
315 1 => {
316 let object = name.0[0].clone();
317 let database = Ident::new(self.fqn.database()).expect("valid database identifier");
318 let staging_schema = format!("{}{}", self.fqn.schema(), self.staging_suffix);
319 let schema = Ident::new(&staging_schema).expect("valid schema identifier");
320 UnresolvedItemName(vec![database, schema, object])
321 }
322 2 => {
323 let schema_name = format!("{}{}", name.0[0], self.staging_suffix);
324 let schema = Ident::new(&schema_name).expect("valid schema identifier");
325 let object = name.0[1].clone();
326 let database = Ident::new(self.fqn.database()).expect("valid database identifier");
327 UnresolvedItemName(vec![database, schema, object])
328 }
329 3 => {
330 let database = name.0[0].clone();
331 let schema_name = format!("{}{}", name.0[1], self.staging_suffix);
332 let schema = Ident::new(&schema_name).expect("valid schema identifier");
333 let object = name.0[2].clone();
334 UnresolvedItemName(vec![database, schema, object])
335 }
336 _ => name.clone(),
337 }
338 }
339
340 fn database_name(&self) -> &str {
341 self.fqn.database()
342 }
343}
344
/// Extension trait for transformers that also transform cluster names.
///
/// This trait allows transformers to modify cluster references in addition to
/// object names. It's used by the StagingTransformer to rename clusters for
/// staging environments, and by the ExplainTransformer to point every cluster
/// reference at a single cluster.
pub trait ClusterTransformer: NameTransformer {
    /// Transform a cluster name according to the strategy.
    fn transform_cluster(&self, cluster_name: &Ident) -> Ident;

    /// Get the original cluster name from a transformed name.
    ///
    /// This is used to look up production cluster configurations when creating
    /// staging clusters. Takes the transformed name as plain text and returns
    /// an owned `String`.
    fn get_original_cluster_name(&self, staged_name: &str) -> String;
}
360
361impl<'a> ClusterTransformer for StagingTransformer<'a> {
362 fn transform_cluster(&self, cluster_name: &Ident) -> Ident {
363 // Transform: quickstart → quickstart_staging
364 let staging_name = format!("{}{}", cluster_name, self.staging_suffix);
365 Ident::new(&staging_name).expect("valid cluster identifier")
366 }
367
368 fn get_original_cluster_name(&self, staged_name: &str) -> String {
369 // Reverse transform: quickstart_staging → quickstart
370 staged_name
371 .strip_suffix(&self.staging_suffix)
372 .unwrap_or(staged_name)
373 .to_string()
374 }
375}
376
/// Transforms names for the explain command by placing all objects into a
/// dedicated schema with flattened identifiers.
///
/// This strategy rewrites every object reference to live in a single explain
/// schema using the `"db.schema.obj"` flattened naming convention (same as
/// [`FlatteningTransformer`]) but qualified with `<database>.<explain_schema>`.
/// This avoids name collisions between objects from different schemas while
/// keeping everything in a real (non-temporary) schema that can be dropped
/// after the explain completes.
///
/// Also implements [`ClusterTransformer`] to rewrite all `IN CLUSTER` clauses
/// to `quickstart`, since explain always runs against that cluster.
pub struct ExplainTransformer<'a> {
    /// Supplies the database and schema for parts the input name leaves out.
    fqn: &'a FullyQualifiedName,
    /// Database part of every rewritten name.
    explain_database: String,
    /// Schema part of every rewritten name.
    explain_schema: String,
}
394
395impl<'a> ExplainTransformer<'a> {
396 /// Create a new explain transformer.
397 ///
398 /// # Arguments
399 /// * `fqn` - The fully qualified name context for resolving partial names
400 /// * `explain_database` - The database where the explain schema lives
401 /// * `explain_schema` - The dedicated schema name (e.g., `_mz_explain_<uuid>`)
402 pub fn new(
403 fqn: &'a FullyQualifiedName,
404 explain_database: String,
405 explain_schema: String,
406 ) -> Self {
407 Self {
408 fqn,
409 explain_database,
410 explain_schema,
411 }
412 }
413
414 /// Flatten a fully qualified name into a single quoted identifier.
415 fn flatten(&self, parts: &[String; 3]) -> UnresolvedItemName {
416 let flattened = parts.join(".");
417 let flattened_ident = Ident::new(&flattened).expect("valid flattened identifier");
418 let db_ident = Ident::new(&self.explain_database).expect("valid database identifier");
419 let schema_ident = Ident::new(&self.explain_schema).expect("valid schema identifier");
420 UnresolvedItemName(vec![db_ident, schema_ident, flattened_ident])
421 }
422}
423
424impl<'a> NameTransformer for ExplainTransformer<'a> {
425 fn transform_name(&self, name: &UnresolvedItemName) -> UnresolvedItemName {
426 // System catalog references aren't flattened into the explain schema;
427 // the server resolves them natively.
428 if name.0.len() == 2 && is_system_schema(name.0[0].as_str()) {
429 return name.clone();
430 }
431 let parts = match name.0.len() {
432 1 => [
433 self.fqn.database().to_string(),
434 self.fqn.schema().to_string(),
435 name.0[0].to_string(),
436 ],
437 2 => [
438 self.fqn.database().to_string(),
439 name.0[0].to_string(),
440 name.0[1].to_string(),
441 ],
442 3 => [
443 name.0[0].to_string(),
444 name.0[1].to_string(),
445 name.0[2].to_string(),
446 ],
447 _ => return name.clone(),
448 };
449 self.flatten(&parts)
450 }
451
452 fn database_name(&self) -> &str {
453 self.fqn.database()
454 }
455}
456
457impl<'a> ClusterTransformer for ExplainTransformer<'a> {
458 fn transform_cluster(&self, _cluster_name: &Ident) -> Ident {
459 Ident::new("quickstart").expect("valid cluster identifier")
460 }
461
462 fn get_original_cluster_name(&self, staged_name: &str) -> String {
463 staged_name.to_string()
464 }
465}