1use super::super::ast::{Cluster, Statement};
48use crate::project::ir::object_id::ObjectId;
49use crate::project::ir::{
50 compiled,
51 graph::{Database, DatabaseObject, Project, Schema, SchemaType},
52 unit_test::UnitTest,
53};
54use crate::project::resolve::cte_scope::CteScope;
55use mz_sql_parser::ast::visit::{self, Visit};
56use mz_sql_parser::ast::*;
57use rayon::prelude::*;
58use std::collections::{BTreeMap, BTreeSet};
59
60fn determine_schema_type(objects: &[DatabaseObject]) -> SchemaType {
70 if objects.is_empty() {
71 return SchemaType::Empty;
72 }
73
74 match &objects[0].typed_object.stmt {
77 Statement::CreateTable(_)
78 | Statement::CreateTableFromSource(_)
79 | Statement::CreateSource(_)
80 | Statement::CreateSink(_)
81 | Statement::CreateSecret(_)
82 | Statement::CreateConnection(_) => SchemaType::Storage,
83 Statement::CreateView(_) | Statement::CreateMaterializedView(_) => SchemaType::Compute,
84 }
85}
86
/// One unit of work for the parallel processing stage of
/// `From<compiled::Project> for Project`: a compiled object together with the
/// names of the database and schema it was found in.
struct TypedObjectTask {
    /// Name of the database that contains the object.
    db_name: String,
    /// Name of the schema that contains the object.
    schema_name: String,
    /// The compiled object itself (statement, tests, ...).
    typed_obj: compiled::DatabaseObject,
}
94
/// Result of processing a single [`TypedObjectTask`]: the original object plus
/// everything that was derived from its statement.
struct ProcessedObject {
    /// Name of the database that contains the object.
    db_name: String,
    /// Name of the schema that contains the object.
    schema_name: String,
    /// Identifier built from the database name, schema name and the object
    /// name taken from the statement.
    object_id: ObjectId,
    /// The compiled object this record was derived from.
    typed_object: compiled::DatabaseObject,
    /// Objects referenced by the statement, as returned by
    /// `extract_dependencies`.
    dependencies: BTreeSet<ObjectId>,
    /// Clusters referenced by the statement, as returned by
    /// `extract_dependencies`.
    clusters: BTreeSet<Cluster>,
    /// Unit tests attached to the object, each paired with the id of the
    /// object it belongs to.
    tests: Vec<(ObjectId, UnitTest)>,
}
105
106impl From<compiled::Project> for Project {
107 fn from(compiled_project: compiled::Project) -> Self {
120 let mut object_tasks = Vec::new();
122 let mut defined_objects = BTreeSet::new();
123
124 struct DbMeta {
126 name: String,
127 mod_statements: Option<Vec<mz_sql_parser::ast::Statement<Raw>>>,
128 schema_metas: Vec<SchemaMeta>,
129 }
130 struct SchemaMeta {
131 name: String,
132 mod_statements: Option<Vec<mz_sql_parser::ast::Statement<Raw>>>,
133 }
134 let mut db_metas: Vec<DbMeta> = Vec::new();
135
136 for typed_db in &compiled_project.databases {
137 for typed_schema in &typed_db.schemas {
138 for typed_obj in &typed_schema.objects {
139 let object_id = ObjectId::new(
140 typed_db.name.clone(),
141 typed_schema.name.clone(),
142 typed_obj.stmt.ident().object.as_str().to_string(),
143 );
144 defined_objects.insert(object_id);
145 }
146 }
147 }
148
149 for typed_db in compiled_project.databases {
150 let mut schema_metas = Vec::new();
151
152 for typed_schema in typed_db.schemas {
153 schema_metas.push(SchemaMeta {
154 name: typed_schema.name.clone(),
155 mod_statements: typed_schema.mod_statements,
156 });
157
158 for typed_obj in typed_schema.objects {
159 object_tasks.push(TypedObjectTask {
160 db_name: typed_db.name.clone(),
161 schema_name: typed_schema.name.clone(),
162 typed_obj,
163 });
164 }
165 }
166
167 db_metas.push(DbMeta {
168 name: typed_db.name,
169 mod_statements: typed_db.mod_statements,
170 schema_metas,
171 });
172 }
173
174 let processed: Vec<ProcessedObject> = object_tasks
176 .into_par_iter()
177 .map(|task| {
178 let object_id = ObjectId::new(
179 task.db_name.clone(),
180 task.schema_name.clone(),
181 task.typed_obj.stmt.ident().object.as_str().to_string(),
182 );
183
184 let (dependencies, clusters) =
185 extract_dependencies(&task.typed_obj.stmt, &task.db_name, &task.schema_name);
186
187 let tests: Vec<_> = task
189 .typed_obj
190 .tests
191 .iter()
192 .map(|test_stmt| {
193 let unit_test = UnitTest::from_execute_statement(test_stmt);
194 (object_id.clone(), unit_test)
195 })
196 .collect();
197
198 ProcessedObject {
199 db_name: task.db_name,
200 schema_name: task.schema_name,
201 object_id,
202 typed_object: task.typed_obj,
203 dependencies,
204 clusters,
205 tests,
206 }
207 })
208 .collect();
209
210 let mut dependency_graph = BTreeMap::new();
212 let mut external_dependencies = BTreeSet::new();
213 let mut cluster_dependencies = BTreeSet::new();
214 let mut tests = Vec::new();
215
216 let mut objects_by_location: BTreeMap<(String, String), Vec<DatabaseObject>> =
218 BTreeMap::new();
219
220 for po in processed {
221 for cluster in po.clusters {
223 cluster_dependencies.insert(cluster);
224 }
225
226 for dep in &po.dependencies {
228 if !defined_objects.contains(dep) {
229 external_dependencies.insert(dep.clone());
230 }
231 }
232
233 dependency_graph.insert(po.object_id.clone(), po.dependencies.clone());
234 tests.extend(po.tests);
235
236 objects_by_location
237 .entry((po.db_name.clone(), po.schema_name.clone()))
238 .or_default()
239 .push(DatabaseObject {
240 id: po.object_id,
241 typed_object: po.typed_object,
242 dependencies: po.dependencies,
243 });
244 }
245
246 let mut databases = Vec::new();
248
249 for meta in db_metas {
250 let mut schemas = Vec::new();
251
252 for schema_meta in meta.schema_metas {
253 let objects = objects_by_location
254 .remove(&(meta.name.clone(), schema_meta.name.clone()))
255 .unwrap_or_default();
256
257 let schema_type = determine_schema_type(&objects);
258
259 schemas.push(Schema {
260 name: schema_meta.name,
261 objects,
262 mod_statements: schema_meta.mod_statements,
263 schema_type,
264 });
265 }
266
267 databases.push(Database {
268 name: meta.name,
269 schemas,
270 mod_statements: meta.mod_statements,
271 });
272 }
273
274 Project {
275 databases,
276 dependency_graph,
277 external_dependencies,
278 cluster_dependencies,
279 tests,
280 replacement_schemas: compiled_project.replacement_schemas,
281 compile_dirty: BTreeSet::new(),
282 }
283 }
284}
285
286pub(crate) fn extract_external_indexes(
290 object: &DatabaseObject,
291) -> Vec<(Cluster, CreateIndexStatement<Raw>)> {
292 match &object.typed_object.stmt {
293 Statement::CreateMaterializedView(materialized_view) => {
294 let mv_cluster =
295 Cluster::new(materialized_view.in_cluster.clone().unwrap().to_string());
296
297 object
298 .typed_object
299 .indexes
300 .iter()
301 .filter_map(|index| {
302 let index_cluster = Cluster::new(index.in_cluster.clone().unwrap().to_string());
303
304 (mv_cluster != index_cluster).then(|| (index_cluster, index.clone()))
305 })
306 .collect()
307 }
308 _ => object
309 .typed_object
310 .indexes
311 .iter()
312 .map(|index| {
313 let cluster = Cluster::new(index.in_cluster.clone().unwrap().to_string());
314 (cluster, index.clone())
315 })
316 .collect(),
317 }
318}
319
/// AST visitor that collects the objects a query reads from.
///
/// Table references are turned into [`ObjectId`]s (using the default database
/// and schema for names that are not fully qualified) and stored in `deps`.
/// References that resolve to a CTE currently in scope are skipped.
struct DependencyVisitor<'a> {
    /// Database passed to `ObjectId::from_raw_item_name` for resolving names.
    default_database: &'a str,
    /// Schema passed to `ObjectId::from_raw_item_name` for resolving names.
    default_schema: &'a str,
    /// Dependencies collected so far.
    deps: BTreeSet<ObjectId>,
    /// Stack of CTE names visible at the current point of the traversal.
    cte_scope: CteScope,
}
332
333impl<'a> DependencyVisitor<'a> {
334 fn new(default_database: &'a str, default_schema: &'a str) -> Self {
335 Self {
336 default_database,
337 default_schema,
338 deps: BTreeSet::new(),
339 cte_scope: CteScope::new(),
340 }
341 }
342}
343
344impl<'ast> Visit<'ast, Raw> for DependencyVisitor<'_> {
345 fn visit_query(&mut self, node: &'ast Query<Raw>) {
346 if matches!(node.ctes, CteBlock::Simple(_)) {
352 self.cte_scope.push(BTreeSet::new());
353 if let CteBlock::Simple(ctes) = &node.ctes {
354 for cte in ctes {
355 self.visit_query(&cte.query);
357 self.cte_scope.insert_current(cte.alias.name.to_string());
358 }
359 }
360 self.visit_set_expr(&node.body);
363 for order_by in &node.order_by {
364 self.visit_order_by_expr(order_by);
365 }
366 if let Some(limit) = &node.limit {
367 self.visit_limit(limit);
368 }
369 if let Some(offset) = &node.offset {
370 self.visit_expr(offset);
371 }
372 self.cte_scope.pop();
373 } else {
374 let names = CteScope::collect_cte_names(&node.ctes);
375 self.cte_scope.push(names);
376 visit::visit_query(self, node);
377 self.cte_scope.pop();
378 }
379 }
380
381 fn visit_table_factor(&mut self, node: &'ast TableFactor<Raw>) {
382 match node {
383 TableFactor::Table { name, .. } => {
384 let unresolved = name.name();
385 if unresolved.0.len() == 1 && self.cte_scope.is_cte(&unresolved.0[0].to_string()) {
386 return;
387 }
388 self.deps.insert(ObjectId::from_raw_item_name(
389 name,
390 self.default_database,
391 self.default_schema,
392 ));
393 }
395 _ => visit::visit_table_factor(self, node),
396 }
397 }
398}
399
400pub(crate) fn extract_dependencies(
407 stmt: &Statement,
408 default_database: &str,
409 default_schema: &str,
410) -> (BTreeSet<ObjectId>, BTreeSet<Cluster>) {
411 let mut visitor = DependencyVisitor::new(default_database, default_schema);
412 let mut clusters = BTreeSet::new();
413
414 match stmt {
415 Statement::CreateView(s) => {
416 visitor.visit_query(&s.definition.query);
417 }
418 Statement::CreateMaterializedView(s) => {
419 visitor.visit_query(&s.query);
420
421 if let Some(ref cluster_name) = s.in_cluster {
423 clusters.insert(Cluster::new(cluster_name.to_string()));
424 }
425 }
426 Statement::CreateTableFromSource(s) => {
427 let source_id =
429 ObjectId::from_raw_item_name(&s.source, default_database, default_schema);
430 visitor.deps.insert(source_id);
431 }
432 Statement::CreateSink(s) => {
433 let from_id = ObjectId::from_raw_item_name(&s.from, default_database, default_schema);
435 visitor.deps.insert(from_id);
436
437 if let Some(ref cluster_name) = s.in_cluster {
438 clusters.insert(Cluster::new(cluster_name.to_string()));
439 }
440 }
441 Statement::CreateSource(s) => {
442 extract_source_connection_dep(
444 &s.connection,
445 default_database,
446 default_schema,
447 &mut visitor.deps,
448 );
449
450 if let Some(ref cluster_name) = s.in_cluster {
451 clusters.insert(Cluster::new(cluster_name.to_string()));
452 }
453 }
454 Statement::CreateConnection(s) => {
455 extract_connection_option_deps(
456 &s.values,
457 default_database,
458 default_schema,
459 &mut visitor.deps,
460 );
461 }
462 Statement::CreateTable(_) | Statement::CreateSecret(_) => {}
464 }
465
466 (visitor.deps, clusters)
467}
468
469fn extract_source_connection_dep(
471 connection: &CreateSourceConnection<Raw>,
472 default_database: &str,
473 default_schema: &str,
474 deps: &mut BTreeSet<ObjectId>,
475) {
476 match connection {
477 CreateSourceConnection::Kafka { connection, .. }
478 | CreateSourceConnection::Postgres { connection, .. }
479 | CreateSourceConnection::SqlServer { connection, .. }
480 | CreateSourceConnection::MySql { connection, .. } => {
481 deps.insert(ObjectId::from_raw_item_name(
482 connection,
483 default_database,
484 default_schema,
485 ));
486 }
487 CreateSourceConnection::LoadGenerator { .. } => {}
488 }
489}
490
491fn extract_connection_option_deps(
493 options: &[ConnectionOption<Raw>],
494 default_database: &str,
495 default_schema: &str,
496 deps: &mut BTreeSet<ObjectId>,
497) {
498 for option in options {
499 if let Some(ref value) = option.value {
500 extract_with_option_value_deps(value, default_database, default_schema, deps);
501 }
502 }
503}
504
/// Outcome of comparing a declared set of dependencies with the set that was
/// actually discovered (see `validate_dependencies`).
pub(crate) struct DependencyValidation {
    /// Ids that were discovered but not declared.
    pub undeclared: BTreeSet<ObjectId>,
    /// Ids that were declared but not discovered.
    pub unused: BTreeSet<ObjectId>,
}
512
513pub(crate) fn validate_dependencies(
520 declared: &BTreeSet<ObjectId>,
521 discovered: &BTreeSet<ObjectId>,
522) -> DependencyValidation {
523 DependencyValidation {
524 undeclared: discovered.difference(declared).cloned().collect(),
525 unused: declared.difference(discovered).cloned().collect(),
526 }
527}
528
529fn extract_with_option_value_deps(
531 value: &WithOptionValue<Raw>,
532 default_database: &str,
533 default_schema: &str,
534 deps: &mut BTreeSet<ObjectId>,
535) {
536 match value {
537 WithOptionValue::Secret(name) | WithOptionValue::Item(name) => {
538 deps.insert(ObjectId::from_raw_item_name(
539 name,
540 default_database,
541 default_schema,
542 ));
543 }
544 WithOptionValue::ConnectionAwsPrivatelink(pl) => {
545 deps.insert(ObjectId::from_raw_item_name(
546 &pl.connection,
547 default_database,
548 default_schema,
549 ));
550 }
551 WithOptionValue::ConnectionKafkaBroker(broker) => match &broker.tunnel {
552 KafkaBrokerTunnel::SshTunnel(name) => {
553 deps.insert(ObjectId::from_raw_item_name(
554 name,
555 default_database,
556 default_schema,
557 ));
558 }
559 KafkaBrokerTunnel::AwsPrivatelink(aws) => {
560 deps.insert(ObjectId::from_raw_item_name(
561 &aws.connection,
562 default_database,
563 default_schema,
564 ));
565 }
566 KafkaBrokerTunnel::Direct => {}
567 },
568 WithOptionValue::Sequence(items) => {
569 for item in items {
570 extract_with_option_value_deps(item, default_database, default_schema, deps);
571 }
572 }
573 _ => {}
574 }
575}
576
/// Tests for `validate_dependencies` and for CTE handling in
/// `DependencyVisitor`.
#[cfg(test)]
mod dependency_validation_tests {
    use super::*;
    use crate::project::ir::object_id::ObjectId;
    use std::collections::BTreeSet;

