1use std::collections::BTreeSet;
11use std::sync::Arc;
12
13use mz_catalog::memory::objects::{CatalogItem, Secret};
14use mz_expr::MirScalarExpr;
15use mz_ore::collections::CollectionExt;
16use mz_ore::instrument;
17use mz_repr::{CatalogItemId, Datum, RowArena};
18use mz_sql::ast::display::AstDisplay;
19use mz_sql::ast::{ConnectionOption, ConnectionOptionName, Statement, Value, WithOptionValue};
20use mz_sql::catalog::{CatalogError, ObjectType};
21use mz_sql::plan::{self, CreateSecretPlan};
22use mz_sql::session::metadata::SessionMetadata;
23use mz_ssh_util::keys::SshKeyPairSet;
24use tracing::{Instrument, Span, warn};
25
26use crate::coord::sequencer::inner::return_if_err;
27use crate::coord::{
28 AlterSecret, Coordinator, CreateSecretEnsure, CreateSecretFinish, Message, PlanValidity,
29 RotateKeysSecretEnsure, RotateKeysSecretFinish, SecretStage, StageResult, Staged,
30};
31use crate::optimize::dataflows::{EvalTime, ExprPrepStyle, prep_scalar_expr};
32use crate::session::Session;
33use crate::{AdapterError, AdapterNotice, ExecuteContext, ExecuteResponse, catalog};
34
35impl Staged for SecretStage {
36 type Ctx = ExecuteContext;
37
38 fn validity(&mut self) -> &mut crate::coord::PlanValidity {
39 match self {
40 SecretStage::CreateFinish(stage) => &mut stage.validity,
41 SecretStage::RotateKeysFinish(stage) => &mut stage.validity,
42 SecretStage::RotateKeysEnsure(stage) => &mut stage.validity,
43 SecretStage::CreateEnsure(stage) => &mut stage.validity,
44 SecretStage::Alter(stage) => &mut stage.validity,
45 }
46 }
47
48 async fn stage(
49 self,
50 coord: &mut Coordinator,
51 ctx: &mut ExecuteContext,
52 ) -> Result<crate::coord::StageResult<Box<Self>>, AdapterError> {
53 match self {
54 SecretStage::CreateEnsure(stage) => {
55 coord.create_secret_ensure(ctx.session(), stage).await
56 }
57 SecretStage::CreateFinish(stage) => {
58 coord.create_secret_finish(ctx.session(), stage).await
59 }
60 SecretStage::RotateKeysEnsure(stage) => coord.rotate_keys_ensure(stage),
61 SecretStage::RotateKeysFinish(stage) => {
62 coord.rotate_keys_finish(ctx.session(), stage).await
63 }
64 SecretStage::Alter(stage) => coord.alter_secret(ctx.session(), stage.plan),
65 }
66 }
67
68 fn message(self, ctx: ExecuteContext, span: tracing::Span) -> Message {
69 Message::SecretStageReady {
70 ctx,
71 span,
72 stage: self,
73 }
74 }
75
76 fn cancel_enabled(&self) -> bool {
77 false
80 }
81}
82
83impl Coordinator {
84 #[instrument]
85 pub(crate) async fn sequence_create_secret(
86 &mut self,
87 ctx: ExecuteContext,
88 plan: plan::CreateSecretPlan,
89 ) {
90 let stage = return_if_err!(self.create_secret_validate(ctx.session(), plan).await, ctx);
91 self.sequence_staged(ctx, Span::current(), stage).await;
92 }
93
94 #[instrument]
95 async fn create_secret_validate(
96 &mut self,
97 session: &Session,
98 plan: plan::CreateSecretPlan,
99 ) -> Result<SecretStage, AdapterError> {
100 let validity = PlanValidity::new(
102 self.catalog().transient_revision(),
103 BTreeSet::new(),
104 None,
105 None,
106 session.role_metadata().clone(),
107 );
108 Ok(SecretStage::CreateEnsure(CreateSecretEnsure {
109 validity,
110 plan,
111 }))
112 }
113
114 #[instrument]
115 async fn create_secret_ensure(
116 &mut self,
117 session: &Session,
118 CreateSecretEnsure { validity, mut plan }: CreateSecretEnsure,
119 ) -> Result<StageResult<Box<SecretStage>>, AdapterError> {
120 let id_ts = self.get_catalog_write_ts().await;
121 let (item_id, global_id) = self.catalog_mut().allocate_user_id(id_ts).await?;
122
123 let secrets_controller = Arc::clone(&self.secrets_controller);
124 let payload = self.extract_secret(session, &mut plan.secret.secret_as)?;
125 let span = Span::current();
126 Ok(StageResult::Handle(mz_ore::task::spawn(
127 || "create secret ensure",
128 async move {
129 secrets_controller.ensure(item_id, &payload).await?;
130 let stage = SecretStage::CreateFinish(CreateSecretFinish {
131 validity,
132 item_id,
133 global_id,
134 plan,
135 });
136 Ok(Box::new(stage))
137 }
138 .instrument(span),
139 )))
140 }
141
142 fn extract_secret(
143 &self,
144 session: &Session,
145 secret_as: &mut MirScalarExpr,
146 ) -> Result<Vec<u8>, AdapterError> {
147 let temp_storage = RowArena::new();
148 prep_scalar_expr(
149 secret_as,
150 ExprPrepStyle::OneShot {
151 logical_time: EvalTime::NotAvailable,
152 session,
153 catalog_state: self.catalog().state(),
154 },
155 )?;
156 let evaled = secret_as.eval(&[], &temp_storage)?;
157
158 if evaled == Datum::Null {
159 coord_bail!("secret value can not be null");
160 }
161
162 let payload = evaled.unwrap_bytes();
163
164 if payload.len() > 1024 * 512 {
169 coord_bail!("secrets can not be bigger than 512KiB")
170 }
171
172 if std::str::from_utf8(payload).is_err() {
182 coord_bail!("secret value must be valid UTF-8");
187 }
188
189 Ok(Vec::from(payload))
190 }
191
192 #[instrument]
193 async fn create_secret_finish(
194 &mut self,
195 session: &Session,
196 CreateSecretFinish {
197 item_id,
198 global_id,
199 plan,
200 validity: _,
201 }: CreateSecretFinish,
202 ) -> Result<StageResult<Box<SecretStage>>, AdapterError> {
203 let CreateSecretPlan {
204 name,
205 secret,
206 if_not_exists,
207 } = plan;
208 let secret = Secret {
209 create_sql: secret.create_sql,
210 global_id,
211 };
212
213 let ops = vec![catalog::Op::CreateItem {
214 id: item_id,
215 name: name.clone(),
216 item: CatalogItem::Secret(secret),
217 owner_id: *session.current_role_id(),
218 }];
219
220 let res = match self.catalog_transact(Some(session), ops).await {
221 Ok(()) => Ok(ExecuteResponse::CreatedSecret),
222 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
223 kind:
224 mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
225 })) if if_not_exists => {
226 session.add_notice(AdapterNotice::ObjectAlreadyExists {
227 name: name.item,
228 ty: "secret",
229 });
230 Ok(ExecuteResponse::CreatedSecret)
231 }
232 Err(err) => {
233 if let Err(e) = self.secrets_controller.delete(item_id).await {
234 warn!(
235 "Dropping newly created secrets has encountered an error: {}",
236 e
237 );
238 }
239 Err(err)
240 }
241 };
242 res.map(StageResult::Response)
243 }
244
245 #[instrument]
246 pub(crate) async fn sequence_alter_secret(
247 &mut self,
248 ctx: ExecuteContext,
249 plan: plan::AlterSecretPlan,
250 ) {
251 let validity = PlanValidity::new(
256 self.catalog().transient_revision(),
257 BTreeSet::new(),
258 None,
259 None,
260 ctx.session().role_metadata().clone(),
261 );
262 let stage = SecretStage::Alter(AlterSecret { validity, plan });
263 self.sequence_staged(ctx, Span::current(), stage).await;
264 }
265
266 #[instrument]
267 fn alter_secret(
268 &mut self,
269 session: &Session,
270 plan: plan::AlterSecretPlan,
271 ) -> Result<StageResult<Box<SecretStage>>, AdapterError> {
272 let plan::AlterSecretPlan { id, mut secret_as } = plan;
273 let secrets_controller = Arc::clone(&self.secrets_controller);
274 let payload = self.extract_secret(session, &mut secret_as)?;
275 let span = Span::current();
276 Ok(StageResult::HandleRetire(mz_ore::task::spawn(
277 || "alter secret ensure",
278 async move {
279 secrets_controller.ensure(id, &payload).await?;
280 Ok(ExecuteResponse::AlteredObject(ObjectType::Secret))
281 }
282 .instrument(span),
283 )))
284 }
285
286 #[instrument]
287 pub(crate) async fn sequence_rotate_keys(&mut self, ctx: ExecuteContext, id: CatalogItemId) {
288 let validity = PlanValidity::new(
295 self.catalog().transient_revision(),
296 BTreeSet::from_iter(std::iter::once(id)),
297 None,
298 None,
299 ctx.session().role_metadata().clone(),
300 );
301 let stage = SecretStage::RotateKeysEnsure(RotateKeysSecretEnsure { validity, id });
302 self.sequence_staged(ctx, Span::current(), stage).await;
303 }
304
305 #[instrument]
306 fn rotate_keys_ensure(
307 &mut self,
308 RotateKeysSecretEnsure { validity, id }: RotateKeysSecretEnsure,
309 ) -> Result<StageResult<Box<SecretStage>>, AdapterError> {
310 let secrets_controller = Arc::clone(&self.secrets_controller);
311 let entry = self.catalog().get_entry(&id).clone();
312 let span = Span::current();
313 Ok(StageResult::Handle(mz_ore::task::spawn(
314 || "rotate keys ensure",
315 async move {
316 let secret = secrets_controller.reader().read(id).await?;
317 let previous_key_set = SshKeyPairSet::from_bytes(&secret)?;
318 let new_key_set = previous_key_set.rotate()?;
319 secrets_controller
320 .ensure(id, &new_key_set.to_bytes())
321 .await?;
322
323 let mut to_item = entry.item;
324 match &mut to_item {
325 CatalogItem::Connection(c) => {
326 let mut stmt = match mz_sql::parse::parse(&c.create_sql)
327 .expect("invalid create sql persisted to catalog")
328 .into_element()
329 .ast
330 {
331 Statement::CreateConnection(stmt) => stmt,
332 _ => coord_bail!("internal error: persisted SQL for {id} is invalid"),
333 };
334
335 stmt.values.retain(|v| {
336 v.name != ConnectionOptionName::PublicKey1
337 && v.name != ConnectionOptionName::PublicKey2
338 });
339 stmt.values.push(ConnectionOption {
340 name: ConnectionOptionName::PublicKey1,
341 value: Some(WithOptionValue::Value(Value::String(
342 new_key_set.primary().ssh_public_key(),
343 ))),
344 });
345 stmt.values.push(ConnectionOption {
346 name: ConnectionOptionName::PublicKey2,
347 value: Some(WithOptionValue::Value(Value::String(
348 new_key_set.secondary().ssh_public_key(),
349 ))),
350 });
351
352 c.create_sql = stmt.to_ast_string_stable();
353 }
354 _ => coord_bail!(
355 "internal error: rotate keys called on non-connection object {id}"
356 ),
357 }
358
359 let ops = vec![catalog::Op::UpdateItem {
360 id,
361 name: entry.name,
362 to_item,
363 }];
364 let stage = SecretStage::RotateKeysFinish(RotateKeysSecretFinish { validity, ops });
365 Ok(Box::new(stage))
366 }
367 .instrument(span),
368 )))
369 }
370
371 #[instrument]
372 async fn rotate_keys_finish(
373 &mut self,
374 session: &Session,
375 RotateKeysSecretFinish { ops, validity: _ }: RotateKeysSecretFinish,
376 ) -> Result<StageResult<Box<SecretStage>>, AdapterError> {
377 self.catalog_transact(Some(session), ops).await?;
378 Ok(StageResult::Response(ExecuteResponse::AlteredObject(
379 ObjectType::Connection,
380 )))
381 }
382}