Skip to main content

mz_deploy/cli/commands/
promote.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Promote command - promote staging deployment to production via ALTER SWAP.
11
12use crate::cli::CliError;
13use crate::cli::progress;
14use crate::client::{
15    ApplyState, Client, DependentSink, DeploymentKind, PendingStatement, ReplacementMvRecord,
16};
17use crate::config::Settings;
18use crate::log;
19use crate::project::SchemaQualifier;
20use crate::project::analysis::deployment_snapshot;
21use crate::project::ir::object_id::ObjectId;
22use crate::{info, verbose};
23use itertools::Itertools;
24use owo_colors::{OwoColorize, Stream, Style};
25use std::collections::BTreeSet;
26use std::fmt;
27
/// Turn a staging name back into its production name by removing one trailing
/// copy of `staging_suffix`.
///
/// Staging appends the suffix (`_<deploy_id>`) exactly once, so exactly one
/// occurrence is removed here. Trimming *every* trailing occurrence (what
/// `str::trim_end_matches` does) corrupts production names that themselves end
/// in the suffix.
///
/// For example, with `--deploy-id prod` the production schema `analytics_prod`
/// is staged as `analytics_prod_prod`. Repeated trimming would yield
/// `analytics`, aiming the cutover swap and the later `DROP ... CASCADE` at the
/// wrong production schema.
///
/// A name that does not end with the suffix is returned unchanged.
fn strip_staging_suffix<'a>(staging_name: &'a str, staging_suffix: &str) -> &'a str {
    match staging_name.strip_suffix(staging_suffix) {
        Some(production_name) => production_name,
        None => staging_name,
    }
}
44
45/// Central data structure holding everything needed to display and execute a deployment.
46///
47/// Computed once by `generate_deployment_plan()` and consumed for display (human + JSON)
48/// and execution. Raw domain objects are stored so execution functions don't re-query.
49struct DeploymentPlan {
50    deploy_id: String,
51    apply_state: ApplyState,
52    staging_suffix: String,
53    staging_schemas: BTreeSet<SchemaQualifier>,
54    staging_clusters: BTreeSet<String>,
55    pending_statements: Vec<PendingStatement>,
56    replacement_mvs: Vec<ReplacementMvRecord>,
57    dependent_sinks: Vec<DependentSink>,
58}
59
60#[derive(serde::Serialize)]
61struct SchemaSwapView<'a> {
62    database: &'a str,
63    production_schema: String,
64    staging_schema: &'a str,
65}
66
67#[derive(serde::Serialize)]
68struct ClusterSwapView<'a> {
69    production_cluster: &'a str,
70    staging_cluster: String,
71}
72
73#[derive(serde::Serialize)]
74struct SinkToCreateView<'a> {
75    database: &'a str,
76    schema: &'a str,
77    object: &'a str,
78}
79
80#[derive(serde::Serialize)]
81struct ReplacementMvView<'a> {
82    target_database: &'a str,
83    target_schema: &'a str,
84    target_name: &'a str,
85    replacement_schema: &'a str,
86}
87
88#[derive(serde::Serialize)]
89struct SinkToRepointView<'a> {
90    sink_database: &'a str,
91    sink_schema: &'a str,
92    sink_name: &'a str,
93    dependency_database: &'a str,
94    dependency_schema: &'a str,
95    dependency_name: &'a str,
96}
97
98#[derive(serde::Serialize)]
99struct ResourceToDropView {
100    kind: String,
101    name: String,
102}
103
104impl DeploymentPlan {
105    fn apply_state_str(&self) -> &'static str {
106        match self.apply_state {
107            ApplyState::NotStarted => "not_started",
108            ApplyState::PreSwap => "pre_swap",
109            ApplyState::PostSwap => "post_swap",
110        }
111    }
112
113    fn schema_swaps(&self) -> Vec<SchemaSwapView<'_>> {
114        self.staging_schemas
115            .iter()
116            .map(|sq| {
117                let prod_schema =
118                    strip_staging_suffix(&sq.schema, &self.staging_suffix).to_string();
119                SchemaSwapView {
120                    database: &sq.database,
121                    production_schema: prod_schema,
122                    staging_schema: &sq.schema,
123                }
124            })
125            .collect()
126    }
127
128    fn cluster_swaps(&self) -> Vec<ClusterSwapView<'_>> {
129        self.staging_clusters
130            .iter()
131            .map(|cluster| ClusterSwapView {
132                production_cluster: cluster,
133                staging_cluster: format!("{}{}", cluster, self.staging_suffix),
134            })
135            .collect()
136    }
137
138    fn sinks_to_create(&self) -> Vec<SinkToCreateView<'_>> {
139        self.pending_statements
140            .iter()
141            .map(|stmt| SinkToCreateView {
142                database: &stmt.database,
143                schema: &stmt.schema,
144                object: &stmt.object,
145            })
146            .collect()
147    }
148
149    fn replacement_mv_views(&self) -> Vec<ReplacementMvView<'_>> {
150        self.replacement_mvs
151            .iter()
152            .map(|r| ReplacementMvView {
153                target_database: &r.target_database,
154                target_schema: &r.target_schema,
155                target_name: &r.target_name,
156                replacement_schema: &r.replacement_schema,
157            })
158            .collect()
159    }
160
161    fn sinks_to_repoint(&self) -> Vec<SinkToRepointView<'_>> {
162        self.dependent_sinks
163            .iter()
164            .map(|sink| SinkToRepointView {
165                sink_database: &sink.sink_database,
166                sink_schema: &sink.sink_schema,
167                sink_name: &sink.sink_name,
168                dependency_database: &sink.dependency_database,
169                dependency_schema: &sink.dependency_schema,
170                dependency_name: &sink.dependency_name,
171            })
172            .collect()
173    }
174
175    fn replacement_schemas(&self) -> BTreeSet<SchemaQualifier> {
176        self.replacement_mvs
177            .iter()
178            .map(|r| SchemaQualifier::new(r.target_database.clone(), r.replacement_schema.clone()))
179            .collect()
180    }
181
182    fn resources_to_drop(&self) -> Vec<ResourceToDropView> {
183        let mut drops = Vec::new();
184        for sq in &self.staging_schemas {
185            let prod_schema = strip_staging_suffix(&sq.schema, &self.staging_suffix);
186            let old_schema = format!("{}{}", prod_schema, self.staging_suffix);
187            drops.push(ResourceToDropView {
188                kind: "schema".to_string(),
189                name: format!("{}.{}", sq.database, old_schema),
190            });
191        }
192        for cluster in &self.staging_clusters {
193            let old_cluster = format!("{}{}", cluster, self.staging_suffix);
194            drops.push(ResourceToDropView {
195                kind: "cluster".to_string(),
196                name: old_cluster,
197            });
198        }
199        for sq in &self.replacement_schemas() {
200            drops.push(ResourceToDropView {
201                kind: "schema".to_string(),
202                name: format!("{}.{}", sq.database, sq.schema),
203            });
204        }
205        drops
206    }
207}
208
209impl serde::Serialize for DeploymentPlan {
210    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
211        use serde::ser::SerializeStruct;
212        let mut state = serializer.serialize_struct("DeploymentPlan", 8)?;
213        state.serialize_field("deploy_id", &self.deploy_id)?;
214        state.serialize_field("apply_state", self.apply_state_str())?;
215        state.serialize_field("schema_swaps", &self.schema_swaps())?;
216        state.serialize_field("cluster_swaps", &self.cluster_swaps())?;
217        state.serialize_field("sinks_to_create", &self.sinks_to_create())?;
218        state.serialize_field("replacement_mvs", &self.replacement_mv_views())?;
219        state.serialize_field("sinks_to_repoint", &self.sinks_to_repoint())?;
220        state.serialize_field("resources_to_drop", &self.resources_to_drop())?;
221        state.end()
222    }
223}
224
225impl fmt::Display for DeploymentPlan {
226    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
227        // Note resume state if applicable
228        match self.apply_state {
229            ApplyState::PreSwap => {
230                let style = Style::new().yellow().bold();
231                writeln!(
232                    f,
233                    "\n  {} Resuming from pre-swap state (apply state schemas already created)",
234                    "note:".if_supports_color(Stream::Stderr, |t| style.style(t))
235                )?;
236            }
237            ApplyState::PostSwap => {
238                let style = Style::new().yellow().bold();
239                writeln!(
240                    f,
241                    "\n  {} Resuming from post-swap state (swap already completed, showing remaining work)",
242                    "note:".if_supports_color(Stream::Stderr, |t| style.style(t))
243                )?;
244            }
245            ApplyState::NotStarted => {}
246        }
247
248        // Schema Swaps
249        writeln!(
250            f,
251            "\n  {}",
252            "Schema Swaps:".if_supports_color(Stream::Stderr, |t| t.bold())
253        )?;
254        if self.staging_schemas.is_empty() {
255            writeln!(f, "    (none)")?;
256        } else {
257            for swap in &self.schema_swaps() {
258                writeln!(
259                    f,
260                    "    {} {}.{} {} {}.{}",
261                    "~".if_supports_color(Stream::Stderr, |t| t.yellow()),
262                    swap.database,
263                    swap.production_schema,
264                    "<->".if_supports_color(Stream::Stderr, |t| t.dimmed()),
265                    swap.database,
266                    swap.staging_schema
267                )?;
268            }
269        }
270
271        // Cluster Swaps
272        writeln!(
273            f,
274            "\n  {}",
275            "Cluster Swaps:".if_supports_color(Stream::Stderr, |t| t.bold())
276        )?;
277        if self.staging_clusters.is_empty() {
278            writeln!(f, "    (none)")?;
279        } else {
280            for swap in &self.cluster_swaps() {
281                writeln!(
282                    f,
283                    "    {} {} {} {}",
284                    "~".if_supports_color(Stream::Stderr, |t| t.yellow()),
285                    swap.production_cluster,
286                    "<->".if_supports_color(Stream::Stderr, |t| t.dimmed()),
287                    swap.staging_cluster
288                )?;
289            }
290        }
291
292        // Sinks to Create
293        writeln!(
294            f,
295            "\n  {}",
296            "Sinks to Create:".if_supports_color(Stream::Stderr, |t| t.bold())
297        )?;
298        if self.pending_statements.is_empty() {
299            writeln!(f, "    (none)")?;
300        } else {
301            for stmt in &self.pending_statements {
302                writeln!(
303                    f,
304                    "    {} {}.{}.{}",
305                    "+".if_supports_color(Stream::Stderr, |t| t.green()),
306                    stmt.database,
307                    stmt.schema,
308                    stmt.object
309                )?;
310            }
311        }
312
313        // Replacement MVs
314        writeln!(
315            f,
316            "\n  {}",
317            "Replacement Materialized Views:".if_supports_color(Stream::Stderr, |t| t.bold())
318        )?;
319        if self.replacement_mvs.is_empty() {
320            writeln!(f, "    (none)")?;
321        } else {
322            for record in &self.replacement_mvs {
323                writeln!(
324                    f,
325                    "    {} {}.{}.{} {} {}.{}.{}",
326                    "~".if_supports_color(Stream::Stderr, |t| t.yellow()),
327                    record.target_database,
328                    record.target_schema,
329                    record.target_name,
330                    "<-".if_supports_color(Stream::Stderr, |t| t.dimmed()),
331                    record.target_database,
332                    record.replacement_schema,
333                    record.target_name
334                )?;
335            }
336        }
337
338        // Sinks to Repoint
339        writeln!(
340            f,
341            "\n  {}",
342            "Sinks to Repoint:".if_supports_color(Stream::Stderr, |t| t.bold())
343        )?;
344        if self.dependent_sinks.is_empty() {
345            writeln!(f, "    (none)")?;
346        } else {
347            for sink in &self.dependent_sinks {
348                writeln!(
349                    f,
350                    "    {} {}.{}.{} {} {}.{}.{}",
351                    "~".if_supports_color(Stream::Stderr, |t| t.yellow()),
352                    sink.sink_database,
353                    sink.sink_schema,
354                    sink.sink_name,
355                    "->".if_supports_color(Stream::Stderr, |t| t.dimmed()),
356                    sink.dependency_database,
357                    sink.dependency_schema,
358                    sink.dependency_name
359                )?;
360            }
361        }
362
363        // Old Resources to Drop
364        writeln!(
365            f,
366            "\n  {}",
367            "Old Resources to Drop:".if_supports_color(Stream::Stderr, |t| t.bold())
368        )?;
369        let drops = self.resources_to_drop();
370        if drops.is_empty() {
371            writeln!(f, "    (none)")?;
372        } else {
373            for drop in &drops {
374                writeln!(
375                    f,
376                    "    {} {}",
377                    "-".if_supports_color(Stream::Stderr, |t| t.red()),
378                    drop.name
379                )?;
380            }
381        }
382
383        Ok(())
384    }
385}
386
387/// Build a complete deployment plan by querying the database once for all data.
388async fn generate_deployment_plan(
389    client: &Client,
390    deploy_id: &str,
391    apply_state: ApplyState,
392    force: bool,
393) -> Result<DeploymentPlan, CliError> {
394    // 1. Gather swap resources (empty for PostSwap)
395    let (staging_schemas, staging_clusters, staging_suffix) = if apply_state == ApplyState::PostSwap
396    {
397        verbose!("Resuming post-swap: skipping conflict check and resource gathering");
398        (BTreeSet::new(), BTreeSet::new(), format!("_{}", deploy_id))
399    } else {
400        gather_resources_and_check_conflicts(client, deploy_id, force).await?
401    };
402
403    // 2. Query pending statements once
404    let pending_statements = client
405        .deployments()
406        .get_pending_statements(deploy_id)
407        .await?;
408
409    // 3. Query replacement MVs once
410    let replacement_mvs = client.deployments().get_replacement_mvs(deploy_id).await?;
411
412    // 4. Query dependent sinks for display (pre-swap schema names)
413    let dependent_sinks = if staging_schemas.is_empty() {
414        Vec::new()
415    } else {
416        let prod_schemas: Vec<SchemaQualifier> = staging_schemas
417            .iter()
418            .map(|sq| {
419                let prod_schema = strip_staging_suffix(&sq.schema, &staging_suffix);
420                SchemaQualifier::new(sq.database.clone(), prod_schema.to_string())
421            })
422            .collect();
423
424        client
425            .introspection()
426            .find_sinks_depending_on_schemas(&prod_schemas)
427            .await
428            .map_err(CliError::Connection)?
429    };
430
431    Ok(DeploymentPlan {
432        deploy_id: deploy_id.to_string(),
433        apply_state,
434        staging_suffix,
435        staging_schemas,
436        staging_clusters,
437        pending_statements,
438        replacement_mvs,
439        dependent_sinks,
440    })
441}
442
443/// Promote a staging deployment to production using ALTER SWAP.
444pub async fn run(
445    settings: &Settings,
446    deploy_id: &str,
447    force: bool,
448    dry_run: bool,
449) -> Result<(), CliError> {
450    let profile = settings.connection();
451
452    let client = Client::connect_with_profile(profile.clone())
453        .await
454        .map_err(CliError::Connection)?;
455
456    super::setup::verify(&client, settings.emulator()).await?;
457    let role = super::setup::validate_connection(&client, settings.emulator()).await?;
458    super::setup::require_deployer(role)?;
459
460    // Validate deployment exists and is not promoted
461    client.deployments().validate_staging(deploy_id).await?;
462
463    let apply_state = client.deployments().get_apply_state(deploy_id).await?;
464    verbose!("Apply state: {:?}", apply_state);
465
466    let staging_snapshot =
467        deployment_snapshot::load_from_database(&client, Some(deploy_id)).await?;
468    verbose!(
469        "Found {} objects in staging deployment",
470        staging_snapshot.objects.len()
471    );
472
473    let plan = generate_deployment_plan(&client, deploy_id, apply_state, force).await?;
474
475    if dry_run {
476        log::output(&plan);
477        return Ok(());
478    }
479
480    if log::json_output_enabled() {
481        log::output_json(&plan);
482    }
483
484    execute_swap_phase(&client, &plan).await?;
485    run_post_swap_steps(&client, &plan).await?;
486    cleanup_apply_state(&client, &plan.deploy_id).await?;
487
488    progress::success("Deployment completed successfully!");
489
490    Ok(())
491}
492
493/// Runs the swap portion of apply according to persisted resume state.
494async fn execute_swap_phase(client: &Client, plan: &DeploymentPlan) -> Result<(), CliError> {
495    match plan.apply_state {
496        ApplyState::NotStarted => {
497            verbose!("Creating apply state schemas...");
498            client
499                .deployments()
500                .create_apply_state_schemas(&plan.deploy_id)
501                .await?;
502            verbose!("Executing atomic swap...");
503            execute_atomic_swap(
504                client,
505                &plan.deploy_id,
506                &plan.staging_schemas,
507                &plan.staging_clusters,
508            )
509            .await?;
510        }
511        ApplyState::PreSwap => {
512            verbose!("Resuming from pre-swap state...");
513            execute_atomic_swap(
514                client,
515                &plan.deploy_id,
516                &plan.staging_schemas,
517                &plan.staging_clusters,
518            )
519            .await?;
520        }
521        ApplyState::PostSwap => {
522            verbose!("Resuming from post-swap state...");
523        }
524    }
525    Ok(())
526}
527
528/// Completes promotion work that must happen after schemas/clusters are swapped.
529async fn run_post_swap_steps(client: &Client, plan: &DeploymentPlan) -> Result<(), CliError> {
530    execute_pending_sinks(client, plan).await?;
531    apply_replacement_mvs(client, plan).await?;
532
533    if !plan.staging_schemas.is_empty() {
534        repoint_dependent_sinks(client, plan).await?;
535    }
536
537    verbose!("\nUpdating deployment table...");
538    client
539        .deployments()
540        .update_promoted_at(&plan.deploy_id)
541        .await
542        .map_err(|source| CliError::DeploymentStateWriteFailed { source })?;
543
544    if !plan.staging_schemas.is_empty() || !plan.staging_clusters.is_empty() {
545        drop_old_resources(client, plan).await;
546    }
547    Ok(())
548}
549
550/// Removes deployment bookkeeping that is only needed while apply is in-flight.
551async fn cleanup_apply_state(client: &Client, deploy_id: &str) -> Result<(), CliError> {
552    verbose!("Cleaning up apply state...");
553    client
554        .deployments()
555        .delete_apply_state_schemas(deploy_id)
556        .await?;
557    client
558        .deployments()
559        .delete_pending_statements(deploy_id)
560        .await?;
561    client
562        .deployments()
563        .delete_replacement_mvs(deploy_id)
564        .await?;
565    client
566        .deployments()
567        .delete_deployment_clusters(deploy_id)
568        .await
569        .map_err(|source| CliError::DeploymentStateWriteFailed { source })?;
570    Ok(())
571}
572
573/// Gather staging resources and check for deployment conflicts.
574///
575/// Returns the staging schemas, clusters, and suffix for the swap operation.
576/// Loads the production snapshot to determine which Replacement schemas need
577/// swapping (first Replacement deployment) vs. skipping (steady state).
578async fn gather_resources_and_check_conflicts(
579    client: &Client,
580    deploy_id: &str,
581    force: bool,
582) -> Result<(BTreeSet<SchemaQualifier>, BTreeSet<String>, String), CliError> {
583    verbose!("Checking for deployment conflicts...");
584    let conflicts = client
585        .deployments()
586        .check_deployment_conflicts(deploy_id)
587        .await?;
588
589    if !conflicts.is_empty() {
590        if force {
591            // With --force, show warning but continue
592            let style = Style::new().yellow().bold();
593            info!(
594                "\n{}: deployment conflicts detected, but continuing due to --force flag",
595                "warning".if_supports_color(Stream::Stderr, |t| style.style(t))
596            );
597            for conflict in &conflicts {
598                info!(
599                    "  - {}.{} (last promoted by '{}' deployment)",
600                    conflict.database, conflict.schema, conflict.deploy_id
601                );
602            }
603            info!();
604        } else {
605            // Without --force, return error
606            return Err(CliError::DeploymentConflict { conflicts });
607        }
608    } else {
609        verbose!("No conflicts detected");
610    }
611
612    // Get schemas and clusters from deployment tables
613    let staging_suffix = format!("_{}", deploy_id);
614    let mut staging_schemas = BTreeSet::new();
615    let mut staging_clusters = BTreeSet::new();
616
617    // Get schemas from deploy.deployments table for this deployment
618    let deployment_records = client
619        .deployments()
620        .get_schema_deployments(Some(deploy_id))
621        .await?;
622
623    // Load production snapshot to check which schemas are already Replacement.
624    // This distinguishes Objects→Replacement transitions (need swap) from
625    // steady-state Replacement deploys (skip swap, use APPLY REPLACEMENT).
626    let production_snapshot = deployment_snapshot::load_from_database(client, None).await?;
627
628    // Build list of (database, staging_schema) pairs to check, filtering out Sinks
629    // and steady-state Replacement schemas (already Replacement in production).
630    let schemas_to_check: Vec<(String, String)> = deployment_records
631        .iter()
632        .filter(|record| {
633            if record.kind == DeploymentKind::Sinks {
634                verbose!(
635                    "Skipping sink-only schema {}.{} (no swap needed)",
636                    record.database,
637                    record.schema
638                );
639                false
640            } else if record.kind == DeploymentKind::Replacement {
641                let sq = SchemaQualifier::new(record.database.clone(), record.schema.clone());
642                let already_replacement = production_snapshot.schemas.get(&sq).copied()
643                    == Some(DeploymentKind::Replacement);
644                if already_replacement {
645                    verbose!(
646                        "Skipping replacement schema {}.{} (already Replacement in production)",
647                        record.database,
648                        record.schema
649                    );
650                    false
651                } else {
652                    verbose!(
653                        "Including replacement schema {}.{} in swap (first Replacement deployment)",
654                        record.database,
655                        record.schema
656                    );
657                    true
658                }
659            } else {
660                true
661            }
662        })
663        .map(|record| {
664            let staging_schema = format!("{}{}", record.schema, staging_suffix);
665            (record.database.clone(), staging_schema)
666        })
667        .collect();
668
669    // Batch check which staging schemas exist
670    let existing_schemas = client
671        .introspection()
672        .check_schemas_exist(&schemas_to_check)
673        .await?;
674
675    for pair in schemas_to_check {
676        if existing_schemas.contains(&pair) {
677            staging_schemas.insert(SchemaQualifier::new(pair.0, pair.1));
678        } else {
679            info!("Warning: Staging schema {}.{} not found", pair.0, pair.1);
680        }
681    }
682
683    // Validate that all clusters in the deployment still exist
684    client
685        .deployments()
686        .validate_deployment_clusters(deploy_id)
687        .await?;
688
689    // Get clusters from deploy.clusters table
690    let cluster_names = client
691        .deployments()
692        .get_deployment_clusters(deploy_id)
693        .await?;
694
695    // Batch check which staging clusters exist
696    let staging_cluster_names: Vec<String> = cluster_names
697        .iter()
698        .map(|name| format!("{}{}", name, staging_suffix))
699        .collect();
700    let existing_clusters = client
701        .introspection()
702        .check_clusters_exist(&staging_cluster_names)
703        .await?;
704
705    for (cluster_name, staging_cluster) in cluster_names.into_iter().zip_eq(staging_cluster_names) {
706        if existing_clusters.contains(&staging_cluster) {
707            staging_clusters.insert(cluster_name);
708        } else {
709            info!("Warning: Staging cluster {} not found", staging_cluster);
710        }
711    }
712
713    verbose!("\nSchemas to swap:");
714    for sq in &staging_schemas {
715        let prod_schema = strip_staging_suffix(&sq.schema, &staging_suffix);
716        verbose!("  - {}.{} <-> {}", sq.database, sq.schema, prod_schema);
717    }
718
719    if !staging_clusters.is_empty() {
720        verbose!("\nClusters to swap:");
721        for cluster in &staging_clusters {
722            let staging_cluster = format!("{}{}", cluster, staging_suffix);
723            verbose!("  - {} <-> {}", staging_cluster, cluster);
724        }
725    }
726
727    Ok((staging_schemas, staging_clusters, staging_suffix))
728}
729
730/// Execute the atomic swap of schemas, clusters, and state schemas.
731async fn execute_atomic_swap(
732    client: &Client,
733    deploy_id: &str,
734    staging_schemas: &BTreeSet<SchemaQualifier>,
735    staging_clusters: &BTreeSet<String>,
736) -> Result<(), CliError> {
737    let staging_suffix = format!("_{}", deploy_id);
738
739    // Begin transaction for atomic swap
740    client
741        .execute("BEGIN", &[])
742        .await
743        .map_err(|e| CliError::SqlExecutionFailed {
744            statement: "BEGIN".to_string(),
745            source: e,
746        })?;
747
748    // Swap schemas
749    for sq in staging_schemas {
750        let prod_schema = strip_staging_suffix(&sq.schema, &staging_suffix);
751        // Note: second schema name is NOT fully qualified (same database)
752        let swap_sql = format!(
753            "ALTER SCHEMA \"{}\".\"{}\" SWAP WITH \"{}\";",
754            sq.database, prod_schema, sq.schema
755        );
756
757        verbose!("  {}", swap_sql);
758        if let Err(e) = client.execute(&swap_sql, &[]).await {
759            let _ = client.execute("ROLLBACK", &[]).await;
760            return Err(CliError::SqlExecutionFailed {
761                statement: swap_sql,
762                source: e,
763            });
764        }
765    }
766
767    // Swap clusters
768    for cluster in staging_clusters {
769        let staging_cluster = format!("{}{}", cluster, staging_suffix);
770        let swap_sql = format!(
771            "ALTER CLUSTER \"{}\" SWAP WITH \"{}\";",
772            cluster, staging_cluster
773        );
774
775        verbose!("  {}", swap_sql);
776        if let Err(e) = client.execute(&swap_sql, &[]).await {
777            let _ = client.execute("ROLLBACK", &[]).await;
778            return Err(CliError::SqlExecutionFailed {
779                statement: swap_sql,
780                source: e,
781            });
782        }
783    }
784
785    // Swap the apply state schemas - this atomically marks the swap as complete
786    let pre_schema = format!("apply_{}_pre", deploy_id);
787    let post_schema = format!("apply_{}_post", deploy_id);
788    let state_swap_sql = format!(
789        "ALTER SCHEMA _mz_deploy.\"{}\" SWAP WITH \"{}\";",
790        pre_schema, post_schema
791    );
792
793    verbose!("  {}", state_swap_sql);
794    if let Err(e) = client.execute(&state_swap_sql, &[]).await {
795        let _ = client.execute("ROLLBACK", &[]).await;
796        return Err(CliError::SqlExecutionFailed {
797            statement: state_swap_sql,
798            source: e,
799        });
800    }
801
802    // Commit transaction
803    client
804        .execute("COMMIT", &[])
805        .await
806        .map_err(|e| CliError::SqlExecutionFailed {
807            statement: "COMMIT".to_string(),
808            source: e,
809        })?;
810
811    verbose!("Swap completed successfully");
812    Ok(())
813}
814
815/// Execute pending sink statements using data from the plan.
816///
817/// Uses `plan.pending_statements` instead of re-querying. Still checks sink
818/// existence at execution time for idempotency.
819async fn execute_pending_sinks(client: &Client, plan: &DeploymentPlan) -> Result<(), CliError> {
820    if plan.pending_statements.is_empty() {
821        verbose!("No pending sinks to execute");
822        return Ok(());
823    }
824
825    // Build set of sink ObjectIds from pending statements
826    let sink_ids: BTreeSet<ObjectId> = plan
827        .pending_statements
828        .iter()
829        .map(|stmt| {
830            ObjectId::new(
831                stmt.database.clone(),
832                stmt.schema.clone(),
833                stmt.object.clone(),
834            )
835        })
836        .collect();
837
838    // Check which sinks already exist (like tables, skip existing ones)
839    let existing_sinks = client.introspection().check_sinks_exist(&sink_ids).await?;
840
841    // Filter to only sinks that don't exist
842    let sinks_to_create: Vec<_> = plan
843        .pending_statements
844        .iter()
845        .filter(|stmt| {
846            let obj_id = ObjectId::new(
847                stmt.database.clone(),
848                stmt.schema.clone(),
849                stmt.object.clone(),
850            );
851            !existing_sinks.contains(&obj_id)
852        })
853        .collect();
854
855    // Log skipped sinks
856    if !existing_sinks.is_empty() {
857        let mut existing_list: Vec<_> = existing_sinks.iter().collect();
858        existing_list.sort_by_key(|obj| obj.to_string());
859        for sink_id in existing_list {
860            verbose!("  - {}", sink_id);
861        }
862    }
863
864    if sinks_to_create.is_empty() {
865        return Ok(());
866    }
867
868    // Sink-only schemas are created here: `apply` only ensures schemas for
869    // its own phases (secrets/connections/sources/tables) and `stage`
870    // excludes sinks from staging schemas, so a schema containing nothing
871    // but sinks does not exist yet.
872    let sink_schemas: BTreeSet<(&str, &str)> = sinks_to_create
873        .iter()
874        .map(|stmt| (stmt.database.as_str(), stmt.schema.as_str()))
875        .collect();
876    for (database, schema) in sink_schemas {
877        client
878            .provisioning()
879            .create_schema(database, schema)
880            .await
881            .map_err(CliError::Connection)?;
882    }
883
884    for stmt in sinks_to_create {
885        verbose!(
886            "Creating sink {}.{}.{}...",
887            stmt.database,
888            stmt.schema,
889            stmt.object
890        );
891
892        // Execute the sink creation statement
893        if let Err(e) = client.execute(&stmt.statement_sql, &[]).await {
894            info!(
895                "Error creating sink {}.{}.{}: {}",
896                stmt.database, stmt.schema, stmt.object, e
897            );
898            return Err(CliError::SqlExecutionFailed {
899                statement: stmt.statement_sql.clone(),
900                source: e,
901            });
902        }
903
904        // Mark the statement as executed
905        client
906            .deployments()
907            .mark_statement_executed(&plan.deploy_id, stmt.sequence_num)
908            .await?;
909
910        progress::success(&format!(
911            "{}.{}.{}",
912            stmt.database, stmt.schema, stmt.object
913        ));
914    }
915
916    Ok(())
917}
918
919/// Apply replacement materialized views using data from the plan.
920///
921/// Uses `plan.replacement_mvs` instead of re-querying.
922async fn apply_replacement_mvs(client: &Client, plan: &DeploymentPlan) -> Result<(), CliError> {
923    if plan.replacement_mvs.is_empty() {
924        verbose!("No replacement MVs to apply");
925        return Ok(());
926    }
927
928    for record in &plan.replacement_mvs {
929        let alter_sql = format!(
930            "ALTER MATERIALIZED VIEW \"{}\".\"{}\".\"{}\"\
931             APPLY REPLACEMENT \"{}\".\"{}\".\"{}\";",
932            record.target_database,
933            record.target_schema,
934            record.target_name,
935            record.target_database,
936            record.replacement_schema,
937            record.target_name
938        );
939
940        verbose!("  {}", alter_sql);
941        if let Err(e) = client.execute(&alter_sql, &[]).await {
942            info!(
943                "Error applying replacement for {}.{}.{}: {}",
944                record.target_database, record.target_schema, record.target_name, e
945            );
946            return Err(CliError::SqlExecutionFailed {
947                statement: alter_sql,
948                source: e,
949            });
950        }
951
952        progress::success(&format!(
953            "{}.{}.{}",
954            record.target_database, record.target_schema, record.target_name
955        ));
956    }
957
958    // Drop now-empty replacement staging schemas
959    let replacement_schemas = plan.replacement_schemas();
960
961    for sq in &replacement_schemas {
962        let drop_sql = format!(
963            "DROP SCHEMA IF EXISTS \"{}\".\"{}\" CASCADE;",
964            sq.database, sq.schema
965        );
966        verbose!("  {}", drop_sql);
967        if let Err(e) = client.execute(&drop_sql, &[]).await {
968            info!(
969                "warning: failed to drop replacement schema {}.{}: {}",
970                sq.database, sq.schema, e
971            );
972        }
973    }
974
975    Ok(())
976}
977
978/// Repoint sinks that depend on objects in schemas about to be dropped.
979///
980/// Re-queries post-swap (cannot use `plan.dependent_sinks` because schema names
981/// differ after swap). Reads `plan.staging_schemas` and `plan.staging_suffix`.
982async fn repoint_dependent_sinks(client: &Client, plan: &DeploymentPlan) -> Result<(), CliError> {
983    let staging_suffix = &plan.staging_suffix;
984
985    // Build list of old schema names (database, old_schema_with_suffix)
986    // After swap, old production schemas have the staging suffix
987    let old_schemas: Vec<SchemaQualifier> = plan
988        .staging_schemas
989        .iter()
990        .map(|sq| {
991            let prod_schema = strip_staging_suffix(&sq.schema, staging_suffix);
992            let old_schema = format!("{}{}", prod_schema, staging_suffix);
993            SchemaQualifier::new(sq.database.clone(), old_schema)
994        })
995        .collect();
996
997    // Find sinks depending on objects in old schemas
998    let dependent_sinks = client
999        .introspection()
1000        .find_sinks_depending_on_schemas(&old_schemas)
1001        .await
1002        .map_err(CliError::Connection)?;
1003
1004    if dependent_sinks.is_empty() {
1005        verbose!("No sinks depend on objects in schemas being dropped");
1006        return Ok(());
1007    }
1008
1009    // Batch check which replacement objects exist
1010    let replacement_ids: BTreeSet<ObjectId> = dependent_sinks
1011        .iter()
1012        .map(|sink| {
1013            let new_schema =
1014                strip_staging_suffix(&sink.dependency_schema, staging_suffix).to_string();
1015            ObjectId::new(
1016                sink.dependency_database.clone(),
1017                new_schema,
1018                sink.dependency_name.clone(),
1019            )
1020        })
1021        .collect();
1022
1023    let existing_ids = client
1024        .introspection()
1025        .check_objects_exist(&replacement_ids)
1026        .await
1027        .map_err(CliError::Connection)?;
1028
1029    for sink in dependent_sinks {
1030        // Compute new schema name (strip suffix to get production schema name)
1031        let new_schema = strip_staging_suffix(&sink.dependency_schema, staging_suffix);
1032
1033        let replacement_id = ObjectId::new(
1034            sink.dependency_database.clone(),
1035            new_schema.to_string(),
1036            sink.dependency_name.clone(),
1037        );
1038
1039        if !existing_ids.contains(&replacement_id) {
1040            return Err(CliError::SinkRepointFailed {
1041                sink: format!(
1042                    "{}.{}.{}",
1043                    sink.sink_database, sink.sink_schema, sink.sink_name
1044                ),
1045                reason: format!("replacement object {} does not exist", replacement_id),
1046            });
1047        }
1048
1049        // Execute ALTER SINK ... SET FROM
1050        let alter_sql = format!(
1051            r#"ALTER SINK "{}"."{}"."{}" SET FROM "{}"."{}"."{}""#,
1052            sink.sink_database,
1053            sink.sink_schema,
1054            sink.sink_name,
1055            sink.dependency_database,
1056            new_schema,
1057            sink.dependency_name
1058        );
1059
1060        verbose!("  {}", alter_sql);
1061        if let Err(e) = client.execute(&alter_sql, &[]).await {
1062            return Err(CliError::SinkRepointFailed {
1063                sink: format!(
1064                    "{}.{}.{}",
1065                    sink.sink_database, sink.sink_schema, sink.sink_name
1066                ),
1067                reason: e.to_string(),
1068            });
1069        }
1070
1071        progress::success(&format!(
1072            "{}.{}.{} -> {}.{}.{}",
1073            sink.sink_database,
1074            sink.sink_schema,
1075            sink.sink_name,
1076            sink.dependency_database,
1077            new_schema,
1078            sink.dependency_name
1079        ));
1080    }
1081
1082    Ok(())
1083}
1084
1085/// Drop old production resources after the swap.
1086async fn drop_old_resources(client: &Client, plan: &DeploymentPlan) {
1087    let staging_suffix = &plan.staging_suffix;
1088
1089    // Drop schemas
1090    for sq in &plan.staging_schemas {
1091        let prod_schema = strip_staging_suffix(&sq.schema, staging_suffix);
1092        let old_schema = format!("{}{}", prod_schema, staging_suffix);
1093        let drop_sql = format!(
1094            "DROP SCHEMA IF EXISTS \"{}\".\"{}\" CASCADE;",
1095            sq.database, old_schema
1096        );
1097
1098        verbose!("  {}", drop_sql);
1099        if let Err(e) = client.execute(&drop_sql, &[]).await {
1100            info!(
1101                "warning: failed to drop old schema {}.{}: {}",
1102                sq.database, old_schema, e
1103            );
1104        }
1105    }
1106
1107    // Drop clusters
1108    for cluster in &plan.staging_clusters {
1109        let old_cluster = format!("{}{}", cluster, staging_suffix);
1110        let drop_sql = format!("DROP CLUSTER IF EXISTS \"{}\" CASCADE;", old_cluster);
1111
1112        verbose!("  {}", drop_sql);
1113        if let Err(e) = client.execute(&drop_sql, &[]).await {
1114            info!("warning: failed to drop old cluster {}: {}", old_cluster, e);
1115        }
1116    }
1117}
1118
#[cfg(test)]
mod tests {
    use super::strip_staging_suffix;

