1use crate::cli::CliError;
13use crate::cli::commands::apply_objects;
14use crate::cli::commands::grants;
15use crate::cli::executor::ObjectAction;
16use crate::cli::executor::{
17 ApplyPlan, ApplyResult, DeploymentExecutor, ObjectResult, compile_apply_project_and_connect,
18};
19use crate::client::Client;
20use crate::config::Settings;
21use crate::project;
22use crate::project::ast::Statement;
23use crate::project::ir::compiled;
24use crate::project::ir::object_id::ObjectId;
25use crate::secret_resolver::SecretResolver;
26use mz_sql_parser::ast::{
27 AlterConnectionAction, AlterConnectionStatement, ConnectionOption, ConnectionOptionName, Raw,
28 Statement as ParserStatement,
29};
30use mz_sql_parser::parser::parse_statements;
31use std::collections::{BTreeMap, BTreeSet};
32
33const PHASE_NAME: &str = "connections";
34const GRANT_KIND: grants::GrantObjectKind = grants::GrantObjectKind::Connection;
35
36fn matches(stmt: &Statement) -> bool {
37 matches!(stmt, Statement::CreateConnection(_))
38}
39
40struct Connections {
41 resolver: SecretResolver,
42}
43
44impl Connections {
45 fn new(settings: &Settings) -> Result<Self, CliError> {
46 Ok(Connections {
47 resolver: SecretResolver::new(&settings.profile_config.security),
48 })
49 }
50
51 async fn handle_existing(
52 &self,
53 client: &Client,
54 executor: &DeploymentExecutor<'_>,
55 obj_id: &ObjectId,
56 typed_obj: &compiled::DatabaseObject,
57 ) -> Result<ObjectAction, CliError> {
58 let Statement::CreateConnection(ref create_stmt) = typed_obj.stmt else {
59 unreachable!("filtered for CreateConnection");
60 };
61
62 let resolved_stmt = match self
63 .resolver
64 .resolve_statement_for_cli(&typed_obj.stmt)
65 .await?
66 {
67 Statement::CreateConnection(s) => s,
68 _ => unreachable!(),
69 };
70
71 let live_sql = client
72 .introspection()
73 .get_connection_create_sql(obj_id.expect_database(), obj_id.schema(), obj_id.object())
74 .await
75 .map_err(CliError::Connection)?;
76
77 let action = match live_sql {
78 None => {
79 executor.execute_sql(&resolved_stmt).await?;
82 ObjectAction::Created
83 }
84 Some(sql) => {
85 let live_create = parse_create_connection_sql(&sql)?;
86 let (to_set, to_drop) =
87 diff_connection_options(&resolved_stmt.values, &live_create.values);
88
89 if to_set.is_empty() && to_drop.is_empty() {
90 ObjectAction::UpToDate
91 } else {
92 let actions: Vec<AlterConnectionAction<Raw>> = to_set
93 .into_iter()
94 .map(AlterConnectionAction::SetOption)
95 .chain(to_drop.into_iter().map(AlterConnectionAction::DropOption))
96 .collect();
97
98 let alter_stmt = AlterConnectionStatement::<Raw> {
99 name: create_stmt.name.clone(),
100 if_exists: false,
101 actions,
102 with_options: vec![],
103 };
104 executor.execute_sql(&alter_stmt).await?;
105 ObjectAction::Altered
106 }
107 }
108 };
109
110 apply_objects::reconcile_grants_and_comments(
111 client,
112 executor,
113 obj_id,
114 typed_obj,
115 &GRANT_KIND,
116 )
117 .await?;
118
119 Ok(action)
120 }
121
122 async fn handle_new(
123 &self,
124 client: &Client,
125 executor: &DeploymentExecutor<'_>,
126 obj_id: &ObjectId,
127 typed_obj: &compiled::DatabaseObject,
128 ) -> Result<ObjectAction, CliError> {
129 let resolved_stmt = match self
130 .resolver
131 .resolve_statement_for_cli(&typed_obj.stmt)
132 .await?
133 {
134 Statement::CreateConnection(s) => s,
135 _ => unreachable!(),
136 };
137
138 executor.execute_sql(&resolved_stmt).await?;
139
140 apply_objects::reconcile_grants_and_comments(
141 client,
142 executor,
143 obj_id,
144 typed_obj,
145 &GRANT_KIND,
146 )
147 .await?;
148
149 Ok(ObjectAction::Created)
150 }
151}
152
153pub async fn plan(
155 settings: &Settings,
156 client: &Client,
157 executor: &DeploymentExecutor<'_>,
158 planned_project: &project::ir::graph::Project,
159 apply_plan: &mut ApplyPlan,
160) -> Result<ApplyResult, CliError> {
161 let connections = Connections::new(settings)?;
162 let mut target_ids = BTreeSet::new();
163 for obj in planned_project.iter_objects() {
164 if matches(&obj.typed_object.stmt) {
165 target_ids.insert(obj.id.clone());
166 }
167 }
168
169 if target_ids.is_empty() {
170 return Ok(ApplyResult {
171 phase: PHASE_NAME.to_string(),
172 results: vec![],
173 });
174 }
175
176 let target_objects = planned_project.get_sorted_objects_filtered(&target_ids)?;
177 let existing = client
178 .introspection()
179 .check_catalog_objects_exist(&target_ids, GRANT_KIND.catalog_table())
180 .await
181 .map_err(CliError::Connection)?;
182
183 let schemas: BTreeSet<_> = target_objects
184 .iter()
185 .filter(|(obj_id, _)| !existing.contains(obj_id))
186 .map(|(obj_id, _)| {
187 project::SchemaQualifier::new(
188 obj_id.expect_database().to_string(),
189 obj_id.schema().to_string(),
190 )
191 })
192 .collect();
193 apply_plan
194 .prepare_schemas(executor, planned_project, &schemas)
195 .await?;
196
197 let mut results = Vec::new();
198
199 for (obj_id, typed_obj) in target_objects {
200 executor.take_statements();
201 let action = if existing.contains(&obj_id) {
202 connections
203 .handle_existing(client, executor, &obj_id, typed_obj)
204 .await?
205 } else {
206 connections
207 .handle_new(client, executor, &obj_id, typed_obj)
208 .await?
209 };
210 results.push(ObjectResult {
211 object: obj_id.to_string(),
212 action,
213 statements: executor.take_statements(),
214 redacted_statements: vec![],
215 transaction_group: None,
216 post_statements: vec![],
217 });
218 }
219
220 Ok(ApplyResult {
221 phase: PHASE_NAME.to_string(),
222 results,
223 })
224}
225
226pub async fn run(settings: &Settings, dry_run: bool) -> Result<ApplyPlan, CliError> {
228 let (planned_project, client) = compile_apply_project_and_connect(settings).await?;
229 let mut apply_plan = ApplyPlan::new();
230 let executor = DeploymentExecutor::new_dry_run(&client);
231 let phase = plan(
232 settings,
233 &client,
234 &executor,
235 &planned_project,
236 &mut apply_plan,
237 )
238 .await?;
239 apply_plan.add_phase(phase);
240
241 if !dry_run {
242 apply_plan.execute(&client).await?;
243 }
244
245 Ok(apply_plan)
246}
247
248fn parse_create_connection_sql(
250 sql: &str,
251) -> Result<mz_sql_parser::ast::CreateConnectionStatement<Raw>, CliError> {
252 let stmts = parse_statements(sql).map_err(|e| {
253 CliError::Message(format!(
254 "failed to parse SHOW CREATE CONNECTION output: {}",
255 e.error
256 ))
257 })?;
258
259 let stmt = stmts
260 .into_iter()
261 .next()
262 .ok_or_else(|| CliError::Message("SHOW CREATE CONNECTION returned empty SQL".into()))?;
263
264 match stmt.ast {
265 ParserStatement::CreateConnection(c) => Ok(c),
266 other => Err(CliError::Message(format!(
267 "expected CREATE CONNECTION, got: {}",
268 other
269 ))),
270 }
271}
272
273fn diff_connection_options(
279 project_opts: &[ConnectionOption<Raw>],
280 live_opts: &[ConnectionOption<Raw>],
281) -> (Vec<ConnectionOption<Raw>>, Vec<ConnectionOptionName>) {
282 let project_map: BTreeMap<ConnectionOptionName, &ConnectionOption<Raw>> =
283 project_opts.iter().map(|o| (o.name, o)).collect();
284 let live_map: BTreeMap<ConnectionOptionName, &ConnectionOption<Raw>> =
285 live_opts.iter().map(|o| (o.name, o)).collect();
286
287 let mut to_set = Vec::new();
288 let mut to_drop = Vec::new();
289
290 for (name, project_opt) in &project_map {
292 match live_map.get(name) {
293 None => to_set.push((*project_opt).clone()),
294 Some(live_opt) => {
295 if *project_opt != *live_opt {
296 to_set.push((*project_opt).clone());
297 }
298 }
299 }
300 }
301
302 for name in live_map.keys() {
304 if !project_map.contains_key(name) {
305 to_drop.push(*name);
306 }
307 }
308
309 (to_set, to_drop)
310}
311
#[cfg(test)]
mod tests {
    use super::*;
    use mz_sql_parser::ast::WithOptionValue;

