1use super::ObjectRef;
16use crate::cli::CliError;
17use crate::cli::executor::{self, DeploymentExecutor};
18use crate::cli::{git, progress};
19use crate::client::DeploymentMode;
20use crate::client::{
21 Client, ClusterConfig, ClusterOptions, DeploymentKind, PendingStatement, ReplacementMvRecord,
22};
23use crate::config::Settings;
24use crate::log;
25use crate::project::SchemaQualifier;
26use crate::project::analysis::changeset::ChangeSet;
27use crate::project::analysis::deployment_snapshot::{self, DeploymentSnapshot};
28use crate::project::analysis::deps::extract_external_indexes;
29use crate::project::ast::Statement;
30use crate::project::ir::compiled::{DatabaseObject, FullyQualifiedName};
31use crate::project::ir::graph::Project;
32use crate::project::ir::object_id::ObjectId;
33use crate::project::resolve::normalize::{self, NormalizingVisitor};
34use crate::verbose;
35use mz_ore::option::OptionExt;
36use std::collections::BTreeSet;
37use std::fmt;
38use std::path::Path;
39use std::time::Instant;
40
41struct StageAnalysis<'a> {
46 objects: Vec<ObjectRef<'a>>,
47 sinks: Vec<ObjectRef<'a>>,
48 replacement_mvs: Vec<ObjectRef<'a>>,
49 schema_set: BTreeSet<SchemaQualifier>,
50 cluster_set: BTreeSet<String>,
51}
52
53struct PartitionedObjects<'a> {
57 objects: Vec<ObjectRef<'a>>,
58 sinks: Vec<ObjectRef<'a>>,
59 replacement_mvs: Vec<ObjectRef<'a>>,
60 table_count: usize,
61}
62
63#[derive(serde::Serialize)]
66struct StageResult {
67 deploy_id: String,
68 objects_deployed: usize,
69 #[serde(skip)]
70 duration: std::time::Duration,
71}
72
73impl fmt::Display for StageResult {
74 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
75 write!(
76 f,
77 " \u{2713} Successfully deployed {} objects to '{}' staging environment ({:.1}s)",
78 self.objects_deployed,
79 self.deploy_id,
80 self.duration.as_secs_f64()
81 )
82 }
83}
84
85#[derive(serde::Serialize)]
86struct StagePlan {
87 deploy_id: String,
88 schemas: Vec<StagePlanSchema>,
89 clusters: Vec<StagePlanCluster>,
90 objects: Vec<StagePlanObject>,
91 sinks: Vec<StagePlanObject>,
92 replacement_mvs: Vec<StagePlanObject>,
93}
94
95#[derive(serde::Serialize)]
96struct StagePlanSchema {
97 database: String,
98 schema: String,
99 staging_schema: String,
100}
101
102#[derive(serde::Serialize)]
103struct StagePlanCluster {
104 production_cluster: String,
105 staging_cluster: String,
106}
107
108#[derive(serde::Serialize)]
109struct StagePlanObject {
110 database: String,
111 schema: String,
112 object: String,
113}
114
115impl fmt::Display for StagePlan {
116 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
117 writeln!(f, "Stage plan for '{}':", self.deploy_id)?;
118
119 if !self.schemas.is_empty() {
120 writeln!(f, "\nSchemas ({}):", self.schemas.len())?;
121 for s in &self.schemas {
122 writeln!(
123 f,
124 " {}.{} \u{2192} {}",
125 s.database, s.schema, s.staging_schema
126 )?;
127 }
128 }
129
130 if !self.clusters.is_empty() {
131 writeln!(f, "\nClusters ({}):", self.clusters.len())?;
132 for c in &self.clusters {
133 writeln!(
134 f,
135 " {} \u{2192} {}",
136 c.production_cluster, c.staging_cluster
137 )?;
138 }
139 }
140
141 if !self.objects.is_empty() {
142 writeln!(f, "\nObjects ({}):", self.objects.len())?;
143 for o in &self.objects {
144 writeln!(f, " {}.{}.{}", o.database, o.schema, o.object)?;
145 }
146 }
147
148 if !self.sinks.is_empty() {
149 writeln!(f, "\nSinks ({}):", self.sinks.len())?;
150 for s in &self.sinks {
151 writeln!(f, " {}.{}.{}", s.database, s.schema, s.object)?;
152 }
153 }
154
155 if !self.replacement_mvs.is_empty() {
156 writeln!(f, "\nReplacement MVs ({}):", self.replacement_mvs.len())?;
157 for m in &self.replacement_mvs {
158 writeln!(f, " {}.{}.{}", m.database, m.schema, m.object)?;
159 }
160 }
161
162 Ok(())
163 }
164}
165
166pub async fn run(
187 settings: &Settings,
188 stage_name: Option<&str>,
189 allow_dirty: bool,
190 no_rollback: bool,
191 dry_run: bool,
192) -> Result<(), CliError> {
193 let profile = settings.connection();
194 let directory = &settings.directory;
195 let start_time = Instant::now();
196
197 if !allow_dirty && git::is_dirty(directory) {
198 return Err(CliError::GitDirty);
199 }
200
201 let stage_name = stage_name
202 .owned()
203 .or_else(|| git::get_git_commit(directory).map(|sha| sha.chars().take(7).collect()))
204 .unwrap_or_else(executor::generate_random_env_name);
205
206 let planned_project = super::compile::run(settings, true).await?;
207 let staging_suffix = format!("_{}", stage_name);
208
209 let client = Client::connect_with_profile(profile.clone())
210 .await
211 .map_err(CliError::Connection)?;
212
213 crate::cli::commands::setup::verify(&client, settings.emulator()).await?;
214 let role =
215 crate::cli::commands::setup::validate_connection(&client, settings.emulator()).await?;
216 crate::cli::commands::setup::require_deployer(role)?;
217
218 let Some(analysis) = analyze_project_changes(&client, &planned_project, &stage_name).await?
219 else {
220 return Ok(());
221 };
222
223 validate_project_for_stage(
224 &client,
225 &planned_project,
226 directory,
227 &analysis.schema_set,
228 &analysis.cluster_set,
229 )
230 .await?;
231
232 if !dry_run {
233 record_stage_metadata(
234 &client,
235 directory,
236 &stage_name,
237 &staging_suffix,
238 &analysis.objects,
239 &analysis.sinks,
240 &analysis.replacement_mvs,
241 &planned_project.replacement_schemas,
242 )
243 .await?;
244 }
245
246 if dry_run {
247 let plan = StagePlan {
248 deploy_id: stage_name.to_string(),
249 schemas: analysis
250 .schema_set
251 .iter()
252 .map(|sq| StagePlanSchema {
253 database: sq.database.clone(),
254 schema: sq.schema.clone(),
255 staging_schema: format!("{}{}", sq.schema, staging_suffix),
256 })
257 .collect(),
258 clusters: analysis
259 .cluster_set
260 .iter()
261 .map(|c| StagePlanCluster {
262 production_cluster: c.clone(),
263 staging_cluster: format!("{}{}", c, staging_suffix),
264 })
265 .collect(),
266 objects: analysis
267 .objects
268 .iter()
269 .map(|(id, _)| StagePlanObject {
270 database: id.expect_database().to_string(),
271 schema: id.schema().to_string(),
272 object: id.object().to_string(),
273 })
274 .collect(),
275 sinks: analysis
276 .sinks
277 .iter()
278 .map(|(id, _)| StagePlanObject {
279 database: id.expect_database().to_string(),
280 schema: id.schema().to_string(),
281 object: id.object().to_string(),
282 })
283 .collect(),
284 replacement_mvs: analysis
285 .replacement_mvs
286 .iter()
287 .map(|(id, _)| StagePlanObject {
288 database: id.expect_database().to_string(),
289 schema: id.schema().to_string(),
290 object: id.object().to_string(),
291 })
292 .collect(),
293 };
294 log::output(&plan);
295 return Ok(());
296 }
297
298 let success_count = create_resources_with_rollback(
299 &client,
300 &stage_name,
301 &staging_suffix,
302 &analysis.schema_set,
303 &analysis.cluster_set,
304 &planned_project,
305 &analysis.objects,
306 &analysis.replacement_mvs,
307 no_rollback,
308 dry_run,
309 )
310 .await?;
311
312 let result = StageResult {
313 deploy_id: stage_name.to_string(),
314 objects_deployed: success_count,
315 duration: start_time.elapsed(),
316 };
317 log::output(&result);
318 log::print_deploy_id(&stage_name);
319 Ok(())
320}
321
322async fn analyze_project_changes<'a>(
327 client: &Client,
328 planned_project: &'a Project,
329 stage_name: &str,
330) -> Result<Option<StageAnalysis<'a>>, CliError> {
331 progress::stage_start("Analyzing project changes");
332 let analyze_start = Instant::now();
333
334 if client
335 .deployments()
336 .get_deployment_metadata(stage_name)
337 .await?
338 .is_some()
339 {
340 return Err(CliError::InvalidEnvironmentName {
341 name: format!("deployment '{}' already exists", stage_name),
342 });
343 }
344
345 let new_snapshot = deployment_snapshot::build_snapshot_from_planned(planned_project)?;
346 let production_snapshot = deployment_snapshot::load_from_database(client, None).await?;
347
348 let change_set = if production_snapshot.objects.is_empty() {
349 None
350 } else {
351 Some(ChangeSet::from_deployment_snapshot_comparison(
352 &production_snapshot,
353 &new_snapshot,
354 planned_project,
355 ))
356 };
357
358 if let Some(ref cs) = change_set {
381 validate_no_new_objects_in_existing_stable_schemas(cs, &production_snapshot)?;
382 }
383
384 let objects = select_stage_objects(planned_project, change_set.as_ref())?;
385 if objects.is_empty() && change_set.as_ref().is_some_and(ChangeSet::is_empty) {
386 progress::success("No changes detected compared to production, skipping deployment");
387 return Ok(None);
388 }
389
390 let replacement_object_ids = change_set
391 .as_ref()
392 .map(|cs| cs.changed_replacement_objects.clone())
393 .unwrap_or_default();
394 let partitioned = partition_objects(objects, &replacement_object_ids);
395 log_partition_summary(&partitioned);
396
397 let object_ids: BTreeSet<_> = partitioned
398 .objects
399 .iter()
400 .map(|(id, _)| id.clone())
401 .collect();
402 client
403 .validation()
404 .validate_table_dependencies(planned_project, &object_ids)
405 .await?;
406
407 let (schema_set, cluster_set) =
408 collect_stage_resources(&partitioned.objects, &partitioned.replacement_mvs);
409
410 let analyze_duration = analyze_start.elapsed();
411 progress::stage_success(
412 &format!(
413 "Ready to deploy {} view(s)/materialized view(s)",
414 partitioned.objects.len()
415 ),
416 analyze_duration,
417 );
418
419 Ok(Some(StageAnalysis {
420 objects: partitioned.objects,
421 sinks: partitioned.sinks,
422 replacement_mvs: partitioned.replacement_mvs,
423 schema_set,
424 cluster_set,
425 }))
426}
427
428fn select_stage_objects<'a>(
432 planned_project: &'a Project,
433 change_set: Option<&ChangeSet>,
434) -> Result<Vec<ObjectRef<'a>>, CliError> {
435 if let Some(cs) = change_set {
436 if cs.is_empty() {
437 return Ok(Vec::new());
438 }
439 verbose!("{}", cs);
440 Ok(planned_project.get_sorted_objects_filtered(&cs.objects_to_deploy)?)
441 } else {
442 verbose!("Full deployment: no production deployment found");
443 Ok(planned_project.get_sorted_objects()?)
444 }
445}
446
447fn partition_objects<'a>(
452 objects: Vec<ObjectRef<'a>>,
453 replacement_object_ids: &BTreeSet<ObjectId>,
454) -> PartitionedObjects<'a> {
455 let mut kept = Vec::new();
456 let mut sinks = Vec::new();
457 let mut replacement_mvs = Vec::new();
458 let mut table_count = 0;
459
460 for (object_id, typed_obj) in objects {
461 match &typed_obj.stmt {
462 Statement::CreateTable(_)
463 | Statement::CreateTableFromSource(_)
464 | Statement::CreateSource(_)
465 | Statement::CreateSecret(_)
466 | Statement::CreateConnection(_) => {
467 table_count += 1;
468 }
469 Statement::CreateSink(_) => sinks.push((object_id, typed_obj)),
470 Statement::CreateMaterializedView(_) if replacement_object_ids.contains(&object_id) => {
471 replacement_mvs.push((object_id, typed_obj));
472 }
473 _ => kept.push((object_id, typed_obj)),
474 }
475 }
476
477 PartitionedObjects {
478 objects: kept,
479 sinks,
480 replacement_mvs,
481 table_count,
482 }
483}
484
485fn log_partition_summary(partitioned: &PartitionedObjects<'_>) {
487 if partitioned.table_count > 0 {
488 verbose!(
489 "Skipped {} table(s)/source(s) - use 'mz-deploy apply' for those",
490 partitioned.table_count
491 );
492 }
493 if !partitioned.sinks.is_empty() {
494 verbose!(
495 "Found {} sink(s) - will be created during apply after swap",
496 partitioned.sinks.len()
497 );
498 }
499 if !partitioned.replacement_mvs.is_empty() {
500 verbose!(
501 "Found {} replacement MV(s) - will use CREATE REPLACEMENT protocol",
502 partitioned.replacement_mvs.len()
503 );
504 }
505}
506
507fn collect_stage_resources(
513 objects: &[ObjectRef<'_>],
514 replacement_mvs: &[ObjectRef<'_>],
515) -> (BTreeSet<SchemaQualifier>, BTreeSet<String>) {
516 let mut schema_set = BTreeSet::new();
517 let mut cluster_set = BTreeSet::new();
518
519 for (object_id, typed_obj) in objects.iter().chain(replacement_mvs.iter()) {
520 schema_set.insert(SchemaQualifier::new(
521 object_id.expect_database().to_string(),
522 object_id.schema().to_string(),
523 ));
524 cluster_set.extend(typed_obj.clusters());
525 }
526
527 (schema_set, cluster_set)
528}
529
530async fn validate_project_for_stage(
534 client: &Client,
535 planned_project: &Project,
536 directory: &Path,
537 schema_set: &BTreeSet<SchemaQualifier>,
538 cluster_set: &BTreeSet<String>,
539) -> Result<(), CliError> {
540 progress::stage_start("Validating project");
541 let validate_start = Instant::now();
542 client
543 .validation()
544 .validate_project(planned_project, directory)
545 .await?;
546 client
547 .validation()
548 .validate_cluster_isolation(planned_project)
549 .await?;
550 client
551 .validation()
552 .validate_privileges(planned_project)
553 .await?;
554 client
555 .validation()
556 .validate_schema_ownership(schema_set)
557 .await?;
558 client
559 .validation()
560 .validate_cluster_ownership(cluster_set)
561 .await?;
562 client
563 .validation()
564 .validate_sink_connections_exist(planned_project)
565 .await?;
566 let validate_duration = validate_start.elapsed();
567 progress::stage_success("All validations passed", validate_duration);
568 Ok(())
569}
570
571async fn record_stage_metadata(
576 client: &Client,
577 directory: &Path,
578 stage_name: &str,
579 staging_suffix: &str,
580 objects: &[ObjectRef<'_>],
581 sinks: &[ObjectRef<'_>],
582 replacement_mvs: &[ObjectRef<'_>],
583 replacement_schemas: &BTreeSet<SchemaQualifier>,
584) -> Result<(), CliError> {
585 progress::stage_start("Recording deployment metadata");
586 let metadata_start = Instant::now();
587 let metadata = executor::collect_deployment_metadata(client, directory).await;
588
589 let mut staging_snapshot = DeploymentSnapshot::default();
590
591 for (object_id, typed_obj) in objects {
592 let hash = deployment_snapshot::compute_typed_hash(typed_obj);
593 staging_snapshot.objects.insert(object_id.clone(), hash);
594 staging_snapshot.schemas.insert(
595 SchemaQualifier::new(
596 object_id.expect_database().to_string(),
597 object_id.schema().to_string(),
598 ),
599 DeploymentKind::Objects,
600 );
601 }
602
603 for (object_id, typed_obj) in sinks {
604 let hash = deployment_snapshot::compute_typed_hash(typed_obj);
605 staging_snapshot.objects.insert(object_id.clone(), hash);
606 staging_snapshot
607 .schemas
608 .entry(SchemaQualifier::new(
609 object_id.expect_database().to_string(),
610 object_id.schema().to_string(),
611 ))
612 .or_insert(DeploymentKind::Sinks);
613 }
614
615 for (object_id, typed_obj) in replacement_mvs {
616 let hash = deployment_snapshot::compute_typed_hash(typed_obj);
617 staging_snapshot.objects.insert(object_id.clone(), hash);
618 staging_snapshot.schemas.insert(
619 SchemaQualifier::new(
620 object_id.expect_database().to_string(),
621 object_id.schema().to_string(),
622 ),
623 DeploymentKind::Replacement,
624 );
625 }
626
627 for sq in replacement_schemas {
632 if staging_snapshot.schemas.contains_key(sq) {
633 staging_snapshot
634 .schemas
635 .insert(sq.clone(), DeploymentKind::Replacement);
636 }
637 }
638
639 deployment_snapshot::write_to_database(
640 client,
641 &staging_snapshot,
642 stage_name,
643 &metadata,
644 None,
645 DeploymentMode::Stage,
646 )
647 .await?;
648
649 if !sinks.is_empty() {
650 let pending_statements: Vec<PendingStatement> = sinks
651 .iter()
652 .enumerate()
653 .map(|(idx, (object_id, typed_obj))| {
654 let original_fqn: FullyQualifiedName = object_id.clone().into();
655 let mut visitor = NormalizingVisitor::fully_qualifying(&original_fqn);
656 let stmt = typed_obj
657 .stmt
658 .clone()
659 .normalize_name_with(&visitor, &original_fqn.to_item_name())
660 .normalize_dependencies_with(&mut visitor);
661 let hash = deployment_snapshot::compute_typed_hash(typed_obj);
662 #[allow(clippy::as_conversions)]
663 PendingStatement {
664 deploy_id: stage_name.to_string(),
665 sequence_num: idx as i32,
666 database: object_id.expect_database().to_string(),
667 schema: object_id.schema().to_string(),
668 object: object_id.object().to_string(),
669 object_hash: hash,
670 statement_sql: stmt.to_string(),
671 statement_kind: "sink".to_string(),
672 executed_at: None,
673 }
674 })
675 .collect();
676
677 client
678 .deployments()
679 .insert_pending_statements(&pending_statements)
680 .await?;
681 verbose!(
682 "Stored {} pending sink statement(s)",
683 pending_statements.len()
684 );
685 }
686
687 if !replacement_mvs.is_empty() {
688 let records: Vec<ReplacementMvRecord> = replacement_mvs
689 .iter()
690 .map(|(object_id, _)| ReplacementMvRecord {
691 deploy_id: stage_name.to_string(),
692 target_database: object_id.expect_database().to_string(),
693 target_schema: object_id.schema().to_string(),
694 target_name: object_id.object().to_string(),
695 replacement_schema: format!("{}{}", object_id.schema(), staging_suffix),
696 })
697 .collect();
698 client
699 .deployments()
700 .insert_replacement_mvs(&records)
701 .await?;
702 verbose!("Stored {} replacement MV record(s)", records.len());
703 }
704
705 let metadata_duration = metadata_start.elapsed();
706 progress::stage_success("Deployment metadata recorded", metadata_duration);
707 Ok(())
708}
709
710#[allow(clippy::too_many_arguments)]
716async fn create_resources_with_rollback<'a>(
717 client: &Client,
718 stage_name: &str,
719 staging_suffix: &str,
720 schema_set: &BTreeSet<SchemaQualifier>,
721 cluster_set: &BTreeSet<String>,
722 planned_project: &'a Project,
723 objects: &'a [(ObjectId, &'a DatabaseObject)],
724 replacement_mvs: &'a [(ObjectId, &'a DatabaseObject)],
725 no_rollback: bool,
726 dry_run: bool,
727) -> Result<usize, CliError> {
728 let executor = DeploymentExecutor::with_dry_run(client, dry_run);
729
730 let result = async {
731 create_databases_and_schemas(&executor, planned_project, schema_set, staging_suffix)
732 .await?;
733 create_staging_clusters(&executor, client, stage_name, cluster_set, staging_suffix).await?;
734 deploy_objects_to_staging(
735 &executor,
736 objects,
737 replacement_mvs,
738 planned_project,
739 cluster_set,
740 staging_suffix,
741 )
742 .await
743 }
744 .await;
745
746 match result {
747 Ok(count) => Ok(count),
748 Err(e) if dry_run || no_rollback => {
749 if !dry_run {
750 progress::error("Deployment failed (skipping rollback due to --no-rollback flag)");
751 }
752 Err(e)
753 }
754 Err(e) => {
755 progress::error("Deployment failed, rolling back...");
756 let (schemas, clusters) = rollback_staging_resources(client, stage_name).await;
757
758 if schemas > 0 || clusters > 0 {
759 progress::success(&format!(
760 "Rolled back: {} schema(s), {} cluster(s)",
761 schemas, clusters
762 ));
763 }
764
765 Err(e)
766 }
767 }
768}
769
770async fn create_databases_and_schemas(
775 executor: &DeploymentExecutor<'_>,
776 planned_project: &Project,
777 schema_set: &BTreeSet<SchemaQualifier>,
778 staging_suffix: &str,
779) -> Result<(), CliError> {
780 let schema_set_dbs: BTreeSet<&str> = schema_set.iter().map(|sq| sq.database.as_str()).collect();
783 for db in &planned_project.databases {
784 if !schema_set_dbs.contains(db.name.as_str()) {
785 executor.ensure_database(&db.name).await?;
786 verbose!(" Ensured database {} exists", db.name);
787 }
788 }
789
790 progress::stage_start("Creating staging schemas and applying setup statements");
792 let schema_start = Instant::now();
793 executor
794 .prepare_databases_and_schemas(planned_project, schema_set, Some(staging_suffix))
795 .await?;
796 let schema_duration = schema_start.elapsed();
797 progress::stage_success(
798 &format!(
799 "Created {} staging schema(s) with setup statements",
800 schema_set.len()
801 ),
802 schema_duration,
803 );
804
805 if !executor.is_dry_run() {
807 for sq in schema_set {
808 executor.ensure_schema(&sq.database, &sq.schema).await?;
809 verbose!(" Ensured schema {}.{} exists", sq.database, sq.schema);
810 }
811 }
812
813 Ok(())
814}
815
816async fn create_staging_clusters(
822 executor: &DeploymentExecutor<'_>,
823 client: &Client,
824 stage_name: &str,
825 cluster_set: &BTreeSet<String>,
826 staging_suffix: &str,
827) -> Result<(), CliError> {
828 let cluster_names: Vec<String> = cluster_set.iter().cloned().collect();
830 executor
831 .record_deployment_clusters(stage_name, &cluster_names)
832 .await?;
833
834 progress::stage_start("Creating staging clusters");
835 let cluster_start = Instant::now();
836 let mut created_clusters = 0;
837
838 let existing_staging_clusters = if !executor.is_dry_run() {
840 let staging_cluster_names: Vec<String> = cluster_set
841 .iter()
842 .map(|name| format!("{}{}", name, staging_suffix))
843 .collect();
844 client
845 .introspection()
846 .check_clusters_exist(&staging_cluster_names)
847 .await?
848 } else {
849 BTreeSet::new()
850 };
851
852 for prod_cluster in cluster_set {
853 let staging_cluster = format!("{}{}", prod_cluster, staging_suffix);
854
855 if executor.is_dry_run() {
856 let placeholder = ClusterConfig::Managed {
858 options: ClusterOptions {
859 size: String::new(),
860 replication_factor: 1,
861 },
862 grants: Vec::new(),
863 };
864 executor
865 .create_cluster(&staging_cluster, prod_cluster, &placeholder)
866 .await?;
867 created_clusters += 1;
868 continue;
869 }
870
871 if existing_staging_clusters.contains(&staging_cluster) {
873 verbose!(" Cluster '{}' already exists, skipping", staging_cluster);
874 continue;
875 }
876
877 let config = client
879 .introspection()
880 .get_cluster_config(prod_cluster)
881 .await?;
882
883 let config = match config {
884 Some(config) => config,
885 None => {
886 return Err(CliError::ClusterNotFound {
887 name: prod_cluster.clone(),
888 });
889 }
890 };
891
892 executor
893 .create_cluster(&staging_cluster, prod_cluster, &config)
894 .await?;
895 created_clusters += 1;
896
897 log_cluster_creation(&staging_cluster, prod_cluster, &config);
898 }
899
900 let cluster_duration = cluster_start.elapsed();
901 progress::stage_success(
902 &format!("Created {} cluster(s)", created_clusters),
903 cluster_duration,
904 );
905
906 Ok(())
907}
908
909fn log_cluster_creation(staging_cluster: &str, prod_cluster: &str, config: &ClusterConfig) {
911 match config {
912 ClusterConfig::Managed { options, grants } => {
913 verbose!(
914 " Created managed cluster '{}' (size: {}, replication_factor: {}, {} grant(s), cloned from '{}')",
915 staging_cluster,
916 options.size,
917 options.replication_factor,
918 grants.len(),
919 prod_cluster
920 );
921 }
922 ClusterConfig::Unmanaged { replicas, grants } => {
923 verbose!(
924 " Created unmanaged cluster '{}' with {} replica(s), {} grant(s) (cloned from '{}')",
925 staging_cluster,
926 replicas.len(),
927 grants.len(),
928 prod_cluster
929 );
930 for replica in replicas {
931 verbose!(
932 " - {} (size: {}{})",
933 replica.name,
934 replica.size,
935 replica
936 .availability_zone
937 .as_ref()
938 .map(|az| format!(", az: {}", az))
939 .unwrap_or_default()
940 );
941 }
942 }
943 }
944}
945
946async fn deploy_objects_to_staging<'a>(
953 executor: &DeploymentExecutor<'_>,
954 objects: &'a [(ObjectId, &'a DatabaseObject)],
955 replacement_mvs: &'a [(ObjectId, &'a DatabaseObject)],
956 planned_project: &'a Project,
957 cluster_set: &BTreeSet<String>,
958 staging_suffix: &str,
959) -> Result<usize, CliError> {
960 progress::stage_start("Deploying objects to staging");
961 let deploy_start = Instant::now();
962
963 let objects_to_deploy_set: BTreeSet<_> = objects
966 .iter()
967 .chain(replacement_mvs.iter())
968 .map(|(oid, _)| oid.clone())
969 .collect();
970
971 let mut external_indexes: Vec<_> = planned_project
973 .iter_objects()
974 .filter(|object| !objects_to_deploy_set.contains(&object.id))
975 .flat_map(extract_external_indexes)
976 .filter_map(|(cluster, index)| cluster_set.contains(&cluster.name).then_some(index))
977 .collect();
978
979 normalize::transform_cluster_names_for_staging(&mut external_indexes, staging_suffix);
981 for index in external_indexes {
982 verbose!("Creating external index {}", index);
983 executor.execute_sql(&index).await?;
984 }
985
986 let replacement_object_ids: BTreeSet<ObjectId> =
989 replacement_mvs.iter().map(|(oid, _)| oid.clone()).collect();
990
991 let mut success_count = 0;
992
993 for (idx, (object_id, typed_obj)) in objects.iter().enumerate() {
995 verbose!(
996 "Applying {}/{}: {}{} (to schema {}{})",
997 idx + 1,
998 objects.len(),
999 object_id.object(),
1000 staging_suffix,
1001 object_id.schema(),
1002 staging_suffix
1003 );
1004
1005 deploy_single_object(
1006 executor,
1007 object_id,
1008 typed_obj,
1009 staging_suffix,
1010 planned_project,
1011 &objects_to_deploy_set,
1012 &replacement_object_ids,
1013 |stmt| stmt,
1014 )
1015 .await?;
1016 success_count += 1;
1017 }
1018
1019 for (idx, (object_id, typed_obj)) in replacement_mvs.iter().enumerate() {
1021 verbose!(
1022 "Applying replacement MV {}/{}: {} FOR {}",
1023 idx + 1,
1024 replacement_mvs.len(),
1025 object_id.object(),
1026 object_id
1027 );
1028
1029 let production_target = object_id.to_unresolved_item_name();
1030 deploy_single_object(
1031 executor,
1032 object_id,
1033 typed_obj,
1034 staging_suffix,
1035 planned_project,
1036 &objects_to_deploy_set,
1037 &replacement_object_ids,
1038 |stmt| match stmt {
1039 Statement::CreateMaterializedView(mut mv) => {
1040 mv.replacement_for =
1041 Some(mz_sql_parser::ast::RawItemName::Name(production_target));
1042 Statement::CreateMaterializedView(mv)
1043 }
1044 other => other,
1045 },
1046 )
1047 .await?;
1048 success_count += 1;
1049 }
1050
1051 let deploy_duration = deploy_start.elapsed();
1052 progress::stage_success(
1053 &format!("Deployed {} view(s)/materialized view(s)", success_count),
1054 deploy_duration,
1055 );
1056
1057 Ok(success_count)
1058}
1059
1060async fn rollback_staging_resources(client: &Client, environment: &str) -> (usize, usize) {
1073 let staging_schemas = best_effort_fetch(
1074 client
1075 .introspection()
1076 .get_staging_schemas(environment)
1077 .await,
1078 "query staging schemas",
1079 );
1080 let staging_clusters = best_effort_fetch(
1081 client
1082 .introspection()
1083 .get_staging_clusters(environment)
1084 .await,
1085 "query staging clusters",
1086 );
1087
1088 let schema_count = staging_schemas.len();
1089 let cluster_count = staging_clusters.len();
1090
1091 if !staging_schemas.is_empty() {
1092 verbose!("Dropping staging schemas...");
1093 if let Err(e) = client
1094 .introspection()
1095 .drop_staging_schemas(&staging_schemas)
1096 .await
1097 {
1098 verbose!("Warning: Failed to drop some schemas: {}", e);
1099 } else {
1100 for sq in &staging_schemas {
1101 verbose!(" Dropped {}.{}", sq.database, sq.schema);
1102 }
1103 }
1104 }
1105
1106 if !staging_clusters.is_empty() {
1107 verbose!("Dropping staging clusters...");
1108 if let Err(e) = client
1109 .introspection()
1110 .drop_staging_clusters(&staging_clusters)
1111 .await
1112 {
1113 verbose!("Warning: Failed to drop some clusters: {}", e);
1114 } else {
1115 for cluster in &staging_clusters {
1116 verbose!(" Dropped {}", cluster);
1117 }
1118 }
1119 }
1120
1121 verbose!("Deleting deployment records...");
1122 best_effort_delete(
1123 client
1124 .deployments()
1125 .delete_deployment_clusters(environment)
1126 .await,
1127 "delete cluster records",
1128 );
1129 best_effort_delete(
1130 client
1131 .deployments()
1132 .delete_pending_statements(environment)
1133 .await,
1134 "delete pending statements",
1135 );
1136 best_effort_delete(
1137 client
1138 .deployments()
1139 .delete_replacement_mvs(environment)
1140 .await,
1141 "delete replacement MV records",
1142 );
1143 best_effort_delete(
1144 client.deployments().delete_deployment(environment).await,
1145 "delete deployment records",
1146 );
1147
1148 (schema_count, cluster_count)
1149}
1150
1151fn best_effort_fetch<T, E: fmt::Display>(result: Result<Vec<T>, E>, action: &str) -> Vec<T> {
1156 match result {
1157 Ok(values) => values,
1158 Err(e) => {
1159 verbose!("Warning: Failed to {}: {}", action, e);
1160 vec![]
1161 }
1162 }
1163}
1164
1165fn best_effort_delete<E: fmt::Display>(result: Result<(), E>, action: &str) {
1167 if let Err(e) = result {
1168 verbose!("Warning: Failed to {}: {}", action, e);
1169 }
1170}
1171
1172async fn deploy_single_object(
1183 executor: &DeploymentExecutor<'_>,
1184 object_id: &ObjectId,
1185 typed_obj: &DatabaseObject,
1186 staging_suffix: &str,
1187 planned_project: &Project,
1188 objects_to_deploy_set: &BTreeSet<ObjectId>,
1189 replacement_objects: &BTreeSet<ObjectId>,
1190 transform: impl FnOnce(Statement) -> Statement,
1191) -> Result<(), CliError> {
1192 let original_fqn: FullyQualifiedName = object_id.clone().into();
1193
1194 let mut visitor = NormalizingVisitor::staging(
1195 &original_fqn,
1196 staging_suffix.to_string(),
1197 &planned_project.external_dependencies,
1198 Some(objects_to_deploy_set),
1199 replacement_objects,
1200 );
1201
1202 let stmt = typed_obj
1203 .stmt
1204 .clone()
1205 .normalize_name_with(&visitor, &original_fqn.to_item_name())
1206 .normalize_dependencies_with(&mut visitor)
1207 .normalize_cluster_with(&visitor);
1208
1209 let stmt = transform(stmt);
1210 executor.execute_sql(&stmt).await?;
1211
1212 let mut indexes = typed_obj.indexes.clone();
1214 let mut grants = typed_obj.grants.clone();
1215 let mut comments = typed_obj.comments.clone();
1216
1217 visitor.normalize_index_references(&mut indexes);
1218 visitor.normalize_index_clusters(&mut indexes);
1219 visitor.normalize_grant_references(&mut grants);
1220 visitor.normalize_comment_references(&mut comments);
1221
1222 for index in &indexes {
1223 executor.execute_sql(index).await?;
1224 }
1225
1226 for grant in &grants {
1227 executor.execute_sql(grant).await?;
1228 }
1229
1230 for comment in &comments {
1231 executor.execute_sql(comment).await?;
1232 }
1233
1234 Ok(())
1235}
1236
1237fn validate_no_new_objects_in_existing_stable_schemas(
1240 change_set: &ChangeSet,
1241 production_snapshot: &DeploymentSnapshot,
1242) -> Result<(), CliError> {
1243 let blocked: Vec<_> = change_set
1244 .new_replacement_objects
1245 .iter()
1246 .filter(|obj| {
1247 !production_snapshot.objects.contains_key(obj)
1248 && production_snapshot
1249 .objects
1250 .keys()
1251 .any(|prod| prod.database() == obj.database() && prod.schema() == obj.schema())
1252 })
1253 .collect();
1254
1255 if blocked.is_empty() {
1256 return Ok(());
1257 }
1258
1259 let first = blocked[0];
1260 Err(CliError::NewObjectInExistingStableSchema {
1261 database: first.expect_database().to_string(),
1262 schema: first.schema().to_string(),
1263 objects: blocked.iter().map(|o| o.object().to_string()).collect(),
1264 })
1265}
1266
1267#[cfg(test)]
1268mod tests {
1269 use super::*;
1270 use crate::project::analysis::deployment_snapshot::build_snapshot_from_planned;
1271 use crate::project::ir::compiled;
1272 use crate::project::ir::object_id::ObjectId;
1273 use std::collections::{BTreeMap, BTreeSet};
1274
1275 fn make_typed_object(sqls: &[&str]) -> DatabaseObject {
1280 let mut stmt = None;
1281 let mut indexes = Vec::new();
1282
1283 for sql in sqls {
1284 let parsed = mz_sql_parser::parser::parse_statements(sql).unwrap();
1285 for p in parsed {
1286 match p.ast {
1287 mz_sql_parser::ast::Statement::CreateView(s) => {
1288 stmt = Some(Statement::CreateView(s));
1289 }
1290 mz_sql_parser::ast::Statement::CreateMaterializedView(s) => {
1291 stmt = Some(Statement::CreateMaterializedView(s));
1292 }
1293 mz_sql_parser::ast::Statement::CreateTable(s) => {
1294 stmt = Some(Statement::CreateTable(s));
1295 }
1296 mz_sql_parser::ast::Statement::CreateSource(s) => {
1297 stmt = Some(Statement::CreateSource(s));
1298 }
1299 mz_sql_parser::ast::Statement::CreateConnection(s) => {
1300 stmt = Some(Statement::CreateConnection(s));
1301 }
1302 mz_sql_parser::ast::Statement::CreateSecret(s) => {
1303 stmt = Some(Statement::CreateSecret(s));
1304 }
1305 mz_sql_parser::ast::Statement::CreateIndex(s) => {
1306 indexes.push(s);
1307 }
1308 other => panic!("Unexpected statement type: {:?}", other),
1309 }
1310 }
1311 }
1312
1313 DatabaseObject {
1314 path: std::path::PathBuf::from("test.sql"),
1315 stmt: stmt.expect("Expected at least one CREATE statement"),
1316 indexes,
1317 grants: vec![],
1318 comments: vec![],
1319 tests: vec![],
1320 }
1321 }
1322
1323 fn make_planned_project(objects: Vec<(&str, &str, &str, DatabaseObject)>) -> Project {
1325 let mut db_map: BTreeMap<String, BTreeMap<String, Vec<DatabaseObject>>> = BTreeMap::new();
1327
1328 for (database, schema, _name, typed_obj) in objects {
1329 db_map
1330 .entry(database.to_string())
1331 .or_default()
1332 .entry(schema.to_string())
1333 .or_default()
1334 .push(typed_obj);
1335 }
1336
1337 let databases: Vec<compiled::Database> = db_map
1338 .into_iter()
1339 .map(|(db_name, schemas)| compiled::Database {
1340 name: db_name,
1341 schemas: schemas
1342 .into_iter()
1343 .map(|(schema_name, objs)| compiled::Schema {
1344 name: schema_name,
1345 objects: objs,
1346 mod_statements: None,
1347 })
1348 .collect(),
1349 mod_statements: None,
1350 })
1351 .collect();
1352
1353 let typed_project = compiled::Project {
1354 databases,
1355 replacement_schemas: BTreeSet::new(),
1356 };
1357
1358 Project::from(typed_project)
1359 }
1360
1361 #[cfg_attr(miri, ignore)] #[mz_ore::test]
1363 fn test_full_deploy_view_not_indexed_mixed_types() {
1364 let view_obj = make_typed_object(&["CREATE VIEW my_view AS SELECT 1"]);
1365 let table_obj = make_typed_object(&["CREATE TABLE my_table (id INT)"]);
1366 let source_obj = make_typed_object(&[
1367 "CREATE SOURCE my_source IN CLUSTER source_cluster FROM LOAD GENERATOR COUNTER",
1368 ]);
1369 let conn_obj =
1370 make_typed_object(&["CREATE CONNECTION my_conn TO KAFKA (BROKER 'localhost:9092')"]);
1371 let secret_obj = make_typed_object(&["CREATE SECRET my_secret AS 'hunter2'"]);
1372
1373 let objects: Vec<ObjectRef> = vec![
1374 (
1375 ObjectId::new("db".into(), "public".into(), "my_view".into()),
1376 &view_obj,
1377 ),
1378 (
1379 ObjectId::new("db".into(), "public".into(), "my_table".into()),
1380 &table_obj,
1381 ),
1382 (
1383 ObjectId::new("db".into(), "public".into(), "my_source".into()),
1384 &source_obj,
1385 ),
1386 (
1387 ObjectId::new("db".into(), "public".into(), "my_conn".into()),
1388 &conn_obj,
1389 ),
1390 (
1391 ObjectId::new("db".into(), "public".into(), "my_secret".into()),
1392 &secret_obj,
1393 ),
1394 ];
1395
1396 let replacement_ids = BTreeSet::new();
1397 let partitioned = partition_objects(objects, &replacement_ids);
1398
1399 assert_eq!(
1401 partitioned.objects.len(),
1402 1,
1403 "Only the view should be staged"
1404 );
1405 assert_eq!(partitioned.objects[0].0.object(), "my_view");
1406
1407 assert_eq!(
1409 partitioned.table_count, 4,
1410 "Table, source, connection, and secret should all be skipped"
1411 );
1412
1413 assert!(partitioned.sinks.is_empty());
1415 assert!(partitioned.replacement_mvs.is_empty());
1416
1417 let (schema_set, cluster_set) =
1419 collect_stage_resources(&partitioned.objects, &partitioned.replacement_mvs);
1420
1421 assert_eq!(schema_set.len(), 1);
1423 assert!(schema_set.contains(&SchemaQualifier::new("db".into(), "public".into())));
1424
1425 assert!(
1427 cluster_set.is_empty(),
1428 "View without index should not require any clusters"
1429 );
1430 }
1431
1432 #[cfg_attr(miri, ignore)] #[mz_ore::test]
1434 fn test_full_deploy_view_indexed_different_cluster() {
1435 let view_obj = make_typed_object(&[
1436 "CREATE VIEW my_view AS SELECT 1",
1437 "CREATE INDEX my_idx IN CLUSTER index_cluster ON my_view (column1)",
1438 ]);
1439 let table_obj = make_typed_object(&["CREATE TABLE my_table (id INT)"]);
1440 let source_obj = make_typed_object(&[
1441 "CREATE SOURCE my_source IN CLUSTER source_cluster FROM LOAD GENERATOR COUNTER",
1442 ]);
1443 let conn_obj =
1444 make_typed_object(&["CREATE CONNECTION my_conn TO KAFKA (BROKER 'localhost:9092')"]);
1445 let secret_obj = make_typed_object(&["CREATE SECRET my_secret AS 'hunter2'"]);
1446
1447 let objects: Vec<ObjectRef> = vec![
1448 (
1449 ObjectId::new("db".into(), "public".into(), "my_view".into()),
1450 &view_obj,
1451 ),
1452 (
1453 ObjectId::new("db".into(), "public".into(), "my_table".into()),
1454 &table_obj,
1455 ),
1456 (
1457 ObjectId::new("db".into(), "public".into(), "my_source".into()),
1458 &source_obj,
1459 ),
1460 (
1461 ObjectId::new("db".into(), "public".into(), "my_conn".into()),
1462 &conn_obj,
1463 ),
1464 (
1465 ObjectId::new("db".into(), "public".into(), "my_secret".into()),
1466 &secret_obj,
1467 ),
1468 ];
1469
1470 let replacement_ids = BTreeSet::new();
1471 let partitioned = partition_objects(objects, &replacement_ids);
1472
1473 assert_eq!(partitioned.objects.len(), 1);
1475 assert_eq!(partitioned.objects[0].0.object(), "my_view");
1476 assert_eq!(partitioned.table_count, 4);
1477
1478 let (schema_set, cluster_set) =
1480 collect_stage_resources(&partitioned.objects, &partitioned.replacement_mvs);
1481
1482 assert_eq!(schema_set.len(), 1);
1484 assert!(schema_set.contains(&SchemaQualifier::new("db".into(), "public".into())));
1485
1486 assert_eq!(
1488 cluster_set.len(),
1489 1,
1490 "Should only have index_cluster, got: {:?}",
1491 cluster_set
1492 );
1493 assert!(
1494 cluster_set.contains("index_cluster"),
1495 "Should stage index_cluster from the view's index"
1496 );
1497 assert!(
1498 !cluster_set.contains("source_cluster"),
1499 "Should NOT stage source_cluster (source is not staged)"
1500 );
1501 }
1502
1503 #[cfg_attr(miri, ignore)] #[mz_ore::test]
1505 fn test_incremental_deploy_view_updated_not_indexed() {
1506 let view_obj = make_typed_object(&["CREATE VIEW my_view AS SELECT 1"]);
1508 let table_obj = make_typed_object(&["CREATE TABLE my_table (id INT)"]);
1509 let source_obj = make_typed_object(&[
1510 "CREATE SOURCE my_source IN CLUSTER source_cluster FROM LOAD GENERATOR COUNTER",
1511 ]);
1512 let conn_obj =
1513 make_typed_object(&["CREATE CONNECTION my_conn TO KAFKA (BROKER 'localhost:9092')"]);
1514 let secret_obj = make_typed_object(&["CREATE SECRET my_secret AS 'hunter2'"]);
1515
1516 let planned_project = make_planned_project(vec![
1517 ("db", "public", "my_view", view_obj),
1518 ("db", "storage", "my_table", table_obj),
1519 ("db", "storage", "my_source", source_obj),
1520 ("db", "storage", "my_conn", conn_obj),
1521 ("db", "storage", "my_secret", secret_obj),
1522 ]);
1523
1524 let new_snapshot = build_snapshot_from_planned(&planned_project).unwrap();
1526
1527 let mut old_snapshot = DeploymentSnapshot::default();
1529 for (object_id, hash) in &new_snapshot.objects {
1530 if object_id.object() == "my_view" {
1531 old_snapshot
1533 .objects
1534 .insert(object_id.clone(), "old_hash".to_string());
1535 } else {
1536 old_snapshot.objects.insert(object_id.clone(), hash.clone());
1537 }
1538 }
1539
1540 let change_set = ChangeSet::from_deployment_snapshot_comparison(
1542 &old_snapshot,
1543 &new_snapshot,
1544 &planned_project,
1545 );
1546
1547 assert!(
1549 change_set.objects_to_deploy.contains(&ObjectId::new(
1550 "db".into(),
1551 "public".into(),
1552 "my_view".into()
1553 )),
1554 "Changed view should be in objects_to_deploy"
1555 );
1556
1557 let objects = planned_project
1559 .get_sorted_objects_filtered(&change_set.objects_to_deploy)
1560 .unwrap();
1561
1562 let partitioned = partition_objects(objects, &change_set.changed_replacement_objects);
1563
1564 assert_eq!(
1566 partitioned.objects.len(),
1567 1,
1568 "Only the changed view should be staged, got: {:?}",
1569 partitioned
1570 .objects
1571 .iter()
1572 .map(|(id, _)| id.object())
1573 .collect::<Vec<_>>()
1574 );
1575 assert_eq!(partitioned.objects[0].0.object(), "my_view");
1576
1577 let (schema_set, cluster_set) =
1578 collect_stage_resources(&partitioned.objects, &partitioned.replacement_mvs);
1579
1580 assert_eq!(schema_set.len(), 1);
1581 assert!(schema_set.contains(&SchemaQualifier::new("db".into(), "public".into())));
1582 assert!(
1583 cluster_set.is_empty(),
1584 "View without index should not require any clusters"
1585 );
1586 }
1587
1588 #[cfg_attr(miri, ignore)] #[mz_ore::test]
1590 fn test_incremental_deploy_view_updated_indexed_different_cluster() {
1591 let view_obj = make_typed_object(&[
1593 "CREATE VIEW my_view AS SELECT 1",
1594 "CREATE INDEX my_idx IN CLUSTER index_cluster ON my_view (column1)",
1595 ]);
1596 let table_obj = make_typed_object(&["CREATE TABLE my_table (id INT)"]);
1597 let source_obj = make_typed_object(&[
1598 "CREATE SOURCE my_source IN CLUSTER source_cluster FROM LOAD GENERATOR COUNTER",
1599 ]);
1600 let conn_obj =
1601 make_typed_object(&["CREATE CONNECTION my_conn TO KAFKA (BROKER 'localhost:9092')"]);
1602 let secret_obj = make_typed_object(&["CREATE SECRET my_secret AS 'hunter2'"]);
1603
1604 let planned_project = make_planned_project(vec![
1605 ("db", "public", "my_view", view_obj),
1606 ("db", "storage", "my_table", table_obj),
1607 ("db", "storage", "my_source", source_obj),
1608 ("db", "storage", "my_conn", conn_obj),
1609 ("db", "storage", "my_secret", secret_obj),
1610 ]);
1611
1612 let new_snapshot = build_snapshot_from_planned(&planned_project).unwrap();
1614
1615 let mut old_snapshot = DeploymentSnapshot::default();
1617 for (object_id, hash) in &new_snapshot.objects {
1618 if object_id.object() == "my_view" {
1619 old_snapshot
1620 .objects
1621 .insert(object_id.clone(), "old_hash".to_string());
1622 } else {
1623 old_snapshot.objects.insert(object_id.clone(), hash.clone());
1624 }
1625 }
1626
1627 let change_set = ChangeSet::from_deployment_snapshot_comparison(
1629 &old_snapshot,
1630 &new_snapshot,
1631 &planned_project,
1632 );
1633
1634 assert!(
1635 change_set.objects_to_deploy.contains(&ObjectId::new(
1636 "db".into(),
1637 "public".into(),
1638 "my_view".into()
1639 )),
1640 "Changed view should be in objects_to_deploy"
1641 );
1642
1643 let objects = planned_project
1645 .get_sorted_objects_filtered(&change_set.objects_to_deploy)
1646 .unwrap();
1647
1648 let partitioned = partition_objects(objects, &change_set.changed_replacement_objects);
1649
1650 assert_eq!(
1652 partitioned.objects.len(),
1653 1,
1654 "Only the changed view should be staged, got: {:?}",
1655 partitioned
1656 .objects
1657 .iter()
1658 .map(|(id, _)| id.object())
1659 .collect::<Vec<_>>()
1660 );
1661 assert_eq!(partitioned.objects[0].0.object(), "my_view");
1662
1663 let (schema_set, cluster_set) =
1664 collect_stage_resources(&partitioned.objects, &partitioned.replacement_mvs);
1665
1666 assert_eq!(schema_set.len(), 1);
1667 assert!(schema_set.contains(&SchemaQualifier::new("db".into(), "public".into())));
1668
1669 assert_eq!(
1671 cluster_set.len(),
1672 1,
1673 "Should only have index_cluster, got: {:?}",
1674 cluster_set
1675 );
1676 assert!(
1677 cluster_set.contains("index_cluster"),
1678 "Should stage index_cluster from the view's index"
1679 );
1680 assert!(
1681 !cluster_set.contains("source_cluster"),
1682 "Should NOT stage source_cluster"
1683 );
1684 }
1685
1686 fn make_empty_change_set() -> ChangeSet {
1687 ChangeSet {
1688 changed_objects: BTreeSet::new(),
1689 dirty_schemas: BTreeSet::new(),
1690 dirty_clusters: BTreeSet::new(),
1691 objects_to_deploy: BTreeSet::new(),
1692 new_replacement_objects: BTreeSet::new(),
1693 changed_replacement_objects: BTreeSet::new(),
1694 }
1695 }
1696
1697 #[mz_ore::test]
1698 fn test_validate_no_new_replacement_objects_first_deploy() {
1699 let cs = make_empty_change_set();
1700 let snapshot = DeploymentSnapshot::default();
1701 assert!(validate_no_new_objects_in_existing_stable_schemas(&cs, &snapshot).is_ok());
1702 }
1703
1704 #[mz_ore::test]
1705 fn test_validate_new_replacement_objects_in_brand_new_schema() {
1706 let mut cs = make_empty_change_set();
1707 cs.new_replacement_objects.insert(ObjectId::new(
1708 "db".into(),
1709 "analytics".into(),
1710 "new_mv".into(),
1711 ));
1712
1713 let mut snapshot = DeploymentSnapshot::default();
1715 snapshot.objects.insert(
1716 ObjectId::new("db".into(), "public".into(), "existing_mv".into()),
1717 "hash1".into(),
1718 );
1719
1720 assert!(validate_no_new_objects_in_existing_stable_schemas(&cs, &snapshot).is_ok());
1721 }
1722
1723 #[mz_ore::test]
1724 fn test_validate_new_replacement_objects_in_existing_production_schema() {
1725 let mut cs = make_empty_change_set();
1726 cs.new_replacement_objects.insert(ObjectId::new(
1727 "db".into(),
1728 "analytics".into(),
1729 "new_mv".into(),
1730 ));
1731
1732 let mut snapshot = DeploymentSnapshot::default();
1734 snapshot.objects.insert(
1735 ObjectId::new("db".into(), "analytics".into(), "existing_mv".into()),
1736 "hash1".into(),
1737 );
1738
1739 let result = validate_no_new_objects_in_existing_stable_schemas(&cs, &snapshot);
1740 assert!(result.is_err());
1741 match result.unwrap_err() {
1742 CliError::NewObjectInExistingStableSchema {
1743 database,
1744 schema,
1745 objects,
1746 } => {
1747 assert_eq!(database, "db");
1748 assert_eq!(schema, "analytics");
1749 assert_eq!(objects, vec!["new_mv"]);
1750 }
1751 other => panic!("Expected NewObjectInExistingStableSchema, got: {:?}", other),
1752 }
1753 }
1754
1755 #[mz_ore::test]
1756 fn test_validate_changed_replacement_objects_only() {
1757 let mut cs = make_empty_change_set();
1758 cs.changed_replacement_objects.insert(ObjectId::new(
1760 "db".into(),
1761 "analytics".into(),
1762 "changed_mv".into(),
1763 ));
1764
1765 let mut snapshot = DeploymentSnapshot::default();
1766 snapshot.objects.insert(
1767 ObjectId::new("db".into(), "analytics".into(), "changed_mv".into()),
1768 "hash1".into(),
1769 );
1770
1771 assert!(validate_no_new_objects_in_existing_stable_schemas(&cs, &snapshot).is_ok());
1772 }
1773
1774 #[mz_ore::test]
1775 fn test_validate_mixed_new_in_new_schema_changed_in_existing() {
1776 let mut cs = make_empty_change_set();
1777 cs.new_replacement_objects.insert(ObjectId::new(
1779 "db".into(),
1780 "new_schema".into(),
1781 "new_mv".into(),
1782 ));
1783 cs.changed_replacement_objects.insert(ObjectId::new(
1785 "db".into(),
1786 "existing_schema".into(),
1787 "changed_mv".into(),
1788 ));
1789
1790 let mut snapshot = DeploymentSnapshot::default();
1792 snapshot.objects.insert(
1793 ObjectId::new("db".into(), "existing_schema".into(), "changed_mv".into()),
1794 "hash1".into(),
1795 );
1796
1797 assert!(validate_no_new_objects_in_existing_stable_schemas(&cs, &snapshot).is_ok());
1799 }
1800
1801 #[mz_ore::test]
1802 fn test_validate_transitioning_objects_in_existing_schema_allowed() {
1803 let mut cs = make_empty_change_set();
1804 cs.new_replacement_objects.insert(ObjectId::new(
1806 "db".into(),
1807 "analytics".into(),
1808 "existing_mv".into(),
1809 ));
1810
1811 let mut snapshot = DeploymentSnapshot::default();
1813 snapshot.objects.insert(
1814 ObjectId::new("db".into(), "analytics".into(), "existing_mv".into()),
1815 "hash1".into(),
1816 );
1817
1818 assert!(validate_no_new_objects_in_existing_stable_schemas(&cs, &snapshot).is_ok());
1820 }
1821
1822 fn make_planned_project_with_replacement_schemas(
1823 objects: Vec<(&str, &str, &str, DatabaseObject)>,
1824 replacement_schemas: BTreeSet<SchemaQualifier>,
1825 ) -> Project {
1826 let mut db_map: BTreeMap<String, BTreeMap<String, Vec<DatabaseObject>>> = BTreeMap::new();
1827
1828 for (database, schema, _name, typed_obj) in objects {
1829 db_map
1830 .entry(database.to_string())
1831 .or_default()
1832 .entry(schema.to_string())
1833 .or_default()
1834 .push(typed_obj);
1835 }
1836
1837 let databases: Vec<compiled::Database> = db_map
1838 .into_iter()
1839 .map(|(db_name, schemas)| compiled::Database {
1840 name: db_name,
1841 schemas: schemas
1842 .into_iter()
1843 .map(|(schema_name, objs)| compiled::Schema {
1844 name: schema_name,
1845 objects: objs,
1846 mod_statements: None,
1847 })
1848 .collect(),
1849 mod_statements: None,
1850 })
1851 .collect();
1852
1853 let typed_project = compiled::Project {
1854 databases,
1855 replacement_schemas,
1856 };
1857
1858 Project::from(typed_project)
1859 }
1860
1861 #[cfg_attr(miri, ignore)] #[mz_ore::test]
1863 fn test_build_snapshot_replacement_schema_kind() {
1864 let mv_obj =
1865 make_typed_object(&["CREATE MATERIALIZED VIEW my_mv IN CLUSTER compute AS SELECT 1"]);
1866 let view_obj = make_typed_object(&["CREATE VIEW my_view AS SELECT 1"]);
1867
1868 let mut replacement_schemas = BTreeSet::new();
1869 replacement_schemas.insert(SchemaQualifier::new("db".into(), "stable".into()));
1870
1871 let planned_project = make_planned_project_with_replacement_schemas(
1872 vec![
1873 ("db", "stable", "my_mv", mv_obj),
1874 ("db", "regular", "my_view", view_obj),
1875 ],
1876 replacement_schemas,
1877 );
1878
1879 let snapshot = build_snapshot_from_planned(&planned_project).unwrap();
1880
1881 assert_eq!(
1883 snapshot
1884 .schemas
1885 .get(&SchemaQualifier::new("db".into(), "stable".into())),
1886 Some(&DeploymentKind::Replacement),
1887 "Replacement schema should have Replacement kind in snapshot"
1888 );
1889
1890 assert_eq!(
1892 snapshot
1893 .schemas
1894 .get(&SchemaQualifier::new("db".into(), "regular".into())),
1895 Some(&DeploymentKind::Objects),
1896 "Regular schema should have Objects kind in snapshot"
1897 );
1898 }
1899
1900 #[cfg_attr(miri, ignore)] #[mz_ore::test]
1902 fn test_build_snapshot_no_replacement_schemas_all_objects() {
1903 let mv_obj =
1904 make_typed_object(&["CREATE MATERIALIZED VIEW my_mv IN CLUSTER compute AS SELECT 1"]);
1905 let view_obj = make_typed_object(&["CREATE VIEW my_view AS SELECT 1"]);
1906
1907 let planned_project = make_planned_project(vec![
1908 ("db", "stable", "my_mv", mv_obj),
1909 ("db", "regular", "my_view", view_obj),
1910 ]);
1911
1912 let snapshot = build_snapshot_from_planned(&planned_project).unwrap();
1913
1914 assert_eq!(
1916 snapshot
1917 .schemas
1918 .get(&SchemaQualifier::new("db".into(), "stable".into())),
1919 Some(&DeploymentKind::Objects),
1920 );
1921 assert_eq!(
1922 snapshot
1923 .schemas
1924 .get(&SchemaQualifier::new("db".into(), "regular".into())),
1925 Some(&DeploymentKind::Objects),
1926 );
1927 }
1928
1929 #[cfg_attr(miri, ignore)] #[mz_ore::test]
1931 fn test_record_stage_metadata_transition_override() {
1932 let mv_obj =
1936 make_typed_object(&["CREATE MATERIALIZED VIEW my_mv IN CLUSTER compute AS SELECT 1"]);
1937
1938 let objects: Vec<ObjectRef> = vec![(
1940 ObjectId::new("db".into(), "stable".into(), "my_mv".into()),
1941 &mv_obj,
1942 )];
1943 let sinks: Vec<ObjectRef> = vec![];
1944 let replacement_mvs: Vec<ObjectRef> = vec![];
1945
1946 let mut replacement_schemas = BTreeSet::new();
1948 replacement_schemas.insert(SchemaQualifier::new("db".into(), "stable".into()));
1949
1950 let mut staging_snapshot = DeploymentSnapshot::default();
1952
1953 for (object_id, typed_obj) in &objects {
1954 let hash = deployment_snapshot::compute_typed_hash(typed_obj);
1955 staging_snapshot.objects.insert(object_id.clone(), hash);
1956 staging_snapshot.schemas.insert(
1957 SchemaQualifier::new(
1958 object_id.expect_database().to_string(),
1959 object_id.schema().to_string(),
1960 ),
1961 DeploymentKind::Objects,
1962 );
1963 }
1964
1965 for (object_id, typed_obj) in &sinks {
1966 let hash = deployment_snapshot::compute_typed_hash(typed_obj);
1967 staging_snapshot.objects.insert(object_id.clone(), hash);
1968 staging_snapshot
1969 .schemas
1970 .entry(SchemaQualifier::new(
1971 object_id.expect_database().to_string(),
1972 object_id.schema().to_string(),
1973 ))
1974 .or_insert(DeploymentKind::Sinks);
1975 }
1976
1977 for (object_id, typed_obj) in &replacement_mvs {
1978 let hash = deployment_snapshot::compute_typed_hash(typed_obj);
1979 staging_snapshot.objects.insert(object_id.clone(), hash);
1980 staging_snapshot.schemas.insert(
1981 SchemaQualifier::new(
1982 object_id.expect_database().to_string(),
1983 object_id.schema().to_string(),
1984 ),
1985 DeploymentKind::Replacement,
1986 );
1987 }
1988
1989 assert_eq!(
1991 staging_snapshot
1992 .schemas
1993 .get(&SchemaQualifier::new("db".into(), "stable".into())),
1994 Some(&DeploymentKind::Objects),
1995 "Before override, schema should be Objects (from regular objects path)"
1996 );
1997
1998 for sq in &replacement_schemas {
2000 if staging_snapshot.schemas.contains_key(sq) {
2001 staging_snapshot
2002 .schemas
2003 .insert(sq.clone(), DeploymentKind::Replacement);
2004 }
2005 }
2006
2007 assert_eq!(
2009 staging_snapshot
2010 .schemas
2011 .get(&SchemaQualifier::new("db".into(), "stable".into())),
2012 Some(&DeploymentKind::Replacement),
2013 "After override, schema should be Replacement"
2014 );
2015 }
2016
2017 #[mz_ore::test]
2018 fn test_record_stage_metadata_override_only_applies_to_existing_schemas() {
2019 let replacement_schemas =
2022 BTreeSet::from([SchemaQualifier::new("db".into(), "nonexistent".into())]);
2023
2024 let mut staging_snapshot = DeploymentSnapshot::default();
2025
2026 for sq in &replacement_schemas {
2028 if staging_snapshot.schemas.contains_key(sq) {
2029 staging_snapshot
2030 .schemas
2031 .insert(sq.clone(), DeploymentKind::Replacement);
2032 }
2033 }
2034
2035 assert!(
2037 staging_snapshot.schemas.is_empty(),
2038 "Override should not create entries for schemas with no objects"
2039 );
2040 }
2041}