Skip to main content

mz_deploy/project/analysis/
topology.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Project-graph traversal and topology methods.
11//!
12//! This module extends [`graph::Project`](crate::project::ir::graph::Project)
13//! with traversal and accessor methods. The core operation is a DFS-based
14//! topological sort that produces a deployment-safe ordering where every
15//! dependency precedes its dependents. External dependencies are excluded
16//! from traversal. A cycle produces
17//! [`DependencyError::CircularDependency`].
18//!
19//! Accessor methods (`iter_objects`, `find_object`, `get_tables`) flatten the
20//! `database > schema > object` hierarchy into simple iterators.
21
22use super::super::ast::Statement;
23use super::super::error::DependencyError;
24use crate::project::ir::object_id::ObjectId;
25use crate::project::ir::{
26    compiled,
27    graph::{DatabaseObject, ModStatement, Project},
28};
29use std::collections::{BTreeMap, BTreeSet};
30
31impl Project {
32    /// Get topologically sorted objects for deployment.
33    ///
34    /// Returns objects in an order where dependencies come before dependents.
35    /// External dependencies are excluded from the sort as they are not deployable.
36    pub fn topological_sort(&self) -> Result<Vec<ObjectId>, DependencyError> {
37        let mut sorted = Vec::new();
38        let mut visited = BTreeSet::new();
39        let mut in_progress = BTreeSet::new();
40
41        for object_id in self.dependency_graph.keys() {
42            // Skip external dependencies - we don't deploy them
43            if self.external_dependencies.contains(object_id) {
44                continue;
45            }
46
47            if !visited.contains(object_id) {
48                self.visit(object_id, &mut visited, &mut in_progress, &mut sorted)?;
49            }
50        }
51
52        Ok(sorted)
53    }
54
55    /// Returns all table-like objects (`CREATE TABLE` and `CREATE TABLE ... FROM SOURCE`).
56    pub fn get_tables(&self) -> impl Iterator<Item = ObjectId> {
57        self.databases
58            .iter()
59            .flat_map(|db| db.schemas.iter())
60            .flat_map(|schema| schema.objects.iter())
61            .filter(|object| {
62                matches!(
63                    object.typed_object.stmt,
64                    Statement::CreateTable(_) | Statement::CreateTableFromSource(_)
65                )
66            })
67            .map(|object| object.id.clone())
68    }
69
70    /// Returns only `CREATE TABLE ... FROM SOURCE` objects.
71    ///
72    /// Unlike [`get_tables`](Self::get_tables), this excludes plain `CREATE TABLE`
73    /// objects whose schemas can be derived from the SQL AST. Only
74    /// `CreateTableFromSource` tables need their columns queried from the live
75    /// server (via `types.lock`), because their columns depend on the external source.
76    pub fn get_tables_from_source(&self) -> impl Iterator<Item = ObjectId> + '_ {
77        self.databases
78            .iter()
79            .flat_map(|db| db.schemas.iter())
80            .flat_map(|schema| schema.objects.iter())
81            .filter(|object| {
82                matches!(
83                    object.typed_object.stmt,
84                    Statement::CreateTableFromSource(_)
85                )
86            })
87            .map(|object| object.id.clone())
88    }
89
90    fn visit(
91        &self,
92        object_id: &ObjectId,
93        visited: &mut BTreeSet<ObjectId>,
94        in_progress: &mut BTreeSet<ObjectId>,
95        sorted: &mut Vec<ObjectId>,
96    ) -> Result<(), DependencyError> {
97        if self.external_dependencies.contains(object_id) {
98            return Ok(());
99        }
100
101        if in_progress.contains(object_id) {
102            return Err(DependencyError::CircularDependency {
103                object: object_id.clone(),
104            });
105        }
106
107        if visited.contains(object_id) {
108            return Ok(());
109        }
110
111        in_progress.insert(object_id.clone());
112
113        if let Some(deps) = self.dependency_graph.get(object_id) {
114            for dep in deps {
115                self.visit(dep, visited, in_progress, sorted)?;
116            }
117        }
118
119        in_progress.remove(object_id);
120        visited.insert(object_id.clone());
121        sorted.push(object_id.clone());
122
123        Ok(())
124    }
125
126    /// Get all database objects in topological order with their compiled statements.
127    ///
128    /// Returns a vector of `(ObjectId, compiled object)` tuples in deployment order.
129    /// This allows access to the fully qualified SQL statements for each object.
130    pub fn get_sorted_objects(
131        &self,
132    ) -> Result<Vec<(ObjectId, &compiled::DatabaseObject)>, DependencyError> {
133        let sorted_ids = self.topological_sort()?;
134        let mut result = Vec::new();
135
136        for object_id in sorted_ids {
137            // Find the corresponding typed object
138            if let Some(typed_obj) = self.find_typed_object(&object_id) {
139                result.push((object_id, typed_obj));
140            }
141        }
142
143        Ok(result)
144    }
145
146    /// Find the compiled object for a given ObjectId.
147    fn find_typed_object(&self, object_id: &ObjectId) -> Option<&compiled::DatabaseObject> {
148        for database in &self.databases {
149            if Some(database.name.as_str()) != object_id.database() {
150                continue;
151            }
152            for schema in &database.schemas {
153                if schema.name != object_id.schema() {
154                    continue;
155                }
156                for obj in &schema.objects {
157                    if obj.id == *object_id {
158                        return Some(&obj.typed_object);
159                    }
160                }
161            }
162        }
163        None
164    }
165
166    /// Returns all module-level statements in execution order.
167    ///
168    /// Module statements are executed before object statements and come from
169    /// database.sql or schema.sql files. They're used for setup like grants,
170    /// comments, and other database/schema-level configuration.
171    ///
172    /// # Execution Order
173    ///
174    /// 1. All database-level mod statements (in the order databases appear)
175    /// 2. All schema-level mod statements (in the order schemas appear)
176    ///
177    /// This ensures that database setup happens before schema setup, which
178    /// happens before object creation.
179    ///
180    /// # Returns
181    ///
182    /// A vector of `ModStatement` enums, each containing:
183    /// - Context (database name, schema name for schema-level statements)
184    /// - Reference to the statement to execute
185    pub fn iter_mod_statements(&self) -> Vec<ModStatement<'_>> {
186        let mut result = Vec::new();
187
188        // First: all database-level mod statements
189        for database in &self.databases {
190            if let Some(stmts) = &database.mod_statements {
191                for stmt in stmts {
192                    result.push(ModStatement::Database {
193                        database: &database.name,
194                        statement: stmt,
195                    });
196                }
197            }
198        }
199
200        // Second: all schema-level mod statements
201        for database in &self.databases {
202            for schema in &database.schemas {
203                if let Some(stmts) = &schema.mod_statements {
204                    for stmt in stmts {
205                        // Skip SET api — it's a directive for mz-deploy,
206                        // not SQL to send to Materialize.
207                        if matches!(stmt, mz_sql_parser::ast::Statement::SetVariable(s) if s.variable.as_str().eq_ignore_ascii_case("api"))
208                        {
209                            continue;
210                        }
211                        result.push(ModStatement::Schema {
212                            database: &database.name,
213                            schema: &schema.name,
214                            statement: stmt,
215                        });
216                    }
217                }
218            }
219        }
220
221        result
222    }
223
224    /// Build a reverse dependency graph.
225    ///
226    /// Maps each object to the set of objects that depend on it.
227    /// Used for incremental deployment to find downstream dependencies.
228    ///
229    /// # Returns
230    /// HashMap where key is an ObjectId and value is the set of objects that depend on it
231    pub fn build_reverse_dependency_graph(&self) -> BTreeMap<ObjectId, BTreeSet<ObjectId>> {
232        let mut reverse: BTreeMap<ObjectId, BTreeSet<ObjectId>> = BTreeMap::new();
233
234        for (obj_id, deps) in &self.dependency_graph {
235            for dep in deps {
236                reverse
237                    .entry(dep.clone())
238                    .or_default()
239                    .insert(obj_id.clone());
240            }
241        }
242
243        reverse
244    }
245
246    /// Get topologically sorted objects filtered by a set of object IDs.
247    ///
248    /// Returns objects in deployment order, but only those in the filter set.
249    /// Maintains topological ordering within the filtered subset.
250    ///
251    /// # Arguments
252    /// * `filter` - Set of ObjectIds to include in the result
253    ///
254    /// # Returns
255    /// Vector of `(ObjectId, compiled object)` tuples in deployment order
256    pub fn get_sorted_objects_filtered(
257        &self,
258        filter: &BTreeSet<ObjectId>,
259    ) -> Result<Vec<(ObjectId, &compiled::DatabaseObject)>, DependencyError> {
260        let sorted_ids = self.topological_sort()?;
261
262        // Filter to only include objects in the filter set
263        let filtered_ids: Vec<ObjectId> = sorted_ids
264            .into_iter()
265            .filter(|id| filter.contains(id))
266            .collect();
267
268        let mut result = Vec::new();
269        for object_id in filtered_ids {
270            if let Some(typed_obj) = self.find_typed_object(&object_id) {
271                result.push((object_id, typed_obj));
272            }
273        }
274
275        Ok(result)
276    }
277
278    /// Iterate over all database objects in the project.
279    ///
280    /// This flattens the database → schema → object hierarchy into a single iterator.
281    ///
282    /// # Returns
283    /// Iterator over references to all DatabaseObject instances in the project
284    ///
285    /// # Example
286    /// ```ignore
287    /// for obj in project.iter_objects() {
288    ///     println!("Object: {}", obj.id);
289    /// }
290    /// ```
291    pub fn iter_objects(&self) -> impl Iterator<Item = &DatabaseObject> {
292        self.databases
293            .iter()
294            .flat_map(|db| db.schemas.iter())
295            .flat_map(|schema| schema.objects.iter())
296    }
297
298    /// Find a database object by its ObjectId.
299    ///
300    /// This is more efficient than manually iterating through the hierarchy.
301    ///
302    /// # Arguments
303    /// * `id` - The ObjectId to search for
304    ///
305    /// # Returns
306    /// Some(&DatabaseObject) if found, None otherwise
307    ///
308    /// # Example
309    /// ```ignore
310    /// if let Some(obj) = project.find_object(&object_id) {
311    ///     println!("Found: {}", obj.id);
312    /// }
313    /// ```
314    pub fn find_object(&self, id: &ObjectId) -> Option<&DatabaseObject> {
315        self.iter_objects().find(|obj| &obj.id == id)
316    }
317
318    /// Validate that sources and sinks don't share clusters with indexes or materialized views.
319    ///
320    /// This validation prevents accidentally recreating sources/sinks when updating compute objects.
321    ///
322    /// # Arguments
323    /// * `sources_by_cluster` - Map of cluster name to list of source FQNs from the database
324    ///
325    /// # Returns
326    /// * `Ok(())` if no conflicts found
327    /// * `Err((cluster_name, compute_objects, storage_objects))` if conflicts detected
328    pub fn validate_cluster_isolation(
329        &self,
330        sources_by_cluster: &BTreeMap<String, Vec<String>>,
331    ) -> Result<(), (String, Vec<String>, Vec<String>)> {
332        super::graph_validation::validate_cluster_isolation(self, sources_by_cluster)
333    }
334}