1use std::collections::BTreeSet;
11use std::sync::Arc;
12
13use mz_catalog::memory::error::ErrorKind;
14use mz_catalog::memory::objects::{CatalogItem, Secret};
15use mz_expr::MirScalarExpr;
16use mz_ore::collections::CollectionExt;
17use mz_ore::instrument;
18use mz_repr::{CatalogItemId, Datum, RowArena};
19use mz_sql::ast::display::AstDisplay;
20use mz_sql::ast::{ConnectionOption, ConnectionOptionName, Statement, Value, WithOptionValue};
21use mz_sql::catalog::{CatalogError, ObjectType};
22use mz_sql::plan::{self, CreateSecretPlan};
23use mz_sql::session::metadata::SessionMetadata;
24use mz_ssh_util::keys::SshKeyPairSet;
25use tracing::{Instrument, Span, warn};
26
27use crate::coord::sequencer::inner::return_if_err;
28use crate::coord::{
29 AlterSecret, Coordinator, CreateSecretEnsure, CreateSecretFinish, Message, PlanValidity,
30 RotateKeysSecretEnsure, RotateKeysSecretFinish, SecretStage, StageResult, Staged,
31};
32use crate::optimize::dataflows::{EvalTime, ExprPrep, ExprPrepOneShot};
33use crate::session::Session;
34use crate::{AdapterError, AdapterNotice, ExecuteContext, ExecuteResponse, catalog};
35
36impl Staged for SecretStage {
37 type Ctx = ExecuteContext;
38
39 fn validity(&mut self) -> &mut crate::coord::PlanValidity {
40 match self {
41 SecretStage::CreateFinish(stage) => &mut stage.validity,
42 SecretStage::RotateKeysFinish(stage) => &mut stage.validity,
43 SecretStage::RotateKeysEnsure(stage) => &mut stage.validity,
44 SecretStage::CreateEnsure(stage) => &mut stage.validity,
45 SecretStage::Alter(stage) => &mut stage.validity,
46 }
47 }
48
49 async fn stage(
50 self,
51 coord: &mut Coordinator,
52 ctx: &mut ExecuteContext,
53 ) -> Result<crate::coord::StageResult<Box<Self>>, AdapterError> {
54 match self {
55 SecretStage::CreateEnsure(stage) => {
56 coord.create_secret_ensure(ctx.session(), stage).await
57 }
58 SecretStage::CreateFinish(stage) => {
59 coord.create_secret_finish(ctx.session(), stage).await
60 }
61 SecretStage::RotateKeysEnsure(stage) => coord.rotate_keys_ensure(stage),
62 SecretStage::RotateKeysFinish(stage) => {
63 coord.rotate_keys_finish(ctx.session(), stage).await
64 }
65 SecretStage::Alter(stage) => coord.alter_secret(ctx.session(), stage.plan),
66 }
67 }
68
69 fn message(self, ctx: ExecuteContext, span: tracing::Span) -> Message {
70 Message::SecretStageReady {
71 ctx,
72 span,
73 stage: self,
74 }
75 }
76
77 fn cancel_enabled(&self) -> bool {
78 false
81 }
82}
83
84impl Coordinator {
85 #[instrument]
86 pub(crate) async fn sequence_create_secret(
87 &mut self,
88 ctx: ExecuteContext,
89 plan: plan::CreateSecretPlan,
90 ) {
91 let stage = return_if_err!(self.create_secret_validate(ctx.session(), plan).await, ctx);
92 self.sequence_staged(ctx, Span::current(), stage).await;
93 }
94
95 #[instrument]
96 async fn create_secret_validate(
97 &self,
98 session: &Session,
99 plan: plan::CreateSecretPlan,
100 ) -> Result<SecretStage, AdapterError> {
101 let validity = PlanValidity::new(
103 self.catalog().transient_revision(),
104 BTreeSet::new(),
105 None,
106 None,
107 session.role_metadata().clone(),
108 );
109 Ok(SecretStage::CreateEnsure(CreateSecretEnsure {
110 validity,
111 plan,
112 }))
113 }
114
115 #[instrument]
116 async fn create_secret_ensure(
117 &mut self,
118 session: &Session,
119 CreateSecretEnsure { validity, mut plan }: CreateSecretEnsure,
120 ) -> Result<StageResult<Box<SecretStage>>, AdapterError> {
121 let (item_id, global_id) = self.allocate_user_id().await?;
122
123 let secrets_controller = Arc::clone(&self.secrets_controller);
124 let payload = self.extract_secret(session, &mut plan.secret.secret_as)?;
125 let span = Span::current();
126 Ok(StageResult::Handle(mz_ore::task::spawn(
127 || "create secret ensure",
128 async move {
129 secrets_controller.ensure(item_id, &payload).await?;
130 let stage = SecretStage::CreateFinish(CreateSecretFinish {
131 validity,
132 item_id,
133 global_id,
134 plan,
135 });
136 Ok(Box::new(stage))
137 }
138 .instrument(span),
139 )))
140 }
141
142 fn extract_secret(
143 &self,
144 session: &Session,
145 secret_as: &mut MirScalarExpr,
146 ) -> Result<Vec<u8>, AdapterError> {
147 let temp_storage = RowArena::new();
148 let style = ExprPrepOneShot {
149 logical_time: EvalTime::NotAvailable,
150 session,
151 catalog_state: self.catalog().state(),
152 };
153 style.prep_scalar_expr(secret_as)?;
154 let evaled = secret_as.eval(&[], &temp_storage)?;
155
156 if evaled == Datum::Null {
157 coord_bail!("secret value can not be null");
158 }
159
160 let payload = evaled.unwrap_bytes();
161
162 if payload.len() > 1024 * 512 {
167 coord_bail!("secrets can not be bigger than 512KiB")
168 }
169
170 if std::str::from_utf8(payload).is_err() {
180 coord_bail!("secret value must be valid UTF-8");
185 }
186
187 Ok(Vec::from(payload))
188 }
189
190 #[instrument]
191 async fn create_secret_finish(
192 &mut self,
193 session: &Session,
194 CreateSecretFinish {
195 item_id,
196 global_id,
197 plan,
198 validity: _,
199 }: CreateSecretFinish,
200 ) -> Result<StageResult<Box<SecretStage>>, AdapterError> {
201 let CreateSecretPlan {
202 name,
203 secret,
204 if_not_exists,
205 } = plan;
206 let secret = Secret {
207 create_sql: secret.create_sql,
208 global_id,
209 };
210
211 let ops = vec![catalog::Op::CreateItem {
212 id: item_id,
213 name: name.clone(),
214 item: CatalogItem::Secret(secret),
215 owner_id: *session.current_role_id(),
216 }];
217
218 let res = match self.catalog_transact(Some(session), ops).await {
219 Ok(()) => Ok(ExecuteResponse::CreatedSecret),
220 Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
221 kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
222 })) if if_not_exists => {
223 session.add_notice(AdapterNotice::ObjectAlreadyExists {
224 name: name.item,
225 ty: "secret",
226 });
227 Ok(ExecuteResponse::CreatedSecret)
228 }
229 Err(err) => {
230 if let Err(e) = self.secrets_controller.delete(item_id).await {
231 warn!(
232 "Dropping newly created secrets has encountered an error: {}",
233 e
234 );
235 }
236 Err(err)
237 }
238 };
239 res.map(StageResult::Response)
240 }
241
242 #[instrument]
243 pub(crate) async fn sequence_alter_secret(
244 &mut self,
245 ctx: ExecuteContext,
246 plan: plan::AlterSecretPlan,
247 ) {
248 let validity = PlanValidity::new(
253 self.catalog().transient_revision(),
254 BTreeSet::new(),
255 None,
256 None,
257 ctx.session().role_metadata().clone(),
258 );
259 let stage = SecretStage::Alter(AlterSecret { validity, plan });
260 self.sequence_staged(ctx, Span::current(), stage).await;
261 }
262
263 #[instrument]
264 fn alter_secret(
265 &self,
266 session: &Session,
267 plan: plan::AlterSecretPlan,
268 ) -> Result<StageResult<Box<SecretStage>>, AdapterError> {
269 let plan::AlterSecretPlan { id, mut secret_as } = plan;
270 let secrets_controller = Arc::clone(&self.secrets_controller);
271 let payload = self.extract_secret(session, &mut secret_as)?;
272 let span = Span::current();
273 Ok(StageResult::HandleRetire(mz_ore::task::spawn(
274 || "alter secret ensure",
275 async move {
276 secrets_controller.ensure(id, &payload).await?;
277 Ok(ExecuteResponse::AlteredObject(ObjectType::Secret))
278 }
279 .instrument(span),
280 )))
281 }
282
283 #[instrument]
284 pub(crate) async fn sequence_rotate_keys(&mut self, ctx: ExecuteContext, id: CatalogItemId) {
285 let validity = PlanValidity::new(
292 self.catalog().transient_revision(),
293 BTreeSet::from_iter(std::iter::once(id)),
294 None,
295 None,
296 ctx.session().role_metadata().clone(),
297 );
298 let stage = SecretStage::RotateKeysEnsure(RotateKeysSecretEnsure { validity, id });
299 self.sequence_staged(ctx, Span::current(), stage).await;
300 }
301
302 #[instrument]
303 fn rotate_keys_ensure(
304 &self,
305 RotateKeysSecretEnsure { validity, id }: RotateKeysSecretEnsure,
306 ) -> Result<StageResult<Box<SecretStage>>, AdapterError> {
307 let secrets_controller = Arc::clone(&self.secrets_controller);
308 let entry = self.catalog().get_entry(&id).clone();
309 let span = Span::current();
310 Ok(StageResult::Handle(mz_ore::task::spawn(
311 || "rotate keys ensure",
312 async move {
313 let secret = secrets_controller.reader().read(id).await?;
314 let previous_key_set = SshKeyPairSet::from_bytes(&secret)?;
315 let new_key_set = previous_key_set.rotate()?;
316 secrets_controller
317 .ensure(id, &new_key_set.to_bytes())
318 .await?;
319
320 let mut to_item = entry.item;
321 match &mut to_item {
322 CatalogItem::Connection(c) => {
323 let mut stmt = match mz_sql::parse::parse(&c.create_sql)
324 .expect("invalid create sql persisted to catalog")
325 .into_element()
326 .ast
327 {
328 Statement::CreateConnection(stmt) => stmt,
329 _ => coord_bail!("internal error: persisted SQL for {id} is invalid"),
330 };
331
332 stmt.values.retain(|v| {
333 v.name != ConnectionOptionName::PublicKey1
334 && v.name != ConnectionOptionName::PublicKey2
335 });
336 stmt.values.push(ConnectionOption {
337 name: ConnectionOptionName::PublicKey1,
338 value: Some(WithOptionValue::Value(Value::String(
339 new_key_set.primary().ssh_public_key(),
340 ))),
341 });
342 stmt.values.push(ConnectionOption {
343 name: ConnectionOptionName::PublicKey2,
344 value: Some(WithOptionValue::Value(Value::String(
345 new_key_set.secondary().ssh_public_key(),
346 ))),
347 });
348
349 c.create_sql = stmt.to_ast_string_stable();
350 }
351 _ => coord_bail!(
352 "internal error: rotate keys called on non-connection object {id}"
353 ),
354 }
355
356 let ops = vec![catalog::Op::UpdateItem {
357 id,
358 name: entry.name,
359 to_item,
360 }];
361 let stage = SecretStage::RotateKeysFinish(RotateKeysSecretFinish { validity, ops });
362 Ok(Box::new(stage))
363 }
364 .instrument(span),
365 )))
366 }
367
368 #[instrument]
369 async fn rotate_keys_finish(
370 &mut self,
371 session: &Session,
372 RotateKeysSecretFinish { ops, validity: _ }: RotateKeysSecretFinish,
373 ) -> Result<StageResult<Box<SecretStage>>, AdapterError> {
374 self.catalog_transact(Some(session), ops).await?;
375 Ok(StageResult::Response(ExecuteResponse::AlteredObject(
376 ObjectType::Connection,
377 )))
378 }
379}