1use super::CacheError;
31use super::schema;
32use crate::project::ast::Statement;
33use crate::project::compiler::cache_io::hex_digest;
34use crate::project::ir::graph;
35use crate::project::ir::infrastructure::{self, Infrastructure};
36use crate::project::ir::object_id::ObjectId;
37use crate::project::resolve::cte_scope::CteScope;
38use crate::types::ColumnType;
39use mz_sql_parser::ast::visit::{self, Visit};
40use mz_sql_parser::ast::{CommentObjectType, Raw, RawClusterName, TableFactor};
41use rusqlite::{Connection, OptionalExtension, params};
42use sha2::{Digest, Sha256};
43use std::collections::{BTreeMap, BTreeSet};
44use std::fs;
45use std::path::{Path, PathBuf};
46use std::time::UNIX_EPOCH;
47
48const OBJECT_STATE_TABLE: &str = "object_state";
49const TYPECHECK_OBJECTS_TABLE: &str = "typecheck_objects";
50const TYPECHECK_COLUMNS_TABLE: &str = "typecheck_columns";
51
52fn db_err(path: &Path) -> impl Fn(rusqlite::Error) -> CacheError + '_ {
53 move |source| CacheError::DatabaseOperationFailed {
54 path: path.to_path_buf(),
55 source,
56 }
57}
58
59fn file_read_err(path: &Path) -> impl Fn(std::io::Error) -> CacheError + '_ {
60 move |source| CacheError::FileReadFailed {
61 path: path.to_path_buf(),
62 source,
63 }
64}
65
66const FILE_STATE_UPSERT: &str = "
67 INSERT INTO file_state(path, size, mtime_ns, content_hash, contents)
68 VALUES(?1, ?2, ?3, ?4, ?5)
69 ON CONFLICT(path) DO UPDATE SET
70 size = excluded.size,
71 mtime_ns = excluded.mtime_ns,
72 content_hash = excluded.content_hash,
73 contents = excluded.contents
74";
75
76fn read_and_upsert_file(
79 upsert: &mut rusqlite::Statement<'_>,
80 path: &Path,
81 path_str: &str,
82 size: i64,
83 mtime_ns: i64,
84 db_path: &Path,
85) -> Result<(String, String), CacheError> {
86 let contents = fs::read_to_string(path).map_err(file_read_err(path))?;
87 let content_hash = hex_digest(Sha256::digest(contents.as_bytes()));
88 upsert
89 .execute(params![path_str, size, mtime_ns, &content_hash, &contents])
90 .map_err(db_err(db_path))?;
91 Ok((content_hash, contents))
92}
93
94#[derive(Debug, Clone)]
101pub(crate) enum CompiledObjectArtifact {
102 Skipped,
103 Object(CompiledObjectArtifactData),
104}
105
106#[derive(Debug, Clone)]
111pub(crate) struct CompiledObjectArtifactData {
112 pub db_name: String,
113 pub schema_name: String,
114 pub file_path: PathBuf,
115 pub stmt_sql: String,
116 pub indexes_sql: Vec<String>,
117 pub grants_sql: Vec<String>,
118 pub comments_sql: Vec<String>,
119 pub tests_sql: Vec<String>,
120}
121
122struct ObjectStateHeader<'a> {
125 kind: &'static str,
126 db_name: Option<&'a str>,
127 schema_name: Option<&'a str>,
128 file_path: Option<String>,
129 stmt_sql: Option<&'a str>,
130}
131
132impl<'a> ObjectStateHeader<'a> {
133 fn from_artifact(artifact: &'a CompiledObjectArtifact) -> Self {
134 match artifact {
135 CompiledObjectArtifact::Skipped => Self {
136 kind: "skipped",
137 db_name: None,
138 schema_name: None,
139 file_path: None,
140 stmt_sql: None,
141 },
142 CompiledObjectArtifact::Object(data) => Self {
143 kind: "object",
144 db_name: Some(&data.db_name),
145 schema_name: Some(&data.schema_name),
146 file_path: Some(data.file_path.to_string_lossy().into_owned()),
147 stmt_sql: Some(&data.stmt_sql),
148 },
149 }
150 }
151}
152
153fn prepare_delete<'tx>(
154 tx: &'tx rusqlite::Transaction<'_>,
155 table: &str,
156 path: &Path,
157) -> Result<rusqlite::Statement<'tx>, CacheError> {
158 tx.prepare(&format!("DELETE FROM {table} WHERE object_key = ?1"))
159 .map_err(db_err(path))
160}
161
162fn prepare_fragment_insert<'tx>(
163 tx: &'tx rusqlite::Transaction<'_>,
164 table: &str,
165 path: &Path,
166) -> Result<rusqlite::Statement<'tx>, CacheError> {
167 tx.prepare(&format!(
168 "INSERT INTO {table}(object_key, position, sql_text) VALUES(?1, ?2, ?3)"
169 ))
170 .map_err(db_err(path))
171}
172
173fn run_execute(
174 stmt: &mut rusqlite::Statement<'_>,
175 params: impl rusqlite::Params,
176 path: &Path,
177) -> Result<(), CacheError> {
178 stmt.execute(params).map(|_| ()).map_err(db_err(path))
179}
180
181fn write_fragments(
182 stmt: &mut rusqlite::Statement<'_>,
183 object_key: &str,
184 fragments: &[String],
185 path: &Path,
186) -> Result<(), CacheError> {
187 for (position, sql) in fragments.iter().enumerate() {
188 let position = i64::try_from(position).unwrap_or(i64::MAX);
189 stmt.execute(params![object_key, position, sql])
190 .map_err(db_err(path))?;
191 }
192 Ok(())
193}
194
195fn collect_fragments(
196 stmt: &mut rusqlite::Statement<'_>,
197 object_key: &str,
198 path: &Path,
199) -> Result<Vec<String>, CacheError> {
200 let rows = stmt
201 .query_map(params![object_key], |row| row.get::<_, String>(0))
202 .map_err(db_err(path))?;
203 let mut out = Vec::new();
204 for row in rows {
205 out.push(row.map_err(db_err(path))?);
206 }
207 Ok(out)
208}
209
210pub(crate) struct BuildArtifact {
211 path: PathBuf,
212 conn: Connection,
213}
214
215impl BuildArtifact {
216 pub(crate) fn open(
223 root: &Path,
224 profile: &str,
225 profile_suffix: Option<&str>,
226 variables: &BTreeMap<String, String>,
227 ) -> Result<Self, CacheError> {
228 let path = super::db_path(root, profile, profile_suffix, variables);
229 let parent = path
230 .parent()
231 .expect("cache db path always has a parent directory");
232 fs::create_dir_all(parent).map_err(|source| CacheError::DirectoryCreationFailed {
233 path: parent.to_path_buf(),
234 source,
235 })?;
236 let conn = Connection::open(&path).map_err(|source| CacheError::DatabaseOpenFailed {
237 path: path.clone(),
238 source,
239 })?;
240 let db = Self { path, conn };
241 db.initialize()?;
242 Ok(db)
243 }
244
245 fn initialize(&self) -> Result<(), CacheError> {
246 const PAGE_CACHE_KIB: i64 = -64 * 1024;
248 const MMAP_BYTES: i64 = 256 * 1024 * 1024;
249 const WAL_AUTOCHECKPOINT_PAGES: i64 = 10_000;
250 let pragmas = format!(
251 "
252 PRAGMA journal_mode=WAL;
253 PRAGMA synchronous=NORMAL;
254 PRAGMA cache_size={PAGE_CACHE_KIB};
255 PRAGMA temp_store=MEMORY;
256 PRAGMA mmap_size={MMAP_BYTES};
257 PRAGMA wal_autocheckpoint={WAL_AUTOCHECKPOINT_PAGES};
258 ",
259 );
260 self.conn
261 .execute_batch(&pragmas)
262 .map_err(db_err(&self.path))?;
263 self.conn
264 .execute_batch(
265 "
266 CREATE TABLE IF NOT EXISTS meta (
267 key TEXT PRIMARY KEY,
268 value TEXT NOT NULL
269 );
270 ",
271 )
272 .map_err(db_err(&self.path))?;
273
274 let version: Option<i64> = self
275 .conn
276 .query_row(
277 "SELECT value FROM meta WHERE key = 'schema_version'",
278 [],
279 |row| {
280 row.get::<_, String>(0)
281 .map(|s| s.parse::<i64>().unwrap_or_default())
282 },
283 )
284 .optional()
285 .map_err(db_err(&self.path))?;
286
287 if version != Some(schema::SCHEMA_VERSION) {
288 self.conn
289 .execute_batch(schema::DROP_SQL)
290 .map_err(db_err(&self.path))?;
291 }
292
293 self.create_schema()?;
294 Ok(())
295 }
296
297 fn create_schema(&self) -> Result<(), CacheError> {
298 self.conn
299 .execute_batch(schema::CREATE_SQL)
300 .map_err(db_err(&self.path))?;
301
302 self.conn
303 .execute(
304 "INSERT OR REPLACE INTO meta(key, value) VALUES ('schema_version', ?1)",
305 params![schema::SCHEMA_VERSION.to_string()],
306 )
307 .map_err(db_err(&self.path))?;
308 Ok(())
309 }
310
311 pub(crate) fn load_file_hashes(
314 &mut self,
315 fs: &crate::fs::FileSystem,
316 paths: &BTreeSet<PathBuf>,
317 ) -> Result<BTreeMap<PathBuf, String>, CacheError> {
318 let tx = self.conn.transaction().map_err(db_err(&self.path))?;
319 let mut select = tx
320 .prepare("SELECT size, mtime_ns, content_hash FROM file_state WHERE path = ?1")
321 .map_err(db_err(&self.path))?;
322 let mut upsert = tx.prepare(FILE_STATE_UPSERT).map_err(db_err(&self.path))?;
323
324 let mut results = BTreeMap::new();
325 for path in paths {
326 if fs.is_overlay(path) {
330 let contents = fs.read_to_string(path).map_err(file_read_err(path))?;
331 results.insert(
332 path.clone(),
333 hex_digest(Sha256::digest(contents.as_bytes())),
334 );
335 continue;
336 }
337
338 let (size, mtime_ns) = file_metadata_signature(path)?;
339 let path_str = path.to_string_lossy().to_string();
340 let cached: Option<(i64, i64, String)> = select
341 .query_row([&path_str], |row| {
342 Ok((row.get(0)?, row.get(1)?, row.get(2)?))
343 })
344 .optional()
345 .map_err(db_err(&self.path))?;
346
347 let hash = match cached {
348 Some((s, m, h)) if s == size && m == mtime_ns => h,
349 _ => {
350 let (h, _) = read_and_upsert_file(
351 &mut upsert,
352 path,
353 &path_str,
354 size,
355 mtime_ns,
356 &self.path,
357 )?;
358 h
359 }
360 };
361 results.insert(path.clone(), hash);
362 }
363
364 drop(select);
365 drop(upsert);
366 tx.commit().map_err(db_err(&self.path))?;
367 Ok(results)
368 }
369
370 pub(crate) fn load_file_contents(
373 &mut self,
374 fs: &crate::fs::FileSystem,
375 paths: &BTreeSet<PathBuf>,
376 ) -> Result<BTreeMap<PathBuf, String>, CacheError> {
377 let tx = self.conn.transaction().map_err(db_err(&self.path))?;
378 let mut select = tx
379 .prepare(
380 "SELECT size, mtime_ns, contents \
381 FROM file_state WHERE path = ?1",
382 )
383 .map_err(db_err(&self.path))?;
384 let mut upsert = tx.prepare(FILE_STATE_UPSERT).map_err(db_err(&self.path))?;
385
386 let mut results = BTreeMap::new();
387 for path in paths {
388 if fs.is_overlay(path) {
389 let contents = fs.read_to_string(path).map_err(file_read_err(path))?;
390 results.insert(path.clone(), contents);
391 continue;
392 }
393
394 let (size, mtime_ns) = file_metadata_signature(path)?;
395 let path_str = path.to_string_lossy().to_string();
396 let cached: Option<(i64, i64, Option<String>)> = select
397 .query_row([&path_str], |row| {
398 Ok((row.get(0)?, row.get(1)?, row.get(2)?))
399 })
400 .optional()
401 .map_err(db_err(&self.path))?;
402
403 let contents = match cached {
404 Some((s, m, Some(c))) if s == size && m == mtime_ns => c,
405 _ => {
406 let (_, contents) = read_and_upsert_file(
407 &mut upsert,
408 path,
409 &path_str,
410 size,
411 mtime_ns,
412 &self.path,
413 )?;
414 contents
415 }
416 };
417 results.insert(path.clone(), contents);
418 }
419
420 drop(select);
421 drop(upsert);
422 tx.commit().map_err(db_err(&self.path))?;
423 Ok(results)
424 }
425
426 pub(crate) fn load_object_fingerprints(&self) -> Result<BTreeMap<String, String>, CacheError> {
431 let mut stmt = self
432 .conn
433 .prepare("SELECT object_key, fingerprint FROM object_state")
434 .map_err(db_err(&self.path))?;
435 let rows = stmt
436 .query_map([], |row| {
437 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
438 })
439 .map_err(db_err(&self.path))?;
440
441 let mut result = BTreeMap::new();
442 for row in rows {
443 let (key, fingerprint) = row.map_err(db_err(&self.path))?;
444 result.insert(key, fingerprint);
445 }
446 Ok(result)
447 }
448
449 pub(crate) fn load_object_artifacts(
452 &self,
453 keys: &BTreeSet<String>,
454 ) -> Result<BTreeMap<String, CompiledObjectArtifact>, CacheError> {
455 let mut result = BTreeMap::new();
456 if keys.is_empty() {
457 return Ok(result);
458 }
459
460 let mut header = self
461 .conn
462 .prepare(
463 "SELECT kind, db_name, schema_name, file_path, stmt_sql \
464 FROM object_state WHERE object_key = ?1",
465 )
466 .map_err(db_err(&self.path))?;
467 let mut indexes = self.prepare_fragment_select("object_state_indexes")?;
468 let mut grants = self.prepare_fragment_select("object_state_grants")?;
469 let mut comments = self.prepare_fragment_select("object_state_comments")?;
470 let mut tests = self.prepare_fragment_select("object_state_tests")?;
471
472 for key in keys {
473 let row = header
474 .query_row(params![key], |row| {
475 Ok((
476 row.get::<_, String>(0)?,
477 row.get::<_, Option<String>>(1)?,
478 row.get::<_, Option<String>>(2)?,
479 row.get::<_, Option<String>>(3)?,
480 row.get::<_, Option<String>>(4)?,
481 ))
482 })
483 .optional()
484 .map_err(db_err(&self.path))?;
485 let Some((kind, db_name, schema_name, file_path, stmt_sql)) = row else {
486 continue;
487 };
488 let artifact = match kind.as_str() {
489 "skipped" => CompiledObjectArtifact::Skipped,
490 "object" => CompiledObjectArtifact::Object(CompiledObjectArtifactData {
491 db_name: db_name.unwrap_or_default(),
492 schema_name: schema_name.unwrap_or_default(),
493 file_path: file_path.map(PathBuf::from).unwrap_or_default(),
494 stmt_sql: stmt_sql.unwrap_or_default(),
495 indexes_sql: collect_fragments(&mut indexes, key, &self.path)?,
496 grants_sql: collect_fragments(&mut grants, key, &self.path)?,
497 comments_sql: collect_fragments(&mut comments, key, &self.path)?,
498 tests_sql: collect_fragments(&mut tests, key, &self.path)?,
499 }),
500 _ => continue,
501 };
502 result.insert(key.clone(), artifact);
503 }
504 Ok(result)
505 }
506
507 fn prepare_fragment_select(&self, table: &str) -> Result<rusqlite::Statement<'_>, CacheError> {
508 self.conn
509 .prepare(&format!(
510 "SELECT sql_text FROM {table} WHERE object_key = ?1 ORDER BY position"
511 ))
512 .map_err(db_err(&self.path))
513 }
514
515 pub(crate) fn upsert_object_rows(&mut self, rows: &[ObjectStateRow]) -> Result<(), CacheError> {
518 let tx = self.conn.transaction().map_err(db_err(&self.path))?;
519 {
520 let mut upsert_header = tx
521 .prepare(
522 "
523 INSERT INTO object_state(
524 object_key, fingerprint, kind, db_name, schema_name, file_path, stmt_sql
525 )
526 VALUES(?1, ?2, ?3, ?4, ?5, ?6, ?7)
527 ON CONFLICT(object_key) DO UPDATE SET
528 fingerprint = excluded.fingerprint,
529 kind = excluded.kind,
530 db_name = excluded.db_name,
531 schema_name = excluded.schema_name,
532 file_path = excluded.file_path,
533 stmt_sql = excluded.stmt_sql
534 ",
535 )
536 .map_err(db_err(&self.path))?;
537 let mut delete_indexes = prepare_delete(&tx, "object_state_indexes", &self.path)?;
538 let mut delete_grants = prepare_delete(&tx, "object_state_grants", &self.path)?;
539 let mut delete_comments = prepare_delete(&tx, "object_state_comments", &self.path)?;
540 let mut delete_tests = prepare_delete(&tx, "object_state_tests", &self.path)?;
541 let mut insert_indexes =
542 prepare_fragment_insert(&tx, "object_state_indexes", &self.path)?;
543 let mut insert_grants =
544 prepare_fragment_insert(&tx, "object_state_grants", &self.path)?;
545 let mut insert_comments =
546 prepare_fragment_insert(&tx, "object_state_comments", &self.path)?;
547 let mut insert_tests = prepare_fragment_insert(&tx, "object_state_tests", &self.path)?;
548
549 for row in rows {
550 let header = ObjectStateHeader::from_artifact(&row.artifact);
551 upsert_header
552 .execute(params![
553 row.object_key,
554 row.fingerprint,
555 header.kind,
556 header.db_name,
557 header.schema_name,
558 header.file_path,
559 header.stmt_sql,
560 ])
561 .map_err(db_err(&self.path))?;
562
563 run_execute(&mut delete_indexes, params![row.object_key], &self.path)?;
564 run_execute(&mut delete_grants, params![row.object_key], &self.path)?;
565 run_execute(&mut delete_comments, params![row.object_key], &self.path)?;
566 run_execute(&mut delete_tests, params![row.object_key], &self.path)?;
567
568 if let CompiledObjectArtifact::Object(data) = &row.artifact {
569 write_fragments(
570 &mut insert_indexes,
571 &row.object_key,
572 &data.indexes_sql,
573 &self.path,
574 )?;
575 write_fragments(
576 &mut insert_grants,
577 &row.object_key,
578 &data.grants_sql,
579 &self.path,
580 )?;
581 write_fragments(
582 &mut insert_comments,
583 &row.object_key,
584 &data.comments_sql,
585 &self.path,
586 )?;
587 write_fragments(
588 &mut insert_tests,
589 &row.object_key,
590 &data.tests_sql,
591 &self.path,
592 )?;
593 }
594 }
595 }
596 tx.commit().map_err(db_err(&self.path))
597 }
598
599 pub(crate) fn prune_object_rows(&mut self, keep: &BTreeSet<String>) -> Result<(), CacheError> {
601 self.prune_rows(OBJECT_STATE_TABLE, keep)
602 }
603
604 pub(crate) fn upsert_typecheck_results(
608 &mut self,
609 rows: &[(String, String, BTreeMap<String, ColumnType>)],
610 ) -> Result<(), CacheError> {
611 let tx = self.conn.transaction().map_err(db_err(&self.path))?;
612 {
613 let mut upsert_obj = tx
614 .prepare(
615 "
616 INSERT INTO typecheck_objects(object_key, object_kind)
617 VALUES(?1, ?2)
618 ON CONFLICT(object_key) DO UPDATE SET
619 object_kind = excluded.object_kind
620 ",
621 )
622 .map_err(db_err(&self.path))?;
623 let mut delete_cols = tx
624 .prepare("DELETE FROM typecheck_columns WHERE object_key = ?1")
625 .map_err(db_err(&self.path))?;
626 let mut insert_col = tx
627 .prepare(
628 "INSERT INTO typecheck_columns(object_key, column_name, column_type, nullable, position)
629 VALUES(?1, ?2, ?3, ?4, ?5)",
630 )
631 .map_err(db_err(&self.path))?;
632
633 for (key, kind, columns) in rows {
634 upsert_obj
635 .execute(params![key, kind])
636 .map_err(db_err(&self.path))?;
637 delete_cols.execute([key]).map_err(db_err(&self.path))?;
638 for (col_name, col_type) in columns {
639 insert_col
640 .execute(params![
641 key,
642 col_name,
643 col_type.r#type,
644 i32::from(col_type.nullable),
645 i64::try_from(col_type.position).unwrap_or(0),
646 ])
647 .map_err(db_err(&self.path))?;
648 }
649 }
650 }
651 tx.commit().map_err(db_err(&self.path))
652 }
653
654 pub(crate) fn prune_typecheck_results(
657 &mut self,
658 keep: &BTreeSet<String>,
659 ) -> Result<(), CacheError> {
660 self.prune_rows(TYPECHECK_COLUMNS_TABLE, keep)?;
661 self.prune_rows(TYPECHECK_OBJECTS_TABLE, keep)
662 }
663
664 pub(crate) fn load_typecheck_columns(
669 &self,
670 ) -> Result<BTreeMap<String, BTreeMap<String, ColumnType>>, CacheError> {
671 let mut stmt = self
672 .conn
673 .prepare(
674 "SELECT object_key, column_name, column_type, nullable, position \
675 FROM typecheck_columns",
676 )
677 .map_err(db_err(&self.path))?;
678 let rows = stmt
679 .query_map([], |row| {
680 Ok((
681 row.get::<_, String>(0)?,
682 row.get::<_, String>(1)?,
683 ColumnType {
684 r#type: row.get(2)?,
685 nullable: row.get::<_, i32>(3)? != 0,
686 position: usize::try_from(row.get::<_, i64>(4)?).unwrap_or(0),
687 comment: None,
688 },
689 ))
690 })
691 .map_err(db_err(&self.path))?;
692 let mut out: BTreeMap<String, BTreeMap<String, ColumnType>> = BTreeMap::new();
693 for row in rows {
694 let (key, name, ty) = row.map_err(db_err(&self.path))?;
695 out.entry(key).or_default().insert(name, ty);
696 }
697 Ok(out)
698 }
699
700 pub(crate) fn load_external_type_digests(
702 &self,
703 ) -> Result<BTreeMap<String, String>, CacheError> {
704 let mut stmt = self
705 .conn
706 .prepare("SELECT object_key, digest FROM external_type_digest")
707 .map_err(db_err(&self.path))?;
708 let rows = stmt
709 .query_map([], |row| {
710 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
711 })
712 .map_err(db_err(&self.path))?;
713 let mut out = BTreeMap::new();
714 for row in rows {
715 let (key, digest) = row.map_err(db_err(&self.path))?;
716 out.insert(key, digest);
717 }
718 Ok(out)
719 }
720
721 pub(crate) fn replace_external_type_digests(
724 &mut self,
725 digests: &BTreeMap<String, String>,
726 ) -> Result<(), CacheError> {
727 let db_err = db_err(&self.path);
728 let tx = self.conn.transaction().map_err(&db_err)?;
729 tx.execute("DELETE FROM external_type_digest", [])
730 .map_err(&db_err)?;
731 {
732 let mut stmt = tx
733 .prepare("INSERT INTO external_type_digest(object_key, digest) VALUES(?1, ?2)")
734 .map_err(&db_err)?;
735 for (key, digest) in digests {
736 stmt.execute(params![key, digest]).map_err(&db_err)?;
737 }
738 }
739 tx.commit().map_err(&db_err)
740 }
741
742 pub(crate) fn write_project(
745 &mut self,
746 project: &graph::Project,
747 changed_keys: &BTreeSet<String>,
748 deleted_keys: &BTreeSet<String>,
749 root: &Path,
750 ) -> Result<(), CacheError> {
751 const PER_OBJECT_TABLES: &[&str] = &[
752 "project_objects",
753 "project_dependencies",
754 "project_comments",
755 "project_indexes",
756 "project_grants",
757 "project_tests",
758 "project_aliases",
759 "project_infrastructure",
760 "project_infrastructure_properties",
761 ];
762
763 let db_err = db_err(&self.path);
764
765 let tx = self.conn.transaction().map_err(&db_err)?;
766
767 if !changed_keys.is_empty() || !deleted_keys.is_empty() {
768 for table in PER_OBJECT_TABLES {
769 let mut stmt = tx
770 .prepare(&format!("DELETE FROM {table} WHERE object_key = ?1"))
771 .map_err(&db_err)?;
772 for key in changed_keys.iter().chain(deleted_keys.iter()) {
773 stmt.execute(params![key]).map_err(&db_err)?;
774 }
775 }
776 }
777
778 tx.execute_batch(
779 "
780 DELETE FROM project_databases;
781 DELETE FROM project_schemas;
782 DELETE FROM project_external_dependencies;
783 DELETE FROM project_cluster_dependencies;
784 DELETE FROM project_replacement_schemas;
785 DELETE FROM project_mod_statements;
786 ",
787 )
788 .map_err(&db_err)?;
789
790 {
791 let mut stmts = ProjectStatements::new(&tx, &db_err)?;
792
793 for db in &project.databases {
794 stmts.ins_db.execute(params![&db.name]).map_err(&db_err)?;
795
796 for schema in &db.schemas {
797 let schema_type = schema.schema_type.to_string();
798 stmts
799 .ins_schema
800 .execute(params![&db.name, &schema.name, schema_type.as_str()])
801 .map_err(&db_err)?;
802
803 for obj in &schema.objects {
804 if changed_keys.contains(&obj.id.to_string()) {
805 stmts.insert_object(obj, &db.name, &schema.name, root, &db_err)?;
806 }
807 }
808 }
809
810 stmts.insert_mod_statements(db, &db_err)?;
811 }
812
813 for ext_dep in &project.external_dependencies {
814 stmts
815 .ins_ext_dep
816 .execute(params![ext_dep.to_string()])
817 .map_err(&db_err)?;
818 }
819 for cluster in &project.cluster_dependencies {
820 stmts
821 .ins_cluster_dep
822 .execute(params![&cluster.name])
823 .map_err(&db_err)?;
824 }
825 for rs in &project.replacement_schemas {
826 stmts
827 .ins_repl_schema
828 .execute(params![&rs.database, &rs.schema])
829 .map_err(&db_err)?;
830 }
831 }
832
833 tx.commit().map_err(&db_err)
834 }
835
836 fn load_row_keys(&self, table: &str) -> Result<BTreeSet<String>, CacheError> {
837 let mut stmt = self
838 .conn
839 .prepare(&format!("SELECT object_key FROM {table}"))
840 .map_err(db_err(&self.path))?;
841 let rows = stmt
842 .query_map([], |row| row.get::<_, String>(0))
843 .map_err(db_err(&self.path))?;
844 let mut keys = BTreeSet::new();
845 for row in rows {
846 keys.insert(row.map_err(db_err(&self.path))?);
847 }
848 Ok(keys)
849 }
850
851 fn prune_rows(&mut self, table: &str, keep: &BTreeSet<String>) -> Result<(), CacheError> {
852 let existing = self.load_row_keys(table)?;
853 let tx = self.conn.transaction().map_err(db_err(&self.path))?;
854 {
855 let mut stmt = tx
856 .prepare(&format!("DELETE FROM {table} WHERE object_key = ?1"))
857 .map_err(db_err(&self.path))?;
858 for key in &existing {
859 if !keep.contains(key) {
860 stmt.execute([key]).map_err(db_err(&self.path))?;
861 }
862 }
863 }
864 tx.commit().map_err(db_err(&self.path))
865 }
866}
867
868struct ProjectStatements<'tx> {
870 ins_db: rusqlite::Statement<'tx>,
871 ins_schema: rusqlite::Statement<'tx>,
872 ins_obj: rusqlite::Statement<'tx>,
873 ins_dep: rusqlite::Statement<'tx>,
874 ins_comment: rusqlite::Statement<'tx>,
875 ins_index: rusqlite::Statement<'tx>,
876 ins_grant: rusqlite::Statement<'tx>,
877 ins_test: rusqlite::Statement<'tx>,
878 ins_infra: rusqlite::Statement<'tx>,
879 ins_infra_prop: rusqlite::Statement<'tx>,
880 ins_ext_dep: rusqlite::Statement<'tx>,
881 ins_cluster_dep: rusqlite::Statement<'tx>,
882 ins_repl_schema: rusqlite::Statement<'tx>,
883 ins_alias: rusqlite::Statement<'tx>,
884 ins_mod_stmt: rusqlite::Statement<'tx>,
885}
886
887impl<'tx> ProjectStatements<'tx> {
888 fn new(
889 tx: &'tx rusqlite::Transaction<'_>,
890 db_err: &impl Fn(rusqlite::Error) -> CacheError,
891 ) -> Result<Self, CacheError> {
892 Ok(Self {
893 ins_db: tx
894 .prepare("INSERT INTO project_databases (name) VALUES (?1)")
895 .map_err(db_err)?,
896 ins_schema: tx
897 .prepare("INSERT INTO project_schemas (database, name, schema_type) VALUES (?1, ?2, ?3)")
898 .map_err(db_err)?,
899 ins_obj: tx
900 .prepare("INSERT INTO project_objects (object_key, database, schema, name, object_kind, cluster, file_path, sql_text) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)")
901 .map_err(db_err)?,
902 ins_dep: tx
903 .prepare("INSERT INTO project_dependencies (object_key, dependency_key) VALUES (?1, ?2)")
904 .map_err(db_err)?,
905 ins_comment: tx
906 .prepare("INSERT INTO project_comments (object_key, comment_type, target_column, comment_text, sql_text) VALUES (?1, ?2, ?3, ?4, ?5)")
907 .map_err(db_err)?,
908 ins_index: tx
909 .prepare("INSERT INTO project_indexes (object_key, index_name, cluster, columns, sql_text) VALUES (?1, ?2, ?3, ?4, ?5)")
910 .map_err(db_err)?,
911 ins_grant: tx
912 .prepare("INSERT INTO project_grants (object_key, privilege, grantee, sql_text) VALUES (?1, ?2, ?3, ?4)")
913 .map_err(db_err)?,
914 ins_test: tx
915 .prepare("INSERT INTO project_tests (object_key, test_name, sql_text) VALUES (?1, ?2, ?3)")
916 .map_err(db_err)?,
917 ins_infra: tx
918 .prepare("INSERT INTO project_infrastructure (object_key, infra_type, connector_type, connection_ref, source_ref, external_reference) VALUES (?1, ?2, ?3, ?4, ?5, ?6)")
919 .map_err(db_err)?,
920 ins_infra_prop: tx
921 .prepare("INSERT INTO project_infrastructure_properties (object_key, property_key, property_value, secret_ref, object_ref) VALUES (?1, ?2, ?3, ?4, ?5)")
922 .map_err(db_err)?,
923 ins_ext_dep: tx
924 .prepare("INSERT INTO project_external_dependencies (object_key) VALUES (?1)")
925 .map_err(db_err)?,
926 ins_cluster_dep: tx
927 .prepare("INSERT INTO project_cluster_dependencies (cluster_name) VALUES (?1)")
928 .map_err(db_err)?,
929 ins_repl_schema: tx
930 .prepare("INSERT INTO project_replacement_schemas (database, schema) VALUES (?1, ?2)")
931 .map_err(db_err)?,
932 ins_alias: tx
933 .prepare("INSERT INTO project_aliases (object_key, alias, target_fqn) VALUES (?1, ?2, ?3)")
934 .map_err(db_err)?,
935 ins_mod_stmt: tx
936 .prepare("INSERT INTO project_mod_statements (database, schema, position, sql_text) VALUES (?1, ?2, ?3, ?4)")
937 .map_err(db_err)?,
938 })
939 }
940
941 fn insert_object(
942 &mut self,
943 obj: &graph::DatabaseObject,
944 db_name: &str,
945 schema_name: &str,
946 root: &Path,
947 db_err: &impl Fn(rusqlite::Error) -> CacheError,
948 ) -> Result<(), CacheError> {
949 let object_key = obj.id.to_string();
950 let typed = &obj.typed_object;
951 let kind = typed.stmt.kind().as_str();
952 let cluster = statement_cluster(&typed.stmt);
953 let file_path = typed
954 .path
955 .strip_prefix(root)
956 .unwrap_or(&typed.path)
957 .to_string_lossy()
958 .to_string();
959 let sql_text = format!("{};", typed.stmt);
960
961 self.ins_obj
962 .execute(params![
963 &object_key,
964 db_name,
965 schema_name,
966 obj.id.object(),
967 kind,
968 &cluster,
969 &file_path,
970 &sql_text,
971 ])
972 .map_err(db_err)?;
973
974 for dep in &obj.dependencies {
975 self.ins_dep
976 .execute(params![&object_key, dep.to_string()])
977 .map_err(db_err)?;
978 }
979
980 for comment in &typed.comments {
981 let (comment_type, target_column) = match &comment.object {
982 CommentObjectType::Column { name } => ("column", Some(name.column.to_string())),
983 _ => ("object", None),
984 };
985 let comment_text = comment.comment.as_deref().unwrap_or("");
986 let comment_sql = format!("{};", comment);
987 self.ins_comment
988 .execute(params![
989 &object_key,
990 comment_type,
991 &target_column,
992 comment_text,
993 &comment_sql,
994 ])
995 .map_err(db_err)?;
996 }
997
998 for idx in &typed.indexes {
999 let index_name = idx.name.as_ref().map(|n| n.to_string()).unwrap_or_default();
1000 let idx_cluster = idx.in_cluster.as_ref().map(|c| c.to_string());
1001 let columns_str = idx
1002 .key_parts
1003 .as_ref()
1004 .map(|parts| {
1005 parts
1006 .iter()
1007 .map(|p| p.to_string())
1008 .collect::<Vec<_>>()
1009 .join(", ")
1010 })
1011 .unwrap_or_default();
1012 let idx_sql = format!("{};", idx);
1013 self.ins_index
1014 .execute(params![
1015 &object_key,
1016 &index_name,
1017 &idx_cluster,
1018 &columns_str,
1019 &idx_sql,
1020 ])
1021 .map_err(db_err)?;
1022 }
1023
1024 for grant in &typed.grants {
1025 let privilege = grant.privileges.to_string();
1026 let grantee = grant
1027 .roles
1028 .iter()
1029 .map(|r| r.to_string())
1030 .collect::<Vec<_>>()
1031 .join(", ");
1032 let grant_sql = format!("{};", grant);
1033 self.ins_grant
1034 .execute(params![&object_key, &privilege, &grantee, &grant_sql])
1035 .map_err(db_err)?;
1036 }
1037
1038 for test in &typed.tests {
1039 let test_name = test.name.to_string();
1040 let test_sql = format!("{};", test);
1041 self.ins_test
1042 .execute(params![&object_key, &test_name, &test_sql])
1043 .map_err(db_err)?;
1044 }
1045
1046 if let Some(infra) = infrastructure::extract(&typed.stmt) {
1047 self.insert_infrastructure(&object_key, &infra, db_err)?;
1048 }
1049
1050 let aliases = extract_alias_map(&typed.stmt, db_name, schema_name);
1051 for (alias, target_fqn) in &aliases {
1052 self.ins_alias
1053 .execute(params![&object_key, alias, target_fqn])
1054 .map_err(db_err)?;
1055 }
1056
1057 Ok(())
1058 }
1059
1060 fn insert_infrastructure(
1061 &mut self,
1062 object_key: &str,
1063 infra: &Infrastructure,
1064 db_err: &impl Fn(rusqlite::Error) -> CacheError,
1065 ) -> Result<(), CacheError> {
1066 let (infra_type, connector_type, connection_ref, source_ref, external_reference) =
1067 match infra {
1068 Infrastructure::Connection { connector_type, .. } => (
1069 "connection",
1070 Some(connector_type.as_str()),
1071 None,
1072 None,
1073 None,
1074 ),
1075 Infrastructure::Source {
1076 connector_type,
1077 connection_ref,
1078 ..
1079 } => (
1080 "source",
1081 Some(connector_type.as_str()),
1082 connection_ref.as_deref(),
1083 None,
1084 None,
1085 ),
1086 Infrastructure::TableFromSource {
1087 source_ref,
1088 external_reference,
1089 } => (
1090 "table-from-source",
1091 None,
1092 None,
1093 Some(source_ref.as_str()),
1094 external_reference.as_deref(),
1095 ),
1096 };
1097
1098 self.ins_infra
1099 .execute(params![
1100 object_key,
1101 infra_type,
1102 connector_type,
1103 connection_ref,
1104 source_ref,
1105 external_reference,
1106 ])
1107 .map_err(db_err)?;
1108
1109 let properties = match infra {
1110 Infrastructure::Connection { properties, .. }
1111 | Infrastructure::Source { properties, .. } => properties.as_slice(),
1112 Infrastructure::TableFromSource { .. } => &[],
1113 };
1114 for prop in properties {
1115 self.ins_infra_prop
1116 .execute(params![
1117 object_key,
1118 &prop.key,
1119 &prop.value,
1120 &prop.secret_ref,
1121 &prop.object_ref,
1122 ])
1123 .map_err(db_err)?;
1124 }
1125
1126 Ok(())
1127 }
1128
1129 fn insert_mod_statements(
1130 &mut self,
1131 db: &graph::Database,
1132 db_err: &impl Fn(rusqlite::Error) -> CacheError,
1133 ) -> Result<(), CacheError> {
1134 if let Some(stmts) = &db.mod_statements {
1135 for (pos, stmt) in stmts.iter().enumerate() {
1136 self.ins_mod_stmt
1137 .execute(params![
1138 &db.name,
1139 Option::<String>::None,
1140 i64::try_from(pos).unwrap_or(0),
1141 format!("{};", stmt),
1142 ])
1143 .map_err(db_err)?;
1144 }
1145 }
1146
1147 for schema in &db.schemas {
1148 if let Some(stmts) = &schema.mod_statements {
1149 for (pos, stmt) in stmts.iter().enumerate() {
1150 self.ins_mod_stmt
1151 .execute(params![
1152 &db.name,
1153 Some(&schema.name),
1154 i64::try_from(pos).unwrap_or(0),
1155 format!("{};", stmt),
1156 ])
1157 .map_err(db_err)?;
1158 }
1159 }
1160 }
1161
1162 Ok(())
1163 }
1164}
1165
1166fn file_metadata_signature(path: &Path) -> Result<(i64, i64), CacheError> {
1167 let metadata = fs::metadata(path).map_err(|source| CacheError::FileReadFailed {
1168 path: path.to_path_buf(),
1169 source,
1170 })?;
1171 let size = i64::try_from(metadata.len()).unwrap_or(i64::MAX);
1172 let modified = metadata
1173 .modified()
1174 .map_err(|source| CacheError::FileReadFailed {
1175 path: path.to_path_buf(),
1176 source,
1177 })?;
1178 let duration =
1179 modified
1180 .duration_since(UNIX_EPOCH)
1181 .map_err(|source| CacheError::FileReadFailed {
1182 path: path.to_path_buf(),
1183 source: std::io::Error::other(source),
1184 })?;
1185 let mtime_ns = i64::try_from(duration.as_nanos()).unwrap_or(i64::MAX);
1188 Ok((size, mtime_ns))
1189}
1190
1191fn statement_cluster(stmt: &Statement) -> Option<String> {
1193 use crate::project::ast::Statement;
1194
1195 let in_cluster = match stmt {
1196 Statement::CreateMaterializedView(mv) => mv.in_cluster.as_ref(),
1197 Statement::CreateSource(source) => source.in_cluster.as_ref(),
1198 Statement::CreateSink(sink) => sink.in_cluster.as_ref(),
1199 Statement::CreateView(_)
1200 | Statement::CreateTable(_)
1201 | Statement::CreateTableFromSource(_)
1202 | Statement::CreateSecret(_)
1203 | Statement::CreateConnection(_) => None,
1204 };
1205
1206 match in_cluster {
1207 Some(RawClusterName::Unresolved(ident)) => Some(ident.to_string()),
1208 _ => None,
1209 }
1210}
1211
1212#[derive(Debug, Clone)]
1214pub(crate) struct ObjectStateRow {
1215 pub object_key: String,
1217 pub fingerprint: String,
1220 pub artifact: CompiledObjectArtifact,
1221}
1222
1223struct AliasVisitor<'a> {
1226 default_db: &'a str,
1227 default_schema: &'a str,
1228 aliases: BTreeMap<String, String>,
1229 cte_scope: CteScope,
1230}
1231
1232impl<'a> AliasVisitor<'a> {
1233 fn new(default_db: &'a str, default_schema: &'a str) -> Self {
1234 Self {
1235 default_db,
1236 default_schema,
1237 aliases: BTreeMap::new(),
1238 cte_scope: CteScope::new(),
1239 }
1240 }
1241}
1242
1243impl<'ast> Visit<'ast, Raw> for AliasVisitor<'_> {
1244 fn visit_query(&mut self, node: &'ast mz_sql_parser::ast::Query<Raw>) {
1245 let names = CteScope::collect_cte_names(&node.ctes);
1246 self.cte_scope.push(names);
1247 visit::visit_query(self, node);
1248 self.cte_scope.pop();
1249 }
1250
1251 fn visit_table_factor(&mut self, node: &'ast TableFactor<Raw>) {
1252 match node {
1253 TableFactor::Table { name, alias } => {
1254 let unresolved = name.name();
1255 if unresolved.0.len() == 1 && self.cte_scope.is_cte(&unresolved.0[0].to_string()) {
1256 return;
1257 }
1258 let obj_id =
1259 ObjectId::from_raw_item_name(name, self.default_db, self.default_schema);
1260 let fqn = obj_id.to_string();
1261 if let Some(bare) = unresolved.0.last().map(|i| i.to_string().to_lowercase()) {
1262 self.aliases.insert(bare, fqn.clone());
1263 }
1264 if let Some(alias) = alias {
1265 self.aliases
1266 .insert(alias.name.to_string().to_lowercase(), fqn);
1267 }
1268 }
1269 TableFactor::NestedJoin { .. } => {
1270 visit::visit_table_factor(self, node);
1271 }
1272 TableFactor::Derived { .. }
1274 | TableFactor::Function { .. }
1275 | TableFactor::RowsFrom { .. } => {}
1276 }
1277 }
1278}
1279
1280pub(crate) fn extract_alias_map(
1286 stmt: &Statement,
1287 default_db: &str,
1288 default_schema: &str,
1289) -> BTreeMap<String, String> {
1290 let mut visitor = AliasVisitor::new(default_db, default_schema);
1291 match stmt {
1292 Statement::CreateView(s) => {
1293 visitor.visit_query(&s.definition.query);
1294 }
1295 Statement::CreateMaterializedView(s) => {
1296 visitor.visit_query(&s.query);
1297 }
1298 _ => {}
1299 }
1300 visitor.aliases
1301}
1302
1303#[cfg(test)]
1304mod tests {
1305 use super::*;
1306 use tempfile::tempdir;
1307
1308 fn open_db(root: &Path) -> BuildArtifact {
1309 BuildArtifact::open(root, "default", None, &BTreeMap::new()).unwrap()
1310 }
1311
1312 #[cfg_attr(miri, ignore)] #[mz_ore::test]
1314 fn load_file_contents_treats_null_contents_as_miss() {
1315 let temp = tempdir().unwrap();
1316 let file = temp.path().join("model.sql");
1317 fs::write(&file, "CREATE VIEW v AS SELECT 1;").unwrap();
1318
1319 let mut db = open_db(temp.path());
1320 let paths = BTreeSet::from([file.clone()]);
1321 let fs = crate::fs::FileSystem::new();
1322 db.load_file_hashes(&fs, &paths).unwrap();
1323 db.conn
1324 .execute(
1325 "UPDATE file_state SET contents = NULL WHERE path = ?1",
1326 [file.to_string_lossy().to_string()],
1327 )
1328 .unwrap();
1329
1330 let entries = db.load_file_contents(&fs, &paths).unwrap();
1331 assert_eq!(
1332 entries.get(&file).map(String::as_str),
1333 Some("CREATE VIEW v AS SELECT 1;")
1334 );
1335
1336 let repaired: Option<String> = db
1337 .conn
1338 .query_row(
1339 "SELECT contents FROM file_state WHERE path = ?1",
1340 [file.to_string_lossy().to_string()],
1341 |row| row.get(0),
1342 )
1343 .optional()
1344 .unwrap();
1345 assert_eq!(repaired.as_deref(), Some("CREATE VIEW v AS SELECT 1;"));
1346 }
1347
1348 #[cfg_attr(miri, ignore)] #[mz_ore::test]
1350 fn prune_rows_removes_stale_object_and_typecheck_entries() {
1351 let temp = tempdir().unwrap();
1352 let mut db = open_db(temp.path());
1353 db.upsert_object_rows(&[
1354 ObjectStateRow {
1355 object_key: "db.public.keep".into(),
1356 fingerprint: "keep".into(),
1357 artifact: CompiledObjectArtifact::Skipped,
1358 },
1359 ObjectStateRow {
1360 object_key: "db.public.drop".into(),
1361 fingerprint: "drop".into(),
1362 artifact: CompiledObjectArtifact::Skipped,
1363 },
1364 ])
1365 .unwrap();
1366 db.upsert_typecheck_results(&[
1367 ("db.public.keep".into(), "view".into(), BTreeMap::new()),
1368 ("db.public.drop".into(), "view".into(), BTreeMap::new()),
1369 ])
1370 .unwrap();
1371
1372 let keep = BTreeSet::from([String::from("db.public.keep")]);
1373 db.prune_object_rows(&keep).unwrap();
1374 db.prune_typecheck_results(&keep).unwrap();
1375
1376 let object_keys = db.load_row_keys(OBJECT_STATE_TABLE).unwrap();
1377 let typecheck_keys = db.load_row_keys(TYPECHECK_OBJECTS_TABLE).unwrap();
1378 assert_eq!(object_keys, keep);
1379 assert_eq!(typecheck_keys, keep);
1380 }
1381
1382 fn parse_stmt(sql: &str) -> Statement {
1384 let parsed = mz_sql_parser::parser::parse_statements(sql).unwrap();
1385 match parsed.into_iter().next().unwrap().ast {
1386 mz_sql_parser::ast::Statement::CreateView(s) => Statement::CreateView(s),
1387 mz_sql_parser::ast::Statement::CreateMaterializedView(s) => {
1388 Statement::CreateMaterializedView(s)
1389 }
1390 mz_sql_parser::ast::Statement::CreateTable(s) => Statement::CreateTable(s),
1391 other => panic!("Unexpected statement type: {:?}", other),
1392 }
1393 }
1394
1395 fn all_object_keys(project: &graph::Project) -> BTreeSet<String> {
1396 project
1397 .databases
1398 .iter()
1399 .flat_map(|db| {
1400 db.schemas
1401 .iter()
1402 .flat_map(|s| s.objects.iter().map(|o| o.id.to_string()))
1403 })
1404 .collect()
1405 }
1406
1407 fn make_project(db_name: &str, schema_name: &str, stmt: Statement) -> graph::Project {
1408 use crate::project::ir::compiled;
1409
1410 let typed_obj = compiled::DatabaseObject {
1411 path: PathBuf::from("test.sql"),
1412 stmt,
1413 indexes: vec![],
1414 grants: vec![],
1415 comments: vec![],
1416 tests: vec![],
1417 };
1418 let obj_id = ObjectId::new(
1419 db_name.to_string(),
1420 schema_name.to_string(),
1421 typed_obj.stmt.ident().object.as_str().to_string(),
1422 );
1423 let db_obj = graph::DatabaseObject {
1424 id: obj_id,
1425 typed_object: typed_obj,
1426 dependencies: BTreeSet::new(),
1427 };
1428 graph::Project {
1429 databases: vec![graph::Database {
1430 name: db_name.to_string(),
1431 schemas: vec![graph::Schema {
1432 name: schema_name.to_string(),
1433 objects: vec![db_obj],
1434 mod_statements: None,
1435 schema_type: graph::SchemaType::Compute,
1436 }],
1437 mod_statements: None,
1438 }],
1439 dependency_graph: BTreeMap::new(),
1440 external_dependencies: BTreeSet::new(),
1441 cluster_dependencies: BTreeSet::new(),
1442 tests: vec![],
1443 replacement_schemas: BTreeSet::new(),
1444 compile_dirty: BTreeSet::new(),
1445 }
1446 }
1447
1448 #[cfg_attr(miri, ignore)] #[mz_ore::test]
1450 fn write_project_persists_aliases() {
1451 let temp = tempdir().unwrap();
1452 let mut db = open_db(temp.path());
1453
1454 let stmt = parse_stmt("CREATE VIEW v AS SELECT o.id FROM orders AS o");
1455 let project = make_project("mydb", "public", stmt);
1456 let changed = all_object_keys(&project);
1457 db.write_project(&project, &changed, &BTreeSet::new(), temp.path())
1458 .unwrap();
1459
1460 let rows: Vec<(String, String, String)> = db
1461 .conn
1462 .prepare("SELECT object_key, alias, target_fqn FROM project_aliases ORDER BY alias")
1463 .unwrap()
1464 .query_map([], |row| Ok((row.get(0)?, row.get(1)?, row.get(2)?)))
1465 .unwrap()
1466 .collect::<Result<Vec<_>, _>>()
1467 .unwrap();
1468
1469 assert_eq!(rows.len(), 2);
1470
1471 let o_row = rows.iter().find(|(_, alias, _)| alias == "o").unwrap();
1472 assert_eq!(o_row.0, "mydb.public.v");
1473 assert_eq!(o_row.2, "mydb.public.orders");
1474
1475 let orders_row = rows.iter().find(|(_, alias, _)| alias == "orders").unwrap();
1476 assert_eq!(orders_row.0, "mydb.public.v");
1477 assert_eq!(orders_row.2, "mydb.public.orders");
1478 }
1479
1480 #[cfg_attr(miri, ignore)] #[mz_ore::test]
1482 fn write_project_no_aliases_for_table_stmt() {
1483 let temp = tempdir().unwrap();
1484 let mut db = open_db(temp.path());
1485
1486 let stmt = parse_stmt("CREATE TABLE t (id INT)");
1487 let project = make_project("mydb", "public", stmt);
1488 let changed = all_object_keys(&project);
1489 db.write_project(&project, &changed, &BTreeSet::new(), temp.path())
1490 .unwrap();
1491
1492 let count: i64 = db
1493 .conn
1494 .query_row("SELECT COUNT(*) FROM project_aliases", [], |row| row.get(0))
1495 .unwrap();
1496 assert_eq!(count, 0);
1497 }
1498}