Skip to main content

mz_deploy/project/compiler/object_validation/
clusters.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Cluster validation for object statements.
11//!
12//! Validates that indexes, materialized views, sinks, and sources specify
13//! which cluster they run on using the `IN CLUSTER` clause.
14
15use super::identifiers::validate_cluster_name;
16use crate::project::ast::Statement;
17use crate::project::error::{ValidationError, ValidationErrorKind};
18use crate::project::ir::compiled::FullyQualifiedName;
19use mz_sql_parser::ast::*;
20
21/// Validates that all indexes specify a cluster.
22///
23/// Indexes in Materialize must specify which cluster they run on using the IN CLUSTER clause.
24/// This ensures deterministic deployment and avoids implicit cluster selection.
25///
26/// # Example
27///
28/// Valid:
29/// ```sql
30/// CREATE INDEX idx ON table (col) IN CLUSTER quickstart;
31/// ```
32///
33/// Invalid:
34/// ```sql
35/// CREATE INDEX idx ON table (col);  -- missing cluster
36/// ```
37pub(super) fn validate_index_clusters(
38    fqn: &FullyQualifiedName,
39    indexes: &[CreateIndexStatement<Raw>],
40    offsets: &[usize],
41    errors: &mut Vec<ValidationError>,
42) {
43    for (i, index) in indexes.iter().enumerate() {
44        let offset = offsets[i];
45        match &index.in_cluster {
46            None => {
47                let index_sql = format!("{};", index);
48                let index_name = index
49                    .name
50                    .as_ref()
51                    .map(|n| n.to_string())
52                    .unwrap_or_else(|| "<unnamed>".to_string());
53
54                errors.push(ValidationError::with_file_sql_and_offset(
55                    ValidationErrorKind::IndexMissingCluster { index_name },
56                    fqn.path.clone(),
57                    index_sql,
58                    offset,
59                ));
60            }
61            Some(cluster) => {
62                // Validate cluster name format
63                let cluster_name = cluster.to_string();
64                if let Err(e) = validate_cluster_name(&cluster_name, &fqn.path, offset) {
65                    errors.push(e);
66                }
67            }
68        }
69    }
70}
71
72/// Validates that indexes are only defined on views and materialized views.
73///
74/// Tables and sources are storage objects whose lifecycle `mz-deploy` manages
75/// through `apply`; an index on one is not supported.
76pub(super) fn validate_indexes_supported(
77    fqn: &FullyQualifiedName,
78    stmt: &Statement,
79    indexes: &[CreateIndexStatement<Raw>],
80    offsets: &[usize],
81    errors: &mut Vec<ValidationError>,
82) {
83    let object_type = match stmt {
84        Statement::CreateTable(_) | Statement::CreateTableFromSource(_) => "table",
85        Statement::CreateSource(_) => "source",
86        _ => return,
87    };
88
89    for (i, index) in indexes.iter().enumerate() {
90        let index_name = index
91            .name
92            .as_ref()
93            .map(|n| n.to_string())
94            .unwrap_or_else(|| "<unnamed>".to_string());
95        let index_sql = format!("{};", index);
96
97        errors.push(ValidationError::with_file_sql_and_offset(
98            ValidationErrorKind::IndexOnStorageObject {
99                object_type: object_type.to_string(),
100                object_name: fqn.object().to_string(),
101                index_name,
102            },
103            fqn.path.clone(),
104            index_sql,
105            offsets[i],
106        ));
107    }
108}
109
110/// Validates that a materialized view specifies a cluster.
111///
112/// Materialized views in Materialize must specify which cluster they run on using the IN CLUSTER clause.
113/// This ensures deterministic deployment and avoids implicit cluster selection.
114///
115/// # Example
116///
117/// Valid:
118/// ```sql
119/// CREATE MATERIALIZED VIEW mv IN CLUSTER quickstart AS SELECT ...;
120/// ```
121///
122/// Invalid:
123/// ```sql
124/// CREATE MATERIALIZED VIEW mv AS SELECT ...;  -- missing cluster
125/// ```
126pub(super) fn validate_mv_cluster(
127    fqn: &FullyQualifiedName,
128    stmt: &Statement,
129    main_offset: usize,
130    errors: &mut Vec<ValidationError>,
131) {
132    if let Statement::CreateMaterializedView(mv) = stmt {
133        match &mv.in_cluster {
134            None => {
135                let mv_sql = format!("{};", mv);
136                let view_name = mv.name.to_string();
137
138                errors.push(ValidationError::with_file_sql_and_offset(
139                    ValidationErrorKind::MaterializedViewMissingCluster { view_name },
140                    fqn.path.clone(),
141                    mv_sql,
142                    main_offset,
143                ));
144            }
145            Some(cluster) => {
146                // Validate cluster name format
147                let cluster_name = cluster.to_string();
148                if let Err(e) = validate_cluster_name(&cluster_name, &fqn.path, main_offset) {
149                    errors.push(e);
150                }
151            }
152        }
153    }
154}
155
156/// Validates that a sink specifies a cluster.
157///
158/// Sinks in Materialize must specify which cluster they run on using the IN CLUSTER clause.
159/// This ensures deterministic deployment and avoids implicit cluster selection.
160///
161/// # Example
162///
163/// Valid:
164/// ```sql
165/// CREATE SINK sink IN CLUSTER quickstart FROM table INTO ...;
166/// ```
167///
168/// Invalid:
169/// ```sql
170/// CREATE SINK sink FROM table INTO ...;  -- missing cluster
171/// ```
172pub(super) fn validate_sink_cluster(
173    fqn: &FullyQualifiedName,
174    stmt: &Statement,
175    main_offset: usize,
176    errors: &mut Vec<ValidationError>,
177) {
178    if let Statement::CreateSink(sink) = stmt {
179        match &sink.in_cluster {
180            None => {
181                let sink_sql = format!("{};", sink);
182                let sink_name = sink
183                    .name
184                    .as_ref()
185                    .map(|n| n.to_string())
186                    .unwrap_or_else(|| "<unnamed>".to_string());
187
188                errors.push(ValidationError::with_file_sql_and_offset(
189                    ValidationErrorKind::SinkMissingCluster { sink_name },
190                    fqn.path.clone(),
191                    sink_sql,
192                    main_offset,
193                ));
194            }
195            Some(cluster) => {
196                // Validate cluster name format
197                let cluster_name = cluster.to_string();
198                if let Err(e) = validate_cluster_name(&cluster_name, &fqn.path, main_offset) {
199                    errors.push(e);
200                }
201            }
202        }
203    }
204}
205
206/// Validates that a CREATE SOURCE statement has a required IN CLUSTER clause.
207pub(super) fn validate_source_cluster(
208    fqn: &FullyQualifiedName,
209    stmt: &Statement,
210    main_offset: usize,
211    errors: &mut Vec<ValidationError>,
212) {
213    if let Statement::CreateSource(source) = stmt {
214        match &source.in_cluster {
215            None => {
216                let source_sql = format!("{};", source);
217                errors.push(ValidationError::with_file_sql_and_offset(
218                    ValidationErrorKind::SourceMissingCluster {
219                        source_name: source.name.to_string(),
220                    },
221                    fqn.path.clone(),
222                    source_sql,
223                    main_offset,
224                ));
225            }
226            Some(cluster) => {
227                let cluster_name = cluster.to_string();
228                if let Err(e) = validate_cluster_name(&cluster_name, &fqn.path, main_offset) {
229                    errors.push(e);
230                }
231            }
232        }
233
234        if source.external_references.is_some() {
235            let source_sql = format!("{};", source);
236            errors.push(ValidationError::with_file_sql_and_offset(
237                ValidationErrorKind::SourceExternalReferences {
238                    source_name: source.name.to_string(),
239                },
240                fqn.path.clone(),
241                source_sql,
242                main_offset,
243            ));
244        }
245    }
246}