1use crate::cli::CliError;
13use crate::cli::progress;
14use crate::client::{
15 ApplyState, Client, DependentSink, DeploymentKind, PendingStatement, ReplacementMvRecord,
16};
17use crate::config::Settings;
18use crate::log;
19use crate::project::SchemaQualifier;
20use crate::project::analysis::deployment_snapshot;
21use crate::project::ir::object_id::ObjectId;
22use crate::{info, verbose};
23use itertools::Itertools;
24use owo_colors::{OwoColorize, Stream, Style};
25use std::collections::BTreeSet;
26use std::fmt;
27
/// Returns `staging_name` with a single trailing `staging_suffix` removed.
///
/// If `staging_name` does not end with `staging_suffix`, it is returned unchanged.
fn strip_staging_suffix<'a>(staging_name: &'a str, staging_suffix: &str) -> &'a str {
    match staging_name.strip_suffix(staging_suffix) {
        Some(production_name) => production_name,
        None => staging_name,
    }
}
44
/// Everything gathered up front about one deployment: what will be swapped, created,
/// replaced, repointed and dropped.
struct DeploymentPlan {
    /// Identifier of the deployment this plan belongs to.
    deploy_id: String,
    /// Apply phase recorded for the deployment when the plan was built.
    apply_state: ApplyState,
    /// Suffix (`_<deploy_id>`) appended to production names to form staging names.
    staging_suffix: String,
    /// Schemas, under their staging names, that take part in the swap.
    staging_schemas: BTreeSet<SchemaQualifier>,
    /// Clusters that take part in the swap, stored under their production names.
    staging_clusters: BTreeSet<String>,
    /// Statements still pending for this deployment (used to create sinks).
    pending_statements: Vec<PendingStatement>,
    /// Records describing materialized views that receive a replacement.
    replacement_mvs: Vec<ReplacementMvRecord>,
    /// Sinks found to depend on the production schemas being swapped.
    dependent_sinks: Vec<DependentSink>,
}
59
/// Serializable view of one schema swap in a plan.
#[derive(serde::Serialize)]
struct SchemaSwapView<'a> {
    /// Database containing both schemas.
    database: &'a str,
    /// Staging schema name with the staging suffix stripped.
    production_schema: String,
    /// Schema name as stored in the plan (with the staging suffix).
    staging_schema: &'a str,
}
66
/// Serializable view of one cluster swap in a plan.
#[derive(serde::Serialize)]
struct ClusterSwapView<'a> {
    /// Cluster name as stored in the plan (without the staging suffix).
    production_cluster: &'a str,
    /// Production cluster name with the staging suffix appended.
    staging_cluster: String,
}
72
/// Serializable view of one pending statement, identified by its fully qualified object name.
#[derive(serde::Serialize)]
struct SinkToCreateView<'a> {
    /// Database of the object the pending statement refers to.
    database: &'a str,
    /// Schema of the object the pending statement refers to.
    schema: &'a str,
    /// Name of the object the pending statement refers to.
    object: &'a str,
}
79
/// Serializable view of one replacement materialized view record.
#[derive(serde::Serialize)]
struct ReplacementMvView<'a> {
    /// Database of the materialized view being replaced.
    target_database: &'a str,
    /// Schema of the materialized view being replaced.
    target_schema: &'a str,
    /// Name of the materialized view being replaced.
    target_name: &'a str,
    /// Schema (in the target database) that holds the replacement.
    replacement_schema: &'a str,
}
87
/// Serializable view of one dependent sink together with the object it depends on.
#[derive(serde::Serialize)]
struct SinkToRepointView<'a> {
    /// Database of the sink.
    sink_database: &'a str,
    /// Schema of the sink.
    sink_schema: &'a str,
    /// Name of the sink.
    sink_name: &'a str,
    /// Database of the object the sink depends on.
    dependency_database: &'a str,
    /// Schema of the object the sink depends on.
    dependency_schema: &'a str,
    /// Name of the object the sink depends on.
    dependency_name: &'a str,
}
97
/// Serializable view of one resource listed for removal.
#[derive(serde::Serialize)]
struct ResourceToDropView {
    /// Resource category; this file produces `"schema"` and `"cluster"`.
    kind: String,
    /// Resource name; schemas are written as `database.schema`.
    name: String,
}
103
104impl DeploymentPlan {
105 fn apply_state_str(&self) -> &'static str {
106 match self.apply_state {
107 ApplyState::NotStarted => "not_started",
108 ApplyState::PreSwap => "pre_swap",
109 ApplyState::PostSwap => "post_swap",
110 }
111 }
112
113 fn schema_swaps(&self) -> Vec<SchemaSwapView<'_>> {
114 self.staging_schemas
115 .iter()
116 .map(|sq| {
117 let prod_schema =
118 strip_staging_suffix(&sq.schema, &self.staging_suffix).to_string();
119 SchemaSwapView {
120 database: &sq.database,
121 production_schema: prod_schema,
122 staging_schema: &sq.schema,
123 }
124 })
125 .collect()
126 }
127
128 fn cluster_swaps(&self) -> Vec<ClusterSwapView<'_>> {
129 self.staging_clusters
130 .iter()
131 .map(|cluster| ClusterSwapView {
132 production_cluster: cluster,
133 staging_cluster: format!("{}{}", cluster, self.staging_suffix),
134 })
135 .collect()
136 }
137
138 fn sinks_to_create(&self) -> Vec<SinkToCreateView<'_>> {
139 self.pending_statements
140 .iter()
141 .map(|stmt| SinkToCreateView {
142 database: &stmt.database,
143 schema: &stmt.schema,
144 object: &stmt.object,
145 })
146 .collect()
147 }
148
149 fn replacement_mv_views(&self) -> Vec<ReplacementMvView<'_>> {
150 self.replacement_mvs
151 .iter()
152 .map(|r| ReplacementMvView {
153 target_database: &r.target_database,
154 target_schema: &r.target_schema,
155 target_name: &r.target_name,
156 replacement_schema: &r.replacement_schema,
157 })
158 .collect()
159 }
160
161 fn sinks_to_repoint(&self) -> Vec<SinkToRepointView<'_>> {
162 self.dependent_sinks
163 .iter()
164 .map(|sink| SinkToRepointView {
165 sink_database: &sink.sink_database,
166 sink_schema: &sink.sink_schema,
167 sink_name: &sink.sink_name,
168 dependency_database: &sink.dependency_database,
169 dependency_schema: &sink.dependency_schema,
170 dependency_name: &sink.dependency_name,
171 })
172 .collect()
173 }
174
175 fn replacement_schemas(&self) -> BTreeSet<SchemaQualifier> {
176 self.replacement_mvs
177 .iter()
178 .map(|r| SchemaQualifier::new(r.target_database.clone(), r.replacement_schema.clone()))
179 .collect()
180 }
181
182 fn resources_to_drop(&self) -> Vec<ResourceToDropView> {
183 let mut drops = Vec::new();
184 for sq in &self.staging_schemas {
185 let prod_schema = strip_staging_suffix(&sq.schema, &self.staging_suffix);
186 let old_schema = format!("{}{}", prod_schema, self.staging_suffix);
187 drops.push(ResourceToDropView {
188 kind: "schema".to_string(),
189 name: format!("{}.{}", sq.database, old_schema),
190 });
191 }
192 for cluster in &self.staging_clusters {
193 let old_cluster = format!("{}{}", cluster, self.staging_suffix);
194 drops.push(ResourceToDropView {
195 kind: "cluster".to_string(),
196 name: old_cluster,
197 });
198 }
199 for sq in &self.replacement_schemas() {
200 drops.push(ResourceToDropView {
201 kind: "schema".to_string(),
202 name: format!("{}.{}", sq.database, sq.schema),
203 });
204 }
205 drops
206 }
207}
208
209impl serde::Serialize for DeploymentPlan {
210 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
211 use serde::ser::SerializeStruct;
212 let mut state = serializer.serialize_struct("DeploymentPlan", 8)?;
213 state.serialize_field("deploy_id", &self.deploy_id)?;
214 state.serialize_field("apply_state", self.apply_state_str())?;
215 state.serialize_field("schema_swaps", &self.schema_swaps())?;
216 state.serialize_field("cluster_swaps", &self.cluster_swaps())?;
217 state.serialize_field("sinks_to_create", &self.sinks_to_create())?;
218 state.serialize_field("replacement_mvs", &self.replacement_mv_views())?;
219 state.serialize_field("sinks_to_repoint", &self.sinks_to_repoint())?;
220 state.serialize_field("resources_to_drop", &self.resources_to_drop())?;
221 state.end()
222 }
223}
224
225impl fmt::Display for DeploymentPlan {
226 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
227 match self.apply_state {
229 ApplyState::PreSwap => {
230 let style = Style::new().yellow().bold();
231 writeln!(
232 f,
233 "\n {} Resuming from pre-swap state (apply state schemas already created)",
234 "note:".if_supports_color(Stream::Stderr, |t| style.style(t))
235 )?;
236 }
237 ApplyState::PostSwap => {
238 let style = Style::new().yellow().bold();
239 writeln!(
240 f,
241 "\n {} Resuming from post-swap state (swap already completed, showing remaining work)",
242 "note:".if_supports_color(Stream::Stderr, |t| style.style(t))
243 )?;
244 }
245 ApplyState::NotStarted => {}
246 }
247
248 writeln!(
250 f,
251 "\n {}",
252 "Schema Swaps:".if_supports_color(Stream::Stderr, |t| t.bold())
253 )?;
254 if self.staging_schemas.is_empty() {
255 writeln!(f, " (none)")?;
256 } else {
257 for swap in &self.schema_swaps() {
258 writeln!(
259 f,
260 " {} {}.{} {} {}.{}",
261 "~".if_supports_color(Stream::Stderr, |t| t.yellow()),
262 swap.database,
263 swap.production_schema,
264 "<->".if_supports_color(Stream::Stderr, |t| t.dimmed()),
265 swap.database,
266 swap.staging_schema
267 )?;
268 }
269 }
270
271 writeln!(
273 f,
274 "\n {}",
275 "Cluster Swaps:".if_supports_color(Stream::Stderr, |t| t.bold())
276 )?;
277 if self.staging_clusters.is_empty() {
278 writeln!(f, " (none)")?;
279 } else {
280 for swap in &self.cluster_swaps() {
281 writeln!(
282 f,
283 " {} {} {} {}",
284 "~".if_supports_color(Stream::Stderr, |t| t.yellow()),
285 swap.production_cluster,
286 "<->".if_supports_color(Stream::Stderr, |t| t.dimmed()),
287 swap.staging_cluster
288 )?;
289 }
290 }
291
292 writeln!(
294 f,
295 "\n {}",
296 "Sinks to Create:".if_supports_color(Stream::Stderr, |t| t.bold())
297 )?;
298 if self.pending_statements.is_empty() {
299 writeln!(f, " (none)")?;
300 } else {
301 for stmt in &self.pending_statements {
302 writeln!(
303 f,
304 " {} {}.{}.{}",
305 "+".if_supports_color(Stream::Stderr, |t| t.green()),
306 stmt.database,
307 stmt.schema,
308 stmt.object
309 )?;
310 }
311 }
312
313 writeln!(
315 f,
316 "\n {}",
317 "Replacement Materialized Views:".if_supports_color(Stream::Stderr, |t| t.bold())
318 )?;
319 if self.replacement_mvs.is_empty() {
320 writeln!(f, " (none)")?;
321 } else {
322 for record in &self.replacement_mvs {
323 writeln!(
324 f,
325 " {} {}.{}.{} {} {}.{}.{}",
326 "~".if_supports_color(Stream::Stderr, |t| t.yellow()),
327 record.target_database,
328 record.target_schema,
329 record.target_name,
330 "<-".if_supports_color(Stream::Stderr, |t| t.dimmed()),
331 record.target_database,
332 record.replacement_schema,
333 record.target_name
334 )?;
335 }
336 }
337
338 writeln!(
340 f,
341 "\n {}",
342 "Sinks to Repoint:".if_supports_color(Stream::Stderr, |t| t.bold())
343 )?;
344 if self.dependent_sinks.is_empty() {
345 writeln!(f, " (none)")?;
346 } else {
347 for sink in &self.dependent_sinks {
348 writeln!(
349 f,
350 " {} {}.{}.{} {} {}.{}.{}",
351 "~".if_supports_color(Stream::Stderr, |t| t.yellow()),
352 sink.sink_database,
353 sink.sink_schema,
354 sink.sink_name,
355 "->".if_supports_color(Stream::Stderr, |t| t.dimmed()),
356 sink.dependency_database,
357 sink.dependency_schema,
358 sink.dependency_name
359 )?;
360 }
361 }
362
363 writeln!(
365 f,
366 "\n {}",
367 "Old Resources to Drop:".if_supports_color(Stream::Stderr, |t| t.bold())
368 )?;
369 let drops = self.resources_to_drop();
370 if drops.is_empty() {
371 writeln!(f, " (none)")?;
372 } else {
373 for drop in &drops {
374 writeln!(
375 f,
376 " {} {}",
377 "-".if_supports_color(Stream::Stderr, |t| t.red()),
378 drop.name
379 )?;
380 }
381 }
382
383 Ok(())
384 }
385}
386
387async fn generate_deployment_plan(
389 client: &Client,
390 deploy_id: &str,
391 apply_state: ApplyState,
392 force: bool,
393) -> Result<DeploymentPlan, CliError> {
394 let (staging_schemas, staging_clusters, staging_suffix) = if apply_state == ApplyState::PostSwap
396 {
397 verbose!("Resuming post-swap: skipping conflict check and resource gathering");
398 (BTreeSet::new(), BTreeSet::new(), format!("_{}", deploy_id))
399 } else {
400 gather_resources_and_check_conflicts(client, deploy_id, force).await?
401 };
402
403 let pending_statements = client
405 .deployments()
406 .get_pending_statements(deploy_id)
407 .await?;
408
409 let replacement_mvs = client.deployments().get_replacement_mvs(deploy_id).await?;
411
412 let dependent_sinks = if staging_schemas.is_empty() {
414 Vec::new()
415 } else {
416 let prod_schemas: Vec<SchemaQualifier> = staging_schemas
417 .iter()
418 .map(|sq| {
419 let prod_schema = strip_staging_suffix(&sq.schema, &staging_suffix);
420 SchemaQualifier::new(sq.database.clone(), prod_schema.to_string())
421 })
422 .collect();
423
424 client
425 .introspection()
426 .find_sinks_depending_on_schemas(&prod_schemas)
427 .await
428 .map_err(CliError::Connection)?
429 };
430
431 Ok(DeploymentPlan {
432 deploy_id: deploy_id.to_string(),
433 apply_state,
434 staging_suffix,
435 staging_schemas,
436 staging_clusters,
437 pending_statements,
438 replacement_mvs,
439 dependent_sinks,
440 })
441}
442
443pub async fn run(
445 settings: &Settings,
446 deploy_id: &str,
447 force: bool,
448 dry_run: bool,
449) -> Result<(), CliError> {
450 let profile = settings.connection();
451
452 let client = Client::connect_with_profile(profile.clone())
453 .await
454 .map_err(CliError::Connection)?;
455
456 super::setup::verify(&client, settings.emulator()).await?;
457 let role = super::setup::validate_connection(&client, settings.emulator()).await?;
458 super::setup::require_deployer(role)?;
459
460 client.deployments().validate_staging(deploy_id).await?;
462
463 let apply_state = client.deployments().get_apply_state(deploy_id).await?;
464 verbose!("Apply state: {:?}", apply_state);
465
466 let staging_snapshot =
467 deployment_snapshot::load_from_database(&client, Some(deploy_id)).await?;
468 verbose!(
469 "Found {} objects in staging deployment",
470 staging_snapshot.objects.len()
471 );
472
473 let plan = generate_deployment_plan(&client, deploy_id, apply_state, force).await?;
474
475 if dry_run {
476 log::output(&plan);
477 return Ok(());
478 }
479
480 if log::json_output_enabled() {
481 log::output_json(&plan);
482 }
483
484 execute_swap_phase(&client, &plan).await?;
485 run_post_swap_steps(&client, &plan).await?;
486 cleanup_apply_state(&client, &plan.deploy_id).await?;
487
488 progress::success("Deployment completed successfully!");
489
490 Ok(())
491}
492
493async fn execute_swap_phase(client: &Client, plan: &DeploymentPlan) -> Result<(), CliError> {
495 match plan.apply_state {
496 ApplyState::NotStarted => {
497 verbose!("Creating apply state schemas...");
498 client
499 .deployments()
500 .create_apply_state_schemas(&plan.deploy_id)
501 .await?;
502 verbose!("Executing atomic swap...");
503 execute_atomic_swap(
504 client,
505 &plan.deploy_id,
506 &plan.staging_schemas,
507 &plan.staging_clusters,
508 )
509 .await?;
510 }
511 ApplyState::PreSwap => {
512 verbose!("Resuming from pre-swap state...");
513 execute_atomic_swap(
514 client,
515 &plan.deploy_id,
516 &plan.staging_schemas,
517 &plan.staging_clusters,
518 )
519 .await?;
520 }
521 ApplyState::PostSwap => {
522 verbose!("Resuming from post-swap state...");
523 }
524 }
525 Ok(())
526}
527
528async fn run_post_swap_steps(client: &Client, plan: &DeploymentPlan) -> Result<(), CliError> {
530 execute_pending_sinks(client, plan).await?;
531 apply_replacement_mvs(client, plan).await?;
532
533 if !plan.staging_schemas.is_empty() {
534 repoint_dependent_sinks(client, plan).await?;
535 }
536
537 verbose!("\nUpdating deployment table...");
538 client
539 .deployments()
540 .update_promoted_at(&plan.deploy_id)
541 .await
542 .map_err(|source| CliError::DeploymentStateWriteFailed { source })?;
543
544 if !plan.staging_schemas.is_empty() || !plan.staging_clusters.is_empty() {
545 drop_old_resources(client, plan).await;
546 }
547 Ok(())
548}
549
550async fn cleanup_apply_state(client: &Client, deploy_id: &str) -> Result<(), CliError> {
552 verbose!("Cleaning up apply state...");
553 client
554 .deployments()
555 .delete_apply_state_schemas(deploy_id)
556 .await?;
557 client
558 .deployments()
559 .delete_pending_statements(deploy_id)
560 .await?;
561 client
562 .deployments()
563 .delete_replacement_mvs(deploy_id)
564 .await?;
565 client
566 .deployments()
567 .delete_deployment_clusters(deploy_id)
568 .await
569 .map_err(|source| CliError::DeploymentStateWriteFailed { source })?;
570 Ok(())
571}
572
573async fn gather_resources_and_check_conflicts(
579 client: &Client,
580 deploy_id: &str,
581 force: bool,
582) -> Result<(BTreeSet<SchemaQualifier>, BTreeSet<String>, String), CliError> {
583 verbose!("Checking for deployment conflicts...");
584 let conflicts = client
585 .deployments()
586 .check_deployment_conflicts(deploy_id)
587 .await?;
588
589 if !conflicts.is_empty() {
590 if force {
591 let style = Style::new().yellow().bold();
593 info!(
594 "\n{}: deployment conflicts detected, but continuing due to --force flag",
595 "warning".if_supports_color(Stream::Stderr, |t| style.style(t))
596 );
597 for conflict in &conflicts {
598 info!(
599 " - {}.{} (last promoted by '{}' deployment)",
600 conflict.database, conflict.schema, conflict.deploy_id
601 );
602 }
603 info!();
604 } else {
605 return Err(CliError::DeploymentConflict { conflicts });
607 }
608 } else {
609 verbose!("No conflicts detected");
610 }
611
612 let staging_suffix = format!("_{}", deploy_id);
614 let mut staging_schemas = BTreeSet::new();
615 let mut staging_clusters = BTreeSet::new();
616
617 let deployment_records = client
619 .deployments()
620 .get_schema_deployments(Some(deploy_id))
621 .await?;
622
623 let production_snapshot = deployment_snapshot::load_from_database(client, None).await?;
627
628 let schemas_to_check: Vec<(String, String)> = deployment_records
631 .iter()
632 .filter(|record| {
633 if record.kind == DeploymentKind::Sinks {
634 verbose!(
635 "Skipping sink-only schema {}.{} (no swap needed)",
636 record.database,
637 record.schema
638 );
639 false
640 } else if record.kind == DeploymentKind::Replacement {
641 let sq = SchemaQualifier::new(record.database.clone(), record.schema.clone());
642 let already_replacement = production_snapshot.schemas.get(&sq).copied()
643 == Some(DeploymentKind::Replacement);
644 if already_replacement {
645 verbose!(
646 "Skipping replacement schema {}.{} (already Replacement in production)",
647 record.database,
648 record.schema
649 );
650 false
651 } else {
652 verbose!(
653 "Including replacement schema {}.{} in swap (first Replacement deployment)",
654 record.database,
655 record.schema
656 );
657 true
658 }
659 } else {
660 true
661 }
662 })
663 .map(|record| {
664 let staging_schema = format!("{}{}", record.schema, staging_suffix);
665 (record.database.clone(), staging_schema)
666 })
667 .collect();
668
669 let existing_schemas = client
671 .introspection()
672 .check_schemas_exist(&schemas_to_check)
673 .await?;
674
675 for pair in schemas_to_check {
676 if existing_schemas.contains(&pair) {
677 staging_schemas.insert(SchemaQualifier::new(pair.0, pair.1));
678 } else {
679 info!("Warning: Staging schema {}.{} not found", pair.0, pair.1);
680 }
681 }
682
683 client
685 .deployments()
686 .validate_deployment_clusters(deploy_id)
687 .await?;
688
689 let cluster_names = client
691 .deployments()
692 .get_deployment_clusters(deploy_id)
693 .await?;
694
695 let staging_cluster_names: Vec<String> = cluster_names
697 .iter()
698 .map(|name| format!("{}{}", name, staging_suffix))
699 .collect();
700 let existing_clusters = client
701 .introspection()
702 .check_clusters_exist(&staging_cluster_names)
703 .await?;
704
705 for (cluster_name, staging_cluster) in cluster_names.into_iter().zip_eq(staging_cluster_names) {
706 if existing_clusters.contains(&staging_cluster) {
707 staging_clusters.insert(cluster_name);
708 } else {
709 info!("Warning: Staging cluster {} not found", staging_cluster);
710 }
711 }
712
713 verbose!("\nSchemas to swap:");
714 for sq in &staging_schemas {
715 let prod_schema = strip_staging_suffix(&sq.schema, &staging_suffix);
716 verbose!(" - {}.{} <-> {}", sq.database, sq.schema, prod_schema);
717 }
718
719 if !staging_clusters.is_empty() {
720 verbose!("\nClusters to swap:");
721 for cluster in &staging_clusters {
722 let staging_cluster = format!("{}{}", cluster, staging_suffix);
723 verbose!(" - {} <-> {}", staging_cluster, cluster);
724 }
725 }
726
727 Ok((staging_schemas, staging_clusters, staging_suffix))
728}
729
730async fn execute_atomic_swap(
732 client: &Client,
733 deploy_id: &str,
734 staging_schemas: &BTreeSet<SchemaQualifier>,
735 staging_clusters: &BTreeSet<String>,
736) -> Result<(), CliError> {
737 let staging_suffix = format!("_{}", deploy_id);
738
739 client
741 .execute("BEGIN", &[])
742 .await
743 .map_err(|e| CliError::SqlExecutionFailed {
744 statement: "BEGIN".to_string(),
745 source: e,
746 })?;
747
748 for sq in staging_schemas {
750 let prod_schema = strip_staging_suffix(&sq.schema, &staging_suffix);
751 let swap_sql = format!(
753 "ALTER SCHEMA \"{}\".\"{}\" SWAP WITH \"{}\";",
754 sq.database, prod_schema, sq.schema
755 );
756
757 verbose!(" {}", swap_sql);
758 if let Err(e) = client.execute(&swap_sql, &[]).await {
759 let _ = client.execute("ROLLBACK", &[]).await;
760 return Err(CliError::SqlExecutionFailed {
761 statement: swap_sql,
762 source: e,
763 });
764 }
765 }
766
767 for cluster in staging_clusters {
769 let staging_cluster = format!("{}{}", cluster, staging_suffix);
770 let swap_sql = format!(
771 "ALTER CLUSTER \"{}\" SWAP WITH \"{}\";",
772 cluster, staging_cluster
773 );
774
775 verbose!(" {}", swap_sql);
776 if let Err(e) = client.execute(&swap_sql, &[]).await {
777 let _ = client.execute("ROLLBACK", &[]).await;
778 return Err(CliError::SqlExecutionFailed {
779 statement: swap_sql,
780 source: e,
781 });
782 }
783 }
784
785 let pre_schema = format!("apply_{}_pre", deploy_id);
787 let post_schema = format!("apply_{}_post", deploy_id);
788 let state_swap_sql = format!(
789 "ALTER SCHEMA _mz_deploy.\"{}\" SWAP WITH \"{}\";",
790 pre_schema, post_schema
791 );
792
793 verbose!(" {}", state_swap_sql);
794 if let Err(e) = client.execute(&state_swap_sql, &[]).await {
795 let _ = client.execute("ROLLBACK", &[]).await;
796 return Err(CliError::SqlExecutionFailed {
797 statement: state_swap_sql,
798 source: e,
799 });
800 }
801
802 client
804 .execute("COMMIT", &[])
805 .await
806 .map_err(|e| CliError::SqlExecutionFailed {
807 statement: "COMMIT".to_string(),
808 source: e,
809 })?;
810
811 verbose!("Swap completed successfully");
812 Ok(())
813}
814
815async fn execute_pending_sinks(client: &Client, plan: &DeploymentPlan) -> Result<(), CliError> {
820 if plan.pending_statements.is_empty() {
821 verbose!("No pending sinks to execute");
822 return Ok(());
823 }
824
825 let sink_ids: BTreeSet<ObjectId> = plan
827 .pending_statements
828 .iter()
829 .map(|stmt| {
830 ObjectId::new(
831 stmt.database.clone(),
832 stmt.schema.clone(),
833 stmt.object.clone(),
834 )
835 })
836 .collect();
837
838 let existing_sinks = client.introspection().check_sinks_exist(&sink_ids).await?;
840
841 let sinks_to_create: Vec<_> = plan
843 .pending_statements
844 .iter()
845 .filter(|stmt| {
846 let obj_id = ObjectId::new(
847 stmt.database.clone(),
848 stmt.schema.clone(),
849 stmt.object.clone(),
850 );
851 !existing_sinks.contains(&obj_id)
852 })
853 .collect();
854
855 if !existing_sinks.is_empty() {
857 let mut existing_list: Vec<_> = existing_sinks.iter().collect();
858 existing_list.sort_by_key(|obj| obj.to_string());
859 for sink_id in existing_list {
860 verbose!(" - {}", sink_id);
861 }
862 }
863
864 if sinks_to_create.is_empty() {
865 return Ok(());
866 }
867
868 let sink_schemas: BTreeSet<(&str, &str)> = sinks_to_create
873 .iter()
874 .map(|stmt| (stmt.database.as_str(), stmt.schema.as_str()))
875 .collect();
876 for (database, schema) in sink_schemas {
877 client
878 .provisioning()
879 .create_schema(database, schema)
880 .await
881 .map_err(CliError::Connection)?;
882 }
883
884 for stmt in sinks_to_create {
885 verbose!(
886 "Creating sink {}.{}.{}...",
887 stmt.database,
888 stmt.schema,
889 stmt.object
890 );
891
892 if let Err(e) = client.execute(&stmt.statement_sql, &[]).await {
894 info!(
895 "Error creating sink {}.{}.{}: {}",
896 stmt.database, stmt.schema, stmt.object, e
897 );
898 return Err(CliError::SqlExecutionFailed {
899 statement: stmt.statement_sql.clone(),
900 source: e,
901 });
902 }
903
904 client
906 .deployments()
907 .mark_statement_executed(&plan.deploy_id, stmt.sequence_num)
908 .await?;
909
910 progress::success(&format!(
911 "{}.{}.{}",
912 stmt.database, stmt.schema, stmt.object
913 ));
914 }
915
916 Ok(())
917}
918
919async fn apply_replacement_mvs(client: &Client, plan: &DeploymentPlan) -> Result<(), CliError> {
923 if plan.replacement_mvs.is_empty() {
924 verbose!("No replacement MVs to apply");
925 return Ok(());
926 }
927
928 for record in &plan.replacement_mvs {
929 let alter_sql = format!(
930 "ALTER MATERIALIZED VIEW \"{}\".\"{}\".\"{}\"\
931 APPLY REPLACEMENT \"{}\".\"{}\".\"{}\";",
932 record.target_database,
933 record.target_schema,
934 record.target_name,
935 record.target_database,
936 record.replacement_schema,
937 record.target_name
938 );
939
940 verbose!(" {}", alter_sql);
941 if let Err(e) = client.execute(&alter_sql, &[]).await {
942 info!(
943 "Error applying replacement for {}.{}.{}: {}",
944 record.target_database, record.target_schema, record.target_name, e
945 );
946 return Err(CliError::SqlExecutionFailed {
947 statement: alter_sql,
948 source: e,
949 });
950 }
951
952 progress::success(&format!(
953 "{}.{}.{}",
954 record.target_database, record.target_schema, record.target_name
955 ));
956 }
957
958 let replacement_schemas = plan.replacement_schemas();
960
961 for sq in &replacement_schemas {
962 let drop_sql = format!(
963 "DROP SCHEMA IF EXISTS \"{}\".\"{}\" CASCADE;",
964 sq.database, sq.schema
965 );
966 verbose!(" {}", drop_sql);
967 if let Err(e) = client.execute(&drop_sql, &[]).await {
968 info!(
969 "warning: failed to drop replacement schema {}.{}: {}",
970 sq.database, sq.schema, e
971 );
972 }
973 }
974
975 Ok(())
976}
977
978async fn repoint_dependent_sinks(client: &Client, plan: &DeploymentPlan) -> Result<(), CliError> {
983 let staging_suffix = &plan.staging_suffix;
984
985 let old_schemas: Vec<SchemaQualifier> = plan
988 .staging_schemas
989 .iter()
990 .map(|sq| {
991 let prod_schema = strip_staging_suffix(&sq.schema, staging_suffix);
992 let old_schema = format!("{}{}", prod_schema, staging_suffix);
993 SchemaQualifier::new(sq.database.clone(), old_schema)
994 })
995 .collect();
996
997 let dependent_sinks = client
999 .introspection()
1000 .find_sinks_depending_on_schemas(&old_schemas)
1001 .await
1002 .map_err(CliError::Connection)?;
1003
1004 if dependent_sinks.is_empty() {
1005 verbose!("No sinks depend on objects in schemas being dropped");
1006 return Ok(());
1007 }
1008
1009 let replacement_ids: BTreeSet<ObjectId> = dependent_sinks
1011 .iter()
1012 .map(|sink| {
1013 let new_schema =
1014 strip_staging_suffix(&sink.dependency_schema, staging_suffix).to_string();
1015 ObjectId::new(
1016 sink.dependency_database.clone(),
1017 new_schema,
1018 sink.dependency_name.clone(),
1019 )
1020 })
1021 .collect();
1022
1023 let existing_ids = client
1024 .introspection()
1025 .check_objects_exist(&replacement_ids)
1026 .await
1027 .map_err(CliError::Connection)?;
1028
1029 for sink in dependent_sinks {
1030 let new_schema = strip_staging_suffix(&sink.dependency_schema, staging_suffix);
1032
1033 let replacement_id = ObjectId::new(
1034 sink.dependency_database.clone(),
1035 new_schema.to_string(),
1036 sink.dependency_name.clone(),
1037 );
1038
1039 if !existing_ids.contains(&replacement_id) {
1040 return Err(CliError::SinkRepointFailed {
1041 sink: format!(
1042 "{}.{}.{}",
1043 sink.sink_database, sink.sink_schema, sink.sink_name
1044 ),
1045 reason: format!("replacement object {} does not exist", replacement_id),
1046 });
1047 }
1048
1049 let alter_sql = format!(
1051 r#"ALTER SINK "{}"."{}"."{}" SET FROM "{}"."{}"."{}""#,
1052 sink.sink_database,
1053 sink.sink_schema,
1054 sink.sink_name,
1055 sink.dependency_database,
1056 new_schema,
1057 sink.dependency_name
1058 );
1059
1060 verbose!(" {}", alter_sql);
1061 if let Err(e) = client.execute(&alter_sql, &[]).await {
1062 return Err(CliError::SinkRepointFailed {
1063 sink: format!(
1064 "{}.{}.{}",
1065 sink.sink_database, sink.sink_schema, sink.sink_name
1066 ),
1067 reason: e.to_string(),
1068 });
1069 }
1070
1071 progress::success(&format!(
1072 "{}.{}.{} -> {}.{}.{}",
1073 sink.sink_database,
1074 sink.sink_schema,
1075 sink.sink_name,
1076 sink.dependency_database,
1077 new_schema,
1078 sink.dependency_name
1079 ));
1080 }
1081
1082 Ok(())
1083}
1084
1085async fn drop_old_resources(client: &Client, plan: &DeploymentPlan) {
1087 let staging_suffix = &plan.staging_suffix;
1088
1089 for sq in &plan.staging_schemas {
1091 let prod_schema = strip_staging_suffix(&sq.schema, staging_suffix);
1092 let old_schema = format!("{}{}", prod_schema, staging_suffix);
1093 let drop_sql = format!(
1094 "DROP SCHEMA IF EXISTS \"{}\".\"{}\" CASCADE;",
1095 sq.database, old_schema
1096 );
1097
1098 verbose!(" {}", drop_sql);
1099 if let Err(e) = client.execute(&drop_sql, &[]).await {
1100 info!(
1101 "warning: failed to drop old schema {}.{}: {}",
1102 sq.database, old_schema, e
1103 );
1104 }
1105 }
1106
1107 for cluster in &plan.staging_clusters {
1109 let old_cluster = format!("{}{}", cluster, staging_suffix);
1110 let drop_sql = format!("DROP CLUSTER IF EXISTS \"{}\" CASCADE;", old_cluster);
1111
1112 verbose!(" {}", drop_sql);
1113 if let Err(e) = client.execute(&drop_sql, &[]).await {
1114 info!("warning: failed to drop old cluster {}: {}", old_cluster, e);
1115 }
1116 }
1117}
1118
/// Unit tests for `strip_staging_suffix`.
#[cfg(test)]
mod tests {
    use super::strip_staging_suffix;

    /// A name that ends with the suffix is returned without it.
    #[mz_ore::test]
    fn test_strip_staging_suffix_removes_suffix_once() {
        assert_eq!(
            strip_staging_suffix("analytics_deploy123", "_deploy123"),
            "analytics"
        );
    }

    /// When the suffix occurs twice at the end, only the last occurrence is removed.
    #[mz_ore::test]
    fn test_strip_staging_suffix_strips_exactly_once() {
        assert_eq!(
            strip_staging_suffix("analytics_prod_prod", "_prod"),
            "analytics_prod"
        );
    }

    /// A name that does not end with the suffix is returned unchanged.
    #[mz_ore::test]
    fn test_strip_staging_suffix_no_match_returns_input() {
        assert_eq!(strip_staging_suffix("analytics", "_prod"), "analytics");
    }
}
1154}