mz_deploy/cli/commands/
clusters.rs1use crate::cli::CliError;
13use crate::cli::commands::grants;
14use crate::cli::executor::{
15 ApplyPlan, ApplyResult, DeploymentExecutor, ObjectAction, ObjectResult, connect_apply_client,
16};
17use crate::client::{Client, ClusterOptions, quote_identifier};
18use crate::config::Settings;
19use crate::project::clusters::{self, ClusterDefinition, extract_replication_factor, extract_size};
20
21pub async fn plan(
23 settings: &Settings,
24 client: &Client,
25 executor: &DeploymentExecutor<'_>,
26) -> Result<ApplyResult, CliError> {
27 let profile = settings.connection();
28 let directory = &settings.directory;
29
30 let definitions = clusters::load_clusters(
31 directory,
32 &profile.name,
33 settings.profile_suffix(),
34 settings.variables(),
35 )?;
36
37 if definitions.is_empty() {
38 return Ok(ApplyResult {
39 phase: "clusters".to_string(),
40 results: vec![],
41 });
42 }
43
44 let mut object_results = Vec::new();
45 for def in &definitions {
46 let obj_result = plan_cluster(client, executor, def).await?;
47 object_results.push(obj_result);
48 }
49
50 Ok(ApplyResult {
51 phase: "clusters".to_string(),
52 results: object_results,
53 })
54}
55
56pub async fn run(settings: &Settings, dry_run: bool) -> Result<ApplyPlan, CliError> {
58 let client = connect_apply_client(settings).await?;
59 let executor = DeploymentExecutor::new_dry_run(&client);
60 let mut plan_result = ApplyPlan::new();
61 let phase = plan(settings, &client, &executor).await?;
62 plan_result.add_phase(phase);
63
64 if !dry_run {
65 plan_result.execute(&client).await?;
66 }
67
68 Ok(plan_result)
69}
70
71async fn plan_cluster(
74 client: &Client,
75 executor: &DeploymentExecutor<'_>,
76 def: &ClusterDefinition,
77) -> Result<ObjectResult, CliError> {
78 let cluster_name = &def.name;
79
80 executor.take_statements();
82
83 let existing = client
85 .introspection()
86 .get_cluster(cluster_name)
87 .await
88 .map_err(CliError::Connection)?;
89
90 let action = match existing {
91 None => {
92 executor.execute_sql(&def.create_stmt).await?;
93 ObjectAction::Created
94 }
95 Some(existing_cluster) => {
96 let desired_size = extract_size(&def.create_stmt);
97 let desired_rf = extract_replication_factor(&def.create_stmt);
98
99 let needs_alter = {
100 let size_differs = desired_size.as_deref() != existing_cluster.size.as_deref();
101 let rf_differs = desired_rf.map(i64::from) != existing_cluster.replication_factor;
102 size_differs || rf_differs
103 };
104
105 if needs_alter {
106 let size = desired_size.unwrap_or_else(|| {
107 existing_cluster
108 .size
109 .clone()
110 .unwrap_or_else(|| "25cc".to_string())
111 });
112 let rf = desired_rf.unwrap_or_else(|| {
113 existing_cluster
114 .replication_factor
115 .unwrap_or(1)
116 .try_into()
117 .unwrap_or(1)
118 });
119
120 let options = ClusterOptions {
121 size,
122 replication_factor: rf,
123 };
124 let alter_sql = format!(
125 "ALTER CLUSTER {} SET (SIZE = '{}', REPLICATION FACTOR = {})",
126 quote_identifier(cluster_name),
127 options.size,
128 options.replication_factor
129 );
130 executor.execute_sql(&alter_sql).await?;
131 ObjectAction::Altered
132 } else {
133 ObjectAction::UpToDate
134 }
135 }
136 };
137
138 grants::reconcile_named_object(
140 client,
141 executor,
142 cluster_name,
143 &def.grants,
144 &grants::GrantNamedObjectKind::Cluster,
145 )
146 .await?;
147
148 for comment in &def.comments {
150 executor.execute_sql(comment).await?;
151 }
152
153 Ok(ObjectResult {
154 object: cluster_name.clone(),
155 action,
156 statements: executor.take_statements(),
157 redacted_statements: vec![],
158 transaction_group: None,
159 })
160}