    #[mz_ore::test]
    fn test_strip_staging_suffix_removes_suffix_once() {
        // Ordinary case: a single suffix is removed.
        assert_eq!(
            strip_staging_suffix("analytics_deploy123", "_deploy123"),
            "analytics"
        );
    }

    #[mz_ore::test]
    fn test_strip_staging_suffix_strips_exactly_once() {
        // Regression test for QA Finding 4.
        //
        // With `--deploy-id prod`, the staging suffix is `_prod`. A production
        // schema that legitimately ends in `_prod` (e.g. `analytics_prod`) stages
        // as `analytics_prod_prod`. Recovering the production name must strip the
        // suffix exactly once, yielding `analytics_prod` — NOT `analytics`, which
        // would aim the cutover swap and the follow-up `DROP ... CASCADE` at the
        // wrong production schema. `str::trim_end_matches` strips every trailing
        // occurrence and would regress this.
        assert_eq!(
            strip_staging_suffix("analytics_prod_prod", "_prod"),
            "analytics_prod"
        );
    }

    #[mz_ore::test]
    fn test_strip_staging_suffix_no_match_returns_input() {
        // A name that does not end with the suffix is returned unchanged.
        assert_eq!(strip_staging_suffix("analytics", "_prod"), "analytics");
    }

    #[mz_ore::test]
    fn test_strip_staging_suffix_name_equal_to_suffix() {
        // A name made up solely of the suffix strips to the empty string; the
        // helper does not special-case this.
        assert_eq!(strip_staging_suffix("_prod", "_prod"), "");
    }

    #[mz_ore::test]
    fn test_strip_staging_suffix_ignores_non_trailing_occurrence() {
        // Only a trailing match counts; the suffix elsewhere in the name is kept.
        assert_eq!(strip_staging_suffix("a_prod_b", "_prod"), "a_prod_b");
    }

    #[mz_ore::test]
    fn test_strip_staging_suffix_empty_suffix_is_noop() {
        // Every string ends with "", and removing it changes nothing.
        assert_eq!(strip_staging_suffix("analytics", ""), "analytics");
    }

    #[mz_ore::test]
    fn test_strip_staging_suffix_is_case_sensitive() {
        // Matching is byte-exact: a differently-cased suffix is not removed.
        assert_eq!(
            strip_staging_suffix("analytics_PROD", "_prod"),
            "analytics_PROD"
        );
    }
}