1use super::ObjectRef;
16use crate::cli::CliError;
17use crate::cli::executor::{self, DeploymentExecutor};
18use crate::cli::{git, progress};
19use crate::client::DeploymentMode;
20use crate::client::{
21 Client, ClusterConfig, ClusterOptions, DeploymentKind, PendingStatement, ReplacementMvRecord,
22};
23use crate::config::Settings;
24use crate::log;
25use crate::project::SchemaQualifier;
26use crate::project::analysis::changeset::ChangeSet;
27use crate::project::analysis::deployment_snapshot::{self, DeploymentSnapshot};
28use crate::project::analysis::deps::extract_external_indexes;
29use crate::project::ast::Statement;
30use crate::project::ir::compiled::{DatabaseObject, FullyQualifiedName};
31use crate::project::ir::graph::Project;
32use crate::project::ir::object_id::ObjectId;
33use crate::project::resolve::normalize::{self, NormalizingVisitor};
34use crate::verbose;
35use mz_ore::option::OptionExt;
36use std::collections::BTreeSet;
37use std::fmt;
38use std::path::Path;
39use std::time::Instant;
40
41struct StageAnalysis<'a> {
46 objects: Vec<ObjectRef<'a>>,
47 sinks: Vec<ObjectRef<'a>>,
48 replacement_mvs: Vec<ObjectRef<'a>>,
49 schema_set: BTreeSet<SchemaQualifier>,
50 cluster_set: BTreeSet<String>,
51}
52
53struct PartitionedObjects<'a> {
57 objects: Vec<ObjectRef<'a>>,
58 sinks: Vec<ObjectRef<'a>>,
59 replacement_mvs: Vec<ObjectRef<'a>>,
60 table_count: usize,
61}
62
63#[derive(serde::Serialize)]
66struct StageResult {
67 deploy_id: String,
68 objects_deployed: usize,
69 #[serde(skip)]
70 duration: std::time::Duration,
71}
72
73impl fmt::Display for StageResult {
74 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
75 write!(
76 f,
77 " \u{2713} Successfully deployed {} objects to '{}' staging environment ({:.1}s)",
78 self.objects_deployed,
79 self.deploy_id,
80 self.duration.as_secs_f64()
81 )
82 }
83}
84
85#[derive(serde::Serialize)]
86struct StagePlan {
87 deploy_id: String,
88 schemas: Vec<StagePlanSchema>,
89 clusters: Vec<StagePlanCluster>,
90 objects: Vec<StagePlanObject>,
91 sinks: Vec<StagePlanObject>,
92 replacement_mvs: Vec<StagePlanObject>,
93}
94
95#[derive(serde::Serialize)]
96struct StagePlanSchema {
97 database: String,
98 schema: String,
99 staging_schema: String,
100}
101
102#[derive(serde::Serialize)]
103struct StagePlanCluster {
104 production_cluster: String,
105 staging_cluster: String,
106}
107
108#[derive(serde::Serialize)]
109struct StagePlanObject {
110 database: String,
111 schema: String,
112 object: String,
113}
114
115impl fmt::Display for StagePlan {
116 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
117 writeln!(f, "Stage plan for '{}':", self.deploy_id)?;
118
119 if !self.schemas.is_empty() {
120 writeln!(f, "\nSchemas ({}):", self.schemas.len())?;
121 for s in &self.schemas {
122 writeln!(
123 f,
124 " {}.{} \u{2192} {}",
125 s.database, s.schema, s.staging_schema
126 )?;
127 }
128 }
129
130 if !self.clusters.is_empty() {
131 writeln!(f, "\nClusters ({}):", self.clusters.len())?;
132 for c in &self.clusters {
133 writeln!(
134 f,
135 " {} \u{2192} {}",
136 c.production_cluster, c.staging_cluster
137 )?;
138 }
139 }
140
141 if !self.objects.is_empty() {
142 writeln!(f, "\nObjects ({}):", self.objects.len())?;
143 for o in &self.objects {
144 writeln!(f, " {}.{}.{}", o.database, o.schema, o.object)?;
145 }
146 }
147
148 if !self.sinks.is_empty() {
149 writeln!(f, "\nSinks ({}):", self.sinks.len())?;
150 for s in &self.sinks {
151 writeln!(f, " {}.{}.{}", s.database, s.schema, s.object)?;
152 }
153 }
154
155 if !self.replacement_mvs.is_empty() {
156 writeln!(f, "\nReplacement MVs ({}):", self.replacement_mvs.len())?;
157 for m in &self.replacement_mvs {
158 writeln!(f, " {}.{}.{}", m.database, m.schema, m.object)?;
159 }
160 }
161
162 Ok(())
163 }
164}
165
166pub async fn run(
187 settings: &Settings,
188 stage_name: Option<&str>,
189 allow_dirty: bool,
190 no_rollback: bool,
191 dry_run: bool,
192 redeploy_schemas: &[String],
193 redeploy_all: bool,
194) -> Result<(), CliError> {
195 let profile = settings.connection();
196 let directory = &settings.directory;
197 let start_time = Instant::now();
198
199 if !allow_dirty && git::is_dirty(directory) {
200 return Err(CliError::GitDirty);
201 }
202
203 let stage_name = stage_name
204 .owned()
205 .or_else(|| git::get_git_commit(directory).map(|sha| sha.chars().take(7).collect()))
206 .unwrap_or_else(executor::generate_random_env_name);
207
208 let planned_project = super::compile::run(settings, true).await?;
209 let staging_suffix = format!("_{}", stage_name);
210
211 let client = Client::connect_with_profile(profile.clone())
212 .await
213 .map_err(CliError::Connection)?;
214
215 crate::cli::commands::setup::verify(&client, settings.emulator()).await?;
216 let role =
217 crate::cli::commands::setup::validate_connection(&client, settings.emulator()).await?;
218 crate::cli::commands::setup::require_deployer(role)?;
219
220 let forced_dirty_schemas = resolve_redeploy_schemas(&planned_project, redeploy_schemas)?;
221
222 let Some(analysis) = analyze_project_changes(
223 &client,
224 &planned_project,
225 &stage_name,
226 forced_dirty_schemas,
227 redeploy_all,
228 )
229 .await?
230 else {
231 return Ok(());
232 };
233
234 validate_project_for_stage(
235 &client,
236 &planned_project,
237 directory,
238 &analysis.schema_set,
239 &analysis.cluster_set,
240 )
241 .await?;
242
243 if !dry_run {
244 record_stage_metadata(
245 &client,
246 directory,
247 &stage_name,
248 &staging_suffix,
249 &analysis.objects,
250 &analysis.sinks,
251 &analysis.replacement_mvs,
252 &planned_project.replacement_schemas,
253 )
254 .await?;
255 }
256
257 if dry_run {
258 let plan = StagePlan {
259 deploy_id: stage_name.to_string(),
260 schemas: analysis
261 .schema_set
262 .iter()
263 .map(|sq| StagePlanSchema {
264 database: sq.database.clone(),
265 schema: sq.schema.clone(),
266 staging_schema: format!("{}{}", sq.schema, staging_suffix),
267 })
268 .collect(),
269 clusters: analysis
270 .cluster_set
271 .iter()
272 .map(|c| StagePlanCluster {
273 production_cluster: c.clone(),
274 staging_cluster: format!("{}{}", c, staging_suffix),
275 })
276 .collect(),
277 objects: analysis
278 .objects
279 .iter()
280 .map(|(id, _)| StagePlanObject {
281 database: id.expect_database().to_string(),
282 schema: id.schema().to_string(),
283 object: id.object().to_string(),
284 })
285 .collect(),
286 sinks: analysis
287 .sinks
288 .iter()
289 .map(|(id, _)| StagePlanObject {
290 database: id.expect_database().to_string(),
291 schema: id.schema().to_string(),
292 object: id.object().to_string(),
293 })
294 .collect(),
295 replacement_mvs: analysis
296 .replacement_mvs
297 .iter()
298 .map(|(id, _)| StagePlanObject {
299 database: id.expect_database().to_string(),
300 schema: id.schema().to_string(),
301 object: id.object().to_string(),
302 })
303 .collect(),
304 };
305 log::output(&plan);
306 return Ok(());
307 }
308
309 let success_count = create_resources_with_rollback(
310 &client,
311 &stage_name,
312 &staging_suffix,
313 &analysis.schema_set,
314 &analysis.cluster_set,
315 &planned_project,
316 &analysis.objects,
317 &analysis.replacement_mvs,
318 no_rollback,
319 dry_run,
320 )
321 .await?;
322
323 let result = StageResult {
324 deploy_id: stage_name.to_string(),
325 objects_deployed: success_count,
326 duration: start_time.elapsed(),
327 };
328 log::output(&result);
329 log::print_deploy_id(&stage_name);
330 Ok(())
331}
332
333fn parse_qualified_schema(raw: &str) -> Result<SchemaQualifier, CliError> {
337 let unqualified = || {
338 CliError::Message(format!(
339 "invalid --redeploy-schema '{}': expected a qualified 'database.schema' name",
340 raw
341 ))
342 };
343 let name = mz_sql_parser::parser::parse_item_name(raw).map_err(|_| unqualified())?;
344 let [database, schema] = name.0.as_slice() else {
345 return Err(unqualified());
346 };
347 Ok(SchemaQualifier::new(
348 database.as_str().to_string(),
349 schema.as_str().to_string(),
350 ))
351}
352
353fn resolve_redeploy_schemas(
356 planned_project: &Project,
357 redeploy_schemas: &[String],
358) -> Result<BTreeSet<SchemaQualifier>, CliError> {
359 if redeploy_schemas.is_empty() {
360 return Ok(BTreeSet::new());
361 }
362
363 let objects: Vec<_> = planned_project.iter_objects().collect();
364 let project_schemas = SchemaQualifier::collect_from(&objects);
365
366 let mut resolved = BTreeSet::new();
367 for raw in redeploy_schemas {
368 let sq = parse_qualified_schema(raw)?;
369 if !project_schemas.contains(&sq) {
370 let available = project_schemas
371 .iter()
372 .map(|s| format!("{}.{}", s.database, s.schema))
373 .collect::<Vec<_>>()
374 .join(", ");
375 return Err(CliError::Message(format!(
376 "--redeploy-schema '{}.{}' is not a schema in this project; available: {}",
377 sq.database, sq.schema, available
378 )));
379 }
380 resolved.insert(sq);
381 }
382 Ok(resolved)
383}
384
385async fn analyze_project_changes<'a>(
390 client: &Client,
391 planned_project: &'a Project,
392 stage_name: &str,
393 forced_dirty_schemas: BTreeSet<SchemaQualifier>,
394 redeploy_all: bool,
395) -> Result<Option<StageAnalysis<'a>>, CliError> {
396 progress::stage_start("Analyzing project changes");
397 let analyze_start = Instant::now();
398
399 if client
400 .deployments()
401 .get_deployment_metadata(stage_name)
402 .await?
403 .is_some()
404 {
405 return Err(CliError::InvalidEnvironmentName {
406 name: format!("deployment '{}' already exists", stage_name),
407 });
408 }
409
410 let new_snapshot = deployment_snapshot::build_snapshot_from_planned(planned_project)?;
411 let production_snapshot = deployment_snapshot::load_from_database(client, None).await?;
412
413 let dirty_schemas = if redeploy_all {
414 new_snapshot.schemas.keys().cloned().collect()
415 } else {
416 forced_dirty_schemas
417 };
418
419 let change_set = if production_snapshot.objects.is_empty() {
420 None
421 } else {
422 Some(ChangeSet::from_deployment_snapshot_comparison(
423 &production_snapshot,
424 &new_snapshot,
425 planned_project,
426 &dirty_schemas,
427 ))
428 };
429
430 if let Some(ref cs) = change_set {
453 validate_no_new_objects_in_existing_stable_schemas(cs, &production_snapshot)?;
454 }
455
456 let objects = select_stage_objects(planned_project, change_set.as_ref())?;
457 if objects.is_empty() && change_set.as_ref().is_some_and(ChangeSet::is_empty) {
458 progress::success("No changes detected compared to production, skipping deployment");
459 return Ok(None);
460 }
461
462 let replacement_object_ids = change_set
463 .as_ref()
464 .map(|cs| cs.changed_replacement_objects.clone())
465 .unwrap_or_default();
466 let partitioned = partition_objects(objects, &replacement_object_ids);
467 log_partition_summary(&partitioned);
468
469 let object_ids: BTreeSet<_> = partitioned
470 .objects
471 .iter()
472 .map(|(id, _)| id.clone())
473 .collect();
474 client
475 .validation()
476 .validate_table_dependencies(planned_project, &object_ids)
477 .await?;
478
479 let (schema_set, cluster_set) =
480 collect_stage_resources(&partitioned.objects, &partitioned.replacement_mvs);
481
482 let analyze_duration = analyze_start.elapsed();
483 progress::stage_success(
484 &format!(
485 "Ready to deploy {} view(s)/materialized view(s)",
486 partitioned.objects.len()
487 ),
488 analyze_duration,
489 );
490
491 Ok(Some(StageAnalysis {
492 objects: partitioned.objects,
493 sinks: partitioned.sinks,
494 replacement_mvs: partitioned.replacement_mvs,
495 schema_set,
496 cluster_set,
497 }))
498}
499
500fn select_stage_objects<'a>(
504 planned_project: &'a Project,
505 change_set: Option<&ChangeSet>,
506) -> Result<Vec<ObjectRef<'a>>, CliError> {
507 if let Some(cs) = change_set {
508 if cs.is_empty() {
509 return Ok(Vec::new());
510 }
511 verbose!("{}", cs);
512 Ok(planned_project.get_sorted_objects_filtered(&cs.objects_to_deploy)?)
513 } else {
514 verbose!("Full deployment: no production deployment found");
515 Ok(planned_project.get_sorted_objects()?)
516 }
517}
518
519fn partition_objects<'a>(
524 objects: Vec<ObjectRef<'a>>,
525 replacement_object_ids: &BTreeSet<ObjectId>,
526) -> PartitionedObjects<'a> {
527 let mut kept = Vec::new();
528 let mut sinks = Vec::new();
529 let mut replacement_mvs = Vec::new();
530 let mut table_count = 0;
531
532 for (object_id, typed_obj) in objects {
533 match &typed_obj.stmt {
534 Statement::CreateTable(_)
535 | Statement::CreateTableFromSource(_)
536 | Statement::CreateSource(_)
537 | Statement::CreateSecret(_)
538 | Statement::CreateConnection(_) => {
539 table_count += 1;
540 }
541 Statement::CreateSink(_) => sinks.push((object_id, typed_obj)),
542 Statement::CreateMaterializedView(_) if replacement_object_ids.contains(&object_id) => {
543 replacement_mvs.push((object_id, typed_obj));
544 }
545 _ => kept.push((object_id, typed_obj)),
546 }
547 }
548
549 PartitionedObjects {
550 objects: kept,
551 sinks,
552 replacement_mvs,
553 table_count,
554 }
555}
556
557fn log_partition_summary(partitioned: &PartitionedObjects<'_>) {
559 if partitioned.table_count > 0 {
560 verbose!(
561 "Skipped {} table(s)/source(s) - use 'mz-deploy apply' for those",
562 partitioned.table_count
563 );
564 }
565 if !partitioned.sinks.is_empty() {
566 verbose!(
567 "Found {} sink(s) - will be created during apply after swap",
568 partitioned.sinks.len()
569 );
570 }
571 if !partitioned.replacement_mvs.is_empty() {
572 verbose!(
573 "Found {} replacement MV(s) - will use CREATE REPLACEMENT protocol",
574 partitioned.replacement_mvs.len()
575 );
576 }
577}
578
579fn collect_stage_resources(
585 objects: &[ObjectRef<'_>],
586 replacement_mvs: &[ObjectRef<'_>],
587) -> (BTreeSet<SchemaQualifier>, BTreeSet<String>) {
588 let mut schema_set = BTreeSet::new();
589 let mut cluster_set = BTreeSet::new();
590
591 for (object_id, typed_obj) in objects.iter().chain(replacement_mvs.iter()) {
592 schema_set.insert(SchemaQualifier::new(
593 object_id.expect_database().to_string(),
594 object_id.schema().to_string(),
595 ));
596 cluster_set.extend(typed_obj.clusters());
597 }
598
599 (schema_set, cluster_set)
600}
601
602async fn validate_project_for_stage(
606 client: &Client,
607 planned_project: &Project,
608 directory: &Path,
609 schema_set: &BTreeSet<SchemaQualifier>,
610 cluster_set: &BTreeSet<String>,
611) -> Result<(), CliError> {
612 progress::stage_start("Validating project");
613 let validate_start = Instant::now();
614 client
615 .validation()
616 .validate_project(planned_project, directory)
617 .await?;
618 client
619 .validation()
620 .validate_cluster_isolation(planned_project)
621 .await?;
622 client
623 .validation()
624 .validate_privileges(planned_project)
625 .await?;
626 client
627 .validation()
628 .validate_schema_ownership(schema_set)
629 .await?;
630 client
631 .validation()
632 .validate_cluster_ownership(cluster_set)
633 .await?;
634 client
635 .validation()
636 .validate_sink_connections_exist(planned_project)
637 .await?;
638 let validate_duration = validate_start.elapsed();
639 progress::stage_success("All validations passed", validate_duration);
640 Ok(())
641}
642
643async fn record_stage_metadata(
648 client: &Client,
649 directory: &Path,
650 stage_name: &str,
651 staging_suffix: &str,
652 objects: &[ObjectRef<'_>],
653 sinks: &[ObjectRef<'_>],
654 replacement_mvs: &[ObjectRef<'_>],
655 replacement_schemas: &BTreeSet<SchemaQualifier>,
656) -> Result<(), CliError> {
657 progress::stage_start("Recording deployment metadata");
658 let metadata_start = Instant::now();
659 let metadata = executor::collect_deployment_metadata(client, directory).await;
660
661 let mut staging_snapshot = DeploymentSnapshot::default();
662
663 for (object_id, typed_obj) in objects {
664 let hash = deployment_snapshot::compute_typed_hash(typed_obj);
665 staging_snapshot.objects.insert(object_id.clone(), hash);
666 staging_snapshot.schemas.insert(
667 SchemaQualifier::new(
668 object_id.expect_database().to_string(),
669 object_id.schema().to_string(),
670 ),
671 DeploymentKind::Objects,
672 );
673 }
674
675 for (object_id, typed_obj) in sinks {
676 let hash = deployment_snapshot::compute_typed_hash(typed_obj);
677 staging_snapshot.objects.insert(object_id.clone(), hash);
678 staging_snapshot
679 .schemas
680 .entry(SchemaQualifier::new(
681 object_id.expect_database().to_string(),
682 object_id.schema().to_string(),
683 ))
684 .or_insert(DeploymentKind::Sinks);
685 }
686
687 for (object_id, typed_obj) in replacement_mvs {
688 let hash = deployment_snapshot::compute_typed_hash(typed_obj);
689 staging_snapshot.objects.insert(object_id.clone(), hash);
690 staging_snapshot.schemas.insert(
691 SchemaQualifier::new(
692 object_id.expect_database().to_string(),
693 object_id.schema().to_string(),
694 ),
695 DeploymentKind::Replacement,
696 );
697 }
698
699 for sq in replacement_schemas {
704 if staging_snapshot.schemas.contains_key(sq) {
705 staging_snapshot
706 .schemas
707 .insert(sq.clone(), DeploymentKind::Replacement);
708 }
709 }
710
711 deployment_snapshot::write_to_database(
712 client,
713 &staging_snapshot,
714 stage_name,
715 &metadata,
716 None,
717 DeploymentMode::Stage,
718 )
719 .await?;
720
721 if !sinks.is_empty() {
722 let pending_statements: Vec<PendingStatement> = sinks
723 .iter()
724 .enumerate()
725 .map(|(idx, (object_id, typed_obj))| {
726 let original_fqn: FullyQualifiedName = object_id.clone().into();
727 let mut visitor = NormalizingVisitor::fully_qualifying(&original_fqn);
728 let stmt = typed_obj
729 .stmt
730 .clone()
731 .normalize_name_with(&visitor, &original_fqn.to_item_name())
732 .normalize_dependencies_with(&mut visitor);
733 let hash = deployment_snapshot::compute_typed_hash(typed_obj);
734 #[allow(clippy::as_conversions)]
735 PendingStatement {
736 deploy_id: stage_name.to_string(),
737 sequence_num: idx as i32,
738 database: object_id.expect_database().to_string(),
739 schema: object_id.schema().to_string(),
740 object: object_id.object().to_string(),
741 object_hash: hash,
742 statement_sql: stmt.to_string(),
743 statement_kind: "sink".to_string(),
744 executed_at: None,
745 }
746 })
747 .collect();
748
749 client
750 .deployments()
751 .insert_pending_statements(&pending_statements)
752 .await?;
753 verbose!(
754 "Stored {} pending sink statement(s)",
755 pending_statements.len()
756 );
757 }
758
759 if !replacement_mvs.is_empty() {
760 let records: Vec<ReplacementMvRecord> = replacement_mvs
761 .iter()
762 .map(|(object_id, _)| ReplacementMvRecord {
763 deploy_id: stage_name.to_string(),
764 target_database: object_id.expect_database().to_string(),
765 target_schema: object_id.schema().to_string(),
766 target_name: object_id.object().to_string(),
767 replacement_schema: format!("{}{}", object_id.schema(), staging_suffix),
768 })
769 .collect();
770 client
771 .deployments()
772 .insert_replacement_mvs(&records)
773 .await?;
774 verbose!("Stored {} replacement MV record(s)", records.len());
775 }
776
777 let metadata_duration = metadata_start.elapsed();
778 progress::stage_success("Deployment metadata recorded", metadata_duration);
779 Ok(())
780}
781
782#[allow(clippy::too_many_arguments)]
788async fn create_resources_with_rollback<'a>(
789 client: &Client,
790 stage_name: &str,
791 staging_suffix: &str,
792 schema_set: &BTreeSet<SchemaQualifier>,
793 cluster_set: &BTreeSet<String>,
794 planned_project: &'a Project,
795 objects: &'a [(ObjectId, &'a DatabaseObject)],
796 replacement_mvs: &'a [(ObjectId, &'a DatabaseObject)],
797 no_rollback: bool,
798 dry_run: bool,
799) -> Result<usize, CliError> {
800 let executor = DeploymentExecutor::with_dry_run(client, dry_run);
801
802 let result = async {
803 create_databases_and_schemas(&executor, planned_project, schema_set, staging_suffix)
804 .await?;
805 create_staging_clusters(&executor, client, stage_name, cluster_set, staging_suffix).await?;
806 deploy_objects_to_staging(
807 &executor,
808 objects,
809 replacement_mvs,
810 planned_project,
811 cluster_set,
812 staging_suffix,
813 )
814 .await
815 }
816 .await;
817
818 match result {
819 Ok(count) => Ok(count),
820 Err(e) if dry_run || no_rollback => {
821 if !dry_run {
822 progress::error("Deployment failed (skipping rollback due to --no-rollback flag)");
823 }
824 Err(e)
825 }
826 Err(e) => {
827 progress::error("Deployment failed, rolling back...");
828 let (schemas, clusters) = rollback_staging_resources(client, stage_name).await;
829
830 if schemas > 0 || clusters > 0 {
831 progress::success(&format!(
832 "Rolled back: {} schema(s), {} cluster(s)",
833 schemas, clusters
834 ));
835 }
836
837 Err(e)
838 }
839 }
840}
841
842async fn create_databases_and_schemas(
847 executor: &DeploymentExecutor<'_>,
848 planned_project: &Project,
849 schema_set: &BTreeSet<SchemaQualifier>,
850 staging_suffix: &str,
851) -> Result<(), CliError> {
852 let schema_set_dbs: BTreeSet<&str> = schema_set.iter().map(|sq| sq.database.as_str()).collect();
855 for db in &planned_project.databases {
856 if !schema_set_dbs.contains(db.name.as_str()) {
857 executor.ensure_database(&db.name).await?;
858 verbose!(" Ensured database {} exists", db.name);
859 }
860 }
861
862 progress::stage_start("Creating staging schemas and applying setup statements");
864 let schema_start = Instant::now();
865 executor
866 .prepare_databases_and_schemas(planned_project, schema_set, Some(staging_suffix))
867 .await?;
868 let schema_duration = schema_start.elapsed();
869 progress::stage_success(
870 &format!(
871 "Created {} staging schema(s) with setup statements",
872 schema_set.len()
873 ),
874 schema_duration,
875 );
876
877 if !executor.is_dry_run() {
879 for sq in schema_set {
880 executor.ensure_schema(&sq.database, &sq.schema).await?;
881 verbose!(" Ensured schema {}.{} exists", sq.database, sq.schema);
882 }
883 }
884
885 Ok(())
886}
887
888async fn create_staging_clusters(
894 executor: &DeploymentExecutor<'_>,
895 client: &Client,
896 stage_name: &str,
897 cluster_set: &BTreeSet<String>,
898 staging_suffix: &str,
899) -> Result<(), CliError> {
900 let cluster_names: Vec<String> = cluster_set.iter().cloned().collect();
902 executor
903 .record_deployment_clusters(stage_name, &cluster_names)
904 .await?;
905
906 progress::stage_start("Creating staging clusters");
907 let cluster_start = Instant::now();
908 let mut created_clusters = 0;
909
910 let existing_staging_clusters = if !executor.is_dry_run() {
912 let staging_cluster_names: Vec<String> = cluster_set
913 .iter()
914 .map(|name| format!("{}{}", name, staging_suffix))
915 .collect();
916 client
917 .introspection()
918 .check_clusters_exist(&staging_cluster_names)
919 .await?
920 } else {
921 BTreeSet::new()
922 };
923
924 for prod_cluster in cluster_set {
925 let staging_cluster = format!("{}{}", prod_cluster, staging_suffix);
926
927 if executor.is_dry_run() {
928 let placeholder = ClusterConfig::Managed {
930 options: ClusterOptions {
931 size: String::new(),
932 replication_factor: 1,
933 },
934 grants: Vec::new(),
935 };
936 executor
937 .create_cluster(&staging_cluster, prod_cluster, &placeholder)
938 .await?;
939 created_clusters += 1;
940 continue;
941 }
942
943 if existing_staging_clusters.contains(&staging_cluster) {
945 verbose!(" Cluster '{}' already exists, skipping", staging_cluster);
946 continue;
947 }
948
949 let config = client
951 .introspection()
952 .get_cluster_config(prod_cluster)
953 .await?;
954
955 let config = match config {
956 Some(config) => config,
957 None => {
958 return Err(CliError::ClusterNotFound {
959 name: prod_cluster.clone(),
960 });
961 }
962 };
963
964 executor
965 .create_cluster(&staging_cluster, prod_cluster, &config)
966 .await?;
967 created_clusters += 1;
968
969 log_cluster_creation(&staging_cluster, prod_cluster, &config);
970 }
971
972 let cluster_duration = cluster_start.elapsed();
973 progress::stage_success(
974 &format!("Created {} cluster(s)", created_clusters),
975 cluster_duration,
976 );
977
978 Ok(())
979}
980
981fn log_cluster_creation(staging_cluster: &str, prod_cluster: &str, config: &ClusterConfig) {
983 match config {
984 ClusterConfig::Managed { options, grants } => {
985 verbose!(
986 " Created managed cluster '{}' (size: {}, replication_factor: {}, {} grant(s), cloned from '{}')",
987 staging_cluster,
988 options.size,
989 options.replication_factor,
990 grants.len(),
991 prod_cluster
992 );
993 }
994 ClusterConfig::Unmanaged { replicas, grants } => {
995 verbose!(
996 " Created unmanaged cluster '{}' with {} replica(s), {} grant(s) (cloned from '{}')",
997 staging_cluster,
998 replicas.len(),
999 grants.len(),
1000 prod_cluster
1001 );
1002 for replica in replicas {
1003 verbose!(
1004 " - {} (size: {}{})",
1005 replica.name,
1006 replica.size,
1007 replica
1008 .availability_zone
1009 .as_ref()
1010 .map(|az| format!(", az: {}", az))
1011 .unwrap_or_default()
1012 );
1013 }
1014 }
1015 }
1016}
1017
1018async fn deploy_objects_to_staging<'a>(
1025 executor: &DeploymentExecutor<'_>,
1026 objects: &'a [(ObjectId, &'a DatabaseObject)],
1027 replacement_mvs: &'a [(ObjectId, &'a DatabaseObject)],
1028 planned_project: &'a Project,
1029 cluster_set: &BTreeSet<String>,
1030 staging_suffix: &str,
1031) -> Result<usize, CliError> {
1032 progress::stage_start("Deploying objects to staging");
1033 let deploy_start = Instant::now();
1034
1035 let objects_to_deploy_set: BTreeSet<_> = objects
1038 .iter()
1039 .chain(replacement_mvs.iter())
1040 .map(|(oid, _)| oid.clone())
1041 .collect();
1042
1043 let mut external_indexes: Vec<_> = planned_project
1045 .iter_objects()
1046 .filter(|object| !objects_to_deploy_set.contains(&object.id))
1047 .flat_map(extract_external_indexes)
1048 .filter_map(|(cluster, index)| cluster_set.contains(&cluster.name).then_some(index))
1049 .collect();
1050
1051 normalize::transform_cluster_names_for_staging(&mut external_indexes, staging_suffix);
1053 for index in external_indexes {
1054 verbose!("Creating external index {}", index);
1055 executor.execute_sql(&index).await?;
1056 }
1057
1058 let replacement_object_ids: BTreeSet<ObjectId> =
1061 replacement_mvs.iter().map(|(oid, _)| oid.clone()).collect();
1062
1063 let mut success_count = 0;
1064
1065 for (idx, (object_id, typed_obj)) in objects.iter().enumerate() {
1067 verbose!(
1068 "Applying {}/{}: {}{} (to schema {}{})",
1069 idx + 1,
1070 objects.len(),
1071 object_id.object(),
1072 staging_suffix,
1073 object_id.schema(),
1074 staging_suffix
1075 );
1076
1077 deploy_single_object(
1078 executor,
1079 object_id,
1080 typed_obj,
1081 staging_suffix,
1082 planned_project,
1083 &objects_to_deploy_set,
1084 &replacement_object_ids,
1085 |stmt| stmt,
1086 )
1087 .await?;
1088 success_count += 1;
1089 }
1090
1091 for (idx, (object_id, typed_obj)) in replacement_mvs.iter().enumerate() {
1093 verbose!(
1094 "Applying replacement MV {}/{}: {} FOR {}",
1095 idx + 1,
1096 replacement_mvs.len(),
1097 object_id.object(),
1098 object_id
1099 );
1100
1101 let production_target = object_id.to_unresolved_item_name();
1102 deploy_single_object(
1103 executor,
1104 object_id,
1105 typed_obj,
1106 staging_suffix,
1107 planned_project,
1108 &objects_to_deploy_set,
1109 &replacement_object_ids,
1110 |stmt| match stmt {
1111 Statement::CreateMaterializedView(mut mv) => {
1112 mv.replacement_for =
1113 Some(mz_sql_parser::ast::RawItemName::Name(production_target));
1114 Statement::CreateMaterializedView(mv)
1115 }
1116 other => other,
1117 },
1118 )
1119 .await?;
1120 success_count += 1;
1121 }
1122
1123 let deploy_duration = deploy_start.elapsed();
1124 progress::stage_success(
1125 &format!("Deployed {} view(s)/materialized view(s)", success_count),
1126 deploy_duration,
1127 );
1128
1129 Ok(success_count)
1130}
1131
1132async fn rollback_staging_resources(client: &Client, environment: &str) -> (usize, usize) {
1145 let staging_schemas = best_effort_fetch(
1146 client
1147 .introspection()
1148 .get_staging_schemas(environment)
1149 .await,
1150 "query staging schemas",
1151 );
1152 let staging_clusters = best_effort_fetch(
1153 client
1154 .introspection()
1155 .get_staging_clusters(environment)
1156 .await,
1157 "query staging clusters",
1158 );
1159
1160 let schema_count = staging_schemas.len();
1161 let cluster_count = staging_clusters.len();
1162
1163 if !staging_schemas.is_empty() {
1164 verbose!("Dropping staging schemas...");
1165 if let Err(e) = client
1166 .introspection()
1167 .drop_staging_schemas(&staging_schemas)
1168 .await
1169 {
1170 verbose!("Warning: Failed to drop some schemas: {}", e);
1171 } else {
1172 for sq in &staging_schemas {
1173 verbose!(" Dropped {}.{}", sq.database, sq.schema);
1174 }
1175 }
1176 }
1177
1178 if !staging_clusters.is_empty() {
1179 verbose!("Dropping staging clusters...");
1180 if let Err(e) = client
1181 .introspection()
1182 .drop_staging_clusters(&staging_clusters)
1183 .await
1184 {
1185 verbose!("Warning: Failed to drop some clusters: {}", e);
1186 } else {
1187 for cluster in &staging_clusters {
1188 verbose!(" Dropped {}", cluster);
1189 }
1190 }
1191 }
1192
1193 verbose!("Deleting deployment records...");
1194 best_effort_delete(
1195 client
1196 .deployments()
1197 .delete_deployment_clusters(environment)
1198 .await,
1199 "delete cluster records",
1200 );
1201 best_effort_delete(
1202 client
1203 .deployments()
1204 .delete_pending_statements(environment)
1205 .await,
1206 "delete pending statements",
1207 );
1208 best_effort_delete(
1209 client
1210 .deployments()
1211 .delete_replacement_mvs(environment)
1212 .await,
1213 "delete replacement MV records",
1214 );
1215 best_effort_delete(
1216 client.deployments().delete_deployment(environment).await,
1217 "delete deployment records",
1218 );
1219
1220 (schema_count, cluster_count)
1221}
1222
1223fn best_effort_fetch<T, E: fmt::Display>(result: Result<Vec<T>, E>, action: &str) -> Vec<T> {
1228 match result {
1229 Ok(values) => values,
1230 Err(e) => {
1231 verbose!("Warning: Failed to {}: {}", action, e);
1232 vec![]
1233 }
1234 }
1235}
1236
1237fn best_effort_delete<E: fmt::Display>(result: Result<(), E>, action: &str) {
1239 if let Err(e) = result {
1240 verbose!("Warning: Failed to {}: {}", action, e);
1241 }
1242}
1243
1244async fn deploy_single_object(
1255 executor: &DeploymentExecutor<'_>,
1256 object_id: &ObjectId,
1257 typed_obj: &DatabaseObject,
1258 staging_suffix: &str,
1259 planned_project: &Project,
1260 objects_to_deploy_set: &BTreeSet<ObjectId>,
1261 replacement_objects: &BTreeSet<ObjectId>,
1262 transform: impl FnOnce(Statement) -> Statement,
1263) -> Result<(), CliError> {
1264 let original_fqn: FullyQualifiedName = object_id.clone().into();
1265
1266 let mut visitor = NormalizingVisitor::staging(
1267 &original_fqn,
1268 staging_suffix.to_string(),
1269 &planned_project.external_dependencies,
1270 Some(objects_to_deploy_set),
1271 replacement_objects,
1272 );
1273
1274 let stmt = typed_obj
1275 .stmt
1276 .clone()
1277 .normalize_name_with(&visitor, &original_fqn.to_item_name())
1278 .normalize_dependencies_with(&mut visitor)
1279 .normalize_cluster_with(&visitor);
1280
1281 let stmt = transform(stmt);
1282 executor.execute_sql(&stmt).await?;
1283
1284 let mut indexes = typed_obj.indexes.clone();
1286 let mut grants = typed_obj.grants.clone();
1287 let mut comments = typed_obj.comments.clone();
1288
1289 visitor.normalize_index_references(&mut indexes);
1290 visitor.normalize_index_clusters(&mut indexes);
1291 visitor.normalize_grant_references(&mut grants);
1292 visitor.normalize_comment_references(&mut comments);
1293
1294 for index in &indexes {
1295 executor.execute_sql(index).await?;
1296 }
1297
1298 for grant in &grants {
1299 executor.execute_sql(grant).await?;
1300 }
1301
1302 for comment in &comments {
1303 executor.execute_sql(comment).await?;
1304 }
1305
1306 Ok(())
1307}
1308
1309fn validate_no_new_objects_in_existing_stable_schemas(
1312 change_set: &ChangeSet,
1313 production_snapshot: &DeploymentSnapshot,
1314) -> Result<(), CliError> {
1315 let blocked: Vec<_> = change_set
1316 .new_replacement_objects
1317 .iter()
1318 .filter(|obj| {
1319 !production_snapshot.objects.contains_key(obj)
1320 && production_snapshot
1321 .objects
1322 .keys()
1323 .any(|prod| prod.database() == obj.database() && prod.schema() == obj.schema())
1324 })
1325 .collect();
1326
1327 if blocked.is_empty() {
1328 return Ok(());
1329 }
1330
1331 let first = blocked[0];
1332 Err(CliError::NewObjectInExistingStableSchema {
1333 database: first.expect_database().to_string(),
1334 schema: first.schema().to_string(),
1335 objects: blocked.iter().map(|o| o.object().to_string()).collect(),
1336 })
1337}
1338
1339#[cfg(test)]
1340mod tests {
1341 use super::*;
1342 use crate::project::analysis::deployment_snapshot::build_snapshot_from_planned;
1343 use crate::project::ir::compiled;
1344 use crate::project::ir::object_id::ObjectId;
1345 use std::collections::{BTreeMap, BTreeSet};
1346
1347 #[mz_ore::test]
1348 fn parse_qualified_schema_requires_two_parts() {
1349 let sq = parse_qualified_schema("app.core").expect("qualified name parses");
1351 assert_eq!(
1352 sq,
1353 SchemaQualifier::new("app".to_string(), "core".to_string())
1354 );
1355
1356 let sq = parse_qualified_schema("app.\"select\"").expect("quoted keyword parses");
1358 assert_eq!(
1359 sq,
1360 SchemaQualifier::new("app".to_string(), "select".to_string())
1361 );
1362
1363 assert!(parse_qualified_schema("core").is_err());
1365 assert!(parse_qualified_schema("app.core.orders").is_err());
1366 }
1367
1368 fn make_typed_object(sqls: &[&str]) -> DatabaseObject {
1373 let mut stmt = None;
1374 let mut indexes = Vec::new();
1375
1376 for sql in sqls {
1377 let parsed = mz_sql_parser::parser::parse_statements(sql).unwrap();
1378 for p in parsed {
1379 match p.ast {
1380 mz_sql_parser::ast::Statement::CreateView(s) => {
1381 stmt = Some(Statement::CreateView(s));
1382 }
1383 mz_sql_parser::ast::Statement::CreateMaterializedView(s) => {
1384 stmt = Some(Statement::CreateMaterializedView(s));
1385 }
1386 mz_sql_parser::ast::Statement::CreateTable(s) => {
1387 stmt = Some(Statement::CreateTable(s));
1388 }
1389 mz_sql_parser::ast::Statement::CreateSource(s) => {
1390 stmt = Some(Statement::CreateSource(s));
1391 }
1392 mz_sql_parser::ast::Statement::CreateConnection(s) => {
1393 stmt = Some(Statement::CreateConnection(s));
1394 }
1395 mz_sql_parser::ast::Statement::CreateSecret(s) => {
1396 stmt = Some(Statement::CreateSecret(s));
1397 }
1398 mz_sql_parser::ast::Statement::CreateIndex(s) => {
1399 indexes.push(s);
1400 }
1401 other => panic!("Unexpected statement type: {:?}", other),
1402 }
1403 }
1404 }
1405
1406 DatabaseObject {
1407 path: std::path::PathBuf::from("test.sql"),
1408 stmt: stmt.expect("Expected at least one CREATE statement"),
1409 indexes,
1410 grants: vec![],
1411 comments: vec![],
1412 tests: vec![],
1413 }
1414 }
1415
1416 fn make_planned_project(objects: Vec<(&str, &str, &str, DatabaseObject)>) -> Project {
1418 let mut db_map: BTreeMap<String, BTreeMap<String, Vec<DatabaseObject>>> = BTreeMap::new();
1420
1421 for (database, schema, _name, typed_obj) in objects {
1422 db_map
1423 .entry(database.to_string())
1424 .or_default()
1425 .entry(schema.to_string())
1426 .or_default()
1427 .push(typed_obj);
1428 }
1429
1430 let databases: Vec<compiled::Database> = db_map
1431 .into_iter()
1432 .map(|(db_name, schemas)| compiled::Database {
1433 name: db_name,
1434 schemas: schemas
1435 .into_iter()
1436 .map(|(schema_name, objs)| compiled::Schema {
1437 name: schema_name,
1438 objects: objs,
1439 mod_statements: None,
1440 })
1441 .collect(),
1442 mod_statements: None,
1443 })
1444 .collect();
1445
1446 let typed_project = compiled::Project {
1447 databases,
1448 replacement_schemas: BTreeSet::new(),
1449 };
1450
1451 Project::from(typed_project)
1452 }
1453
1454 #[cfg_attr(miri, ignore)] #[mz_ore::test]
1456 fn test_full_deploy_view_not_indexed_mixed_types() {
1457 let view_obj = make_typed_object(&["CREATE VIEW my_view AS SELECT 1"]);
1458 let table_obj = make_typed_object(&["CREATE TABLE my_table (id INT)"]);
1459 let source_obj = make_typed_object(&[
1460 "CREATE SOURCE my_source IN CLUSTER source_cluster FROM LOAD GENERATOR COUNTER",
1461 ]);
1462 let conn_obj =
1463 make_typed_object(&["CREATE CONNECTION my_conn TO KAFKA (BROKER 'localhost:9092')"]);
1464 let secret_obj = make_typed_object(&["CREATE SECRET my_secret AS 'hunter2'"]);
1465
1466 let objects: Vec<ObjectRef> = vec![
1467 (
1468 ObjectId::new("db".into(), "public".into(), "my_view".into()),
1469 &view_obj,
1470 ),
1471 (
1472 ObjectId::new("db".into(), "public".into(), "my_table".into()),
1473 &table_obj,
1474 ),
1475 (
1476 ObjectId::new("db".into(), "public".into(), "my_source".into()),
1477 &source_obj,
1478 ),
1479 (
1480 ObjectId::new("db".into(), "public".into(), "my_conn".into()),
1481 &conn_obj,
1482 ),
1483 (
1484 ObjectId::new("db".into(), "public".into(), "my_secret".into()),
1485 &secret_obj,
1486 ),
1487 ];
1488
1489 let replacement_ids = BTreeSet::new();
1490 let partitioned = partition_objects(objects, &replacement_ids);
1491
1492 assert_eq!(
1494 partitioned.objects.len(),
1495 1,
1496 "Only the view should be staged"
1497 );
1498 assert_eq!(partitioned.objects[0].0.object(), "my_view");
1499
1500 assert_eq!(
1502 partitioned.table_count, 4,
1503 "Table, source, connection, and secret should all be skipped"
1504 );
1505
1506 assert!(partitioned.sinks.is_empty());
1508 assert!(partitioned.replacement_mvs.is_empty());
1509
1510 let (schema_set, cluster_set) =
1512 collect_stage_resources(&partitioned.objects, &partitioned.replacement_mvs);
1513
1514 assert_eq!(schema_set.len(), 1);
1516 assert!(schema_set.contains(&SchemaQualifier::new("db".into(), "public".into())));
1517
1518 assert!(
1520 cluster_set.is_empty(),
1521 "View without index should not require any clusters"
1522 );
1523 }
1524
1525 #[cfg_attr(miri, ignore)] #[mz_ore::test]
1527 fn test_full_deploy_view_indexed_different_cluster() {
1528 let view_obj = make_typed_object(&[
1529 "CREATE VIEW my_view AS SELECT 1",
1530 "CREATE INDEX my_idx IN CLUSTER index_cluster ON my_view (column1)",
1531 ]);
1532 let table_obj = make_typed_object(&["CREATE TABLE my_table (id INT)"]);
1533 let source_obj = make_typed_object(&[
1534 "CREATE SOURCE my_source IN CLUSTER source_cluster FROM LOAD GENERATOR COUNTER",
1535 ]);
1536 let conn_obj =
1537 make_typed_object(&["CREATE CONNECTION my_conn TO KAFKA (BROKER 'localhost:9092')"]);
1538 let secret_obj = make_typed_object(&["CREATE SECRET my_secret AS 'hunter2'"]);
1539
1540 let objects: Vec<ObjectRef> = vec![
1541 (
1542 ObjectId::new("db".into(), "public".into(), "my_view".into()),
1543 &view_obj,
1544 ),
1545 (
1546 ObjectId::new("db".into(), "public".into(), "my_table".into()),
1547 &table_obj,
1548 ),
1549 (
1550 ObjectId::new("db".into(), "public".into(), "my_source".into()),
1551 &source_obj,
1552 ),
1553 (
1554 ObjectId::new("db".into(), "public".into(), "my_conn".into()),
1555 &conn_obj,
1556 ),
1557 (
1558 ObjectId::new("db".into(), "public".into(), "my_secret".into()),
1559 &secret_obj,
1560 ),
1561 ];
1562
1563 let replacement_ids = BTreeSet::new();
1564 let partitioned = partition_objects(objects, &replacement_ids);
1565
1566 assert_eq!(partitioned.objects.len(), 1);
1568 assert_eq!(partitioned.objects[0].0.object(), "my_view");
1569 assert_eq!(partitioned.table_count, 4);
1570
1571 let (schema_set, cluster_set) =
1573 collect_stage_resources(&partitioned.objects, &partitioned.replacement_mvs);
1574
1575 assert_eq!(schema_set.len(), 1);
1577 assert!(schema_set.contains(&SchemaQualifier::new("db".into(), "public".into())));
1578
1579 assert_eq!(
1581 cluster_set.len(),
1582 1,
1583 "Should only have index_cluster, got: {:?}",
1584 cluster_set
1585 );
1586 assert!(
1587 cluster_set.contains("index_cluster"),
1588 "Should stage index_cluster from the view's index"
1589 );
1590 assert!(
1591 !cluster_set.contains("source_cluster"),
1592 "Should NOT stage source_cluster (source is not staged)"
1593 );
1594 }
1595
1596 #[cfg_attr(miri, ignore)] #[mz_ore::test]
1598 fn test_incremental_deploy_view_updated_not_indexed() {
1599 let view_obj = make_typed_object(&["CREATE VIEW my_view AS SELECT 1"]);
1601 let table_obj = make_typed_object(&["CREATE TABLE my_table (id INT)"]);
1602 let source_obj = make_typed_object(&[
1603 "CREATE SOURCE my_source IN CLUSTER source_cluster FROM LOAD GENERATOR COUNTER",
1604 ]);
1605 let conn_obj =
1606 make_typed_object(&["CREATE CONNECTION my_conn TO KAFKA (BROKER 'localhost:9092')"]);
1607 let secret_obj = make_typed_object(&["CREATE SECRET my_secret AS 'hunter2'"]);
1608
1609 let planned_project = make_planned_project(vec![
1610 ("db", "public", "my_view", view_obj),
1611 ("db", "storage", "my_table", table_obj),
1612 ("db", "storage", "my_source", source_obj),
1613 ("db", "storage", "my_conn", conn_obj),
1614 ("db", "storage", "my_secret", secret_obj),
1615 ]);
1616
1617 let new_snapshot = build_snapshot_from_planned(&planned_project).unwrap();
1619
1620 let mut old_snapshot = DeploymentSnapshot::default();
1622 for (object_id, hash) in &new_snapshot.objects {
1623 if object_id.object() == "my_view" {
1624 old_snapshot
1626 .objects
1627 .insert(object_id.clone(), "old_hash".to_string());
1628 } else {
1629 old_snapshot.objects.insert(object_id.clone(), hash.clone());
1630 }
1631 }
1632
1633 let change_set = ChangeSet::from_deployment_snapshot_comparison(
1635 &old_snapshot,
1636 &new_snapshot,
1637 &planned_project,
1638 &BTreeSet::new(),
1639 );
1640
1641 assert!(
1643 change_set.objects_to_deploy.contains(&ObjectId::new(
1644 "db".into(),
1645 "public".into(),
1646 "my_view".into()
1647 )),
1648 "Changed view should be in objects_to_deploy"
1649 );
1650
1651 let objects = planned_project
1653 .get_sorted_objects_filtered(&change_set.objects_to_deploy)
1654 .unwrap();
1655
1656 let partitioned = partition_objects(objects, &change_set.changed_replacement_objects);
1657
1658 assert_eq!(
1660 partitioned.objects.len(),
1661 1,
1662 "Only the changed view should be staged, got: {:?}",
1663 partitioned
1664 .objects
1665 .iter()
1666 .map(|(id, _)| id.object())
1667 .collect::<Vec<_>>()
1668 );
1669 assert_eq!(partitioned.objects[0].0.object(), "my_view");
1670
1671 let (schema_set, cluster_set) =
1672 collect_stage_resources(&partitioned.objects, &partitioned.replacement_mvs);
1673
1674 assert_eq!(schema_set.len(), 1);
1675 assert!(schema_set.contains(&SchemaQualifier::new("db".into(), "public".into())));
1676 assert!(
1677 cluster_set.is_empty(),
1678 "View without index should not require any clusters"
1679 );
1680 }
1681
1682 #[cfg_attr(miri, ignore)] #[mz_ore::test]
1684 fn test_incremental_deploy_view_updated_indexed_different_cluster() {
1685 let view_obj = make_typed_object(&[
1687 "CREATE VIEW my_view AS SELECT 1",
1688 "CREATE INDEX my_idx IN CLUSTER index_cluster ON my_view (column1)",
1689 ]);
1690 let table_obj = make_typed_object(&["CREATE TABLE my_table (id INT)"]);
1691 let source_obj = make_typed_object(&[
1692 "CREATE SOURCE my_source IN CLUSTER source_cluster FROM LOAD GENERATOR COUNTER",
1693 ]);
1694 let conn_obj =
1695 make_typed_object(&["CREATE CONNECTION my_conn TO KAFKA (BROKER 'localhost:9092')"]);
1696 let secret_obj = make_typed_object(&["CREATE SECRET my_secret AS 'hunter2'"]);
1697
1698 let planned_project = make_planned_project(vec![
1699 ("db", "public", "my_view", view_obj),
1700 ("db", "storage", "my_table", table_obj),
1701 ("db", "storage", "my_source", source_obj),
1702 ("db", "storage", "my_conn", conn_obj),
1703 ("db", "storage", "my_secret", secret_obj),
1704 ]);
1705
1706 let new_snapshot = build_snapshot_from_planned(&planned_project).unwrap();
1708
1709 let mut old_snapshot = DeploymentSnapshot::default();
1711 for (object_id, hash) in &new_snapshot.objects {
1712 if object_id.object() == "my_view" {
1713 old_snapshot
1714 .objects
1715 .insert(object_id.clone(), "old_hash".to_string());
1716 } else {
1717 old_snapshot.objects.insert(object_id.clone(), hash.clone());
1718 }
1719 }
1720
1721 let change_set = ChangeSet::from_deployment_snapshot_comparison(
1723 &old_snapshot,
1724 &new_snapshot,
1725 &planned_project,
1726 &BTreeSet::new(),
1727 );
1728
1729 assert!(
1730 change_set.objects_to_deploy.contains(&ObjectId::new(
1731 "db".into(),
1732 "public".into(),
1733 "my_view".into()
1734 )),
1735 "Changed view should be in objects_to_deploy"
1736 );
1737
1738 let objects = planned_project
1740 .get_sorted_objects_filtered(&change_set.objects_to_deploy)
1741 .unwrap();
1742
1743 let partitioned = partition_objects(objects, &change_set.changed_replacement_objects);
1744
1745 assert_eq!(
1747 partitioned.objects.len(),
1748 1,
1749 "Only the changed view should be staged, got: {:?}",
1750 partitioned
1751 .objects
1752 .iter()
1753 .map(|(id, _)| id.object())
1754 .collect::<Vec<_>>()
1755 );
1756 assert_eq!(partitioned.objects[0].0.object(), "my_view");
1757
1758 let (schema_set, cluster_set) =
1759 collect_stage_resources(&partitioned.objects, &partitioned.replacement_mvs);
1760
1761 assert_eq!(schema_set.len(), 1);
1762 assert!(schema_set.contains(&SchemaQualifier::new("db".into(), "public".into())));
1763
1764 assert_eq!(
1766 cluster_set.len(),
1767 1,
1768 "Should only have index_cluster, got: {:?}",
1769 cluster_set
1770 );
1771 assert!(
1772 cluster_set.contains("index_cluster"),
1773 "Should stage index_cluster from the view's index"
1774 );
1775 assert!(
1776 !cluster_set.contains("source_cluster"),
1777 "Should NOT stage source_cluster"
1778 );
1779 }
1780
1781 fn make_empty_change_set() -> ChangeSet {
1782 ChangeSet {
1783 changed_objects: BTreeSet::new(),
1784 dirty_schemas: BTreeSet::new(),
1785 dirty_clusters: BTreeSet::new(),
1786 objects_to_deploy: BTreeSet::new(),
1787 new_replacement_objects: BTreeSet::new(),
1788 changed_replacement_objects: BTreeSet::new(),
1789 }
1790 }
1791
1792 #[mz_ore::test]
1793 fn test_validate_no_new_replacement_objects_first_deploy() {
1794 let cs = make_empty_change_set();
1795 let snapshot = DeploymentSnapshot::default();
1796 assert!(validate_no_new_objects_in_existing_stable_schemas(&cs, &snapshot).is_ok());
1797 }
1798
1799 #[mz_ore::test]
1800 fn test_validate_new_replacement_objects_in_brand_new_schema() {
1801 let mut cs = make_empty_change_set();
1802 cs.new_replacement_objects.insert(ObjectId::new(
1803 "db".into(),
1804 "analytics".into(),
1805 "new_mv".into(),
1806 ));
1807
1808 let mut snapshot = DeploymentSnapshot::default();
1810 snapshot.objects.insert(
1811 ObjectId::new("db".into(), "public".into(), "existing_mv".into()),
1812 "hash1".into(),
1813 );
1814
1815 assert!(validate_no_new_objects_in_existing_stable_schemas(&cs, &snapshot).is_ok());
1816 }
1817
1818 #[mz_ore::test]
1819 fn test_validate_new_replacement_objects_in_existing_production_schema() {
1820 let mut cs = make_empty_change_set();
1821 cs.new_replacement_objects.insert(ObjectId::new(
1822 "db".into(),
1823 "analytics".into(),
1824 "new_mv".into(),
1825 ));
1826
1827 let mut snapshot = DeploymentSnapshot::default();
1829 snapshot.objects.insert(
1830 ObjectId::new("db".into(), "analytics".into(), "existing_mv".into()),
1831 "hash1".into(),
1832 );
1833
1834 let result = validate_no_new_objects_in_existing_stable_schemas(&cs, &snapshot);
1835 assert!(result.is_err());
1836 match result.unwrap_err() {
1837 CliError::NewObjectInExistingStableSchema {
1838 database,
1839 schema,
1840 objects,
1841 } => {
1842 assert_eq!(database, "db");
1843 assert_eq!(schema, "analytics");
1844 assert_eq!(objects, vec!["new_mv"]);
1845 }
1846 other => panic!("Expected NewObjectInExistingStableSchema, got: {:?}", other),
1847 }
1848 }
1849
1850 #[mz_ore::test]
1851 fn test_validate_changed_replacement_objects_only() {
1852 let mut cs = make_empty_change_set();
1853 cs.changed_replacement_objects.insert(ObjectId::new(
1855 "db".into(),
1856 "analytics".into(),
1857 "changed_mv".into(),
1858 ));
1859
1860 let mut snapshot = DeploymentSnapshot::default();
1861 snapshot.objects.insert(
1862 ObjectId::new("db".into(), "analytics".into(), "changed_mv".into()),
1863 "hash1".into(),
1864 );
1865
1866 assert!(validate_no_new_objects_in_existing_stable_schemas(&cs, &snapshot).is_ok());
1867 }
1868
1869 #[mz_ore::test]
1870 fn test_validate_mixed_new_in_new_schema_changed_in_existing() {
1871 let mut cs = make_empty_change_set();
1872 cs.new_replacement_objects.insert(ObjectId::new(
1874 "db".into(),
1875 "new_schema".into(),
1876 "new_mv".into(),
1877 ));
1878 cs.changed_replacement_objects.insert(ObjectId::new(
1880 "db".into(),
1881 "existing_schema".into(),
1882 "changed_mv".into(),
1883 ));
1884
1885 let mut snapshot = DeploymentSnapshot::default();
1887 snapshot.objects.insert(
1888 ObjectId::new("db".into(), "existing_schema".into(), "changed_mv".into()),
1889 "hash1".into(),
1890 );
1891
1892 assert!(validate_no_new_objects_in_existing_stable_schemas(&cs, &snapshot).is_ok());
1894 }
1895
1896 #[mz_ore::test]
1897 fn test_validate_transitioning_objects_in_existing_schema_allowed() {
1898 let mut cs = make_empty_change_set();
1899 cs.new_replacement_objects.insert(ObjectId::new(
1901 "db".into(),
1902 "analytics".into(),
1903 "existing_mv".into(),
1904 ));
1905
1906 let mut snapshot = DeploymentSnapshot::default();
1908 snapshot.objects.insert(
1909 ObjectId::new("db".into(), "analytics".into(), "existing_mv".into()),
1910 "hash1".into(),
1911 );
1912
1913 assert!(validate_no_new_objects_in_existing_stable_schemas(&cs, &snapshot).is_ok());
1915 }
1916
1917 fn make_planned_project_with_replacement_schemas(
1918 objects: Vec<(&str, &str, &str, DatabaseObject)>,
1919 replacement_schemas: BTreeSet<SchemaQualifier>,
1920 ) -> Project {
1921 let mut db_map: BTreeMap<String, BTreeMap<String, Vec<DatabaseObject>>> = BTreeMap::new();
1922
1923 for (database, schema, _name, typed_obj) in objects {
1924 db_map
1925 .entry(database.to_string())
1926 .or_default()
1927 .entry(schema.to_string())
1928 .or_default()
1929 .push(typed_obj);
1930 }
1931
1932 let databases: Vec<compiled::Database> = db_map
1933 .into_iter()
1934 .map(|(db_name, schemas)| compiled::Database {
1935 name: db_name,
1936 schemas: schemas
1937 .into_iter()
1938 .map(|(schema_name, objs)| compiled::Schema {
1939 name: schema_name,
1940 objects: objs,
1941 mod_statements: None,
1942 })
1943 .collect(),
1944 mod_statements: None,
1945 })
1946 .collect();
1947
1948 let typed_project = compiled::Project {
1949 databases,
1950 replacement_schemas,
1951 };
1952
1953 Project::from(typed_project)
1954 }
1955
1956 #[cfg_attr(miri, ignore)] #[mz_ore::test]
1958 fn test_build_snapshot_replacement_schema_kind() {
1959 let mv_obj =
1960 make_typed_object(&["CREATE MATERIALIZED VIEW my_mv IN CLUSTER compute AS SELECT 1"]);
1961 let view_obj = make_typed_object(&["CREATE VIEW my_view AS SELECT 1"]);
1962
1963 let mut replacement_schemas = BTreeSet::new();
1964 replacement_schemas.insert(SchemaQualifier::new("db".into(), "stable".into()));
1965
1966 let planned_project = make_planned_project_with_replacement_schemas(
1967 vec![
1968 ("db", "stable", "my_mv", mv_obj),
1969 ("db", "regular", "my_view", view_obj),
1970 ],
1971 replacement_schemas,
1972 );
1973
1974 let snapshot = build_snapshot_from_planned(&planned_project).unwrap();
1975
1976 assert_eq!(
1978 snapshot
1979 .schemas
1980 .get(&SchemaQualifier::new("db".into(), "stable".into())),
1981 Some(&DeploymentKind::Replacement),
1982 "Replacement schema should have Replacement kind in snapshot"
1983 );
1984
1985 assert_eq!(
1987 snapshot
1988 .schemas
1989 .get(&SchemaQualifier::new("db".into(), "regular".into())),
1990 Some(&DeploymentKind::Objects),
1991 "Regular schema should have Objects kind in snapshot"
1992 );
1993 }
1994
1995 #[cfg_attr(miri, ignore)] #[mz_ore::test]
1997 fn test_build_snapshot_no_replacement_schemas_all_objects() {
1998 let mv_obj =
1999 make_typed_object(&["CREATE MATERIALIZED VIEW my_mv IN CLUSTER compute AS SELECT 1"]);
2000 let view_obj = make_typed_object(&["CREATE VIEW my_view AS SELECT 1"]);
2001
2002 let planned_project = make_planned_project(vec![
2003 ("db", "stable", "my_mv", mv_obj),
2004 ("db", "regular", "my_view", view_obj),
2005 ]);
2006
2007 let snapshot = build_snapshot_from_planned(&planned_project).unwrap();
2008
2009 assert_eq!(
2011 snapshot
2012 .schemas
2013 .get(&SchemaQualifier::new("db".into(), "stable".into())),
2014 Some(&DeploymentKind::Objects),
2015 );
2016 assert_eq!(
2017 snapshot
2018 .schemas
2019 .get(&SchemaQualifier::new("db".into(), "regular".into())),
2020 Some(&DeploymentKind::Objects),
2021 );
2022 }
2023
2024 #[cfg_attr(miri, ignore)] #[mz_ore::test]
2026 fn test_record_stage_metadata_transition_override() {
2027 let mv_obj =
2031 make_typed_object(&["CREATE MATERIALIZED VIEW my_mv IN CLUSTER compute AS SELECT 1"]);
2032
2033 let objects: Vec<ObjectRef> = vec![(
2035 ObjectId::new("db".into(), "stable".into(), "my_mv".into()),
2036 &mv_obj,
2037 )];
2038 let sinks: Vec<ObjectRef> = vec![];
2039 let replacement_mvs: Vec<ObjectRef> = vec![];
2040
2041 let mut replacement_schemas = BTreeSet::new();
2043 replacement_schemas.insert(SchemaQualifier::new("db".into(), "stable".into()));
2044
2045 let mut staging_snapshot = DeploymentSnapshot::default();
2047
2048 for (object_id, typed_obj) in &objects {
2049 let hash = deployment_snapshot::compute_typed_hash(typed_obj);
2050 staging_snapshot.objects.insert(object_id.clone(), hash);
2051 staging_snapshot.schemas.insert(
2052 SchemaQualifier::new(
2053 object_id.expect_database().to_string(),
2054 object_id.schema().to_string(),
2055 ),
2056 DeploymentKind::Objects,
2057 );
2058 }
2059
2060 for (object_id, typed_obj) in &sinks {
2061 let hash = deployment_snapshot::compute_typed_hash(typed_obj);
2062 staging_snapshot.objects.insert(object_id.clone(), hash);
2063 staging_snapshot
2064 .schemas
2065 .entry(SchemaQualifier::new(
2066 object_id.expect_database().to_string(),
2067 object_id.schema().to_string(),
2068 ))
2069 .or_insert(DeploymentKind::Sinks);
2070 }
2071
2072 for (object_id, typed_obj) in &replacement_mvs {
2073 let hash = deployment_snapshot::compute_typed_hash(typed_obj);
2074 staging_snapshot.objects.insert(object_id.clone(), hash);
2075 staging_snapshot.schemas.insert(
2076 SchemaQualifier::new(
2077 object_id.expect_database().to_string(),
2078 object_id.schema().to_string(),
2079 ),
2080 DeploymentKind::Replacement,
2081 );
2082 }
2083
2084 assert_eq!(
2086 staging_snapshot
2087 .schemas
2088 .get(&SchemaQualifier::new("db".into(), "stable".into())),
2089 Some(&DeploymentKind::Objects),
2090 "Before override, schema should be Objects (from regular objects path)"
2091 );
2092
2093 for sq in &replacement_schemas {
2095 if staging_snapshot.schemas.contains_key(sq) {
2096 staging_snapshot
2097 .schemas
2098 .insert(sq.clone(), DeploymentKind::Replacement);
2099 }
2100 }
2101
2102 assert_eq!(
2104 staging_snapshot
2105 .schemas
2106 .get(&SchemaQualifier::new("db".into(), "stable".into())),
2107 Some(&DeploymentKind::Replacement),
2108 "After override, schema should be Replacement"
2109 );
2110 }
2111
2112 #[mz_ore::test]
2113 fn test_record_stage_metadata_override_only_applies_to_existing_schemas() {
2114 let replacement_schemas =
2117 BTreeSet::from([SchemaQualifier::new("db".into(), "nonexistent".into())]);
2118
2119 let mut staging_snapshot = DeploymentSnapshot::default();
2120
2121 for sq in &replacement_schemas {
2123 if staging_snapshot.schemas.contains_key(sq) {
2124 staging_snapshot
2125 .schemas
2126 .insert(sq.clone(), DeploymentKind::Replacement);
2127 }
2128 }
2129
2130 assert!(
2132 staging_snapshot.schemas.is_empty(),
2133 "Override should not create entries for schemas with no objects"
2134 );
2135 }
2136}