1mod clusters;
44mod identifiers;
45mod references;
46mod schema_constraints;
47
48use clusters::{
49 validate_index_clusters, validate_indexes_supported, validate_mv_cluster,
50 validate_sink_cluster, validate_source_cluster,
51};
52use identifiers::{validate_fqn_identifiers, validate_ident};
53use references::{
54 validate_comment_references, validate_grant_references, validate_index_references,
55};
56use schema_constraints::validate_no_storage_and_computation_in_schema;
57
58use super::super::ast::Statement;
59use crate::project::SchemaQualifier;
60use crate::project::error::{ValidationError, ValidationErrorKind, ValidationErrors};
61use crate::project::ir::compiled::{Database, DatabaseObject, FullyQualifiedName, Project, Schema};
62use crate::project::resolve::normalize::NormalizingVisitor;
63use crate::project::syntax::input;
64use crate::project::syntax::parser::{LocatedStatement, statement_type_name};
65use mz_sql_parser::ast::*;
66use std::collections::{BTreeMap, BTreeSet};
67use std::path::PathBuf;
68
69fn classify_variant_object_type(
72 name: &str,
73 path: &std::path::Path,
74 statements: &[&mz_sql_parser::ast::Statement<Raw>],
75) -> Result<ObjectType, Vec<ValidationError>> {
76 let mut errors = Vec::new();
77 let mut object_type: Option<ObjectType> = None;
78 let mut main_count = 0usize;
79
80 for stmt in statements {
81 let stmt_type = match stmt {
82 mz_sql_parser::ast::Statement::CreateSink(_) => Some(ObjectType::Sink),
83 mz_sql_parser::ast::Statement::CreateView(_) => Some(ObjectType::View),
84 mz_sql_parser::ast::Statement::CreateMaterializedView(_) => {
85 Some(ObjectType::MaterializedView)
86 }
87 mz_sql_parser::ast::Statement::CreateTable(_) => Some(ObjectType::Table),
88 mz_sql_parser::ast::Statement::CreateTableFromSource(_) => Some(ObjectType::Table),
89 mz_sql_parser::ast::Statement::CreateSource(_) => Some(ObjectType::Source),
90 mz_sql_parser::ast::Statement::CreateSecret(_) => Some(ObjectType::Secret),
91 mz_sql_parser::ast::Statement::CreateConnection(_) => Some(ObjectType::Connection),
92 mz_sql_parser::ast::Statement::CreateIndex(_)
93 | mz_sql_parser::ast::Statement::GrantPrivileges(_)
94 | mz_sql_parser::ast::Statement::Comment(_)
95 | mz_sql_parser::ast::Statement::ExecuteUnitTest(_) => None,
96 other => {
97 errors.push(ValidationError::with_file(
98 ValidationErrorKind::UnsupportedStatement {
99 object_name: name.to_string(),
100 statement_type: statement_type_name(other).to_string(),
101 },
102 path.to_path_buf(),
103 ));
104 None
105 }
106 };
107
108 if let Some(t) = stmt_type {
109 main_count += 1;
110 if main_count > 1 {
111 errors.push(ValidationError::with_file(
112 ValidationErrorKind::MultipleMainStatements {
113 object_name: name.to_string(),
114 },
115 path.to_path_buf(),
116 ));
117 } else {
118 object_type = Some(t);
119 }
120 }
121 }
122
123 if object_type.is_none() {
124 errors.push(ValidationError::with_file(
125 ValidationErrorKind::NoMainStatement {
126 object_name: name.to_string(),
127 },
128 path.to_path_buf(),
129 ));
130 errors.push(ValidationError::with_file(
131 ValidationErrorKind::NoObjectType,
132 path.to_path_buf(),
133 ));
134 }
135
136 if !errors.is_empty() {
137 Err(errors)
138 } else {
139 Ok(object_type.unwrap())
140 }
141}
142
143fn object_type_name(t: ObjectType) -> &'static str {
145 match t {
146 ObjectType::View => "view",
147 ObjectType::MaterializedView => "materialized view",
148 ObjectType::Table => "table",
149 ObjectType::Source => "source",
150 ObjectType::Sink => "sink",
151 ObjectType::Secret => "secret",
152 ObjectType::Connection => "connection",
153 _ => "unknown",
154 }
155}
156
157fn validate_single_variant(
163 name: &str,
164 database: &str,
165 schema: &str,
166 path: &std::path::Path,
167 located_statements: Vec<LocatedStatement>,
168) -> Result<DatabaseObject, ValidationErrors> {
169 let mut errors = Vec::new();
170 let mut main_stmt: Option<(Statement, usize)> = None;
171 let mut object_type: Option<ObjectType> = None;
172 let mut indexes = Vec::new();
173 let mut index_offsets = Vec::new();
174 let mut grants = Vec::new();
175 let mut grant_offsets = Vec::new();
176 let mut comments = Vec::new();
177 let mut comment_offsets = Vec::new();
178 let mut tests = Vec::new();
179
180 for LocatedStatement {
181 ast: stmt,
182 byte_offset,
183 } in located_statements
184 {
185 match stmt {
186 mz_sql_parser::ast::Statement::CreateSink(s) => {
188 if main_stmt.is_some() {
189 errors.push(ValidationError::with_file_and_offset(
190 ValidationErrorKind::MultipleMainStatements {
191 object_name: name.to_string(),
192 },
193 path.to_path_buf(),
194 byte_offset,
195 ));
196 } else {
197 main_stmt = Some((Statement::CreateSink(s), byte_offset));
198 object_type = Some(ObjectType::Sink);
199 }
200 }
201 mz_sql_parser::ast::Statement::CreateView(s) => {
202 if main_stmt.is_some() {
203 errors.push(ValidationError::with_file_and_offset(
204 ValidationErrorKind::MultipleMainStatements {
205 object_name: name.to_string(),
206 },
207 path.to_path_buf(),
208 byte_offset,
209 ));
210 } else {
211 main_stmt = Some((Statement::CreateView(s), byte_offset));
212 object_type = Some(ObjectType::View);
213 }
214 }
215 mz_sql_parser::ast::Statement::CreateMaterializedView(s) => {
216 if main_stmt.is_some() {
217 errors.push(ValidationError::with_file_and_offset(
218 ValidationErrorKind::MultipleMainStatements {
219 object_name: name.to_string(),
220 },
221 path.to_path_buf(),
222 byte_offset,
223 ));
224 } else {
225 main_stmt = Some((Statement::CreateMaterializedView(s), byte_offset));
226 object_type = Some(ObjectType::MaterializedView);
227 }
228 }
229 mz_sql_parser::ast::Statement::CreateTable(s) => {
230 if main_stmt.is_some() {
231 errors.push(ValidationError::with_file_and_offset(
232 ValidationErrorKind::MultipleMainStatements {
233 object_name: name.to_string(),
234 },
235 path.to_path_buf(),
236 byte_offset,
237 ));
238 } else {
239 main_stmt = Some((Statement::CreateTable(s), byte_offset));
240 object_type = Some(ObjectType::Table);
241 }
242 }
243 mz_sql_parser::ast::Statement::CreateTableFromSource(s) => {
244 if main_stmt.is_some() {
245 errors.push(ValidationError::with_file_and_offset(
246 ValidationErrorKind::MultipleMainStatements {
247 object_name: name.to_string(),
248 },
249 path.to_path_buf(),
250 byte_offset,
251 ));
252 } else {
253 main_stmt = Some((Statement::CreateTableFromSource(s), byte_offset));
254 object_type = Some(ObjectType::Table);
255 }
256 }
257 mz_sql_parser::ast::Statement::CreateSource(s) => {
258 if main_stmt.is_some() {
259 errors.push(ValidationError::with_file_and_offset(
260 ValidationErrorKind::MultipleMainStatements {
261 object_name: name.to_string(),
262 },
263 path.to_path_buf(),
264 byte_offset,
265 ));
266 } else {
267 main_stmt = Some((Statement::CreateSource(s), byte_offset));
268 object_type = Some(ObjectType::Source);
269 }
270 }
271 mz_sql_parser::ast::Statement::CreateSecret(s) => {
272 if main_stmt.is_some() {
273 errors.push(ValidationError::with_file_and_offset(
274 ValidationErrorKind::MultipleMainStatements {
275 object_name: name.to_string(),
276 },
277 path.to_path_buf(),
278 byte_offset,
279 ));
280 } else {
281 main_stmt = Some((Statement::CreateSecret(s), byte_offset));
282 object_type = Some(ObjectType::Secret);
283 }
284 }
285 mz_sql_parser::ast::Statement::CreateConnection(s) => {
286 if main_stmt.is_some() {
287 errors.push(ValidationError::with_file_and_offset(
288 ValidationErrorKind::MultipleMainStatements {
289 object_name: name.to_string(),
290 },
291 path.to_path_buf(),
292 byte_offset,
293 ));
294 } else {
295 main_stmt = Some((Statement::CreateConnection(s), byte_offset));
296 object_type = Some(ObjectType::Connection);
297 }
298 }
299
300 mz_sql_parser::ast::Statement::CreateIndex(s) => {
302 index_offsets.push(byte_offset);
303 indexes.push(s);
304 }
305 mz_sql_parser::ast::Statement::GrantPrivileges(s) => {
306 grant_offsets.push(byte_offset);
307 grants.push(s);
308 }
309 mz_sql_parser::ast::Statement::Comment(s) => {
310 comment_offsets.push(byte_offset);
311 comments.push(s);
312 }
313
314 mz_sql_parser::ast::Statement::ExecuteUnitTest(s) => {
316 tests.push(s);
317 }
318
319 other => {
321 errors.push(ValidationError::with_file_and_offset(
322 ValidationErrorKind::UnsupportedStatement {
323 object_name: name.to_string(),
324 statement_type: statement_type_name(&other).to_string(),
325 },
326 path.to_path_buf(),
327 byte_offset,
328 ));
329 }
330 }
331 }
332
333 if main_stmt.is_none() {
335 errors.push(ValidationError::with_file(
336 ValidationErrorKind::NoMainStatement {
337 object_name: name.to_string(),
338 },
339 path.to_path_buf(),
340 ));
341 }
342
343 if object_type.is_none() {
344 errors.push(ValidationError::with_file(
345 ValidationErrorKind::NoObjectType,
346 path.to_path_buf(),
347 ));
348 }
349
350 if !errors.is_empty() && (main_stmt.is_none() || object_type.is_none()) {
352 return Err(ValidationErrors::new(errors));
353 }
354
355 let (stmt, main_offset) = main_stmt.unwrap();
357 let obj_type = object_type.unwrap();
358
359 let fqn = match FullyQualifiedName::with_names(path, name, database, schema) {
360 Ok(fqn) => fqn,
361 Err(e) => {
362 errors.push(e);
363 return Err(ValidationErrors::new(errors));
364 }
365 };
366
367 let main_ident = stmt.ident();
369
370 validate_ident(&stmt, &fqn, main_offset, &mut errors);
372
373 validate_fqn_identifiers(&fqn, main_offset, &mut errors);
375
376 let stmt = stmt.normalize_stmt(&fqn);
378
379 let visitor = NormalizingVisitor::fully_qualifying(&fqn);
381 visitor.normalize_index_references(&mut indexes);
382 visitor.normalize_grant_references(&mut grants);
383 visitor.normalize_comment_references(&mut comments);
384
385 validate_index_clusters(&fqn, &indexes, &index_offsets, &mut errors);
387 validate_indexes_supported(&fqn, &stmt, &indexes, &index_offsets, &mut errors);
388 validate_mv_cluster(&fqn, &stmt, main_offset, &mut errors);
389 validate_sink_cluster(&fqn, &stmt, main_offset, &mut errors);
390 validate_source_cluster(&fqn, &stmt, main_offset, &mut errors);
391
392 validate_index_references(&fqn, &indexes, &index_offsets, &main_ident, &mut errors);
393 validate_grant_references(
394 &fqn,
395 &grants,
396 &grant_offsets,
397 &main_ident,
398 obj_type,
399 &mut errors,
400 );
401 validate_comment_references(
402 &fqn,
403 &comments,
404 &comment_offsets,
405 &main_ident,
406 &obj_type,
407 &mut errors,
408 );
409
410 if !errors.is_empty() {
411 return Err(ValidationErrors::new(errors));
412 }
413
414 Ok(DatabaseObject {
415 path: path.to_path_buf(),
416 stmt,
417 indexes,
418 grants,
419 comments,
420 tests,
421 })
422}
423
424impl DatabaseObject {
425 pub fn validate(
428 value: input::DatabaseObject,
429 profile: &str,
430 ) -> Result<Option<Self>, ValidationErrors> {
431 let mut errors = Vec::new();
432
433 let mut variant_types: Vec<(ObjectType, &input::ObjectVariant)> = Vec::new();
435 for variant in &value.variants {
436 let stmts: Vec<_> = variant.statements.iter().map(|ls| &ls.ast).collect();
437 match classify_variant_object_type(&value.name, &variant.path, &stmts) {
438 Ok(obj_type) => variant_types.push((obj_type, variant)),
439 Err(errs) => errors.extend(errs),
440 }
441 }
442
443 if variant_types.is_empty() && !errors.is_empty() {
445 return Err(ValidationErrors::new(errors));
446 }
447
448 let reference_type = variant_types
451 .iter()
452 .find(|(_, v)| v.profile.is_none())
453 .or_else(|| variant_types.first())
454 .map(|(t, _)| *t);
455
456 if let Some(ref_type) = reference_type {
457 let ref_variant = variant_types
458 .iter()
459 .find(|(_, v)| v.profile.is_none())
460 .or_else(|| variant_types.first())
461 .map(|(_, v)| *v)
462 .unwrap();
463
464 for (obj_type, variant) in &variant_types {
465 if *obj_type != ref_type {
466 errors.push(ValidationError::with_file(
467 ValidationErrorKind::ProfileObjectTypeMismatch {
468 object_name: value.name.clone(),
469 default_type: object_type_name(ref_type).to_string(),
470 override_profile: variant
471 .profile
472 .clone()
473 .unwrap_or_else(|| "default".to_string()),
474 override_type: object_type_name(*obj_type).to_string(),
475 default_path: ref_variant.path.clone(),
476 override_path: variant.path.clone(),
477 },
478 variant.path.clone(),
479 ));
480 }
481 }
482
483 let has_overrides = value.variants.iter().any(|v| v.profile.is_some());
485 if has_overrides
486 && (ref_type == ObjectType::View || ref_type == ObjectType::MaterializedView)
487 {
488 for variant in &value.variants {
489 if let Some(ref prof) = variant.profile {
490 errors.push(ValidationError::with_file(
491 ValidationErrorKind::ProfileOverrideNotAllowed {
492 object_name: value.name.clone(),
493 object_type: object_type_name(ref_type).to_string(),
494 override_profile: prof.clone(),
495 override_path: variant.path.clone(),
496 },
497 variant.path.clone(),
498 ));
499 }
500 }
501 }
502 }
503
504 if !errors.is_empty() {
505 return Err(ValidationErrors::new(errors));
506 }
507
508 let active_variant = value
512 .variants
513 .iter()
514 .find(|v| v.profile.as_deref() == Some(profile))
515 .or_else(|| value.variants.iter().find(|v| v.profile.is_none()));
516
517 let active_variant = match active_variant {
518 Some(v) => v,
519 None => {
520 return Ok(None);
523 }
524 };
525
526 validate_single_variant(
528 &value.name,
529 &value.database,
530 &value.schema,
531 &active_variant.path,
532 active_variant.statements.clone(),
533 )
534 .map(Some)
535 }
536}
537
/// Build-time description of one schema, consumed by `assemble_project`.
#[derive(Debug, Clone)]
pub(crate) struct SchemaBuildMeta {
    /// Schema name. `assemble_project` pairs it with the database name to look up
    /// the schema's validated objects and copies it onto the assembled `Schema`.
    pub name: String,
    /// Optional schema-level statements, copied unchanged onto the assembled `Schema`.
    /// NOTE(review): presumably parsed from the schema's mod file — confirm against the caller.
    pub mod_statements: Option<Vec<mz_sql_parser::ast::Statement<Raw>>>,
}
543
/// Build-time description of one database, consumed by `assemble_project`.
#[derive(Debug, Clone)]
pub(crate) struct DatabaseBuildMeta {
    /// Database name. Moved onto the assembled `Database` and used as the first
    /// half of the key when looking up each schema's validated objects.
    pub name: String,
    /// Optional database-level statements, moved unchanged onto the assembled `Database`.
    /// NOTE(review): presumably parsed from the database's mod file — confirm against the caller.
    pub mod_statements: Option<Vec<mz_sql_parser::ast::Statement<Raw>>>,
    /// Schemas of this database; the assembled `Database` lists them in this order.
    pub schemas: Vec<SchemaBuildMeta>,
}
550
551pub(crate) fn assemble_project(
552 db_metas: Vec<DatabaseBuildMeta>,
553 validated_objects: Vec<(String, String, DatabaseObject)>,
554) -> Result<Project, ValidationErrors> {
555 let mut all_errors = Vec::new();
556
557 let mut objects_by_location: BTreeMap<(String, String), Vec<DatabaseObject>> = BTreeMap::new();
558 for (db_name, schema_name, object) in validated_objects {
559 objects_by_location
560 .entry((db_name, schema_name))
561 .or_default()
562 .push(object);
563 }
564
565 let mut databases = Vec::new();
566
567 for meta in db_metas {
568 let mut schemas = Vec::new();
569
570 for schema_meta in &meta.schemas {
571 let objects = objects_by_location
572 .remove(&(meta.name.clone(), schema_meta.name.clone()))
573 .unwrap_or_default();
574
575 validate_no_storage_and_computation_in_schema(
576 &schema_meta.name,
577 &objects,
578 &mut all_errors,
579 );
580
581 schemas.push(Schema {
582 name: schema_meta.name.clone(),
583 objects,
584 mod_statements: schema_meta.mod_statements.clone(),
585 });
586 }
587
588 databases.push(Database {
589 name: meta.name,
590 schemas,
591 mod_statements: meta.mod_statements,
592 });
593 }
594
595 let replacement_schemas = derive_replacement_schemas(&databases);
596 validate_replacement_schemas(&replacement_schemas, &databases, &mut all_errors);
597
598 if !all_errors.is_empty() {
599 return Err(ValidationErrors::new(all_errors));
600 }
601
602 Ok(Project {
603 databases,
604 replacement_schemas,
605 })
606}
607
608fn derive_replacement_schemas(databases: &[Database]) -> BTreeSet<SchemaQualifier> {
611 let mut replacement_schemas = BTreeSet::new();
612 for db in databases {
613 for schema in &db.schemas {
614 if let Some(mod_stmts) = &schema.mod_statements {
615 for stmt in mod_stmts {
616 if matches!(stmt, mz_sql_parser::ast::Statement::SetVariable(s) if s.variable.as_str().eq_ignore_ascii_case("api"))
617 {
618 replacement_schemas
619 .insert(SchemaQualifier::new(db.name.clone(), schema.name.clone()));
620 break;
621 }
622 }
623 }
624 }
625 }
626 replacement_schemas
627}
628
629fn validate_replacement_schemas(
633 replacement_schemas: &BTreeSet<SchemaQualifier>,
634 databases: &[Database],
635 errors: &mut Vec<ValidationError>,
636) {
637 if replacement_schemas.is_empty() {
638 return;
639 }
640
641 for db in databases {
642 for schema in &db.schemas {
643 if !replacement_schemas
644 .iter()
645 .any(|sq| sq.database == db.name && sq.schema == schema.name)
646 {
647 continue;
648 }
649
650 for obj in &schema.objects {
651 if !matches!(obj.stmt, Statement::CreateMaterializedView(_)) {
652 errors.push(ValidationError::with_file(
653 ValidationErrorKind::ReplacementSchemaNonMvObject {
654 database: db.name.clone(),
655 schema: schema.name.clone(),
656 object_name: obj.stmt.ident().object.to_string(),
657 object_type: obj.stmt.kind(),
658 },
659 PathBuf::from(format!("{}/{}.sql", db.name, schema.name)),
660 ));
661 }
662 }
663 }
664 }
665}