mz_deploy/cli/commands/
roles.rs1use crate::cli::CliError;
13use crate::cli::executor::{
14 ApplyPlan, ApplyResult, DeploymentExecutor, ObjectAction, ObjectResult, connect_apply_client,
15};
16use crate::client::Client;
17use crate::client::quote_identifier;
18use crate::config::Settings;
19use crate::project::roles::{self, RoleDefinition};
20use itertools::Itertools;
21use mz_sql_parser::ast::AlterRoleOption;
22use mz_sql_parser::ast::SetRoleVar;
23use std::collections::BTreeSet;
24
25pub async fn plan(
27 settings: &Settings,
28 client: &Client,
29 executor: &DeploymentExecutor<'_>,
30) -> Result<ApplyResult, CliError> {
31 let profile = settings.connection();
32 let directory = &settings.directory;
33
34 let definitions = roles::load_roles(directory, &profile.name, settings.variables())?;
35
36 if definitions.is_empty() {
37 return Ok(ApplyResult {
38 phase: "roles".to_string(),
39 results: vec![],
40 });
41 }
42
43 let mut actions = Vec::new();
45 for def in &definitions {
46 executor.take_statements();
47 let action = create_role(client, executor, def).await?;
48 actions.push((action, executor.take_statements()));
49 }
50
51 let mut object_results = Vec::new();
53 for (def, (action, create_stmts)) in definitions.iter().zip_eq(actions) {
54 executor.take_statements();
55 configure_role(client, executor, def).await?;
56 let mut statements = create_stmts;
57 statements.extend(executor.take_statements());
58 object_results.push(ObjectResult {
59 object: def.name.clone(),
60 action,
61 statements,
62 redacted_statements: vec![],
63 transaction_group: None,
64 });
65 }
66
67 Ok(ApplyResult {
68 phase: "roles".to_string(),
69 results: object_results,
70 })
71}
72
73pub async fn run(settings: &Settings, dry_run: bool) -> Result<ApplyPlan, CliError> {
75 let client = connect_apply_client(settings).await?;
76 let executor = DeploymentExecutor::new_dry_run(&client);
77 let mut plan_result = ApplyPlan::new();
78 let phase = plan(settings, &client, &executor).await?;
79 plan_result.add_phase(phase);
80
81 if !dry_run {
82 plan_result.execute(&client).await?;
83 }
84
85 Ok(plan_result)
86}
87
88async fn create_role(
90 client: &Client,
91 executor: &DeploymentExecutor<'_>,
92 def: &RoleDefinition,
93) -> Result<ObjectAction, CliError> {
94 let exists = client
95 .introspection()
96 .role_exists(&def.name)
97 .await
98 .map_err(CliError::Connection)?;
99
100 if exists {
101 Ok(ObjectAction::UpToDate)
102 } else {
103 executor.execute_sql(&def.create_stmt).await?;
104 Ok(ObjectAction::Created)
105 }
106}
107
108async fn configure_role(
110 client: &Client,
111 executor: &DeploymentExecutor<'_>,
112 def: &RoleDefinition,
113) -> Result<(), CliError> {
114 let role_name = &def.name;
115
116 for alter in &def.alter_stmts {
118 executor.execute_sql(alter).await?;
119 }
120
121 for grant in &def.grants {
123 executor.execute_sql(grant).await?;
124 }
125
126 for comment in &def.comments {
128 executor.execute_sql(comment).await?;
129 }
130
131 let current_members = client
133 .introspection()
134 .get_role_members(role_name)
135 .await
136 .map_err(CliError::Connection)?;
137
138 let desired_members: BTreeSet<String> = def
139 .grants
140 .iter()
141 .flat_map(|g| g.member_names.iter().map(|m| m.as_str().to_lowercase()))
142 .collect();
143
144 for member in ¤t_members {
145 if !desired_members.contains(&member.to_lowercase()) {
146 let sql = format!(
147 "REVOKE {} FROM {}",
148 quote_identifier(role_name),
149 quote_identifier(member)
150 );
151 executor.execute_sql(&sql).await?;
152 }
153 }
154
155 let current_params = client
157 .introspection()
158 .get_role_parameters(role_name)
159 .await
160 .map_err(CliError::Connection)?;
161
162 let desired_params: BTreeSet<String> = def
163 .alter_stmts
164 .iter()
165 .filter_map(|alter| match &alter.option {
166 AlterRoleOption::Variable(SetRoleVar::Set { name, .. }) => {
167 Some(name.as_str().to_lowercase())
168 }
169 _ => None,
170 })
171 .collect();
172
173 for param in ¤t_params {
174 if !desired_params.contains(¶m.to_lowercase()) {
175 let sql = format!(
176 "ALTER ROLE {} RESET {}",
177 quote_identifier(role_name),
178 quote_identifier(param)
179 );
180 executor.execute_sql(&sql).await?;
181 }
182 }
183
184 Ok(())
185}