Skip to main content

mz_deploy/cli/commands/
apply_connections.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Apply connections command - create missing connections and reconcile drifted ones.
11
12use crate::cli::CliError;
13use crate::cli::commands::apply_objects;
14use crate::cli::commands::grants;
15use crate::cli::executor::ObjectAction;
16use crate::cli::executor::{
17    ApplyPlan, ApplyResult, DeploymentExecutor, ObjectResult, compile_apply_project_and_connect,
18};
19use crate::client::Client;
20use crate::config::Settings;
21use crate::project;
22use crate::project::ast::Statement;
23use crate::project::ir::compiled;
24use crate::project::ir::object_id::ObjectId;
25use crate::secret_resolver::SecretResolver;
26use mz_sql_parser::ast::{
27    AlterConnectionAction, AlterConnectionStatement, ConnectionOption, ConnectionOptionName, Raw,
28    Statement as ParserStatement,
29};
30use mz_sql_parser::parser::parse_statements;
31use std::collections::{BTreeMap, BTreeSet};
32
33const PHASE_NAME: &str = "connections";
34const GRANT_KIND: grants::GrantObjectKind = grants::GrantObjectKind::Connection;
35
36fn matches(stmt: &Statement) -> bool {
37    matches!(stmt, Statement::CreateConnection(_))
38}
39
40struct Connections {
41    resolver: SecretResolver,
42}
43
44impl Connections {
45    fn new(settings: &Settings) -> Result<Self, CliError> {
46        Ok(Connections {
47            resolver: SecretResolver::new(&settings.profile_config.security),
48        })
49    }
50
51    async fn handle_existing(
52        &self,
53        client: &Client,
54        executor: &DeploymentExecutor<'_>,
55        obj_id: &ObjectId,
56        typed_obj: &compiled::DatabaseObject,
57    ) -> Result<ObjectAction, CliError> {
58        let Statement::CreateConnection(ref create_stmt) = typed_obj.stmt else {
59            unreachable!("filtered for CreateConnection");
60        };
61
62        let resolved_stmt = match self
63            .resolver
64            .resolve_statement_for_cli(&typed_obj.stmt)
65            .await?
66        {
67            Statement::CreateConnection(s) => s,
68            _ => unreachable!(),
69        };
70
71        let live_sql = client
72            .introspection()
73            .get_connection_create_sql(obj_id.expect_database(), obj_id.schema(), obj_id.object())
74            .await
75            .map_err(CliError::Connection)?;
76
77        let action = match live_sql {
78            None => {
79                // Object was in catalog batch check but SHOW CREATE returned nothing —
80                // treat as needing creation.
81                executor.execute_sql(&resolved_stmt).await?;
82                ObjectAction::Created
83            }
84            Some(sql) => {
85                let live_create = parse_create_connection_sql(&sql)?;
86                let (to_set, to_drop) =
87                    diff_connection_options(&resolved_stmt.values, &live_create.values);
88
89                if to_set.is_empty() && to_drop.is_empty() {
90                    ObjectAction::UpToDate
91                } else {
92                    let actions: Vec<AlterConnectionAction<Raw>> = to_set
93                        .into_iter()
94                        .map(AlterConnectionAction::SetOption)
95                        .chain(to_drop.into_iter().map(AlterConnectionAction::DropOption))
96                        .collect();
97
98                    let alter_stmt = AlterConnectionStatement::<Raw> {
99                        name: create_stmt.name.clone(),
100                        if_exists: false,
101                        actions,
102                        with_options: vec![],
103                    };
104                    executor.execute_sql(&alter_stmt).await?;
105                    ObjectAction::Altered
106                }
107            }
108        };
109
110        apply_objects::reconcile_grants_and_comments(
111            client,
112            executor,
113            obj_id,
114            typed_obj,
115            &GRANT_KIND,
116        )
117        .await?;
118
119        Ok(action)
120    }
121
122    async fn handle_new(
123        &self,
124        client: &Client,
125        executor: &DeploymentExecutor<'_>,
126        obj_id: &ObjectId,
127        typed_obj: &compiled::DatabaseObject,
128    ) -> Result<ObjectAction, CliError> {
129        let resolved_stmt = match self
130            .resolver
131            .resolve_statement_for_cli(&typed_obj.stmt)
132            .await?
133        {
134            Statement::CreateConnection(s) => s,
135            _ => unreachable!(),
136        };
137
138        executor.execute_sql(&resolved_stmt).await?;
139
140        apply_objects::reconcile_grants_and_comments(
141            client,
142            executor,
143            obj_id,
144            typed_obj,
145            &GRANT_KIND,
146        )
147        .await?;
148
149        Ok(ObjectAction::Created)
150    }
151}
152
153/// Plan connection changes without executing or printing.
154pub async fn plan(
155    settings: &Settings,
156    client: &Client,
157    executor: &DeploymentExecutor<'_>,
158    planned_project: &project::ir::graph::Project,
159    apply_plan: &mut ApplyPlan,
160) -> Result<ApplyResult, CliError> {
161    let connections = Connections::new(settings)?;
162    let mut target_ids = BTreeSet::new();
163    for obj in planned_project.iter_objects() {
164        if matches(&obj.typed_object.stmt) {
165            target_ids.insert(obj.id.clone());
166        }
167    }
168
169    if target_ids.is_empty() {
170        return Ok(ApplyResult {
171            phase: PHASE_NAME.to_string(),
172            results: vec![],
173        });
174    }
175
176    let target_objects = planned_project.get_sorted_objects_filtered(&target_ids)?;
177    let existing = client
178        .introspection()
179        .check_catalog_objects_exist(&target_ids, GRANT_KIND.catalog_table())
180        .await
181        .map_err(CliError::Connection)?;
182
183    let schemas: BTreeSet<_> = target_objects
184        .iter()
185        .filter(|(obj_id, _)| !existing.contains(obj_id))
186        .map(|(obj_id, _)| {
187            project::SchemaQualifier::new(
188                obj_id.expect_database().to_string(),
189                obj_id.schema().to_string(),
190            )
191        })
192        .collect();
193    apply_plan
194        .prepare_schemas(executor, planned_project, &schemas)
195        .await?;
196
197    let mut results = Vec::new();
198
199    for (obj_id, typed_obj) in target_objects {
200        executor.take_statements();
201        let action = if existing.contains(&obj_id) {
202            connections
203                .handle_existing(client, executor, &obj_id, typed_obj)
204                .await?
205        } else {
206            connections
207                .handle_new(client, executor, &obj_id, typed_obj)
208                .await?
209        };
210        results.push(ObjectResult {
211            object: obj_id.to_string(),
212            action,
213            statements: executor.take_statements(),
214            redacted_statements: vec![],
215            transaction_group: None,
216        });
217    }
218
219    Ok(ApplyResult {
220        phase: PHASE_NAME.to_string(),
221        results,
222    })
223}
224
225/// Run the `apply connections` command: plan, render, optionally execute.
226pub async fn run(settings: &Settings, dry_run: bool) -> Result<ApplyPlan, CliError> {
227    let (planned_project, client) = compile_apply_project_and_connect(settings).await?;
228    let mut apply_plan = ApplyPlan::new();
229    let executor = DeploymentExecutor::new_dry_run(&client);
230    let phase = plan(
231        settings,
232        &client,
233        &executor,
234        &planned_project,
235        &mut apply_plan,
236    )
237    .await?;
238    apply_plan.add_phase(phase);
239
240    if !dry_run {
241        apply_plan.execute(&client).await?;
242    }
243
244    Ok(apply_plan)
245}
246
247/// Parse a `CREATE CONNECTION` SQL string back into its AST statement.
248fn parse_create_connection_sql(
249    sql: &str,
250) -> Result<mz_sql_parser::ast::CreateConnectionStatement<Raw>, CliError> {
251    let stmts = parse_statements(sql).map_err(|e| {
252        CliError::Message(format!(
253            "failed to parse SHOW CREATE CONNECTION output: {}",
254            e.error
255        ))
256    })?;
257
258    let stmt = stmts
259        .into_iter()
260        .next()
261        .ok_or_else(|| CliError::Message("SHOW CREATE CONNECTION returned empty SQL".into()))?;
262
263    match stmt.ast {
264        ParserStatement::CreateConnection(c) => Ok(c),
265        other => Err(CliError::Message(format!(
266            "expected CREATE CONNECTION, got: {}",
267            other
268        ))),
269    }
270}
271
272/// Diff two sets of connection options.
273///
274/// Returns `(to_set, to_drop)`:
275/// - `to_set`: options that need `ALTER CONNECTION ... SET`
276/// - `to_drop`: option names that need `ALTER CONNECTION ... DROP`
277fn diff_connection_options(
278    project_opts: &[ConnectionOption<Raw>],
279    live_opts: &[ConnectionOption<Raw>],
280) -> (Vec<ConnectionOption<Raw>>, Vec<ConnectionOptionName>) {
281    let project_map: BTreeMap<ConnectionOptionName, &ConnectionOption<Raw>> =
282        project_opts.iter().map(|o| (o.name, o)).collect();
283    let live_map: BTreeMap<ConnectionOptionName, &ConnectionOption<Raw>> =
284        live_opts.iter().map(|o| (o.name, o)).collect();
285
286    let mut to_set = Vec::new();
287    let mut to_drop = Vec::new();
288
289    // Options in project but not in live, or with different values → SET
290    for (name, project_opt) in &project_map {
291        match live_map.get(name) {
292            None => to_set.push((*project_opt).clone()),
293            Some(live_opt) => {
294                if *project_opt != *live_opt {
295                    to_set.push((*project_opt).clone());
296                }
297            }
298        }
299    }
300
301    // Options in live but not in project → DROP
302    for name in live_map.keys() {
303        if !project_map.contains_key(name) {
304            to_drop.push(*name);
305        }
306    }
307
308    (to_set, to_drop)
309}
310
#[cfg(test)]
mod tests {
    use super::*;
    use mz_sql_parser::ast::{Value, WithOptionValue};

