Skip to main content

mz_deploy/project/compiler/cache/
build_artifact.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Persistence layer for the incremental compiler.
11//!
12//! Stores advisory build state scoped to one profile namespace. The database
13//! persists four categories of state:
14//!
15//! - **File metadata** — Content hashes and source text, keyed by source path.
16//!   Freshness is determined by file size and modification time; stale entries
17//!   are transparently refreshed from disk.
18//! - **Object artifacts** — Compiled object payloads, keyed by logical object
19//!   identifier and content fingerprint.
20//! - **Typecheck artifacts** — Per-object validation results (fingerprints,
21//!   column schemas), used for incremental dirty detection.
22//! - **Project snapshot** — Full compiled project graph for read-only consumers
23//!   (LSP, explain).
24//!
25//! All cached state is advisory. Missing, corrupt, or schema-incompatible
26//! entries are treated as cache misses and rebuilt from source. The compiler
27//! owns the schema version; a version mismatch triggers a full rebuild of the
28//! namespace-local database.
29
30use super::CacheError;
31use super::schema;
32use crate::project::ast::Statement;
33use crate::project::compiler::cache_io::hex_digest;
34use crate::project::ir::graph;
35use crate::project::ir::infrastructure::{self, Infrastructure};
36use crate::project::ir::object_id::ObjectId;
37use crate::project::resolve::cte_scope::CteScope;
38use crate::types::ColumnType;
39use mz_sql_parser::ast::visit::{self, Visit};
40use mz_sql_parser::ast::{CommentObjectType, Raw, RawClusterName, TableFactor};
41use rusqlite::{Connection, OptionalExtension, params};
42use sha2::{Digest, Sha256};
43use std::collections::{BTreeMap, BTreeSet};
44use std::fs;
45use std::path::{Path, PathBuf};
46use std::time::UNIX_EPOCH;
47
48const OBJECT_STATE_TABLE: &str = "object_state";
49const TYPECHECK_OBJECTS_TABLE: &str = "typecheck_objects";
50const TYPECHECK_COLUMNS_TABLE: &str = "typecheck_columns";
51
52fn db_err(path: &Path) -> impl Fn(rusqlite::Error) -> CacheError + '_ {
53    move |source| CacheError::DatabaseOperationFailed {
54        path: path.to_path_buf(),
55        source,
56    }
57}
58
59fn file_read_err(path: &Path) -> impl Fn(std::io::Error) -> CacheError + '_ {
60    move |source| CacheError::FileReadFailed {
61        path: path.to_path_buf(),
62        source,
63    }
64}
65
66const FILE_STATE_UPSERT: &str = "
67    INSERT INTO file_state(path, size, mtime_ns, content_hash, contents)
68    VALUES(?1, ?2, ?3, ?4, ?5)
69    ON CONFLICT(path) DO UPDATE SET
70        size = excluded.size,
71        mtime_ns = excluded.mtime_ns,
72        content_hash = excluded.content_hash,
73        contents = excluded.contents
74";
75
76/// Read the file at `path`, compute its hash, and upsert the row into
77/// `file_state`. Returns the hash and contents.
78fn read_and_upsert_file(
79    upsert: &mut rusqlite::Statement<'_>,
80    path: &Path,
81    path_str: &str,
82    size: i64,
83    mtime_ns: i64,
84    db_path: &Path,
85) -> Result<(String, String), CacheError> {
86    let contents = fs::read_to_string(path).map_err(file_read_err(path))?;
87    let content_hash = hex_digest(Sha256::digest(contents.as_bytes()));
88    upsert
89        .execute(params![path_str, size, mtime_ns, &content_hash, &contents])
90        .map_err(db_err(db_path))?;
91    Ok((content_hash, contents))
92}
93
94/// A persisted compile artifact for one logical object.
95///
96/// `Skipped` records that the object was intentionally excluded for the active
97/// profile (e.g., a profile variant that doesn't match). `Object` carries the
98/// SQL strings that constitute a compiled object — they are stored verbatim
99/// and re-parsed on cache hit.
100#[derive(Debug, Clone)]
101pub(crate) enum CompiledObjectArtifact {
102    Skipped,
103    Object(CompiledObjectArtifactData),
104}
105
106/// SQL fragments that together describe a compiled database object.
107///
108/// Stores SQL text rather than AST nodes because the AST types are not
109/// `Serialize`-able. On cache hit, the strings are re-parsed back into AST.
110#[derive(Debug, Clone)]
111pub(crate) struct CompiledObjectArtifactData {
112    pub db_name: String,
113    pub schema_name: String,
114    pub file_path: PathBuf,
115    pub stmt_sql: String,
116    pub indexes_sql: Vec<String>,
117    pub grants_sql: Vec<String>,
118    pub comments_sql: Vec<String>,
119    pub tests_sql: Vec<String>,
120}
121
122/// Column values to write into the `object_state` row, derived from a
123/// [`CompiledObjectArtifact`].
124struct ObjectStateHeader<'a> {
125    kind: &'static str,
126    db_name: Option<&'a str>,
127    schema_name: Option<&'a str>,
128    file_path: Option<String>,
129    stmt_sql: Option<&'a str>,
130}
131
132impl<'a> ObjectStateHeader<'a> {
133    fn from_artifact(artifact: &'a CompiledObjectArtifact) -> Self {
134        match artifact {
135            CompiledObjectArtifact::Skipped => Self {
136                kind: "skipped",
137                db_name: None,
138                schema_name: None,
139                file_path: None,
140                stmt_sql: None,
141            },
142            CompiledObjectArtifact::Object(data) => Self {
143                kind: "object",
144                db_name: Some(&data.db_name),
145                schema_name: Some(&data.schema_name),
146                file_path: Some(data.file_path.to_string_lossy().into_owned()),
147                stmt_sql: Some(&data.stmt_sql),
148            },
149        }
150    }
151}
152
153fn prepare_delete<'tx>(
154    tx: &'tx rusqlite::Transaction<'_>,
155    table: &str,
156    path: &Path,
157) -> Result<rusqlite::Statement<'tx>, CacheError> {
158    tx.prepare(&format!("DELETE FROM {table} WHERE object_key = ?1"))
159        .map_err(db_err(path))
160}
161
162fn prepare_fragment_insert<'tx>(
163    tx: &'tx rusqlite::Transaction<'_>,
164    table: &str,
165    path: &Path,
166) -> Result<rusqlite::Statement<'tx>, CacheError> {
167    tx.prepare(&format!(
168        "INSERT INTO {table}(object_key, position, sql_text) VALUES(?1, ?2, ?3)"
169    ))
170    .map_err(db_err(path))
171}
172
173fn run_execute(
174    stmt: &mut rusqlite::Statement<'_>,
175    params: impl rusqlite::Params,
176    path: &Path,
177) -> Result<(), CacheError> {
178    stmt.execute(params).map(|_| ()).map_err(db_err(path))
179}
180
181fn write_fragments(
182    stmt: &mut rusqlite::Statement<'_>,
183    object_key: &str,
184    fragments: &[String],
185    path: &Path,
186) -> Result<(), CacheError> {
187    for (position, sql) in fragments.iter().enumerate() {
188        let position = i64::try_from(position).unwrap_or(i64::MAX);
189        stmt.execute(params![object_key, position, sql])
190            .map_err(db_err(path))?;
191    }
192    Ok(())
193}
194
195fn collect_fragments(
196    stmt: &mut rusqlite::Statement<'_>,
197    object_key: &str,
198    path: &Path,
199) -> Result<Vec<String>, CacheError> {
200    let rows = stmt
201        .query_map(params![object_key], |row| row.get::<_, String>(0))
202        .map_err(db_err(path))?;
203    let mut out = Vec::new();
204    for row in rows {
205        out.push(row.map_err(db_err(path))?);
206    }
207    Ok(out)
208}
209
210pub(crate) struct BuildArtifact {
211    path: PathBuf,
212    conn: Connection,
213}
214
215impl BuildArtifact {
216    /// Open (or create) the SQLite build artifact database for a profile namespace.
217    ///
218    /// The namespace directory is derived from the active profile name, optional
219    /// suffix, and compile-time variable bindings, so different profiles use
220    /// isolated caches. On schema version mismatch the database is dropped and
221    /// recreated — safe because all cached state is advisory.
222    pub(crate) fn open(
223        root: &Path,
224        profile: &str,
225        profile_suffix: Option<&str>,
226        variables: &BTreeMap<String, String>,
227    ) -> Result<Self, CacheError> {
228        let path = super::db_path(root, profile, profile_suffix, variables);
229        let parent = path
230            .parent()
231            .expect("cache db path always has a parent directory");
232        fs::create_dir_all(parent).map_err(|source| CacheError::DirectoryCreationFailed {
233            path: parent.to_path_buf(),
234            source,
235        })?;
236        let conn = Connection::open(&path).map_err(|source| CacheError::DatabaseOpenFailed {
237            path: path.clone(),
238            source,
239        })?;
240        let db = Self { path, conn };
241        db.initialize()?;
242        Ok(db)
243    }
244
245    fn initialize(&self) -> Result<(), CacheError> {
246        // Negative cache_size is in KiB; positive would be page count.
247        const PAGE_CACHE_KIB: i64 = -64 * 1024;
248        const MMAP_BYTES: i64 = 256 * 1024 * 1024;
249        const WAL_AUTOCHECKPOINT_PAGES: i64 = 10_000;
250        let pragmas = format!(
251            "
252            PRAGMA journal_mode=WAL;
253            PRAGMA synchronous=NORMAL;
254            PRAGMA cache_size={PAGE_CACHE_KIB};
255            PRAGMA temp_store=MEMORY;
256            PRAGMA mmap_size={MMAP_BYTES};
257            PRAGMA wal_autocheckpoint={WAL_AUTOCHECKPOINT_PAGES};
258            ",
259        );
260        self.conn
261            .execute_batch(&pragmas)
262            .map_err(db_err(&self.path))?;
263        self.conn
264            .execute_batch(
265                "
266                CREATE TABLE IF NOT EXISTS meta (
267                    key TEXT PRIMARY KEY,
268                    value TEXT NOT NULL
269                );
270                ",
271            )
272            .map_err(db_err(&self.path))?;
273
274        let version: Option<i64> = self
275            .conn
276            .query_row(
277                "SELECT value FROM meta WHERE key = 'schema_version'",
278                [],
279                |row| {
280                    row.get::<_, String>(0)
281                        .map(|s| s.parse::<i64>().unwrap_or_default())
282                },
283            )
284            .optional()
285            .map_err(db_err(&self.path))?;
286
287        if version != Some(schema::SCHEMA_VERSION) {
288            self.conn
289                .execute_batch(schema::DROP_SQL)
290                .map_err(db_err(&self.path))?;
291        }
292
293        self.create_schema()?;
294        Ok(())
295    }
296
297    fn create_schema(&self) -> Result<(), CacheError> {
298        self.conn
299            .execute_batch(schema::CREATE_SQL)
300            .map_err(db_err(&self.path))?;
301
302        self.conn
303            .execute(
304                "INSERT OR REPLACE INTO meta(key, value) VALUES ('schema_version', ?1)",
305                params![schema::SCHEMA_VERSION.to_string()],
306            )
307            .map_err(db_err(&self.path))?;
308        Ok(())
309    }
310
311    /// Load content hashes for the requested source paths. Stale or missing
312    /// cache entries are transparently refreshed from disk.
313    pub(crate) fn load_file_hashes(
314        &mut self,
315        fs: &crate::fs::FileSystem,
316        paths: &BTreeSet<PathBuf>,
317    ) -> Result<BTreeMap<PathBuf, String>, CacheError> {
318        let tx = self.conn.transaction().map_err(db_err(&self.path))?;
319        let mut select = tx
320            .prepare("SELECT size, mtime_ns, content_hash FROM file_state WHERE path = ?1")
321            .map_err(db_err(&self.path))?;
322        let mut upsert = tx.prepare(FILE_STATE_UPSERT).map_err(db_err(&self.path))?;
323
324        let mut results = BTreeMap::new();
325        for path in paths {
326            // Overlay-covered paths bypass the disk-keyed content cache:
327            // disk size+mtime are unchanged while the in-memory buffer can
328            // differ, so a cache hit would serve stale disk bytes.
329            if fs.is_overlay(path) {
330                let contents = fs.read_to_string(path).map_err(file_read_err(path))?;
331                results.insert(
332                    path.clone(),
333                    hex_digest(Sha256::digest(contents.as_bytes())),
334                );
335                continue;
336            }
337
338            let (size, mtime_ns) = file_metadata_signature(path)?;
339            let path_str = path.to_string_lossy().to_string();
340            let cached: Option<(i64, i64, String)> = select
341                .query_row([&path_str], |row| {
342                    Ok((row.get(0)?, row.get(1)?, row.get(2)?))
343                })
344                .optional()
345                .map_err(db_err(&self.path))?;
346
347            let hash = match cached {
348                Some((s, m, h)) if s == size && m == mtime_ns => h,
349                _ => {
350                    let (h, _) = read_and_upsert_file(
351                        &mut upsert,
352                        path,
353                        &path_str,
354                        size,
355                        mtime_ns,
356                        &self.path,
357                    )?;
358                    h
359                }
360            };
361            results.insert(path.clone(), hash);
362        }
363
364        drop(select);
365        drop(upsert);
366        tx.commit().map_err(db_err(&self.path))?;
367        Ok(results)
368    }
369
370    /// Load file contents for the requested source paths. Stale, missing,
371    /// or contents-NULL cache entries are transparently refreshed from disk.
372    pub(crate) fn load_file_contents(
373        &mut self,
374        fs: &crate::fs::FileSystem,
375        paths: &BTreeSet<PathBuf>,
376    ) -> Result<BTreeMap<PathBuf, String>, CacheError> {
377        let tx = self.conn.transaction().map_err(db_err(&self.path))?;
378        let mut select = tx
379            .prepare(
380                "SELECT size, mtime_ns, contents \
381                 FROM file_state WHERE path = ?1",
382            )
383            .map_err(db_err(&self.path))?;
384        let mut upsert = tx.prepare(FILE_STATE_UPSERT).map_err(db_err(&self.path))?;
385
386        let mut results = BTreeMap::new();
387        for path in paths {
388            if fs.is_overlay(path) {
389                let contents = fs.read_to_string(path).map_err(file_read_err(path))?;
390                results.insert(path.clone(), contents);
391                continue;
392            }
393
394            let (size, mtime_ns) = file_metadata_signature(path)?;
395            let path_str = path.to_string_lossy().to_string();
396            let cached: Option<(i64, i64, Option<String>)> = select
397                .query_row([&path_str], |row| {
398                    Ok((row.get(0)?, row.get(1)?, row.get(2)?))
399                })
400                .optional()
401                .map_err(db_err(&self.path))?;
402
403            let contents = match cached {
404                Some((s, m, Some(c))) if s == size && m == mtime_ns => c,
405                _ => {
406                    let (_, contents) = read_and_upsert_file(
407                        &mut upsert,
408                        path,
409                        &path_str,
410                        size,
411                        mtime_ns,
412                        &self.path,
413                    )?;
414                    contents
415                }
416            };
417            results.insert(path.clone(), contents);
418        }
419
420        drop(select);
421        drop(upsert);
422        tx.commit().map_err(db_err(&self.path))?;
423        Ok(results)
424    }
425
426    /// Load just the (object_key, fingerprint) pairs from `object_state`.
427    ///
428    /// Used during the planning phase to detect cache hits without paying the
429    /// cost of materializing each object's SQL fragments.
430    pub(crate) fn load_object_fingerprints(&self) -> Result<BTreeMap<String, String>, CacheError> {
431        let mut stmt = self
432            .conn
433            .prepare("SELECT object_key, fingerprint FROM object_state")
434            .map_err(db_err(&self.path))?;
435        let rows = stmt
436            .query_map([], |row| {
437                Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
438            })
439            .map_err(db_err(&self.path))?;
440
441        let mut result = BTreeMap::new();
442        for row in rows {
443            let (key, fingerprint) = row.map_err(db_err(&self.path))?;
444            result.insert(key, fingerprint);
445        }
446        Ok(result)
447    }
448
449    /// Load full compile artifacts for the requested object keys. Only keys
450    /// present in `object_state` produce an entry.
451    pub(crate) fn load_object_artifacts(
452        &self,
453        keys: &BTreeSet<String>,
454    ) -> Result<BTreeMap<String, CompiledObjectArtifact>, CacheError> {
455        let mut result = BTreeMap::new();
456        if keys.is_empty() {
457            return Ok(result);
458        }
459
460        let mut header = self
461            .conn
462            .prepare(
463                "SELECT kind, db_name, schema_name, file_path, stmt_sql \
464                 FROM object_state WHERE object_key = ?1",
465            )
466            .map_err(db_err(&self.path))?;
467        let mut indexes = self.prepare_fragment_select("object_state_indexes")?;
468        let mut grants = self.prepare_fragment_select("object_state_grants")?;
469        let mut comments = self.prepare_fragment_select("object_state_comments")?;
470        let mut tests = self.prepare_fragment_select("object_state_tests")?;
471
472        for key in keys {
473            let row = header
474                .query_row(params![key], |row| {
475                    Ok((
476                        row.get::<_, String>(0)?,
477                        row.get::<_, Option<String>>(1)?,
478                        row.get::<_, Option<String>>(2)?,
479                        row.get::<_, Option<String>>(3)?,
480                        row.get::<_, Option<String>>(4)?,
481                    ))
482                })
483                .optional()
484                .map_err(db_err(&self.path))?;
485            let Some((kind, db_name, schema_name, file_path, stmt_sql)) = row else {
486                continue;
487            };
488            let artifact = match kind.as_str() {
489                "skipped" => CompiledObjectArtifact::Skipped,
490                "object" => CompiledObjectArtifact::Object(CompiledObjectArtifactData {
491                    db_name: db_name.unwrap_or_default(),
492                    schema_name: schema_name.unwrap_or_default(),
493                    file_path: file_path.map(PathBuf::from).unwrap_or_default(),
494                    stmt_sql: stmt_sql.unwrap_or_default(),
495                    indexes_sql: collect_fragments(&mut indexes, key, &self.path)?,
496                    grants_sql: collect_fragments(&mut grants, key, &self.path)?,
497                    comments_sql: collect_fragments(&mut comments, key, &self.path)?,
498                    tests_sql: collect_fragments(&mut tests, key, &self.path)?,
499                }),
500                _ => continue,
501            };
502            result.insert(key.clone(), artifact);
503        }
504        Ok(result)
505    }
506
507    fn prepare_fragment_select(&self, table: &str) -> Result<rusqlite::Statement<'_>, CacheError> {
508        self.conn
509            .prepare(&format!(
510                "SELECT sql_text FROM {table} WHERE object_key = ?1 ORDER BY position"
511            ))
512            .map_err(db_err(&self.path))
513    }
514
515    /// Replace each object's fragment rows so row-count changes (e.g., a
516    /// removed index) are reflected exactly.
517    pub(crate) fn upsert_object_rows(&mut self, rows: &[ObjectStateRow]) -> Result<(), CacheError> {
518        let tx = self.conn.transaction().map_err(db_err(&self.path))?;
519        {
520            let mut upsert_header = tx
521                .prepare(
522                    "
523                    INSERT INTO object_state(
524                        object_key, fingerprint, kind, db_name, schema_name, file_path, stmt_sql
525                    )
526                    VALUES(?1, ?2, ?3, ?4, ?5, ?6, ?7)
527                    ON CONFLICT(object_key) DO UPDATE SET
528                        fingerprint = excluded.fingerprint,
529                        kind = excluded.kind,
530                        db_name = excluded.db_name,
531                        schema_name = excluded.schema_name,
532                        file_path = excluded.file_path,
533                        stmt_sql = excluded.stmt_sql
534                    ",
535                )
536                .map_err(db_err(&self.path))?;
537            let mut delete_indexes = prepare_delete(&tx, "object_state_indexes", &self.path)?;
538            let mut delete_grants = prepare_delete(&tx, "object_state_grants", &self.path)?;
539            let mut delete_comments = prepare_delete(&tx, "object_state_comments", &self.path)?;
540            let mut delete_tests = prepare_delete(&tx, "object_state_tests", &self.path)?;
541            let mut insert_indexes =
542                prepare_fragment_insert(&tx, "object_state_indexes", &self.path)?;
543            let mut insert_grants =
544                prepare_fragment_insert(&tx, "object_state_grants", &self.path)?;
545            let mut insert_comments =
546                prepare_fragment_insert(&tx, "object_state_comments", &self.path)?;
547            let mut insert_tests = prepare_fragment_insert(&tx, "object_state_tests", &self.path)?;
548
549            for row in rows {
550                let header = ObjectStateHeader::from_artifact(&row.artifact);
551                upsert_header
552                    .execute(params![
553                        row.object_key,
554                        row.fingerprint,
555                        header.kind,
556                        header.db_name,
557                        header.schema_name,
558                        header.file_path,
559                        header.stmt_sql,
560                    ])
561                    .map_err(db_err(&self.path))?;
562
563                run_execute(&mut delete_indexes, params![row.object_key], &self.path)?;
564                run_execute(&mut delete_grants, params![row.object_key], &self.path)?;
565                run_execute(&mut delete_comments, params![row.object_key], &self.path)?;
566                run_execute(&mut delete_tests, params![row.object_key], &self.path)?;
567
568                if let CompiledObjectArtifact::Object(data) = &row.artifact {
569                    write_fragments(
570                        &mut insert_indexes,
571                        &row.object_key,
572                        &data.indexes_sql,
573                        &self.path,
574                    )?;
575                    write_fragments(
576                        &mut insert_grants,
577                        &row.object_key,
578                        &data.grants_sql,
579                        &self.path,
580                    )?;
581                    write_fragments(
582                        &mut insert_comments,
583                        &row.object_key,
584                        &data.comments_sql,
585                        &self.path,
586                    )?;
587                    write_fragments(
588                        &mut insert_tests,
589                        &row.object_key,
590                        &data.tests_sql,
591                        &self.path,
592                    )?;
593                }
594            }
595        }
596        tx.commit().map_err(db_err(&self.path))
597    }
598
599    /// Remove `object_state` rows for objects no longer in the current project.
600    pub(crate) fn prune_object_rows(&mut self, keep: &BTreeSet<String>) -> Result<(), CacheError> {
601        self.prune_rows(OBJECT_STATE_TABLE, keep)
602    }
603
604    /// Persist or update typecheck artifacts for a batch of objects.
605    ///
606    /// Column records for an object are fully replaced (no partial updates).
607    pub(crate) fn upsert_typecheck_results(
608        &mut self,
609        rows: &[(String, String, BTreeMap<String, ColumnType>)],
610    ) -> Result<(), CacheError> {
611        let tx = self.conn.transaction().map_err(db_err(&self.path))?;
612        {
613            let mut upsert_obj = tx
614                .prepare(
615                    "
616                    INSERT INTO typecheck_objects(object_key, object_kind)
617                    VALUES(?1, ?2)
618                    ON CONFLICT(object_key) DO UPDATE SET
619                        object_kind = excluded.object_kind
620                    ",
621                )
622                .map_err(db_err(&self.path))?;
623            let mut delete_cols = tx
624                .prepare("DELETE FROM typecheck_columns WHERE object_key = ?1")
625                .map_err(db_err(&self.path))?;
626            let mut insert_col = tx
627                .prepare(
628                    "INSERT INTO typecheck_columns(object_key, column_name, column_type, nullable, position)
629                     VALUES(?1, ?2, ?3, ?4, ?5)",
630                )
631                .map_err(db_err(&self.path))?;
632
633            for (key, kind, columns) in rows {
634                upsert_obj
635                    .execute(params![key, kind])
636                    .map_err(db_err(&self.path))?;
637                delete_cols.execute([key]).map_err(db_err(&self.path))?;
638                for (col_name, col_type) in columns {
639                    insert_col
640                        .execute(params![
641                            key,
642                            col_name,
643                            col_type.r#type,
644                            i32::from(col_type.nullable),
645                            i64::try_from(col_type.position).unwrap_or(0),
646                        ])
647                        .map_err(db_err(&self.path))?;
648                }
649            }
650        }
651        tx.commit().map_err(db_err(&self.path))
652    }
653
654    /// Remove stale typecheck artifacts for objects no longer in the current
655    /// project.
656    pub(crate) fn prune_typecheck_results(
657        &mut self,
658        keep: &BTreeSet<String>,
659    ) -> Result<(), CacheError> {
660        self.prune_rows(TYPECHECK_COLUMNS_TABLE, keep)?;
661        self.prune_rows(TYPECHECK_OBJECTS_TABLE, keep)
662    }
663
664    /// Load every cached typecheck column row, grouped by object key.
665    ///
666    /// Single round-trip so the parallel typecheck DAG can run without
667    /// holding a live SQLite connection.
668    pub(crate) fn load_typecheck_columns(
669        &self,
670    ) -> Result<BTreeMap<String, BTreeMap<String, ColumnType>>, CacheError> {
671        let mut stmt = self
672            .conn
673            .prepare(
674                "SELECT object_key, column_name, column_type, nullable, position \
675                 FROM typecheck_columns",
676            )
677            .map_err(db_err(&self.path))?;
678        let rows = stmt
679            .query_map([], |row| {
680                Ok((
681                    row.get::<_, String>(0)?,
682                    row.get::<_, String>(1)?,
683                    ColumnType {
684                        r#type: row.get(2)?,
685                        nullable: row.get::<_, i32>(3)? != 0,
686                        position: usize::try_from(row.get::<_, i64>(4)?).unwrap_or(0),
687                        comment: None,
688                    },
689                ))
690            })
691            .map_err(db_err(&self.path))?;
692        let mut out: BTreeMap<String, BTreeMap<String, ColumnType>> = BTreeMap::new();
693        for row in rows {
694            let (key, name, ty) = row.map_err(db_err(&self.path))?;
695            out.entry(key).or_default().insert(name, ty);
696        }
697        Ok(out)
698    }
699
700    /// Load the digest map for external types (object_key -> digest).
701    pub(crate) fn load_external_type_digests(
702        &self,
703    ) -> Result<BTreeMap<String, String>, CacheError> {
704        let mut stmt = self
705            .conn
706            .prepare("SELECT object_key, digest FROM external_type_digest")
707            .map_err(db_err(&self.path))?;
708        let rows = stmt
709            .query_map([], |row| {
710                Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
711            })
712            .map_err(db_err(&self.path))?;
713        let mut out = BTreeMap::new();
714        for row in rows {
715            let (key, digest) = row.map_err(db_err(&self.path))?;
716            out.insert(key, digest);
717        }
718        Ok(out)
719    }
720
721    /// Replace the cached external-type digest set with the provided rows and
722    /// drop any rows for keys not present.
723    pub(crate) fn replace_external_type_digests(
724        &mut self,
725        digests: &BTreeMap<String, String>,
726    ) -> Result<(), CacheError> {
727        let db_err = db_err(&self.path);
728        let tx = self.conn.transaction().map_err(&db_err)?;
729        tx.execute("DELETE FROM external_type_digest", [])
730            .map_err(&db_err)?;
731        {
732            let mut stmt = tx
733                .prepare("INSERT INTO external_type_digest(object_key, digest) VALUES(?1, ?2)")
734                .map_err(&db_err)?;
735            for (key, digest) in digests {
736                stmt.execute(params![key, digest]).map_err(&db_err)?;
737            }
738        }
739        tx.commit().map_err(&db_err)
740    }
741
742    /// Rewrites per-object rows for `changed_keys ∪ deleted_keys`; small
743    /// project-wide tables are rewritten in full.
744    pub(crate) fn write_project(
745        &mut self,
746        project: &graph::Project,
747        changed_keys: &BTreeSet<String>,
748        deleted_keys: &BTreeSet<String>,
749        root: &Path,
750    ) -> Result<(), CacheError> {
751        const PER_OBJECT_TABLES: &[&str] = &[
752            "project_objects",
753            "project_dependencies",
754            "project_comments",
755            "project_indexes",
756            "project_grants",
757            "project_tests",
758            "project_aliases",
759            "project_infrastructure",
760            "project_infrastructure_properties",
761        ];
762
763        let db_err = db_err(&self.path);
764
765        let tx = self.conn.transaction().map_err(&db_err)?;
766
767        if !changed_keys.is_empty() || !deleted_keys.is_empty() {
768            for table in PER_OBJECT_TABLES {
769                let mut stmt = tx
770                    .prepare(&format!("DELETE FROM {table} WHERE object_key = ?1"))
771                    .map_err(&db_err)?;
772                for key in changed_keys.iter().chain(deleted_keys.iter()) {
773                    stmt.execute(params![key]).map_err(&db_err)?;
774                }
775            }
776        }
777
778        tx.execute_batch(
779            "
780            DELETE FROM project_databases;
781            DELETE FROM project_schemas;
782            DELETE FROM project_external_dependencies;
783            DELETE FROM project_cluster_dependencies;
784            DELETE FROM project_replacement_schemas;
785            DELETE FROM project_mod_statements;
786            ",
787        )
788        .map_err(&db_err)?;
789
790        {
791            let mut stmts = ProjectStatements::new(&tx, &db_err)?;
792
793            for db in &project.databases {
794                stmts.ins_db.execute(params![&db.name]).map_err(&db_err)?;
795
796                for schema in &db.schemas {
797                    let schema_type = schema.schema_type.to_string();
798                    stmts
799                        .ins_schema
800                        .execute(params![&db.name, &schema.name, schema_type.as_str()])
801                        .map_err(&db_err)?;
802
803                    for obj in &schema.objects {
804                        if changed_keys.contains(&obj.id.to_string()) {
805                            stmts.insert_object(obj, &db.name, &schema.name, root, &db_err)?;
806                        }
807                    }
808                }
809
810                stmts.insert_mod_statements(db, &db_err)?;
811            }
812
813            for ext_dep in &project.external_dependencies {
814                stmts
815                    .ins_ext_dep
816                    .execute(params![ext_dep.to_string()])
817                    .map_err(&db_err)?;
818            }
819            for cluster in &project.cluster_dependencies {
820                stmts
821                    .ins_cluster_dep
822                    .execute(params![&cluster.name])
823                    .map_err(&db_err)?;
824            }
825            for rs in &project.replacement_schemas {
826                stmts
827                    .ins_repl_schema
828                    .execute(params![&rs.database, &rs.schema])
829                    .map_err(&db_err)?;
830            }
831        }
832
833        tx.commit().map_err(&db_err)
834    }
835
836    fn load_row_keys(&self, table: &str) -> Result<BTreeSet<String>, CacheError> {
837        let mut stmt = self
838            .conn
839            .prepare(&format!("SELECT object_key FROM {table}"))
840            .map_err(db_err(&self.path))?;
841        let rows = stmt
842            .query_map([], |row| row.get::<_, String>(0))
843            .map_err(db_err(&self.path))?;
844        let mut keys = BTreeSet::new();
845        for row in rows {
846            keys.insert(row.map_err(db_err(&self.path))?);
847        }
848        Ok(keys)
849    }
850
851    fn prune_rows(&mut self, table: &str, keep: &BTreeSet<String>) -> Result<(), CacheError> {
852        let existing = self.load_row_keys(table)?;
853        let tx = self.conn.transaction().map_err(db_err(&self.path))?;
854        {
855            let mut stmt = tx
856                .prepare(&format!("DELETE FROM {table} WHERE object_key = ?1"))
857                .map_err(db_err(&self.path))?;
858            for key in &existing {
859                if !keep.contains(key) {
860                    stmt.execute([key]).map_err(db_err(&self.path))?;
861                }
862            }
863        }
864        tx.commit().map_err(db_err(&self.path))
865    }
866}
867
868/// Prepared INSERT statements for [`BuildArtifact::write_project`].
869struct ProjectStatements<'tx> {
870    ins_db: rusqlite::Statement<'tx>,
871    ins_schema: rusqlite::Statement<'tx>,
872    ins_obj: rusqlite::Statement<'tx>,
873    ins_dep: rusqlite::Statement<'tx>,
874    ins_comment: rusqlite::Statement<'tx>,
875    ins_index: rusqlite::Statement<'tx>,
876    ins_grant: rusqlite::Statement<'tx>,
877    ins_test: rusqlite::Statement<'tx>,
878    ins_infra: rusqlite::Statement<'tx>,
879    ins_infra_prop: rusqlite::Statement<'tx>,
880    ins_ext_dep: rusqlite::Statement<'tx>,
881    ins_cluster_dep: rusqlite::Statement<'tx>,
882    ins_repl_schema: rusqlite::Statement<'tx>,
883    ins_alias: rusqlite::Statement<'tx>,
884    ins_mod_stmt: rusqlite::Statement<'tx>,
885}
886
887impl<'tx> ProjectStatements<'tx> {
888    fn new(
889        tx: &'tx rusqlite::Transaction<'_>,
890        db_err: &impl Fn(rusqlite::Error) -> CacheError,
891    ) -> Result<Self, CacheError> {
892        Ok(Self {
893            ins_db: tx
894                .prepare("INSERT INTO project_databases (name) VALUES (?1)")
895                .map_err(db_err)?,
896            ins_schema: tx
897                .prepare("INSERT INTO project_schemas (database, name, schema_type) VALUES (?1, ?2, ?3)")
898                .map_err(db_err)?,
899            ins_obj: tx
900                .prepare("INSERT INTO project_objects (object_key, database, schema, name, object_kind, cluster, file_path, sql_text) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)")
901                .map_err(db_err)?,
902            ins_dep: tx
903                .prepare("INSERT INTO project_dependencies (object_key, dependency_key) VALUES (?1, ?2)")
904                .map_err(db_err)?,
905            ins_comment: tx
906                .prepare("INSERT INTO project_comments (object_key, comment_type, target_column, comment_text, sql_text) VALUES (?1, ?2, ?3, ?4, ?5)")
907                .map_err(db_err)?,
908            ins_index: tx
909                .prepare("INSERT INTO project_indexes (object_key, index_name, cluster, columns, sql_text) VALUES (?1, ?2, ?3, ?4, ?5)")
910                .map_err(db_err)?,
911            ins_grant: tx
912                .prepare("INSERT INTO project_grants (object_key, privilege, grantee, sql_text) VALUES (?1, ?2, ?3, ?4)")
913                .map_err(db_err)?,
914            ins_test: tx
915                .prepare("INSERT INTO project_tests (object_key, test_name, sql_text) VALUES (?1, ?2, ?3)")
916                .map_err(db_err)?,
917            ins_infra: tx
918                .prepare("INSERT INTO project_infrastructure (object_key, infra_type, connector_type, connection_ref, source_ref, external_reference) VALUES (?1, ?2, ?3, ?4, ?5, ?6)")
919                .map_err(db_err)?,
920            ins_infra_prop: tx
921                .prepare("INSERT INTO project_infrastructure_properties (object_key, property_key, property_value, secret_ref, object_ref) VALUES (?1, ?2, ?3, ?4, ?5)")
922                .map_err(db_err)?,
923            ins_ext_dep: tx
924                .prepare("INSERT INTO project_external_dependencies (object_key) VALUES (?1)")
925                .map_err(db_err)?,
926            ins_cluster_dep: tx
927                .prepare("INSERT INTO project_cluster_dependencies (cluster_name) VALUES (?1)")
928                .map_err(db_err)?,
929            ins_repl_schema: tx
930                .prepare("INSERT INTO project_replacement_schemas (database, schema) VALUES (?1, ?2)")
931                .map_err(db_err)?,
932            ins_alias: tx
933                .prepare("INSERT INTO project_aliases (object_key, alias, target_fqn) VALUES (?1, ?2, ?3)")
934                .map_err(db_err)?,
935            ins_mod_stmt: tx
936                .prepare("INSERT INTO project_mod_statements (database, schema, position, sql_text) VALUES (?1, ?2, ?3, ?4)")
937                .map_err(db_err)?,
938        })
939    }
940
941    fn insert_object(
942        &mut self,
943        obj: &graph::DatabaseObject,
944        db_name: &str,
945        schema_name: &str,
946        root: &Path,
947        db_err: &impl Fn(rusqlite::Error) -> CacheError,
948    ) -> Result<(), CacheError> {
949        let object_key = obj.id.to_string();
950        let typed = &obj.typed_object;
951        let kind = typed.stmt.kind().as_str();
952        let cluster = statement_cluster(&typed.stmt);
953        let file_path = typed
954            .path
955            .strip_prefix(root)
956            .unwrap_or(&typed.path)
957            .to_string_lossy()
958            .to_string();
959        let sql_text = format!("{};", typed.stmt);
960
961        self.ins_obj
962            .execute(params![
963                &object_key,
964                db_name,
965                schema_name,
966                obj.id.object(),
967                kind,
968                &cluster,
969                &file_path,
970                &sql_text,
971            ])
972            .map_err(db_err)?;
973
974        for dep in &obj.dependencies {
975            self.ins_dep
976                .execute(params![&object_key, dep.to_string()])
977                .map_err(db_err)?;
978        }
979
980        for comment in &typed.comments {
981            let (comment_type, target_column) = match &comment.object {
982                CommentObjectType::Column { name } => ("column", Some(name.column.to_string())),
983                _ => ("object", None),
984            };
985            let comment_text = comment.comment.as_deref().unwrap_or("");
986            let comment_sql = format!("{};", comment);
987            self.ins_comment
988                .execute(params![
989                    &object_key,
990                    comment_type,
991                    &target_column,
992                    comment_text,
993                    &comment_sql,
994                ])
995                .map_err(db_err)?;
996        }
997
998        for idx in &typed.indexes {
999            let index_name = idx.name.as_ref().map(|n| n.to_string()).unwrap_or_default();
1000            let idx_cluster = idx.in_cluster.as_ref().map(|c| c.to_string());
1001            let columns_str = idx
1002                .key_parts
1003                .as_ref()
1004                .map(|parts| {
1005                    parts
1006                        .iter()
1007                        .map(|p| p.to_string())
1008                        .collect::<Vec<_>>()
1009                        .join(", ")
1010                })
1011                .unwrap_or_default();
1012            let idx_sql = format!("{};", idx);
1013            self.ins_index
1014                .execute(params![
1015                    &object_key,
1016                    &index_name,
1017                    &idx_cluster,
1018                    &columns_str,
1019                    &idx_sql,
1020                ])
1021                .map_err(db_err)?;
1022        }
1023
1024        for grant in &typed.grants {
1025            let privilege = grant.privileges.to_string();
1026            let grantee = grant
1027                .roles
1028                .iter()
1029                .map(|r| r.to_string())
1030                .collect::<Vec<_>>()
1031                .join(", ");
1032            let grant_sql = format!("{};", grant);
1033            self.ins_grant
1034                .execute(params![&object_key, &privilege, &grantee, &grant_sql])
1035                .map_err(db_err)?;
1036        }
1037
1038        for test in &typed.tests {
1039            let test_name = test.name.to_string();
1040            let test_sql = format!("{};", test);
1041            self.ins_test
1042                .execute(params![&object_key, &test_name, &test_sql])
1043                .map_err(db_err)?;
1044        }
1045
1046        if let Some(infra) = infrastructure::extract(&typed.stmt) {
1047            self.insert_infrastructure(&object_key, &infra, db_err)?;
1048        }
1049
1050        let aliases = extract_alias_map(&typed.stmt, db_name, schema_name);
1051        for (alias, target_fqn) in &aliases {
1052            self.ins_alias
1053                .execute(params![&object_key, alias, target_fqn])
1054                .map_err(db_err)?;
1055        }
1056
1057        Ok(())
1058    }
1059
1060    fn insert_infrastructure(
1061        &mut self,
1062        object_key: &str,
1063        infra: &Infrastructure,
1064        db_err: &impl Fn(rusqlite::Error) -> CacheError,
1065    ) -> Result<(), CacheError> {
1066        let (infra_type, connector_type, connection_ref, source_ref, external_reference) =
1067            match infra {
1068                Infrastructure::Connection { connector_type, .. } => (
1069                    "connection",
1070                    Some(connector_type.as_str()),
1071                    None,
1072                    None,
1073                    None,
1074                ),
1075                Infrastructure::Source {
1076                    connector_type,
1077                    connection_ref,
1078                    ..
1079                } => (
1080                    "source",
1081                    Some(connector_type.as_str()),
1082                    connection_ref.as_deref(),
1083                    None,
1084                    None,
1085                ),
1086                Infrastructure::TableFromSource {
1087                    source_ref,
1088                    external_reference,
1089                } => (
1090                    "table-from-source",
1091                    None,
1092                    None,
1093                    Some(source_ref.as_str()),
1094                    external_reference.as_deref(),
1095                ),
1096            };
1097
1098        self.ins_infra
1099            .execute(params![
1100                object_key,
1101                infra_type,
1102                connector_type,
1103                connection_ref,
1104                source_ref,
1105                external_reference,
1106            ])
1107            .map_err(db_err)?;
1108
1109        let properties = match infra {
1110            Infrastructure::Connection { properties, .. }
1111            | Infrastructure::Source { properties, .. } => properties.as_slice(),
1112            Infrastructure::TableFromSource { .. } => &[],
1113        };
1114        for prop in properties {
1115            self.ins_infra_prop
1116                .execute(params![
1117                    object_key,
1118                    &prop.key,
1119                    &prop.value,
1120                    &prop.secret_ref,
1121                    &prop.object_ref,
1122                ])
1123                .map_err(db_err)?;
1124        }
1125
1126        Ok(())
1127    }
1128
1129    fn insert_mod_statements(
1130        &mut self,
1131        db: &graph::Database,
1132        db_err: &impl Fn(rusqlite::Error) -> CacheError,
1133    ) -> Result<(), CacheError> {
1134        if let Some(stmts) = &db.mod_statements {
1135            for (pos, stmt) in stmts.iter().enumerate() {
1136                self.ins_mod_stmt
1137                    .execute(params![
1138                        &db.name,
1139                        Option::<String>::None,
1140                        i64::try_from(pos).unwrap_or(0),
1141                        format!("{};", stmt),
1142                    ])
1143                    .map_err(db_err)?;
1144            }
1145        }
1146
1147        for schema in &db.schemas {
1148            if let Some(stmts) = &schema.mod_statements {
1149                for (pos, stmt) in stmts.iter().enumerate() {
1150                    self.ins_mod_stmt
1151                        .execute(params![
1152                            &db.name,
1153                            Some(&schema.name),
1154                            i64::try_from(pos).unwrap_or(0),
1155                            format!("{};", stmt),
1156                        ])
1157                        .map_err(db_err)?;
1158                }
1159            }
1160        }
1161
1162        Ok(())
1163    }
1164}
1165
1166fn file_metadata_signature(path: &Path) -> Result<(i64, i64), CacheError> {
1167    let metadata = fs::metadata(path).map_err(|source| CacheError::FileReadFailed {
1168        path: path.to_path_buf(),
1169        source,
1170    })?;
1171    let size = i64::try_from(metadata.len()).unwrap_or(i64::MAX);
1172    let modified = metadata
1173        .modified()
1174        .map_err(|source| CacheError::FileReadFailed {
1175            path: path.to_path_buf(),
1176            source,
1177        })?;
1178    let duration =
1179        modified
1180            .duration_since(UNIX_EPOCH)
1181            .map_err(|source| CacheError::FileReadFailed {
1182                path: path.to_path_buf(),
1183                source: std::io::Error::other(source),
1184            })?;
1185    // File mtimes are an advisory cache key; saturate if the platform
1186    // reports a nanosecond value larger than the on-disk schema stores.
1187    let mtime_ns = i64::try_from(duration.as_nanos()).unwrap_or(i64::MAX);
1188    Ok((size, mtime_ns))
1189}
1190
1191/// Extract the cluster name from a statement's `IN CLUSTER` clause, if present.
1192fn statement_cluster(stmt: &Statement) -> Option<String> {
1193    use crate::project::ast::Statement;
1194
1195    let in_cluster = match stmt {
1196        Statement::CreateMaterializedView(mv) => mv.in_cluster.as_ref(),
1197        Statement::CreateSource(source) => source.in_cluster.as_ref(),
1198        Statement::CreateSink(sink) => sink.in_cluster.as_ref(),
1199        Statement::CreateView(_)
1200        | Statement::CreateTable(_)
1201        | Statement::CreateTableFromSource(_)
1202        | Statement::CreateSecret(_)
1203        | Statement::CreateConnection(_) => None,
1204    };
1205
1206    match in_cluster {
1207        Some(RawClusterName::Unresolved(ident)) => Some(ident.to_string()),
1208        _ => None,
1209    }
1210}
1211
1212/// An object compilation artifact to be written to `object_state`.
1213#[derive(Debug, Clone)]
1214pub(crate) struct ObjectStateRow {
1215    /// Logical object identifier (`database.schema.object`).
1216    pub object_key: String,
1217    /// Composite hash of the object key, variant paths, content hashes, and
1218    /// compile-time variables. Used to detect cache staleness.
1219    pub fingerprint: String,
1220    pub artifact: CompiledObjectArtifact,
1221}
1222
1223/// AST visitor that collects FROM-clause table aliases. Only direct table
1224/// references; derived subqueries and table functions are skipped.
1225struct AliasVisitor<'a> {
1226    default_db: &'a str,
1227    default_schema: &'a str,
1228    aliases: BTreeMap<String, String>,
1229    cte_scope: CteScope,
1230}
1231
1232impl<'a> AliasVisitor<'a> {
1233    fn new(default_db: &'a str, default_schema: &'a str) -> Self {
1234        Self {
1235            default_db,
1236            default_schema,
1237            aliases: BTreeMap::new(),
1238            cte_scope: CteScope::new(),
1239        }
1240    }
1241}
1242
1243impl<'ast> Visit<'ast, Raw> for AliasVisitor<'_> {
1244    fn visit_query(&mut self, node: &'ast mz_sql_parser::ast::Query<Raw>) {
1245        let names = CteScope::collect_cte_names(&node.ctes);
1246        self.cte_scope.push(names);
1247        visit::visit_query(self, node);
1248        self.cte_scope.pop();
1249    }
1250
1251    fn visit_table_factor(&mut self, node: &'ast TableFactor<Raw>) {
1252        match node {
1253            TableFactor::Table { name, alias } => {
1254                let unresolved = name.name();
1255                if unresolved.0.len() == 1 && self.cte_scope.is_cte(&unresolved.0[0].to_string()) {
1256                    return;
1257                }
1258                let obj_id =
1259                    ObjectId::from_raw_item_name(name, self.default_db, self.default_schema);
1260                let fqn = obj_id.to_string();
1261                if let Some(bare) = unresolved.0.last().map(|i| i.to_string().to_lowercase()) {
1262                    self.aliases.insert(bare, fqn.clone());
1263                }
1264                if let Some(alias) = alias {
1265                    self.aliases
1266                        .insert(alias.name.to_string().to_lowercase(), fqn);
1267                }
1268            }
1269            TableFactor::NestedJoin { .. } => {
1270                visit::visit_table_factor(self, node);
1271            }
1272            // Don't recurse into subqueries or table functions for alias collection
1273            TableFactor::Derived { .. }
1274            | TableFactor::Function { .. }
1275            | TableFactor::RowsFrom { .. } => {}
1276        }
1277    }
1278}
1279
1280/// Extract alias → fully-qualified name map from a statement's query body.
1281///
1282/// Only `CreateView` and `CreateMaterializedView` produce aliases.
1283/// All keys are lowercased for case-insensitive lookup. CTE references
1284/// are excluded from the alias map.
1285pub(crate) fn extract_alias_map(
1286    stmt: &Statement,
1287    default_db: &str,
1288    default_schema: &str,
1289) -> BTreeMap<String, String> {
1290    let mut visitor = AliasVisitor::new(default_db, default_schema);
1291    match stmt {
1292        Statement::CreateView(s) => {
1293            visitor.visit_query(&s.definition.query);
1294        }
1295        Statement::CreateMaterializedView(s) => {
1296            visitor.visit_query(&s.query);
1297        }
1298        _ => {}
1299    }
1300    visitor.aliases
1301}
1302
1303#[cfg(test)]
1304mod tests {
1305    use super::*;
1306    use tempfile::tempdir;
1307
1308    fn open_db(root: &Path) -> BuildArtifact {
1309        BuildArtifact::open(root, "default", None, &BTreeMap::new()).unwrap()
1310    }
1311
1312    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `rust_psm_stack_pointer` on OS `linux`
1313    #[mz_ore::test]
1314    fn load_file_contents_treats_null_contents_as_miss() {
1315        let temp = tempdir().unwrap();
1316        let file = temp.path().join("model.sql");
1317        fs::write(&file, "CREATE VIEW v AS SELECT 1;").unwrap();
1318
1319        let mut db = open_db(temp.path());
1320        let paths = BTreeSet::from([file.clone()]);
1321        let fs = crate::fs::FileSystem::new();
1322        db.load_file_hashes(&fs, &paths).unwrap();
1323        db.conn
1324            .execute(
1325                "UPDATE file_state SET contents = NULL WHERE path = ?1",
1326                [file.to_string_lossy().to_string()],
1327            )
1328            .unwrap();
1329
1330        let entries = db.load_file_contents(&fs, &paths).unwrap();
1331        assert_eq!(
1332            entries.get(&file).map(String::as_str),
1333            Some("CREATE VIEW v AS SELECT 1;")
1334        );
1335
1336        let repaired: Option<String> = db
1337            .conn
1338            .query_row(
1339                "SELECT contents FROM file_state WHERE path = ?1",
1340                [file.to_string_lossy().to_string()],
1341                |row| row.get(0),
1342            )
1343            .optional()
1344            .unwrap();
1345        assert_eq!(repaired.as_deref(), Some("CREATE VIEW v AS SELECT 1;"));
1346    }
1347
1348    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `rust_psm_stack_pointer` on OS `linux`
1349    #[mz_ore::test]
1350    fn prune_rows_removes_stale_object_and_typecheck_entries() {
1351        let temp = tempdir().unwrap();
1352        let mut db = open_db(temp.path());
1353        db.upsert_object_rows(&[
1354            ObjectStateRow {
1355                object_key: "db.public.keep".into(),
1356                fingerprint: "keep".into(),
1357                artifact: CompiledObjectArtifact::Skipped,
1358            },
1359            ObjectStateRow {
1360                object_key: "db.public.drop".into(),
1361                fingerprint: "drop".into(),
1362                artifact: CompiledObjectArtifact::Skipped,
1363            },
1364        ])
1365        .unwrap();
1366        db.upsert_typecheck_results(&[
1367            ("db.public.keep".into(), "view".into(), BTreeMap::new()),
1368            ("db.public.drop".into(), "view".into(), BTreeMap::new()),
1369        ])
1370        .unwrap();
1371
1372        let keep = BTreeSet::from([String::from("db.public.keep")]);
1373        db.prune_object_rows(&keep).unwrap();
1374        db.prune_typecheck_results(&keep).unwrap();
1375
1376        let object_keys = db.load_row_keys(OBJECT_STATE_TABLE).unwrap();
1377        let typecheck_keys = db.load_row_keys(TYPECHECK_OBJECTS_TABLE).unwrap();
1378        assert_eq!(object_keys, keep);
1379        assert_eq!(typecheck_keys, keep);
1380    }
1381
1382    /// Helper: parse SQL into a [`Statement`] for test construction.
1383    fn parse_stmt(sql: &str) -> Statement {
1384        let parsed = mz_sql_parser::parser::parse_statements(sql).unwrap();
1385        match parsed.into_iter().next().unwrap().ast {
1386            mz_sql_parser::ast::Statement::CreateView(s) => Statement::CreateView(s),
1387            mz_sql_parser::ast::Statement::CreateMaterializedView(s) => {
1388                Statement::CreateMaterializedView(s)
1389            }
1390            mz_sql_parser::ast::Statement::CreateTable(s) => Statement::CreateTable(s),
1391            other => panic!("Unexpected statement type: {:?}", other),
1392        }
1393    }
1394
1395    fn all_object_keys(project: &graph::Project) -> BTreeSet<String> {
1396        project
1397            .databases
1398            .iter()
1399            .flat_map(|db| {
1400                db.schemas
1401                    .iter()
1402                    .flat_map(|s| s.objects.iter().map(|o| o.id.to_string()))
1403            })
1404            .collect()
1405    }
1406
1407    fn make_project(db_name: &str, schema_name: &str, stmt: Statement) -> graph::Project {
1408        use crate::project::ir::compiled;
1409
1410        let typed_obj = compiled::DatabaseObject {
1411            path: PathBuf::from("test.sql"),
1412            stmt,
1413            indexes: vec![],
1414            grants: vec![],
1415            comments: vec![],
1416            tests: vec![],
1417        };
1418        let obj_id = ObjectId::new(
1419            db_name.to_string(),
1420            schema_name.to_string(),
1421            typed_obj.stmt.ident().object.as_str().to_string(),
1422        );
1423        let db_obj = graph::DatabaseObject {
1424            id: obj_id,
1425            typed_object: typed_obj,
1426            dependencies: BTreeSet::new(),
1427        };
1428        graph::Project {
1429            databases: vec![graph::Database {
1430                name: db_name.to_string(),
1431                schemas: vec![graph::Schema {
1432                    name: schema_name.to_string(),
1433                    objects: vec![db_obj],
1434                    mod_statements: None,
1435                    schema_type: graph::SchemaType::Compute,
1436                }],
1437                mod_statements: None,
1438            }],
1439            dependency_graph: BTreeMap::new(),
1440            external_dependencies: BTreeSet::new(),
1441            cluster_dependencies: BTreeSet::new(),
1442            tests: vec![],
1443            replacement_schemas: BTreeSet::new(),
1444            compile_dirty: BTreeSet::new(),
1445        }
1446    }
1447
1448    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `rust_psm_stack_pointer` on OS `linux`
1449    #[mz_ore::test]
1450    fn write_project_persists_aliases() {
1451        let temp = tempdir().unwrap();
1452        let mut db = open_db(temp.path());
1453
1454        let stmt = parse_stmt("CREATE VIEW v AS SELECT o.id FROM orders AS o");
1455        let project = make_project("mydb", "public", stmt);
1456        let changed = all_object_keys(&project);
1457        db.write_project(&project, &changed, &BTreeSet::new(), temp.path())
1458            .unwrap();
1459
1460        let rows: Vec<(String, String, String)> = db
1461            .conn
1462            .prepare("SELECT object_key, alias, target_fqn FROM project_aliases ORDER BY alias")
1463            .unwrap()
1464            .query_map([], |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)))
1465            .unwrap()
1466            .collect::<Result<Vec<_>, _>>()
1467            .unwrap();
1468
1469        assert_eq!(rows.len(), 2);
1470
1471        let o_row = rows.iter().find(|(_, alias, _)| alias == "o").unwrap();
1472        assert_eq!(o_row.0, "mydb.public.v");
1473        assert_eq!(o_row.2, "mydb.public.orders");
1474
1475        let orders_row = rows.iter().find(|(_, alias, _)| alias == "orders").unwrap();
1476        assert_eq!(orders_row.0, "mydb.public.v");
1477        assert_eq!(orders_row.2, "mydb.public.orders");
1478    }
1479
1480    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `rust_psm_stack_pointer` on OS `linux`
1481    #[mz_ore::test]
1482    fn write_project_no_aliases_for_table_stmt() {
1483        let temp = tempdir().unwrap();
1484        let mut db = open_db(temp.path());
1485
1486        let stmt = parse_stmt("CREATE TABLE t (id INT)");
1487        let project = make_project("mydb", "public", stmt);
1488        let changed = all_object_keys(&project);
1489        db.write_project(&project, &changed, &BTreeSet::new(), temp.path())
1490            .unwrap();
1491
1492        let count: i64 = db
1493            .conn
1494            .query_row("SELECT COUNT(*) FROM project_aliases", [], |row| row.get(0))
1495            .unwrap();
1496        assert_eq!(count, 0);
1497    }
1498}