1use crate::cli::CliError;
13use crate::cli::commands::apply_objects;
14use crate::cli::commands::grants;
15use crate::cli::executor::ObjectAction;
16use crate::cli::executor::{
17 ApplyPlan, ApplyResult, DeploymentExecutor, ObjectResult, compile_apply_project_and_connect,
18};
19use crate::client::Client;
20use crate::config::Settings;
21use crate::project;
22use crate::project::ast::Statement;
23use crate::project::ir::compiled;
24use crate::project::ir::object_id::ObjectId;
25use crate::secret_resolver::SecretResolver;
26use mz_sql_parser::ast::{
27 AlterConnectionAction, AlterConnectionStatement, ConnectionOption, ConnectionOptionName, Raw,
28 Statement as ParserStatement,
29};
30use mz_sql_parser::parser::parse_statements;
31use std::collections::{BTreeMap, BTreeSet};
32
/// Phase label written into the `phase` field of every `ApplyResult` this
/// module produces.
const PHASE_NAME: &str = "connections";
/// Object kind handed to grant/comment reconciliation; its `catalog_table()`
/// is also what `plan` queries to find out which connections already exist.
const GRANT_KIND: grants::GrantObjectKind = grants::GrantObjectKind::Connection;
35
/// Returns `true` when `stmt` is a `Statement::CreateConnection`.
///
/// `plan` uses this predicate to select the project objects handled by the
/// connections phase.
fn matches(stmt: &Statement) -> bool {
    matches!(stmt, Statement::CreateConnection(_))
}
39
40struct Connections {
41 resolver: SecretResolver,
42}
43
44impl Connections {
45 fn new(settings: &Settings) -> Result<Self, CliError> {
46 Ok(Connections {
47 resolver: SecretResolver::new(&settings.profile_config.security),
48 })
49 }
50
51 async fn handle_existing(
52 &self,
53 client: &Client,
54 executor: &DeploymentExecutor<'_>,
55 obj_id: &ObjectId,
56 typed_obj: &compiled::DatabaseObject,
57 ) -> Result<ObjectAction, CliError> {
58 let Statement::CreateConnection(ref create_stmt) = typed_obj.stmt else {
59 unreachable!("filtered for CreateConnection");
60 };
61
62 let resolved_stmt = match self
63 .resolver
64 .resolve_statement_for_cli(&typed_obj.stmt)
65 .await?
66 {
67 Statement::CreateConnection(s) => s,
68 _ => unreachable!(),
69 };
70
71 let live_sql = client
72 .introspection()
73 .get_connection_create_sql(obj_id.expect_database(), obj_id.schema(), obj_id.object())
74 .await
75 .map_err(CliError::Connection)?;
76
77 let action = match live_sql {
78 None => {
79 executor.execute_sql(&resolved_stmt).await?;
82 ObjectAction::Created
83 }
84 Some(sql) => {
85 let live_create = parse_create_connection_sql(&sql)?;
86 let (to_set, to_drop) =
87 diff_connection_options(&resolved_stmt.values, &live_create.values);
88
89 if to_set.is_empty() && to_drop.is_empty() {
90 ObjectAction::UpToDate
91 } else {
92 let actions: Vec<AlterConnectionAction<Raw>> = to_set
93 .into_iter()
94 .map(AlterConnectionAction::SetOption)
95 .chain(to_drop.into_iter().map(AlterConnectionAction::DropOption))
96 .collect();
97
98 let alter_stmt = AlterConnectionStatement::<Raw> {
99 name: create_stmt.name.clone(),
100 if_exists: false,
101 actions,
102 with_options: vec![],
103 };
104 executor.execute_sql(&alter_stmt).await?;
105 ObjectAction::Altered
106 }
107 }
108 };
109
110 apply_objects::reconcile_grants_and_comments(
111 client,
112 executor,
113 obj_id,
114 typed_obj,
115 &GRANT_KIND,
116 )
117 .await?;
118
119 Ok(action)
120 }
121
122 async fn handle_new(
123 &self,
124 client: &Client,
125 executor: &DeploymentExecutor<'_>,
126 obj_id: &ObjectId,
127 typed_obj: &compiled::DatabaseObject,
128 ) -> Result<ObjectAction, CliError> {
129 let resolved_stmt = match self
130 .resolver
131 .resolve_statement_for_cli(&typed_obj.stmt)
132 .await?
133 {
134 Statement::CreateConnection(s) => s,
135 _ => unreachable!(),
136 };
137
138 executor.execute_sql(&resolved_stmt).await?;
139
140 apply_objects::reconcile_grants_and_comments(
141 client,
142 executor,
143 obj_id,
144 typed_obj,
145 &GRANT_KIND,
146 )
147 .await?;
148
149 Ok(ObjectAction::Created)
150 }
151}
152
153pub async fn plan(
155 settings: &Settings,
156 client: &Client,
157 executor: &DeploymentExecutor<'_>,
158 planned_project: &project::ir::graph::Project,
159 apply_plan: &mut ApplyPlan,
160) -> Result<ApplyResult, CliError> {
161 let connections = Connections::new(settings)?;
162 let mut target_ids = BTreeSet::new();
163 for obj in planned_project.iter_objects() {
164 if matches(&obj.typed_object.stmt) {
165 target_ids.insert(obj.id.clone());
166 }
167 }
168
169 if target_ids.is_empty() {
170 return Ok(ApplyResult {
171 phase: PHASE_NAME.to_string(),
172 results: vec![],
173 });
174 }
175
176 let target_objects = planned_project.get_sorted_objects_filtered(&target_ids)?;
177 let existing = client
178 .introspection()
179 .check_catalog_objects_exist(&target_ids, GRANT_KIND.catalog_table())
180 .await
181 .map_err(CliError::Connection)?;
182
183 let schemas: BTreeSet<_> = target_objects
184 .iter()
185 .filter(|(obj_id, _)| !existing.contains(obj_id))
186 .map(|(obj_id, _)| {
187 project::SchemaQualifier::new(
188 obj_id.expect_database().to_string(),
189 obj_id.schema().to_string(),
190 )
191 })
192 .collect();
193 apply_plan
194 .prepare_schemas(executor, planned_project, &schemas)
195 .await?;
196
197 let mut results = Vec::new();
198
199 for (obj_id, typed_obj) in target_objects {
200 executor.take_statements();
201 let action = if existing.contains(&obj_id) {
202 connections
203 .handle_existing(client, executor, &obj_id, typed_obj)
204 .await?
205 } else {
206 connections
207 .handle_new(client, executor, &obj_id, typed_obj)
208 .await?
209 };
210 results.push(ObjectResult {
211 object: obj_id.to_string(),
212 action,
213 statements: executor.take_statements(),
214 redacted_statements: vec![],
215 transaction_group: None,
216 });
217 }
218
219 Ok(ApplyResult {
220 phase: PHASE_NAME.to_string(),
221 results,
222 })
223}
224
225pub async fn run(settings: &Settings, dry_run: bool) -> Result<ApplyPlan, CliError> {
227 let (planned_project, client) = compile_apply_project_and_connect(settings).await?;
228 let mut apply_plan = ApplyPlan::new();
229 let executor = DeploymentExecutor::new_dry_run(&client);
230 let phase = plan(
231 settings,
232 &client,
233 &executor,
234 &planned_project,
235 &mut apply_plan,
236 )
237 .await?;
238 apply_plan.add_phase(phase);
239
240 if !dry_run {
241 apply_plan.execute(&client).await?;
242 }
243
244 Ok(apply_plan)
245}
246
247fn parse_create_connection_sql(
249 sql: &str,
250) -> Result<mz_sql_parser::ast::CreateConnectionStatement<Raw>, CliError> {
251 let stmts = parse_statements(sql).map_err(|e| {
252 CliError::Message(format!(
253 "failed to parse SHOW CREATE CONNECTION output: {}",
254 e.error
255 ))
256 })?;
257
258 let stmt = stmts
259 .into_iter()
260 .next()
261 .ok_or_else(|| CliError::Message("SHOW CREATE CONNECTION returned empty SQL".into()))?;
262
263 match stmt.ast {
264 ParserStatement::CreateConnection(c) => Ok(c),
265 other => Err(CliError::Message(format!(
266 "expected CREATE CONNECTION, got: {}",
267 other
268 ))),
269 }
270}
271
272fn diff_connection_options(
278 project_opts: &[ConnectionOption<Raw>],
279 live_opts: &[ConnectionOption<Raw>],
280) -> (Vec<ConnectionOption<Raw>>, Vec<ConnectionOptionName>) {
281 let project_map: BTreeMap<ConnectionOptionName, &ConnectionOption<Raw>> =
282 project_opts.iter().map(|o| (o.name, o)).collect();
283 let live_map: BTreeMap<ConnectionOptionName, &ConnectionOption<Raw>> =
284 live_opts.iter().map(|o| (o.name, o)).collect();
285
286 let mut to_set = Vec::new();
287 let mut to_drop = Vec::new();
288
289 for (name, project_opt) in &project_map {
291 match live_map.get(name) {
292 None => to_set.push((*project_opt).clone()),
293 Some(live_opt) => {
294 if *project_opt != *live_opt {
295 to_set.push((*project_opt).clone());
296 }
297 }
298 }
299 }
300
301 for name in live_map.keys() {
303 if !project_map.contains_key(name) {
304 to_drop.push(*name);
305 }
306 }
307
308 (to_set, to_drop)
309}
310
#[cfg(test)]
mod tests {
    use super::*;
    use mz_sql_parser::ast::WithOptionValue;

