Skip to main content

mz_deploy/cli/commands/
roles.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Roles apply command - converge live role state to match definitions.
11
12use crate::cli::CliError;
13use crate::cli::executor::{
14    ApplyPlan, ApplyResult, DeploymentExecutor, ObjectAction, ObjectResult, connect_apply_client,
15};
16use crate::client::Client;
17use crate::client::quote_identifier;
18use crate::config::Settings;
19use crate::project::roles::{self, RoleDefinition};
20use itertools::Itertools;
21use mz_sql_parser::ast::AlterRoleOption;
22use mz_sql_parser::ast::SetRoleVar;
23use std::collections::BTreeSet;
24
25/// Plan role changes without executing or printing.
26pub async fn plan(
27    settings: &Settings,
28    client: &Client,
29    executor: &DeploymentExecutor<'_>,
30) -> Result<ApplyResult, CliError> {
31    let profile = settings.connection();
32    let directory = &settings.directory;
33
34    let definitions = roles::load_roles(directory, &profile.name, settings.variables())?;
35
36    if definitions.is_empty() {
37        return Ok(ApplyResult {
38            phase: "roles".to_string(),
39            results: vec![],
40        });
41    }
42
43    // Pass 1: Create all roles so inter-role GRANT ROLE dependencies are satisfied.
44    let mut actions = Vec::new();
45    for def in &definitions {
46        executor.take_statements();
47        let action = create_role(client, executor, def).await?;
48        actions.push((action, executor.take_statements()));
49    }
50
51    // Pass 2: Configure each role (ALTER, GRANT, COMMENT, reconcile).
52    let mut object_results = Vec::new();
53    for (def, (action, create_stmts)) in definitions.iter().zip_eq(actions) {
54        executor.take_statements();
55        configure_role(client, executor, def).await?;
56        let mut statements = create_stmts;
57        statements.extend(executor.take_statements());
58        object_results.push(ObjectResult {
59            object: def.name.clone(),
60            action,
61            statements,
62            redacted_statements: vec![],
63            transaction_group: None,
64            post_statements: vec![],
65        });
66    }
67
68    Ok(ApplyResult {
69        phase: "roles".to_string(),
70        results: object_results,
71    })
72}
73
74/// Run the `roles apply` command: plan, render, optionally execute.
75pub async fn run(settings: &Settings, dry_run: bool) -> Result<ApplyPlan, CliError> {
76    let client = connect_apply_client(settings).await?;
77    let executor = DeploymentExecutor::new_dry_run(&client);
78    let mut plan_result = ApplyPlan::new();
79    let phase = plan(settings, &client, &executor).await?;
80    plan_result.add_phase(phase);
81
82    if !dry_run {
83        plan_result.execute(&client).await?;
84    }
85
86    Ok(plan_result)
87}
88
89/// Create a role if it doesn't already exist.
90async fn create_role(
91    client: &Client,
92    executor: &DeploymentExecutor<'_>,
93    def: &RoleDefinition,
94) -> Result<ObjectAction, CliError> {
95    let exists = client
96        .introspection()
97        .role_exists(&def.name)
98        .await
99        .map_err(CliError::Connection)?;
100
101    if exists {
102        Ok(ObjectAction::UpToDate)
103    } else {
104        executor.execute_sql(&def.create_stmt).await?;
105        Ok(ObjectAction::Created)
106    }
107}
108
109/// Configure a role: ALTER, GRANT, COMMENT statements and reconcile stale grants/params.
110async fn configure_role(
111    client: &Client,
112    executor: &DeploymentExecutor<'_>,
113    def: &RoleDefinition,
114) -> Result<(), CliError> {
115    let role_name = &def.name;
116
117    // Execute ALTER ROLE statements
118    for alter in &def.alter_stmts {
119        executor.execute_sql(alter).await?;
120    }
121
122    // Execute GRANT ROLE statements
123    for grant in &def.grants {
124        executor.execute_sql(grant).await?;
125    }
126
127    // Execute COMMENT statements
128    for comment in &def.comments {
129        executor.execute_sql(comment).await?;
130    }
131
132    // Revoke stale grants
133    let current_members = client
134        .introspection()
135        .get_role_members(role_name)
136        .await
137        .map_err(CliError::Connection)?;
138
139    let desired_members: BTreeSet<String> = def
140        .grants
141        .iter()
142        .flat_map(|g| g.member_names.iter().map(|m| m.as_str().to_lowercase()))
143        .collect();
144
145    for member in &current_members {
146        if !desired_members.contains(&member.to_lowercase()) {
147            let sql = format!(
148                "REVOKE {} FROM {}",
149                quote_identifier(role_name),
150                quote_identifier(member)
151            );
152            executor.execute_sql(&sql).await?;
153        }
154    }
155
156    // Reset stale session defaults
157    let current_params = client
158        .introspection()
159        .get_role_parameters(role_name)
160        .await
161        .map_err(CliError::Connection)?;
162
163    let desired_params: BTreeSet<String> = def
164        .alter_stmts
165        .iter()
166        .filter_map(|alter| match &alter.option {
167            AlterRoleOption::Variable(SetRoleVar::Set { name, .. }) => {
168                Some(name.as_str().to_lowercase())
169            }
170            _ => None,
171        })
172        .collect();
173
174    for param in &current_params {
175        if !desired_params.contains(&param.to_lowercase()) {
176            let sql = format!(
177                "ALTER ROLE {} RESET {}",
178                quote_identifier(role_name),
179                quote_identifier(param)
180            );
181            executor.execute_sql(&sql).await?;
182        }
183    }
184
185    Ok(())
186}