mz_deploy/client/
provisioning.rs1use crate::client::connection::ProvisioningClient;
32use crate::client::errors::ConnectionError;
33use crate::client::models::{ClusterConfig, ClusterOptions};
34use crate::client::quote_identifier;
35
36impl ProvisioningClient<'_> {
37 pub async fn create_database(&self, database: &str) -> Result<(), ConnectionError> {
39 let sql = format!(
40 "CREATE DATABASE IF NOT EXISTS {}",
41 quote_identifier(database)
42 );
43
44 self.client.execute(&sql, &[]).await.map_err(|e| {
45 ConnectionError::DatabaseCreationFailed {
46 database: database.to_string(),
47 source: Box::new(e),
48 }
49 })?;
50
51 Ok(())
52 }
53
54 pub async fn create_schema(&self, database: &str, schema: &str) -> Result<(), ConnectionError> {
56 let sql = format!(
57 "CREATE SCHEMA IF NOT EXISTS {}.{}",
58 quote_identifier(database),
59 quote_identifier(schema)
60 );
61
62 self.client.execute(&sql, &[]).await.map_err(|e| {
63 ConnectionError::SchemaCreationFailed {
64 database: database.to_string(),
65 schema: schema.to_string(),
66 source: Box::new(e),
67 }
68 })?;
69
70 Ok(())
71 }
72
73 pub async fn create_cluster(
75 &self,
76 name: &str,
77 options: &ClusterOptions,
78 ) -> Result<(), ConnectionError> {
79 let sql = format!(
80 "CREATE CLUSTER {} (SIZE = '{}', REPLICATION FACTOR = {})",
81 quote_identifier(name),
82 options.size,
83 options.replication_factor
84 );
85
86 self.client.execute(&sql, &[]).await.map_err(|e| {
87 if e.to_string().contains("already exists") {
88 ConnectionError::ClusterAlreadyExists {
89 name: name.to_string(),
90 }
91 } else {
92 ConnectionError::ClusterCreationFailed {
93 name: name.to_string(),
94 source: Box::new(e),
95 }
96 }
97 })?;
98
99 Ok(())
100 }
101
102 pub async fn create_cluster_with_config(
104 &self,
105 name: &str,
106 config: &ClusterConfig,
107 ) -> Result<(), ConnectionError> {
108 let grants = match config {
109 ClusterConfig::Managed { options, grants } => {
110 self.create_cluster(name, options).await?;
111 grants
112 }
113 ClusterConfig::Unmanaged { replicas, grants } => {
114 let create_cluster_sql =
115 format!("CREATE CLUSTER {} REPLICAS ()", quote_identifier(name));
116
117 self.client
118 .execute(&create_cluster_sql, &[])
119 .await
120 .map_err(|e| {
121 if e.to_string().contains("already exists") {
122 ConnectionError::ClusterAlreadyExists {
123 name: name.to_string(),
124 }
125 } else {
126 ConnectionError::ClusterCreationFailed {
127 name: name.to_string(),
128 source: Box::new(e),
129 }
130 }
131 })?;
132
133 for replica in replicas {
134 let mut options_parts = vec![format!("SIZE = '{}'", replica.size)];
135
136 if let Some(ref az) = replica.availability_zone {
137 options_parts.push(format!("AVAILABILITY ZONE '{}'", az));
138 }
139
140 let create_replica_sql = format!(
141 "CREATE CLUSTER REPLICA {}.{} ({})",
142 quote_identifier(name),
143 quote_identifier(&replica.name),
144 options_parts.join(", ")
145 );
146
147 self.client
148 .execute(&create_replica_sql, &[])
149 .await
150 .map_err(|e| ConnectionError::ClusterCreationFailed {
151 name: format!("{}.{}", name, replica.name),
152 source: Box::new(e),
153 })?;
154 }
155
156 grants
157 }
158 };
159
160 for grant in grants {
161 let sql = format!(
162 "GRANT {} ON CLUSTER {} TO {}",
163 grant.privilege_type,
164 quote_identifier(name),
165 quote_identifier(&grant.grantee)
166 );
167 self.client.execute(&sql, &[]).await.map_err(|e| {
168 ConnectionError::Message(format!(
169 "Failed to grant {} to {} on cluster '{}': {}",
170 grant.privilege_type, grant.grantee, name, e
171 ))
172 })?;
173 }
174
175 Ok(())
176 }
177
178 pub async fn alter_cluster(
180 &self,
181 name: &str,
182 options: &ClusterOptions,
183 ) -> Result<(), ConnectionError> {
184 let sql = format!(
185 "ALTER CLUSTER {} SET (SIZE = '{}', REPLICATION FACTOR = {})",
186 quote_identifier(name),
187 options.size,
188 options.replication_factor
189 );
190
191 self.client.execute(&sql, &[]).await.map_err(|e| {
192 ConnectionError::Message(format!("Failed to alter cluster '{}': {}", name, e))
193 })?;
194
195 Ok(())
196 }
197}