Skip to main content

mz_deploy/project/analysis/
deps.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Dependency extraction and graph assembly from compiled project state.
11//!
12//! This module walks the SQL AST of every object to build a project-wide
13//! dependency graph and determine schema types. Query-level traversal is
14//! delegated to mz-sql-parser's auto-generated [`Visit`] trait — a
15//! [`DependencyVisitor`] overrides `visit_query` (for CTE scope management)
16//! and `visit_table_factor` (to collect table references as dependencies).
17//! All other AST nodes (expressions, set operations, etc.) are handled by
18//! the default traversal.
19//!
20//! # CTE Scoping
21//!
22//! CTE references must be excluded from the dependency set because they
23//! are query-local names, not database objects. The visitor uses
24//! [`CteScope`] to track CTE names across nested queries. Simple CTE names are
25//! introduced incrementally (each body sees only earlier siblings), while
26//! mutually-recursive blocks push all names up front — mirroring Materialize's
27//! own resolver. A simple CTE that shadows a catalog object therefore still
28//! records a dependency on that object when it references it inside its own
29//! body (e.g. `WITH products AS (SELECT * FROM products) ...`).
30//!
31//! # Statement-Level Dispatch
32//!
33//! The top-level [`extract_dependencies`] function matches on statement type
34//! and calls `visitor.visit_query()` only on the relevant subtree (e.g., the
35//! query body of a CREATE VIEW, not the view name itself). Non-query
36//! dependencies (source connections, connection options) are extracted by
37//! dedicated helper functions.
38//!
39//! # Dependency Validation
40//!
41//! After graph assembly, `external_dependencies` (the set of references to
42//! objects not defined in the project) is validated against the dependencies
43//! declared in `project.toml` via [`validate_dependencies()`]. This cross-check
44//! produces two sets: undeclared external references (hard error) and declared
45//! dependencies that are never referenced (warning).
46
47use super::super::ast::{Cluster, Statement};
48use crate::project::ir::object_id::ObjectId;
49use crate::project::ir::{
50    compiled,
51    graph::{Database, DatabaseObject, Project, Schema, SchemaType},
52    unit_test::UnitTest,
53};
54use crate::project::resolve::cte_scope::CteScope;
55use mz_sql_parser::ast::visit::{self, Visit};
56use mz_sql_parser::ast::*;
57use rayon::prelude::*;
58use std::collections::{BTreeMap, BTreeSet};
59
60/// Determine the schema type based on the objects it contains.
61///
62/// Returns:
63/// - `SchemaType::Storage` if the schema contains tables, sinks, or tables from sources
64/// - `SchemaType::Compute` if the schema contains views or materialized views
65/// - `SchemaType::Empty` if the schema contains no objects
66///
67/// Note: Due to compiled-project validation, schemas cannot contain both storage
68/// and compute objects, so we only need to check the first object.
69fn determine_schema_type(objects: &[DatabaseObject]) -> SchemaType {
70    if objects.is_empty() {
71        return SchemaType::Empty;
72    }
73
74    // Check the first object to determine schema type
75    // Validation ensures all objects in a schema are the same type
76    match &objects[0].typed_object.stmt {
77        Statement::CreateTable(_)
78        | Statement::CreateTableFromSource(_)
79        | Statement::CreateSource(_)
80        | Statement::CreateSink(_)
81        | Statement::CreateSecret(_)
82        | Statement::CreateConnection(_) => SchemaType::Storage,
83        Statement::CreateView(_) | Statement::CreateMaterializedView(_) => SchemaType::Compute,
84    }
85}
86
87/// A flattened compiled object with its database/schema context,
88/// used as the unit of work for dependency extraction.
89struct TypedObjectTask {
90    db_name: String,
91    schema_name: String,
92    typed_obj: compiled::DatabaseObject,
93}
94
95/// The result of processing a single compiled object through dependency extraction.
96struct ProcessedObject {
97    db_name: String,
98    schema_name: String,
99    object_id: ObjectId,
100    typed_object: compiled::DatabaseObject,
101    dependencies: BTreeSet<ObjectId>,
102    clusters: BTreeSet<Cluster>,
103    tests: Vec<(ObjectId, UnitTest)>,
104}
105
106impl From<compiled::Project> for Project {
107    /// Converts a compiled project into a dependency-aware project graph.
108    ///
109    /// Graph assembly is structured as collect → process → reassemble:
110    ///
111    /// 1. **Collect** — Flatten all compiled objects into `TypedObjectTask`s and
112    ///    collect defined object IDs for external dependency detection.
113    ///
114    /// 2. **Process** — Extract dependencies and clusters from each object.
115    ///    This is the CPU-intensive step.
116    ///
117    /// 3. **Reassemble** — Merge results into the dependency graph and
118    ///    hierarchical `Project` structure.
119    fn from(compiled_project: compiled::Project) -> Self {
120        // Flatten all compiled objects and collect defined object IDs.
121        let mut object_tasks = Vec::new();
122        let mut defined_objects = BTreeSet::new();
123
124        // Track database/schema metadata for reassembly
125        struct DbMeta {
126            name: String,
127            mod_statements: Option<Vec<mz_sql_parser::ast::Statement<Raw>>>,
128            schema_metas: Vec<SchemaMeta>,
129        }
130        struct SchemaMeta {
131            name: String,
132            mod_statements: Option<Vec<mz_sql_parser::ast::Statement<Raw>>>,
133        }
134        let mut db_metas: Vec<DbMeta> = Vec::new();
135
136        for typed_db in &compiled_project.databases {
137            for typed_schema in &typed_db.schemas {
138                for typed_obj in &typed_schema.objects {
139                    let object_id = ObjectId::new(
140                        typed_db.name.clone(),
141                        typed_schema.name.clone(),
142                        typed_obj.stmt.ident().object.as_str().to_string(),
143                    );
144                    defined_objects.insert(object_id);
145                }
146            }
147        }
148
149        for typed_db in compiled_project.databases {
150            let mut schema_metas = Vec::new();
151
152            for typed_schema in typed_db.schemas {
153                schema_metas.push(SchemaMeta {
154                    name: typed_schema.name.clone(),
155                    mod_statements: typed_schema.mod_statements,
156                });
157
158                for typed_obj in typed_schema.objects {
159                    object_tasks.push(TypedObjectTask {
160                        db_name: typed_db.name.clone(),
161                        schema_name: typed_schema.name.clone(),
162                        typed_obj,
163                    });
164                }
165            }
166
167            db_metas.push(DbMeta {
168                name: typed_db.name,
169                mod_statements: typed_db.mod_statements,
170                schema_metas,
171            });
172        }
173
174        // Extract dependencies and clusters from each object.
175        let processed: Vec<ProcessedObject> = object_tasks
176            .into_par_iter()
177            .map(|task| {
178                let object_id = ObjectId::new(
179                    task.db_name.clone(),
180                    task.schema_name.clone(),
181                    task.typed_obj.stmt.ident().object.as_str().to_string(),
182                );
183
184                let (dependencies, clusters) =
185                    extract_dependencies(&task.typed_obj.stmt, &task.db_name, &task.schema_name);
186
187                // Collect tests
188                let tests: Vec<_> = task
189                    .typed_obj
190                    .tests
191                    .iter()
192                    .map(|test_stmt| {
193                        let unit_test = UnitTest::from_execute_statement(test_stmt);
194                        (object_id.clone(), unit_test)
195                    })
196                    .collect();
197
198                ProcessedObject {
199                    db_name: task.db_name,
200                    schema_name: task.schema_name,
201                    object_id,
202                    typed_object: task.typed_obj,
203                    dependencies,
204                    clusters,
205                    tests,
206                }
207            })
208            .collect();
209
210        // Merge results into dependency graph and hierarchical structure.
211        let mut dependency_graph = BTreeMap::new();
212        let mut external_dependencies = BTreeSet::new();
213        let mut cluster_dependencies = BTreeSet::new();
214        let mut tests = Vec::new();
215
216        // Group graph objects by (db_name, schema_name)
217        let mut objects_by_location: BTreeMap<(String, String), Vec<DatabaseObject>> =
218            BTreeMap::new();
219
220        for po in processed {
221            // Merge clusters
222            for cluster in po.clusters {
223                cluster_dependencies.insert(cluster);
224            }
225
226            // Check for external dependencies
227            for dep in &po.dependencies {
228                if !defined_objects.contains(dep) {
229                    external_dependencies.insert(dep.clone());
230                }
231            }
232
233            dependency_graph.insert(po.object_id.clone(), po.dependencies.clone());
234            tests.extend(po.tests);
235
236            objects_by_location
237                .entry((po.db_name.clone(), po.schema_name.clone()))
238                .or_default()
239                .push(DatabaseObject {
240                    id: po.object_id,
241                    typed_object: po.typed_object,
242                    dependencies: po.dependencies,
243                });
244        }
245
246        // Reassemble into hierarchical structure
247        let mut databases = Vec::new();
248
249        for meta in db_metas {
250            let mut schemas = Vec::new();
251
252            for schema_meta in meta.schema_metas {
253                let objects = objects_by_location
254                    .remove(&(meta.name.clone(), schema_meta.name.clone()))
255                    .unwrap_or_default();
256
257                let schema_type = determine_schema_type(&objects);
258
259                schemas.push(Schema {
260                    name: schema_meta.name,
261                    objects,
262                    mod_statements: schema_meta.mod_statements,
263                    schema_type,
264                });
265            }
266
267            databases.push(Database {
268                name: meta.name,
269                schemas,
270                mod_statements: meta.mod_statements,
271            });
272        }
273
274        Project {
275            databases,
276            dependency_graph,
277            external_dependencies,
278            cluster_dependencies,
279            tests,
280            replacement_schemas: compiled_project.replacement_schemas,
281            compile_dirty: BTreeSet::new(),
282        }
283    }
284}
285
286/// Find all external indexes on an object. That is,
287/// any index that lives on a different cluster than the one where
288/// the main object is installed.
289pub(crate) fn extract_external_indexes(
290    object: &DatabaseObject,
291) -> Vec<(Cluster, CreateIndexStatement<Raw>)> {
292    match &object.typed_object.stmt {
293        Statement::CreateMaterializedView(materialized_view) => {
294            let mv_cluster =
295                Cluster::new(materialized_view.in_cluster.clone().unwrap().to_string());
296
297            object
298                .typed_object
299                .indexes
300                .iter()
301                .filter_map(|index| {
302                    let index_cluster = Cluster::new(index.in_cluster.clone().unwrap().to_string());
303
304                    (mv_cluster != index_cluster).then(|| (index_cluster, index.clone()))
305                })
306                .collect()
307        }
308        _ => object
309            .typed_object
310            .indexes
311            .iter()
312            .map(|index| {
313                let cluster = Cluster::new(index.in_cluster.clone().unwrap().to_string());
314                (cluster, index.clone())
315            })
316            .collect(),
317    }
318}
319
320/// Visitor that collects table reference dependencies from query ASTs.
321///
322/// Overrides `visit_query` for CTE scope management and `visit_table_factor`
323/// to collect table references. All other traversal (expressions, set
324/// operations, subqueries) is handled by mz-sql-parser's auto-generated
325/// default implementations.
326struct DependencyVisitor<'a> {
327    default_database: &'a str,
328    default_schema: &'a str,
329    deps: BTreeSet<ObjectId>,
330    cte_scope: CteScope,
331}
332
333impl<'a> DependencyVisitor<'a> {
334    fn new(default_database: &'a str, default_schema: &'a str) -> Self {
335        Self {
336            default_database,
337            default_schema,
338            deps: BTreeSet::new(),
339            cte_scope: CteScope::new(),
340        }
341    }
342}
343
344impl<'ast> Visit<'ast, Raw> for DependencyVisitor<'_> {
345    fn visit_query(&mut self, node: &'ast Query<Raw>) {
346        // Mirror Materialize's name resolver (`fold_query` in
347        // `src/sql/src/names.rs`): a simple CTE's body is resolved with only its
348        // *earlier* siblings in scope, so a simple CTE whose name shadows a
349        // catalog object still depends on that object via references in its own
350        // body. Mutually-recursive blocks make every name visible up front.
351        if matches!(node.ctes, CteBlock::Simple(_)) {
352            self.cte_scope.push(BTreeSet::new());
353            if let CteBlock::Simple(ctes) = &node.ctes {
354                for cte in ctes {
355                    // Visit this body before its own name is in scope.
356                    self.visit_query(&cte.query);
357                    self.cte_scope.insert_current(cte.alias.name.to_string());
358                }
359            }
360            // The main query body sees all simple CTE names. Replicate the rest
361            // of the default `Query` traversal (body, order_by, limit, offset).
362            self.visit_set_expr(&node.body);
363            for order_by in &node.order_by {
364                self.visit_order_by_expr(order_by);
365            }
366            if let Some(limit) = &node.limit {
367                self.visit_limit(limit);
368            }
369            if let Some(offset) = &node.offset {
370                self.visit_expr(offset);
371            }
372            self.cte_scope.pop();
373        } else {
374            let names = CteScope::collect_cte_names(&node.ctes);
375            self.cte_scope.push(names);
376            visit::visit_query(self, node);
377            self.cte_scope.pop();
378        }
379    }
380
381    fn visit_table_factor(&mut self, node: &'ast TableFactor<Raw>) {
382        match node {
383            TableFactor::Table { name, .. } => {
384                let unresolved = name.name();
385                if unresolved.0.len() == 1 && self.cte_scope.is_cte(&unresolved.0[0].to_string()) {
386                    return;
387                }
388                self.deps.insert(ObjectId::from_raw_item_name(
389                    name,
390                    self.default_database,
391                    self.default_schema,
392                ));
393                // Don't call default — it would visit_item_name which we don't need
394            }
395            _ => visit::visit_table_factor(self, node),
396        }
397    }
398}
399
400/// Extract all dependencies from a statement.
401///
402/// Returns a tuple of (object_dependencies, cluster_dependencies).
403///
404/// This function is public to allow the changeset module to analyze
405/// cluster dependencies for incremental deployment.
406pub(crate) fn extract_dependencies(
407    stmt: &Statement,
408    default_database: &str,
409    default_schema: &str,
410) -> (BTreeSet<ObjectId>, BTreeSet<Cluster>) {
411    let mut visitor = DependencyVisitor::new(default_database, default_schema);
412    let mut clusters = BTreeSet::new();
413
414    match stmt {
415        Statement::CreateView(s) => {
416            visitor.visit_query(&s.definition.query);
417        }
418        Statement::CreateMaterializedView(s) => {
419            visitor.visit_query(&s.query);
420
421            // Extract cluster dependency from IN CLUSTER clause
422            if let Some(ref cluster_name) = s.in_cluster {
423                clusters.insert(Cluster::new(cluster_name.to_string()));
424            }
425        }
426        Statement::CreateTableFromSource(s) => {
427            // Table depends on the source it's created from
428            let source_id =
429                ObjectId::from_raw_item_name(&s.source, default_database, default_schema);
430            visitor.deps.insert(source_id);
431        }
432        Statement::CreateSink(s) => {
433            // Sink depends on the shard it reads from
434            let from_id = ObjectId::from_raw_item_name(&s.from, default_database, default_schema);
435            visitor.deps.insert(from_id);
436
437            if let Some(ref cluster_name) = s.in_cluster {
438                clusters.insert(Cluster::new(cluster_name.to_string()));
439            }
440        }
441        Statement::CreateSource(s) => {
442            // Source depends on its connection
443            extract_source_connection_dep(
444                &s.connection,
445                default_database,
446                default_schema,
447                &mut visitor.deps,
448            );
449
450            if let Some(ref cluster_name) = s.in_cluster {
451                clusters.insert(Cluster::new(cluster_name.to_string()));
452            }
453        }
454        Statement::CreateConnection(s) => {
455            extract_connection_option_deps(
456                &s.values,
457                default_database,
458                default_schema,
459                &mut visitor.deps,
460            );
461        }
462        // These don't have dependencies on other database objects
463        Statement::CreateTable(_) | Statement::CreateSecret(_) => {}
464    }
465
466    (visitor.deps, clusters)
467}
468
469/// Extract the connection dependency from a source's connection clause.
470fn extract_source_connection_dep(
471    connection: &CreateSourceConnection<Raw>,
472    default_database: &str,
473    default_schema: &str,
474    deps: &mut BTreeSet<ObjectId>,
475) {
476    match connection {
477        CreateSourceConnection::Kafka { connection, .. }
478        | CreateSourceConnection::Postgres { connection, .. }
479        | CreateSourceConnection::SqlServer { connection, .. }
480        | CreateSourceConnection::MySql { connection, .. } => {
481            deps.insert(ObjectId::from_raw_item_name(
482                connection,
483                default_database,
484                default_schema,
485            ));
486        }
487        CreateSourceConnection::LoadGenerator { .. } => {}
488    }
489}
490
491/// Extract dependencies from connection options (secrets, other connections).
492fn extract_connection_option_deps(
493    options: &[ConnectionOption<Raw>],
494    default_database: &str,
495    default_schema: &str,
496    deps: &mut BTreeSet<ObjectId>,
497) {
498    for option in options {
499        if let Some(ref value) = option.value {
500            extract_with_option_value_deps(value, default_database, default_schema, deps);
501        }
502    }
503}
504
505/// Result of validating declared dependencies against discovered external references.
506pub(crate) struct DependencyValidation {
507    /// External references found in SQL that are not declared in project.toml.
508    pub undeclared: BTreeSet<ObjectId>,
509    /// Dependencies declared in project.toml that no SQL object references.
510    pub unused: BTreeSet<ObjectId>,
511}
512
513/// Cross-reference declared dependencies (from project.toml) against discovered
514/// external references (from the compiled dependency graph).
515///
516/// Returns the set difference in both directions:
517/// - `undeclared`: discovered but not declared (should be a hard error)
518/// - `unused`: declared but not discovered (should be a warning)
519pub(crate) fn validate_dependencies(
520    declared: &BTreeSet<ObjectId>,
521    discovered: &BTreeSet<ObjectId>,
522) -> DependencyValidation {
523    DependencyValidation {
524        undeclared: discovered.difference(declared).cloned().collect(),
525        unused: declared.difference(discovered).cloned().collect(),
526    }
527}
528
529/// Extract dependencies from a single WithOptionValue, recursing into nested structures.
530fn extract_with_option_value_deps(
531    value: &WithOptionValue<Raw>,
532    default_database: &str,
533    default_schema: &str,
534    deps: &mut BTreeSet<ObjectId>,
535) {
536    match value {
537        WithOptionValue::Secret(name) | WithOptionValue::Item(name) => {
538            deps.insert(ObjectId::from_raw_item_name(
539                name,
540                default_database,
541                default_schema,
542            ));
543        }
544        WithOptionValue::ConnectionAwsPrivatelink(pl) => {
545            deps.insert(ObjectId::from_raw_item_name(
546                &pl.connection,
547                default_database,
548                default_schema,
549            ));
550        }
551        WithOptionValue::ConnectionKafkaBroker(broker) => match &broker.tunnel {
552            KafkaBrokerTunnel::SshTunnel(name) => {
553                deps.insert(ObjectId::from_raw_item_name(
554                    name,
555                    default_database,
556                    default_schema,
557                ));
558            }
559            KafkaBrokerTunnel::AwsPrivatelink(aws) => {
560                deps.insert(ObjectId::from_raw_item_name(
561                    &aws.connection,
562                    default_database,
563                    default_schema,
564                ));
565            }
566            KafkaBrokerTunnel::Direct => {}
567        },
568        WithOptionValue::Sequence(items) => {
569            for item in items {
570                extract_with_option_value_deps(item, default_database, default_schema, deps);
571            }
572        }
573        _ => {}
574    }
575}
576
#[cfg(test)]
mod dependency_validation_tests {
    use super::*;
    use crate::project::ir::object_id::ObjectId;
    use crate::project::syntax::parser::parse_statements;
    use std::collections::BTreeSet;