    /// Builds a connection option whose value is a plain string literal.
    fn make_option(name: ConnectionOptionName, value: &str) -> ConnectionOption<Raw> {
        let literal = mz_sql_parser::ast::Value::String(value.to_string());
        ConnectionOption {
            name,
            value: Some(WithOptionValue::Value(literal)),
        }
    }

    /// Collects the names of `opts` in order, for compact assertions.
    fn names_of(opts: &[ConnectionOption<Raw>]) -> Vec<ConnectionOptionName> {
        opts.iter().map(|o| o.name).collect()
    }

    /// Identical option lists yield neither SET nor DROP entries.
    #[mz_ore::test]
    fn test_diff_no_changes() {
        let opts = [
            make_option(ConnectionOptionName::Host, "localhost"),
            make_option(ConnectionOptionName::Port, "5432"),
        ];
        let (to_set, to_drop) = diff_connection_options(&opts, &opts);
        assert!(to_set.is_empty());
        assert!(to_drop.is_empty());
    }

    /// A value change on an option present on both sides yields one SET entry.
    #[mz_ore::test]
    fn test_diff_option_changed() {
        let project = [make_option(
            ConnectionOptionName::Host,
            "new-host.example.com",
        )];
        let live = [make_option(
            ConnectionOptionName::Host,
            "old-host.example.com",
        )];
        let (to_set, to_drop) = diff_connection_options(&project, &live);
        assert_eq!(names_of(&to_set), vec![ConnectionOptionName::Host]);
        assert!(to_drop.is_empty());
    }

