Skip to main content

mz_deploy/project/
clusters.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Cluster definition loading and validation.
11//!
12//! Loads cluster definitions from `<root>/clusters/` directory. Each `.sql` file
13//! defines a single cluster with a required `CREATE CLUSTER` statement and optional
14//! `GRANT` and `COMMENT` statements.
15
16use crate::project::error::{
17    LoadError, ProjectError, ValidationError, ValidationErrorKind, ValidationErrors,
18};
19use crate::project::syntax::parser::{
20    LocatedStatement, parse_statements_with_context, statement_type_name,
21};
22use crate::project::syntax::profile_files::collect_all_sql_files;
23use mz_sql_parser::ast::{
24    ClusterOptionName, CommentObjectType, CommentStatement, CreateClusterStatement,
25    GrantPrivilegesStatement, GrantTargetSpecification, GrantTargetSpecificationInner, Ident,
26    ObjectType, Raw, RawClusterName, Statement, UnresolvedObjectName, WithOptionValue,
27};
28use std::collections::BTreeMap;
29use std::path::Path;
30
31/// A parsed cluster definition from a `.sql` file in the `clusters/` directory.
32pub(crate) struct ClusterDefinition {
33    /// Cluster name (derived from filename and validated against CREATE statement).
34    pub name: String,
35    /// The CREATE CLUSTER statement.
36    pub create_stmt: CreateClusterStatement<Raw>,
37    /// Optional GRANT statements targeting this cluster.
38    pub grants: Vec<GrantPrivilegesStatement<Raw>>,
39    /// Optional COMMENT statements targeting this cluster.
40    pub comments: Vec<CommentStatement<Raw>>,
41}
42
43/// Load all cluster definitions from `<root>/clusters/`.
44///
45/// Returns an empty vec if `clusters/` doesn't exist (the directory is optional).
46/// If `profile_suffix` is provided, each cluster definition is rewritten with the
47/// suffix appended to all cluster name references (CREATE, GRANT, COMMENT).
48pub(crate) fn load_clusters(
49    root: &Path,
50    profile: &str,
51    profile_suffix: Option<&str>,
52    variables: &BTreeMap<String, String>,
53) -> Result<Vec<ClusterDefinition>, ProjectError> {
54    let clusters_dir = root.join("clusters");
55
56    if !clusters_dir.exists() {
57        return Ok(vec![]);
58    }
59
60    if !clusters_dir.is_dir() {
61        return Err(LoadError::RootNotDirectory { path: clusters_dir }.into());
62    }
63
64    let all_files = collect_all_sql_files(&clusters_dir)?;
65
66    let mut definitions = Vec::new();
67    let mut errors = Vec::new();
68
69    for object_files in all_files {
70        let expected_name = &object_files.name;
71
72        // Validate all variants independently
73        let mut all_variant_paths = Vec::new();
74        if let Some(ref default_path) = object_files.default {
75            all_variant_paths.push((default_path.clone(), None));
76        }
77        for (prof, override_path) in &object_files.overrides {
78            all_variant_paths.push((override_path.clone(), Some(prof.as_str())));
79        }
80
81        // Validate each variant independently
82        for (path, _) in &all_variant_paths {
83            let sql = std::fs::read_to_string(path).map_err(|e| LoadError::FileReadFailed {
84                path: path.clone(),
85                source: e,
86            })?;
87            let located = parse_statements_with_context(&sql, path.clone(), variables, true)?;
88
89            if let Err(mut errs) = classify_cluster_statements(expected_name, path, located) {
90                errors.append(&mut errs);
91            }
92        }
93
94        // Resolve the active variant: prefer profile match, fall back to default
95        let active_path = object_files
96            .overrides
97            .get(profile)
98            .or(object_files.default.as_ref());
99
100        let active_path = match active_path {
101            Some(p) => p.clone(),
102            None => continue, // no variant for this profile
103        };
104
105        let sql = std::fs::read_to_string(&active_path).map_err(|e| LoadError::FileReadFailed {
106            path: active_path.clone(),
107            source: e,
108        })?;
109        let located = parse_statements_with_context(&sql, active_path.clone(), variables, true)?;
110
111        match classify_cluster_statements(expected_name, &active_path, located) {
112            Ok(def) => definitions.push(def),
113            Err(mut errs) => errors.append(&mut errs),
114        }
115    }
116
117    if !errors.is_empty() {
118        return Err(ValidationErrors::new(errors).into());
119    }
120
121    // Apply cluster suffix after validation (filename-vs-declared-name check uses original names)
122    if let Some(suffix) = profile_suffix {
123        for def in &mut definitions {
124            apply_cluster_suffix(def, suffix);
125        }
126    }
127
128    Ok(definitions)
129}
130
131/// Classify parsed statements into a `ClusterDefinition`, returning validation errors.
132fn classify_cluster_statements(
133    expected_name: &str,
134    path: &Path,
135    located_statements: Vec<LocatedStatement>,
136) -> Result<ClusterDefinition, Vec<ValidationError>> {
137    let mut create_stmts: Vec<(CreateClusterStatement<Raw>, usize)> = Vec::new();
138    let mut grants: Vec<GrantPrivilegesStatement<Raw>> = Vec::new();
139    let mut comments: Vec<CommentStatement<Raw>> = Vec::new();
140    let mut errors = Vec::new();
141
142    for LocatedStatement {
143        ast: stmt,
144        byte_offset,
145    } in located_statements
146    {
147        match stmt {
148            Statement::CreateCluster(s) => {
149                create_stmts.push((s, byte_offset));
150            }
151            Statement::GrantPrivileges(s) => {
152                // Validate that the grant targets a cluster
153                match &s.target {
154                    GrantTargetSpecification::Object {
155                        object_type: ObjectType::Cluster,
156                        object_spec_inner: GrantTargetSpecificationInner::Objects { names },
157                    } => {
158                        // Validate cluster name matches
159                        for name in names {
160                            let target_name = name.to_string();
161                            if target_name.to_lowercase() != expected_name.to_lowercase() {
162                                errors.push(ValidationError::with_file_sql_and_offset(
163                                    ValidationErrorKind::ClusterGrantTargetMismatch {
164                                        target: target_name,
165                                        cluster_name: expected_name.to_string(),
166                                    },
167                                    path.to_path_buf(),
168                                    s.to_string(),
169                                    byte_offset,
170                                ));
171                            }
172                        }
173                        grants.push(s);
174                    }
175                    _ => {
176                        errors.push(ValidationError::with_file_sql_and_offset(
177                            ValidationErrorKind::InvalidClusterStatement {
178                                statement_type: "GRANT (not targeting a cluster)".to_string(),
179                                cluster_name: expected_name.to_string(),
180                            },
181                            path.to_path_buf(),
182                            s.to_string(),
183                            byte_offset,
184                        ));
185                    }
186                }
187            }
188            Statement::Comment(s) => {
189                // Validate that the comment targets a cluster
190                match &s.object {
191                    CommentObjectType::Cluster { name } => {
192                        let target_name = match name {
193                            RawClusterName::Unresolved(ident) => ident.to_string(),
194                            RawClusterName::Resolved(id) => id.clone(),
195                        };
196                        if target_name.to_lowercase() != expected_name.to_lowercase() {
197                            errors.push(ValidationError::with_file_sql_and_offset(
198                                ValidationErrorKind::ClusterCommentTargetMismatch {
199                                    target: target_name,
200                                    cluster_name: expected_name.to_string(),
201                                },
202                                path.to_path_buf(),
203                                s.to_string(),
204                                byte_offset,
205                            ));
206                        }
207                        comments.push(s);
208                    }
209                    _ => {
210                        errors.push(ValidationError::with_file_sql_and_offset(
211                            ValidationErrorKind::InvalidClusterStatement {
212                                statement_type: "COMMENT (not targeting a cluster)".to_string(),
213                                cluster_name: expected_name.to_string(),
214                            },
215                            path.to_path_buf(),
216                            s.to_string(),
217                            byte_offset,
218                        ));
219                    }
220                }
221            }
222            other => {
223                errors.push(ValidationError::with_file_sql_and_offset(
224                    ValidationErrorKind::InvalidClusterStatement {
225                        statement_type: statement_type_name(&other).to_string(),
226                        cluster_name: expected_name.to_string(),
227                    },
228                    path.to_path_buf(),
229                    other.to_string(),
230                    byte_offset,
231                ));
232            }
233        }
234    }
235
236    // Validate exactly one CREATE CLUSTER (file-level errors)
237    if create_stmts.is_empty() {
238        errors.push(ValidationError::with_file(
239            ValidationErrorKind::ClusterMissingCreateStatement {
240                cluster_name: expected_name.to_string(),
241            },
242            path.to_path_buf(),
243        ));
244    } else if create_stmts.len() > 1 {
245        // Point to the second CREATE CLUSTER
246        errors.push(ValidationError::with_file_and_offset(
247            ValidationErrorKind::ClusterMultipleCreateStatements {
248                cluster_name: expected_name.to_string(),
249            },
250            path.to_path_buf(),
251            create_stmts[1].1,
252        ));
253    }
254
255    if !errors.is_empty() {
256        return Err(errors);
257    }
258
259    let (create_stmt, create_offset) = create_stmts.into_iter().next().unwrap();
260
261    // Validate cluster name matches filename
262    let declared_name = create_stmt.name.to_string();
263    if declared_name.to_lowercase() != expected_name.to_lowercase() {
264        return Err(vec![ValidationError::with_file_and_offset(
265            ValidationErrorKind::ClusterNameMismatch {
266                declared: declared_name,
267                expected: expected_name.to_string(),
268            },
269            path.to_path_buf(),
270            create_offset,
271        )]);
272    }
273
274    Ok(ClusterDefinition {
275        name: expected_name.to_string(),
276        create_stmt,
277        grants,
278        comments,
279    })
280}
281
282/// Apply a suffix to all cluster name references within a `ClusterDefinition`.
283///
284/// Rewrites the definition name, the CREATE statement name, GRANT target names,
285/// and COMMENT target names.
286fn apply_cluster_suffix(def: &mut ClusterDefinition, suffix: &str) {
287    // Rewrite definition name first, then reference it for the CREATE statement
288    def.name = format!("{}{}", def.name, suffix);
289    def.create_stmt.name = Ident::new(&def.name).expect("valid cluster identifier");
290
291    // Rewrite GRANT target cluster names
292    for grant in &mut def.grants {
293        if let GrantTargetSpecification::Object {
294            object_type: ObjectType::Cluster,
295            object_spec_inner: GrantTargetSpecificationInner::Objects { names },
296        } = &mut grant.target
297        {
298            for name in names {
299                if let UnresolvedObjectName::Cluster(ident) = name {
300                    *ident = suffixed_ident(ident, suffix);
301                }
302            }
303        }
304    }
305
306    // Rewrite COMMENT target cluster names
307    for comment in &mut def.comments {
308        if let CommentObjectType::Cluster { name } = &mut comment.object {
309            if let RawClusterName::Unresolved(ident) = name {
310                *ident = suffixed_ident(ident, suffix);
311            }
312        }
313    }
314}
315
316/// Append a suffix to an `Ident`, returning a new `Ident`.
317fn suffixed_ident(ident: &Ident, suffix: &str) -> Ident {
318    Ident::new(&format!("{}{}", ident, suffix)).expect("valid cluster identifier")
319}
320
321/// Extract the desired SIZE from a CreateClusterStatement's options.
322pub(crate) fn extract_size(create_stmt: &CreateClusterStatement<Raw>) -> Option<String> {
323    for opt in &create_stmt.options {
324        if opt.name == ClusterOptionName::Size {
325            if let Some(WithOptionValue::Value(ref v)) = opt.value {
326                return Some(v.to_string().trim_matches('\'').to_string());
327            }
328        }
329    }
330    None
331}
332
333/// Extract the desired REPLICATION FACTOR from a CreateClusterStatement's options.
334pub(crate) fn extract_replication_factor(create_stmt: &CreateClusterStatement<Raw>) -> Option<u32> {
335    for opt in &create_stmt.options {
336        if opt.name == ClusterOptionName::ReplicationFactor {
337            if let Some(WithOptionValue::Value(ref v)) = opt.value {
338                return v.to_string().parse::<u32>().ok();
339            }
340        }
341    }
342    None
343}
344
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use tempfile::TempDir;

    fn create_test_dir() -> TempDir {
        TempDir::new().unwrap()
    }

    /// Create `<dir>/clusters/` and write each `(file_name, sql)` pair into it.
    fn write_cluster_files(dir: &TempDir, files: &[(&str, &str)]) {
        let clusters_dir = dir.path().join("clusters");
        fs::create_dir(&clusters_dir).unwrap();
        for &(file_name, sql) in files {
            fs::write(clusters_dir.join(file_name), sql).unwrap();
        }
    }

    /// Load clusters for `profile` with no suffix and no variables.
    fn load(dir: &TempDir, profile: &str) -> Result<Vec<ClusterDefinition>, ProjectError> {
        load_clusters(dir.path(), profile, None, &BTreeMap::new())
    }

    #[mz_ore::test]
    fn test_load_clusters_no_directory() {
        let dir = create_test_dir();
        let result = load(&dir, "default").unwrap();
        assert!(
            result.is_empty(),
            "should return empty vec when clusters/ doesn't exist"
        );
    }

    #[mz_ore::test]
    fn test_load_clusters_basic() {
        let dir = create_test_dir();
        write_cluster_files(
            &dir,
            &[(
                "analytics.sql",
                "CREATE CLUSTER analytics (SIZE = '100cc', REPLICATION FACTOR = 1);\n\
                 GRANT USAGE ON CLUSTER analytics TO analyst_role;\n\
                 COMMENT ON CLUSTER analytics IS 'Analytics workloads';",
            )],
        );

        let result = load(&dir, "default").unwrap();
        assert_eq!(result.len(), 1);
        assert_eq!(result[0].name, "analytics");
        assert_eq!(result[0].grants.len(), 1);
        assert_eq!(result[0].comments.len(), 1);

        // Verify extracted options
        assert_eq!(
            extract_size(&result[0].create_stmt),
            Some("100cc".to_string())
        );
        assert_eq!(extract_replication_factor(&result[0].create_stmt), Some(1));
    }

    #[mz_ore::test]
    fn test_load_clusters_create_only() {
        let dir = create_test_dir();
        write_cluster_files(
            &dir,
            &[("quickstart.sql", "CREATE CLUSTER quickstart (SIZE = '25cc');")],
        );

        let result = load(&dir, "default").unwrap();
        assert_eq!(result.len(), 1);
        assert_eq!(result[0].name, "quickstart");
        assert!(result[0].grants.is_empty());
        assert!(result[0].comments.is_empty());
    }

    #[mz_ore::test]
    fn test_load_clusters_name_mismatch() {
        let dir = create_test_dir();
        write_cluster_files(
            &dir,
            &[("analytics.sql", "CREATE CLUSTER wrong_name (SIZE = '100cc');")],
        );

        let result = load(&dir, "default");
        assert!(
            result.is_err(),
            "should error when cluster name doesn't match filename"
        );
    }

    #[mz_ore::test]
    fn test_load_clusters_missing_create() {
        let dir = create_test_dir();
        write_cluster_files(
            &dir,
            &[(
                "analytics.sql",
                "GRANT USAGE ON CLUSTER analytics TO analyst_role;",
            )],
        );

        let result = load(&dir, "default");
        assert!(
            result.is_err(),
            "should error when no CREATE CLUSTER statement"
        );
    }

    #[mz_ore::test]
    fn test_load_clusters_unsupported_statement() {
        let dir = create_test_dir();
        write_cluster_files(
            &dir,
            &[(
                "analytics.sql",
                "CREATE CLUSTER analytics (SIZE = '100cc');\n\
                 CREATE TABLE foo (id INT);",
            )],
        );

        let result = load(&dir, "default");
        assert!(
            result.is_err(),
            "should error on unsupported statement type"
        );
    }

    #[mz_ore::test]
    fn test_load_clusters_grant_target_mismatch() {
        let dir = create_test_dir();
        write_cluster_files(
            &dir,
            &[(
                "analytics.sql",
                "CREATE CLUSTER analytics (SIZE = '100cc');\n\
                 GRANT USAGE ON CLUSTER other_cluster TO analyst_role;",
            )],
        );

        let result = load(&dir, "default");
        assert!(
            result.is_err(),
            "should error when grant targets wrong cluster"
        );
    }

    #[mz_ore::test]
    fn test_load_clusters_comment_target_mismatch() {
        let dir = create_test_dir();
        write_cluster_files(
            &dir,
            &[(
                "analytics.sql",
                "CREATE CLUSTER analytics (SIZE = '100cc');\n\
                 COMMENT ON CLUSTER other_cluster IS 'wrong target';",
            )],
        );

        let result = load(&dir, "default");
        assert!(
            result.is_err(),
            "should error when comment targets wrong cluster"
        );
    }

    #[mz_ore::test]
    fn test_load_clusters_multiple_files() {
        let dir = create_test_dir();
        write_cluster_files(
            &dir,
            &[
                ("analytics.sql", "CREATE CLUSTER analytics (SIZE = '100cc');"),
                (
                    "quickstart.sql",
                    "CREATE CLUSTER quickstart (SIZE = '25cc', REPLICATION FACTOR = 2);",
                ),
            ],
        );

        let result = load(&dir, "default").unwrap();
        assert_eq!(result.len(), 2);
        // Sorted by filename
        assert_eq!(result[0].name, "analytics");
        assert_eq!(result[1].name, "quickstart");
    }

    #[mz_ore::test]
    fn test_load_clusters_multi_variant_valid() {
        let dir = create_test_dir();
        write_cluster_files(
            &dir,
            &[
                ("analytics.sql", "CREATE CLUSTER analytics (SIZE = '100cc');"),
                (
                    "analytics#staging.sql",
                    "CREATE CLUSTER analytics (SIZE = '25cc');",
                ),
            ],
        );

        // With staging profile, should pick staging variant
        let result = load(&dir, "staging").unwrap();
        assert_eq!(result.len(), 1);
        assert_eq!(result[0].name, "analytics");
        assert_eq!(
            extract_size(&result[0].create_stmt),
            Some("25cc".to_string())
        );
    }

    #[mz_ore::test]
    fn test_load_clusters_multi_variant_fallback_default() {
        let dir = create_test_dir();
        write_cluster_files(
            &dir,
            &[
                ("analytics.sql", "CREATE CLUSTER analytics (SIZE = '100cc');"),
                (
                    "analytics#staging.sql",
                    "CREATE CLUSTER analytics (SIZE = '25cc');",
                ),
            ],
        );

        // With prod profile (no match), should fall back to default
        let result = load(&dir, "prod").unwrap();
        assert_eq!(result.len(), 1);
        assert_eq!(result[0].name, "analytics");
        assert_eq!(
            extract_size(&result[0].create_stmt),
            Some("100cc".to_string())
        );
    }

    #[mz_ore::test]
    fn test_load_clusters_invalid_variant_errors() {
        let dir = create_test_dir();
        write_cluster_files(
            &dir,
            &[
                ("analytics.sql", "CREATE CLUSTER analytics (SIZE = '100cc');"),
                // Invalid staging variant: name mismatch
                (
                    "analytics#staging.sql",
                    "CREATE CLUSTER wrong_name (SIZE = '25cc');",
                ),
            ],
        );

        let result = load(&dir, "default");
        assert!(
            result.is_err(),
            "invalid variant should error even when not active profile"
        );
    }

    #[mz_ore::test]
    fn test_load_clusters_profile_suffix() {
        let dir = create_test_dir();
        write_cluster_files(
            &dir,
            &[(
                "analytics.sql",
                "CREATE CLUSTER analytics (SIZE = '100cc');\n\
                 GRANT USAGE ON CLUSTER analytics TO analyst_role;\n\
                 COMMENT ON CLUSTER analytics IS 'Analytics workloads';",
            )],
        );

        let result =
            load_clusters(dir.path(), "default", Some("_dev"), &BTreeMap::new()).unwrap();
        assert_eq!(result.len(), 1);
        assert_eq!(result[0].name, "analytics_dev");
        assert_eq!(result[0].create_stmt.name.to_string(), "analytics_dev");

        // GRANT and COMMENT targets are rewritten to the suffixed cluster name.
        assert_eq!(result[0].grants.len(), 1);
        assert!(result[0].grants[0].to_string().contains("analytics_dev"));
        assert_eq!(result[0].comments.len(), 1);
        assert!(result[0].comments[0].to_string().contains("analytics_dev"));

        // Options other than the name are untouched.
        assert_eq!(
            extract_size(&result[0].create_stmt),
            Some("100cc".to_string())
        );
    }
}