1pub(crate) mod cache;
83mod cache_io;
84mod mod_statements;
85mod object_validation;
86pub(crate) mod typecheck;
87
88use super::error::{LoadError, ProjectError, ValidationError, ValidationErrors};
89use crate::project::ir::{compiled, graph};
90use crate::project::syntax::input;
91use crate::project::syntax::parser::parse_statements_with_context;
92use crate::project::syntax::profile_files::collect_all_sql_files;
93use crate::verbose;
94use cache::BuildArtifact;
95use cache::build_artifact::{CompiledObjectArtifact, CompiledObjectArtifactData, ObjectStateRow};
96use cache_io::hex_digest;
97use mz_sql_parser::ast::{
98 CommentStatement, CreateIndexStatement, ExecuteUnitTestStatement, GrantPrivilegesStatement,
99 Raw, Statement,
100};
101use rayon::prelude::*;
102use sha2::{Digest, Sha256};
103use std::collections::{BTreeMap, BTreeSet};
104use std::fs;
105use std::path::{Path, PathBuf};
106
/// Directory name `"compiler"` shared with the rest of the crate.
// NOTE(review): presumably the sub-directory holding compile-cache artifacts — confirm
// against the `cache` module, which is not visible from here.
pub(crate) const COMPILER_DIR: &str = "compiler";
108
/// Counters describing how a compile run used the per-object cache.
///
/// Per-object values are summed into a run-wide total by `compile_sync_with_stats`.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub(crate) struct CompileStats {
    /// Number of objects whose compiled form was reconstructed from a cached artifact.
    pub cache_hits: usize,
    /// Number of objects that were compiled from their SQL files.
    pub cache_misses: usize,
}
119
120#[derive(Debug)]
133struct Discovery {
134 db_metas: Vec<object_validation::DatabaseBuildMeta>,
135 object_descriptors: Vec<ObjectDescriptor>,
136 db_name_map: BTreeMap<String, String>,
137}
138
139#[derive(Debug, Clone)]
146struct ObjectDescriptor {
147 db_name: String,
151 original_db_name: String,
155 schema_name: String,
156 object_name: String,
157 variants: Vec<VariantDescriptor>,
158}
159
/// One SQL file that defines an object, optionally tied to a profile.
#[derive(Clone, Debug)]
struct VariantDescriptor {
    /// Location of the SQL file.
    path: PathBuf,
    /// `None` for the default file; `Some(name)` for a profile-specific override.
    profile: Option<String>,
}
171
172#[derive(Debug, Clone)]
179struct CachedTypedObject {
180 db_name: String,
181 schema_name: String,
182 typed_object: compiled::DatabaseObject,
183}
184
185enum ObjectCompileResult {
195 Ok {
196 compiled: Option<CachedTypedObject>,
197 state_row: Option<ObjectStateRow>,
198 stats: CompileStats,
199 },
200 ValidationErr(Vec<ValidationError>),
201 ProjectErr(ProjectError),
202}
203
204enum ObjectPlanResult {
217 Hit {
218 object_key: String,
219 compiled: Option<CachedTypedObject>,
220 stats: CompileStats,
221 },
222 Miss {
223 object_key: String,
224 fingerprint: String,
225 descriptor: ObjectDescriptor,
226 },
227 ProjectErr(ProjectError),
228}
229
230pub(crate) fn compile_sync<P: AsRef<Path>>(
240 fs: &crate::fs::FileSystem,
241 root: P,
242 profile: Option<&str>,
243 profile_suffix: Option<&str>,
244 variables: &BTreeMap<String, String>,
245) -> Result<graph::Project, ProjectError> {
246 compile_sync_with_stats(fs, root, profile, profile_suffix, variables)
247 .map(|(project, _)| project)
248}
249
250fn compile_sync_with_stats<P: AsRef<Path>>(
267 fs: &crate::fs::FileSystem,
268 root: P,
269 profile: Option<&str>,
270 profile_suffix: Option<&str>,
271 variables: &BTreeMap<String, String>,
272) -> Result<(graph::Project, CompileStats), ProjectError> {
273 let profile_set = profile.is_some();
278 let profile = profile.unwrap_or("");
279 let root = root.as_ref();
280 let mut db =
281 BuildArtifact::open(root, profile, profile_suffix, variables).map_err(LoadError::from)?;
282 let discovery = discover_project(fs, root, profile_suffix, variables, profile_set, &mut db)?;
283
284 let variant_paths: BTreeSet<PathBuf> = discovery
285 .object_descriptors
286 .iter()
287 .flat_map(|descriptor| {
288 descriptor
289 .variants
290 .iter()
291 .map(|variant| variant.path.clone())
292 })
293 .collect();
294 let file_hashes = db
295 .load_file_hashes(fs, &variant_paths)
296 .map_err(LoadError::from)?;
297
298 let existing_fingerprints = db.load_object_fingerprints().map_err(LoadError::from)?;
299
300 let stages: Vec<ObjectPlanStage> = discovery
302 .object_descriptors
303 .clone()
304 .into_par_iter()
305 .map(|descriptor| stage_object(descriptor, &existing_fingerprints, &file_hashes, variables))
306 .collect();
307
308 let hit_keys: BTreeSet<String> = stages
310 .iter()
311 .filter_map(|stage| match stage {
312 ObjectPlanStage::Hit { object_key, .. } => Some(object_key.clone()),
313 _ => None,
314 })
315 .collect();
316 let hit_artifacts = db
317 .load_object_artifacts(&hit_keys)
318 .map_err(LoadError::from)?;
319
320 let plans: Vec<ObjectPlanResult> = stages
323 .into_par_iter()
324 .map(|stage| finalize_stage(stage, &hit_artifacts))
325 .collect();
326
327 let mut all_validation_errors = Vec::new();
328 let mut validated_objects = Vec::new();
329 let mut stats = CompileStats::default();
330 let mut current_keys = BTreeSet::new();
331 let mut miss_keys: BTreeSet<String> = BTreeSet::new();
332 let mut misses = Vec::new();
333
334 for plan in plans {
335 match plan {
336 ObjectPlanResult::Hit {
337 object_key,
338 compiled,
339 stats: object_stats,
340 } => {
341 current_keys.insert(object_key);
342 if let Some(compiled) = compiled {
343 validated_objects.push((
344 compiled.db_name,
345 compiled.schema_name,
346 compiled.typed_object,
347 ));
348 }
349 stats.cache_hits += object_stats.cache_hits;
350 stats.cache_misses += object_stats.cache_misses;
351 }
352 ObjectPlanResult::Miss {
353 object_key,
354 fingerprint,
355 descriptor,
356 } => {
357 current_keys.insert(object_key.clone());
358 miss_keys.insert(object_key.clone());
359 misses.push((object_key, fingerprint, descriptor));
360 }
361 ObjectPlanResult::ProjectErr(err) => return Err(err),
362 }
363 }
364
365 if !misses.is_empty() {
366 let miss_paths: BTreeSet<PathBuf> = misses
367 .iter()
368 .flat_map(|(_, _, descriptor)| {
369 descriptor
370 .variants
371 .iter()
372 .map(|variant| variant.path.clone())
373 })
374 .collect();
375 let miss_file_entries = db
376 .load_file_contents(fs, &miss_paths)
377 .map_err(LoadError::from)?;
378 let results: Vec<ObjectCompileResult> = misses
379 .into_par_iter()
380 .map(|(object_key, fingerprint, descriptor)| {
381 compile_object(
382 descriptor,
383 object_key,
384 fingerprint,
385 profile,
386 variables,
387 profile_set,
388 &miss_file_entries,
389 )
390 })
391 .collect();
392
393 let mut updated_rows = Vec::new();
394 for result in results {
395 match result {
396 ObjectCompileResult::Ok {
397 compiled,
398 state_row,
399 stats: object_stats,
400 } => {
401 if let Some(compiled) = compiled {
402 validated_objects.push((
403 compiled.db_name,
404 compiled.schema_name,
405 compiled.typed_object,
406 ));
407 }
408 if let Some(row) = state_row {
409 updated_rows.push(row);
410 }
411 stats.cache_hits += object_stats.cache_hits;
412 stats.cache_misses += object_stats.cache_misses;
413 }
414 ObjectCompileResult::ValidationErr(errs) => all_validation_errors.extend(errs),
415 ObjectCompileResult::ProjectErr(err) => return Err(err),
416 }
417 }
418 db.upsert_object_rows(&updated_rows)
419 .map_err(LoadError::from)?;
420 }
421
422 if !all_validation_errors.is_empty() {
423 return Err(ValidationErrors::new(all_validation_errors).into());
424 }
425 db.prune_object_rows(¤t_keys)
426 .map_err(LoadError::from)?;
427
428 let mut compiled_project =
429 object_validation::assemble_project(discovery.db_metas, validated_objects)?;
430 if !discovery.db_name_map.is_empty() {
431 compiled_project.rewrite_database_references(&discovery.db_name_map);
432 }
433 if let Some(ps) = profile_suffix {
434 let cluster_name_map = build_cluster_name_map(&compiled_project, ps);
435 if !cluster_name_map.is_empty() {
436 compiled_project.rewrite_cluster_references(&cluster_name_map);
437 }
438 }
439
440 let mut project = graph::Project::from(compiled_project);
441 project.compile_dirty = miss_keys.iter().filter_map(|k| k.parse().ok()).collect();
442
443 let deleted_keys: BTreeSet<String> = existing_fingerprints
445 .keys()
446 .filter(|k| !current_keys.contains(*k))
447 .cloned()
448 .collect();
449 if let Err(e) = db.write_project(&project, &miss_keys, &deleted_keys, root) {
450 verbose!("Failed to persist project to SQLite: {}", e);
451 }
452
453 Ok((project, stats))
454}
455
456fn build_cluster_name_map(
459 project: &compiled::Project,
460 cluster_suffix: &str,
461) -> BTreeMap<String, String> {
462 let mut names = BTreeSet::new();
463 for db in &project.databases {
464 for schema in &db.schemas {
465 for obj in &schema.objects {
466 names.extend(obj.clusters());
467 }
468 }
469 }
470 names
471 .into_iter()
472 .map(|name| {
473 let suffixed = format!("{}{}", name, cluster_suffix);
474 (name, suffixed)
475 })
476 .collect()
477}
478
479fn discover_project(
501 fs: &crate::fs::FileSystem,
502 root: &Path,
503 profile_suffix: Option<&str>,
504 variables: &BTreeMap<String, String>,
505 profile_set: bool,
506 db: &mut BuildArtifact,
507) -> Result<Discovery, ProjectError> {
508 if !root.exists() {
509 return Err(LoadError::RootNotFound {
510 path: root.to_path_buf(),
511 }
512 .into());
513 }
514 if !root.is_dir() {
515 return Err(LoadError::RootNotDirectory {
516 path: root.to_path_buf(),
517 }
518 .into());
519 }
520
521 let models_dir = root.join("models");
522 if !models_dir.is_dir() {
523 return Err(LoadError::ModelsNotFound { path: models_dir }.into());
524 }
525
526 let mut db_name_map = BTreeMap::new();
527 let mut db_metas = Vec::new();
528 let mut object_descriptors = Vec::new();
529 let mut validation_errors = Vec::new();
530
531 for db_entry in fs::read_dir(&models_dir).map_err(|source| LoadError::DirectoryReadFailed {
532 path: models_dir.clone(),
533 source,
534 })? {
535 let db_entry = db_entry.map_err(|source| LoadError::EntryReadFailed {
536 directory: models_dir.clone(),
537 source,
538 })?;
539 let db_path = db_entry.path();
540 if !db_path.is_dir() || db_entry.file_name().to_string_lossy().starts_with('.') {
541 continue;
542 }
543
544 let original_db_name = db_entry.file_name().to_string_lossy().to_string();
545 let db_name = match profile_suffix {
546 Some(suffix) => format!("{}{}", original_db_name, suffix),
547 None => original_db_name.clone(),
548 };
549 if profile_suffix.is_some() {
550 db_name_map.insert(original_db_name.clone(), db_name.clone());
551 }
552
553 let db_mod_path = models_dir.join(format!("{}.sql", original_db_name));
554 let db_mod_statements = parse_mod_statements(
555 fs,
556 &db_mod_path,
557 &original_db_name,
558 profile_suffix,
559 variables,
560 profile_set,
561 db,
562 )?;
563 if let Some(ref stmts) = db_mod_statements {
564 mod_statements::validate_database_mod_statements(
565 &db_name,
566 &db_mod_path,
567 stmts,
568 &mut validation_errors,
569 );
570 }
571
572 let mut schema_metas = Vec::new();
573 for schema_entry in
574 fs::read_dir(&db_path).map_err(|source| LoadError::DirectoryReadFailed {
575 path: db_path.clone(),
576 source,
577 })?
578 {
579 let schema_entry = schema_entry.map_err(|source| LoadError::EntryReadFailed {
580 directory: db_path.clone(),
581 source,
582 })?;
583 let schema_path = schema_entry.path();
584 if !schema_path.is_dir() || schema_entry.file_name().to_string_lossy().starts_with('.')
585 {
586 continue;
587 }
588
589 let schema_name = schema_entry.file_name().to_string_lossy().to_string();
590 let schema_mod_path = db_path.join(format!("{}.sql", schema_name));
591 let mut schema_mod_statements = parse_mod_statements(
592 fs,
593 &schema_mod_path,
594 &original_db_name,
595 profile_suffix,
596 variables,
597 profile_set,
598 db,
599 )?;
600 if let Some(ref mut stmts) = schema_mod_statements {
601 mod_statements::validate_schema_mod_statements(
602 &db_name,
603 &schema_name,
604 &schema_mod_path,
605 stmts,
606 &mut validation_errors,
607 );
608 }
609
610 let object_files = collect_all_sql_files(&schema_path)?;
611 for object_files in object_files {
612 let mut variants = Vec::new();
613 if let Some(path) = object_files.default {
614 variants.push(VariantDescriptor {
615 path,
616 profile: None,
617 });
618 }
619 for (variant_profile, path) in object_files.overrides {
620 variants.push(VariantDescriptor {
621 path,
622 profile: Some(variant_profile),
623 });
624 }
625 object_descriptors.push(ObjectDescriptor {
626 db_name: db_name.clone(),
627 original_db_name: original_db_name.clone(),
628 schema_name: schema_name.clone(),
629 object_name: object_files.name,
630 variants,
631 });
632 }
633
634 schema_metas.push(object_validation::SchemaBuildMeta {
635 name: schema_name,
636 mod_statements: schema_mod_statements,
637 });
638 }
639
640 db_metas.push(object_validation::DatabaseBuildMeta {
641 name: db_name,
642 mod_statements: db_mod_statements,
643 schemas: schema_metas,
644 });
645 }
646
647 if !validation_errors.is_empty() {
648 return Err(ValidationErrors::new(validation_errors).into());
649 }
650
651 Ok(Discovery {
652 db_metas,
653 object_descriptors,
654 db_name_map,
655 })
656}
657
658fn parse_mod_statements(
665 fs: &crate::fs::FileSystem,
666 path: &Path,
667 original_db_name: &str,
668 profile_suffix: Option<&str>,
669 variables: &BTreeMap<String, String>,
670 profile_set: bool,
671 db: &mut BuildArtifact,
672) -> Result<Option<Vec<Statement<Raw>>>, ProjectError> {
673 if !path.exists() {
674 return Ok(None);
675 }
676
677 let mut entries = db
678 .load_file_contents(fs, &BTreeSet::from([path.to_path_buf()]))
679 .map_err(LoadError::from)?;
680 let sql = entries
681 .remove(path)
682 .ok_or_else(|| LoadError::InvalidFileName {
683 path: path.to_path_buf(),
684 })?;
685 let mut statements: Vec<Statement<Raw>> =
686 parse_statements_with_context(&sql, path.to_path_buf(), variables, profile_set)?
687 .into_iter()
688 .map(|stmt| stmt.ast)
689 .collect();
690 if let Some(suffix) = profile_suffix {
691 crate::project::resolve::normalize::rewrite_database_names(
692 &mut statements,
693 original_db_name,
694 suffix,
695 );
696 }
697 Ok(Some(statements))
698}
699
700enum ObjectPlanStage {
706 Hit {
707 object_key: String,
708 fingerprint: String,
709 descriptor: ObjectDescriptor,
710 },
711 Miss {
712 object_key: String,
713 fingerprint: String,
714 descriptor: ObjectDescriptor,
715 },
716 ProjectErr(ProjectError),
717}
718
719fn stage_object(
722 descriptor: ObjectDescriptor,
723 existing_fingerprints: &BTreeMap<String, String>,
724 file_hashes: &BTreeMap<PathBuf, String>,
725 variables: &BTreeMap<String, String>,
726) -> ObjectPlanStage {
727 let object_key = object_key(
728 &descriptor.db_name,
729 &descriptor.schema_name,
730 &descriptor.object_name,
731 );
732 let fingerprint = match object_fingerprint(&descriptor, file_hashes, variables) {
733 Ok(fingerprint) => fingerprint,
734 Err(err) => return ObjectPlanStage::ProjectErr(err),
735 };
736
737 if existing_fingerprints.get(&object_key) == Some(&fingerprint) {
738 ObjectPlanStage::Hit {
739 object_key,
740 fingerprint,
741 descriptor,
742 }
743 } else {
744 ObjectPlanStage::Miss {
745 object_key,
746 fingerprint,
747 descriptor,
748 }
749 }
750}
751
752fn finalize_stage(
758 stage: ObjectPlanStage,
759 hit_artifacts: &BTreeMap<String, CompiledObjectArtifact>,
760) -> ObjectPlanResult {
761 match stage {
762 ObjectPlanStage::Hit {
763 object_key,
764 fingerprint,
765 descriptor,
766 } => {
767 let Some(artifact) = hit_artifacts.get(&object_key) else {
768 verbose!(
769 "recompiling {} after cached object row was missing during artifact load",
770 object_key
771 );
772 return ObjectPlanResult::Miss {
773 object_key,
774 fingerprint,
775 descriptor,
776 };
777 };
778 match artifact_to_compiled_object(artifact) {
779 Ok(compiled) => ObjectPlanResult::Hit {
780 object_key,
781 compiled,
782 stats: CompileStats {
783 cache_hits: 1,
784 cache_misses: 0,
785 },
786 },
787 Err(()) => {
788 verbose!(
789 "recompiling {} after cached object payload could not be reconstructed",
790 object_key
791 );
792 ObjectPlanResult::Miss {
793 object_key,
794 fingerprint,
795 descriptor,
796 }
797 }
798 }
799 }
800 ObjectPlanStage::Miss {
801 object_key,
802 fingerprint,
803 descriptor,
804 } => ObjectPlanResult::Miss {
805 object_key,
806 fingerprint,
807 descriptor,
808 },
809 ObjectPlanStage::ProjectErr(err) => ObjectPlanResult::ProjectErr(err),
810 }
811}
812
813enum ObjectCompileFailure {
818 Validation(Vec<ValidationError>),
819 Project(ProjectError),
820}
821
822fn compile_object_uncached(
834 descriptor: ObjectDescriptor,
835 profile: &str,
836 variables: &BTreeMap<String, String>,
837 profile_set: bool,
838 file_entries: &BTreeMap<PathBuf, String>,
839) -> Result<Option<CachedTypedObject>, ObjectCompileFailure> {
840 let mut variants = Vec::new();
841 for variant in descriptor.variants {
842 let sql = file_entries.get(&variant.path).cloned().ok_or_else(|| {
843 ObjectCompileFailure::Project(
844 LoadError::InvalidFileName {
845 path: variant.path.clone(),
846 }
847 .into(),
848 )
849 })?;
850 let statements =
851 parse_statements_with_context(&sql, variant.path.clone(), variables, profile_set)
852 .map_err(|err| ObjectCompileFailure::Project(err.into()))?;
853 variants.push(input::ObjectVariant {
854 path: variant.path,
855 profile: variant.profile,
856 statements,
857 });
858 }
859
860 let raw_object = input::DatabaseObject {
866 name: descriptor.object_name,
867 database: descriptor.original_db_name.clone(),
868 schema: descriptor.schema_name.clone(),
869 variants,
870 };
871
872 match compiled::DatabaseObject::validate(raw_object, profile) {
873 Ok(Some(typed_object)) => Ok(Some(CachedTypedObject {
874 db_name: descriptor.db_name,
875 schema_name: descriptor.schema_name,
876 typed_object,
877 })),
878 Ok(None) => Ok(None),
879 Err(errs) => Err(ObjectCompileFailure::Validation(errs.errors)),
880 }
881}
882
883fn compile_object(
890 descriptor: ObjectDescriptor,
891 object_key: String,
892 fingerprint: String,
893 profile: &str,
894 variables: &BTreeMap<String, String>,
895 profile_set: bool,
896 file_entries: &BTreeMap<PathBuf, String>,
897) -> ObjectCompileResult {
898 let compiled =
899 match compile_object_uncached(descriptor, profile, variables, profile_set, file_entries) {
900 Ok(compiled) => compiled,
901 Err(ObjectCompileFailure::Validation(errs)) => {
902 return ObjectCompileResult::ValidationErr(errs);
903 }
904 Err(ObjectCompileFailure::Project(err)) => return ObjectCompileResult::ProjectErr(err),
905 };
906
907 let artifact = match &compiled {
908 Some(object) => CompiledObjectArtifact::Object(compiled_object_to_artifact_data(object)),
909 None => CompiledObjectArtifact::Skipped,
910 };
911
912 ObjectCompileResult::Ok {
913 compiled,
914 state_row: Some(ObjectStateRow {
915 object_key,
916 fingerprint,
917 artifact,
918 }),
919 stats: CompileStats {
920 cache_hits: 0,
921 cache_misses: 1,
922 },
923 }
924}
925
926fn object_fingerprint(
939 descriptor: &ObjectDescriptor,
940 file_hashes: &BTreeMap<PathBuf, String>,
941 variables: &BTreeMap<String, String>,
942) -> Result<String, ProjectError> {
943 let mut hasher = Sha256::new();
944 hasher.update(descriptor.db_name.as_bytes());
945 hasher.update([0]);
946 hasher.update(descriptor.schema_name.as_bytes());
947 hasher.update([0]);
948 hasher.update(descriptor.object_name.as_bytes());
949 hasher.update([0]);
950 for (name, value) in variables {
951 hasher.update(name.as_bytes());
952 hasher.update([0]);
953 hasher.update(value.as_bytes());
954 hasher.update([0xff]);
955 }
956 for variant in &descriptor.variants {
957 hasher.update(variant.path.to_string_lossy().as_bytes());
958 hasher.update([0]);
959 hasher.update(variant.profile.as_deref().unwrap_or("").as_bytes());
960 hasher.update([0]);
961 let content_hash =
962 file_hashes
963 .get(&variant.path)
964 .ok_or_else(|| LoadError::InvalidFileName {
965 path: variant.path.clone(),
966 })?;
967 hasher.update(content_hash.as_bytes());
968 hasher.update([0xfe]);
969 }
970 Ok(hex_digest(hasher.finalize()))
971}
972
/// Builds the cache key for an object: `<db_name>.<schema_name>.<object_name>`.
///
/// The parts are joined verbatim; dots inside a part are not escaped.
fn object_key(db_name: &str, schema_name: &str, object_name: &str) -> String {
    [db_name, schema_name, object_name].join(".")
}
977
978pub(crate) fn profile_namespace(
985 profile: &str,
986 profile_suffix: Option<&str>,
987 variables: &BTreeMap<String, String>,
988) -> String {
989 let mut hasher = Sha256::new();
990 hasher.update(profile.as_bytes());
991 hasher.update([0]);
992 hasher.update(profile_suffix.unwrap_or("").as_bytes());
993 hasher.update([0]);
994 for (name, value) in variables {
995 hasher.update(name.as_bytes());
996 hasher.update([0]);
997 hasher.update(value.as_bytes());
998 hasher.update([0xff]);
999 }
1000 hex_digest(hasher.finalize())
1001}
1002
1003fn compiled_object_to_artifact_data(object: &CachedTypedObject) -> CompiledObjectArtifactData {
1007 CompiledObjectArtifactData {
1008 db_name: object.db_name.clone(),
1009 schema_name: object.schema_name.clone(),
1010 file_path: object.typed_object.path.clone(),
1011 stmt_sql: format!("{};", object.typed_object.stmt),
1012 indexes_sql: object
1013 .typed_object
1014 .indexes
1015 .iter()
1016 .map(|stmt| format!("{};", stmt))
1017 .collect(),
1018 grants_sql: object
1019 .typed_object
1020 .grants
1021 .iter()
1022 .map(|stmt| format!("{};", stmt))
1023 .collect(),
1024 comments_sql: object
1025 .typed_object
1026 .comments
1027 .iter()
1028 .map(|stmt| format!("{};", stmt))
1029 .collect(),
1030 tests_sql: object
1031 .typed_object
1032 .tests
1033 .iter()
1034 .map(|stmt| format!("{};", stmt))
1035 .collect(),
1036 }
1037}
1038
1039fn artifact_to_compiled_object(
1045 artifact: &CompiledObjectArtifact,
1046) -> Result<Option<CachedTypedObject>, ()> {
1047 match artifact {
1048 CompiledObjectArtifact::Skipped => Ok(None),
1049 CompiledObjectArtifact::Object(data) => Ok(Some(CachedTypedObject {
1050 db_name: data.db_name.clone(),
1051 schema_name: data.schema_name.clone(),
1052 typed_object: compiled::DatabaseObject {
1053 path: data.file_path.clone(),
1054 stmt: parse_main_statement(&data.stmt_sql)?,
1055 indexes: parse_statement_list(&data.indexes_sql, expect_index)?,
1056 grants: parse_statement_list(&data.grants_sql, expect_grant)?,
1057 comments: parse_statement_list(&data.comments_sql, expect_comment)?,
1058 tests: parse_statement_list(&data.tests_sql, expect_test)?,
1059 },
1060 })),
1061 }
1062}
1063
1064fn parse_sql(sql: &str) -> Result<Vec<Statement<Raw>>, ()> {
1070 mz_sql_parser::parser::parse_statements_with_limit(sql)
1071 .map_err(|_| ())?
1072 .map(|stmts| stmts.into_iter().map(|stmt| stmt.ast).collect())
1073 .map_err(|_| ())
1074}
1075
1076fn parse_one_statement(sql: &str) -> Result<Statement<Raw>, ()> {
1081 let mut statements = parse_sql(sql)?;
1082 if statements.len() != 1 {
1083 return Err(());
1084 }
1085 Ok(statements.remove(0))
1086}
1087
1088fn parse_statement_list<T>(
1094 sql_statements: &[String],
1095 parser: fn(Statement<Raw>) -> Result<T, ()>,
1096) -> Result<Vec<T>, ()> {
1097 sql_statements
1098 .iter()
1099 .map(|sql| parse_one_statement(sql).and_then(parser))
1100 .collect()
1101}
1102
1103fn parse_main_statement(sql: &str) -> Result<crate::project::ast::Statement, ()> {
1109 match parse_one_statement(sql)? {
1110 Statement::CreateSink(stmt) => Ok(crate::project::ast::Statement::CreateSink(stmt)),
1111 Statement::CreateView(stmt) => Ok(crate::project::ast::Statement::CreateView(stmt)),
1112 Statement::CreateMaterializedView(stmt) => {
1113 Ok(crate::project::ast::Statement::CreateMaterializedView(stmt))
1114 }
1115 Statement::CreateTable(stmt) => Ok(crate::project::ast::Statement::CreateTable(stmt)),
1116 Statement::CreateTableFromSource(stmt) => {
1117 Ok(crate::project::ast::Statement::CreateTableFromSource(stmt))
1118 }
1119 Statement::CreateSource(stmt) => Ok(crate::project::ast::Statement::CreateSource(stmt)),
1120 Statement::CreateSecret(stmt) => Ok(crate::project::ast::Statement::CreateSecret(stmt)),
1121 Statement::CreateConnection(stmt) => {
1122 Ok(crate::project::ast::Statement::CreateConnection(stmt))
1123 }
1124 _ => Err(()),
1125 }
1126}
1127
1128fn expect_index(stmt: Statement<Raw>) -> Result<CreateIndexStatement<Raw>, ()> {
1130 match stmt {
1131 Statement::CreateIndex(stmt) => Ok(stmt),
1132 _ => Err(()),
1133 }
1134}
1135
1136fn expect_grant(stmt: Statement<Raw>) -> Result<GrantPrivilegesStatement<Raw>, ()> {
1138 match stmt {
1139 Statement::GrantPrivileges(stmt) => Ok(stmt),
1140 _ => Err(()),
1141 }
1142}
1143
1144fn expect_comment(stmt: Statement<Raw>) -> Result<CommentStatement<Raw>, ()> {
1146 match stmt {
1147 Statement::Comment(stmt) => Ok(stmt),
1148 _ => Err(()),
1149 }
1150}
1151
1152fn expect_test(stmt: Statement<Raw>) -> Result<ExecuteUnitTestStatement<Raw>, ()> {
1154 match stmt {
1155 Statement::ExecuteUnitTest(stmt) => Ok(stmt),
1156 _ => Err(()),
1157 }
1158}