    /// Shorthand for building an `ObjectId` from string slices.
    fn oid(db: &str, schema: &str, obj: &str) -> ObjectId {
        ObjectId::new(db.to_string(), schema.to_string(), obj.to_string())
    }

    /// Identical declared and discovered sets produce no findings.
    #[mz_ore::test]
    fn test_all_externals_declared() {
        let declared = BTreeSet::from([
            oid("ontology", "public", "customers"),
            oid("ontology", "public", "orders"),
        ]);
        let discovered = BTreeSet::from([
            oid("ontology", "public", "customers"),
            oid("ontology", "public", "orders"),
        ]);
        let result = validate_dependencies(&declared, &discovered);
        assert!(result.undeclared.is_empty());
        assert!(result.unused.is_empty());
    }

    /// A discovered id that was not declared is reported as undeclared.
    #[mz_ore::test]
    fn test_undeclared_external_detected() {
        let declared = BTreeSet::from([oid("ontology", "public", "customers")]);
        let discovered = BTreeSet::from([
            oid("ontology", "public", "customers"),
            oid("ontology", "public", "orders"),
        ]);
        let result = validate_dependencies(&declared, &discovered);
        assert_eq!(result.undeclared.len(), 1);
        assert!(
            result
                .undeclared
                .contains(&oid("ontology", "public", "orders"))
        );
        assert!(result.unused.is_empty());
    }