    /// Builds a connection option whose value is the string literal `value`.
    fn make_option(name: ConnectionOptionName, value: &str) -> ConnectionOption<Raw> {
        let literal = mz_sql_parser::ast::Value::String(value.to_string());
        ConnectionOption {
            name,
            value: Some(WithOptionValue::Value(literal)),
        }
    }

    /// Collects the names of `opts`, preserving their order.
    fn names(opts: &[ConnectionOption<Raw>]) -> Vec<ConnectionOptionName> {
        opts.iter().map(|opt| opt.name).collect()
    }

    /// Identical option lists produce no work at all.
    #[mz_ore::test]
    fn test_diff_no_changes() {
        let same = vec![
            make_option(ConnectionOptionName::Host, "localhost"),
            make_option(ConnectionOptionName::Port, "5432"),
        ];
        let (to_set, to_drop) = diff_connection_options(&same, &same);
        assert!(to_set.is_empty());
        assert!(to_drop.is_empty());
    }

    /// A differing value for the same name is re-set, never dropped.
    #[mz_ore::test]
    fn test_diff_option_changed() {
        let wanted = vec![make_option(
            ConnectionOptionName::Host,
            "new-host.example.com",
        )];
        let current = vec![make_option(
            ConnectionOptionName::Host,
            "old-host.example.com",
        )];
        let (to_set, to_drop) = diff_connection_options(&wanted, &current);
        assert_eq!(names(&to_set), vec![ConnectionOptionName::Host]);
        assert!(to_drop.is_empty());
    }

