1use std::collections::BTreeSet;
11use std::sync::Arc;
12
13use mz_catalog::memory::objects::{CatalogItem, Secret};
14use mz_expr::MirScalarExpr;
15use mz_ore::collections::CollectionExt;
16use mz_ore::instrument;
17use mz_repr::{CatalogItemId, Datum, RowArena};
18use mz_sql::ast::display::AstDisplay;
19use mz_sql::ast::{ConnectionOption, ConnectionOptionName, Statement, Value, WithOptionValue};
20use mz_sql::catalog::{CatalogError, ObjectType};
21use mz_sql::plan::{self, CreateSecretPlan};
22use mz_sql::session::metadata::SessionMetadata;
23use mz_ssh_util::keys::SshKeyPairSet;
24use tracing::{Instrument, Span, warn};
25
26use crate::coord::sequencer::inner::return_if_err;
27use crate::coord::{
28 AlterSecret, Coordinator, CreateSecretEnsure, CreateSecretFinish, Message, PlanValidity,
29 RotateKeysSecretEnsure, RotateKeysSecretFinish, SecretStage, StageResult, Staged,
30};
31use crate::optimize::dataflows::{EvalTime, ExprPrep, ExprPrepOneShot};
32use crate::session::Session;
33use crate::{AdapterError, AdapterNotice, ExecuteContext, ExecuteResponse, catalog};
34
35impl Staged for SecretStage {
36 type Ctx = ExecuteContext;
37
38 fn validity(&mut self) -> &mut crate::coord::PlanValidity {
39 match self {
40 SecretStage::CreateFinish(stage) => &mut stage.validity,
41 SecretStage::RotateKeysFinish(stage) => &mut stage.validity,
42 SecretStage::RotateKeysEnsure(stage) => &mut stage.validity,
43 SecretStage::CreateEnsure(stage) => &mut stage.validity,
44 SecretStage::Alter(stage) => &mut stage.validity,
45 }
46 }
47
48 async fn stage(
49 self,
50 coord: &mut Coordinator,
51 ctx: &mut ExecuteContext,
52 ) -> Result<crate::coord::StageResult<Box<Self>>, AdapterError> {
53 match self {
54 SecretStage::CreateEnsure(stage) => {
55 coord.create_secret_ensure(ctx.session(), stage).await
56 }
57 SecretStage::CreateFinish(stage) => {
58 coord.create_secret_finish(ctx.session(), stage).await
59 }
60 SecretStage::RotateKeysEnsure(stage) => coord.rotate_keys_ensure(stage),
61 SecretStage::RotateKeysFinish(stage) => {
62 coord.rotate_keys_finish(ctx.session(), stage).await
63 }
64 SecretStage::Alter(stage) => coord.alter_secret(ctx.session(), stage.plan),
65 }
66 }
67
68 fn message(self, ctx: ExecuteContext, span: tracing::Span) -> Message {
69 Message::SecretStageReady {
70 ctx,
71 span,
72 stage: self,
73 }
74 }
75
76 fn cancel_enabled(&self) -> bool {
77 false
80 }
81}
82
83impl Coordinator {
84 #[instrument]
85 pub(crate) async fn sequence_create_secret(
86 &mut self,
87 ctx: ExecuteContext,
88 plan: plan::CreateSecretPlan,
89 ) {
90 let stage = return_if_err!(self.create_secret_validate(ctx.session(), plan).await, ctx);
91 self.sequence_staged(ctx, Span::current(), stage).await;
92 }
93
94 #[instrument]
95 async fn create_secret_validate(
96 &self,
97 session: &Session,
98 plan: plan::CreateSecretPlan,
99 ) -> Result<SecretStage, AdapterError> {
100 let validity = PlanValidity::new(
102 self.catalog().transient_revision(),
103 BTreeSet::new(),
104 None,
105 None,
106 session.role_metadata().clone(),
107 );
108 Ok(SecretStage::CreateEnsure(CreateSecretEnsure {
109 validity,
110 plan,
111 }))
112 }
113
114 #[instrument]
115 async fn create_secret_ensure(
116 &mut self,
117 session: &Session,
118 CreateSecretEnsure { validity, mut plan }: CreateSecretEnsure,
119 ) -> Result<StageResult<Box<SecretStage>>, AdapterError> {
120 let id_ts = self.get_catalog_write_ts().await;
121 let (item_id, global_id) = self.catalog().allocate_user_id(id_ts).await?;
122
123 let secrets_controller = Arc::clone(&self.secrets_controller);
124 let payload = self.extract_secret(session, &mut plan.secret.secret_as)?;
125 let span = Span::current();
126 Ok(StageResult::Handle(mz_ore::task::spawn(
127 || "create secret ensure",
128 async move {
129 secrets_controller.ensure(item_id, &payload).await?;
130 let stage = SecretStage::CreateFinish(CreateSecretFinish {
131 validity,
132 item_id,
133 global_id,
134 plan,
135 });
136 Ok(Box::new(stage))
137 }
138 .instrument(span),
139 )))
140 }
141
142 fn extract_secret(
143 &self,
144 session: &Session,
145 secret_as: &mut MirScalarExpr,
146 ) -> Result<Vec<u8>, AdapterError> {
147 let temp_storage = RowArena::new();
148 let style = ExprPrepOneShot {
149 logical_time: EvalTime::NotAvailable,
150 session,
151 catalog_state: self.catalog().state(),
152 };
153 style.prep_scalar_expr(secret_as)?;
154 let evaled = secret_as.eval(&[], &temp_storage)?;
155
156 if evaled == Datum::Null {
157 coord_bail!("secret value can not be null");
158 }
159
160 let payload = evaled.unwrap_bytes();
161
162 if payload.len() > 1024 * 512 {
167 coord_bail!("secrets can not be bigger than 512KiB")
168 }
169
170 if std::str::from_utf8(payload).is_err() {
180 coord_bail!("secret value must be valid UTF-8");
185 }
186
187 Ok(Vec::from(payload))
188 }
189
190 #[instrument]
191 async fn create_secret_finish(
192 &mut self,
193 session: &Session,
194 CreateSecretFinish {
195 item_id,
196 global_id,
197 plan,
198 validity: _,
199 }: CreateSecretFinish,
200 ) -> Result<StageResult<Box<SecretStage>>, AdapterError> {
201 let CreateSecretPlan {
202 name,
203 secret,
204 if_not_exists,
205 } = plan;
206 let secret = Secret {
207 create_sql: secret.create_sql,
208 global_id,
209 };
210
211 let ops = vec![catalog::Op::CreateItem {
212 id: item_id,
213 name: name.clone(),
214 item: CatalogItem::Secret(secret),
215 owner_id: *session.current_role_id(),
216 }];
217
218 let res = match self.catalog_transact(Some(session), ops).await {
219 Ok(()) => Ok(ExecuteResponse::CreatedSecret),
220 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
221 kind:
222 mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
223 })) if if_not_exists => {
224 session.add_notice(AdapterNotice::ObjectAlreadyExists {
225 name: name.item,
226 ty: "secret",
227 });
228 Ok(ExecuteResponse::CreatedSecret)
229 }
230 Err(err) => {
231 if let Err(e) = self.secrets_controller.delete(item_id).await {
232 warn!(
233 "Dropping newly created secrets has encountered an error: {}",
234 e
235 );
236 }
237 Err(err)
238 }
239 };
240 res.map(StageResult::Response)
241 }
242
243 #[instrument]
244 pub(crate) async fn sequence_alter_secret(
245 &mut self,
246 ctx: ExecuteContext,
247 plan: plan::AlterSecretPlan,
248 ) {
249 let validity = PlanValidity::new(
254 self.catalog().transient_revision(),
255 BTreeSet::new(),
256 None,
257 None,
258 ctx.session().role_metadata().clone(),
259 );
260 let stage = SecretStage::Alter(AlterSecret { validity, plan });
261 self.sequence_staged(ctx, Span::current(), stage).await;
262 }
263
264 #[instrument]
265 fn alter_secret(
266 &self,
267 session: &Session,
268 plan: plan::AlterSecretPlan,
269 ) -> Result<StageResult<Box<SecretStage>>, AdapterError> {
270 let plan::AlterSecretPlan { id, mut secret_as } = plan;
271 let secrets_controller = Arc::clone(&self.secrets_controller);
272 let payload = self.extract_secret(session, &mut secret_as)?;
273 let span = Span::current();
274 Ok(StageResult::HandleRetire(mz_ore::task::spawn(
275 || "alter secret ensure",
276 async move {
277 secrets_controller.ensure(id, &payload).await?;
278 Ok(ExecuteResponse::AlteredObject(ObjectType::Secret))
279 }
280 .instrument(span),
281 )))
282 }
283
284 #[instrument]
285 pub(crate) async fn sequence_rotate_keys(&mut self, ctx: ExecuteContext, id: CatalogItemId) {
286 let validity = PlanValidity::new(
293 self.catalog().transient_revision(),
294 BTreeSet::from_iter(std::iter::once(id)),
295 None,
296 None,
297 ctx.session().role_metadata().clone(),
298 );
299 let stage = SecretStage::RotateKeysEnsure(RotateKeysSecretEnsure { validity, id });
300 self.sequence_staged(ctx, Span::current(), stage).await;
301 }
302
303 #[instrument]
304 fn rotate_keys_ensure(
305 &self,
306 RotateKeysSecretEnsure { validity, id }: RotateKeysSecretEnsure,
307 ) -> Result<StageResult<Box<SecretStage>>, AdapterError> {
308 let secrets_controller = Arc::clone(&self.secrets_controller);
309 let entry = self.catalog().get_entry(&id).clone();
310 let span = Span::current();
311 Ok(StageResult::Handle(mz_ore::task::spawn(
312 || "rotate keys ensure",
313 async move {
314 let secret = secrets_controller.reader().read(id).await?;
315 let previous_key_set = SshKeyPairSet::from_bytes(&secret)?;
316 let new_key_set = previous_key_set.rotate()?;
317 secrets_controller
318 .ensure(id, &new_key_set.to_bytes())
319 .await?;
320
321 let mut to_item = entry.item;
322 match &mut to_item {
323 CatalogItem::Connection(c) => {
324 let mut stmt = match mz_sql::parse::parse(&c.create_sql)
325 .expect("invalid create sql persisted to catalog")
326 .into_element()
327 .ast
328 {
329 Statement::CreateConnection(stmt) => stmt,
330 _ => coord_bail!("internal error: persisted SQL for {id} is invalid"),
331 };
332
333 stmt.values.retain(|v| {
334 v.name != ConnectionOptionName::PublicKey1
335 && v.name != ConnectionOptionName::PublicKey2
336 });
337 stmt.values.push(ConnectionOption {
338 name: ConnectionOptionName::PublicKey1,
339 value: Some(WithOptionValue::Value(Value::String(
340 new_key_set.primary().ssh_public_key(),
341 ))),
342 });
343 stmt.values.push(ConnectionOption {
344 name: ConnectionOptionName::PublicKey2,
345 value: Some(WithOptionValue::Value(Value::String(
346 new_key_set.secondary().ssh_public_key(),
347 ))),
348 });
349
350 c.create_sql = stmt.to_ast_string_stable();
351 }
352 _ => coord_bail!(
353 "internal error: rotate keys called on non-connection object {id}"
354 ),
355 }
356
357 let ops = vec![catalog::Op::UpdateItem {
358 id,
359 name: entry.name,
360 to_item,
361 }];
362 let stage = SecretStage::RotateKeysFinish(RotateKeysSecretFinish { validity, ops });
363 Ok(Box::new(stage))
364 }
365 .instrument(span),
366 )))
367 }
368
369 #[instrument]
370 async fn rotate_keys_finish(
371 &mut self,
372 session: &Session,
373 RotateKeysSecretFinish { ops, validity: _ }: RotateKeysSecretFinish,
374 ) -> Result<StageResult<Box<SecretStage>>, AdapterError> {
375 self.catalog_transact(Some(session), ops).await?;
376 Ok(StageResult::Response(ExecuteResponse::AlteredObject(
377 ObjectType::Connection,
378 )))
379 }
380}