1use std::collections::BTreeSet;
11use std::sync::Arc;
12
13use mz_catalog::memory::error::ErrorKind;
14use mz_catalog::memory::objects::{CatalogItem, Secret};
15use mz_expr::MirScalarExpr;
16use mz_ore::collections::CollectionExt;
17use mz_ore::instrument;
18use mz_repr::{CatalogItemId, Datum, RowArena};
19use mz_sql::ast::display::AstDisplay;
20use mz_sql::ast::{ConnectionOption, ConnectionOptionName, Statement, Value, WithOptionValue};
21use mz_sql::catalog::{CatalogError, ObjectType};
22use mz_sql::plan::{self, CreateSecretPlan};
23use mz_sql::session::metadata::SessionMetadata;
24use mz_ssh_util::keys::SshKeyPairSet;
25use tracing::{Instrument, Span, warn};
26
27use crate::coord::sequencer::inner::return_if_err;
28use crate::coord::{
29 AlterSecret, Coordinator, CreateSecretEnsure, CreateSecretFinish, Message, PlanValidity,
30 RotateKeysSecretEnsure, RotateKeysSecretFinish, SecretStage, StageResult, Staged,
31};
32use crate::optimize::dataflows::{EvalTime, ExprPrep, ExprPrepOneShot};
33use crate::session::Session;
34use crate::{AdapterError, AdapterNotice, ExecuteContext, ExecuteResponse, catalog};
35
36impl Staged for SecretStage {
37 type Ctx = ExecuteContext;
38
39 fn validity(&mut self) -> &mut crate::coord::PlanValidity {
40 match self {
41 SecretStage::CreateFinish(stage) => &mut stage.validity,
42 SecretStage::RotateKeysFinish(stage) => &mut stage.validity,
43 SecretStage::RotateKeysEnsure(stage) => &mut stage.validity,
44 SecretStage::CreateEnsure(stage) => &mut stage.validity,
45 SecretStage::Alter(stage) => &mut stage.validity,
46 }
47 }
48
49 async fn stage(
50 self,
51 coord: &mut Coordinator,
52 ctx: &mut ExecuteContext,
53 ) -> Result<crate::coord::StageResult<Box<Self>>, AdapterError> {
54 match self {
55 SecretStage::CreateEnsure(stage) => {
56 coord.create_secret_ensure(ctx.session(), stage).await
57 }
58 SecretStage::CreateFinish(stage) => {
59 coord.create_secret_finish(ctx.session(), stage).await
60 }
61 SecretStage::RotateKeysEnsure(stage) => coord.rotate_keys_ensure(stage),
62 SecretStage::RotateKeysFinish(stage) => {
63 coord.rotate_keys_finish(ctx.session(), stage).await
64 }
65 SecretStage::Alter(stage) => coord.alter_secret(ctx.session(), stage.plan),
66 }
67 }
68
69 fn message(self, ctx: ExecuteContext, span: tracing::Span) -> Message {
70 Message::SecretStageReady {
71 ctx,
72 span,
73 stage: self,
74 }
75 }
76
77 fn cancel_enabled(&self) -> bool {
78 false
81 }
82}
83
84impl Coordinator {
85 #[instrument]
86 pub(crate) async fn sequence_create_secret(
87 &mut self,
88 ctx: ExecuteContext,
89 plan: plan::CreateSecretPlan,
90 ) {
91 let stage = return_if_err!(self.create_secret_validate(ctx.session(), plan).await, ctx);
92 self.sequence_staged(ctx, Span::current(), stage).await;
93 }
94
95 #[instrument]
96 async fn create_secret_validate(
97 &self,
98 session: &Session,
99 plan: plan::CreateSecretPlan,
100 ) -> Result<SecretStage, AdapterError> {
101 let validity = PlanValidity::new(
103 self.catalog().transient_revision(),
104 BTreeSet::new(),
105 None,
106 None,
107 session.role_metadata().clone(),
108 );
109 Ok(SecretStage::CreateEnsure(CreateSecretEnsure {
110 validity,
111 plan,
112 }))
113 }
114
115 #[instrument]
116 async fn create_secret_ensure(
117 &mut self,
118 session: &Session,
119 CreateSecretEnsure { validity, mut plan }: CreateSecretEnsure,
120 ) -> Result<StageResult<Box<SecretStage>>, AdapterError> {
121 let (item_id, global_id) = self.allocate_user_id().await?;
122
123 let secrets_controller = Arc::clone(&self.secrets_controller);
124 let payload = self.extract_secret(session, &mut plan.secret.secret_as)?;
125 let span = Span::current();
126 Ok(StageResult::Handle(mz_ore::task::spawn(
127 || "create secret ensure",
128 async move {
129 secrets_controller.ensure(item_id, &payload).await?;
130 let stage = SecretStage::CreateFinish(CreateSecretFinish {
131 validity,
132 item_id,
133 global_id,
134 plan,
135 });
136 Ok(Box::new(stage))
137 }
138 .instrument(span),
139 )))
140 }
141
142 fn extract_secret(
143 &self,
144 session: &Session,
145 secret_as: &mut MirScalarExpr,
146 ) -> Result<Vec<u8>, AdapterError> {
147 let temp_storage = RowArena::new();
148 let style = ExprPrepOneShot {
149 logical_time: EvalTime::NotAvailable,
150 session,
151 catalog_state: self.catalog().state(),
152 };
153 style.prep_scalar_expr(secret_as)?;
154 let evaled = secret_as.eval(&[], &temp_storage)?;
155
156 if evaled == Datum::Null {
157 coord_bail!("secret value can not be null");
158 }
159
160 let payload = evaled.unwrap_bytes();
161
162 if payload.len() > 1024 * 512 {
167 coord_bail!("secrets can not be bigger than 512KiB")
168 }
169
170 if std::str::from_utf8(payload).is_err() {
180 coord_bail!("secret value must be valid UTF-8");
185 }
186
187 Ok(Vec::from(payload))
188 }
189
190 #[instrument]
191 async fn create_secret_finish(
192 &mut self,
193 session: &Session,
194 CreateSecretFinish {
195 item_id,
196 global_id,
197 plan,
198 validity: _,
199 }: CreateSecretFinish,
200 ) -> Result<StageResult<Box<SecretStage>>, AdapterError> {
201 let CreateSecretPlan {
202 name,
203 secret,
204 if_not_exists,
205 } = plan;
206 let secret = Secret {
207 create_sql: secret.create_sql,
208 global_id,
209 };
210
211 let ops = vec![catalog::Op::CreateItem {
212 id: item_id,
213 name: name.clone(),
214 item: CatalogItem::Secret(secret),
215 owner_id: *session.current_role_id(),
216 }];
217
218 let res = match self.catalog_transact(Some(session), ops).await {
219 Ok(()) => Ok(ExecuteResponse::CreatedSecret),
220 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
221 kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
222 })) if if_not_exists => {
223 session.add_notice(AdapterNotice::ObjectAlreadyExists {
224 name: name.item,
225 ty: "secret",
226 });
227 Ok(ExecuteResponse::CreatedSecret)
228 }
229 Err(err) => {
230 if let Err(e) = self.secrets_controller.delete(item_id).await {
231 warn!(
232 "Dropping newly created secrets has encountered an error: {}",
233 e
234 );
235 } else {
236 self.caching_secrets_reader.invalidate(item_id);
237 }
238 Err(err)
239 }
240 };
241 res.map(StageResult::Response)
242 }
243
244 #[instrument]
245 pub(crate) async fn sequence_alter_secret(
246 &mut self,
247 ctx: ExecuteContext,
248 plan: plan::AlterSecretPlan,
249 ) {
250 let validity = PlanValidity::new(
255 self.catalog().transient_revision(),
256 BTreeSet::new(),
257 None,
258 None,
259 ctx.session().role_metadata().clone(),
260 );
261 let stage = SecretStage::Alter(AlterSecret { validity, plan });
262 self.sequence_staged(ctx, Span::current(), stage).await;
263 }
264
265 #[instrument]
266 fn alter_secret(
267 &self,
268 session: &Session,
269 plan: plan::AlterSecretPlan,
270 ) -> Result<StageResult<Box<SecretStage>>, AdapterError> {
271 let plan::AlterSecretPlan { id, mut secret_as } = plan;
272 let caching_secrets_reader = self.caching_secrets_reader.clone();
273 let secrets_controller = Arc::clone(&self.secrets_controller);
274 let payload = self.extract_secret(session, &mut secret_as)?;
275 let span = Span::current();
276 Ok(StageResult::HandleRetire(mz_ore::task::spawn(
277 || "alter secret ensure",
278 async move {
279 secrets_controller.ensure(id, &payload).await?;
280 caching_secrets_reader.invalidate(id);
281 Ok(ExecuteResponse::AlteredObject(ObjectType::Secret))
282 }
283 .instrument(span),
284 )))
285 }
286
287 #[instrument]
288 pub(crate) async fn sequence_rotate_keys(&mut self, ctx: ExecuteContext, id: CatalogItemId) {
289 let validity = PlanValidity::new(
296 self.catalog().transient_revision(),
297 BTreeSet::from_iter(std::iter::once(id)),
298 None,
299 None,
300 ctx.session().role_metadata().clone(),
301 );
302 let stage = SecretStage::RotateKeysEnsure(RotateKeysSecretEnsure { validity, id });
303 self.sequence_staged(ctx, Span::current(), stage).await;
304 }
305
306 #[instrument]
307 fn rotate_keys_ensure(
308 &self,
309 RotateKeysSecretEnsure { validity, id }: RotateKeysSecretEnsure,
310 ) -> Result<StageResult<Box<SecretStage>>, AdapterError> {
311 let caching_secrets_reader = self.caching_secrets_reader.clone();
312 let secrets_controller = Arc::clone(&self.secrets_controller);
313 let entry = self.catalog().get_entry(&id).clone();
314 let span = Span::current();
315 Ok(StageResult::Handle(mz_ore::task::spawn(
316 || "rotate keys ensure",
317 async move {
318 let secret = secrets_controller.reader().read(id).await?;
319 let previous_key_set = SshKeyPairSet::from_bytes(&secret)?;
320 let new_key_set = previous_key_set.rotate()?;
321 secrets_controller
322 .ensure(id, &new_key_set.to_bytes())
323 .await?;
324 caching_secrets_reader.invalidate(id);
325
326 let mut to_item = entry.item;
327 match &mut to_item {
328 CatalogItem::Connection(c) => {
329 let mut stmt = match mz_sql::parse::parse(&c.create_sql)
330 .expect("invalid create sql persisted to catalog")
331 .into_element()
332 .ast
333 {
334 Statement::CreateConnection(stmt) => stmt,
335 _ => coord_bail!("internal error: persisted SQL for {id} is invalid"),
336 };
337
338 stmt.values.retain(|v| {
339 v.name != ConnectionOptionName::PublicKey1
340 && v.name != ConnectionOptionName::PublicKey2
341 });
342 stmt.values.push(ConnectionOption {
343 name: ConnectionOptionName::PublicKey1,
344 value: Some(WithOptionValue::Value(Value::String(
345 new_key_set.primary().ssh_public_key(),
346 ))),
347 });
348 stmt.values.push(ConnectionOption {
349 name: ConnectionOptionName::PublicKey2,
350 value: Some(WithOptionValue::Value(Value::String(
351 new_key_set.secondary().ssh_public_key(),
352 ))),
353 });
354
355 c.create_sql = stmt.to_ast_string_stable();
356 }
357 _ => coord_bail!(
358 "internal error: rotate keys called on non-connection object {id}"
359 ),
360 }
361
362 let ops = vec![catalog::Op::UpdateItem {
363 id,
364 name: entry.name,
365 to_item,
366 }];
367 let stage = SecretStage::RotateKeysFinish(RotateKeysSecretFinish { validity, ops });
368 Ok(Box::new(stage))
369 }
370 .instrument(span),
371 )))
372 }
373
374 #[instrument]
375 async fn rotate_keys_finish(
376 &mut self,
377 session: &Session,
378 RotateKeysSecretFinish { ops, validity: _ }: RotateKeysSecretFinish,
379 ) -> Result<StageResult<Box<SecretStage>>, AdapterError> {
380 self.catalog_transact(Some(session), ops).await?;
381 Ok(StageResult::Response(ExecuteResponse::AlteredObject(
382 ObjectType::Connection,
383 )))
384 }
385}