1use crate::project::error::{
17 LoadError, ProjectError, ValidationError, ValidationErrorKind, ValidationErrors,
18};
19use crate::project::syntax::parser::{
20 LocatedStatement, parse_statements_with_context, statement_type_name,
21};
22use crate::project::syntax::profile_files::collect_all_sql_files;
23use mz_sql_parser::ast::{
24 ClusterOptionName, CommentObjectType, CommentStatement, CreateClusterStatement,
25 GrantPrivilegesStatement, GrantTargetSpecification, GrantTargetSpecificationInner, Ident,
26 ObjectType, Raw, RawClusterName, Statement, UnresolvedObjectName, WithOptionValue,
27};
28use std::collections::BTreeMap;
29use std::path::Path;
30
/// A validated cluster loaded from a file under the project's `clusters/` directory.
///
/// Instances are produced by `classify_cluster_statements` once a cluster file has
/// passed validation, and may later be renamed by `apply_cluster_suffix`.
pub(crate) struct ClusterDefinition {
    /// The cluster's name: the expected name the file was validated against
    /// (e.g. `analytics` for `analytics.sql`), not the text of the `CREATE CLUSTER`
    /// statement. When a profile suffix is applied, the suffix is appended here.
    pub name: String,
    /// The single `CREATE CLUSTER` statement declared in the file.
    pub create_stmt: CreateClusterStatement<Raw>,
    /// `GRANT ... ON CLUSTER` statements from the file, in file order.
    pub grants: Vec<GrantPrivilegesStatement<Raw>>,
    /// `COMMENT ON CLUSTER` statements from the file, in file order.
    pub comments: Vec<CommentStatement<Raw>>,
}
42
43pub(crate) fn load_clusters(
49 root: &Path,
50 profile: &str,
51 profile_suffix: Option<&str>,
52 variables: &BTreeMap<String, String>,
53) -> Result<Vec<ClusterDefinition>, ProjectError> {
54 let clusters_dir = root.join("clusters");
55
56 if !clusters_dir.exists() {
57 return Ok(vec![]);
58 }
59
60 if !clusters_dir.is_dir() {
61 return Err(LoadError::RootNotDirectory { path: clusters_dir }.into());
62 }
63
64 let all_files = collect_all_sql_files(&clusters_dir)?;
65
66 let mut definitions = Vec::new();
67 let mut errors = Vec::new();
68
69 for object_files in all_files {
70 let expected_name = &object_files.name;
71
72 let mut all_variant_paths = Vec::new();
74 if let Some(ref default_path) = object_files.default {
75 all_variant_paths.push((default_path.clone(), None));
76 }
77 for (prof, override_path) in &object_files.overrides {
78 all_variant_paths.push((override_path.clone(), Some(prof.as_str())));
79 }
80
81 for (path, _) in &all_variant_paths {
83 let sql = std::fs::read_to_string(path).map_err(|e| LoadError::FileReadFailed {
84 path: path.clone(),
85 source: e,
86 })?;
87 let located = parse_statements_with_context(&sql, path.clone(), variables, true)?;
88
89 if let Err(mut errs) = classify_cluster_statements(expected_name, path, located) {
90 errors.append(&mut errs);
91 }
92 }
93
94 let active_path = object_files
96 .overrides
97 .get(profile)
98 .or(object_files.default.as_ref());
99
100 let active_path = match active_path {
101 Some(p) => p.clone(),
102 None => continue, };
104
105 let sql = std::fs::read_to_string(&active_path).map_err(|e| LoadError::FileReadFailed {
106 path: active_path.clone(),
107 source: e,
108 })?;
109 let located = parse_statements_with_context(&sql, active_path.clone(), variables, true)?;
110
111 match classify_cluster_statements(expected_name, &active_path, located) {
112 Ok(def) => definitions.push(def),
113 Err(mut errs) => errors.append(&mut errs),
114 }
115 }
116
117 if !errors.is_empty() {
118 return Err(ValidationErrors::new(errors).into());
119 }
120
121 if let Some(suffix) = profile_suffix {
123 for def in &mut definitions {
124 apply_cluster_suffix(def, suffix);
125 }
126 }
127
128 Ok(definitions)
129}
130
131fn classify_cluster_statements(
133 expected_name: &str,
134 path: &Path,
135 located_statements: Vec<LocatedStatement>,
136) -> Result<ClusterDefinition, Vec<ValidationError>> {
137 let mut create_stmts: Vec<(CreateClusterStatement<Raw>, usize)> = Vec::new();
138 let mut grants: Vec<GrantPrivilegesStatement<Raw>> = Vec::new();
139 let mut comments: Vec<CommentStatement<Raw>> = Vec::new();
140 let mut errors = Vec::new();
141
142 for LocatedStatement {
143 ast: stmt,
144 byte_offset,
145 } in located_statements
146 {
147 match stmt {
148 Statement::CreateCluster(s) => {
149 create_stmts.push((s, byte_offset));
150 }
151 Statement::GrantPrivileges(s) => {
152 match &s.target {
154 GrantTargetSpecification::Object {
155 object_type: ObjectType::Cluster,
156 object_spec_inner: GrantTargetSpecificationInner::Objects { names },
157 } => {
158 for name in names {
160 let target_name = name.to_string();
161 if target_name.to_lowercase() != expected_name.to_lowercase() {
162 errors.push(ValidationError::with_file_sql_and_offset(
163 ValidationErrorKind::ClusterGrantTargetMismatch {
164 target: target_name,
165 cluster_name: expected_name.to_string(),
166 },
167 path.to_path_buf(),
168 s.to_string(),
169 byte_offset,
170 ));
171 }
172 }
173 grants.push(s);
174 }
175 _ => {
176 errors.push(ValidationError::with_file_sql_and_offset(
177 ValidationErrorKind::InvalidClusterStatement {
178 statement_type: "GRANT (not targeting a cluster)".to_string(),
179 cluster_name: expected_name.to_string(),
180 },
181 path.to_path_buf(),
182 s.to_string(),
183 byte_offset,
184 ));
185 }
186 }
187 }
188 Statement::Comment(s) => {
189 match &s.object {
191 CommentObjectType::Cluster { name } => {
192 let target_name = match name {
193 RawClusterName::Unresolved(ident) => ident.to_string(),
194 RawClusterName::Resolved(id) => id.clone(),
195 };
196 if target_name.to_lowercase() != expected_name.to_lowercase() {
197 errors.push(ValidationError::with_file_sql_and_offset(
198 ValidationErrorKind::ClusterCommentTargetMismatch {
199 target: target_name,
200 cluster_name: expected_name.to_string(),
201 },
202 path.to_path_buf(),
203 s.to_string(),
204 byte_offset,
205 ));
206 }
207 comments.push(s);
208 }
209 _ => {
210 errors.push(ValidationError::with_file_sql_and_offset(
211 ValidationErrorKind::InvalidClusterStatement {
212 statement_type: "COMMENT (not targeting a cluster)".to_string(),
213 cluster_name: expected_name.to_string(),
214 },
215 path.to_path_buf(),
216 s.to_string(),
217 byte_offset,
218 ));
219 }
220 }
221 }
222 other => {
223 errors.push(ValidationError::with_file_sql_and_offset(
224 ValidationErrorKind::InvalidClusterStatement {
225 statement_type: statement_type_name(&other).to_string(),
226 cluster_name: expected_name.to_string(),
227 },
228 path.to_path_buf(),
229 other.to_string(),
230 byte_offset,
231 ));
232 }
233 }
234 }
235
236 if create_stmts.is_empty() {
238 errors.push(ValidationError::with_file(
239 ValidationErrorKind::ClusterMissingCreateStatement {
240 cluster_name: expected_name.to_string(),
241 },
242 path.to_path_buf(),
243 ));
244 } else if create_stmts.len() > 1 {
245 errors.push(ValidationError::with_file_and_offset(
247 ValidationErrorKind::ClusterMultipleCreateStatements {
248 cluster_name: expected_name.to_string(),
249 },
250 path.to_path_buf(),
251 create_stmts[1].1,
252 ));
253 }
254
255 if !errors.is_empty() {
256 return Err(errors);
257 }
258
259 let (create_stmt, create_offset) = create_stmts.into_iter().next().unwrap();
260
261 let declared_name = create_stmt.name.to_string();
263 if declared_name.to_lowercase() != expected_name.to_lowercase() {
264 return Err(vec![ValidationError::with_file_and_offset(
265 ValidationErrorKind::ClusterNameMismatch {
266 declared: declared_name,
267 expected: expected_name.to_string(),
268 },
269 path.to_path_buf(),
270 create_offset,
271 )]);
272 }
273
274 Ok(ClusterDefinition {
275 name: expected_name.to_string(),
276 create_stmt,
277 grants,
278 comments,
279 })
280}
281
282fn apply_cluster_suffix(def: &mut ClusterDefinition, suffix: &str) {
287 def.name = format!("{}{}", def.name, suffix);
289 def.create_stmt.name = Ident::new(&def.name).expect("valid cluster identifier");
290
291 for grant in &mut def.grants {
293 if let GrantTargetSpecification::Object {
294 object_type: ObjectType::Cluster,
295 object_spec_inner: GrantTargetSpecificationInner::Objects { names },
296 } = &mut grant.target
297 {
298 for name in names {
299 if let UnresolvedObjectName::Cluster(ident) = name {
300 *ident = suffixed_ident(ident, suffix);
301 }
302 }
303 }
304 }
305
306 for comment in &mut def.comments {
308 if let CommentObjectType::Cluster { name } = &mut comment.object {
309 if let RawClusterName::Unresolved(ident) = name {
310 *ident = suffixed_ident(ident, suffix);
311 }
312 }
313 }
314}
315
316fn suffixed_ident(ident: &Ident, suffix: &str) -> Ident {
318 Ident::new(&format!("{}{}", ident, suffix)).expect("valid cluster identifier")
319}
320
321pub(crate) fn extract_size(create_stmt: &CreateClusterStatement<Raw>) -> Option<String> {
323 for opt in &create_stmt.options {
324 if opt.name == ClusterOptionName::Size {
325 if let Some(WithOptionValue::Value(ref v)) = opt.value {
326 return Some(v.to_string().trim_matches('\'').to_string());
327 }
328 }
329 }
330 None
331}
332
333pub(crate) fn extract_replication_factor(create_stmt: &CreateClusterStatement<Raw>) -> Option<u32> {
335 for opt in &create_stmt.options {
336 if opt.name == ClusterOptionName::ReplicationFactor {
337 if let Some(WithOptionValue::Value(ref v)) = opt.value {
338 return v.to_string().parse::<u32>().ok();
339 }
340 }
341 }
342 None
343}
344
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use tempfile::TempDir;

    /// Creates a temporary project root whose `clusters/` directory contains the
    /// given `(file name, SQL contents)` pairs.
    fn project_with_clusters(files: &[(&str, &str)]) -> TempDir {
        let root = TempDir::new().unwrap();
        let clusters_dir = root.path().join("clusters");
        fs::create_dir(&clusters_dir).unwrap();
        for &(file_name, sql) in files {
            fs::write(clusters_dir.join(file_name), sql).unwrap();
        }
        root
    }

    /// Loads the clusters under `root` for `profile`, with no suffix and no variables.
    fn load(root: &TempDir, profile: &str) -> Result<Vec<ClusterDefinition>, ProjectError> {
        load_clusters(root.path(), profile, None, &BTreeMap::new())
    }

    #[mz_ore::test]
    fn test_load_clusters_no_directory() {
        let root = TempDir::new().unwrap();
        let clusters = load(&root, "default").unwrap();
        assert!(
            clusters.is_empty(),
            "should return empty vec when clusters/ doesn't exist"
        );
    }

    #[mz_ore::test]
    fn test_load_clusters_basic() {
        let root = project_with_clusters(&[(
            "analytics.sql",
            concat!(
                "CREATE CLUSTER analytics (SIZE = '100cc', REPLICATION FACTOR = 1);\n",
                "GRANT USAGE ON CLUSTER analytics TO analyst_role;\n",
                "COMMENT ON CLUSTER analytics IS 'Analytics workloads';",
            ),
        )]);

        let clusters = load(&root, "default").unwrap();
        assert_eq!(clusters.len(), 1);

        let cluster = &clusters[0];
        assert_eq!(cluster.name, "analytics");
        assert_eq!(cluster.grants.len(), 1);
        assert_eq!(cluster.comments.len(), 1);
        assert_eq!(extract_size(&cluster.create_stmt), Some("100cc".to_string()));
        assert_eq!(extract_replication_factor(&cluster.create_stmt), Some(1));
    }

    #[mz_ore::test]
    fn test_load_clusters_create_only() {
        let root = project_with_clusters(&[(
            "quickstart.sql",
            "CREATE CLUSTER quickstart (SIZE = '25cc');",
        )]);

        let clusters = load(&root, "default").unwrap();
        assert_eq!(clusters.len(), 1);

        let cluster = &clusters[0];
        assert_eq!(cluster.name, "quickstart");
        assert!(cluster.grants.is_empty());
        assert!(cluster.comments.is_empty());
    }

    #[mz_ore::test]
    fn test_load_clusters_name_mismatch() {
        let root = project_with_clusters(&[(
            "analytics.sql",
            "CREATE CLUSTER wrong_name (SIZE = '100cc');",
        )]);
        assert!(
            load(&root, "default").is_err(),
            "should error when cluster name doesn't match filename"
        );
    }

    #[mz_ore::test]
    fn test_load_clusters_missing_create() {
        let root = project_with_clusters(&[(
            "analytics.sql",
            "GRANT USAGE ON CLUSTER analytics TO analyst_role;",
        )]);
        assert!(
            load(&root, "default").is_err(),
            "should error when no CREATE CLUSTER statement"
        );
    }

    #[mz_ore::test]
    fn test_load_clusters_unsupported_statement() {
        let root = project_with_clusters(&[(
            "analytics.sql",
            concat!(
                "CREATE CLUSTER analytics (SIZE = '100cc');\n",
                "CREATE TABLE foo (id INT);",
            ),
        )]);
        assert!(
            load(&root, "default").is_err(),
            "should error on unsupported statement type"
        );
    }

    #[mz_ore::test]
    fn test_load_clusters_grant_target_mismatch() {
        let root = project_with_clusters(&[(
            "analytics.sql",
            concat!(
                "CREATE CLUSTER analytics (SIZE = '100cc');\n",
                "GRANT USAGE ON CLUSTER other_cluster TO analyst_role;",
            ),
        )]);
        assert!(
            load(&root, "default").is_err(),
            "should error when grant targets wrong cluster"
        );
    }

    #[mz_ore::test]
    fn test_load_clusters_comment_target_mismatch() {
        let root = project_with_clusters(&[(
            "analytics.sql",
            concat!(
                "CREATE CLUSTER analytics (SIZE = '100cc');\n",
                "COMMENT ON CLUSTER other_cluster IS 'wrong target';",
            ),
        )]);
        assert!(
            load(&root, "default").is_err(),
            "should error when comment targets wrong cluster"
        );
    }

    #[mz_ore::test]
    fn test_load_clusters_multiple_files() {
        let root = project_with_clusters(&[
            ("analytics.sql", "CREATE CLUSTER analytics (SIZE = '100cc');"),
            (
                "quickstart.sql",
                "CREATE CLUSTER quickstart (SIZE = '25cc', REPLICATION FACTOR = 2);",
            ),
        ]);

        let clusters = load(&root, "default").unwrap();
        let names: Vec<&str> = clusters.iter().map(|c| c.name.as_str()).collect();
        assert_eq!(names, ["analytics", "quickstart"]);
    }

    #[mz_ore::test]
    fn test_load_clusters_multi_variant_valid() {
        let root = project_with_clusters(&[
            ("analytics.sql", "CREATE CLUSTER analytics (SIZE = '100cc');"),
            ("analytics#staging.sql", "CREATE CLUSTER analytics (SIZE = '25cc');"),
        ]);

        // The staging override is selected for the staging profile.
        let clusters = load(&root, "staging").unwrap();
        assert_eq!(clusters.len(), 1);
        assert_eq!(clusters[0].name, "analytics");
        assert_eq!(
            extract_size(&clusters[0].create_stmt),
            Some("25cc".to_string())
        );
    }

    #[mz_ore::test]
    fn test_load_clusters_multi_variant_fallback_default() {
        let root = project_with_clusters(&[
            ("analytics.sql", "CREATE CLUSTER analytics (SIZE = '100cc');"),
            ("analytics#staging.sql", "CREATE CLUSTER analytics (SIZE = '25cc');"),
        ]);

        // No override exists for `prod`, so the default file is used.
        let clusters = load(&root, "prod").unwrap();
        assert_eq!(clusters.len(), 1);
        assert_eq!(clusters[0].name, "analytics");
        assert_eq!(
            extract_size(&clusters[0].create_stmt),
            Some("100cc".to_string())
        );
    }

    #[mz_ore::test]
    fn test_load_clusters_invalid_variant_errors() {
        // The staging override is broken even though `default` is the active profile.
        let root = project_with_clusters(&[
            ("analytics.sql", "CREATE CLUSTER analytics (SIZE = '100cc');"),
            ("analytics#staging.sql", "CREATE CLUSTER wrong_name (SIZE = '25cc');"),
        ]);
        assert!(
            load(&root, "default").is_err(),
            "invalid variant should error even when not active profile"
        );
    }
}