Skip to main content

mz_deploy/cli/commands/
clusters.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Clusters apply command - converge live cluster state to match definitions.
11
12use crate::cli::CliError;
13use crate::cli::commands::grants;
14use crate::cli::executor::{
15    ApplyPlan, ApplyResult, DeploymentExecutor, ObjectAction, ObjectResult, connect_apply_client,
16};
17use crate::client::{Client, ClusterOptions, quote_identifier};
18use crate::config::Settings;
19use crate::project::clusters::{self, ClusterDefinition, extract_replication_factor, extract_size};
20
21/// Plan cluster changes without executing or printing.
22pub async fn plan(
23    settings: &Settings,
24    client: &Client,
25    executor: &DeploymentExecutor<'_>,
26) -> Result<ApplyResult, CliError> {
27    let profile = settings.connection();
28    let directory = &settings.directory;
29
30    let definitions = clusters::load_clusters(
31        directory,
32        &profile.name,
33        settings.profile_suffix(),
34        settings.variables(),
35    )?;
36
37    if definitions.is_empty() {
38        return Ok(ApplyResult {
39            phase: "clusters".to_string(),
40            results: vec![],
41        });
42    }
43
44    let mut object_results = Vec::new();
45    for def in &definitions {
46        let obj_result = plan_cluster(client, executor, def).await?;
47        object_results.push(obj_result);
48    }
49
50    Ok(ApplyResult {
51        phase: "clusters".to_string(),
52        results: object_results,
53    })
54}
55
56/// Run the `clusters apply` command: plan, render, optionally execute.
57pub async fn run(settings: &Settings, dry_run: bool) -> Result<ApplyPlan, CliError> {
58    let client = connect_apply_client(settings).await?;
59    let executor = DeploymentExecutor::new_dry_run(&client);
60    let mut plan_result = ApplyPlan::new();
61    let phase = plan(settings, &client, &executor).await?;
62    plan_result.add_phase(phase);
63
64    if !dry_run {
65        plan_result.execute(&client).await?;
66    }
67
68    Ok(plan_result)
69}
70
71/// Plan a single cluster definition: create if missing, alter if drifted,
72/// then plan grants, revocations, and comments.
73async fn plan_cluster(
74    client: &Client,
75    executor: &DeploymentExecutor<'_>,
76    def: &ClusterDefinition,
77) -> Result<ObjectResult, CliError> {
78    let cluster_name = &def.name;
79
80    // Drain any prior statements
81    executor.take_statements();
82
83    // Check if cluster already exists
84    let existing = client
85        .introspection()
86        .get_cluster(cluster_name)
87        .await
88        .map_err(CliError::Connection)?;
89
90    let action = match existing {
91        None => {
92            executor.execute_sql(&def.create_stmt).await?;
93            ObjectAction::Created
94        }
95        Some(existing_cluster) => {
96            let desired_size = extract_size(&def.create_stmt);
97            let desired_rf = extract_replication_factor(&def.create_stmt);
98
99            let needs_alter = {
100                let size_differs = desired_size.as_deref() != existing_cluster.size.as_deref();
101                let rf_differs = desired_rf.map(i64::from) != existing_cluster.replication_factor;
102                size_differs || rf_differs
103            };
104
105            if needs_alter {
106                let size = desired_size.unwrap_or_else(|| {
107                    existing_cluster
108                        .size
109                        .clone()
110                        .unwrap_or_else(|| "25cc".to_string())
111                });
112                let rf = desired_rf.unwrap_or_else(|| {
113                    existing_cluster
114                        .replication_factor
115                        .unwrap_or(1)
116                        .try_into()
117                        .unwrap_or(1)
118                });
119
120                let options = ClusterOptions {
121                    size,
122                    replication_factor: rf,
123                };
124                let alter_sql = format!(
125                    "ALTER CLUSTER {} SET (SIZE = '{}', REPLICATION FACTOR = {})",
126                    quote_identifier(cluster_name),
127                    options.size,
128                    options.replication_factor
129                );
130                executor.execute_sql(&alter_sql).await?;
131                ObjectAction::Altered
132            } else {
133                ObjectAction::UpToDate
134            }
135        }
136    };
137
138    // Reconcile grants
139    grants::reconcile_named_object(
140        client,
141        executor,
142        cluster_name,
143        &def.grants,
144        &grants::GrantNamedObjectKind::Cluster,
145    )
146    .await?;
147
148    // Execute COMMENT statements
149    for comment in &def.comments {
150        executor.execute_sql(comment).await?;
151    }
152
153    Ok(ObjectResult {
154        object: cluster_name.clone(),
155        action,
156        statements: executor.take_statements(),
157        redacted_statements: vec![],
158        transaction_group: None,
159    })
160}