Skip to main content

mz_deploy/client/
provisioning.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! DDL provisioning operations.
11//!
12//! Methods on [`ProvisioningClient`] issue `CREATE … IF NOT EXISTS` and
13//! `ALTER` statements to ensure that the target region's databases, schemas,
14//! and clusters match the project definition. These are idempotent and run
15//! before any object-level deployment.
16//!
17//! ## Ordering
18//!
19//! Provisioning must follow referential order: **databases → schemas → clusters**.
20//! A schema cannot be created until its parent database exists, so callers
21//! (e.g., [`super::super::cli::executor::DeploymentExecutor`]) are responsible
22//! for invoking provisioning methods in the correct order.
23//!
24//! ## Idempotency
25//!
26//! `create_database` and `create_schema` use `IF NOT EXISTS`, so re-running them
27//! is a no-op. The cluster `create_*` methods instead map an "already exists"
28//! failure to `ClusterAlreadyExists` for the caller to handle. `alter_cluster`
29//! always applies the requested configuration, even when nothing has changed.
30
31use crate::client::connection::ProvisioningClient;
32use crate::client::errors::ConnectionError;
33use crate::client::models::{ClusterConfig, ClusterOptions};
34use crate::client::quote_identifier;
35
36impl ProvisioningClient<'_> {
37    /// Create a database if it does not already exist.
38    pub async fn create_database(&self, database: &str) -> Result<(), ConnectionError> {
39        let sql = format!(
40            "CREATE DATABASE IF NOT EXISTS {}",
41            quote_identifier(database)
42        );
43
44        self.client.execute(&sql, &[]).await.map_err(|e| {
45            ConnectionError::DatabaseCreationFailed {
46                database: database.to_string(),
47                source: Box::new(e),
48            }
49        })?;
50
51        Ok(())
52    }
53
54    /// Create a schema in the specified database if it does not already exist.
55    pub async fn create_schema(&self, database: &str, schema: &str) -> Result<(), ConnectionError> {
56        let sql = format!(
57            "CREATE SCHEMA IF NOT EXISTS {}.{}",
58            quote_identifier(database),
59            quote_identifier(schema)
60        );
61
62        self.client.execute(&sql, &[]).await.map_err(|e| {
63            ConnectionError::SchemaCreationFailed {
64                database: database.to_string(),
65                schema: schema.to_string(),
66                source: Box::new(e),
67            }
68        })?;
69
70        Ok(())
71    }
72
73    /// Create a managed cluster with the requested size and replication factor.
74    pub async fn create_cluster(
75        &self,
76        name: &str,
77        options: &ClusterOptions,
78    ) -> Result<(), ConnectionError> {
79        let sql = format!(
80            "CREATE CLUSTER {} (SIZE = '{}', REPLICATION FACTOR = {})",
81            quote_identifier(name),
82            options.size,
83            options.replication_factor
84        );
85
86        self.client.execute(&sql, &[]).await.map_err(|e| {
87            if e.to_string().contains("already exists") {
88                ConnectionError::ClusterAlreadyExists {
89                    name: name.to_string(),
90                }
91            } else {
92                ConnectionError::ClusterCreationFailed {
93                    name: name.to_string(),
94                    source: Box::new(e),
95                }
96            }
97        })?;
98
99        Ok(())
100    }
101
102    /// Create a cluster from a captured cluster configuration.
103    pub async fn create_cluster_with_config(
104        &self,
105        name: &str,
106        config: &ClusterConfig,
107    ) -> Result<(), ConnectionError> {
108        let grants = match config {
109            ClusterConfig::Managed { options, grants } => {
110                self.create_cluster(name, options).await?;
111                grants
112            }
113            ClusterConfig::Unmanaged { replicas, grants } => {
114                let create_cluster_sql =
115                    format!("CREATE CLUSTER {} REPLICAS ()", quote_identifier(name));
116
117                self.client
118                    .execute(&create_cluster_sql, &[])
119                    .await
120                    .map_err(|e| {
121                        if e.to_string().contains("already exists") {
122                            ConnectionError::ClusterAlreadyExists {
123                                name: name.to_string(),
124                            }
125                        } else {
126                            ConnectionError::ClusterCreationFailed {
127                                name: name.to_string(),
128                                source: Box::new(e),
129                            }
130                        }
131                    })?;
132
133                for replica in replicas {
134                    let mut options_parts = vec![format!("SIZE = '{}'", replica.size)];
135
136                    if let Some(ref az) = replica.availability_zone {
137                        options_parts.push(format!("AVAILABILITY ZONE '{}'", az));
138                    }
139
140                    let create_replica_sql = format!(
141                        "CREATE CLUSTER REPLICA {}.{} ({})",
142                        quote_identifier(name),
143                        quote_identifier(&replica.name),
144                        options_parts.join(", ")
145                    );
146
147                    self.client
148                        .execute(&create_replica_sql, &[])
149                        .await
150                        .map_err(|e| ConnectionError::ClusterCreationFailed {
151                            name: format!("{}.{}", name, replica.name),
152                            source: Box::new(e),
153                        })?;
154                }
155
156                grants
157            }
158        };
159
160        for grant in grants {
161            let sql = format!(
162                "GRANT {} ON CLUSTER {} TO {}",
163                grant.privilege_type,
164                quote_identifier(name),
165                quote_identifier(&grant.grantee)
166            );
167            self.client.execute(&sql, &[]).await.map_err(|e| {
168                ConnectionError::Message(format!(
169                    "Failed to grant {} to {} on cluster '{}': {}",
170                    grant.privilege_type, grant.grantee, name, e
171                ))
172            })?;
173        }
174
175        Ok(())
176    }
177
178    /// Update options for an existing managed cluster.
179    pub async fn alter_cluster(
180        &self,
181        name: &str,
182        options: &ClusterOptions,
183    ) -> Result<(), ConnectionError> {
184        let sql = format!(
185            "ALTER CLUSTER {} SET (SIZE = '{}', REPLICATION FACTOR = {})",
186            quote_identifier(name),
187            options.size,
188            options.replication_factor
189        );
190
191        self.client.execute(&sql, &[]).await.map_err(|e| {
192            ConnectionError::Message(format!("Failed to alter cluster '{}': {}", name, e))
193        })?;
194
195        Ok(())
196    }
197}