Skip to main content

mz_deploy/project/compiler/
object_validation.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Object validation and compiled-project assembly.
11//!
12//! This module turns source-owned objects into validated compiled objects and
13//! assembles the full compiled project from those validated results.
14//!
15//! Validation is defined per logical object:
16//!
17//! - classify statements and require exactly one primary create statement
18//! - validate names against the source-owned object key
19//! - normalize identifiers and dependencies into canonical qualified form
20//! - validate clusters, references, comments, and grants
21//! - resolve active profile variants and reject incompatible overrides
22//!
23//! Project assembly validates database and schema setup statements on every
24//! invocation, groups validated objects by `(database, schema)`, and enforces
25//! schema-wide invariants before producing a compiled project.
26//!
27//! ## Per-Object Validation (in `validate_single_variant`)
28//!
29//! 1. **Statement classification** — exactly one main CREATE statement per file
30//! 2. **Name validation** — object name matches file stem, FQN matches path
31//! 3. **Identifier format** — lowercase, valid characters
32//! 4. **Name normalization** — fully qualify all references via `NormalizingVisitor`
33//! 5. **Cluster validation** — MVs, sinks, sources, indexes have required `IN CLUSTER`
34//! 6. **Reference validation** — indexes, grants, comments reference the parent object
35//!
36//! ## Profile Variant Handling
37//!
38//! Objects may have multiple file variants (e.g., `conn.sql` and
39//! `conn#staging.sql`). All variants are classified for type consistency,
40//! then only the active variant (matching profile or default) is fully
41//! validated. Views and materialized views do not allow profile overrides.
42
43mod clusters;
44mod identifiers;
45mod references;
46mod schema_constraints;
47
48use clusters::{
49    validate_index_clusters, validate_indexes_supported, validate_mv_cluster,
50    validate_sink_cluster, validate_source_cluster,
51};
52use identifiers::{validate_fqn_identifiers, validate_ident};
53use references::{
54    validate_comment_references, validate_grant_references, validate_index_references,
55};
56use schema_constraints::validate_no_storage_and_computation_in_schema;
57
58use super::super::ast::Statement;
59use crate::project::SchemaQualifier;
60use crate::project::error::{ValidationError, ValidationErrorKind, ValidationErrors};
61use crate::project::ir::compiled::{Database, DatabaseObject, FullyQualifiedName, Project, Schema};
62use crate::project::resolve::normalize::NormalizingVisitor;
63use crate::project::syntax::input;
64use crate::project::syntax::parser::{LocatedStatement, statement_type_name};
65use mz_sql_parser::ast::*;
66use std::collections::{BTreeMap, BTreeSet};
67use std::path::PathBuf;
68
69/// Classify statements in a single variant and determine its object type.
70/// Returns `(ObjectType, path)` on success, or errors.
71fn classify_variant_object_type(
72    name: &str,
73    path: &std::path::Path,
74    statements: &[&mz_sql_parser::ast::Statement<Raw>],
75) -> Result<ObjectType, Vec<ValidationError>> {
76    let mut errors = Vec::new();
77    let mut object_type: Option<ObjectType> = None;
78    let mut main_count = 0usize;
79
80    for stmt in statements {
81        let stmt_type = match stmt {
82            mz_sql_parser::ast::Statement::CreateSink(_) => Some(ObjectType::Sink),
83            mz_sql_parser::ast::Statement::CreateView(_) => Some(ObjectType::View),
84            mz_sql_parser::ast::Statement::CreateMaterializedView(_) => {
85                Some(ObjectType::MaterializedView)
86            }
87            mz_sql_parser::ast::Statement::CreateTable(_) => Some(ObjectType::Table),
88            mz_sql_parser::ast::Statement::CreateTableFromSource(_) => Some(ObjectType::Table),
89            mz_sql_parser::ast::Statement::CreateSource(_) => Some(ObjectType::Source),
90            mz_sql_parser::ast::Statement::CreateSecret(_) => Some(ObjectType::Secret),
91            mz_sql_parser::ast::Statement::CreateConnection(_) => Some(ObjectType::Connection),
92            mz_sql_parser::ast::Statement::CreateIndex(_)
93            | mz_sql_parser::ast::Statement::GrantPrivileges(_)
94            | mz_sql_parser::ast::Statement::Comment(_)
95            | mz_sql_parser::ast::Statement::ExecuteUnitTest(_) => None,
96            other => {
97                errors.push(ValidationError::with_file(
98                    ValidationErrorKind::UnsupportedStatement {
99                        object_name: name.to_string(),
100                        statement_type: statement_type_name(other).to_string(),
101                    },
102                    path.to_path_buf(),
103                ));
104                None
105            }
106        };
107
108        if let Some(t) = stmt_type {
109            main_count += 1;
110            if main_count > 1 {
111                errors.push(ValidationError::with_file(
112                    ValidationErrorKind::MultipleMainStatements {
113                        object_name: name.to_string(),
114                    },
115                    path.to_path_buf(),
116                ));
117            } else {
118                object_type = Some(t);
119            }
120        }
121    }
122
123    if object_type.is_none() {
124        errors.push(ValidationError::with_file(
125            ValidationErrorKind::NoMainStatement {
126                object_name: name.to_string(),
127            },
128            path.to_path_buf(),
129        ));
130        errors.push(ValidationError::with_file(
131            ValidationErrorKind::NoObjectType,
132            path.to_path_buf(),
133        ));
134    }
135
136    if !errors.is_empty() {
137        Err(errors)
138    } else {
139        Ok(object_type.unwrap())
140    }
141}
142
143/// A human-readable name for an ObjectType.
144fn object_type_name(t: ObjectType) -> &'static str {
145    match t {
146        ObjectType::View => "view",
147        ObjectType::MaterializedView => "materialized view",
148        ObjectType::Table => "table",
149        ObjectType::Source => "source",
150        ObjectType::Sink => "sink",
151        ObjectType::Secret => "secret",
152        ObjectType::Connection => "connection",
153        _ => "unknown",
154    }
155}
156
157/// Validate a single variant's statements fully and produce a compiled object.
158///
159/// This runs all existing validation (classify statements, check name/fqn, indexes,
160/// grants, comments, clusters, etc.). Each statement carries its byte offset from
161/// the source file so that validation errors can point to the exact location.
162fn validate_single_variant(
163    name: &str,
164    database: &str,
165    schema: &str,
166    path: &std::path::Path,
167    located_statements: Vec<LocatedStatement>,
168) -> Result<DatabaseObject, ValidationErrors> {
169    let mut errors = Vec::new();
170    let mut main_stmt: Option<(Statement, usize)> = None;
171    let mut object_type: Option<ObjectType> = None;
172    let mut indexes = Vec::new();
173    let mut index_offsets = Vec::new();
174    let mut grants = Vec::new();
175    let mut grant_offsets = Vec::new();
176    let mut comments = Vec::new();
177    let mut comment_offsets = Vec::new();
178    let mut tests = Vec::new();
179
180    for LocatedStatement {
181        ast: stmt,
182        byte_offset,
183    } in located_statements
184    {
185        match stmt {
186            // Main CREATE statements
187            mz_sql_parser::ast::Statement::CreateSink(s) => {
188                if main_stmt.is_some() {
189                    errors.push(ValidationError::with_file_and_offset(
190                        ValidationErrorKind::MultipleMainStatements {
191                            object_name: name.to_string(),
192                        },
193                        path.to_path_buf(),
194                        byte_offset,
195                    ));
196                } else {
197                    main_stmt = Some((Statement::CreateSink(s), byte_offset));
198                    object_type = Some(ObjectType::Sink);
199                }
200            }
201            mz_sql_parser::ast::Statement::CreateView(s) => {
202                if main_stmt.is_some() {
203                    errors.push(ValidationError::with_file_and_offset(
204                        ValidationErrorKind::MultipleMainStatements {
205                            object_name: name.to_string(),
206                        },
207                        path.to_path_buf(),
208                        byte_offset,
209                    ));
210                } else {
211                    main_stmt = Some((Statement::CreateView(s), byte_offset));
212                    object_type = Some(ObjectType::View);
213                }
214            }
215            mz_sql_parser::ast::Statement::CreateMaterializedView(s) => {
216                if main_stmt.is_some() {
217                    errors.push(ValidationError::with_file_and_offset(
218                        ValidationErrorKind::MultipleMainStatements {
219                            object_name: name.to_string(),
220                        },
221                        path.to_path_buf(),
222                        byte_offset,
223                    ));
224                } else {
225                    main_stmt = Some((Statement::CreateMaterializedView(s), byte_offset));
226                    object_type = Some(ObjectType::MaterializedView);
227                }
228            }
229            mz_sql_parser::ast::Statement::CreateTable(s) => {
230                if main_stmt.is_some() {
231                    errors.push(ValidationError::with_file_and_offset(
232                        ValidationErrorKind::MultipleMainStatements {
233                            object_name: name.to_string(),
234                        },
235                        path.to_path_buf(),
236                        byte_offset,
237                    ));
238                } else {
239                    main_stmt = Some((Statement::CreateTable(s), byte_offset));
240                    object_type = Some(ObjectType::Table);
241                }
242            }
243            mz_sql_parser::ast::Statement::CreateTableFromSource(s) => {
244                if main_stmt.is_some() {
245                    errors.push(ValidationError::with_file_and_offset(
246                        ValidationErrorKind::MultipleMainStatements {
247                            object_name: name.to_string(),
248                        },
249                        path.to_path_buf(),
250                        byte_offset,
251                    ));
252                } else {
253                    main_stmt = Some((Statement::CreateTableFromSource(s), byte_offset));
254                    object_type = Some(ObjectType::Table);
255                }
256            }
257            mz_sql_parser::ast::Statement::CreateSource(s) => {
258                if main_stmt.is_some() {
259                    errors.push(ValidationError::with_file_and_offset(
260                        ValidationErrorKind::MultipleMainStatements {
261                            object_name: name.to_string(),
262                        },
263                        path.to_path_buf(),
264                        byte_offset,
265                    ));
266                } else {
267                    main_stmt = Some((Statement::CreateSource(s), byte_offset));
268                    object_type = Some(ObjectType::Source);
269                }
270            }
271            mz_sql_parser::ast::Statement::CreateSecret(s) => {
272                if main_stmt.is_some() {
273                    errors.push(ValidationError::with_file_and_offset(
274                        ValidationErrorKind::MultipleMainStatements {
275                            object_name: name.to_string(),
276                        },
277                        path.to_path_buf(),
278                        byte_offset,
279                    ));
280                } else {
281                    main_stmt = Some((Statement::CreateSecret(s), byte_offset));
282                    object_type = Some(ObjectType::Secret);
283                }
284            }
285            mz_sql_parser::ast::Statement::CreateConnection(s) => {
286                if main_stmt.is_some() {
287                    errors.push(ValidationError::with_file_and_offset(
288                        ValidationErrorKind::MultipleMainStatements {
289                            object_name: name.to_string(),
290                        },
291                        path.to_path_buf(),
292                        byte_offset,
293                    ));
294                } else {
295                    main_stmt = Some((Statement::CreateConnection(s), byte_offset));
296                    object_type = Some(ObjectType::Connection);
297                }
298            }
299
300            // Supporting statements — track byte offsets in parallel vectors
301            mz_sql_parser::ast::Statement::CreateIndex(s) => {
302                index_offsets.push(byte_offset);
303                indexes.push(s);
304            }
305            mz_sql_parser::ast::Statement::GrantPrivileges(s) => {
306                grant_offsets.push(byte_offset);
307                grants.push(s);
308            }
309            mz_sql_parser::ast::Statement::Comment(s) => {
310                comment_offsets.push(byte_offset);
311                comments.push(s);
312            }
313
314            // Test statements are collected for later execution
315            mz_sql_parser::ast::Statement::ExecuteUnitTest(s) => {
316                tests.push(s);
317            }
318
319            // Unsupported statements
320            other => {
321                errors.push(ValidationError::with_file_and_offset(
322                    ValidationErrorKind::UnsupportedStatement {
323                        object_name: name.to_string(),
324                        statement_type: statement_type_name(&other).to_string(),
325                    },
326                    path.to_path_buf(),
327                    byte_offset,
328                ));
329            }
330        }
331    }
332
333    // Check for main statement (file-level — no single statement to point at)
334    if main_stmt.is_none() {
335        errors.push(ValidationError::with_file(
336            ValidationErrorKind::NoMainStatement {
337                object_name: name.to_string(),
338            },
339            path.to_path_buf(),
340        ));
341    }
342
343    if object_type.is_none() {
344        errors.push(ValidationError::with_file(
345            ValidationErrorKind::NoObjectType,
346            path.to_path_buf(),
347        ));
348    }
349
350    // If we have fatal errors (no main statement or object type), return early
351    if !errors.is_empty() && (main_stmt.is_none() || object_type.is_none()) {
352        return Err(ValidationErrors::new(errors));
353    }
354
355    // Unwrap is safe here because we checked above
356    let (stmt, main_offset) = main_stmt.unwrap();
357    let obj_type = object_type.unwrap();
358
359    let fqn = match FullyQualifiedName::with_names(path, name, database, schema) {
360        Ok(fqn) => fqn,
361        Err(e) => {
362            errors.push(e);
363            return Err(ValidationErrors::new(errors));
364        }
365    };
366
367    // Get identifier from original statement before normalization
368    let main_ident = stmt.ident();
369
370    // Validate the original statement identifier against FQN
371    validate_ident(&stmt, &fqn, main_offset, &mut errors);
372
373    // Validate identifier format (lowercase, valid characters)
374    validate_fqn_identifiers(&fqn, main_offset, &mut errors);
375
376    // Normalize statement name and dependencies
377    let stmt = stmt.normalize_stmt(&fqn);
378
379    // Normalize index, grant, and comment references to be fully qualified
380    let visitor = NormalizingVisitor::fully_qualifying(&fqn);
381    visitor.normalize_index_references(&mut indexes);
382    visitor.normalize_grant_references(&mut grants);
383    visitor.normalize_comment_references(&mut comments);
384
385    // Validate cluster requirements
386    validate_index_clusters(&fqn, &indexes, &index_offsets, &mut errors);
387    validate_indexes_supported(&fqn, &stmt, &indexes, &index_offsets, &mut errors);
388    validate_mv_cluster(&fqn, &stmt, main_offset, &mut errors);
389    validate_sink_cluster(&fqn, &stmt, main_offset, &mut errors);
390    validate_source_cluster(&fqn, &stmt, main_offset, &mut errors);
391
392    validate_index_references(&fqn, &indexes, &index_offsets, &main_ident, &mut errors);
393    validate_grant_references(
394        &fqn,
395        &grants,
396        &grant_offsets,
397        &main_ident,
398        obj_type,
399        &mut errors,
400    );
401    validate_comment_references(
402        &fqn,
403        &comments,
404        &comment_offsets,
405        &main_ident,
406        &obj_type,
407        &mut errors,
408    );
409
410    if !errors.is_empty() {
411        return Err(ValidationErrors::new(errors));
412    }
413
414    Ok(DatabaseObject {
415        path: path.to_path_buf(),
416        stmt,
417        indexes,
418        grants,
419        comments,
420        tests,
421    })
422}
423
424impl DatabaseObject {
425    /// Validate all variants of a source-owned database object, check cross-variant consistency,
426    /// and resolve the active variant for the given profile.
427    pub fn validate(
428        value: input::DatabaseObject,
429        profile: &str,
430    ) -> Result<Option<Self>, ValidationErrors> {
431        let mut errors = Vec::new();
432
433        // Step 1: Classify all variants to determine their object types
434        let mut variant_types: Vec<(ObjectType, &input::ObjectVariant)> = Vec::new();
435        for variant in &value.variants {
436            let stmts: Vec<_> = variant.statements.iter().map(|ls| &ls.ast).collect();
437            match classify_variant_object_type(&value.name, &variant.path, &stmts) {
438                Ok(obj_type) => variant_types.push((obj_type, variant)),
439                Err(errs) => errors.extend(errs),
440            }
441        }
442
443        // If we couldn't classify any variant, return errors early
444        if variant_types.is_empty() && !errors.is_empty() {
445            return Err(ValidationErrors::new(errors));
446        }
447
448        // Step 2: Check type consistency across variants
449        // Find the default variant's type as the reference type
450        let reference_type = variant_types
451            .iter()
452            .find(|(_, v)| v.profile.is_none())
453            .or_else(|| variant_types.first())
454            .map(|(t, _)| *t);
455
456        if let Some(ref_type) = reference_type {
457            let ref_variant = variant_types
458                .iter()
459                .find(|(_, v)| v.profile.is_none())
460                .or_else(|| variant_types.first())
461                .map(|(_, v)| *v)
462                .unwrap();
463
464            for (obj_type, variant) in &variant_types {
465                if *obj_type != ref_type {
466                    errors.push(ValidationError::with_file(
467                        ValidationErrorKind::ProfileObjectTypeMismatch {
468                            object_name: value.name.clone(),
469                            default_type: object_type_name(ref_type).to_string(),
470                            override_profile: variant
471                                .profile
472                                .clone()
473                                .unwrap_or_else(|| "default".to_string()),
474                            override_type: object_type_name(*obj_type).to_string(),
475                            default_path: ref_variant.path.clone(),
476                            override_path: variant.path.clone(),
477                        },
478                        variant.path.clone(),
479                    ));
480                }
481            }
482
483            // Step 3: Check view/MV restriction
484            let has_overrides = value.variants.iter().any(|v| v.profile.is_some());
485            if has_overrides
486                && (ref_type == ObjectType::View || ref_type == ObjectType::MaterializedView)
487            {
488                for variant in &value.variants {
489                    if let Some(ref prof) = variant.profile {
490                        errors.push(ValidationError::with_file(
491                            ValidationErrorKind::ProfileOverrideNotAllowed {
492                                object_name: value.name.clone(),
493                                object_type: object_type_name(ref_type).to_string(),
494                                override_profile: prof.clone(),
495                                override_path: variant.path.clone(),
496                            },
497                            variant.path.clone(),
498                        ));
499                    }
500                }
501            }
502        }
503
504        if !errors.is_empty() {
505            return Err(ValidationErrors::new(errors));
506        }
507
508        // Step 4: Resolve active variant — pick profile match or fall back to default.
509        // If no matching profile variant and no default variant exist, skip this object
510        // (it belongs to a different profile).
511        let active_variant = value
512            .variants
513            .iter()
514            .find(|v| v.profile.as_deref() == Some(profile))
515            .or_else(|| value.variants.iter().find(|v| v.profile.is_none()));
516
517        let active_variant = match active_variant {
518            Some(v) => v,
519            None => {
520                // No variant matches the active profile and no default exists.
521                // This object is defined only for other profiles — skip it.
522                return Ok(None);
523            }
524        };
525
526        // Step 5: Fully validate the active variant
527        validate_single_variant(
528            &value.name,
529            &value.database,
530            &value.schema,
531            &active_variant.path,
532            active_variant.statements.clone(),
533        )
534        .map(Some)
535    }
536}
537
538#[derive(Debug, Clone)]
539pub(crate) struct SchemaBuildMeta {
540    pub name: String,
541    pub mod_statements: Option<Vec<mz_sql_parser::ast::Statement<Raw>>>,
542}
543
544#[derive(Debug, Clone)]
545pub(crate) struct DatabaseBuildMeta {
546    pub name: String,
547    pub mod_statements: Option<Vec<mz_sql_parser::ast::Statement<Raw>>>,
548    pub schemas: Vec<SchemaBuildMeta>,
549}
550
551pub(crate) fn assemble_project(
552    db_metas: Vec<DatabaseBuildMeta>,
553    validated_objects: Vec<(String, String, DatabaseObject)>,
554) -> Result<Project, ValidationErrors> {
555    let mut all_errors = Vec::new();
556
557    let mut objects_by_location: BTreeMap<(String, String), Vec<DatabaseObject>> = BTreeMap::new();
558    for (db_name, schema_name, object) in validated_objects {
559        objects_by_location
560            .entry((db_name, schema_name))
561            .or_default()
562            .push(object);
563    }
564
565    let mut databases = Vec::new();
566
567    for meta in db_metas {
568        let mut schemas = Vec::new();
569
570        for schema_meta in &meta.schemas {
571            let objects = objects_by_location
572                .remove(&(meta.name.clone(), schema_meta.name.clone()))
573                .unwrap_or_default();
574
575            validate_no_storage_and_computation_in_schema(
576                &schema_meta.name,
577                &objects,
578                &mut all_errors,
579            );
580
581            schemas.push(Schema {
582                name: schema_meta.name.clone(),
583                objects,
584                mod_statements: schema_meta.mod_statements.clone(),
585            });
586        }
587
588        databases.push(Database {
589            name: meta.name,
590            schemas,
591            mod_statements: meta.mod_statements,
592        });
593    }
594
595    let replacement_schemas = derive_replacement_schemas(&databases);
596    validate_replacement_schemas(&replacement_schemas, &databases, &mut all_errors);
597
598    if !all_errors.is_empty() {
599        return Err(ValidationErrors::new(all_errors));
600    }
601
602    Ok(Project {
603        databases,
604        replacement_schemas,
605    })
606}
607
608/// Scan all schemas for `SET api = stable` statements and build
609/// the set of replacement schemas.
610fn derive_replacement_schemas(databases: &[Database]) -> BTreeSet<SchemaQualifier> {
611    let mut replacement_schemas = BTreeSet::new();
612    for db in databases {
613        for schema in &db.schemas {
614            if let Some(mod_stmts) = &schema.mod_statements {
615                for stmt in mod_stmts {
616                    if matches!(stmt, mz_sql_parser::ast::Statement::SetVariable(s) if s.variable.as_str().eq_ignore_ascii_case("api"))
617                    {
618                        replacement_schemas
619                            .insert(SchemaQualifier::new(db.name.clone(), schema.name.clone()));
620                        break;
621                    }
622                }
623            }
624        }
625    }
626    replacement_schemas
627}
628
629/// Validate replacement schemas derived from `SET api = stable` statements.
630///
631/// Ensures replacement schemas only contain materialized views.
632fn validate_replacement_schemas(
633    replacement_schemas: &BTreeSet<SchemaQualifier>,
634    databases: &[Database],
635    errors: &mut Vec<ValidationError>,
636) {
637    if replacement_schemas.is_empty() {
638        return;
639    }
640
641    for db in databases {
642        for schema in &db.schemas {
643            if !replacement_schemas
644                .iter()
645                .any(|sq| sq.database == db.name && sq.schema == schema.name)
646            {
647                continue;
648            }
649
650            for obj in &schema.objects {
651                if !matches!(obj.stmt, Statement::CreateMaterializedView(_)) {
652                    errors.push(ValidationError::with_file(
653                        ValidationErrorKind::ReplacementSchemaNonMvObject {
654                            database: db.name.clone(),
655                            schema: schema.name.clone(),
656                            object_name: obj.stmt.ident().object.to_string(),
657                            object_type: obj.stmt.kind(),
658                        },
659                        PathBuf::from(format!("{}/{}.sql", db.name, schema.name)),
660                    ));
661                }
662            }
663        }
664    }
665}