mz_deploy/project/analysis/graph_validation.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Deployment-time validation for project graphs.
11//!
12//! These validations check runtime constraints that require the full graph
13//! representation (dependency graph, cluster assignments, etc.) rather than
14//! the per-object checks performed during object compilation.
15//!
16//! ## Validations
17//!
18//! - **Cluster isolation** ([`validate_cluster_isolation`]): Ensures that
19//! sources/sinks do not share a cluster with materialized views or indexes.
20//! During a blue/green swap, **all** objects on a cluster are affected. If
21//! storage and compute objects share a cluster, an MV update would force
22//! source recreation — an expensive and disruptive operation.
23
24use super::super::ast::Statement;
25use crate::project::ir::graph::Project;
26use std::collections::{BTreeMap, BTreeSet};
27
28/// Validate that sources and sinks don't share clusters with indexes or materialized views.
29///
30/// This validation prevents accidentally recreating sources/sinks when updating compute objects.
31/// During a blue/green swap, all objects on a cluster are affected — mixing storage and compute
32/// objects on the same cluster means a view update could trigger source recreation.
33///
34/// # Arguments
35/// * `project` - The project graph to validate
36/// * `sources_by_cluster` - Map of cluster name to list of source FQNs from the database
37///
38/// # Returns
39/// * `Ok(())` if no conflicts found
40/// * `Err((cluster_name, compute_objects, storage_objects))` if conflicts detected
41pub(crate) fn validate_cluster_isolation(
42 project: &Project,
43 sources_by_cluster: &BTreeMap<String, Vec<String>>,
44) -> Result<(), (String, Vec<String>, Vec<String>)> {
45 // Build a map of cluster -> compute objects (indexes, MVs)
46 let mut cluster_compute_objects: BTreeMap<String, Vec<String>> = BTreeMap::new();
47
48 for db in &project.databases {
49 for schema in &db.schemas {
50 for obj in &schema.objects {
51 // Check for materialized views
52 if let Statement::CreateMaterializedView(mv) = &obj.typed_object.stmt {
53 if let Some(cluster_name) = &mv.in_cluster {
54 cluster_compute_objects
55 .entry(cluster_name.to_string())
56 .or_default()
57 .push(obj.id.to_string());
58 }
59 }
60
61 // Check for indexes
62 for index in &obj.typed_object.indexes {
63 if let Some(cluster_name) = &index.in_cluster {
64 let index_name = index
65 .name
66 .as_ref()
67 .map(|n| format!(" (index: {})", n))
68 .unwrap_or_default();
69 cluster_compute_objects
70 .entry(cluster_name.to_string())
71 .or_default()
72 .push(format!("{}{}", obj.id, index_name));
73 }
74 }
75 }
76 }
77 }
78
79 // Build a map of cluster -> sinks
80 let mut cluster_sinks: BTreeMap<String, Vec<String>> = BTreeMap::new();
81
82 for db in &project.databases {
83 for schema in &db.schemas {
84 for obj in &schema.objects {
85 if let Statement::CreateSink(sink) = &obj.typed_object.stmt {
86 if let Some(cluster_name) = &sink.in_cluster {
87 cluster_sinks
88 .entry(cluster_name.to_string())
89 .or_default()
90 .push(obj.id.to_string());
91 }
92 }
93 }
94 }
95 }
96
97 // Get all clusters that have compute objects or sinks
98 let mut all_clusters: BTreeSet<String> = BTreeSet::new();
99 all_clusters.extend(cluster_compute_objects.keys().cloned());
100 all_clusters.extend(cluster_sinks.keys().cloned());
101
102 // Check for conflicts: cluster has both compute objects AND (sources OR sinks)
103 for cluster_name in all_clusters {
104 let compute_objects = cluster_compute_objects.get(&cluster_name);
105 let sources = sources_by_cluster.get(&cluster_name);
106 let sinks = cluster_sinks.get(&cluster_name);
107
108 let has_compute = compute_objects.is_some() && !compute_objects.unwrap().is_empty();
109 let has_sources = sources.is_some() && !sources.unwrap().is_empty();
110 let has_sinks = sinks.is_some() && !sinks.unwrap().is_empty();
111
112 if has_compute && (has_sources || has_sinks) {
113 let mut storage_objects = Vec::new();
114 if let Some(sources) = sources {
115 storage_objects.extend(sources.iter().cloned());
116 }
117 if let Some(sinks) = sinks {
118 storage_objects.extend(sinks.iter().cloned());
119 }
120
121 return Err((
122 cluster_name,
123 compute_objects.unwrap().clone(),
124 storage_objects,
125 ));
126 }
127 }
128
129 Ok(())
130}