    fn oid(db: &str, schema: &str, obj: &str) -> ObjectId {
        ObjectId::new(db.to_string(), schema.to_string(), obj.to_string())
    }

    /// Parses a single `CREATE VIEW` statement and returns the dependencies
    /// `DependencyVisitor` collects from its query, with `materialize.public`
    /// as the default database and schema.
    fn view_query_deps(sql: &'static str) -> BTreeSet<ObjectId> {
        let stmts = parse_statements(vec![sql]).expect("test SQL should parse");
        let query = match &stmts[0] {
            mz_sql_parser::ast::Statement::CreateView(c) => &c.definition.query,
            _ => panic!("expected CREATE VIEW"),
        };

        let mut visitor = DependencyVisitor::new("materialize", "public");
        visitor.visit_query(query);
        visitor.deps
    }

    #[mz_ore::test]
    fn test_all_externals_declared() {
        let declared = BTreeSet::from([
            oid("ontology", "public", "customers"),
            oid("ontology", "public", "orders"),
        ]);
        let discovered = BTreeSet::from([
            oid("ontology", "public", "customers"),
            oid("ontology", "public", "orders"),
        ]);
        let result = validate_dependencies(&declared, &discovered);
        assert!(result.undeclared.is_empty());
        assert!(result.unused.is_empty());
    }

