1use std::collections::BTreeSet;
11use std::sync::Arc;
12
13use mz_catalog::memory::error::ErrorKind;
14use mz_catalog::memory::objects::{CatalogItem, Secret};
15use mz_expr::MirScalarExpr;
16use mz_ore::collections::CollectionExt;
17use mz_ore::instrument;
18use mz_repr::{CatalogItemId, Datum, RowArena};
19use mz_sql::ast::display::AstDisplay;
20use mz_sql::ast::{ConnectionOption, ConnectionOptionName, Statement, Value, WithOptionValue};
21use mz_sql::catalog::{CatalogError, ObjectType};
22use mz_sql::plan::{self, CreateSecretPlan};
23use mz_sql::session::metadata::SessionMetadata;
24use mz_ssh_util::keys::SshKeyPairSet;
25use tracing::{Instrument, Span, warn};
26
27use crate::coord::sequencer::inner::return_if_err;
28use crate::coord::{
29 AlterSecret, Coordinator, CreateSecretEnsure, CreateSecretFinish, Message, PlanValidity,
30 RotateKeysSecretEnsure, RotateKeysSecretFinish, SecretStage, StageResult, Staged,
31};
32use crate::optimize::dataflows::{EvalTime, ExprPrep, ExprPrepOneShot};
33use crate::session::Session;
34use crate::{AdapterError, AdapterNotice, ExecuteContext, ExecuteResponse, catalog};
35
36impl Staged for SecretStage {
37 type Ctx = ExecuteContext;
38
39 fn validity(&mut self) -> &mut crate::coord::PlanValidity {
40 match self {
41 SecretStage::CreateFinish(stage) => &mut stage.validity,
42 SecretStage::RotateKeysFinish(stage) => &mut stage.validity,
43 SecretStage::RotateKeysEnsure(stage) => &mut stage.validity,
44 SecretStage::CreateEnsure(stage) => &mut stage.validity,
45 SecretStage::Alter(stage) => &mut stage.validity,
46 }
47 }
48
49 async fn stage(
50 self,
51 coord: &mut Coordinator,
52 ctx: &mut ExecuteContext,
53 ) -> Result<crate::coord::StageResult<Box<Self>>, AdapterError> {
54 match self {
55 SecretStage::CreateEnsure(stage) => {
56 coord.create_secret_ensure(ctx.session(), stage).await
57 }
58 SecretStage::CreateFinish(stage) => {
59 coord.create_secret_finish(ctx.session(), stage).await
60 }
61 SecretStage::RotateKeysEnsure(stage) => coord.rotate_keys_ensure(stage),
62 SecretStage::RotateKeysFinish(stage) => {
63 coord.rotate_keys_finish(ctx.session(), stage).await
64 }
65 SecretStage::Alter(stage) => coord.alter_secret(ctx.session(), stage.plan),
66 }
67 }
68
69 fn message(self, ctx: ExecuteContext, span: tracing::Span) -> Message {
70 Message::SecretStageReady {
71 ctx,
72 span,
73 stage: self,
74 }
75 }
76
77 fn cancel_enabled(&self) -> bool {
78 false
81 }
82}
83
84impl Coordinator {
85 #[instrument]
86 pub(crate) async fn sequence_create_secret(
87 &mut self,
88 ctx: ExecuteContext,
89 plan: plan::CreateSecretPlan,
90 ) {
91 let stage = return_if_err!(self.create_secret_validate(ctx.session(), plan).await, ctx);
92 self.sequence_staged(ctx, Span::current(), stage).await;
93 }
94
95 #[instrument]
96 async fn create_secret_validate(
97 &self,
98 session: &Session,
99 plan: plan::CreateSecretPlan,
100 ) -> Result<SecretStage, AdapterError> {
101 let validity = PlanValidity::new(
103 self.catalog().transient_revision(),
104 BTreeSet::new(),
105 None,
106 None,
107 session.role_metadata().clone(),
108 );
109 Ok(SecretStage::CreateEnsure(CreateSecretEnsure {
110 validity,
111 plan,
112 }))
113 }
114
115 #[instrument]
116 async fn create_secret_ensure(
117 &mut self,
118 session: &Session,
119 CreateSecretEnsure { validity, mut plan }: CreateSecretEnsure,
120 ) -> Result<StageResult<Box<SecretStage>>, AdapterError> {
121 let id_ts = self.get_catalog_write_ts().await;
122 let (item_id, global_id) = self.catalog().allocate_user_id(id_ts).await?;
123
124 let secrets_controller = Arc::clone(&self.secrets_controller);
125 let payload = self.extract_secret(session, &mut plan.secret.secret_as)?;
126 let span = Span::current();
127 Ok(StageResult::Handle(mz_ore::task::spawn(
128 || "create secret ensure",
129 async move {
130 secrets_controller.ensure(item_id, &payload).await?;
131 let stage = SecretStage::CreateFinish(CreateSecretFinish {
132 validity,
133 item_id,
134 global_id,
135 plan,
136 });
137 Ok(Box::new(stage))
138 }
139 .instrument(span),
140 )))
141 }
142
143 fn extract_secret(
144 &self,
145 session: &Session,
146 secret_as: &mut MirScalarExpr,
147 ) -> Result<Vec<u8>, AdapterError> {
148 let temp_storage = RowArena::new();
149 let style = ExprPrepOneShot {
150 logical_time: EvalTime::NotAvailable,
151 session,
152 catalog_state: self.catalog().state(),
153 };
154 style.prep_scalar_expr(secret_as)?;
155 let evaled = secret_as.eval(&[], &temp_storage)?;
156
157 if evaled == Datum::Null {
158 coord_bail!("secret value can not be null");
159 }
160
161 let payload = evaled.unwrap_bytes();
162
163 if payload.len() > 1024 * 512 {
168 coord_bail!("secrets can not be bigger than 512KiB")
169 }
170
171 if std::str::from_utf8(payload).is_err() {
181 coord_bail!("secret value must be valid UTF-8");
186 }
187
188 Ok(Vec::from(payload))
189 }
190
191 #[instrument]
192 async fn create_secret_finish(
193 &mut self,
194 session: &Session,
195 CreateSecretFinish {
196 item_id,
197 global_id,
198 plan,
199 validity: _,
200 }: CreateSecretFinish,
201 ) -> Result<StageResult<Box<SecretStage>>, AdapterError> {
202 let CreateSecretPlan {
203 name,
204 secret,
205 if_not_exists,
206 } = plan;
207 let secret = Secret {
208 create_sql: secret.create_sql,
209 global_id,
210 };
211
212 let ops = vec![catalog::Op::CreateItem {
213 id: item_id,
214 name: name.clone(),
215 item: CatalogItem::Secret(secret),
216 owner_id: *session.current_role_id(),
217 }];
218
219 let res = match self.catalog_transact(Some(session), ops).await {
220 Ok(()) => Ok(ExecuteResponse::CreatedSecret),
221 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
222 kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
223 })) if if_not_exists => {
224 session.add_notice(AdapterNotice::ObjectAlreadyExists {
225 name: name.item,
226 ty: "secret",
227 });
228 Ok(ExecuteResponse::CreatedSecret)
229 }
230 Err(err) => {
231 if let Err(e) = self.secrets_controller.delete(item_id).await {
232 warn!(
233 "Dropping newly created secrets has encountered an error: {}",
234 e
235 );
236 }
237 Err(err)
238 }
239 };
240 res.map(StageResult::Response)
241 }
242
243 #[instrument]
244 pub(crate) async fn sequence_alter_secret(
245 &mut self,
246 ctx: ExecuteContext,
247 plan: plan::AlterSecretPlan,
248 ) {
249 let validity = PlanValidity::new(
254 self.catalog().transient_revision(),
255 BTreeSet::new(),
256 None,
257 None,
258 ctx.session().role_metadata().clone(),
259 );
260 let stage = SecretStage::Alter(AlterSecret { validity, plan });
261 self.sequence_staged(ctx, Span::current(), stage).await;
262 }
263
264 #[instrument]
265 fn alter_secret(
266 &self,
267 session: &Session,
268 plan: plan::AlterSecretPlan,
269 ) -> Result<StageResult<Box<SecretStage>>, AdapterError> {
270 let plan::AlterSecretPlan { id, mut secret_as } = plan;
271 let secrets_controller = Arc::clone(&self.secrets_controller);
272 let payload = self.extract_secret(session, &mut secret_as)?;
273 let span = Span::current();
274 Ok(StageResult::HandleRetire(mz_ore::task::spawn(
275 || "alter secret ensure",
276 async move {
277 secrets_controller.ensure(id, &payload).await?;
278 Ok(ExecuteResponse::AlteredObject(ObjectType::Secret))
279 }
280 .instrument(span),
281 )))
282 }
283
284 #[instrument]
285 pub(crate) async fn sequence_rotate_keys(&mut self, ctx: ExecuteContext, id: CatalogItemId) {
286 let validity = PlanValidity::new(
293 self.catalog().transient_revision(),
294 BTreeSet::from_iter(std::iter::once(id)),
295 None,
296 None,
297 ctx.session().role_metadata().clone(),
298 );
299 let stage = SecretStage::RotateKeysEnsure(RotateKeysSecretEnsure { validity, id });
300 self.sequence_staged(ctx, Span::current(), stage).await;
301 }
302
303 #[instrument]
304 fn rotate_keys_ensure(
305 &self,
306 RotateKeysSecretEnsure { validity, id }: RotateKeysSecretEnsure,
307 ) -> Result<StageResult<Box<SecretStage>>, AdapterError> {
308 let secrets_controller = Arc::clone(&self.secrets_controller);
309 let entry = self.catalog().get_entry(&id).clone();
310 let span = Span::current();
311 Ok(StageResult::Handle(mz_ore::task::spawn(
312 || "rotate keys ensure",
313 async move {
314 let secret = secrets_controller.reader().read(id).await?;
315 let previous_key_set = SshKeyPairSet::from_bytes(&secret)?;
316 let new_key_set = previous_key_set.rotate()?;
317 secrets_controller
318 .ensure(id, &new_key_set.to_bytes())
319 .await?;
320
321 let mut to_item = entry.item;
322 match &mut to_item {
323 CatalogItem::Connection(c) => {
324 let mut stmt = match mz_sql::parse::parse(&c.create_sql)
325 .expect("invalid create sql persisted to catalog")
326 .into_element()
327 .ast
328 {
329 Statement::CreateConnection(stmt) => stmt,
330 _ => coord_bail!("internal error: persisted SQL for {id} is invalid"),
331 };
332
333 stmt.values.retain(|v| {
334 v.name != ConnectionOptionName::PublicKey1
335 && v.name != ConnectionOptionName::PublicKey2
336 });
337 stmt.values.push(ConnectionOption {
338 name: ConnectionOptionName::PublicKey1,
339 value: Some(WithOptionValue::Value(Value::String(
340 new_key_set.primary().ssh_public_key(),
341 ))),
342 });
343 stmt.values.push(ConnectionOption {
344 name: ConnectionOptionName::PublicKey2,
345 value: Some(WithOptionValue::Value(Value::String(
346 new_key_set.secondary().ssh_public_key(),
347 ))),
348 });
349
350 c.create_sql = stmt.to_ast_string_stable();
351 }
352 _ => coord_bail!(
353 "internal error: rotate keys called on non-connection object {id}"
354 ),
355 }
356
357 let ops = vec![catalog::Op::UpdateItem {
358 id,
359 name: entry.name,
360 to_item,
361 }];
362 let stage = SecretStage::RotateKeysFinish(RotateKeysSecretFinish { validity, ops });
363 Ok(Box::new(stage))
364 }
365 .instrument(span),
366 )))
367 }
368
369 #[instrument]
370 async fn rotate_keys_finish(
371 &mut self,
372 session: &Session,
373 RotateKeysSecretFinish { ops, validity: _ }: RotateKeysSecretFinish,
374 ) -> Result<StageResult<Box<SecretStage>>, AdapterError> {
375 self.catalog_transact(Some(session), ops).await?;
376 Ok(StageResult::Response(ExecuteResponse::AlteredObject(
377 ObjectType::Connection,
378 )))
379 }
380}