    /// An option present only in the project is set.
    #[mz_ore::test]
    fn test_diff_option_added() {
        let wanted = vec![
            make_option(ConnectionOptionName::Host, "localhost"),
            make_option(ConnectionOptionName::Database, "mydb"),
        ];
        let current = vec![make_option(ConnectionOptionName::Host, "localhost")];
        let (to_set, to_drop) = diff_connection_options(&wanted, &current);
        assert_eq!(names(&to_set), vec![ConnectionOptionName::Database]);
        assert!(to_drop.is_empty());
    }

    /// An option present only in the live definition is dropped.
    #[mz_ore::test]
    fn test_diff_option_dropped() {
        let wanted = vec![make_option(ConnectionOptionName::Host, "localhost")];
        let current = vec![
            make_option(ConnectionOptionName::Host, "localhost"),
            make_option(ConnectionOptionName::Database, "mydb"),
        ];
        let (to_set, to_drop) = diff_connection_options(&wanted, &current);
        assert!(to_set.is_empty());
        assert_eq!(to_drop, vec![ConnectionOptionName::Database]);
    }

    /// Changed, added and removed options are all reported together.
    #[mz_ore::test]
    fn test_diff_multiple_changes() {
        let wanted = vec![
            make_option(ConnectionOptionName::Host, "new-host"),
            make_option(ConnectionOptionName::Port, "5433"),
        ];
        let current = vec![
            make_option(ConnectionOptionName::Host, "old-host"),
            make_option(ConnectionOptionName::Database, "mydb"),
        ];
        let (to_set, to_drop) = diff_connection_options(&wanted, &current);

        let set_names = names(&to_set);
        assert_eq!(set_names.len(), 2);
        assert!(set_names.contains(&ConnectionOptionName::Host));
        assert!(set_names.contains(&ConnectionOptionName::Port));
        assert_eq!(to_drop, vec![ConnectionOptionName::Database]);
    }

    /// Secret-valued options are compared by their AST, so two references to
    /// the same secret are not reported as a change.
    #[mz_ore::test]
    fn test_diff_secret_option_compared_structurally() {
        use mz_sql_parser::ast::{Ident, RawItemName, UnresolvedItemName};

        let secret_name = UnresolvedItemName::qualified(&[
            Ident::new_unchecked("db"),
            Ident::new_unchecked("public"),
            Ident::new_unchecked("my_pass"),
        ]);
        let secret_ref = WithOptionValue::Secret(RawItemName::Name(secret_name));

        let wanted = vec![ConnectionOption {
            name: ConnectionOptionName::SaslPassword,
            value: Some(secret_ref.clone()),
        }];
        let current = vec![ConnectionOption {
            name: ConnectionOptionName::SaslPassword,
            value: Some(secret_ref),
        }];
        let (to_set, to_drop) = diff_connection_options(&wanted, &current);
        assert!(to_set.is_empty());
        assert!(to_drop.is_empty());
    }
}