Skip to main content

mz_deploy/cli/commands/
promote.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Promote command - promote staging deployment to production via ALTER SWAP.
11
12use crate::cli::CliError;
13use crate::cli::progress;
14use crate::client::{
15    ApplyState, Client, DependentSink, DeploymentKind, PendingStatement, ReplacementMvRecord,
16};
17use crate::config::Settings;
18use crate::log;
19use crate::project::SchemaQualifier;
20use crate::project::analysis::deployment_snapshot;
21use crate::project::ir::object_id::ObjectId;
22use crate::{info, verbose};
23use itertools::Itertools;
24use owo_colors::{OwoColorize, Stream, Style};
25use std::collections::BTreeSet;
26use std::fmt;
27
/// Map a staging name back to its production name by removing the staging
/// suffix exactly once.
///
/// Staging names are built by appending the suffix (`_<deploy_id>`) a single
/// time, so only a single trailing copy may be removed. Stripping repeatedly
/// (as `str::trim_end_matches` does) corrupts production names that themselves
/// end in the suffix. For example, with `--deploy-id prod` the production
/// schema `analytics_prod` is staged as `analytics_prod_prod`. Over-trimming
/// turns that into `analytics`, which would aim the cutover swap and the
/// follow-up `DROP ... CASCADE` at the wrong production schema.
///
/// Names that do not end with the suffix come back unchanged.
fn strip_staging_suffix<'a>(staging_name: &'a str, staging_suffix: &str) -> &'a str {
    match staging_name.strip_suffix(staging_suffix) {
        Some(production_name) => production_name,
        None => staging_name,
    }
}
44
45/// Central data structure holding everything needed to display and execute a deployment.
46///
47/// Computed once by `generate_deployment_plan()` and consumed for display (human + JSON)
48/// and execution. Raw domain objects are stored so execution functions don't re-query.
49struct DeploymentPlan {
50    deploy_id: String,
51    apply_state: ApplyState,
52    staging_suffix: String,
53    staging_schemas: BTreeSet<SchemaQualifier>,
54    staging_clusters: BTreeSet<String>,
55    pending_statements: Vec<PendingStatement>,
56    replacement_mvs: Vec<ReplacementMvRecord>,
57    dependent_sinks: Vec<DependentSink>,
58}
59
60#[derive(serde::Serialize)]
61struct SchemaSwapView<'a> {
62    database: &'a str,
63    production_schema: String,
64    staging_schema: &'a str,
65}
66
67#[derive(serde::Serialize)]
68struct ClusterSwapView<'a> {
69    production_cluster: &'a str,
70    staging_cluster: String,
71}
72
73#[derive(serde::Serialize)]
74struct SinkToCreateView<'a> {
75    database: &'a str,
76    schema: &'a str,
77    object: &'a str,
78}
79
80#[derive(serde::Serialize)]
81struct ReplacementMvView<'a> {
82    target_database: &'a str,
83    target_schema: &'a str,
84    target_name: &'a str,
85    replacement_schema: &'a str,
86}
87
88#[derive(serde::Serialize)]
89struct SinkToRepointView<'a> {
90    sink_database: &'a str,
91    sink_schema: &'a str,
92    sink_name: &'a str,
93    dependency_database: &'a str,
94    dependency_schema: &'a str,
95    dependency_name: &'a str,
96}
97
98#[derive(serde::Serialize)]
99struct ResourceToDropView {
100    kind: String,
101    name: String,
102}
103
104impl DeploymentPlan {
105    fn apply_state_str(&self) -> &'static str {
106        match self.apply_state {
107            ApplyState::NotStarted => "not_started",
108            ApplyState::PreSwap => "pre_swap",
109            ApplyState::PostSwap => "post_swap",
110        }
111    }
112
113    fn schema_swaps(&self) -> Vec<SchemaSwapView<'_>> {
114        self.staging_schemas
115            .iter()
116            .map(|sq| {
117                let prod_schema =
118                    strip_staging_suffix(&sq.schema, &self.staging_suffix).to_string();
119                SchemaSwapView {
120                    database: &sq.database,
121                    production_schema: prod_schema,
122                    staging_schema: &sq.schema,
123                }
124            })
125            .collect()
126    }
127
128    fn cluster_swaps(&self) -> Vec<ClusterSwapView<'_>> {
129        self.staging_clusters
130            .iter()
131            .map(|cluster| ClusterSwapView {
132                production_cluster: cluster,
133                staging_cluster: format!("{}{}", cluster, self.staging_suffix),
134            })
135            .collect()
136    }
137
138    fn sinks_to_create(&self) -> Vec<SinkToCreateView<'_>> {
139        self.pending_statements
140            .iter()
141            .map(|stmt| SinkToCreateView {
142                database: &stmt.database,
143                schema: &stmt.schema,
144                object: &stmt.object,
145            })
146            .collect()
147    }
148
149    fn replacement_mv_views(&self) -> Vec<ReplacementMvView<'_>> {
150        self.replacement_mvs
151            .iter()
152            .map(|r| ReplacementMvView {
153                target_database: &r.target_database,
154                target_schema: &r.target_schema,
155                target_name: &r.target_name,
156                replacement_schema: &r.replacement_schema,
157            })
158            .collect()
159    }
160
161    fn sinks_to_repoint(&self) -> Vec<SinkToRepointView<'_>> {
162        self.dependent_sinks
163            .iter()
164            .map(|sink| SinkToRepointView {
165                sink_database: &sink.sink_database,
166                sink_schema: &sink.sink_schema,
167                sink_name: &sink.sink_name,
168                dependency_database: &sink.dependency_database,
169                dependency_schema: &sink.dependency_schema,
170                dependency_name: &sink.dependency_name,
171            })
172            .collect()
173    }
174
175    fn replacement_schemas(&self) -> BTreeSet<SchemaQualifier> {
176        self.replacement_mvs
177            .iter()
178            .map(|r| SchemaQualifier::new(r.target_database.clone(), r.replacement_schema.clone()))
179            .collect()
180    }
181
182    fn resources_to_drop(&self) -> Vec<ResourceToDropView> {
183        let mut drops = Vec::new();
184        for sq in &self.staging_schemas {
185            let prod_schema = strip_staging_suffix(&sq.schema, &self.staging_suffix);
186            let old_schema = format!("{}{}", prod_schema, self.staging_suffix);
187            drops.push(ResourceToDropView {
188                kind: "schema".to_string(),
189                name: format!("{}.{}", sq.database, old_schema),
190            });
191        }
192        for cluster in &self.staging_clusters {
193            let old_cluster = format!("{}{}", cluster, self.staging_suffix);
194            drops.push(ResourceToDropView {
195                kind: "cluster".to_string(),
196                name: old_cluster,
197            });
198        }
199        for sq in &self.replacement_schemas() {
200            drops.push(ResourceToDropView {
201                kind: "schema".to_string(),
202                name: format!("{}.{}", sq.database, sq.schema),
203            });
204        }
205        drops
206    }
207}
208
209impl serde::Serialize for DeploymentPlan {
210    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
211        use serde::ser::SerializeStruct;
212        let mut state = serializer.serialize_struct("DeploymentPlan", 8)?;
213        state.serialize_field("deploy_id", &self.deploy_id)?;
214        state.serialize_field("apply_state", self.apply_state_str())?;
215        state.serialize_field("schema_swaps", &self.schema_swaps())?;
216        state.serialize_field("cluster_swaps", &self.cluster_swaps())?;
217        state.serialize_field("sinks_to_create", &self.sinks_to_create())?;
218        state.serialize_field("replacement_mvs", &self.replacement_mv_views())?;
219        state.serialize_field("sinks_to_repoint", &self.sinks_to_repoint())?;
220        state.serialize_field("resources_to_drop", &self.resources_to_drop())?;
221        state.end()
222    }
223}
224
225impl fmt::Display for DeploymentPlan {
226    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
227        // Note resume state if applicable
228        match self.apply_state {
229            ApplyState::PreSwap => {
230                let style = Style::new().yellow().bold();
231                writeln!(
232                    f,
233                    "\n  {} Resuming from pre-swap state (apply state schemas already created)",
234                    "note:".if_supports_color(Stream::Stderr, |t| style.style(t))
235                )?;
236            }
237            ApplyState::PostSwap => {
238                let style = Style::new().yellow().bold();
239                writeln!(
240                    f,
241                    "\n  {} Resuming from post-swap state (swap already completed, showing remaining work)",
242                    "note:".if_supports_color(Stream::Stderr, |t| style.style(t))
243                )?;
244            }
245            ApplyState::NotStarted => {}
246        }
247
248        // Schema Swaps
249        writeln!(
250            f,
251            "\n  {}",
252            "Schema Swaps:".if_supports_color(Stream::Stderr, |t| t.bold())
253        )?;
254        if self.staging_schemas.is_empty() {
255            writeln!(f, "    (none)")?;
256        } else {
257            for swap in &self.schema_swaps() {
258                writeln!(
259                    f,
260                    "    {} {}.{} {} {}.{}",
261                    "~".if_supports_color(Stream::Stderr, |t| t.yellow()),
262                    swap.database,
263                    swap.production_schema,
264                    "<->".if_supports_color(Stream::Stderr, |t| t.dimmed()),
265                    swap.database,
266                    swap.staging_schema
267                )?;
268            }
269        }
270
271        // Cluster Swaps
272        writeln!(
273            f,
274            "\n  {}",
275            "Cluster Swaps:".if_supports_color(Stream::Stderr, |t| t.bold())
276        )?;
277        if self.staging_clusters.is_empty() {
278            writeln!(f, "    (none)")?;
279        } else {
280            for swap in &self.cluster_swaps() {
281                writeln!(
282                    f,
283                    "    {} {} {} {}",
284                    "~".if_supports_color(Stream::Stderr, |t| t.yellow()),
285                    swap.production_cluster,
286                    "<->".if_supports_color(Stream::Stderr, |t| t.dimmed()),
287                    swap.staging_cluster
288                )?;
289            }
290        }
291
292        // Sinks to Create
293        writeln!(
294            f,
295            "\n  {}",
296            "Sinks to Create:".if_supports_color(Stream::Stderr, |t| t.bold())
297        )?;
298        if self.pending_statements.is_empty() {
299            writeln!(f, "    (none)")?;
300        } else {
301            for stmt in &self.pending_statements {
302                writeln!(
303                    f,
304                    "    {} {}.{}.{}",
305                    "+".if_supports_color(Stream::Stderr, |t| t.green()),
306                    stmt.database,
307                    stmt.schema,
308                    stmt.object
309                )?;
310            }
311        }
312
313        // Replacement MVs
314        writeln!(
315            f,
316            "\n  {}",
317            "Replacement Materialized Views:".if_supports_color(Stream::Stderr, |t| t.bold())
318        )?;
319        if self.replacement_mvs.is_empty() {
320            writeln!(f, "    (none)")?;
321        } else {
322            for record in &self.replacement_mvs {
323                writeln!(
324                    f,
325                    "    {} {}.{}.{} {} {}.{}.{}",
326                    "~".if_supports_color(Stream::Stderr, |t| t.yellow()),
327                    record.target_database,
328                    record.target_schema,
329                    record.target_name,
330                    "<-".if_supports_color(Stream::Stderr, |t| t.dimmed()),
331                    record.target_database,
332                    record.replacement_schema,
333                    record.target_name
334                )?;
335            }
336        }
337
338        // Sinks to Repoint
339        writeln!(
340            f,
341            "\n  {}",
342            "Sinks to Repoint:".if_supports_color(Stream::Stderr, |t| t.bold())
343        )?;
344        if self.dependent_sinks.is_empty() {
345            writeln!(f, "    (none)")?;
346        } else {
347            for sink in &self.dependent_sinks {
348                writeln!(
349                    f,
350                    "    {} {}.{}.{} {} {}.{}.{}",
351                    "~".if_supports_color(Stream::Stderr, |t| t.yellow()),
352                    sink.sink_database,
353                    sink.sink_schema,
354                    sink.sink_name,
355                    "->".if_supports_color(Stream::Stderr, |t| t.dimmed()),
356                    sink.dependency_database,
357                    sink.dependency_schema,
358                    sink.dependency_name
359                )?;
360            }
361        }
362
363        // Old Resources to Drop
364        writeln!(
365            f,
366            "\n  {}",
367            "Old Resources to Drop:".if_supports_color(Stream::Stderr, |t| t.bold())
368        )?;
369        let drops = self.resources_to_drop();
370        if drops.is_empty() {
371            writeln!(f, "    (none)")?;
372        } else {
373            for drop in &drops {
374                writeln!(
375                    f,
376                    "    {} {}",
377                    "-".if_supports_color(Stream::Stderr, |t| t.red()),
378                    drop.name
379                )?;
380            }
381        }
382
383        Ok(())
384    }
385}
386
387/// Build a complete deployment plan by querying the database once for all data.
388async fn generate_deployment_plan(
389    client: &Client,
390    deploy_id: &str,
391    apply_state: ApplyState,
392    force: bool,
393) -> Result<DeploymentPlan, CliError> {
394    // 1. Gather swap resources (empty for PostSwap)
395    let (staging_schemas, staging_clusters, staging_suffix) = if apply_state == ApplyState::PostSwap
396    {
397        verbose!("Resuming post-swap: skipping conflict check and resource gathering");
398        (BTreeSet::new(), BTreeSet::new(), format!("_{}", deploy_id))
399    } else {
400        gather_resources_and_check_conflicts(client, deploy_id, force).await?
401    };
402
403    // 2. Query pending statements once
404    let pending_statements = client
405        .deployments()
406        .get_pending_statements(deploy_id)
407        .await?;
408
409    // 3. Query replacement MVs once
410    let replacement_mvs = client.deployments().get_replacement_mvs(deploy_id).await?;
411
412    // 4. Query dependent sinks for display (pre-swap schema names)
413    let dependent_sinks = if staging_schemas.is_empty() {
414        Vec::new()
415    } else {
416        let prod_schemas: Vec<SchemaQualifier> = staging_schemas
417            .iter()
418            .map(|sq| {
419                let prod_schema = strip_staging_suffix(&sq.schema, &staging_suffix);
420                SchemaQualifier::new(sq.database.clone(), prod_schema.to_string())
421            })
422            .collect();
423
424        client
425            .introspection()
426            .find_sinks_depending_on_schemas(&prod_schemas)
427            .await
428            .map_err(CliError::Connection)?
429    };
430
431    Ok(DeploymentPlan {
432        deploy_id: deploy_id.to_string(),
433        apply_state,
434        staging_suffix,
435        staging_schemas,
436        staging_clusters,
437        pending_statements,
438        replacement_mvs,
439        dependent_sinks,
440    })
441}
442
443/// Promote a staging deployment to production using ALTER SWAP.
444pub async fn run(
445    settings: &Settings,
446    deploy_id: &str,
447    force: bool,
448    dry_run: bool,
449) -> Result<(), CliError> {
450    let profile = settings.connection();
451
452    let client = Client::connect_with_profile(profile.clone())
453        .await
454        .map_err(CliError::Connection)?;
455
456    super::setup::verify(&client, settings.emulator()).await?;
457    let role = super::setup::validate_connection(&client, settings.emulator()).await?;
458    super::setup::require_deployer(role)?;
459
460    let apply_state = client.deployments().get_apply_state(deploy_id).await?;
461    verbose!("Apply state: {:?}", apply_state);
462
463    // A fresh promote must target a staged, not-yet-promoted deployment. When
464    // apply-state markers already exist the promote was interrupted mid-flight
465    // (PreSwap/PostSwap), so resume it instead — even if `promoted_at` was
466    // already recorded. This makes a crash in the window between recording the
467    // promotion and dropping the markers recoverable by simply re-running
468    // `promote`, rather than leaving the markers orphaned.
469    if matches!(apply_state, ApplyState::NotStarted) {
470        client.deployments().validate_staging(deploy_id).await?;
471    }
472
473    let staging_snapshot =
474        deployment_snapshot::load_from_database(&client, Some(deploy_id)).await?;
475    verbose!(
476        "Found {} objects in staging deployment",
477        staging_snapshot.objects.len()
478    );
479
480    let plan = generate_deployment_plan(&client, deploy_id, apply_state, force).await?;
481
482    if dry_run {
483        log::output(&plan);
484        return Ok(());
485    }
486
487    if log::json_output_enabled() {
488        log::output_json(&plan);
489    }
490
491    execute_swap_phase(&client, &plan).await?;
492    maybe_crash("after-swap");
493    run_post_swap_steps(&client, &plan).await?;
494    maybe_crash("after-post-swap");
495    cleanup_apply_state(&client, &plan.deploy_id).await?;
496
497    progress::success("Deployment completed successfully!");
498
499    Ok(())
500}
501
502/// Test-only crash injection. When the `MZ_DEPLOY_FAIL_AT` environment variable
503/// matches `phase`, abort the process immediately — no unwinding, no cleanup —
504/// to faithfully simulate a crash at that boundary so promote's resume paths can
505/// be exercised end to end. Inert in normal use (the variable is never set).
506///
507/// Boundaries: `after-markers` (markers written, swap not committed → PreSwap),
508/// `after-swap` (swap committed → PostSwap), `after-post-swap` (post-swap work
509/// done, markers not yet cleaned up → PostSwap).
510fn maybe_crash(phase: &str) {
511    if std::env::var("MZ_DEPLOY_FAIL_AT").ok().as_deref() == Some(phase) {
512        crate::info!("MZ_DEPLOY_FAIL_AT={}: simulating crash", phase);
513        std::process::exit(1);
514    }
515}
516
517/// Runs the swap portion of apply according to persisted resume state.
518async fn execute_swap_phase(client: &Client, plan: &DeploymentPlan) -> Result<(), CliError> {
519    match plan.apply_state {
520        ApplyState::NotStarted => {
521            verbose!("Creating apply state schemas...");
522            client
523                .deployments()
524                .create_apply_state_schemas(&plan.deploy_id)
525                .await?;
526            maybe_crash("after-markers");
527            verbose!("Executing atomic swap...");
528            execute_atomic_swap(
529                client,
530                &plan.deploy_id,
531                &plan.staging_schemas,
532                &plan.staging_clusters,
533            )
534            .await?;
535        }
536        ApplyState::PreSwap => {
537            verbose!("Resuming from pre-swap state...");
538            execute_atomic_swap(
539                client,
540                &plan.deploy_id,
541                &plan.staging_schemas,
542                &plan.staging_clusters,
543            )
544            .await?;
545        }
546        ApplyState::PostSwap => {
547            verbose!("Resuming from post-swap state...");
548        }
549    }
550    Ok(())
551}
552
553/// Completes promotion work that must happen after schemas/clusters are swapped.
554async fn run_post_swap_steps(client: &Client, plan: &DeploymentPlan) -> Result<(), CliError> {
555    execute_pending_sinks(client, plan).await?;
556    apply_replacement_mvs(client, plan).await?;
557
558    if !plan.staging_schemas.is_empty() {
559        repoint_dependent_sinks(client, plan).await?;
560    }
561
562    verbose!("\nUpdating deployment table...");
563    client
564        .deployments()
565        .update_promoted_at(&plan.deploy_id)
566        .await
567        .map_err(|source| CliError::DeploymentStateWriteFailed { source })?;
568
569    if !plan.staging_schemas.is_empty() || !plan.staging_clusters.is_empty() {
570        drop_old_resources(client, plan).await;
571    }
572    Ok(())
573}
574
575/// Removes deployment bookkeeping that is only needed while apply is in-flight.
576async fn cleanup_apply_state(client: &Client, deploy_id: &str) -> Result<(), CliError> {
577    verbose!("Cleaning up apply state...");
578    client
579        .deployments()
580        .delete_apply_state_schemas(deploy_id)
581        .await?;
582    client
583        .deployments()
584        .delete_pending_statements(deploy_id)
585        .await?;
586    client
587        .deployments()
588        .delete_replacement_mvs(deploy_id)
589        .await?;
590    client
591        .deployments()
592        .delete_deployment_clusters(deploy_id)
593        .await
594        .map_err(|source| CliError::DeploymentStateWriteFailed { source })?;
595    Ok(())
596}
597
598/// Gather staging resources and check for deployment conflicts.
599///
600/// Returns the staging schemas, clusters, and suffix for the swap operation.
601/// Loads the production snapshot to determine which Replacement schemas need
602/// swapping (first Replacement deployment) vs. skipping (steady state).
603async fn gather_resources_and_check_conflicts(
604    client: &Client,
605    deploy_id: &str,
606    force: bool,
607) -> Result<(BTreeSet<SchemaQualifier>, BTreeSet<String>, String), CliError> {
608    verbose!("Checking for deployment conflicts...");
609    let conflicts = client
610        .deployments()
611        .check_deployment_conflicts(deploy_id)
612        .await?;
613
614    if !conflicts.is_empty() {
615        if force {
616            // With --force, show warning but continue
617            let style = Style::new().yellow().bold();
618            info!(
619                "\n{}: deployment conflicts detected, but continuing due to --force flag",
620                "warning".if_supports_color(Stream::Stderr, |t| style.style(t))
621            );
622            for conflict in &conflicts {
623                info!(
624                    "  - {}.{} (last promoted by '{}' deployment)",
625                    conflict.database, conflict.schema, conflict.deploy_id
626                );
627            }
628            info!();
629        } else {
630            // Without --force, return error
631            return Err(CliError::DeploymentConflict { conflicts });
632        }
633    } else {
634        verbose!("No conflicts detected");
635    }
636
637    // Get schemas and clusters from deployment tables
638    let staging_suffix = format!("_{}", deploy_id);
639    let mut staging_schemas = BTreeSet::new();
640    let mut staging_clusters = BTreeSet::new();
641
642    // Get schemas from deploy.deployments table for this deployment
643    let deployment_records = client
644        .deployments()
645        .get_schema_deployments(Some(deploy_id))
646        .await?;
647
648    // Load production snapshot to check which schemas are already Replacement.
649    // This distinguishes Objects→Replacement transitions (need swap) from
650    // steady-state Replacement deploys (skip swap, use APPLY REPLACEMENT).
651    let production_snapshot = deployment_snapshot::load_from_database(client, None).await?;
652
653    // Build list of (database, staging_schema) pairs to check, filtering out Sinks
654    // and steady-state Replacement schemas (already Replacement in production).
655    let schemas_to_check: Vec<(String, String)> = deployment_records
656        .iter()
657        .filter(|record| {
658            if record.kind == DeploymentKind::Sinks {
659                verbose!(
660                    "Skipping sink-only schema {}.{} (no swap needed)",
661                    record.database,
662                    record.schema
663                );
664                false
665            } else if record.kind == DeploymentKind::Replacement {
666                let sq = SchemaQualifier::new(record.database.clone(), record.schema.clone());
667                let already_replacement = production_snapshot.schemas.get(&sq).copied()
668                    == Some(DeploymentKind::Replacement);
669                if already_replacement {
670                    verbose!(
671                        "Skipping replacement schema {}.{} (already Replacement in production)",
672                        record.database,
673                        record.schema
674                    );
675                    false
676                } else {
677                    verbose!(
678                        "Including replacement schema {}.{} in swap (first Replacement deployment)",
679                        record.database,
680                        record.schema
681                    );
682                    true
683                }
684            } else {
685                true
686            }
687        })
688        .map(|record| {
689            let staging_schema = format!("{}{}", record.schema, staging_suffix);
690            (record.database.clone(), staging_schema)
691        })
692        .collect();
693
694    // Batch check which staging schemas exist
695    let existing_schemas = client
696        .introspection()
697        .check_schemas_exist(&schemas_to_check)
698        .await?;
699
700    for pair in schemas_to_check {
701        if existing_schemas.contains(&pair) {
702            staging_schemas.insert(SchemaQualifier::new(pair.0, pair.1));
703        } else {
704            info!("Warning: Staging schema {}.{} not found", pair.0, pair.1);
705        }
706    }
707
708    // Validate that all clusters in the deployment still exist
709    client
710        .deployments()
711        .validate_deployment_clusters(deploy_id)
712        .await?;
713
714    // Get clusters from deploy.clusters table
715    let cluster_names = client
716        .deployments()
717        .get_deployment_clusters(deploy_id)
718        .await?;
719
720    // Batch check which staging clusters exist
721    let staging_cluster_names: Vec<String> = cluster_names
722        .iter()
723        .map(|name| format!("{}{}", name, staging_suffix))
724        .collect();
725    let existing_clusters = client
726        .introspection()
727        .check_clusters_exist(&staging_cluster_names)
728        .await?;
729
730    for (cluster_name, staging_cluster) in cluster_names.into_iter().zip_eq(staging_cluster_names) {
731        if existing_clusters.contains(&staging_cluster) {
732            staging_clusters.insert(cluster_name);
733        } else {
734            info!("Warning: Staging cluster {} not found", staging_cluster);
735        }
736    }
737
738    verbose!("\nSchemas to swap:");
739    for sq in &staging_schemas {
740        let prod_schema = strip_staging_suffix(&sq.schema, &staging_suffix);
741        verbose!("  - {}.{} <-> {}", sq.database, sq.schema, prod_schema);
742    }
743
744    if !staging_clusters.is_empty() {
745        verbose!("\nClusters to swap:");
746        for cluster in &staging_clusters {
747            let staging_cluster = format!("{}{}", cluster, staging_suffix);
748            verbose!("  - {} <-> {}", staging_cluster, cluster);
749        }
750    }
751
752    Ok((staging_schemas, staging_clusters, staging_suffix))
753}
754
755/// Execute the atomic swap of schemas, clusters, and state schemas.
756async fn execute_atomic_swap(
757    client: &Client,
758    deploy_id: &str,
759    staging_schemas: &BTreeSet<SchemaQualifier>,
760    staging_clusters: &BTreeSet<String>,
761) -> Result<(), CliError> {
762    let staging_suffix = format!("_{}", deploy_id);
763
764    // Begin transaction for atomic swap
765    client
766        .execute("BEGIN", &[])
767        .await
768        .map_err(|e| CliError::SqlExecutionFailed {
769            statement: "BEGIN".to_string(),
770            source: e,
771        })?;
772
773    // Swap schemas
774    for sq in staging_schemas {
775        let prod_schema = strip_staging_suffix(&sq.schema, &staging_suffix);
776        // Note: second schema name is NOT fully qualified (same database)
777        let swap_sql = format!(
778            "ALTER SCHEMA \"{}\".\"{}\" SWAP WITH \"{}\";",
779            sq.database, prod_schema, sq.schema
780        );
781
782        verbose!("  {}", swap_sql);
783        if let Err(e) = client.execute(&swap_sql, &[]).await {
784            let _ = client.execute("ROLLBACK", &[]).await;
785            return Err(CliError::SqlExecutionFailed {
786                statement: swap_sql,
787                source: e,
788            });
789        }
790    }
791
792    // Swap clusters
793    for cluster in staging_clusters {
794        let staging_cluster = format!("{}{}", cluster, staging_suffix);
795        let swap_sql = format!(
796            "ALTER CLUSTER \"{}\" SWAP WITH \"{}\";",
797            cluster, staging_cluster
798        );
799
800        verbose!("  {}", swap_sql);
801        if let Err(e) = client.execute(&swap_sql, &[]).await {
802            let _ = client.execute("ROLLBACK", &[]).await;
803            return Err(CliError::SqlExecutionFailed {
804                statement: swap_sql,
805                source: e,
806            });
807        }
808    }
809
810    // Swap the apply state schemas - this atomically marks the swap as complete
811    let pre_schema = format!("apply_{}_pre", deploy_id);
812    let post_schema = format!("apply_{}_post", deploy_id);
813    let state_swap_sql = format!(
814        "ALTER SCHEMA _mz_deploy.\"{}\" SWAP WITH \"{}\";",
815        pre_schema, post_schema
816    );
817
818    verbose!("  {}", state_swap_sql);
819    if let Err(e) = client.execute(&state_swap_sql, &[]).await {
820        let _ = client.execute("ROLLBACK", &[]).await;
821        return Err(CliError::SqlExecutionFailed {
822            statement: state_swap_sql,
823            source: e,
824        });
825    }
826
827    // Commit transaction
828    client
829        .execute("COMMIT", &[])
830        .await
831        .map_err(|e| CliError::SqlExecutionFailed {
832            statement: "COMMIT".to_string(),
833            source: e,
834        })?;
835
836    verbose!("Swap completed successfully");
837    Ok(())
838}
839
840/// Execute pending sink statements using data from the plan.
841///
842/// Uses `plan.pending_statements` instead of re-querying. Still checks sink
843/// existence at execution time for idempotency.
844async fn execute_pending_sinks(client: &Client, plan: &DeploymentPlan) -> Result<(), CliError> {
845    if plan.pending_statements.is_empty() {
846        verbose!("No pending sinks to execute");
847        return Ok(());
848    }
849
850    // Build set of sink ObjectIds from pending statements
851    let sink_ids: BTreeSet<ObjectId> = plan
852        .pending_statements
853        .iter()
854        .map(|stmt| {
855            ObjectId::new(
856                stmt.database.clone(),
857                stmt.schema.clone(),
858                stmt.object.clone(),
859            )
860        })
861        .collect();
862
863    // Check which sinks already exist (like tables, skip existing ones)
864    let existing_sinks = client.introspection().check_sinks_exist(&sink_ids).await?;
865
866    // Filter to only sinks that don't exist
867    let sinks_to_create: Vec<_> = plan
868        .pending_statements
869        .iter()
870        .filter(|stmt| {
871            let obj_id = ObjectId::new(
872                stmt.database.clone(),
873                stmt.schema.clone(),
874                stmt.object.clone(),
875            );
876            !existing_sinks.contains(&obj_id)
877        })
878        .collect();
879
880    // Log skipped sinks
881    if !existing_sinks.is_empty() {
882        let mut existing_list: Vec<_> = existing_sinks.iter().collect();
883        existing_list.sort_by_key(|obj| obj.to_string());
884        for sink_id in existing_list {
885            verbose!("  - {}", sink_id);
886        }
887    }
888
889    if sinks_to_create.is_empty() {
890        return Ok(());
891    }
892
893    // Sink-only schemas are created here: `apply` only ensures schemas for
894    // its own phases (secrets/connections/sources/tables) and `stage`
895    // excludes sinks from staging schemas, so a schema containing nothing
896    // but sinks does not exist yet.
897    let sink_schemas: BTreeSet<(&str, &str)> = sinks_to_create
898        .iter()
899        .map(|stmt| (stmt.database.as_str(), stmt.schema.as_str()))
900        .collect();
901    for (database, schema) in sink_schemas {
902        client
903            .provisioning()
904            .create_schema(database, schema)
905            .await
906            .map_err(CliError::Connection)?;
907    }
908
909    for stmt in sinks_to_create {
910        verbose!(
911            "Creating sink {}.{}.{}...",
912            stmt.database,
913            stmt.schema,
914            stmt.object
915        );
916
917        // Execute the sink creation statement
918        if let Err(e) = client.execute(&stmt.statement_sql, &[]).await {
919            info!(
920                "Error creating sink {}.{}.{}: {}",
921                stmt.database, stmt.schema, stmt.object, e
922            );
923            return Err(CliError::SqlExecutionFailed {
924                statement: stmt.statement_sql.clone(),
925                source: e,
926            });
927        }
928
929        // Mark the statement as executed
930        client
931            .deployments()
932            .mark_statement_executed(&plan.deploy_id, stmt.sequence_num)
933            .await?;
934
935        progress::success(&format!(
936            "{}.{}.{}",
937            stmt.database, stmt.schema, stmt.object
938        ));
939    }
940
941    Ok(())
942}
943
944/// Apply replacement materialized views using data from the plan.
945///
946/// Uses `plan.replacement_mvs` instead of re-querying.
947///
948/// Idempotent across a post-swap resume: `APPLY REPLACEMENT` consumes the
949/// staging MV, and the replacement-MV records are not deleted until
950/// `cleanup_apply_state`. A crash between recording the promotion and that
951/// cleanup leaves the records in place while their staging MVs are already
952/// gone, so re-running skips any replacement whose staging source MV no longer
953/// exists rather than erroring on the dropped staging schema.
954async fn apply_replacement_mvs(client: &Client, plan: &DeploymentPlan) -> Result<(), CliError> {
955    if plan.replacement_mvs.is_empty() {
956        verbose!("No replacement MVs to apply");
957        return Ok(());
958    }
959
960    for record in &plan.replacement_mvs {
961        let already_applied = !client
962            .introspection()
963            .object_exists(
964                &record.target_database,
965                &record.replacement_schema,
966                &record.target_name,
967            )
968            .await
969            .map_err(CliError::Connection)?;
970        if already_applied {
971            verbose!(
972                "Skipping already-applied replacement {}.{}.{}",
973                record.target_database,
974                record.target_schema,
975                record.target_name
976            );
977            continue;
978        }
979
980        let alter_sql = format!(
981            "ALTER MATERIALIZED VIEW \"{}\".\"{}\".\"{}\"\
982             APPLY REPLACEMENT \"{}\".\"{}\".\"{}\";",
983            record.target_database,
984            record.target_schema,
985            record.target_name,
986            record.target_database,
987            record.replacement_schema,
988            record.target_name
989        );
990
991        verbose!("  {}", alter_sql);
992        if let Err(e) = client.execute(&alter_sql, &[]).await {
993            info!(
994                "Error applying replacement for {}.{}.{}: {}",
995                record.target_database, record.target_schema, record.target_name, e
996            );
997            return Err(CliError::SqlExecutionFailed {
998                statement: alter_sql,
999                source: e,
1000            });
1001        }
1002
1003        progress::success(&format!(
1004            "{}.{}.{}",
1005            record.target_database, record.target_schema, record.target_name
1006        ));
1007    }
1008
1009    // Drop now-empty replacement staging schemas
1010    let replacement_schemas = plan.replacement_schemas();
1011
1012    for sq in &replacement_schemas {
1013        let drop_sql = format!(
1014            "DROP SCHEMA IF EXISTS \"{}\".\"{}\" CASCADE;",
1015            sq.database, sq.schema
1016        );
1017        verbose!("  {}", drop_sql);
1018        if let Err(e) = client.execute(&drop_sql, &[]).await {
1019            info!(
1020                "warning: failed to drop replacement schema {}.{}: {}",
1021                sq.database, sq.schema, e
1022            );
1023        }
1024    }
1025
1026    Ok(())
1027}
1028
1029/// Repoint sinks that depend on objects in schemas about to be dropped.
1030///
1031/// Re-queries post-swap (cannot use `plan.dependent_sinks` because schema names
1032/// differ after swap). Reads `plan.staging_schemas` and `plan.staging_suffix`.
1033async fn repoint_dependent_sinks(client: &Client, plan: &DeploymentPlan) -> Result<(), CliError> {
1034    let staging_suffix = &plan.staging_suffix;
1035
1036    // Build list of old schema names (database, old_schema_with_suffix)
1037    // After swap, old production schemas have the staging suffix
1038    let old_schemas: Vec<SchemaQualifier> = plan
1039        .staging_schemas
1040        .iter()
1041        .map(|sq| {
1042            let prod_schema = strip_staging_suffix(&sq.schema, staging_suffix);
1043            let old_schema = format!("{}{}", prod_schema, staging_suffix);
1044            SchemaQualifier::new(sq.database.clone(), old_schema)
1045        })
1046        .collect();
1047
1048    // Find sinks depending on objects in old schemas
1049    let dependent_sinks = client
1050        .introspection()
1051        .find_sinks_depending_on_schemas(&old_schemas)
1052        .await
1053        .map_err(CliError::Connection)?;
1054
1055    if dependent_sinks.is_empty() {
1056        verbose!("No sinks depend on objects in schemas being dropped");
1057        return Ok(());
1058    }
1059
1060    // Batch check which replacement objects exist
1061    let replacement_ids: BTreeSet<ObjectId> = dependent_sinks
1062        .iter()
1063        .map(|sink| {
1064            let new_schema =
1065                strip_staging_suffix(&sink.dependency_schema, staging_suffix).to_string();
1066            ObjectId::new(
1067                sink.dependency_database.clone(),
1068                new_schema,
1069                sink.dependency_name.clone(),
1070            )
1071        })
1072        .collect();
1073
1074    let existing_ids = client
1075        .introspection()
1076        .check_objects_exist(&replacement_ids)
1077        .await
1078        .map_err(CliError::Connection)?;
1079
1080    for sink in dependent_sinks {
1081        // Compute new schema name (strip suffix to get production schema name)
1082        let new_schema = strip_staging_suffix(&sink.dependency_schema, staging_suffix);
1083
1084        let replacement_id = ObjectId::new(
1085            sink.dependency_database.clone(),
1086            new_schema.to_string(),
1087            sink.dependency_name.clone(),
1088        );
1089
1090        if !existing_ids.contains(&replacement_id) {
1091            return Err(CliError::SinkRepointFailed {
1092                sink: format!(
1093                    "{}.{}.{}",
1094                    sink.sink_database, sink.sink_schema, sink.sink_name
1095                ),
1096                reason: format!("replacement object {} does not exist", replacement_id),
1097            });
1098        }
1099
1100        // Execute ALTER SINK ... SET FROM
1101        let alter_sql = format!(
1102            r#"ALTER SINK "{}"."{}"."{}" SET FROM "{}"."{}"."{}""#,
1103            sink.sink_database,
1104            sink.sink_schema,
1105            sink.sink_name,
1106            sink.dependency_database,
1107            new_schema,
1108            sink.dependency_name
1109        );
1110
1111        verbose!("  {}", alter_sql);
1112        if let Err(e) = client.execute(&alter_sql, &[]).await {
1113            return Err(CliError::SinkRepointFailed {
1114                sink: format!(
1115                    "{}.{}.{}",
1116                    sink.sink_database, sink.sink_schema, sink.sink_name
1117                ),
1118                reason: e.to_string(),
1119            });
1120        }
1121
1122        progress::success(&format!(
1123            "{}.{}.{} -> {}.{}.{}",
1124            sink.sink_database,
1125            sink.sink_schema,
1126            sink.sink_name,
1127            sink.dependency_database,
1128            new_schema,
1129            sink.dependency_name
1130        ));
1131    }
1132
1133    Ok(())
1134}
1135
1136/// Drop old production resources after the swap.
1137async fn drop_old_resources(client: &Client, plan: &DeploymentPlan) {
1138    let staging_suffix = &plan.staging_suffix;
1139
1140    // Drop schemas
1141    for sq in &plan.staging_schemas {
1142        let prod_schema = strip_staging_suffix(&sq.schema, staging_suffix);
1143        let old_schema = format!("{}{}", prod_schema, staging_suffix);
1144        let drop_sql = format!(
1145            "DROP SCHEMA IF EXISTS \"{}\".\"{}\" CASCADE;",
1146            sq.database, old_schema
1147        );
1148
1149        verbose!("  {}", drop_sql);
1150        if let Err(e) = client.execute(&drop_sql, &[]).await {
1151            info!(
1152                "warning: failed to drop old schema {}.{}: {}",
1153                sq.database, old_schema, e
1154            );
1155        }
1156    }
1157
1158    // Drop clusters
1159    for cluster in &plan.staging_clusters {
1160        let old_cluster = format!("{}{}", cluster, staging_suffix);
1161        let drop_sql = format!("DROP CLUSTER IF EXISTS \"{}\" CASCADE;", old_cluster);
1162
1163        verbose!("  {}", drop_sql);
1164        if let Err(e) = client.execute(&drop_sql, &[]).await {
1165            info!("warning: failed to drop old cluster {}: {}", old_cluster, e);
1166        }
1167    }
1168}
1169
#[cfg(test)]
mod tests {
    use super::strip_staging_suffix;

    #[mz_ore::test]
    fn test_strip_staging_suffix_removes_suffix_once() {
        // Common path: the deploy suffix appears once and is removed.
        let recovered = strip_staging_suffix("analytics_deploy123", "_deploy123");
        assert_eq!(recovered, "analytics");
    }

    #[mz_ore::test]
    fn test_strip_staging_suffix_strips_exactly_once() {
        // Regression test for QA Finding 4.
        //
        // `--deploy-id prod` yields the suffix `_prod`, so the production schema
        // `analytics_prod` is staged as `analytics_prod_prod`. Only one suffix
        // may come off, leaving `analytics_prod`. Stripping all trailing copies
        // (as `str::trim_end_matches` does) would yield `analytics`. That would
        // aim the cutover swap and the later `DROP ... CASCADE` at the wrong
        // production schema.
        let recovered = strip_staging_suffix("analytics_prod_prod", "_prod");
        assert_eq!(recovered, "analytics_prod");
    }

    #[mz_ore::test]
    fn test_strip_staging_suffix_no_match_returns_input() {
        // Without a trailing suffix the input comes back untouched.
        let recovered = strip_staging_suffix("analytics", "_prod");
        assert_eq!(recovered, "analytics");
    }
}