    #[mz_ore::test]
    fn test_undeclared_external_detected() {
        let declared = BTreeSet::from([oid("ontology", "public", "customers")]);
        let discovered = BTreeSet::from([
            oid("ontology", "public", "customers"),
            oid("ontology", "public", "orders"),
        ]);
        let result = validate_dependencies(&declared, &discovered);
        assert_eq!(
            result.undeclared,
            BTreeSet::from([oid("ontology", "public", "orders")])
        );
        assert!(result.unused.is_empty());
    }

    #[mz_ore::test]
    fn test_unused_declared_detected() {
        let declared = BTreeSet::from([
            oid("ontology", "public", "customers"),
            oid("analytics", "public", "page_views"),
        ]);
        let discovered = BTreeSet::from([oid("ontology", "public", "customers")]);
        let result = validate_dependencies(&declared, &discovered);
        assert!(result.undeclared.is_empty());
        assert_eq!(
            result.unused,
            BTreeSet::from([oid("analytics", "public", "page_views")])
        );
    }

    #[mz_ore::test]
    fn test_both_empty() {
        let declared = BTreeSet::new();
        let discovered = BTreeSet::new();
        let result = validate_dependencies(&declared, &discovered);
        assert!(result.undeclared.is_empty());
        assert!(result.unused.is_empty());
    }