    /// Builds a connection option whose value is a plain string literal.
    fn make_option(name: ConnectionOptionName, value: &str) -> ConnectionOption<Raw> {
        let literal = Value::String(value.to_string());
        ConnectionOption {
            name,
            value: Some(WithOptionValue::Value(literal)),
        }
    }

    /// Projects a list of options down to their names, keeping the order.
    fn names(opts: &[ConnectionOption<Raw>]) -> Vec<ConnectionOptionName> {
        opts.iter().map(|opt| opt.name).collect()
    }

    /// Identical option sets produce no work at all.
    #[mz_ore::test]
    fn test_diff_no_changes() {
        let opts = [
            make_option(ConnectionOptionName::Host, "localhost"),
            make_option(ConnectionOptionName::Port, "5432"),
        ];

        let (to_set, to_drop) = diff_connection_options(&opts, &opts);

        assert!(to_set.is_empty());
        assert!(to_drop.is_empty());
    }

    /// A changed value for an existing option is re-set, never dropped.
    #[mz_ore::test]
    fn test_diff_option_changed() {
        let project = [make_option(
            ConnectionOptionName::Host,
            "new-host.example.com",
        )];
        let live = [make_option(
            ConnectionOptionName::Host,
            "old-host.example.com",
        )];

        let (to_set, to_drop) = diff_connection_options(&project, &live);

        assert_eq!(names(&to_set), vec![ConnectionOptionName::Host]);
        assert!(to_drop.is_empty());
    }

