mz_deploy/project/analysis/topology.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Project-graph traversal and topology methods.
11//!
12//! This module extends [`graph::Project`](crate::project::ir::graph::Project)
13//! with traversal and accessor methods. The core operation is a DFS-based
14//! topological sort that produces a deployment-safe ordering where every
15//! dependency precedes its dependents. External dependencies are excluded
16//! from traversal. A cycle produces
17//! [`DependencyError::CircularDependency`].
18//!
19//! Accessor methods (`iter_objects`, `find_object`, `get_tables`) flatten the
20//! `database > schema > object` hierarchy into simple iterators.
21
22use super::super::ast::Statement;
23use super::super::error::DependencyError;
24use crate::project::ir::object_id::ObjectId;
25use crate::project::ir::{
26 compiled,
27 graph::{DatabaseObject, ModStatement, Project},
28};
29use std::collections::{BTreeMap, BTreeSet};
30
31impl Project {
32 /// Get topologically sorted objects for deployment.
33 ///
34 /// Returns objects in an order where dependencies come before dependents.
35 /// External dependencies are excluded from the sort as they are not deployable.
36 pub fn topological_sort(&self) -> Result<Vec<ObjectId>, DependencyError> {
37 let mut sorted = Vec::new();
38 let mut visited = BTreeSet::new();
39 let mut in_progress = BTreeSet::new();
40
41 for object_id in self.dependency_graph.keys() {
42 // Skip external dependencies - we don't deploy them
43 if self.external_dependencies.contains(object_id) {
44 continue;
45 }
46
47 if !visited.contains(object_id) {
48 self.visit(object_id, &mut visited, &mut in_progress, &mut sorted)?;
49 }
50 }
51
52 Ok(sorted)
53 }
54
55 /// Returns all table-like objects (`CREATE TABLE` and `CREATE TABLE ... FROM SOURCE`).
56 pub fn get_tables(&self) -> impl Iterator<Item = ObjectId> {
57 self.databases
58 .iter()
59 .flat_map(|db| db.schemas.iter())
60 .flat_map(|schema| schema.objects.iter())
61 .filter(|object| {
62 matches!(
63 object.typed_object.stmt,
64 Statement::CreateTable(_) | Statement::CreateTableFromSource(_)
65 )
66 })
67 .map(|object| object.id.clone())
68 }
69
70 /// Returns only `CREATE TABLE ... FROM SOURCE` objects.
71 ///
72 /// Unlike [`get_tables`](Self::get_tables), this excludes plain `CREATE TABLE`
73 /// objects whose schemas can be derived from the SQL AST. Only
74 /// `CreateTableFromSource` tables need their columns queried from the live
75 /// server (via `types.lock`), because their columns depend on the external source.
76 pub fn get_tables_from_source(&self) -> impl Iterator<Item = ObjectId> + '_ {
77 self.databases
78 .iter()
79 .flat_map(|db| db.schemas.iter())
80 .flat_map(|schema| schema.objects.iter())
81 .filter(|object| {
82 matches!(
83 object.typed_object.stmt,
84 Statement::CreateTableFromSource(_)
85 )
86 })
87 .map(|object| object.id.clone())
88 }
89
90 fn visit(
91 &self,
92 object_id: &ObjectId,
93 visited: &mut BTreeSet<ObjectId>,
94 in_progress: &mut BTreeSet<ObjectId>,
95 sorted: &mut Vec<ObjectId>,
96 ) -> Result<(), DependencyError> {
97 if self.external_dependencies.contains(object_id) {
98 return Ok(());
99 }
100
101 if in_progress.contains(object_id) {
102 return Err(DependencyError::CircularDependency {
103 object: object_id.clone(),
104 });
105 }
106
107 if visited.contains(object_id) {
108 return Ok(());
109 }
110
111 in_progress.insert(object_id.clone());
112
113 if let Some(deps) = self.dependency_graph.get(object_id) {
114 for dep in deps {
115 self.visit(dep, visited, in_progress, sorted)?;
116 }
117 }
118
119 in_progress.remove(object_id);
120 visited.insert(object_id.clone());
121 sorted.push(object_id.clone());
122
123 Ok(())
124 }
125
126 /// Get all database objects in topological order with their compiled statements.
127 ///
128 /// Returns a vector of `(ObjectId, compiled object)` tuples in deployment order.
129 /// This allows access to the fully qualified SQL statements for each object.
130 pub fn get_sorted_objects(
131 &self,
132 ) -> Result<Vec<(ObjectId, &compiled::DatabaseObject)>, DependencyError> {
133 let sorted_ids = self.topological_sort()?;
134 let mut result = Vec::new();
135
136 for object_id in sorted_ids {
137 // Find the corresponding typed object
138 if let Some(typed_obj) = self.find_typed_object(&object_id) {
139 result.push((object_id, typed_obj));
140 }
141 }
142
143 Ok(result)
144 }
145
146 /// Find the compiled object for a given ObjectId.
147 fn find_typed_object(&self, object_id: &ObjectId) -> Option<&compiled::DatabaseObject> {
148 for database in &self.databases {
149 if Some(database.name.as_str()) != object_id.database() {
150 continue;
151 }
152 for schema in &database.schemas {
153 if schema.name != object_id.schema() {
154 continue;
155 }
156 for obj in &schema.objects {
157 if obj.id == *object_id {
158 return Some(&obj.typed_object);
159 }
160 }
161 }
162 }
163 None
164 }
165
166 /// Returns all module-level statements in execution order.
167 ///
168 /// Module statements are executed before object statements and come from
169 /// database.sql or schema.sql files. They're used for setup like grants,
170 /// comments, and other database/schema-level configuration.
171 ///
172 /// # Execution Order
173 ///
174 /// 1. All database-level mod statements (in the order databases appear)
175 /// 2. All schema-level mod statements (in the order schemas appear)
176 ///
177 /// This ensures that database setup happens before schema setup, which
178 /// happens before object creation.
179 ///
180 /// # Returns
181 ///
182 /// A vector of `ModStatement` enums, each containing:
183 /// - Context (database name, schema name for schema-level statements)
184 /// - Reference to the statement to execute
185 pub fn iter_mod_statements(&self) -> Vec<ModStatement<'_>> {
186 let mut result = Vec::new();
187
188 // First: all database-level mod statements
189 for database in &self.databases {
190 if let Some(stmts) = &database.mod_statements {
191 for stmt in stmts {
192 result.push(ModStatement::Database {
193 database: &database.name,
194 statement: stmt,
195 });
196 }
197 }
198 }
199
200 // Second: all schema-level mod statements
201 for database in &self.databases {
202 for schema in &database.schemas {
203 if let Some(stmts) = &schema.mod_statements {
204 for stmt in stmts {
205 // Skip SET api — it's a directive for mz-deploy,
206 // not SQL to send to Materialize.
207 if matches!(stmt, mz_sql_parser::ast::Statement::SetVariable(s) if s.variable.as_str().eq_ignore_ascii_case("api"))
208 {
209 continue;
210 }
211 result.push(ModStatement::Schema {
212 database: &database.name,
213 schema: &schema.name,
214 statement: stmt,
215 });
216 }
217 }
218 }
219 }
220
221 result
222 }
223
224 /// Build a reverse dependency graph.
225 ///
226 /// Maps each object to the set of objects that depend on it.
227 /// Used for incremental deployment to find downstream dependencies.
228 ///
229 /// # Returns
230 /// HashMap where key is an ObjectId and value is the set of objects that depend on it
231 pub fn build_reverse_dependency_graph(&self) -> BTreeMap<ObjectId, BTreeSet<ObjectId>> {
232 let mut reverse: BTreeMap<ObjectId, BTreeSet<ObjectId>> = BTreeMap::new();
233
234 for (obj_id, deps) in &self.dependency_graph {
235 for dep in deps {
236 reverse
237 .entry(dep.clone())
238 .or_default()
239 .insert(obj_id.clone());
240 }
241 }
242
243 reverse
244 }
245
246 /// Get topologically sorted objects filtered by a set of object IDs.
247 ///
248 /// Returns objects in deployment order, but only those in the filter set.
249 /// Maintains topological ordering within the filtered subset.
250 ///
251 /// # Arguments
252 /// * `filter` - Set of ObjectIds to include in the result
253 ///
254 /// # Returns
255 /// Vector of `(ObjectId, compiled object)` tuples in deployment order
256 pub fn get_sorted_objects_filtered(
257 &self,
258 filter: &BTreeSet<ObjectId>,
259 ) -> Result<Vec<(ObjectId, &compiled::DatabaseObject)>, DependencyError> {
260 let sorted_ids = self.topological_sort()?;
261
262 // Filter to only include objects in the filter set
263 let filtered_ids: Vec<ObjectId> = sorted_ids
264 .into_iter()
265 .filter(|id| filter.contains(id))
266 .collect();
267
268 let mut result = Vec::new();
269 for object_id in filtered_ids {
270 if let Some(typed_obj) = self.find_typed_object(&object_id) {
271 result.push((object_id, typed_obj));
272 }
273 }
274
275 Ok(result)
276 }
277
278 /// Iterate over all database objects in the project.
279 ///
280 /// This flattens the database → schema → object hierarchy into a single iterator.
281 ///
282 /// # Returns
283 /// Iterator over references to all DatabaseObject instances in the project
284 ///
285 /// # Example
286 /// ```ignore
287 /// for obj in project.iter_objects() {
288 /// println!("Object: {}", obj.id);
289 /// }
290 /// ```
291 pub fn iter_objects(&self) -> impl Iterator<Item = &DatabaseObject> {
292 self.databases
293 .iter()
294 .flat_map(|db| db.schemas.iter())
295 .flat_map(|schema| schema.objects.iter())
296 }
297
298 /// Find a database object by its ObjectId.
299 ///
300 /// This is more efficient than manually iterating through the hierarchy.
301 ///
302 /// # Arguments
303 /// * `id` - The ObjectId to search for
304 ///
305 /// # Returns
306 /// Some(&DatabaseObject) if found, None otherwise
307 ///
308 /// # Example
309 /// ```ignore
310 /// if let Some(obj) = project.find_object(&object_id) {
311 /// println!("Found: {}", obj.id);
312 /// }
313 /// ```
314 pub fn find_object(&self, id: &ObjectId) -> Option<&DatabaseObject> {
315 self.iter_objects().find(|obj| &obj.id == id)
316 }
317
318 /// Validate that sources and sinks don't share clusters with indexes or materialized views.
319 ///
320 /// This validation prevents accidentally recreating sources/sinks when updating compute objects.
321 ///
322 /// # Arguments
323 /// * `sources_by_cluster` - Map of cluster name to list of source FQNs from the database
324 ///
325 /// # Returns
326 /// * `Ok(())` if no conflicts found
327 /// * `Err((cluster_name, compute_objects, storage_objects))` if conflicts detected
328 pub fn validate_cluster_isolation(
329 &self,
330 sources_by_cluster: &BTreeMap<String, Vec<String>>,
331 ) -> Result<(), (String, Vec<String>, Vec<String>)> {
332 super::graph_validation::validate_cluster_isolation(self, sources_by_cluster)
333 }
334}