Skip to main content

mz_deploy/project/analysis/
graph_validation.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Deployment-time validation for project graphs.
11//!
12//! These validations check runtime constraints that require the full graph
13//! representation (dependency graph, cluster assignments, etc.) rather than
14//! the per-object checks performed during object compilation.
15//!
16//! ## Validations
17//!
18//! - **Cluster isolation** ([`validate_cluster_isolation`]): Ensures that
19//!   sources/sinks do not share a cluster with materialized views or indexes.
20//!   During a blue/green swap, **all** objects on a cluster are affected. If
21//!   storage and compute objects share a cluster, an MV update would force
22//!   source recreation — an expensive and disruptive operation.
23
24use super::super::ast::Statement;
25use crate::project::ir::graph::Project;
26use std::collections::{BTreeMap, BTreeSet};
27
28/// Validate that sources and sinks don't share clusters with indexes or materialized views.
29///
30/// This validation prevents accidentally recreating sources/sinks when updating compute objects.
31/// During a blue/green swap, all objects on a cluster are affected — mixing storage and compute
32/// objects on the same cluster means a view update could trigger source recreation.
33///
34/// # Arguments
35/// * `project` - The project graph to validate
36/// * `sources_by_cluster` - Map of cluster name to list of source FQNs from the database
37///
38/// # Returns
39/// * `Ok(())` if no conflicts found
40/// * `Err((cluster_name, compute_objects, storage_objects))` if conflicts detected
41pub(crate) fn validate_cluster_isolation(
42    project: &Project,
43    sources_by_cluster: &BTreeMap<String, Vec<String>>,
44) -> Result<(), (String, Vec<String>, Vec<String>)> {
45    // Build a map of cluster -> compute objects (indexes, MVs)
46    let mut cluster_compute_objects: BTreeMap<String, Vec<String>> = BTreeMap::new();
47
48    for db in &project.databases {
49        for schema in &db.schemas {
50            for obj in &schema.objects {
51                // Check for materialized views
52                if let Statement::CreateMaterializedView(mv) = &obj.typed_object.stmt {
53                    if let Some(cluster_name) = &mv.in_cluster {
54                        cluster_compute_objects
55                            .entry(cluster_name.to_string())
56                            .or_default()
57                            .push(obj.id.to_string());
58                    }
59                }
60
61                // Check for indexes
62                for index in &obj.typed_object.indexes {
63                    if let Some(cluster_name) = &index.in_cluster {
64                        let index_name = index
65                            .name
66                            .as_ref()
67                            .map(|n| format!(" (index: {})", n))
68                            .unwrap_or_default();
69                        cluster_compute_objects
70                            .entry(cluster_name.to_string())
71                            .or_default()
72                            .push(format!("{}{}", obj.id, index_name));
73                    }
74                }
75            }
76        }
77    }
78
79    // Build a map of cluster -> sinks
80    let mut cluster_sinks: BTreeMap<String, Vec<String>> = BTreeMap::new();
81
82    for db in &project.databases {
83        for schema in &db.schemas {
84            for obj in &schema.objects {
85                if let Statement::CreateSink(sink) = &obj.typed_object.stmt {
86                    if let Some(cluster_name) = &sink.in_cluster {
87                        cluster_sinks
88                            .entry(cluster_name.to_string())
89                            .or_default()
90                            .push(obj.id.to_string());
91                    }
92                }
93            }
94        }
95    }
96
97    // Get all clusters that have compute objects or sinks
98    let mut all_clusters: BTreeSet<String> = BTreeSet::new();
99    all_clusters.extend(cluster_compute_objects.keys().cloned());
100    all_clusters.extend(cluster_sinks.keys().cloned());
101
102    // Check for conflicts: cluster has both compute objects AND (sources OR sinks)
103    for cluster_name in all_clusters {
104        let compute_objects = cluster_compute_objects.get(&cluster_name);
105        let sources = sources_by_cluster.get(&cluster_name);
106        let sinks = cluster_sinks.get(&cluster_name);
107
108        let has_compute = compute_objects.is_some() && !compute_objects.unwrap().is_empty();
109        let has_sources = sources.is_some() && !sources.unwrap().is_empty();
110        let has_sinks = sinks.is_some() && !sinks.unwrap().is_empty();
111
112        if has_compute && (has_sources || has_sinks) {
113            let mut storage_objects = Vec::new();
114            if let Some(sources) = sources {
115                storage_objects.extend(sources.iter().cloned());
116            }
117            if let Some(sinks) = sinks {
118                storage_objects.extend(sinks.iter().cloned());
119            }
120
121            return Err((
122                cluster_name,
123                compute_objects.unwrap().clone(),
124                storage_objects,
125            ));
126        }
127    }
128
129    Ok(())
130}