    /// An option only the project declares is set.
    #[mz_ore::test]
    fn test_diff_option_added() {
        let project = [
            make_option(ConnectionOptionName::Host, "localhost"),
            make_option(ConnectionOptionName::Database, "mydb"),
        ];
        let live = [make_option(ConnectionOptionName::Host, "localhost")];

        let (to_set, to_drop) = diff_connection_options(&project, &live);

        assert_eq!(names(&to_set), vec![ConnectionOptionName::Database]);
        assert!(to_drop.is_empty());
    }

    /// An option only the live connection has is dropped.
    #[mz_ore::test]
    fn test_diff_option_dropped() {
        let project = [make_option(ConnectionOptionName::Host, "localhost")];
        let live = [
            make_option(ConnectionOptionName::Host, "localhost"),
            make_option(ConnectionOptionName::Database, "mydb"),
        ];

        let (to_set, to_drop) = diff_connection_options(&project, &live);

        assert!(to_set.is_empty());
        assert_eq!(to_drop, vec![ConnectionOptionName::Database]);
    }

    /// Changed, added and removed options are all reported in one pass.
    #[mz_ore::test]
    fn test_diff_multiple_changes() {
        let project = [
            make_option(ConnectionOptionName::Host, "new-host"),
            make_option(ConnectionOptionName::Port, "5433"),
        ];
        let live = [
            make_option(ConnectionOptionName::Host, "old-host"),
            make_option(ConnectionOptionName::Database, "mydb"),
        ];

        let (to_set, to_drop) = diff_connection_options(&project, &live);

        let set_names = names(&to_set);
        assert_eq!(set_names.len(), 2);
        assert!(set_names.contains(&ConnectionOptionName::Host));
        assert!(set_names.contains(&ConnectionOptionName::Port));
        assert_eq!(to_drop, vec![ConnectionOptionName::Database]);
    }

    /// Secret references are compared by structure: two equal references to
    /// the same secret name do not count as a change.
    #[mz_ore::test]
    fn test_diff_secret_option_compared_structurally() {
        use mz_sql_parser::ast::{Ident, RawItemName, UnresolvedItemName};

        let secret_name = UnresolvedItemName::qualified(&[
            Ident::new_unchecked("db"),
            Ident::new_unchecked("public"),
            Ident::new_unchecked("my_pass"),
        ]);
        let project = vec![ConnectionOption::<Raw> {
            name: ConnectionOptionName::SaslPassword,
            value: Some(WithOptionValue::Secret(RawItemName::Name(secret_name))),
        }];
        let live = project.clone();

        let (to_set, to_drop) = diff_connection_options(&project, &live);

        assert!(to_set.is_empty());
        assert!(to_drop.is_empty());
    }
}