    #[mz_ore::test]
    fn test_both_undeclared_and_unused() {
        let declared = BTreeSet::from([oid("a", "b", "unused")]);
        let discovered = BTreeSet::from([oid("x", "y", "undeclared")]);
        let result = validate_dependencies(&declared, &discovered);
        // Check which element lands in which set, not just the counts.
        assert_eq!(result.undeclared, BTreeSet::from([oid("x", "y", "undeclared")]));
        assert_eq!(result.unused, BTreeSet::from([oid("a", "b", "unused")]));
    }

    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `rust_psm_stack_pointer` on OS `linux`
    #[mz_ore::test]
    fn test_simple_cte_shadowing_records_catalog_dependency() {
        // A simple CTE that shadows a catalog object and references that object
        // inside its own body must record a dependency on the catalog object.
        // While a simple CTE body is visited only earlier siblings are in scope,
        // so the inner `products` resolves to the catalog table; the outer
        // `products` resolves to the CTE.
        let deps = view_query_deps(
            "CREATE VIEW v AS \
             WITH products AS (SELECT id FROM products WHERE active) \
             SELECT * FROM products",
        );

        assert!(
            deps.contains(&oid("materialize", "public", "products")),
            "dependency on the catalog table shadowed by the simple CTE should be \
             collected, got: {:?}",
            deps
        );
    }

    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `rust_psm_stack_pointer` on OS `linux`
    #[mz_ore::test]
    fn test_cte_reference_is_not_a_dependency() {
        let deps = view_query_deps(
            "CREATE VIEW v AS \
             WITH recent AS (SELECT id FROM orders) \
             SELECT * FROM recent",
        );
        assert_eq!(deps, BTreeSet::from([oid("materialize", "public", "orders")]));
    }

    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `rust_psm_stack_pointer` on OS `linux`
    #[mz_ore::test]
    fn test_later_sibling_cte_is_not_in_scope() {
        // Simple CTEs only see *earlier* siblings, so `b` inside `a` refers to
        // the catalog object `b`, not the CTE declared after it.
        let deps = view_query_deps(
            "CREATE VIEW v AS \
             WITH a AS (SELECT * FROM b), b AS (SELECT 1 AS x) \
             SELECT * FROM a",
        );
        assert_eq!(deps, BTreeSet::from([oid("materialize", "public", "b")]));
    }

    #[cfg_attr(miri, ignore)] // unsupported operation: can't call foreign function `rust_psm_stack_pointer` on OS `linux`
    #[mz_ore::test]
    fn test_qualified_reference_is_never_treated_as_cte() {
        // Only single-part names can refer to a CTE; `sales.orders` is always
        // a catalog reference even though a CTE named `orders` is in scope.
        let deps = view_query_deps(
            "CREATE VIEW v AS \
             WITH orders AS (SELECT 1 AS id) \
             SELECT * FROM sales.orders",
        );
        assert_eq!(deps.len(), 1, "expected exactly one dependency, got: {:?}", deps);
    }
}