1use crate::cli::CliError;
13use crate::cli::progress;
14use crate::client::{
15 ApplyState, Client, DependentSink, DeploymentKind, PendingStatement, ReplacementMvRecord,
16};
17use crate::config::Settings;
18use crate::log;
19use crate::project::SchemaQualifier;
20use crate::project::analysis::deployment_snapshot;
21use crate::project::ir::object_id::ObjectId;
22use crate::{info, verbose};
23use itertools::Itertools;
24use owo_colors::{OwoColorize, Stream, Style};
25use std::collections::BTreeSet;
26use std::fmt;
27
/// Returns `staging_name` with a single trailing `staging_suffix` removed.
///
/// If `staging_name` does not end with `staging_suffix`, it is returned
/// unchanged. The suffix is removed at most once, so a name that ends with
/// the suffix twice keeps the inner occurrence.
fn strip_staging_suffix<'a>(staging_name: &'a str, staging_suffix: &str) -> &'a str {
    match staging_name.strip_suffix(staging_suffix) {
        Some(production_name) => production_name,
        None => staging_name,
    }
}
44
/// Everything needed to promote one staging deployment, gathered up front so
/// the same data can be displayed, serialized, or executed.
struct DeploymentPlan {
    /// Identifier of the deployment being applied.
    deploy_id: String,
    /// Apply state read before planning; decides which phases still have to run.
    apply_state: ApplyState,
    /// Suffix separating staging resource names from production ones (`_<deploy_id>`).
    staging_suffix: String,
    /// Staging schemas (schema names include the suffix) that were found to exist
    /// and take part in the swap. Empty when the plan is built in the post-swap state.
    staging_schemas: BTreeSet<SchemaQualifier>,
    /// Production cluster names (without the suffix) whose staging counterpart was
    /// found to exist. Empty when the plan is built in the post-swap state.
    staging_clusters: BTreeSet<String>,
    /// Pending statements recorded for this deployment; rendered as "Sinks to Create".
    pending_statements: Vec<PendingStatement>,
    /// Replacement materialized view records recorded for this deployment.
    replacement_mvs: Vec<ReplacementMvRecord>,
    /// Sinks found to depend on objects in the production schemas being swapped.
    dependent_sinks: Vec<DependentSink>,
}
59
/// Serializable description of one production/staging schema swap.
#[derive(serde::Serialize)]
struct SchemaSwapView<'a> {
    /// Database containing both schemas.
    database: &'a str,
    /// Production schema name: the staging name with the staging suffix stripped
    /// (owned because it is derived rather than borrowed from the plan).
    production_schema: String,
    /// Staging schema name as stored in the plan (includes the suffix).
    staging_schema: &'a str,
}
66
/// Serializable description of one production/staging cluster swap.
#[derive(serde::Serialize)]
struct ClusterSwapView<'a> {
    /// Production cluster name as stored in the plan (no suffix).
    production_cluster: &'a str,
    /// Staging cluster name: the production name with the staging suffix appended.
    staging_cluster: String,
}
72
/// Serializable, fully qualified name of a sink taken from a pending statement.
#[derive(serde::Serialize)]
struct SinkToCreateView<'a> {
    /// Database of the pending statement's object.
    database: &'a str,
    /// Schema of the pending statement's object.
    schema: &'a str,
    /// Name of the pending statement's object.
    object: &'a str,
}
79
/// Serializable description of one replacement materialized view record.
///
/// The replacement object is addressed elsewhere in this file as
/// `target_database.replacement_schema.target_name`.
#[derive(serde::Serialize)]
struct ReplacementMvView<'a> {
    /// Database of the materialized view being replaced.
    target_database: &'a str,
    /// Schema of the materialized view being replaced.
    target_schema: &'a str,
    /// Name of the materialized view being replaced.
    target_name: &'a str,
    /// Schema holding the replacement object.
    replacement_schema: &'a str,
}
87
/// Serializable description of a sink together with the object it depends on.
#[derive(serde::Serialize)]
struct SinkToRepointView<'a> {
    /// Database of the sink.
    sink_database: &'a str,
    /// Schema of the sink.
    sink_schema: &'a str,
    /// Name of the sink.
    sink_name: &'a str,
    /// Database of the object the sink depends on.
    dependency_database: &'a str,
    /// Schema of the object the sink depends on.
    dependency_schema: &'a str,
    /// Name of the object the sink depends on.
    dependency_name: &'a str,
}
97
/// Serializable description of a resource that is dropped once the deployment
/// has been promoted.
#[derive(serde::Serialize)]
struct ResourceToDropView {
    /// Resource kind; this file produces `"schema"` and `"cluster"`.
    kind: String,
    /// `database.schema` for schemas, or the bare cluster name for clusters.
    name: String,
}
103
104impl DeploymentPlan {
105 fn apply_state_str(&self) -> &'static str {
106 match self.apply_state {
107 ApplyState::NotStarted => "not_started",
108 ApplyState::PreSwap => "pre_swap",
109 ApplyState::PostSwap => "post_swap",
110 }
111 }
112
113 fn schema_swaps(&self) -> Vec<SchemaSwapView<'_>> {
114 self.staging_schemas
115 .iter()
116 .map(|sq| {
117 let prod_schema =
118 strip_staging_suffix(&sq.schema, &self.staging_suffix).to_string();
119 SchemaSwapView {
120 database: &sq.database,
121 production_schema: prod_schema,
122 staging_schema: &sq.schema,
123 }
124 })
125 .collect()
126 }
127
128 fn cluster_swaps(&self) -> Vec<ClusterSwapView<'_>> {
129 self.staging_clusters
130 .iter()
131 .map(|cluster| ClusterSwapView {
132 production_cluster: cluster,
133 staging_cluster: format!("{}{}", cluster, self.staging_suffix),
134 })
135 .collect()
136 }
137
138 fn sinks_to_create(&self) -> Vec<SinkToCreateView<'_>> {
139 self.pending_statements
140 .iter()
141 .map(|stmt| SinkToCreateView {
142 database: &stmt.database,
143 schema: &stmt.schema,
144 object: &stmt.object,
145 })
146 .collect()
147 }
148
149 fn replacement_mv_views(&self) -> Vec<ReplacementMvView<'_>> {
150 self.replacement_mvs
151 .iter()
152 .map(|r| ReplacementMvView {
153 target_database: &r.target_database,
154 target_schema: &r.target_schema,
155 target_name: &r.target_name,
156 replacement_schema: &r.replacement_schema,
157 })
158 .collect()
159 }
160
161 fn sinks_to_repoint(&self) -> Vec<SinkToRepointView<'_>> {
162 self.dependent_sinks
163 .iter()
164 .map(|sink| SinkToRepointView {
165 sink_database: &sink.sink_database,
166 sink_schema: &sink.sink_schema,
167 sink_name: &sink.sink_name,
168 dependency_database: &sink.dependency_database,
169 dependency_schema: &sink.dependency_schema,
170 dependency_name: &sink.dependency_name,
171 })
172 .collect()
173 }
174
175 fn replacement_schemas(&self) -> BTreeSet<SchemaQualifier> {
176 self.replacement_mvs
177 .iter()
178 .map(|r| SchemaQualifier::new(r.target_database.clone(), r.replacement_schema.clone()))
179 .collect()
180 }
181
182 fn resources_to_drop(&self) -> Vec<ResourceToDropView> {
183 let mut drops = Vec::new();
184 for sq in &self.staging_schemas {
185 let prod_schema = strip_staging_suffix(&sq.schema, &self.staging_suffix);
186 let old_schema = format!("{}{}", prod_schema, self.staging_suffix);
187 drops.push(ResourceToDropView {
188 kind: "schema".to_string(),
189 name: format!("{}.{}", sq.database, old_schema),
190 });
191 }
192 for cluster in &self.staging_clusters {
193 let old_cluster = format!("{}{}", cluster, self.staging_suffix);
194 drops.push(ResourceToDropView {
195 kind: "cluster".to_string(),
196 name: old_cluster,
197 });
198 }
199 for sq in &self.replacement_schemas() {
200 drops.push(ResourceToDropView {
201 kind: "schema".to_string(),
202 name: format!("{}.{}", sq.database, sq.schema),
203 });
204 }
205 drops
206 }
207}
208
209impl serde::Serialize for DeploymentPlan {
210 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
211 use serde::ser::SerializeStruct;
212 let mut state = serializer.serialize_struct("DeploymentPlan", 8)?;
213 state.serialize_field("deploy_id", &self.deploy_id)?;
214 state.serialize_field("apply_state", self.apply_state_str())?;
215 state.serialize_field("schema_swaps", &self.schema_swaps())?;
216 state.serialize_field("cluster_swaps", &self.cluster_swaps())?;
217 state.serialize_field("sinks_to_create", &self.sinks_to_create())?;
218 state.serialize_field("replacement_mvs", &self.replacement_mv_views())?;
219 state.serialize_field("sinks_to_repoint", &self.sinks_to_repoint())?;
220 state.serialize_field("resources_to_drop", &self.resources_to_drop())?;
221 state.end()
222 }
223}
224
225impl fmt::Display for DeploymentPlan {
226 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
227 match self.apply_state {
229 ApplyState::PreSwap => {
230 let style = Style::new().yellow().bold();
231 writeln!(
232 f,
233 "\n {} Resuming from pre-swap state (apply state schemas already created)",
234 "note:".if_supports_color(Stream::Stderr, |t| style.style(t))
235 )?;
236 }
237 ApplyState::PostSwap => {
238 let style = Style::new().yellow().bold();
239 writeln!(
240 f,
241 "\n {} Resuming from post-swap state (swap already completed, showing remaining work)",
242 "note:".if_supports_color(Stream::Stderr, |t| style.style(t))
243 )?;
244 }
245 ApplyState::NotStarted => {}
246 }
247
248 writeln!(
250 f,
251 "\n {}",
252 "Schema Swaps:".if_supports_color(Stream::Stderr, |t| t.bold())
253 )?;
254 if self.staging_schemas.is_empty() {
255 writeln!(f, " (none)")?;
256 } else {
257 for swap in &self.schema_swaps() {
258 writeln!(
259 f,
260 " {} {}.{} {} {}.{}",
261 "~".if_supports_color(Stream::Stderr, |t| t.yellow()),
262 swap.database,
263 swap.production_schema,
264 "<->".if_supports_color(Stream::Stderr, |t| t.dimmed()),
265 swap.database,
266 swap.staging_schema
267 )?;
268 }
269 }
270
271 writeln!(
273 f,
274 "\n {}",
275 "Cluster Swaps:".if_supports_color(Stream::Stderr, |t| t.bold())
276 )?;
277 if self.staging_clusters.is_empty() {
278 writeln!(f, " (none)")?;
279 } else {
280 for swap in &self.cluster_swaps() {
281 writeln!(
282 f,
283 " {} {} {} {}",
284 "~".if_supports_color(Stream::Stderr, |t| t.yellow()),
285 swap.production_cluster,
286 "<->".if_supports_color(Stream::Stderr, |t| t.dimmed()),
287 swap.staging_cluster
288 )?;
289 }
290 }
291
292 writeln!(
294 f,
295 "\n {}",
296 "Sinks to Create:".if_supports_color(Stream::Stderr, |t| t.bold())
297 )?;
298 if self.pending_statements.is_empty() {
299 writeln!(f, " (none)")?;
300 } else {
301 for stmt in &self.pending_statements {
302 writeln!(
303 f,
304 " {} {}.{}.{}",
305 "+".if_supports_color(Stream::Stderr, |t| t.green()),
306 stmt.database,
307 stmt.schema,
308 stmt.object
309 )?;
310 }
311 }
312
313 writeln!(
315 f,
316 "\n {}",
317 "Replacement Materialized Views:".if_supports_color(Stream::Stderr, |t| t.bold())
318 )?;
319 if self.replacement_mvs.is_empty() {
320 writeln!(f, " (none)")?;
321 } else {
322 for record in &self.replacement_mvs {
323 writeln!(
324 f,
325 " {} {}.{}.{} {} {}.{}.{}",
326 "~".if_supports_color(Stream::Stderr, |t| t.yellow()),
327 record.target_database,
328 record.target_schema,
329 record.target_name,
330 "<-".if_supports_color(Stream::Stderr, |t| t.dimmed()),
331 record.target_database,
332 record.replacement_schema,
333 record.target_name
334 )?;
335 }
336 }
337
338 writeln!(
340 f,
341 "\n {}",
342 "Sinks to Repoint:".if_supports_color(Stream::Stderr, |t| t.bold())
343 )?;
344 if self.dependent_sinks.is_empty() {
345 writeln!(f, " (none)")?;
346 } else {
347 for sink in &self.dependent_sinks {
348 writeln!(
349 f,
350 " {} {}.{}.{} {} {}.{}.{}",
351 "~".if_supports_color(Stream::Stderr, |t| t.yellow()),
352 sink.sink_database,
353 sink.sink_schema,
354 sink.sink_name,
355 "->".if_supports_color(Stream::Stderr, |t| t.dimmed()),
356 sink.dependency_database,
357 sink.dependency_schema,
358 sink.dependency_name
359 )?;
360 }
361 }
362
363 writeln!(
365 f,
366 "\n {}",
367 "Old Resources to Drop:".if_supports_color(Stream::Stderr, |t| t.bold())
368 )?;
369 let drops = self.resources_to_drop();
370 if drops.is_empty() {
371 writeln!(f, " (none)")?;
372 } else {
373 for drop in &drops {
374 writeln!(
375 f,
376 " {} {}",
377 "-".if_supports_color(Stream::Stderr, |t| t.red()),
378 drop.name
379 )?;
380 }
381 }
382
383 Ok(())
384 }
385}
386
387async fn generate_deployment_plan(
389 client: &Client,
390 deploy_id: &str,
391 apply_state: ApplyState,
392 force: bool,
393) -> Result<DeploymentPlan, CliError> {
394 let (staging_schemas, staging_clusters, staging_suffix) = if apply_state == ApplyState::PostSwap
396 {
397 verbose!("Resuming post-swap: skipping conflict check and resource gathering");
398 (BTreeSet::new(), BTreeSet::new(), format!("_{}", deploy_id))
399 } else {
400 gather_resources_and_check_conflicts(client, deploy_id, force).await?
401 };
402
403 let pending_statements = client
405 .deployments()
406 .get_pending_statements(deploy_id)
407 .await?;
408
409 let replacement_mvs = client.deployments().get_replacement_mvs(deploy_id).await?;
411
412 let dependent_sinks = if staging_schemas.is_empty() {
414 Vec::new()
415 } else {
416 let prod_schemas: Vec<SchemaQualifier> = staging_schemas
417 .iter()
418 .map(|sq| {
419 let prod_schema = strip_staging_suffix(&sq.schema, &staging_suffix);
420 SchemaQualifier::new(sq.database.clone(), prod_schema.to_string())
421 })
422 .collect();
423
424 client
425 .introspection()
426 .find_sinks_depending_on_schemas(&prod_schemas)
427 .await
428 .map_err(CliError::Connection)?
429 };
430
431 Ok(DeploymentPlan {
432 deploy_id: deploy_id.to_string(),
433 apply_state,
434 staging_suffix,
435 staging_schemas,
436 staging_clusters,
437 pending_statements,
438 replacement_mvs,
439 dependent_sinks,
440 })
441}
442
/// Entry point of the apply command: promotes staging deployment `deploy_id`.
///
/// The steps run in order: connect, run the setup checks, read the apply
/// state, build the plan, then (unless `dry_run`) run the swap phase, the
/// post-swap steps and the apply-state cleanup.
///
/// * `settings` - supplies the connection profile and the emulator setting.
/// * `deploy_id` - identifier of the staging deployment to apply.
/// * `force` - forwarded to plan generation, where it lets the apply continue
///   despite detected deployment conflicts.
/// * `dry_run` - when true, the plan is printed and nothing is executed.
///
/// # Errors
///
/// Returns the first error raised by any step; connection failures are
/// reported as `CliError::Connection`.
pub async fn run(
    settings: &Settings,
    deploy_id: &str,
    force: bool,
    dry_run: bool,
) -> Result<(), CliError> {
    let profile = settings.connection();

    let client = Client::connect_with_profile(profile.clone())
        .await
        .map_err(CliError::Connection)?;

    // Setup checks; judging by the names, the connected role must qualify as a
    // deployer before anything is changed (NOTE(review): confirm in `setup`).
    super::setup::verify(&client, settings.emulator()).await?;
    let role = super::setup::validate_connection(&client, settings.emulator()).await?;
    super::setup::require_deployer(role)?;

    let apply_state = client.deployments().get_apply_state(deploy_id).await?;
    verbose!("Apply state: {:?}", apply_state);

    // Staging is only validated for a fresh apply; resumed applies skip it.
    if matches!(apply_state, ApplyState::NotStarted) {
        client.deployments().validate_staging(deploy_id).await?;
    }

    // The snapshot is used here only to report the object count.
    let staging_snapshot =
        deployment_snapshot::load_from_database(&client, Some(deploy_id)).await?;
    verbose!(
        "Found {} objects in staging deployment",
        staging_snapshot.objects.len()
    );

    let plan = generate_deployment_plan(&client, deploy_id, apply_state, force).await?;

    // Dry run: show the plan and stop before any change is made.
    if dry_run {
        log::output(&plan);
        return Ok(());
    }

    if log::json_output_enabled() {
        log::output_json(&plan);
    }

    // `maybe_crash` calls are fault-injection points between phases.
    execute_swap_phase(&client, &plan).await?;
    maybe_crash("after-swap");
    run_post_swap_steps(&client, &plan).await?;
    maybe_crash("after-post-swap");
    cleanup_apply_state(&client, &plan.deploy_id).await?;

    progress::success("Deployment completed successfully!");

    Ok(())
}
501
502fn maybe_crash(phase: &str) {
511 if std::env::var("MZ_DEPLOY_FAIL_AT").ok().as_deref() == Some(phase) {
512 crate::info!("MZ_DEPLOY_FAIL_AT={}: simulating crash", phase);
513 std::process::exit(1);
514 }
515}
516
517async fn execute_swap_phase(client: &Client, plan: &DeploymentPlan) -> Result<(), CliError> {
519 match plan.apply_state {
520 ApplyState::NotStarted => {
521 verbose!("Creating apply state schemas...");
522 client
523 .deployments()
524 .create_apply_state_schemas(&plan.deploy_id)
525 .await?;
526 maybe_crash("after-markers");
527 verbose!("Executing atomic swap...");
528 execute_atomic_swap(
529 client,
530 &plan.deploy_id,
531 &plan.staging_schemas,
532 &plan.staging_clusters,
533 )
534 .await?;
535 }
536 ApplyState::PreSwap => {
537 verbose!("Resuming from pre-swap state...");
538 execute_atomic_swap(
539 client,
540 &plan.deploy_id,
541 &plan.staging_schemas,
542 &plan.staging_clusters,
543 )
544 .await?;
545 }
546 ApplyState::PostSwap => {
547 verbose!("Resuming from post-swap state...");
548 }
549 }
550 Ok(())
551}
552
553async fn run_post_swap_steps(client: &Client, plan: &DeploymentPlan) -> Result<(), CliError> {
555 execute_pending_sinks(client, plan).await?;
556 apply_replacement_mvs(client, plan).await?;
557
558 if !plan.staging_schemas.is_empty() {
559 repoint_dependent_sinks(client, plan).await?;
560 }
561
562 verbose!("\nUpdating deployment table...");
563 client
564 .deployments()
565 .update_promoted_at(&plan.deploy_id)
566 .await
567 .map_err(|source| CliError::DeploymentStateWriteFailed { source })?;
568
569 if !plan.staging_schemas.is_empty() || !plan.staging_clusters.is_empty() {
570 drop_old_resources(client, plan).await;
571 }
572 Ok(())
573}
574
/// Removes the bookkeeping kept for `deploy_id` while an apply is in flight:
/// the apply state schemas, the pending statements, the replacement
/// materialized view records, and the deployment cluster records, in that order.
///
/// # Errors
///
/// Stops at the first failing delete and returns its error. A failure to
/// delete the deployment cluster records is reported as
/// `CliError::DeploymentStateWriteFailed`.
async fn cleanup_apply_state(client: &Client, deploy_id: &str) -> Result<(), CliError> {
    verbose!("Cleaning up apply state...");
    client
        .deployments()
        .delete_apply_state_schemas(deploy_id)
        .await?;
    client
        .deployments()
        .delete_pending_statements(deploy_id)
        .await?;
    client
        .deployments()
        .delete_replacement_mvs(deploy_id)
        .await?;
    client
        .deployments()
        .delete_deployment_clusters(deploy_id)
        .await
        .map_err(|source| CliError::DeploymentStateWriteFailed { source })?;
    Ok(())
}
597
/// Checks for deployment conflicts and discovers which staging schemas and
/// clusters take part in the swap for `deploy_id`.
///
/// Returns a tuple of:
/// 1. the staging schemas (names include the suffix) that exist,
/// 2. the production cluster names whose suffixed staging cluster exists,
/// 3. the staging suffix, `_<deploy_id>`.
///
/// Expected staging schemas or clusters that do not exist are reported with a
/// warning and left out of the result.
///
/// # Errors
///
/// Returns `CliError::DeploymentConflict` when conflicts are found and `force`
/// is false; otherwise propagates errors from the client calls.
async fn gather_resources_and_check_conflicts(
    client: &Client,
    deploy_id: &str,
    force: bool,
) -> Result<(BTreeSet<SchemaQualifier>, BTreeSet<String>, String), CliError> {
    verbose!("Checking for deployment conflicts...");
    let conflicts = client
        .deployments()
        .check_deployment_conflicts(deploy_id)
        .await?;

    if !conflicts.is_empty() {
        if force {
            // With --force, conflicts are listed as a warning and the apply continues.
            let style = Style::new().yellow().bold();
            info!(
                "\n{}: deployment conflicts detected, but continuing due to --force flag",
                "warning".if_supports_color(Stream::Stderr, |t| style.style(t))
            );
            for conflict in &conflicts {
                info!(
                    " - {}.{} (last promoted by '{}' deployment)",
                    conflict.database, conflict.schema, conflict.deploy_id
                );
            }
            info!();
        } else {
            return Err(CliError::DeploymentConflict { conflicts });
        }
    } else {
        verbose!("No conflicts detected");
    }

    let staging_suffix = format!("_{}", deploy_id);
    let mut staging_schemas = BTreeSet::new();
    let mut staging_clusters = BTreeSet::new();

    let deployment_records = client
        .deployments()
        .get_schema_deployments(Some(deploy_id))
        .await?;

    // Snapshot loaded with no deploy id; used below as the production view of
    // each schema's deployment kind.
    let production_snapshot = deployment_snapshot::load_from_database(client, None).await?;

    // Build the (database, staging schema) pairs expected to exist:
    //   * sink-only schemas are never swapped;
    //   * replacement schemas are swapped only when production does not already
    //     record them as Replacement;
    //   * every other schema is swapped.
    let schemas_to_check: Vec<(String, String)> = deployment_records
        .iter()
        .filter(|record| {
            if record.kind == DeploymentKind::Sinks {
                verbose!(
                    "Skipping sink-only schema {}.{} (no swap needed)",
                    record.database,
                    record.schema
                );
                false
            } else if record.kind == DeploymentKind::Replacement {
                let sq = SchemaQualifier::new(record.database.clone(), record.schema.clone());
                let already_replacement = production_snapshot.schemas.get(&sq).copied()
                    == Some(DeploymentKind::Replacement);
                if already_replacement {
                    verbose!(
                        "Skipping replacement schema {}.{} (already Replacement in production)",
                        record.database,
                        record.schema
                    );
                    false
                } else {
                    verbose!(
                        "Including replacement schema {}.{} in swap (first Replacement deployment)",
                        record.database,
                        record.schema
                    );
                    true
                }
            } else {
                true
            }
        })
        .map(|record| {
            let staging_schema = format!("{}{}", record.schema, staging_suffix);
            (record.database.clone(), staging_schema)
        })
        .collect();

    let existing_schemas = client
        .introspection()
        .check_schemas_exist(&schemas_to_check)
        .await?;

    // Keep only the staging schemas that actually exist; warn about the rest.
    for pair in schemas_to_check {
        if existing_schemas.contains(&pair) {
            staging_schemas.insert(SchemaQualifier::new(pair.0, pair.1));
        } else {
            info!("Warning: Staging schema {}.{} not found", pair.0, pair.1);
        }
    }

    client
        .deployments()
        .validate_deployment_clusters(deploy_id)
        .await?;

    let cluster_names = client
        .deployments()
        .get_deployment_clusters(deploy_id)
        .await?;

    let staging_cluster_names: Vec<String> = cluster_names
        .iter()
        .map(|name| format!("{}{}", name, staging_suffix))
        .collect();
    let existing_clusters = client
        .introspection()
        .check_clusters_exist(&staging_cluster_names)
        .await?;

    // Both sequences have the same length by construction, which `zip_eq`
    // requires. The set stores the production name, not the staging name.
    for (cluster_name, staging_cluster) in cluster_names.into_iter().zip_eq(staging_cluster_names) {
        if existing_clusters.contains(&staging_cluster) {
            staging_clusters.insert(cluster_name);
        } else {
            info!("Warning: Staging cluster {} not found", staging_cluster);
        }
    }

    verbose!("\nSchemas to swap:");
    for sq in &staging_schemas {
        let prod_schema = strip_staging_suffix(&sq.schema, &staging_suffix);
        verbose!(" - {}.{} <-> {}", sq.database, sq.schema, prod_schema);
    }

    if !staging_clusters.is_empty() {
        verbose!("\nClusters to swap:");
        for cluster in &staging_clusters {
            let staging_cluster = format!("{}{}", cluster, staging_suffix);
            verbose!(" - {} <-> {}", staging_cluster, cluster);
        }
    }

    Ok((staging_schemas, staging_clusters, staging_suffix))
}
754
/// Swaps every staging schema and cluster with its production counterpart,
/// and swaps the apply-state marker schemas, between one `BEGIN` and `COMMIT`.
///
/// * `staging_schemas` - staging schemas; each is swapped with the schema of
///   the same name minus the `_<deploy_id>` suffix.
/// * `staging_clusters` - production cluster names; each is swapped with the
///   cluster of the same name plus the suffix.
///
/// # Errors
///
/// Returns `CliError::SqlExecutionFailed` carrying the failing statement. When
/// a swap statement fails, a `ROLLBACK` is attempted first and its own result
/// is ignored.
async fn execute_atomic_swap(
    client: &Client,
    deploy_id: &str,
    staging_schemas: &BTreeSet<SchemaQualifier>,
    staging_clusters: &BTreeSet<String>,
) -> Result<(), CliError> {
    let staging_suffix = format!("_{}", deploy_id);

    client
        .execute("BEGIN", &[])
        .await
        .map_err(|e| CliError::SqlExecutionFailed {
            statement: "BEGIN".to_string(),
            source: e,
        })?;

    for sq in staging_schemas {
        let prod_schema = strip_staging_suffix(&sq.schema, &staging_suffix);
        let swap_sql = format!(
            "ALTER SCHEMA \"{}\".\"{}\" SWAP WITH \"{}\";",
            sq.database, prod_schema, sq.schema
        );

        verbose!(" {}", swap_sql);
        if let Err(e) = client.execute(&swap_sql, &[]).await {
            // Best-effort rollback; the original error is the one reported.
            let _ = client.execute("ROLLBACK", &[]).await;
            return Err(CliError::SqlExecutionFailed {
                statement: swap_sql,
                source: e,
            });
        }
    }

    for cluster in staging_clusters {
        let staging_cluster = format!("{}{}", cluster, staging_suffix);
        let swap_sql = format!(
            "ALTER CLUSTER \"{}\" SWAP WITH \"{}\";",
            cluster, staging_cluster
        );

        verbose!(" {}", swap_sql);
        if let Err(e) = client.execute(&swap_sql, &[]).await {
            let _ = client.execute("ROLLBACK", &[]).await;
            return Err(CliError::SqlExecutionFailed {
                statement: swap_sql,
                source: e,
            });
        }
    }

    // Swapping the `pre` and `post` marker schemas in the same transaction
    // ties the recorded apply state to the resource swaps above.
    // NOTE(review): how the markers map to `ApplyState` is defined outside this file — confirm.
    let pre_schema = format!("apply_{}_pre", deploy_id);
    let post_schema = format!("apply_{}_post", deploy_id);
    let state_swap_sql = format!(
        "ALTER SCHEMA _mz_deploy.\"{}\" SWAP WITH \"{}\";",
        pre_schema, post_schema
    );

    verbose!(" {}", state_swap_sql);
    if let Err(e) = client.execute(&state_swap_sql, &[]).await {
        let _ = client.execute("ROLLBACK", &[]).await;
        return Err(CliError::SqlExecutionFailed {
            statement: state_swap_sql,
            source: e,
        });
    }

    client
        .execute("COMMIT", &[])
        .await
        .map_err(|e| CliError::SqlExecutionFailed {
            statement: "COMMIT".to_string(),
            source: e,
        })?;

    verbose!("Swap completed successfully");
    Ok(())
}
839
840async fn execute_pending_sinks(client: &Client, plan: &DeploymentPlan) -> Result<(), CliError> {
845 if plan.pending_statements.is_empty() {
846 verbose!("No pending sinks to execute");
847 return Ok(());
848 }
849
850 let sink_ids: BTreeSet<ObjectId> = plan
852 .pending_statements
853 .iter()
854 .map(|stmt| {
855 ObjectId::new(
856 stmt.database.clone(),
857 stmt.schema.clone(),
858 stmt.object.clone(),
859 )
860 })
861 .collect();
862
863 let existing_sinks = client.introspection().check_sinks_exist(&sink_ids).await?;
865
866 let sinks_to_create: Vec<_> = plan
868 .pending_statements
869 .iter()
870 .filter(|stmt| {
871 let obj_id = ObjectId::new(
872 stmt.database.clone(),
873 stmt.schema.clone(),
874 stmt.object.clone(),
875 );
876 !existing_sinks.contains(&obj_id)
877 })
878 .collect();
879
880 if !existing_sinks.is_empty() {
882 let mut existing_list: Vec<_> = existing_sinks.iter().collect();
883 existing_list.sort_by_key(|obj| obj.to_string());
884 for sink_id in existing_list {
885 verbose!(" - {}", sink_id);
886 }
887 }
888
889 if sinks_to_create.is_empty() {
890 return Ok(());
891 }
892
893 let sink_schemas: BTreeSet<(&str, &str)> = sinks_to_create
898 .iter()
899 .map(|stmt| (stmt.database.as_str(), stmt.schema.as_str()))
900 .collect();
901 for (database, schema) in sink_schemas {
902 client
903 .provisioning()
904 .create_schema(database, schema)
905 .await
906 .map_err(CliError::Connection)?;
907 }
908
909 for stmt in sinks_to_create {
910 verbose!(
911 "Creating sink {}.{}.{}...",
912 stmt.database,
913 stmt.schema,
914 stmt.object
915 );
916
917 if let Err(e) = client.execute(&stmt.statement_sql, &[]).await {
919 info!(
920 "Error creating sink {}.{}.{}: {}",
921 stmt.database, stmt.schema, stmt.object, e
922 );
923 return Err(CliError::SqlExecutionFailed {
924 statement: stmt.statement_sql.clone(),
925 source: e,
926 });
927 }
928
929 client
931 .deployments()
932 .mark_statement_executed(&plan.deploy_id, stmt.sequence_num)
933 .await?;
934
935 progress::success(&format!(
936 "{}.{}.{}",
937 stmt.database, stmt.schema, stmt.object
938 ));
939 }
940
941 Ok(())
942}
943
944async fn apply_replacement_mvs(client: &Client, plan: &DeploymentPlan) -> Result<(), CliError> {
955 if plan.replacement_mvs.is_empty() {
956 verbose!("No replacement MVs to apply");
957 return Ok(());
958 }
959
960 for record in &plan.replacement_mvs {
961 let already_applied = !client
962 .introspection()
963 .object_exists(
964 &record.target_database,
965 &record.replacement_schema,
966 &record.target_name,
967 )
968 .await
969 .map_err(CliError::Connection)?;
970 if already_applied {
971 verbose!(
972 "Skipping already-applied replacement {}.{}.{}",
973 record.target_database,
974 record.target_schema,
975 record.target_name
976 );
977 continue;
978 }
979
980 let alter_sql = format!(
981 "ALTER MATERIALIZED VIEW \"{}\".\"{}\".\"{}\"\
982 APPLY REPLACEMENT \"{}\".\"{}\".\"{}\";",
983 record.target_database,
984 record.target_schema,
985 record.target_name,
986 record.target_database,
987 record.replacement_schema,
988 record.target_name
989 );
990
991 verbose!(" {}", alter_sql);
992 if let Err(e) = client.execute(&alter_sql, &[]).await {
993 info!(
994 "Error applying replacement for {}.{}.{}: {}",
995 record.target_database, record.target_schema, record.target_name, e
996 );
997 return Err(CliError::SqlExecutionFailed {
998 statement: alter_sql,
999 source: e,
1000 });
1001 }
1002
1003 progress::success(&format!(
1004 "{}.{}.{}",
1005 record.target_database, record.target_schema, record.target_name
1006 ));
1007 }
1008
1009 let replacement_schemas = plan.replacement_schemas();
1011
1012 for sq in &replacement_schemas {
1013 let drop_sql = format!(
1014 "DROP SCHEMA IF EXISTS \"{}\".\"{}\" CASCADE;",
1015 sq.database, sq.schema
1016 );
1017 verbose!(" {}", drop_sql);
1018 if let Err(e) = client.execute(&drop_sql, &[]).await {
1019 info!(
1020 "warning: failed to drop replacement schema {}.{}: {}",
1021 sq.database, sq.schema, e
1022 );
1023 }
1024 }
1025
1026 Ok(())
1027}
1028
1029async fn repoint_dependent_sinks(client: &Client, plan: &DeploymentPlan) -> Result<(), CliError> {
1034 let staging_suffix = &plan.staging_suffix;
1035
1036 let old_schemas: Vec<SchemaQualifier> = plan
1039 .staging_schemas
1040 .iter()
1041 .map(|sq| {
1042 let prod_schema = strip_staging_suffix(&sq.schema, staging_suffix);
1043 let old_schema = format!("{}{}", prod_schema, staging_suffix);
1044 SchemaQualifier::new(sq.database.clone(), old_schema)
1045 })
1046 .collect();
1047
1048 let dependent_sinks = client
1050 .introspection()
1051 .find_sinks_depending_on_schemas(&old_schemas)
1052 .await
1053 .map_err(CliError::Connection)?;
1054
1055 if dependent_sinks.is_empty() {
1056 verbose!("No sinks depend on objects in schemas being dropped");
1057 return Ok(());
1058 }
1059
1060 let replacement_ids: BTreeSet<ObjectId> = dependent_sinks
1062 .iter()
1063 .map(|sink| {
1064 let new_schema =
1065 strip_staging_suffix(&sink.dependency_schema, staging_suffix).to_string();
1066 ObjectId::new(
1067 sink.dependency_database.clone(),
1068 new_schema,
1069 sink.dependency_name.clone(),
1070 )
1071 })
1072 .collect();
1073
1074 let existing_ids = client
1075 .introspection()
1076 .check_objects_exist(&replacement_ids)
1077 .await
1078 .map_err(CliError::Connection)?;
1079
1080 for sink in dependent_sinks {
1081 let new_schema = strip_staging_suffix(&sink.dependency_schema, staging_suffix);
1083
1084 let replacement_id = ObjectId::new(
1085 sink.dependency_database.clone(),
1086 new_schema.to_string(),
1087 sink.dependency_name.clone(),
1088 );
1089
1090 if !existing_ids.contains(&replacement_id) {
1091 return Err(CliError::SinkRepointFailed {
1092 sink: format!(
1093 "{}.{}.{}",
1094 sink.sink_database, sink.sink_schema, sink.sink_name
1095 ),
1096 reason: format!("replacement object {} does not exist", replacement_id),
1097 });
1098 }
1099
1100 let alter_sql = format!(
1102 r#"ALTER SINK "{}"."{}"."{}" SET FROM "{}"."{}"."{}""#,
1103 sink.sink_database,
1104 sink.sink_schema,
1105 sink.sink_name,
1106 sink.dependency_database,
1107 new_schema,
1108 sink.dependency_name
1109 );
1110
1111 verbose!(" {}", alter_sql);
1112 if let Err(e) = client.execute(&alter_sql, &[]).await {
1113 return Err(CliError::SinkRepointFailed {
1114 sink: format!(
1115 "{}.{}.{}",
1116 sink.sink_database, sink.sink_schema, sink.sink_name
1117 ),
1118 reason: e.to_string(),
1119 });
1120 }
1121
1122 progress::success(&format!(
1123 "{}.{}.{} -> {}.{}.{}",
1124 sink.sink_database,
1125 sink.sink_schema,
1126 sink.sink_name,
1127 sink.dependency_database,
1128 new_schema,
1129 sink.dependency_name
1130 ));
1131 }
1132
1133 Ok(())
1134}
1135
1136async fn drop_old_resources(client: &Client, plan: &DeploymentPlan) {
1138 let staging_suffix = &plan.staging_suffix;
1139
1140 for sq in &plan.staging_schemas {
1142 let prod_schema = strip_staging_suffix(&sq.schema, staging_suffix);
1143 let old_schema = format!("{}{}", prod_schema, staging_suffix);
1144 let drop_sql = format!(
1145 "DROP SCHEMA IF EXISTS \"{}\".\"{}\" CASCADE;",
1146 sq.database, old_schema
1147 );
1148
1149 verbose!(" {}", drop_sql);
1150 if let Err(e) = client.execute(&drop_sql, &[]).await {
1151 info!(
1152 "warning: failed to drop old schema {}.{}: {}",
1153 sq.database, old_schema, e
1154 );
1155 }
1156 }
1157
1158 for cluster in &plan.staging_clusters {
1160 let old_cluster = format!("{}{}", cluster, staging_suffix);
1161 let drop_sql = format!("DROP CLUSTER IF EXISTS \"{}\" CASCADE;", old_cluster);
1162
1163 verbose!(" {}", drop_sql);
1164 if let Err(e) = client.execute(&drop_sql, &[]).await {
1165 info!("warning: failed to drop old cluster {}: {}", old_cluster, e);
1166 }
1167 }
1168}
1169
#[cfg(test)]
mod tests {
    use super::strip_staging_suffix;

    /// A name ending in the deployment suffix maps back to the bare name.
    #[mz_ore::test]
    fn test_strip_staging_suffix_removes_suffix_once() {
        let production = strip_staging_suffix("analytics_deploy123", "_deploy123");
        assert_eq!(production, "analytics");
    }

    /// When the name ends with the suffix twice, only the outer occurrence is
    /// removed.
    #[mz_ore::test]
    fn test_strip_staging_suffix_strips_exactly_once() {
        let production = strip_staging_suffix("analytics_prod_prod", "_prod");
        assert_eq!(production, "analytics_prod");
    }

    /// A name that does not end with the suffix comes back unchanged.
    #[mz_ore::test]
    fn test_strip_staging_suffix_no_match_returns_input() {
        let unchanged = strip_staging_suffix("analytics", "_prod");
        assert_eq!(unchanged, "analytics");
    }
}