    /// A declared id that was never discovered is reported as unused.
    #[mz_ore::test]
    fn test_unused_declared_detected() {
        let declared = BTreeSet::from([
            oid("ontology", "public", "customers"),
            oid("analytics", "public", "page_views"),
        ]);
        let discovered = BTreeSet::from([oid("ontology", "public", "customers")]);
        let result = validate_dependencies(&declared, &discovered);
        assert!(result.undeclared.is_empty());
        assert_eq!(result.unused.len(), 1);
        assert!(
            result
                .unused
                .contains(&oid("analytics", "public", "page_views"))
        );
    }

    /// Two empty sets produce no findings.
    #[mz_ore::test]
    fn test_both_empty() {
        let declared = BTreeSet::new();
        let discovered = BTreeSet::new();
        let result = validate_dependencies(&declared, &discovered);
        assert!(result.undeclared.is_empty());
        assert!(result.unused.is_empty());
    }

    /// Disjoint sets report one undeclared and one unused id.
    #[mz_ore::test]
    fn test_both_undeclared_and_unused() {
        let declared = BTreeSet::from([oid("a", "b", "unused")]);
        let discovered = BTreeSet::from([oid("x", "y", "undeclared")]);
        let result = validate_dependencies(&declared, &discovered);
        assert_eq!(result.undeclared.len(), 1);
        assert_eq!(result.unused.len(), 1);
    }

    /// A simple CTE named like the catalog object it reads from must still
    /// record the dependency on that catalog object: the CTE's own name is
    /// not in scope while its defining query is visited.
    #[cfg_attr(miri, ignore)] #[mz_ore::test]
    fn test_simple_cte_shadowing_records_catalog_dependency() {
        use crate::project::syntax::parser::parse_statements;

        let stmts = parse_statements(vec![
            "CREATE VIEW v AS \
             WITH products AS (SELECT id FROM products WHERE active) \
             SELECT * FROM products",
        ])
        .unwrap();

        // Pull the query out of the parsed CREATE VIEW statement.
        let query = match &stmts[0] {
            mz_sql_parser::ast::Statement::CreateView(c) => c.definition.query.clone(),
            _ => panic!("expected CREATE VIEW"),
        };

        let mut visitor = DependencyVisitor::new("materialize", "public");
        visitor.visit_query(&query);

        assert!(
            visitor
                .deps
                .contains(&oid("materialize", "public", "products")),
            "dependency on the catalog table shadowed by the simple CTE should be \
             collected, got: {:?}",
            visitor.deps
        );
    }
}