Skip to main content

mz_deploy/project/
compiler.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Incremental project compiler.
11//!
12//! This module is the canonical implementation of `project::plan_sync()`'s
13//! compile contract:
14//!
15//! - The result of compilation is a
16//!   [`crate::project::ir::graph::Project`].
17//! - The **unit of incremental reuse** is a logical database object
18//!   (`database.schema.object`), not an entire project.
19//! - Object-local work is evaluated independently and may run in parallel.
20//! - Cross-object validation remains deterministic and is performed after all
21//!   object-local results for the invocation are available.
22//!
23//! ## Build Artifacts
24//!
25//! Compiler state is scoped to the active configuration (profile name,
26//! optional suffix, and compile-time variable bindings). Each configuration
27//! gets an isolated namespace so caches never leak across profiles.
28//!
29//! Within a namespace the compiler persists:
30//!
31//! - file metadata and content hashes to avoid rereading unchanged files
32//! - cached object artifacts for incremental reuse across invocations
33//! - cached runtime typecheck artifacts for incremental dirty detection
34//!
35//! All cached state is advisory. Missing, corrupt, or schema-incompatible
36//! entries are treated as cache misses and rebuilt from source.
37//!
38//! ## Invalidation Rules
39//!
40//! An object cache entry is reusable only when its fingerprint matches the
41//! current compile inputs for that object.
42//! The fingerprint includes:
43//!
44//! - the logical object key
45//! - every file variant that can affect active-variant resolution
46//! - the full path of every file variant
47//! - the cached content hash of those variants
48//! - the compile-time variable map
49//!
50//! As a result:
51//!
52//! - editing any variant for an object invalidates that object's cache entry
53//! - changing variables invalidates every object whose fingerprint includes
54//!   those variables
55//! - changing the active profile or suffix moves compilation to a different
56//!   namespace, isolating caches across profiles
57//! - moving the same checkout to a different directory invalidates the cache
58//!   because file paths are part of the fingerprint contract
59//!
60//! This module does **not** currently perform dependency-directed invalidation.
61//! Downstream project-graph work is recomputed from the object set produced for
62//! the current invocation.
63//!
64//! ## Correctness Guarantees
65//!
66//! Cached object artifacts store a validated object payload. A cache hit must
67//! therefore produce the same object facts that object-local parsing and
68//! validation would produce from source while skipping revalidation.
69//!
70//! Compilation must preserve these invariants:
71//!
72//! - all object-local validation errors are reported exactly as if the object
73//!   had been freshly compiled
74//! - database- and schema-level mod statements are validated on every
75//!   invocation; they are not cached independently
76//! - schema-level checks (e.g., storage/computation separation, replacement
77//!   schemas) are enforced after object artifacts are assembled, so they see
78//!   the full current project
79//! - final dependency extraction operates on a complete compiled project
80//!   assembled for the current invocation
81
82pub(crate) mod cache;
83mod cache_io;
84mod mod_statements;
85mod object_validation;
86pub(crate) mod typecheck;
87
88use super::error::{LoadError, ProjectError, ValidationError, ValidationErrors};
89use crate::project::ir::{compiled, graph};
90use crate::project::syntax::input;
91use crate::project::syntax::parser::parse_statements_with_context;
92use crate::project::syntax::profile_files::collect_all_sql_files;
93use crate::verbose;
94use cache::BuildArtifact;
95use cache::build_artifact::{CompiledObjectArtifact, CompiledObjectArtifactData, ObjectStateRow};
96use cache_io::hex_digest;
97use mz_sql_parser::ast::{
98    CommentStatement, CreateIndexStatement, ExecuteUnitTestStatement, GrantPrivilegesStatement,
99    Raw, Statement,
100};
101use rayon::prelude::*;
102use sha2::{Digest, Sha256};
103use std::collections::{BTreeMap, BTreeSet};
104use std::fs;
105use std::path::{Path, PathBuf};
106
/// Crate-visible directory-name constant (`"compiler"`).
///
/// NOTE(review): not referenced in the visible part of this module;
/// presumably the subdirectory under which compiler build artifacts are
/// stored — confirm against the cache implementation.
pub(crate) const COMPILER_DIR: &str = "compiler";
108
/// Counters for cache behavior during a single compilation run.
///
/// Used by the compile orchestrator to report how many objects were served
/// from cached artifacts versus recompiled from source. Per-object counters
/// are summed field-by-field into one run-wide total.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub(crate) struct CompileStats {
    /// Number of objects served from a cached artifact.
    pub cache_hits: usize,
    /// Number of objects recompiled from source.
    pub cache_misses: usize,
}
119
/// Output of the discovery phase: everything needed to plan and compile objects.
///
/// Produced by [`discover_project`], which walks the `models/` directory tree.
#[derive(Debug)]
struct Discovery {
    /// Database and schema metadata (names, mod statements) used by
    /// [`object_validation::assemble_project`] to build the compiled project.
    db_metas: Vec<object_validation::DatabaseBuildMeta>,
    /// One entry per logical object with its file variants, used to
    /// fingerprint and compile individual objects.
    object_descriptors: Vec<ObjectDescriptor>,
    /// When a profile suffix is active, maps original database names to their
    /// suffixed forms (e.g., `"app"` → `"app_dev"`); left empty when no suffix
    /// is active. Used later to rewrite cross-database references in compiled
    /// object SQL.
    db_name_map: BTreeMap<String, String>,
}
138
/// A logical database object discovered on disk, not yet compiled.
///
/// Identifies an object by its fully qualified triple (`db_name.schema_name.object_name`)
/// and lists every file variant (default + profile overrides) that could contribute
/// to the active variant. This is the unit of parallelism for fingerprinting and
/// compilation: each descriptor is processed independently.
#[derive(Debug, Clone)]
struct ObjectDescriptor {
    /// Database name after profile suffix has been applied (matches the
    /// database name that will be deployed). Used for grouping, caching,
    /// fingerprinting, and as the canonical name in the compiled project.
    db_name: String,
    /// Original directory name without profile suffix. Used to build the FQN
    /// for per-object validation so that the user's SQL — which references
    /// the unsuffixed name they wrote in their files — matches the directory.
    original_db_name: String,
    /// Name of the schema directory the object's files were found in.
    schema_name: String,
    /// Logical object name as reported by the SQL file collector for the
    /// schema directory.
    object_name: String,
    /// Every file variant of the object: the default variant first (when one
    /// exists), followed by the profile overrides.
    variants: Vec<VariantDescriptor>,
}
159
/// A single file variant contributing to an [`ObjectDescriptor`].
#[derive(Debug, Clone)]
struct VariantDescriptor {
    /// Path to the `.sql` file on disk, exactly as returned by the SQL file
    /// collector for the schema directory. The path participates in the
    /// object fingerprint.
    ///
    /// NOTE(review): whether this path is absolute depends on the project
    /// root passed in by the caller — confirm before relying on it.
    path: PathBuf,
    /// `None` for the default variant (`object.sql`), `Some("prod")` for a
    /// profile override (`object#prod.sql`). Used during active-variant
    /// resolution to select which file to compile for the current profile.
    profile: Option<String>,
}
171
/// In-memory representation of a compiled object together with its location.
///
/// Carries the fully validated [`compiled::DatabaseObject`] alongside the
/// database and schema names needed to slot it into the assembled project.
/// Produced by both cache hits (reconstructed from a cached artifact by
/// `artifact_to_compiled_object`, see [`finalize_stage`]) and fresh
/// compilation (via [`compile_object_uncached`]).
#[derive(Debug, Clone)]
struct CachedTypedObject {
    /// Database the object is slotted into when the project is assembled.
    db_name: String,
    /// Schema the object is slotted into when the project is assembled.
    schema_name: String,
    /// The validated object payload.
    typed_object: compiled::DatabaseObject,
}
184
/// Result of compiling a single object from source.
///
/// One value is produced per cache miss; the orchestrator folds them into the
/// validated object list, the rows to persist, and the run-wide statistics.
enum ObjectCompileResult {
    /// Compilation succeeded.
    Ok {
        /// The compiled object, or `None` when the object was skipped (e.g.,
        /// no matching profile variant).
        compiled: Option<CachedTypedObject>,
        /// The artifact row to persist in the cache, if any.
        state_row: Option<ObjectStateRow>,
        /// Per-object counters added to the run-wide [`CompileStats`].
        stats: CompileStats,
    },
    /// The object has user-facing validation errors. These are collected and
    /// reported together after all objects are compiled.
    ValidationErr(Vec<ValidationError>),
    /// An internal error (I/O failure, parse crash) that aborts compilation
    /// as soon as the orchestrator encounters it.
    ProjectErr(ProjectError),
}
203
/// Result of the planning phase for a single object.
///
/// Planning determines whether a cached artifact can be reused (`Hit`) or the
/// object must be recompiled from source (`Miss`). This runs in parallel across
/// all discovered objects before any compilation begins.
enum ObjectPlanResult {
    /// The cached fingerprint matches and the cached artifact could be
    /// reconstructed.
    Hit {
        /// Cache key of the object.
        object_key: String,
        /// The deserialized object, or `None` if the cached artifact was
        /// `Skipped`.
        compiled: Option<CachedTypedObject>,
        /// Per-object counters; [`finalize_stage`] reports one cache hit and
        /// zero misses here.
        stats: CompileStats,
    },
    /// The object needs fresh compilation: either its fingerprint did not
    /// match, or it matched but the cached artifact was missing or could not
    /// be reconstructed. Carries the descriptor and current fingerprint so the
    /// compile phase can produce and persist a new artifact.
    Miss {
        /// Cache key of the object.
        object_key: String,
        /// Fingerprint of the current compile inputs.
        fingerprint: String,
        /// The discovered object to compile.
        descriptor: ObjectDescriptor,
    },
    /// Fingerprinting failed (e.g., missing file hash).
    ProjectErr(ProjectError),
}
229
230/// Compile a project directory into a dependency-aware [`graph::Project`].
231///
232/// This is the canonical synchronous compiler entrypoint. It parses and
233/// validates every object for the active profile, reusing cached artifacts
234/// when fingerprints still match, and returns a fully-linked project graph
235/// with dependency and cross-object validation applied.
236///
237/// See [`compile_sync_with_stats`] for the detailed pipeline and cache
238/// behavior.
239pub(crate) fn compile_sync<P: AsRef<Path>>(
240    fs: &crate::fs::FileSystem,
241    root: P,
242    profile: Option<&str>,
243    profile_suffix: Option<&str>,
244    variables: &BTreeMap<String, String>,
245) -> Result<graph::Project, ProjectError> {
246    compile_sync_with_stats(fs, root, profile, profile_suffix, variables)
247        .map(|(project, _)| project)
248}
249
250/// Internal entry point that returns compile statistics alongside the project.
251///
252/// Runs the full incremental pipeline:
253///
254/// 1. **Discover** — walk `models/` to find databases, schemas, objects, and
255///    mod files. Build the [`Discovery`] containing all descriptors and the
256///    database name map.
257/// 2. **Plan** — fingerprint every object against the cached artifact store.
258///    Partition objects into cache hits and cache misses (parallel via rayon).
259/// 3. **Compile misses** — parse, validate, and normalize each miss from
260///    source. Persist new artifacts back to the cache (parallel via rayon).
261/// 4. **Assemble** — combine database/schema metadata with validated objects
262///    into a [`compiled::Project`]. Apply cross-database and cluster name
263///    rewrites if a profile suffix is active.
264/// 5. **Build graph** — run cross-object validation, dependency extraction,
265///    and topological analysis to produce the final [`graph::Project`].
266fn compile_sync_with_stats<P: AsRef<Path>>(
267    fs: &crate::fs::FileSystem,
268    root: P,
269    profile: Option<&str>,
270    profile_suffix: Option<&str>,
271    variables: &BTreeMap<String, String>,
272) -> Result<(graph::Project, CompileStats), ProjectError> {
273    // Internally we keep the profile name as a plain `&str` (empty when no
274    // profile is selected) and carry the "is a profile set?" bit separately
275    // for error display. The empty string can't collide with a real profile
276    // name (validation rejects it) and produces a stable cache namespace.
277    let profile_set = profile.is_some();
278    let profile = profile.unwrap_or("");
279    let root = root.as_ref();
280    let mut db =
281        BuildArtifact::open(root, profile, profile_suffix, variables).map_err(LoadError::from)?;
282    let discovery = discover_project(fs, root, profile_suffix, variables, profile_set, &mut db)?;
283
284    let variant_paths: BTreeSet<PathBuf> = discovery
285        .object_descriptors
286        .iter()
287        .flat_map(|descriptor| {
288            descriptor
289                .variants
290                .iter()
291                .map(|variant| variant.path.clone())
292        })
293        .collect();
294    let file_hashes = db
295        .load_file_hashes(fs, &variant_paths)
296        .map_err(LoadError::from)?;
297
298    let existing_fingerprints = db.load_object_fingerprints().map_err(LoadError::from)?;
299
300    // Phase 1: classify each descriptor as a fingerprint-level Hit/Miss in parallel.
301    let stages: Vec<ObjectPlanStage> = discovery
302        .object_descriptors
303        .clone()
304        .into_par_iter()
305        .map(|descriptor| stage_object(descriptor, &existing_fingerprints, &file_hashes, variables))
306        .collect();
307
308    // Phase 2: load full artifacts only for fingerprint-level hits — selective.
309    let hit_keys: BTreeSet<String> = stages
310        .iter()
311        .filter_map(|stage| match stage {
312            ObjectPlanStage::Hit { object_key, .. } => Some(object_key.clone()),
313            _ => None,
314        })
315        .collect();
316    let hit_artifacts = db
317        .load_object_artifacts(&hit_keys)
318        .map_err(LoadError::from)?;
319
320    // Phase 3: finalize each stage. Hits parse their cached SQL into AST; if any
321    // fragment fails to parse the entry is demoted to a Miss for fresh compilation.
322    let plans: Vec<ObjectPlanResult> = stages
323        .into_par_iter()
324        .map(|stage| finalize_stage(stage, &hit_artifacts))
325        .collect();
326
327    let mut all_validation_errors = Vec::new();
328    let mut validated_objects = Vec::new();
329    let mut stats = CompileStats::default();
330    let mut current_keys = BTreeSet::new();
331    let mut miss_keys: BTreeSet<String> = BTreeSet::new();
332    let mut misses = Vec::new();
333
334    for plan in plans {
335        match plan {
336            ObjectPlanResult::Hit {
337                object_key,
338                compiled,
339                stats: object_stats,
340            } => {
341                current_keys.insert(object_key);
342                if let Some(compiled) = compiled {
343                    validated_objects.push((
344                        compiled.db_name,
345                        compiled.schema_name,
346                        compiled.typed_object,
347                    ));
348                }
349                stats.cache_hits += object_stats.cache_hits;
350                stats.cache_misses += object_stats.cache_misses;
351            }
352            ObjectPlanResult::Miss {
353                object_key,
354                fingerprint,
355                descriptor,
356            } => {
357                current_keys.insert(object_key.clone());
358                miss_keys.insert(object_key.clone());
359                misses.push((object_key, fingerprint, descriptor));
360            }
361            ObjectPlanResult::ProjectErr(err) => return Err(err),
362        }
363    }
364
365    if !misses.is_empty() {
366        let miss_paths: BTreeSet<PathBuf> = misses
367            .iter()
368            .flat_map(|(_, _, descriptor)| {
369                descriptor
370                    .variants
371                    .iter()
372                    .map(|variant| variant.path.clone())
373            })
374            .collect();
375        let miss_file_entries = db
376            .load_file_contents(fs, &miss_paths)
377            .map_err(LoadError::from)?;
378        let results: Vec<ObjectCompileResult> = misses
379            .into_par_iter()
380            .map(|(object_key, fingerprint, descriptor)| {
381                compile_object(
382                    descriptor,
383                    object_key,
384                    fingerprint,
385                    profile,
386                    variables,
387                    profile_set,
388                    &miss_file_entries,
389                )
390            })
391            .collect();
392
393        let mut updated_rows = Vec::new();
394        for result in results {
395            match result {
396                ObjectCompileResult::Ok {
397                    compiled,
398                    state_row,
399                    stats: object_stats,
400                } => {
401                    if let Some(compiled) = compiled {
402                        validated_objects.push((
403                            compiled.db_name,
404                            compiled.schema_name,
405                            compiled.typed_object,
406                        ));
407                    }
408                    if let Some(row) = state_row {
409                        updated_rows.push(row);
410                    }
411                    stats.cache_hits += object_stats.cache_hits;
412                    stats.cache_misses += object_stats.cache_misses;
413                }
414                ObjectCompileResult::ValidationErr(errs) => all_validation_errors.extend(errs),
415                ObjectCompileResult::ProjectErr(err) => return Err(err),
416            }
417        }
418        db.upsert_object_rows(&updated_rows)
419            .map_err(LoadError::from)?;
420    }
421
422    if !all_validation_errors.is_empty() {
423        return Err(ValidationErrors::new(all_validation_errors).into());
424    }
425    db.prune_object_rows(&current_keys)
426        .map_err(LoadError::from)?;
427
428    let mut compiled_project =
429        object_validation::assemble_project(discovery.db_metas, validated_objects)?;
430    if !discovery.db_name_map.is_empty() {
431        compiled_project.rewrite_database_references(&discovery.db_name_map);
432    }
433    if let Some(ps) = profile_suffix {
434        let cluster_name_map = build_cluster_name_map(&compiled_project, ps);
435        if !cluster_name_map.is_empty() {
436            compiled_project.rewrite_cluster_references(&cluster_name_map);
437        }
438    }
439
440    let mut project = graph::Project::from(compiled_project);
441    project.compile_dirty = miss_keys.iter().filter_map(|k| k.parse().ok()).collect();
442
443    // Advisory persist for LSP consumption — failure is logged, not fatal.
444    let deleted_keys: BTreeSet<String> = existing_fingerprints
445        .keys()
446        .filter(|k| !current_keys.contains(*k))
447        .cloned()
448        .collect();
449    if let Err(e) = db.write_project(&project, &miss_keys, &deleted_keys, root) {
450        verbose!("Failed to persist project to SQLite: {}", e);
451    }
452
453    Ok((project, stats))
454}
455
456/// Build a map from original cluster name to the suffixed cluster name for all
457/// clusters referenced by the compiled project.
458fn build_cluster_name_map(
459    project: &compiled::Project,
460    cluster_suffix: &str,
461) -> BTreeMap<String, String> {
462    let mut names = BTreeSet::new();
463    for db in &project.databases {
464        for schema in &db.schemas {
465            for obj in &schema.objects {
466                names.extend(obj.clusters());
467            }
468        }
469    }
470    names
471        .into_iter()
472        .map(|name| {
473            let suffixed = format!("{}{}", name, cluster_suffix);
474            (name, suffixed)
475        })
476        .collect()
477}
478
479/// Walk the `models/` directory tree and collect everything needed for compilation.
480///
481/// The directory structure follows the convention:
482///
483/// ```text
484/// models/
485///   <database>/               ← directory name = database name
486///     <database>.sql          ← optional database-level mod file (grants, comments)
487///     <schema>/               ← directory name = schema name
488///       <schema>.sql          ← optional schema-level mod file
489///       <object>.sql          ← one file per database object
490///       <object>#<profile>.sql ← optional profile variant override
491/// ```
492///
493/// For each database directory:
494/// - Computes the effective database name (original + profile suffix if active).
495/// - Parses and validates database and schema mod files.
496/// - Collects all object file variants into [`ObjectDescriptor`]s.
497/// - Builds the `db_name_map` for cross-database reference rewriting.
498///
499/// Returns a [`Discovery`] or fails with accumulated validation errors.
500fn discover_project(
501    fs: &crate::fs::FileSystem,
502    root: &Path,
503    profile_suffix: Option<&str>,
504    variables: &BTreeMap<String, String>,
505    profile_set: bool,
506    db: &mut BuildArtifact,
507) -> Result<Discovery, ProjectError> {
508    if !root.exists() {
509        return Err(LoadError::RootNotFound {
510            path: root.to_path_buf(),
511        }
512        .into());
513    }
514    if !root.is_dir() {
515        return Err(LoadError::RootNotDirectory {
516            path: root.to_path_buf(),
517        }
518        .into());
519    }
520
521    let models_dir = root.join("models");
522    if !models_dir.is_dir() {
523        return Err(LoadError::ModelsNotFound { path: models_dir }.into());
524    }
525
526    let mut db_name_map = BTreeMap::new();
527    let mut db_metas = Vec::new();
528    let mut object_descriptors = Vec::new();
529    let mut validation_errors = Vec::new();
530
531    for db_entry in fs::read_dir(&models_dir).map_err(|source| LoadError::DirectoryReadFailed {
532        path: models_dir.clone(),
533        source,
534    })? {
535        let db_entry = db_entry.map_err(|source| LoadError::EntryReadFailed {
536            directory: models_dir.clone(),
537            source,
538        })?;
539        let db_path = db_entry.path();
540        if !db_path.is_dir() || db_entry.file_name().to_string_lossy().starts_with('.') {
541            continue;
542        }
543
544        let original_db_name = db_entry.file_name().to_string_lossy().to_string();
545        let db_name = match profile_suffix {
546            Some(suffix) => format!("{}{}", original_db_name, suffix),
547            None => original_db_name.clone(),
548        };
549        if profile_suffix.is_some() {
550            db_name_map.insert(original_db_name.clone(), db_name.clone());
551        }
552
553        let db_mod_path = models_dir.join(format!("{}.sql", original_db_name));
554        let db_mod_statements = parse_mod_statements(
555            fs,
556            &db_mod_path,
557            &original_db_name,
558            profile_suffix,
559            variables,
560            profile_set,
561            db,
562        )?;
563        if let Some(ref stmts) = db_mod_statements {
564            mod_statements::validate_database_mod_statements(
565                &db_name,
566                &db_mod_path,
567                stmts,
568                &mut validation_errors,
569            );
570        }
571
572        let mut schema_metas = Vec::new();
573        for schema_entry in
574            fs::read_dir(&db_path).map_err(|source| LoadError::DirectoryReadFailed {
575                path: db_path.clone(),
576                source,
577            })?
578        {
579            let schema_entry = schema_entry.map_err(|source| LoadError::EntryReadFailed {
580                directory: db_path.clone(),
581                source,
582            })?;
583            let schema_path = schema_entry.path();
584            if !schema_path.is_dir() || schema_entry.file_name().to_string_lossy().starts_with('.')
585            {
586                continue;
587            }
588
589            let schema_name = schema_entry.file_name().to_string_lossy().to_string();
590            let schema_mod_path = db_path.join(format!("{}.sql", schema_name));
591            let mut schema_mod_statements = parse_mod_statements(
592                fs,
593                &schema_mod_path,
594                &original_db_name,
595                profile_suffix,
596                variables,
597                profile_set,
598                db,
599            )?;
600            if let Some(ref mut stmts) = schema_mod_statements {
601                mod_statements::validate_schema_mod_statements(
602                    &db_name,
603                    &schema_name,
604                    &schema_mod_path,
605                    stmts,
606                    &mut validation_errors,
607                );
608            }
609
610            let object_files = collect_all_sql_files(&schema_path)?;
611            for object_files in object_files {
612                let mut variants = Vec::new();
613                if let Some(path) = object_files.default {
614                    variants.push(VariantDescriptor {
615                        path,
616                        profile: None,
617                    });
618                }
619                for (variant_profile, path) in object_files.overrides {
620                    variants.push(VariantDescriptor {
621                        path,
622                        profile: Some(variant_profile),
623                    });
624                }
625                object_descriptors.push(ObjectDescriptor {
626                    db_name: db_name.clone(),
627                    original_db_name: original_db_name.clone(),
628                    schema_name: schema_name.clone(),
629                    object_name: object_files.name,
630                    variants,
631                });
632            }
633
634            schema_metas.push(object_validation::SchemaBuildMeta {
635                name: schema_name,
636                mod_statements: schema_mod_statements,
637            });
638        }
639
640        db_metas.push(object_validation::DatabaseBuildMeta {
641            name: db_name,
642            mod_statements: db_mod_statements,
643            schemas: schema_metas,
644        });
645    }
646
647    if !validation_errors.is_empty() {
648        return Err(ValidationErrors::new(validation_errors).into());
649    }
650
651    Ok(Discovery {
652        db_metas,
653        object_descriptors,
654        db_name_map,
655    })
656}
657
658/// Parse mod statements from a SQL file, optionally rewriting database names.
659///
660/// If `profile_suffix` is `Some`, all `UnresolvedDatabaseName` nodes matching
661/// `original_db_name` are rewritten at the AST level by appending the suffix.
662/// This is safer than raw text substitution because it only touches identifier
663/// nodes, not string literals or comments.
664fn parse_mod_statements(
665    fs: &crate::fs::FileSystem,
666    path: &Path,
667    original_db_name: &str,
668    profile_suffix: Option<&str>,
669    variables: &BTreeMap<String, String>,
670    profile_set: bool,
671    db: &mut BuildArtifact,
672) -> Result<Option<Vec<Statement<Raw>>>, ProjectError> {
673    if !path.exists() {
674        return Ok(None);
675    }
676
677    let mut entries = db
678        .load_file_contents(fs, &BTreeSet::from([path.to_path_buf()]))
679        .map_err(LoadError::from)?;
680    let sql = entries
681        .remove(path)
682        .ok_or_else(|| LoadError::InvalidFileName {
683            path: path.to_path_buf(),
684        })?;
685    let mut statements: Vec<Statement<Raw>> =
686        parse_statements_with_context(&sql, path.to_path_buf(), variables, profile_set)?
687            .into_iter()
688            .map(|stmt| stmt.ast)
689            .collect();
690    if let Some(suffix) = profile_suffix {
691        crate::project::resolve::normalize::rewrite_database_names(
692            &mut statements,
693            original_db_name,
694            suffix,
695        );
696    }
697    Ok(Some(statements))
698}
699
/// Intermediate planner classification before cached SQL fragments are loaded.
///
/// Phase 1 of the planner produces a stage by comparing the current and stored
/// fingerprints. Hits become candidates for cache reuse; their SQL fragments
/// are loaded and parsed in a later phase (see [`finalize_stage`]).
enum ObjectPlanStage {
    /// The stored fingerprint equals the current one. The fingerprint and
    /// descriptor are kept so the entry can still be demoted to a miss if the
    /// cached artifact turns out to be unusable.
    Hit {
        object_key: String,
        fingerprint: String,
        descriptor: ObjectDescriptor,
    },
    /// No fingerprint is stored for the object, or it differs from the
    /// current one.
    Miss {
        object_key: String,
        fingerprint: String,
        descriptor: ObjectDescriptor,
    },
    /// Computing the current fingerprint failed.
    ProjectErr(ProjectError),
}
718
719/// Classify a single object as a fingerprint-level Hit or Miss against the
720/// stored cache, without loading the cached SQL fragments.
721fn stage_object(
722    descriptor: ObjectDescriptor,
723    existing_fingerprints: &BTreeMap<String, String>,
724    file_hashes: &BTreeMap<PathBuf, String>,
725    variables: &BTreeMap<String, String>,
726) -> ObjectPlanStage {
727    let object_key = object_key(
728        &descriptor.db_name,
729        &descriptor.schema_name,
730        &descriptor.object_name,
731    );
732    let fingerprint = match object_fingerprint(&descriptor, file_hashes, variables) {
733        Ok(fingerprint) => fingerprint,
734        Err(err) => return ObjectPlanStage::ProjectErr(err),
735    };
736
737    if existing_fingerprints.get(&object_key) == Some(&fingerprint) {
738        ObjectPlanStage::Hit {
739            object_key,
740            fingerprint,
741            descriptor,
742        }
743    } else {
744        ObjectPlanStage::Miss {
745            object_key,
746            fingerprint,
747            descriptor,
748        }
749    }
750}
751
752/// Turn a fingerprint-level stage into a final [`ObjectPlanResult`].
753///
754/// For a fingerprint Hit, attempts to reconstruct the typed object by
755/// re-parsing the cached SQL fragments. If any fragment fails to parse the
756/// entry is treated as a Miss for fresh compilation.
757fn finalize_stage(
758    stage: ObjectPlanStage,
759    hit_artifacts: &BTreeMap<String, CompiledObjectArtifact>,
760) -> ObjectPlanResult {
761    match stage {
762        ObjectPlanStage::Hit {
763            object_key,
764            fingerprint,
765            descriptor,
766        } => {
767            let Some(artifact) = hit_artifacts.get(&object_key) else {
768                verbose!(
769                    "recompiling {} after cached object row was missing during artifact load",
770                    object_key
771                );
772                return ObjectPlanResult::Miss {
773                    object_key,
774                    fingerprint,
775                    descriptor,
776                };
777            };
778            match artifact_to_compiled_object(artifact) {
779                Ok(compiled) => ObjectPlanResult::Hit {
780                    object_key,
781                    compiled,
782                    stats: CompileStats {
783                        cache_hits: 1,
784                        cache_misses: 0,
785                    },
786                },
787                Err(()) => {
788                    verbose!(
789                        "recompiling {} after cached object payload could not be reconstructed",
790                        object_key
791                    );
792                    ObjectPlanResult::Miss {
793                        object_key,
794                        fingerprint,
795                        descriptor,
796                    }
797                }
798            }
799        }
800        ObjectPlanStage::Miss {
801            object_key,
802            fingerprint,
803            descriptor,
804        } => ObjectPlanResult::Miss {
805            object_key,
806            fingerprint,
807            descriptor,
808        },
809        ObjectPlanStage::ProjectErr(err) => ObjectPlanResult::ProjectErr(err),
810    }
811}
812
/// Internal error type for [`compile_object_uncached`].
///
/// Separates user-facing validation errors (which should be collected and
/// reported together) from internal project errors (which abort compilation).
enum ObjectCompileFailure {
    /// Errors reported by object-level validation of a single object.
    Validation(Vec<ValidationError>),
    /// A project-level failure, such as a variant file missing from the
    /// loaded file entries or a statement that could not be parsed.
    Project(ProjectError),
}
821
822/// Compile a single object from source files without consulting the cache.
823///
824/// Reads the SQL content for each file variant from the pre-loaded
825/// `file_entries` map, parses them into AST, builds an [`input::DatabaseObject`],
826/// and runs object-level validation via [`compiled::DatabaseObject::validate`].
827///
828/// Returns `Ok(Some(...))` for a successfully compiled object, `Ok(None)` if
829/// the object was skipped (no matching profile variant), or an error for
830/// validation failures or I/O problems.
831///
832/// This function is pure (no database writes) and runs in parallel via rayon.
833fn compile_object_uncached(
834    descriptor: ObjectDescriptor,
835    profile: &str,
836    variables: &BTreeMap<String, String>,
837    profile_set: bool,
838    file_entries: &BTreeMap<PathBuf, String>,
839) -> Result<Option<CachedTypedObject>, ObjectCompileFailure> {
840    let mut variants = Vec::new();
841    for variant in descriptor.variants {
842        let sql = file_entries.get(&variant.path).cloned().ok_or_else(|| {
843            ObjectCompileFailure::Project(
844                LoadError::InvalidFileName {
845                    path: variant.path.clone(),
846                }
847                .into(),
848            )
849        })?;
850        let statements =
851            parse_statements_with_context(&sql, variant.path.clone(), variables, profile_set)
852                .map_err(|err| ObjectCompileFailure::Project(err.into()))?;
853        variants.push(input::ObjectVariant {
854            path: variant.path,
855            profile: variant.profile,
856            statements,
857        });
858    }
859
860    // Build the input with the *original* (directory-derived) database name so
861    // that per-object validation compares the user's declared database against
862    // the directory they wrote it under. The suffixed name is reapplied to
863    // dependencies and the statement's own name post-assembly via
864    // `Project::rewrite_database_references`.
865    let raw_object = input::DatabaseObject {
866        name: descriptor.object_name,
867        database: descriptor.original_db_name.clone(),
868        schema: descriptor.schema_name.clone(),
869        variants,
870    };
871
872    match compiled::DatabaseObject::validate(raw_object, profile) {
873        Ok(Some(typed_object)) => Ok(Some(CachedTypedObject {
874            db_name: descriptor.db_name,
875            schema_name: descriptor.schema_name,
876            typed_object,
877        })),
878        Ok(None) => Ok(None),
879        Err(errs) => Err(ObjectCompileFailure::Validation(errs.errors)),
880    }
881}
882
883/// Compile a single object and wrap the result for cache persistence.
884///
885/// Delegates to [`compile_object_uncached`] for the actual compilation, then
886/// serializes the result into a [`CompiledObjectArtifact`] and packages it
887/// with the object key and fingerprint into an [`ObjectStateRow`] ready to
888/// be upserted into the SQLite cache.
889fn compile_object(
890    descriptor: ObjectDescriptor,
891    object_key: String,
892    fingerprint: String,
893    profile: &str,
894    variables: &BTreeMap<String, String>,
895    profile_set: bool,
896    file_entries: &BTreeMap<PathBuf, String>,
897) -> ObjectCompileResult {
898    let compiled =
899        match compile_object_uncached(descriptor, profile, variables, profile_set, file_entries) {
900            Ok(compiled) => compiled,
901            Err(ObjectCompileFailure::Validation(errs)) => {
902                return ObjectCompileResult::ValidationErr(errs);
903            }
904            Err(ObjectCompileFailure::Project(err)) => return ObjectCompileResult::ProjectErr(err),
905        };
906
907    let artifact = match &compiled {
908        Some(object) => CompiledObjectArtifact::Object(compiled_object_to_artifact_data(object)),
909        None => CompiledObjectArtifact::Skipped,
910    };
911
912    ObjectCompileResult::Ok {
913        compiled,
914        state_row: Some(ObjectStateRow {
915            object_key,
916            fingerprint,
917            artifact,
918        }),
919        stats: CompileStats {
920            cache_hits: 0,
921            cache_misses: 1,
922        },
923    }
924}
925
926/// Compute a SHA-256 fingerprint for an object's current compile inputs.
927///
928/// The fingerprint is a hex-encoded hash of:
929///
930/// - the object's logical key (`db_name`, `schema_name`, `object_name`)
931/// - every compile-time variable binding (name and value)
932/// - every file variant's path, profile tag, and content hash
933///
934/// Two invocations produce the same fingerprint if and only if the object's
935/// identity, variables, file paths, and file contents are all identical.
936/// This is the cache key: a matching fingerprint means the cached artifact
937/// is safe to reuse without recompilation.
938fn object_fingerprint(
939    descriptor: &ObjectDescriptor,
940    file_hashes: &BTreeMap<PathBuf, String>,
941    variables: &BTreeMap<String, String>,
942) -> Result<String, ProjectError> {
943    let mut hasher = Sha256::new();
944    hasher.update(descriptor.db_name.as_bytes());
945    hasher.update([0]);
946    hasher.update(descriptor.schema_name.as_bytes());
947    hasher.update([0]);
948    hasher.update(descriptor.object_name.as_bytes());
949    hasher.update([0]);
950    for (name, value) in variables {
951        hasher.update(name.as_bytes());
952        hasher.update([0]);
953        hasher.update(value.as_bytes());
954        hasher.update([0xff]);
955    }
956    for variant in &descriptor.variants {
957        hasher.update(variant.path.to_string_lossy().as_bytes());
958        hasher.update([0]);
959        hasher.update(variant.profile.as_deref().unwrap_or("").as_bytes());
960        hasher.update([0]);
961        let content_hash =
962            file_hashes
963                .get(&variant.path)
964                .ok_or_else(|| LoadError::InvalidFileName {
965                    path: variant.path.clone(),
966                })?;
967        hasher.update(content_hash.as_bytes());
968        hasher.update([0xfe]);
969    }
970    Ok(hex_digest(hasher.finalize()))
971}
972
/// Build the canonical cache key for a logical object by joining the
/// database, schema, and object names with dots: `"db.schema.object"`.
fn object_key(db_name: &str, schema_name: &str, object_name: &str) -> String {
    [db_name, schema_name, object_name].join(".")
}
977
978/// Compute the cache namespace for a profile configuration.
979///
980/// Returns a hex-encoded SHA-256 hash of the profile name, optional suffix,
981/// and variable bindings. This ensures that different profile/suffix/variable
982/// combinations use isolated SQLite databases under `target/compiler/`,
983/// preventing cross-contamination of cached artifacts.
984pub(crate) fn profile_namespace(
985    profile: &str,
986    profile_suffix: Option<&str>,
987    variables: &BTreeMap<String, String>,
988) -> String {
989    let mut hasher = Sha256::new();
990    hasher.update(profile.as_bytes());
991    hasher.update([0]);
992    hasher.update(profile_suffix.unwrap_or("").as_bytes());
993    hasher.update([0]);
994    for (name, value) in variables {
995        hasher.update(name.as_bytes());
996        hasher.update([0]);
997        hasher.update(value.as_bytes());
998        hasher.update([0xff]);
999    }
1000    hex_digest(hasher.finalize())
1001}
1002
1003/// Convert a freshly compiled object into the SQL-text shape persisted in the
1004/// cache. Each AST node is rendered with a trailing semicolon so the inverse
1005/// parse in [`artifact_to_compiled_object`] sees a self-contained statement.
1006fn compiled_object_to_artifact_data(object: &CachedTypedObject) -> CompiledObjectArtifactData {
1007    CompiledObjectArtifactData {
1008        db_name: object.db_name.clone(),
1009        schema_name: object.schema_name.clone(),
1010        file_path: object.typed_object.path.clone(),
1011        stmt_sql: format!("{};", object.typed_object.stmt),
1012        indexes_sql: object
1013            .typed_object
1014            .indexes
1015            .iter()
1016            .map(|stmt| format!("{};", stmt))
1017            .collect(),
1018        grants_sql: object
1019            .typed_object
1020            .grants
1021            .iter()
1022            .map(|stmt| format!("{};", stmt))
1023            .collect(),
1024        comments_sql: object
1025            .typed_object
1026            .comments
1027            .iter()
1028            .map(|stmt| format!("{};", stmt))
1029            .collect(),
1030        tests_sql: object
1031            .typed_object
1032            .tests
1033            .iter()
1034            .map(|stmt| format!("{};", stmt))
1035            .collect(),
1036    }
1037}
1038
1039/// Reconstruct a [`CachedTypedObject`] by re-parsing the cached SQL fragments.
1040///
1041/// Returns `Ok(None)` for `Skipped` entries. Returns `Err(())` if any fragment
1042/// fails to parse — the caller should treat this as a cache miss and recompile
1043/// the object from source.
1044fn artifact_to_compiled_object(
1045    artifact: &CompiledObjectArtifact,
1046) -> Result<Option<CachedTypedObject>, ()> {
1047    match artifact {
1048        CompiledObjectArtifact::Skipped => Ok(None),
1049        CompiledObjectArtifact::Object(data) => Ok(Some(CachedTypedObject {
1050            db_name: data.db_name.clone(),
1051            schema_name: data.schema_name.clone(),
1052            typed_object: compiled::DatabaseObject {
1053                path: data.file_path.clone(),
1054                stmt: parse_main_statement(&data.stmt_sql)?,
1055                indexes: parse_statement_list(&data.indexes_sql, expect_index)?,
1056                grants: parse_statement_list(&data.grants_sql, expect_grant)?,
1057                comments: parse_statement_list(&data.comments_sql, expect_comment)?,
1058                tests: parse_statement_list(&data.tests_sql, expect_test)?,
1059            },
1060        })),
1061    }
1062}
1063
1064/// Parse a SQL string into a list of raw AST statements.
1065///
1066/// Returns `Err(())` on any parse failure. Used only for cache reconstruction
1067/// where detailed error reporting is unnecessary — a parse failure simply
1068/// means the cache entry is stale.
1069fn parse_sql(sql: &str) -> Result<Vec<Statement<Raw>>, ()> {
1070    mz_sql_parser::parser::parse_statements_with_limit(sql)
1071        .map_err(|_| ())?
1072        .map(|stmts| stmts.into_iter().map(|stmt| stmt.ast).collect())
1073        .map_err(|_| ())
1074}
1075
1076/// Parse a SQL string that must contain exactly one statement.
1077///
1078/// Returns `Err(())` if parsing fails or the string contains zero or
1079/// multiple statements.
1080fn parse_one_statement(sql: &str) -> Result<Statement<Raw>, ()> {
1081    let mut statements = parse_sql(sql)?;
1082    if statements.len() != 1 {
1083        return Err(());
1084    }
1085    Ok(statements.remove(0))
1086}
1087
1088/// Parse a list of SQL strings and downcast each to a specific statement type.
1089///
1090/// Each string is parsed via [`parse_one_statement`], then passed through
1091/// `parser` to extract the expected AST variant (e.g., `CreateIndexStatement`).
1092/// Returns `Err(())` if any string fails to parse or has the wrong statement type.
1093fn parse_statement_list<T>(
1094    sql_statements: &[String],
1095    parser: fn(Statement<Raw>) -> Result<T, ()>,
1096) -> Result<Vec<T>, ()> {
1097    sql_statements
1098        .iter()
1099        .map(|sql| parse_one_statement(sql).and_then(parser))
1100        .collect()
1101}
1102
1103/// Parse a cached main statement SQL string into the project's [`Statement`](crate::project::ast::Statement) enum.
1104///
1105/// Only the statement types that mz-deploy manages as database objects are
1106/// accepted: views, materialized views, tables, table-from-source, sources,
1107/// sinks, secrets, and connections. Any other statement type returns `Err(())`.
1108fn parse_main_statement(sql: &str) -> Result<crate::project::ast::Statement, ()> {
1109    match parse_one_statement(sql)? {
1110        Statement::CreateSink(stmt) => Ok(crate::project::ast::Statement::CreateSink(stmt)),
1111        Statement::CreateView(stmt) => Ok(crate::project::ast::Statement::CreateView(stmt)),
1112        Statement::CreateMaterializedView(stmt) => {
1113            Ok(crate::project::ast::Statement::CreateMaterializedView(stmt))
1114        }
1115        Statement::CreateTable(stmt) => Ok(crate::project::ast::Statement::CreateTable(stmt)),
1116        Statement::CreateTableFromSource(stmt) => {
1117            Ok(crate::project::ast::Statement::CreateTableFromSource(stmt))
1118        }
1119        Statement::CreateSource(stmt) => Ok(crate::project::ast::Statement::CreateSource(stmt)),
1120        Statement::CreateSecret(stmt) => Ok(crate::project::ast::Statement::CreateSecret(stmt)),
1121        Statement::CreateConnection(stmt) => {
1122            Ok(crate::project::ast::Statement::CreateConnection(stmt))
1123        }
1124        _ => Err(()),
1125    }
1126}
1127
1128/// Extract a [`CreateIndexStatement`] from a generic `Statement`, or `Err(())`.
1129fn expect_index(stmt: Statement<Raw>) -> Result<CreateIndexStatement<Raw>, ()> {
1130    match stmt {
1131        Statement::CreateIndex(stmt) => Ok(stmt),
1132        _ => Err(()),
1133    }
1134}
1135
1136/// Extract a [`GrantPrivilegesStatement`] from a generic `Statement`, or `Err(())`.
1137fn expect_grant(stmt: Statement<Raw>) -> Result<GrantPrivilegesStatement<Raw>, ()> {
1138    match stmt {
1139        Statement::GrantPrivileges(stmt) => Ok(stmt),
1140        _ => Err(()),
1141    }
1142}
1143
1144/// Extract a [`CommentStatement`] from a generic `Statement`, or `Err(())`.
1145fn expect_comment(stmt: Statement<Raw>) -> Result<CommentStatement<Raw>, ()> {
1146    match stmt {
1147        Statement::Comment(stmt) => Ok(stmt),
1148        _ => Err(()),
1149    }
1150}
1151
1152/// Extract an [`ExecuteUnitTestStatement`] from a generic `Statement`, or `Err(())`.
1153fn expect_test(stmt: Statement<Raw>) -> Result<ExecuteUnitTestStatement<Raw>, ()> {
1154    match stmt {
1155        Statement::ExecuteUnitTest(stmt) => Ok(stmt),
1156        _ => Err(()),
1157    }
1158}