Skip to main content

mz_deploy/cli/commands/
abort.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Abort command - cleanup a staged deployment.
11
12use crate::cli::CliError;
13use crate::client::{Client, ConnectionError};
14use crate::config::Settings;
15use crate::log;
16use crate::verbose;
17use std::fmt;
18
19#[derive(serde::Serialize)]
20struct AbortResult {
21    deploy_id: String,
22    schemas_dropped: usize,
23    clusters_dropped: usize,
24}
25
26impl fmt::Display for AbortResult {
27    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
28        write!(
29            f,
30            "  \u{2713} Successfully aborted deployment '{}'",
31            self.deploy_id
32        )
33    }
34}
35
36/// Abort a staged deployment by dropping schemas, clusters, and deployment records.
37///
38/// This command:
39/// - Validates that the deployment exists and hasn't been promoted
40/// - Requires the `materialize_deployer` role
41/// - Drops all staging schemas (with _<deploy_id> suffix)
42/// - Drops all staging clusters (with _<deploy_id> suffix)
43/// - Deletes deployment tracking records
44///
45/// # Arguments
46/// * `settings` - CLI settings containing connection and directory info
47/// * `deploy_id` - Staging deployment ID to abort
48///
49/// # Returns
50/// Ok(()) if abort succeeds
51///
52/// # Errors
53/// Returns `CliError::Connection` if the deployment doesn't exist
54/// Returns `CliError::Connection` if the deployment was already promoted
55/// Returns `CliError::RoleNotAuthorized` if the user lacks the required role
56pub async fn run(settings: &Settings, deploy_id: &str) -> Result<(), CliError> {
57    let profile = settings.connection();
58
59    let client = Client::connect_with_profile(profile.clone())
60        .await
61        .map_err(CliError::Connection)?;
62
63    super::setup::verify(&client, settings.emulator()).await?;
64    let role = super::setup::validate_connection(&client, settings.emulator()).await?;
65
66    super::setup::require_deployer(role)?;
67
68    let metadata = client
69        .deployments()
70        .get_deployment_metadata(deploy_id)
71        .await
72        .map_err(CliError::Connection)?
73        .ok_or_else(|| {
74            CliError::Connection(ConnectionError::DeploymentNotFound {
75                deploy_id: deploy_id.to_string(),
76            })
77        })?;
78
79    if metadata.promoted_at.is_some() {
80        return Err(CliError::Connection(
81            ConnectionError::DeploymentAlreadyPromoted {
82                deploy_id: deploy_id.to_string(),
83            },
84        ));
85    }
86
87    // Get staging schemas and clusters
88    let staging_schemas = client
89        .introspection()
90        .get_staging_schemas(deploy_id)
91        .await?;
92
93    let staging_clusters = client
94        .introspection()
95        .get_staging_clusters(deploy_id)
96        .await?;
97
98    verbose!("Dropping staging resources:");
99    verbose!("  Schemas: {}", staging_schemas.len());
100    verbose!("  Clusters: {}", staging_clusters.len());
101    verbose!();
102
103    // Drop staging schemas
104    if !staging_schemas.is_empty() {
105        verbose!("Dropping staging schemas...");
106        client
107            .introspection()
108            .drop_staging_schemas(&staging_schemas)
109            .await?;
110        for sq in &staging_schemas {
111            verbose!("  Dropped {}.{}", sq.database, sq.schema);
112        }
113    }
114
115    // Drop staging clusters
116    if !staging_clusters.is_empty() {
117        verbose!("Dropping staging clusters...");
118        client
119            .introspection()
120            .drop_staging_clusters(&staging_clusters)
121            .await?;
122        for cluster in &staging_clusters {
123            verbose!("  Dropped {}", cluster);
124        }
125    }
126
127    // Delete deployment records
128    verbose!("Deleting deployment records...");
129
130    // Clean up cluster tracking records
131    client
132        .deployments()
133        .delete_deployment_clusters(deploy_id)
134        .await
135        .map_err(|source| CliError::DeploymentStateWriteFailed { source })?;
136
137    // Clean up pending statements (for sinks)
138    client
139        .deployments()
140        .delete_pending_statements(deploy_id)
141        .await
142        .map_err(|source| CliError::DeploymentStateWriteFailed { source })?;
143
144    // Clean up replacement MV records
145    client
146        .deployments()
147        .delete_replacement_mvs(deploy_id)
148        .await
149        .map_err(|source| CliError::DeploymentStateWriteFailed { source })?;
150
151    // Clean up apply state schemas if they exist (from interrupted apply)
152    client
153        .deployments()
154        .delete_apply_state_schemas(deploy_id)
155        .await
156        .map_err(|source| CliError::DeploymentStateWriteFailed { source })?;
157
158    client.deployments().delete_deployment(deploy_id).await?;
159
160    let result = AbortResult {
161        deploy_id: deploy_id.to_string(),
162        schemas_dropped: staging_schemas.len(),
163        clusters_dropped: staging_clusters.len(),
164    };
165    log::output(&result);
166
167    Ok(())
168}