    /// An option that exists only in the project yields one SET entry.
    #[mz_ore::test]
    fn test_diff_option_added() {
        let project = [
            make_option(ConnectionOptionName::Host, "localhost"),
            make_option(ConnectionOptionName::Database, "mydb"),
        ];
        let live = [make_option(ConnectionOptionName::Host, "localhost")];
        let (to_set, to_drop) = diff_connection_options(&project, &live);
        assert_eq!(names_of(&to_set), vec![ConnectionOptionName::Database]);
        assert!(to_drop.is_empty());
    }

    /// An option that exists only in the live definition yields one DROP entry.
    #[mz_ore::test]
    fn test_diff_option_dropped() {
        let project = [make_option(ConnectionOptionName::Host, "localhost")];
        let live = [
            make_option(ConnectionOptionName::Host, "localhost"),
            make_option(ConnectionOptionName::Database, "mydb"),
        ];
        let (to_set, to_drop) = diff_connection_options(&project, &live);
        assert!(to_set.is_empty());
        assert_eq!(to_drop, vec![ConnectionOptionName::Database]);
    }

    /// Changed, added and removed options are all reported in a single diff.
    #[mz_ore::test]
    fn test_diff_multiple_changes() {
        let project = [
            make_option(ConnectionOptionName::Host, "new-host"),
            make_option(ConnectionOptionName::Port, "5433"),
        ];
        let live = [
            make_option(ConnectionOptionName::Host, "old-host"),
            make_option(ConnectionOptionName::Database, "mydb"),
        ];
        let (to_set, to_drop) = diff_connection_options(&project, &live);

        let set_names = names_of(&to_set);
        assert_eq!(set_names.len(), 2);
        assert!(set_names.contains(&ConnectionOptionName::Host));
        assert!(set_names.contains(&ConnectionOptionName::Port));
        assert_eq!(to_drop, vec![ConnectionOptionName::Database]);
    }

    /// Two structurally equal secret references compare equal, so nothing is altered.
    #[mz_ore::test]
    fn test_diff_secret_option_compared_structurally() {
        use mz_sql_parser::ast::{Ident, RawItemName, UnresolvedItemName};

        let secret_name = UnresolvedItemName::qualified(&[
            Ident::new_unchecked("db"),
            Ident::new_unchecked("public"),
            Ident::new_unchecked("my_pass"),
        ]);
        let secret_ref: WithOptionValue<Raw> =
            WithOptionValue::Secret(RawItemName::Name(secret_name));
        let password_option = ConnectionOption {
            name: ConnectionOptionName::SaslPassword,
            value: Some(secret_ref),
        };

        let project = vec![password_option.clone()];
        let live = vec![password_option];
        let (to_set, to_drop) = diff_connection_options(&project, &live);
        assert!(to_set.is_empty());
        assert!(to_drop.is_empty());
    }
}