mz_deploy/cli/commands/
apply_secrets.rs1use crate::cli::CliError;
13use crate::cli::commands::apply_objects;
14use crate::cli::commands::grants;
15use crate::cli::executor::ObjectAction;
16use crate::cli::executor::{
17 ApplyPlan, ApplyResult, DeploymentExecutor, ObjectResult, compile_apply_project_and_connect,
18};
19use crate::client::Client;
20use crate::config::Settings;
21use crate::project;
22use crate::project::ast::Statement;
23use crate::project::ir::compiled;
24use crate::project::ir::object_id::ObjectId;
25use crate::secret_resolver::SecretResolver;
26use mz_sql_parser::ast::{AlterSecretStatement, Raw};
27use std::collections::BTreeSet;
28
29const PHASE_NAME: &str = "secrets";
30const GRANT_KIND: grants::GrantObjectKind = grants::GrantObjectKind::Secret;
31
32fn matches(stmt: &Statement) -> bool {
33 matches!(stmt, Statement::CreateSecret(_))
34}
35
36struct Secrets {
37 resolver: SecretResolver,
38}
39
40impl Secrets {
41 fn new(settings: &Settings) -> Result<Self, CliError> {
42 Ok(Secrets {
43 resolver: SecretResolver::new(&settings.profile_config.security),
44 })
45 }
46
47 async fn handle_existing(
48 &self,
49 client: &Client,
50 executor: &DeploymentExecutor<'_>,
51 obj_id: &ObjectId,
52 typed_obj: &compiled::DatabaseObject,
53 ) -> Result<(ObjectAction, Vec<String>), CliError> {
54 let Statement::CreateSecret(ref create_stmt) = typed_obj.stmt else {
55 unreachable!("filtered for CreateSecret");
56 };
57 let resolved_stmt = self.resolver.resolve_secret_for_cli(create_stmt).await?;
58 let alter_stmt = AlterSecretStatement::<Raw> {
59 name: create_stmt.name.clone(),
60 if_exists: false,
61 value: resolved_stmt.value.clone(),
62 };
63 let redacted_statements = vec![alter_stmt.to_string()];
64
65 apply_objects::reconcile_grants_and_comments(
66 client,
67 executor,
68 obj_id,
69 typed_obj,
70 &GRANT_KIND,
71 )
72 .await?;
73
74 Ok((ObjectAction::Altered, redacted_statements))
75 }
76
77 async fn handle_new(
78 &self,
79 client: &Client,
80 executor: &DeploymentExecutor<'_>,
81 obj_id: &ObjectId,
82 typed_obj: &compiled::DatabaseObject,
83 ) -> Result<(ObjectAction, Vec<String>), CliError> {
84 let Statement::CreateSecret(ref create_stmt) = typed_obj.stmt else {
85 unreachable!("filtered for CreateSecret");
86 };
87 let resolved_stmt = self.resolver.resolve_secret_for_cli(create_stmt).await?;
88 let redacted_statements = vec![resolved_stmt.to_string()];
89
90 apply_objects::reconcile_grants_and_comments(
91 client,
92 executor,
93 obj_id,
94 typed_obj,
95 &GRANT_KIND,
96 )
97 .await?;
98
99 Ok((ObjectAction::Created, redacted_statements))
100 }
101}
102
103pub async fn plan(
105 settings: &Settings,
106 client: &Client,
107 executor: &DeploymentExecutor<'_>,
108 planned_project: &project::ir::graph::Project,
109 apply_plan: &mut ApplyPlan,
110) -> Result<ApplyResult, CliError> {
111 let secrets = Secrets::new(settings)?;
112 let mut target_ids = BTreeSet::new();
113 for obj in planned_project.iter_objects() {
114 if matches(&obj.typed_object.stmt) {
115 target_ids.insert(obj.id.clone());
116 }
117 }
118
119 if target_ids.is_empty() {
120 return Ok(ApplyResult {
121 phase: PHASE_NAME.to_string(),
122 results: vec![],
123 });
124 }
125
126 let target_objects = planned_project.get_sorted_objects_filtered(&target_ids)?;
127 let existing = client
128 .introspection()
129 .check_catalog_objects_exist(&target_ids, GRANT_KIND.catalog_table())
130 .await
131 .map_err(CliError::Connection)?;
132
133 let schemas: BTreeSet<_> = target_objects
134 .iter()
135 .filter(|(obj_id, _)| !existing.contains(obj_id))
136 .map(|(obj_id, _)| {
137 project::SchemaQualifier::new(
138 obj_id.expect_database().to_string(),
139 obj_id.schema().to_string(),
140 )
141 })
142 .collect();
143 apply_plan
144 .prepare_schemas(executor, planned_project, &schemas)
145 .await?;
146
147 let mut results = Vec::new();
148
149 for (obj_id, typed_obj) in target_objects {
150 executor.take_statements();
151 let (action, redacted_statements) = if existing.contains(&obj_id) {
152 secrets
153 .handle_existing(client, executor, &obj_id, typed_obj)
154 .await?
155 } else {
156 secrets
157 .handle_new(client, executor, &obj_id, typed_obj)
158 .await?
159 };
160 results.push(ObjectResult {
161 object: obj_id.to_string(),
162 action,
163 statements: executor.take_statements(),
164 redacted_statements,
165 transaction_group: None,
166 });
167 }
168
169 Ok(ApplyResult {
170 phase: PHASE_NAME.to_string(),
171 results,
172 })
173}
174
175pub async fn run(settings: &Settings, dry_run: bool) -> Result<ApplyPlan, CliError> {
177 let (planned_project, client) = compile_apply_project_and_connect(settings).await?;
178 let mut apply_plan = ApplyPlan::new();
179 let executor = DeploymentExecutor::new_dry_run(&client);
180 let phase = plan(
181 settings,
182 &client,
183 &executor,
184 &planned_project,
185 &mut apply_plan,
186 )
187 .await?;
188 apply_plan.add_phase(phase);
189
190 if !dry_run {
191 apply_plan.execute(&client).await?;
192 }
193
194 Ok(apply_plan)
195}