mz_deploy/cli/commands/
abort.rs1use crate::cli::CliError;
13use crate::client::{Client, ConnectionError};
14use crate::config::Settings;
15use crate::log;
16use crate::verbose;
17use std::fmt;
18
19#[derive(serde::Serialize)]
20struct AbortResult {
21 deploy_id: String,
22 schemas_dropped: usize,
23 clusters_dropped: usize,
24}
25
26impl fmt::Display for AbortResult {
27 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
28 write!(
29 f,
30 " \u{2713} Successfully aborted deployment '{}'",
31 self.deploy_id
32 )
33 }
34}
35
36pub async fn run(settings: &Settings, deploy_id: &str) -> Result<(), CliError> {
57 let profile = settings.connection();
58
59 let client = Client::connect_with_profile(profile.clone())
60 .await
61 .map_err(CliError::Connection)?;
62
63 super::setup::verify(&client, settings.emulator()).await?;
64 let role = super::setup::validate_connection(&client, settings.emulator()).await?;
65
66 super::setup::require_deployer(role)?;
67
68 let metadata = client
69 .deployments()
70 .get_deployment_metadata(deploy_id)
71 .await
72 .map_err(CliError::Connection)?
73 .ok_or_else(|| {
74 CliError::Connection(ConnectionError::DeploymentNotFound {
75 deploy_id: deploy_id.to_string(),
76 })
77 })?;
78
79 if metadata.promoted_at.is_some() {
80 return Err(CliError::Connection(
81 ConnectionError::DeploymentAlreadyPromoted {
82 deploy_id: deploy_id.to_string(),
83 },
84 ));
85 }
86
87 let staging_schemas = client
89 .introspection()
90 .get_staging_schemas(deploy_id)
91 .await?;
92
93 let staging_clusters = client
94 .introspection()
95 .get_staging_clusters(deploy_id)
96 .await?;
97
98 verbose!("Dropping staging resources:");
99 verbose!(" Schemas: {}", staging_schemas.len());
100 verbose!(" Clusters: {}", staging_clusters.len());
101 verbose!();
102
103 if !staging_schemas.is_empty() {
105 verbose!("Dropping staging schemas...");
106 client
107 .introspection()
108 .drop_staging_schemas(&staging_schemas)
109 .await?;
110 for sq in &staging_schemas {
111 verbose!(" Dropped {}.{}", sq.database, sq.schema);
112 }
113 }
114
115 if !staging_clusters.is_empty() {
117 verbose!("Dropping staging clusters...");
118 client
119 .introspection()
120 .drop_staging_clusters(&staging_clusters)
121 .await?;
122 for cluster in &staging_clusters {
123 verbose!(" Dropped {}", cluster);
124 }
125 }
126
127 verbose!("Deleting deployment records...");
129
130 client
132 .deployments()
133 .delete_deployment_clusters(deploy_id)
134 .await
135 .map_err(|source| CliError::DeploymentStateWriteFailed { source })?;
136
137 client
139 .deployments()
140 .delete_pending_statements(deploy_id)
141 .await
142 .map_err(|source| CliError::DeploymentStateWriteFailed { source })?;
143
144 client
146 .deployments()
147 .delete_replacement_mvs(deploy_id)
148 .await
149 .map_err(|source| CliError::DeploymentStateWriteFailed { source })?;
150
151 client
153 .deployments()
154 .delete_apply_state_schemas(deploy_id)
155 .await
156 .map_err(|source| CliError::DeploymentStateWriteFailed { source })?;
157
158 client.deployments().delete_deployment(deploy_id).await?;
159
160 let result = AbortResult {
161 deploy_id: deploy_id.to_string(),
162 schemas_dropped: staging_schemas.len(),
163 clusters_dropped: staging_clusters.len(),
164 };
165 log::output(&result);
166
167 Ok(())
168}