mz_deploy/cli/commands/
roles.rs1use crate::cli::CliError;
13use crate::cli::executor::{
14 ApplyPlan, ApplyResult, DeploymentExecutor, ObjectAction, ObjectResult, connect_apply_client,
15};
16use crate::client::Client;
17use crate::client::quote_identifier;
18use crate::config::Settings;
19use crate::project::roles::{self, RoleDefinition};
20use itertools::Itertools;
21use mz_sql_parser::ast::AlterRoleOption;
22use mz_sql_parser::ast::SetRoleVar;
23use std::collections::BTreeSet;
24
25pub async fn plan(
27 settings: &Settings,
28 client: &Client,
29 executor: &DeploymentExecutor<'_>,
30) -> Result<ApplyResult, CliError> {
31 let profile = settings.connection();
32 let directory = &settings.directory;
33
34 let definitions = roles::load_roles(directory, &profile.name, settings.variables())?;
35
36 if definitions.is_empty() {
37 return Ok(ApplyResult {
38 phase: "roles".to_string(),
39 results: vec![],
40 });
41 }
42
43 let mut actions = Vec::new();
45 for def in &definitions {
46 executor.take_statements();
47 let action = create_role(client, executor, def).await?;
48 actions.push((action, executor.take_statements()));
49 }
50
51 let mut object_results = Vec::new();
53 for (def, (action, create_stmts)) in definitions.iter().zip_eq(actions) {
54 executor.take_statements();
55 configure_role(client, executor, def).await?;
56 let mut statements = create_stmts;
57 statements.extend(executor.take_statements());
58 object_results.push(ObjectResult {
59 object: def.name.clone(),
60 action,
61 statements,
62 redacted_statements: vec![],
63 transaction_group: None,
64 post_statements: vec![],
65 });
66 }
67
68 Ok(ApplyResult {
69 phase: "roles".to_string(),
70 results: object_results,
71 })
72}
73
74pub async fn run(settings: &Settings, dry_run: bool) -> Result<ApplyPlan, CliError> {
76 let client = connect_apply_client(settings).await?;
77 let executor = DeploymentExecutor::new_dry_run(&client);
78 let mut plan_result = ApplyPlan::new();
79 let phase = plan(settings, &client, &executor).await?;
80 plan_result.add_phase(phase);
81
82 if !dry_run {
83 plan_result.execute(&client).await?;
84 }
85
86 Ok(plan_result)
87}
88
89async fn create_role(
91 client: &Client,
92 executor: &DeploymentExecutor<'_>,
93 def: &RoleDefinition,
94) -> Result<ObjectAction, CliError> {
95 let exists = client
96 .introspection()
97 .role_exists(&def.name)
98 .await
99 .map_err(CliError::Connection)?;
100
101 if exists {
102 Ok(ObjectAction::UpToDate)
103 } else {
104 executor.execute_sql(&def.create_stmt).await?;
105 Ok(ObjectAction::Created)
106 }
107}
108
109async fn configure_role(
111 client: &Client,
112 executor: &DeploymentExecutor<'_>,
113 def: &RoleDefinition,
114) -> Result<(), CliError> {
115 let role_name = &def.name;
116
117 for alter in &def.alter_stmts {
119 executor.execute_sql(alter).await?;
120 }
121
122 for grant in &def.grants {
124 executor.execute_sql(grant).await?;
125 }
126
127 for comment in &def.comments {
129 executor.execute_sql(comment).await?;
130 }
131
132 let current_members = client
134 .introspection()
135 .get_role_members(role_name)
136 .await
137 .map_err(CliError::Connection)?;
138
139 let desired_members: BTreeSet<String> = def
140 .grants
141 .iter()
142 .flat_map(|g| g.member_names.iter().map(|m| m.as_str().to_lowercase()))
143 .collect();
144
145 for member in ¤t_members {
146 if !desired_members.contains(&member.to_lowercase()) {
147 let sql = format!(
148 "REVOKE {} FROM {}",
149 quote_identifier(role_name),
150 quote_identifier(member)
151 );
152 executor.execute_sql(&sql).await?;
153 }
154 }
155
156 let current_params = client
158 .introspection()
159 .get_role_parameters(role_name)
160 .await
161 .map_err(CliError::Connection)?;
162
163 let desired_params: BTreeSet<String> = def
164 .alter_stmts
165 .iter()
166 .filter_map(|alter| match &alter.option {
167 AlterRoleOption::Variable(SetRoleVar::Set { name, .. }) => {
168 Some(name.as_str().to_lowercase())
169 }
170 _ => None,
171 })
172 .collect();
173
174 for param in ¤t_params {
175 if !desired_params.contains(¶m.to_lowercase()) {
176 let sql = format!(
177 "ALTER ROLE {} RESET {}",
178 quote_identifier(role_name),
179 quote_identifier(param)
180 );
181 executor.execute_sql(&sql).await?;
182 }
183 }
184
185 Ok(())
186}