1use std::collections::BTreeSet;
11use std::sync::Arc;
12
13use mz_catalog::memory::error::ErrorKind;
14use mz_catalog::memory::objects::{CatalogItem, Secret};
15use mz_expr::MirScalarExpr;
16use mz_ore::collections::CollectionExt;
17use mz_ore::instrument;
18use mz_repr::{CatalogItemId, Datum, RowArena};
19use mz_sql::ast::display::AstDisplay;
20use mz_sql::ast::{ConnectionOption, ConnectionOptionName, Statement, Value, WithOptionValue};
21use mz_sql::catalog::{CatalogError, ObjectType};
22use mz_sql::plan::{self, CreateSecretPlan};
23use mz_sql::session::metadata::SessionMetadata;
24use mz_ssh_util::keys::SshKeyPairSet;
25use tracing::{Instrument, Span, warn};
26
27use crate::coord::sequencer::inner::return_if_err;
28use crate::coord::{
29 AlterSecret, Coordinator, CreateSecretEnsure, CreateSecretFinish, Message, PlanValidity,
30 RotateKeysSecretEnsure, RotateKeysSecretFinish, SecretStage, StageResult, Staged,
31};
32use crate::optimize::dataflows::{EvalTime, ExprPrep, ExprPrepOneShot};
33use crate::session::Session;
34use crate::{AdapterError, AdapterNotice, ExecuteContext, ExecuteResponse, catalog};
35
36impl Staged for SecretStage {
37 type Ctx = ExecuteContext;
38
39 fn validity(&mut self) -> &mut crate::coord::PlanValidity {
40 match self {
41 SecretStage::CreateFinish(stage) => &mut stage.validity,
42 SecretStage::RotateKeysFinish(stage) => &mut stage.validity,
43 SecretStage::RotateKeysEnsure(stage) => &mut stage.validity,
44 SecretStage::CreateEnsure(stage) => &mut stage.validity,
45 SecretStage::Alter(stage) => &mut stage.validity,
46 }
47 }
48
49 async fn stage(
50 self,
51 coord: &mut Coordinator,
52 ctx: &mut ExecuteContext,
53 ) -> Result<crate::coord::StageResult<Box<Self>>, AdapterError> {
54 match self {
55 SecretStage::CreateEnsure(stage) => {
56 coord.create_secret_ensure(ctx.session(), stage).await
57 }
58 SecretStage::CreateFinish(stage) => {
59 coord.create_secret_finish(ctx.session(), stage).await
60 }
61 SecretStage::RotateKeysEnsure(stage) => coord.rotate_keys_ensure(stage),
62 SecretStage::RotateKeysFinish(stage) => {
63 coord.rotate_keys_finish(ctx.session(), stage).await
64 }
65 SecretStage::Alter(stage) => coord.alter_secret(ctx.session(), stage.plan),
66 }
67 }
68
69 fn message(self, ctx: ExecuteContext, span: tracing::Span) -> Message {
70 Message::SecretStageReady {
71 ctx,
72 span,
73 stage: self,
74 }
75 }
76
77 fn cancel_enabled(&self) -> bool {
78 false
81 }
82}
83
84impl Coordinator {
85 #[instrument]
86 pub(crate) async fn sequence_create_secret(
87 &mut self,
88 ctx: ExecuteContext,
89 plan: plan::CreateSecretPlan,
90 ) {
91 let stage = return_if_err!(self.create_secret_validate(ctx.session(), plan).await, ctx);
92 self.sequence_staged(ctx, Span::current(), stage).await;
93 }
94
95 #[instrument]
96 async fn create_secret_validate(
97 &self,
98 session: &Session,
99 plan: plan::CreateSecretPlan,
100 ) -> Result<SecretStage, AdapterError> {
101 let validity = PlanValidity::new(
103 self.catalog().transient_revision(),
104 BTreeSet::new(),
105 None,
106 None,
107 session.role_metadata().clone(),
108 );
109 Ok(SecretStage::CreateEnsure(CreateSecretEnsure {
110 validity,
111 plan,
112 }))
113 }
114
115 #[instrument]
116 async fn create_secret_ensure(
117 &mut self,
118 session: &Session,
119 CreateSecretEnsure { validity, mut plan }: CreateSecretEnsure,
120 ) -> Result<StageResult<Box<SecretStage>>, AdapterError> {
121 let (item_id, global_id) = self.allocate_user_id().await?;
122
123 let secrets_controller = Arc::clone(&self.secrets_controller);
124 let payload = self.extract_secret(session, &mut plan.secret.secret_as)?;
125 let span = Span::current();
126 Ok(StageResult::Handle(mz_ore::task::spawn(
127 || "create secret ensure",
128 async move {
129 secrets_controller.ensure(item_id, &payload).await?;
130 let stage = SecretStage::CreateFinish(CreateSecretFinish {
131 validity,
132 item_id,
133 global_id,
134 plan,
135 });
136 Ok(Box::new(stage))
137 }
138 .instrument(span),
139 )))
140 }
141
142 fn extract_secret(
143 &self,
144 session: &Session,
145 secret_as: &mut MirScalarExpr,
146 ) -> Result<Vec<u8>, AdapterError> {
147 let temp_storage = RowArena::new();
148 let style = ExprPrepOneShot {
149 logical_time: EvalTime::NotAvailable,
150 session,
151 catalog_state: self.catalog().state(),
152 };
153 style.prep_scalar_expr(secret_as)?;
154 let evaled = secret_as.eval(&[], &temp_storage)?;
155
156 if evaled == Datum::Null {
157 coord_bail!("secret value can not be null");
158 }
159
160 let payload = evaled.unwrap_bytes();
161
162 if payload.len() > 1024 * 512 {
167 coord_bail!("secrets can not be bigger than 512KiB")
168 }
169
170 if std::str::from_utf8(payload).is_err() {
180 coord_bail!("secret value must be valid UTF-8");
185 }
186
187 Ok(Vec::from(payload))
188 }
189
190 #[instrument]
191 async fn create_secret_finish(
192 &mut self,
193 session: &Session,
194 CreateSecretFinish {
195 item_id,
196 global_id,
197 plan,
198 validity: _,
199 }: CreateSecretFinish,
200 ) -> Result<StageResult<Box<SecretStage>>, AdapterError> {
201 let CreateSecretPlan {
202 name,
203 secret,
204 if_not_exists,
205 } = plan;
206 let secret = Secret {
207 create_sql: secret.create_sql,
208 global_id,
209 };
210
211 let ops = vec![catalog::Op::CreateItem {
212 id: item_id,
213 name: name.clone(),
214 item: CatalogItem::Secret(secret),
215 owner_id: *session.current_role_id(),
216 }];
217
218 let res = match self.catalog_transact(Some(session), ops).await {
219 Ok(()) => Ok(ExecuteResponse::CreatedSecret),
220 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
221 kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
222 })) if if_not_exists => {
223 if let Err(e) = self.secrets_controller.delete(item_id).await {
226 warn!(
227 "Dropping newly created secrets has encountered an error: {}",
228 e
229 );
230 } else {
231 self.caching_secrets_reader.invalidate(item_id);
232 }
233 session.add_notice(AdapterNotice::ObjectAlreadyExists {
234 name: name.item,
235 ty: "secret",
236 });
237 Ok(ExecuteResponse::CreatedSecret)
238 }
239 Err(err) => {
240 if let Err(e) = self.secrets_controller.delete(item_id).await {
241 warn!(
242 "Dropping newly created secrets has encountered an error: {}",
243 e
244 );
245 } else {
246 self.caching_secrets_reader.invalidate(item_id);
247 }
248 Err(err)
249 }
250 };
251 res.map(StageResult::Response)
252 }
253
254 #[instrument]
255 pub(crate) async fn sequence_alter_secret(
256 &mut self,
257 ctx: ExecuteContext,
258 plan: plan::AlterSecretPlan,
259 ) {
260 let validity = PlanValidity::new(
265 self.catalog().transient_revision(),
266 BTreeSet::new(),
267 None,
268 None,
269 ctx.session().role_metadata().clone(),
270 );
271 let stage = SecretStage::Alter(AlterSecret { validity, plan });
272 self.sequence_staged(ctx, Span::current(), stage).await;
273 }
274
275 #[instrument]
276 fn alter_secret(
277 &self,
278 session: &Session,
279 plan: plan::AlterSecretPlan,
280 ) -> Result<StageResult<Box<SecretStage>>, AdapterError> {
281 let plan::AlterSecretPlan { id, mut secret_as } = plan;
282 let caching_secrets_reader = self.caching_secrets_reader.clone();
283 let secrets_controller = Arc::clone(&self.secrets_controller);
284 let payload = self.extract_secret(session, &mut secret_as)?;
285 let span = Span::current();
286 Ok(StageResult::HandleRetire(mz_ore::task::spawn(
287 || "alter secret ensure",
288 async move {
289 secrets_controller.ensure(id, &payload).await?;
290 caching_secrets_reader.invalidate(id);
291 Ok(ExecuteResponse::AlteredObject(ObjectType::Secret))
292 }
293 .instrument(span),
294 )))
295 }
296
297 #[instrument]
298 pub(crate) async fn sequence_rotate_keys(&mut self, ctx: ExecuteContext, id: CatalogItemId) {
299 let validity = PlanValidity::new(
306 self.catalog().transient_revision(),
307 BTreeSet::from_iter(std::iter::once(id)),
308 None,
309 None,
310 ctx.session().role_metadata().clone(),
311 );
312 let stage = SecretStage::RotateKeysEnsure(RotateKeysSecretEnsure { validity, id });
313 self.sequence_staged(ctx, Span::current(), stage).await;
314 }
315
316 #[instrument]
317 fn rotate_keys_ensure(
318 &self,
319 RotateKeysSecretEnsure { validity, id }: RotateKeysSecretEnsure,
320 ) -> Result<StageResult<Box<SecretStage>>, AdapterError> {
321 let caching_secrets_reader = self.caching_secrets_reader.clone();
322 let secrets_controller = Arc::clone(&self.secrets_controller);
323 let entry = self.catalog().get_entry(&id).clone();
324 let span = Span::current();
325 Ok(StageResult::Handle(mz_ore::task::spawn(
326 || "rotate keys ensure",
327 async move {
328 let secret = secrets_controller.reader().read(id).await?;
329 let previous_key_set = SshKeyPairSet::from_bytes(&secret)?;
330 let new_key_set = previous_key_set.rotate()?;
331 secrets_controller
332 .ensure(id, &new_key_set.to_bytes())
333 .await?;
334 caching_secrets_reader.invalidate(id);
335
336 let mut to_item = entry.item;
337 match &mut to_item {
338 CatalogItem::Connection(c) => {
339 let mut stmt = match mz_sql::parse::parse(&c.create_sql)
340 .expect("invalid create sql persisted to catalog")
341 .into_element()
342 .ast
343 {
344 Statement::CreateConnection(stmt) => stmt,
345 _ => coord_bail!("internal error: persisted SQL for {id} is invalid"),
346 };
347
348 stmt.values.retain(|v| {
349 v.name != ConnectionOptionName::PublicKey1
350 && v.name != ConnectionOptionName::PublicKey2
351 });
352 stmt.values.push(ConnectionOption {
353 name: ConnectionOptionName::PublicKey1,
354 value: Some(WithOptionValue::Value(Value::String(
355 new_key_set.primary().ssh_public_key(),
356 ))),
357 });
358 stmt.values.push(ConnectionOption {
359 name: ConnectionOptionName::PublicKey2,
360 value: Some(WithOptionValue::Value(Value::String(
361 new_key_set.secondary().ssh_public_key(),
362 ))),
363 });
364
365 c.create_sql = stmt.to_ast_string_stable();
366 }
367 _ => coord_bail!(
368 "internal error: rotate keys called on non-connection object {id}"
369 ),
370 }
371
372 let ops = vec![catalog::Op::UpdateItem {
373 id,
374 name: entry.name,
375 to_item,
376 }];
377 let stage = SecretStage::RotateKeysFinish(RotateKeysSecretFinish { validity, ops });
378 Ok(Box::new(stage))
379 }
380 .instrument(span),
381 )))
382 }
383
384 #[instrument]
385 async fn rotate_keys_finish(
386 &mut self,
387 session: &Session,
388 RotateKeysSecretFinish { ops, validity: _ }: RotateKeysSecretFinish,
389 ) -> Result<StageResult<Box<SecretStage>>, AdapterError> {
390 self.catalog_transact(Some(session), ops).await?;
391 Ok(StageResult::Response(ExecuteResponse::AlteredObject(
392 ObjectType::Connection,
393 )))
394 }
395}