1use crate::project::ir::object_id::ObjectId;
18use crate::types::{ColumnType, ObjectKind};
19use rusqlite::{Connection, OpenFlags, params};
20use std::collections::{BTreeMap, BTreeSet};
21use std::path::Path;
22
23#[derive(Debug, Clone)]
25pub struct CachedObject {
26 pub fqn: String,
27 pub database: String,
28 pub schema: String,
29 pub name: String,
30 pub kind: ObjectKind,
31 pub cluster: Option<String>,
32 pub file_path: String,
33 pub sql_text: String,
34 pub comments: Vec<CachedComment>,
35 pub indexes: Vec<CachedIndex>,
36 pub grants: Vec<CachedGrant>,
37 pub aliases: BTreeMap<String, String>,
38 pub infrastructure: Option<CachedInfrastructure>,
39}
40
41#[derive(Debug, Clone)]
43pub struct CachedObjectSummary {
44 pub fqn: String,
45 pub database: String,
46 pub schema: String,
47 pub name: String,
48 pub kind: ObjectKind,
49 pub cluster: Option<String>,
50 pub file_path: String,
51}
52
53#[derive(Debug, Clone)]
55pub struct CachedDatabase {
56 pub name: String,
57 pub schemas: Vec<CachedSchema>,
58}
59
60#[derive(Debug, Clone)]
62pub struct CachedSchema {
63 pub name: String,
64 pub schema_type: String,
65 pub objects: Vec<CachedObject>,
66}
67
68#[derive(Debug, Clone)]
70pub struct CachedComment {
71 pub comment_type: String,
72 pub target_column: Option<String>,
73 pub text: String,
74 pub sql_text: String,
75}
76
77#[derive(Debug, Clone)]
79pub struct CachedIndex {
80 pub name: String,
81 pub cluster: Option<String>,
82 pub columns: String,
83 pub sql_text: String,
84}
85
86#[derive(Debug, Clone)]
88pub struct CachedGrant {
89 pub privilege: String,
90 pub grantee: String,
91 pub sql_text: String,
92}
93
94#[derive(Debug, Clone)]
96pub struct CachedInfrastructure {
97 pub infra_type: String,
98 pub connector_type: Option<String>,
99 pub connection_ref: Option<String>,
100 pub source_ref: Option<String>,
101 pub external_reference: Option<String>,
102 pub properties: Vec<CachedProperty>,
103}
104
105#[derive(Debug, Clone)]
107pub struct CachedProperty {
108 pub key: String,
109 pub value: String,
110 pub secret_ref: Option<String>,
111 pub object_ref: Option<String>,
112}
113
114#[derive(Debug, Clone)]
116pub struct CachedTest {
117 pub name: String,
118 pub sql_text: String,
119}
120
121pub struct ProjectCache {
126 conn: Connection,
127}
128
129impl ProjectCache {
130 pub fn open(
135 directory: &Path,
136 profile: &str,
137 profile_suffix: Option<&str>,
138 variables: &BTreeMap<String, String>,
139 ) -> Result<Option<Self>, super::CacheError> {
140 let path = super::db_path(directory, profile, profile_suffix, variables);
141 if !path.exists() {
142 return Ok(None);
143 }
144 let conn = Connection::open_with_flags(
145 &path,
146 OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
147 )
148 .map_err(|source| super::CacheError::DatabaseOpenFailed {
149 path: path.clone(),
150 source,
151 })?;
152 Ok(Some(Self { conn }))
153 }
154
155 fn query_vec<T, P, F>(&self, sql: &str, params: P, map: F) -> Vec<T>
158 where
159 P: rusqlite::Params,
160 F: FnMut(&rusqlite::Row<'_>) -> rusqlite::Result<T>,
161 {
162 let Ok(mut stmt) = self.conn.prepare(sql) else {
163 return Vec::new();
164 };
165 let Ok(rows) = stmt.query_map(params, map) else {
166 return Vec::new();
167 };
168 rows.filter_map(|r| r.ok()).collect()
169 }
170
171 pub fn get_columns(&self, id: &ObjectId) -> Option<BTreeMap<String, ColumnType>> {
173 let mut stmt = self
174 .conn
175 .prepare(
176 "SELECT column_name, column_type, nullable, position \
177 FROM typecheck_columns WHERE object_key = ?1",
178 )
179 .ok()?;
180 let rows = stmt
181 .query_map(params![id.to_string()], |row| {
182 Ok((
183 row.get::<_, String>(0)?,
184 ColumnType {
185 r#type: row.get(1)?,
186 nullable: row.get::<_, i32>(2)? != 0,
187 position: usize::try_from(row.get::<_, i64>(3)?).unwrap_or(0),
188 comment: None,
189 },
190 ))
191 })
192 .ok()?;
193 let mut columns = BTreeMap::new();
194 for row in rows {
195 let (name, col_type) = row.ok()?;
196 columns.insert(name, col_type);
197 }
198 if columns.is_empty() {
199 None
200 } else {
201 Some(columns)
202 }
203 }
204
205 pub fn get_kind(&self, id: &ObjectId) -> Option<ObjectKind> {
207 self.conn
208 .query_row(
209 "SELECT object_kind FROM typecheck_objects WHERE object_key = ?1",
210 params![id.to_string()],
211 |row| {
212 let kind_str: String = row.get(0)?;
213 Ok(ObjectKind::from_db_str(&kind_str))
214 },
215 )
216 .ok()
217 }
218
219 pub fn get_column_names(&self, ids: &[&ObjectId]) -> BTreeMap<String, BTreeSet<String>> {
224 if ids.is_empty() {
225 return BTreeMap::new();
226 }
227 let placeholders: Vec<String> = (1..=ids.len()).map(|i| format!("?{}", i)).collect();
228 let sql = format!(
229 "SELECT object_key, column_name FROM typecheck_columns WHERE object_key IN ({})",
230 placeholders.join(", ")
231 );
232 let mut stmt = match self.conn.prepare(&sql) {
233 Ok(s) => s,
234 Err(_) => return BTreeMap::new(),
235 };
236 let key_strings: Vec<String> = ids.iter().map(|id| id.to_string()).collect();
237 let params: Vec<&dyn rusqlite::ToSql> = key_strings
238 .iter()
239 .map(|s| -> &dyn rusqlite::ToSql { s })
240 .collect();
241 let rows = match stmt.query_map(params.as_slice(), |row| {
242 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
243 }) {
244 Ok(r) => r,
245 Err(_) => return BTreeMap::new(),
246 };
247 let mut result: BTreeMap<String, BTreeSet<String>> = BTreeMap::new();
248 for row in rows {
249 if let Ok((key, col)) = row {
250 result
251 .entry(key.to_lowercase())
252 .or_default()
253 .insert(col.to_lowercase());
254 }
255 }
256 result
257 }
258
259 pub fn get_object(&self, id: &ObjectId) -> Option<CachedObject> {
261 let fqn = id.to_string();
262 let row = self
264 .conn
265 .query_row(
266 "SELECT database, schema, name, object_kind, cluster, \
267 file_path, sql_text \
268 FROM project_objects WHERE object_key = ?1",
269 params![fqn],
270 |row| {
271 Ok((
272 row.get::<_, String>(0)?,
273 row.get::<_, String>(1)?,
274 row.get::<_, String>(2)?,
275 row.get::<_, String>(3)?,
276 row.get::<_, Option<String>>(4)?,
277 row.get::<_, String>(5)?,
278 row.get::<_, String>(6)?,
279 ))
280 },
281 )
282 .ok()?;
283
284 let (database, schema, name, kind_str, cluster, file_path, sql_text) = row;
285 let kind = ObjectKind::from_db_str(&kind_str);
286
287 let comments = self.query_comments(&fqn);
288 let indexes = self.query_indexes(&fqn);
289 let grants = self.query_grants(&fqn);
290 let aliases = self.query_aliases(&fqn);
291 let infrastructure = self.query_infrastructure(&fqn);
292
293 Some(CachedObject {
294 fqn,
295 database,
296 schema,
297 name,
298 kind,
299 cluster,
300 file_path,
301 sql_text,
302 comments,
303 indexes,
304 grants,
305 aliases,
306 infrastructure,
307 })
308 }
309
310 pub fn get_object_by_path(&self, file_path: &str) -> Option<CachedObject> {
312 let fqn: String = self
313 .conn
314 .query_row(
315 "SELECT object_key FROM project_objects WHERE file_path = ?1",
316 params![file_path],
317 |row| row.get(0),
318 )
319 .ok()?;
320 let id = fqn.parse::<ObjectId>().ok()?;
321 self.get_object(&id)
322 }
323
324 pub fn list_objects(&self) -> Vec<CachedObjectSummary> {
326 self.query_vec(
327 "SELECT object_key, database, schema, name, object_kind, cluster, \
328 file_path FROM project_objects",
329 [],
330 |row| {
331 Ok(CachedObjectSummary {
332 fqn: row.get(0)?,
333 database: row.get(1)?,
334 schema: row.get(2)?,
335 name: row.get(3)?,
336 kind: ObjectKind::from_db_str(&row.get::<_, String>(4)?),
337 cluster: row.get(5)?,
338 file_path: row.get(6)?,
339 })
340 },
341 )
342 }
343
344 pub fn list_databases_with_objects(&self) -> Vec<CachedDatabase> {
347 let db_names: Vec<String> =
348 self.query_vec("SELECT name FROM project_databases", [], |row| row.get(0));
349
350 db_names
351 .into_iter()
352 .map(|db_name| {
353 let schema_rows: Vec<(String, String)> = self.query_vec(
354 "SELECT name, schema_type FROM project_schemas WHERE database = ?1",
355 params![&db_name],
356 |row| Ok((row.get(0)?, row.get(1)?)),
357 );
358 let schemas = schema_rows
359 .into_iter()
360 .map(|(schema_name, schema_type)| {
361 let object_ids = self.query_object_keys_in_schema(&db_name, &schema_name);
362 let objects = object_ids
363 .iter()
364 .filter_map(|id| self.get_object(id))
365 .collect();
366 CachedSchema {
367 name: schema_name,
368 schema_type,
369 objects,
370 }
371 })
372 .collect();
373 CachedDatabase {
374 name: db_name,
375 schemas,
376 }
377 })
378 .collect()
379 }
380
381 pub fn list_external_dependencies(&self) -> Vec<ObjectId> {
383 self.query_vec(
384 "SELECT object_key FROM project_external_dependencies",
385 [],
386 |row| row.get::<_, String>(0),
387 )
388 .into_iter()
389 .filter_map(|s| s.parse().ok())
390 .collect()
391 }
392
393 pub fn get_dependencies(&self, id: &ObjectId) -> Vec<ObjectId> {
395 self.query_vec(
396 "SELECT dependency_key FROM project_dependencies WHERE object_key = ?1",
397 params![id.to_string()],
398 |row| row.get::<_, String>(0),
399 )
400 .into_iter()
401 .filter_map(|s| s.parse().ok())
402 .collect()
403 }
404
405 pub fn get_dependents(&self, id: &ObjectId) -> Vec<ObjectId> {
407 self.query_vec(
408 "SELECT object_key FROM project_dependencies WHERE dependency_key = ?1",
409 params![id.to_string()],
410 |row| row.get::<_, String>(0),
411 )
412 .into_iter()
413 .filter_map(|s| s.parse().ok())
414 .collect()
415 }
416
417 pub fn get_tests(&self, id: &ObjectId) -> Vec<CachedTest> {
419 self.query_vec(
420 "SELECT test_name, sql_text FROM project_tests WHERE object_key = ?1",
421 params![id.to_string()],
422 |row| {
423 Ok(CachedTest {
424 name: row.get(0)?,
425 sql_text: row.get(1)?,
426 })
427 },
428 )
429 }
430
431 pub fn get_mod_statements(&self, database: &str, schema: Option<&str>) -> Vec<String> {
433 self.query_vec(
434 "SELECT sql_text FROM project_mod_statements \
435 WHERE database = ?1 AND schema IS ?2 \
436 ORDER BY position",
437 params![database, schema],
438 |row| row.get(0),
439 )
440 }
441
442 fn query_comments(&self, object_key: &str) -> Vec<CachedComment> {
443 self.query_vec(
444 "SELECT comment_type, target_column, comment_text, sql_text \
445 FROM project_comments WHERE object_key = ?1",
446 params![object_key],
447 |row| {
448 Ok(CachedComment {
449 comment_type: row.get(0)?,
450 target_column: row.get(1)?,
451 text: row.get(2)?,
452 sql_text: row.get(3)?,
453 })
454 },
455 )
456 }
457
458 fn query_indexes(&self, object_key: &str) -> Vec<CachedIndex> {
459 self.query_vec(
460 "SELECT index_name, cluster, columns, sql_text \
461 FROM project_indexes WHERE object_key = ?1",
462 params![object_key],
463 |row| {
464 Ok(CachedIndex {
465 name: row.get::<_, Option<String>>(0)?.unwrap_or_default(),
466 cluster: row.get(1)?,
467 columns: row.get(2)?,
468 sql_text: row.get(3)?,
469 })
470 },
471 )
472 }
473
474 fn query_grants(&self, object_key: &str) -> Vec<CachedGrant> {
475 self.query_vec(
476 "SELECT privilege, grantee, sql_text \
477 FROM project_grants WHERE object_key = ?1",
478 params![object_key],
479 |row| {
480 Ok(CachedGrant {
481 privilege: row.get(0)?,
482 grantee: row.get(1)?,
483 sql_text: row.get(2)?,
484 })
485 },
486 )
487 }
488
489 fn query_infrastructure(&self, object_key: &str) -> Option<CachedInfrastructure> {
490 let row = self
491 .conn
492 .query_row(
493 "SELECT infra_type, connector_type, connection_ref, source_ref, external_reference \
494 FROM project_infrastructure WHERE object_key = ?1",
495 params![object_key],
496 |row| {
497 Ok((
498 row.get::<_, String>(0)?,
499 row.get::<_, Option<String>>(1)?,
500 row.get::<_, Option<String>>(2)?,
501 row.get::<_, Option<String>>(3)?,
502 row.get::<_, Option<String>>(4)?,
503 ))
504 },
505 )
506 .ok()?;
507
508 let (infra_type, connector_type, connection_ref, source_ref, external_reference) = row;
509 let properties = self.query_infrastructure_properties(object_key);
510
511 Some(CachedInfrastructure {
512 infra_type,
513 connector_type,
514 connection_ref,
515 source_ref,
516 external_reference,
517 properties,
518 })
519 }
520
521 fn query_infrastructure_properties(&self, object_key: &str) -> Vec<CachedProperty> {
522 self.query_vec(
523 "SELECT property_key, property_value, secret_ref, object_ref \
524 FROM project_infrastructure_properties WHERE object_key = ?1",
525 params![object_key],
526 |row| {
527 Ok(CachedProperty {
528 key: row.get(0)?,
529 value: row.get(1)?,
530 secret_ref: row.get(2)?,
531 object_ref: row.get(3)?,
532 })
533 },
534 )
535 }
536
537 fn query_aliases(&self, object_key: &str) -> BTreeMap<String, String> {
538 self.query_vec(
539 "SELECT alias, target_fqn FROM project_aliases WHERE object_key = ?1",
540 params![object_key],
541 |row| Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?)),
542 )
543 .into_iter()
544 .collect()
545 }
546
547 fn query_object_keys_in_schema(&self, database: &str, schema: &str) -> Vec<ObjectId> {
548 self.query_vec(
549 "SELECT object_key FROM project_objects WHERE database = ?1 AND schema = ?2",
550 params![database, schema],
551 |row| row.get::<_, String>(0),
552 )
553 .into_iter()
554 .filter_map(|s| s.parse().ok())
555 .collect()
556 }
557}
558
559#[cfg(test)]
560mod tests {
561 use super::*;
562 use rusqlite::Connection;
563 use std::collections::BTreeMap;
564
565 fn create_test_db(path: &Path) -> Connection {
567 let conn = Connection::open(path).unwrap();
568 conn.execute_batch(
569 "
570 CREATE TABLE IF NOT EXISTS typecheck_objects (
571 object_key TEXT PRIMARY KEY,
572 object_kind TEXT NOT NULL
573 );
574 CREATE TABLE IF NOT EXISTS typecheck_columns (
575 object_key TEXT NOT NULL,
576 column_name TEXT NOT NULL,
577 column_type TEXT NOT NULL,
578 nullable INTEGER NOT NULL,
579 position INTEGER NOT NULL,
580 PRIMARY KEY (object_key, column_name),
581 FOREIGN KEY (object_key) REFERENCES typecheck_objects(object_key)
582 );
583 CREATE TABLE IF NOT EXISTS project_databases (
584 name TEXT PRIMARY KEY
585 );
586 CREATE TABLE IF NOT EXISTS project_schemas (
587 database TEXT NOT NULL,
588 name TEXT NOT NULL,
589 schema_type TEXT NOT NULL,
590 PRIMARY KEY (database, name)
591 );
592 CREATE TABLE IF NOT EXISTS project_objects (
593 object_key TEXT PRIMARY KEY,
594 database TEXT NOT NULL,
595 schema TEXT NOT NULL,
596 name TEXT NOT NULL,
597 object_kind TEXT NOT NULL,
598 cluster TEXT,
599 file_path TEXT NOT NULL,
600 sql_text TEXT NOT NULL
601 );
602 CREATE TABLE IF NOT EXISTS project_dependencies (
603 object_key TEXT NOT NULL,
604 dependency_key TEXT NOT NULL,
605 PRIMARY KEY (object_key, dependency_key)
606 );
607 CREATE TABLE IF NOT EXISTS project_external_dependencies (
608 object_key TEXT NOT NULL PRIMARY KEY
609 );
610 CREATE TABLE IF NOT EXISTS project_comments (
611 object_key TEXT NOT NULL,
612 comment_type TEXT NOT NULL,
613 target_column TEXT,
614 comment_text TEXT NOT NULL,
615 sql_text TEXT NOT NULL,
616 PRIMARY KEY (object_key, comment_type, target_column)
617 );
618 CREATE TABLE IF NOT EXISTS project_indexes (
619 object_key TEXT NOT NULL,
620 index_name TEXT,
621 cluster TEXT,
622 columns TEXT NOT NULL,
623 sql_text TEXT NOT NULL,
624 PRIMARY KEY (object_key, index_name)
625 );
626 CREATE TABLE IF NOT EXISTS project_grants (
627 object_key TEXT NOT NULL,
628 privilege TEXT NOT NULL,
629 grantee TEXT NOT NULL,
630 sql_text TEXT NOT NULL,
631 PRIMARY KEY (object_key, privilege, grantee)
632 );
633 CREATE TABLE IF NOT EXISTS project_tests (
634 object_key TEXT NOT NULL,
635 test_name TEXT NOT NULL,
636 sql_text TEXT NOT NULL,
637 PRIMARY KEY (object_key, test_name)
638 );
639 CREATE TABLE IF NOT EXISTS project_infrastructure (
640 object_key TEXT NOT NULL PRIMARY KEY,
641 infra_type TEXT NOT NULL,
642 connector_type TEXT,
643 connection_ref TEXT,
644 source_ref TEXT,
645 external_reference TEXT
646 );
647 CREATE TABLE IF NOT EXISTS project_infrastructure_properties (
648 object_key TEXT NOT NULL,
649 property_key TEXT NOT NULL,
650 property_value TEXT NOT NULL,
651 secret_ref TEXT,
652 object_ref TEXT,
653 PRIMARY KEY (object_key, property_key)
654 );
655 CREATE TABLE IF NOT EXISTS project_aliases (
656 object_key TEXT NOT NULL,
657 alias TEXT NOT NULL,
658 target_fqn TEXT NOT NULL,
659 PRIMARY KEY (object_key, alias)
660 );
661 CREATE TABLE IF NOT EXISTS project_mod_statements (
662 database TEXT NOT NULL,
663 schema TEXT,
664 position INTEGER NOT NULL,
665 sql_text TEXT NOT NULL,
666 PRIMARY KEY (database, schema, position)
667 );
668 ",
669 )
670 .unwrap();
671 conn
672 }
673
674 fn open_cache(path: &Path) -> ProjectCache {
675 ProjectCache {
676 conn: Connection::open_with_flags(
677 path,
678 OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
679 )
680 .unwrap(),
681 }
682 }
683
684 #[cfg_attr(miri, ignore)] #[mz_ore::test]
686 fn test_open_returns_none_when_no_db() {
687 let dir = tempfile::tempdir().unwrap();
688 let result = ProjectCache::open(dir.path(), "default", None, &BTreeMap::new());
689 assert!(result.is_ok());
690 assert!(result.unwrap().is_none());
691 }
692
693 #[cfg_attr(miri, ignore)] #[mz_ore::test]
695 fn test_get_columns_found() {
696 let dir = tempfile::tempdir().unwrap();
697 let db_path = dir.path().join("test.db");
698 let conn = create_test_db(&db_path);
699 conn.execute(
700 "INSERT INTO typecheck_objects (object_key, object_kind) VALUES (?1, ?2)",
701 params!["db.schema.my_view", "view"],
702 )
703 .unwrap();
704 conn.execute(
705 "INSERT INTO typecheck_columns (object_key, column_name, column_type, nullable, position) \
706 VALUES (?1, ?2, ?3, ?4, ?5)",
707 params!["db.schema.my_view", "id", "integer", 0, 1],
708 )
709 .unwrap();
710 conn.execute(
711 "INSERT INTO typecheck_columns (object_key, column_name, column_type, nullable, position) \
712 VALUES (?1, ?2, ?3, ?4, ?5)",
713 params!["db.schema.my_view", "name", "text", 1, 2],
714 )
715 .unwrap();
716 drop(conn);
717
718 let cache = ProjectCache {
719 conn: Connection::open_with_flags(
720 &db_path,
721 OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
722 )
723 .unwrap(),
724 };
725 let columns = cache
726 .get_columns(&"db.schema.my_view".parse::<ObjectId>().unwrap())
727 .unwrap();
728 assert_eq!(columns.len(), 2);
729
730 let id_col = &columns["id"];
731 assert_eq!(id_col.r#type, "integer");
732 assert!(!id_col.nullable);
733 assert_eq!(id_col.position, 1);
734
735 let name_col = &columns["name"];
736 assert_eq!(name_col.r#type, "text");
737 assert!(name_col.nullable);
738 assert_eq!(name_col.position, 2);
739 }
740
741 #[cfg_attr(miri, ignore)] #[mz_ore::test]
743 fn test_get_columns_not_found() {
744 let dir = tempfile::tempdir().unwrap();
745 let db_path = dir.path().join("test.db");
746 let _conn = create_test_db(&db_path);
747
748 let cache = ProjectCache {
749 conn: Connection::open_with_flags(
750 &db_path,
751 OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
752 )
753 .unwrap(),
754 };
755 assert!(
756 cache
757 .get_columns(&"nonexistent.object.x".parse::<ObjectId>().unwrap())
758 .is_none()
759 );
760 }
761
762 #[cfg_attr(miri, ignore)] #[mz_ore::test]
764 fn test_get_kind_found() {
765 let dir = tempfile::tempdir().unwrap();
766 let db_path = dir.path().join("test.db");
767 let conn = create_test_db(&db_path);
768 conn.execute(
769 "INSERT INTO typecheck_objects (object_key, object_kind) VALUES (?1, ?2)",
770 params!["db.schema.my_mv", "materialized-view"],
771 )
772 .unwrap();
773 drop(conn);
774
775 let cache = ProjectCache {
776 conn: Connection::open_with_flags(
777 &db_path,
778 OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
779 )
780 .unwrap(),
781 };
782 assert_eq!(
783 cache.get_kind(&"db.schema.my_mv".parse::<ObjectId>().unwrap()),
784 Some(ObjectKind::MaterializedView)
785 );
786 }
787
788 #[cfg_attr(miri, ignore)] #[mz_ore::test]
790 fn test_get_kind_not_found() {
791 let dir = tempfile::tempdir().unwrap();
792 let db_path = dir.path().join("test.db");
793 let _conn = create_test_db(&db_path);
794
795 let cache = ProjectCache {
796 conn: Connection::open_with_flags(
797 &db_path,
798 OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
799 )
800 .unwrap(),
801 };
802 assert!(
803 cache
804 .get_kind(&"nonexistent.object.x".parse::<ObjectId>().unwrap())
805 .is_none()
806 );
807 }
808
809 #[cfg_attr(miri, ignore)] #[mz_ore::test]
811 fn test_get_column_names_batch() {
812 let dir = tempfile::tempdir().unwrap();
813 let db_path = dir.path().join("test.db");
814 let conn = create_test_db(&db_path);
815
816 conn.execute(
817 "INSERT INTO typecheck_objects (object_key, object_kind) VALUES (?1, ?2)",
818 params!["db.schema.obj_a", "view"],
819 )
820 .unwrap();
821 conn.execute(
822 "INSERT INTO typecheck_objects (object_key, object_kind) VALUES (?1, ?2)",
823 params!["db.schema.obj_b", "table"],
824 )
825 .unwrap();
826 conn.execute(
827 "INSERT INTO typecheck_columns (object_key, column_name, column_type, nullable, position) \
828 VALUES (?1, ?2, ?3, ?4, ?5)",
829 params!["db.schema.obj_a", "Col_X", "integer", 0, 1],
830 )
831 .unwrap();
832 conn.execute(
833 "INSERT INTO typecheck_columns (object_key, column_name, column_type, nullable, position) \
834 VALUES (?1, ?2, ?3, ?4, ?5)",
835 params!["db.schema.obj_b", "Col_Y", "text", 1, 1],
836 )
837 .unwrap();
838 drop(conn);
839
840 let cache = ProjectCache {
841 conn: Connection::open_with_flags(
842 &db_path,
843 OpenFlags::SQLITE_OPEN_READ_ONLY | OpenFlags::SQLITE_OPEN_NO_MUTEX,
844 )
845 .unwrap(),
846 };
847
848 let id_a: ObjectId = "db.schema.obj_a".parse().unwrap();
849 let id_b: ObjectId = "db.schema.obj_b".parse().unwrap();
850 let result = cache.get_column_names(&[&id_a, &id_b]);
851 assert_eq!(result.len(), 2);
852 assert!(result["db.schema.obj_a"].contains("col_x"));
853 assert!(result["db.schema.obj_b"].contains("col_y"));
854 }
855
856 fn insert_sample_project(conn: &Connection) {
858 conn.execute(
859 "INSERT INTO project_databases (name) VALUES (?1)",
860 params!["mydb"],
861 )
862 .unwrap();
863 conn.execute(
864 "INSERT INTO project_schemas (database, name, schema_type) VALUES (?1, ?2, ?3)",
865 params!["mydb", "public", "user"],
866 )
867 .unwrap();
868 conn.execute(
869 "INSERT INTO project_objects (object_key, database, schema, name, object_kind, cluster, file_path, sql_text) \
870 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
871 params![
872 "mydb.public.orders",
873 "mydb",
874 "public",
875 "orders",
876 "materialized-view",
877 "compute",
878 "sql/orders.sql",
879 "CREATE MATERIALIZED VIEW orders AS SELECT 1",
880 ],
881 )
882 .unwrap();
883 conn.execute(
884 "INSERT INTO project_objects (object_key, database, schema, name, object_kind, cluster, file_path, sql_text) \
885 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
886 params![
887 "mydb.public.users",
888 "mydb",
889 "public",
890 "users",
891 "view",
892 None::<String>,
893 "sql/users.sql",
894 "CREATE VIEW users AS SELECT 1",
895 ],
896 )
897 .unwrap();
898 conn.execute(
899 "INSERT INTO project_comments (object_key, comment_type, target_column, comment_text, sql_text) \
900 VALUES (?1, ?2, ?3, ?4, ?5)",
901 params![
902 "mydb.public.orders",
903 "object",
904 None::<String>,
905 "Order data",
906 "COMMENT ON MATERIALIZED VIEW orders IS 'Order data'"
907 ],
908 )
909 .unwrap();
910 conn.execute(
911 "INSERT INTO project_indexes (object_key, index_name, cluster, columns, sql_text) \
912 VALUES (?1, ?2, ?3, ?4, ?5)",
913 params![
914 "mydb.public.orders",
915 "orders_id_idx",
916 "compute",
917 "id",
918 "CREATE INDEX orders_id_idx ON orders (id)"
919 ],
920 )
921 .unwrap();
922 conn.execute(
923 "INSERT INTO project_grants (object_key, privilege, grantee, sql_text) \
924 VALUES (?1, ?2, ?3, ?4)",
925 params![
926 "mydb.public.orders",
927 "SELECT",
928 "reader_role",
929 "GRANT SELECT ON orders TO reader_role"
930 ],
931 )
932 .unwrap();
933 conn.execute(
934 "INSERT INTO project_aliases (object_key, alias, target_fqn) VALUES (?1, ?2, ?3)",
935 params!["mydb.public.orders", "raw_orders", "ext.public.raw_orders"],
936 )
937 .unwrap();
938 conn.execute(
939 "INSERT INTO project_aliases (object_key, alias, target_fqn) VALUES (?1, ?2, ?3)",
940 params![
941 "mydb.public.orders",
942 "order_items",
943 "ext.public.order_items"
944 ],
945 )
946 .unwrap();
947 conn.execute(
948 "INSERT INTO project_infrastructure (object_key, infra_type, connector_type, connection_ref, source_ref, external_reference) \
949 VALUES (?1, ?2, ?3, ?4, ?5, ?6)",
950 params![
951 "mydb.public.orders",
952 "source",
953 "postgres",
954 "mydb.public.pg_conn",
955 None::<String>,
956 None::<String>
957 ],
958 )
959 .unwrap();
960 conn.execute(
961 "INSERT INTO project_infrastructure_properties (object_key, property_key, property_value, secret_ref, object_ref) \
962 VALUES (?1, ?2, ?3, ?4, ?5)",
963 params![
964 "mydb.public.orders",
965 "PUBLICATION",
966 "mz_source",
967 None::<String>,
968 None::<String>
969 ],
970 )
971 .unwrap();
972 conn.execute(
973 "INSERT INTO project_dependencies (object_key, dependency_key) VALUES (?1, ?2)",
974 params!["mydb.public.orders", "mydb.public.users"],
975 )
976 .unwrap();
977 conn.execute(
978 "INSERT INTO project_external_dependencies (object_key) VALUES (?1)",
979 params!["ext.public.raw_data"],
980 )
981 .unwrap();
982 conn.execute(
983 "INSERT INTO project_tests (object_key, test_name, sql_text) VALUES (?1, ?2, ?3)",
984 params![
985 "mydb.public.orders",
986 "test_orders_not_empty",
987 "SELECT count(*) > 0 FROM orders"
988 ],
989 )
990 .unwrap();
991 conn.execute(
992 "INSERT INTO project_mod_statements (database, schema, position, sql_text) \
993 VALUES (?1, ?2, ?3, ?4)",
994 params!["mydb", None::<String>, 0, "CREATE DATABASE mydb"],
995 )
996 .unwrap();
997 conn.execute(
998 "INSERT INTO project_mod_statements (database, schema, position, sql_text) \
999 VALUES (?1, ?2, ?3, ?4)",
1000 params!["mydb", "public", 0, "CREATE SCHEMA public"],
1001 )
1002 .unwrap();
1003 }
1004
1005 #[cfg_attr(miri, ignore)] #[mz_ore::test]
1007 fn test_get_object_full_metadata() {
1008 let dir = tempfile::tempdir().unwrap();
1009 let db_path = dir.path().join("test.db");
1010 let conn = create_test_db(&db_path);
1011 insert_sample_project(&conn);
1012 drop(conn);
1013
1014 let cache = open_cache(&db_path);
1015 let obj = cache
1016 .get_object(&"mydb.public.orders".parse::<ObjectId>().unwrap())
1017 .unwrap();
1018
1019 assert_eq!(obj.fqn, "mydb.public.orders");
1020 assert_eq!(obj.database, "mydb");
1021 assert_eq!(obj.schema, "public");
1022 assert_eq!(obj.name, "orders");
1023 assert_eq!(obj.kind, ObjectKind::MaterializedView);
1024 assert_eq!(obj.cluster.as_deref(), Some("compute"));
1025 assert_eq!(obj.file_path, "sql/orders.sql");
1026
1027 assert_eq!(obj.comments.len(), 1);
1028 assert_eq!(obj.comments[0].comment_type, "object");
1029 assert_eq!(obj.comments[0].text, "Order data");
1030
1031 assert_eq!(obj.indexes.len(), 1);
1032 assert_eq!(obj.indexes[0].name, "orders_id_idx");
1033
1034 assert_eq!(obj.grants.len(), 1);
1035 assert_eq!(obj.grants[0].privilege, "SELECT");
1036 assert_eq!(obj.grants[0].grantee, "reader_role");
1037
1038 assert_eq!(obj.aliases.len(), 2);
1039 assert_eq!(obj.aliases["order_items"], "ext.public.order_items");
1040 assert_eq!(obj.aliases["raw_orders"], "ext.public.raw_orders");
1041
1042 let infra = obj.infrastructure.unwrap();
1043 assert_eq!(infra.infra_type, "source");
1044 assert_eq!(infra.connector_type.as_deref(), Some("postgres"));
1045 assert_eq!(infra.connection_ref.as_deref(), Some("mydb.public.pg_conn"));
1046 assert_eq!(infra.properties.len(), 1);
1047 assert_eq!(infra.properties[0].key, "PUBLICATION");
1048 }
1049
1050 #[cfg_attr(miri, ignore)] #[mz_ore::test]
1052 fn test_get_object_not_found() {
1053 let dir = tempfile::tempdir().unwrap();
1054 let db_path = dir.path().join("test.db");
1055 let conn = create_test_db(&db_path);
1056 insert_sample_project(&conn);
1057 drop(conn);
1058
1059 let cache = open_cache(&db_path);
1060 assert!(
1061 cache
1062 .get_object(&"nonexistent.x.y".parse::<ObjectId>().unwrap())
1063 .is_none()
1064 );
1065 }
1066
1067 #[cfg_attr(miri, ignore)] #[mz_ore::test]
1069 fn test_get_object_by_path() {
1070 let dir = tempfile::tempdir().unwrap();
1071 let db_path = dir.path().join("test.db");
1072 let conn = create_test_db(&db_path);
1073 insert_sample_project(&conn);
1074 drop(conn);
1075
1076 let cache = open_cache(&db_path);
1077 let obj = cache.get_object_by_path("sql/orders.sql").unwrap();
1078 assert_eq!(obj.fqn, "mydb.public.orders");
1079 }
1080
1081 #[cfg_attr(miri, ignore)] #[mz_ore::test]
1083 fn test_list_objects() {
1084 let dir = tempfile::tempdir().unwrap();
1085 let db_path = dir.path().join("test.db");
1086 let conn = create_test_db(&db_path);
1087 insert_sample_project(&conn);
1088 drop(conn);
1089
1090 let cache = open_cache(&db_path);
1091 let objects = cache.list_objects();
1092 assert_eq!(objects.len(), 2);
1093
1094 let fqns: Vec<&str> = objects.iter().map(|o| o.fqn.as_str()).collect();
1095 assert!(fqns.contains(&"mydb.public.orders"));
1096 assert!(fqns.contains(&"mydb.public.users"));
1097 }
1098
1099 #[cfg_attr(miri, ignore)] #[mz_ore::test]
1101 fn test_list_databases() {
1102 let dir = tempfile::tempdir().unwrap();
1103 let db_path = dir.path().join("test.db");
1104 let conn = create_test_db(&db_path);
1105 insert_sample_project(&conn);
1106 drop(conn);
1107
1108 let cache = open_cache(&db_path);
1109 let databases = cache.list_databases_with_objects();
1110 assert_eq!(databases.len(), 1);
1111 assert_eq!(databases[0].name, "mydb");
1112 assert_eq!(databases[0].schemas.len(), 1);
1113 assert_eq!(databases[0].schemas[0].name, "public");
1114 assert_eq!(databases[0].schemas[0].schema_type, "user");
1115 assert_eq!(databases[0].schemas[0].objects.len(), 2);
1116
1117 let fqns: Vec<&str> = databases[0].schemas[0]
1118 .objects
1119 .iter()
1120 .map(|o| o.fqn.as_str())
1121 .collect();
1122 assert!(fqns.contains(&"mydb.public.orders"));
1123 assert!(fqns.contains(&"mydb.public.users"));
1124 }
1125
1126 #[cfg_attr(miri, ignore)] #[mz_ore::test]
1128 fn test_get_dependencies_and_dependents() {
1129 let dir = tempfile::tempdir().unwrap();
1130 let db_path = dir.path().join("test.db");
1131 let conn = create_test_db(&db_path);
1132 insert_sample_project(&conn);
1133 drop(conn);
1134
1135 let cache = open_cache(&db_path);
1136
1137 let orders: ObjectId = "mydb.public.orders".parse().unwrap();
1138 let users: ObjectId = "mydb.public.users".parse().unwrap();
1139
1140 let deps = cache.get_dependencies(&orders);
1141 assert_eq!(deps, vec![users.clone()]);
1142
1143 let dependents = cache.get_dependents(&users);
1144 assert_eq!(dependents, vec![orders.clone()]);
1145
1146 assert!(cache.get_dependencies(&users).is_empty());
1147 }
1148
1149 #[cfg_attr(miri, ignore)] #[mz_ore::test]
1151 fn test_list_external_dependencies() {
1152 let dir = tempfile::tempdir().unwrap();
1153 let db_path = dir.path().join("test.db");
1154 let conn = create_test_db(&db_path);
1155 insert_sample_project(&conn);
1156 drop(conn);
1157
1158 let cache = open_cache(&db_path);
1159 let ext = cache.list_external_dependencies();
1160 assert_eq!(
1161 ext,
1162 vec!["ext.public.raw_data".parse::<ObjectId>().unwrap()]
1163 );
1164 }
1165
1166 #[cfg_attr(miri, ignore)] #[mz_ore::test]
1168 fn test_get_tests() {
1169 let dir = tempfile::tempdir().unwrap();
1170 let db_path = dir.path().join("test.db");
1171 let conn = create_test_db(&db_path);
1172 insert_sample_project(&conn);
1173 drop(conn);
1174
1175 let cache = open_cache(&db_path);
1176 let orders: ObjectId = "mydb.public.orders".parse().unwrap();
1177 let users: ObjectId = "mydb.public.users".parse().unwrap();
1178
1179 let tests = cache.get_tests(&orders);
1180 assert_eq!(tests.len(), 1);
1181 assert_eq!(tests[0].name, "test_orders_not_empty");
1182
1183 assert!(cache.get_tests(&users).is_empty());
1184 }
1185
1186 #[cfg_attr(miri, ignore)] #[mz_ore::test]
1188 fn test_get_mod_statements() {
1189 let dir = tempfile::tempdir().unwrap();
1190 let db_path = dir.path().join("test.db");
1191 let conn = create_test_db(&db_path);
1192 insert_sample_project(&conn);
1193 drop(conn);
1194
1195 let cache = open_cache(&db_path);
1196
1197 let db_mods = cache.get_mod_statements("mydb", None);
1198 assert_eq!(db_mods, vec!["CREATE DATABASE mydb"]);
1199
1200 let schema_mods = cache.get_mod_statements("mydb", Some("public"));
1201 assert_eq!(schema_mods, vec!["CREATE SCHEMA public"]);
1202
1203 assert!(cache.get_mod_statements("unknown", None).is_empty());
1204 }
1205}