Skip to main content

mz_adapter/coord/sequencer/inner/
secret.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeSet;
11use std::sync::Arc;
12
13use mz_catalog::memory::error::ErrorKind;
14use mz_catalog::memory::objects::{CatalogItem, Secret};
15use mz_expr::MirScalarExpr;
16use mz_ore::collections::CollectionExt;
17use mz_ore::instrument;
18use mz_repr::{CatalogItemId, Datum, RowArena};
19use mz_sql::ast::display::AstDisplay;
20use mz_sql::ast::{ConnectionOption, ConnectionOptionName, Statement, Value, WithOptionValue};
21use mz_sql::catalog::{CatalogError, ObjectType};
22use mz_sql::plan::{self, CreateSecretPlan};
23use mz_sql::session::metadata::SessionMetadata;
24use mz_ssh_util::keys::SshKeyPairSet;
25use tracing::{Instrument, Span, warn};
26
27use crate::coord::sequencer::inner::return_if_err;
28use crate::coord::{
29    AlterSecret, Coordinator, CreateSecretEnsure, CreateSecretFinish, Message, PlanValidity,
30    RotateKeysSecretEnsure, RotateKeysSecretFinish, SecretStage, StageResult, Staged,
31};
32use crate::optimize::dataflows::{EvalTime, ExprPrep, ExprPrepOneShot};
33use crate::session::Session;
34use crate::{AdapterError, AdapterNotice, ExecuteContext, ExecuteResponse, catalog};
35
36impl Staged for SecretStage {
37    type Ctx = ExecuteContext;
38
39    fn validity(&mut self) -> &mut crate::coord::PlanValidity {
40        match self {
41            SecretStage::CreateFinish(stage) => &mut stage.validity,
42            SecretStage::RotateKeysFinish(stage) => &mut stage.validity,
43            SecretStage::RotateKeysEnsure(stage) => &mut stage.validity,
44            SecretStage::CreateEnsure(stage) => &mut stage.validity,
45            SecretStage::Alter(stage) => &mut stage.validity,
46        }
47    }
48
49    async fn stage(
50        self,
51        coord: &mut Coordinator,
52        ctx: &mut ExecuteContext,
53    ) -> Result<crate::coord::StageResult<Box<Self>>, AdapterError> {
54        match self {
55            SecretStage::CreateEnsure(stage) => {
56                coord.create_secret_ensure(ctx.session(), stage).await
57            }
58            SecretStage::CreateFinish(stage) => {
59                coord.create_secret_finish(ctx.session(), stage).await
60            }
61            SecretStage::RotateKeysEnsure(stage) => coord.rotate_keys_ensure(stage),
62            SecretStage::RotateKeysFinish(stage) => {
63                coord.rotate_keys_finish(ctx.session(), stage).await
64            }
65            SecretStage::Alter(stage) => coord.alter_secret(ctx.session(), stage.plan),
66        }
67    }
68
69    fn message(self, ctx: ExecuteContext, span: tracing::Span) -> Message {
70        Message::SecretStageReady {
71            ctx,
72            span,
73            stage: self,
74        }
75    }
76
77    fn cancel_enabled(&self) -> bool {
78        // Because secrets operations call out to external services and transact the catalog
79        // separately, disable cancellation.
80        false
81    }
82}
83
84impl Coordinator {
85    #[instrument]
86    pub(crate) async fn sequence_create_secret(
87        &mut self,
88        ctx: ExecuteContext,
89        plan: plan::CreateSecretPlan,
90    ) {
91        let stage = return_if_err!(self.create_secret_validate(ctx.session(), plan).await, ctx);
92        self.sequence_staged(ctx, Span::current(), stage).await;
93    }
94
95    #[instrument]
96    async fn create_secret_validate(
97        &self,
98        session: &Session,
99        plan: plan::CreateSecretPlan,
100    ) -> Result<SecretStage, AdapterError> {
101        // No dependencies.
102        let validity = PlanValidity::new(
103            self.catalog().transient_revision(),
104            BTreeSet::new(),
105            None,
106            None,
107            session.role_metadata().clone(),
108        );
109        Ok(SecretStage::CreateEnsure(CreateSecretEnsure {
110            validity,
111            plan,
112        }))
113    }
114
115    #[instrument]
116    async fn create_secret_ensure(
117        &mut self,
118        session: &Session,
119        CreateSecretEnsure { validity, mut plan }: CreateSecretEnsure,
120    ) -> Result<StageResult<Box<SecretStage>>, AdapterError> {
121        let (item_id, global_id) = self.allocate_user_id().await?;
122
123        let secrets_controller = Arc::clone(&self.secrets_controller);
124        let payload = self.extract_secret(session, &mut plan.secret.secret_as)?;
125        let span = Span::current();
126        Ok(StageResult::Handle(mz_ore::task::spawn(
127            || "create secret ensure",
128            async move {
129                secrets_controller.ensure(item_id, &payload).await?;
130                let stage = SecretStage::CreateFinish(CreateSecretFinish {
131                    validity,
132                    item_id,
133                    global_id,
134                    plan,
135                });
136                Ok(Box::new(stage))
137            }
138            .instrument(span),
139        )))
140    }
141
142    fn extract_secret(
143        &self,
144        session: &Session,
145        secret_as: &mut MirScalarExpr,
146    ) -> Result<Vec<u8>, AdapterError> {
147        let temp_storage = RowArena::new();
148        let style = ExprPrepOneShot {
149            logical_time: EvalTime::NotAvailable,
150            session,
151            catalog_state: self.catalog().state(),
152        };
153        style.prep_scalar_expr(secret_as)?;
154        let evaled = secret_as.eval(&[], &temp_storage)?;
155
156        if evaled == Datum::Null {
157            coord_bail!("secret value can not be null");
158        }
159
160        let payload = evaled.unwrap_bytes();
161
162        // Limit the size of a secret to 512 KiB
163        // This is the largest size of a single secret in Consul/Kubernetes
164        // We are enforcing this limit across all types of Secrets Controllers
165        // Most secrets are expected to be roughly 75B
166        if payload.len() > 1024 * 512 {
167            coord_bail!("secrets can not be bigger than 512KiB")
168        }
169
170        // Enforce that all secrets are valid UTF-8 for now. We expect to lift
171        // this restriction in the future, when we discover a connection type
172        // that requires binary secrets, but for now it is convenient to ensure
173        // here that `SecretsReader::read_string` can never fail due to invalid
174        // UTF-8.
175        //
176        // If you want to remove this line, verify that no caller of
177        // `SecretsReader::read_string` will panic if the secret contains
178        // invalid UTF-8.
179        if std::str::from_utf8(payload).is_err() {
180            // Intentionally produce a vague error message (rather than
181            // including the invalid bytes, for example), to avoid including
182            // secret material in the error message, which might end up in a log
183            // file somewhere.
184            coord_bail!("secret value must be valid UTF-8");
185        }
186
187        Ok(Vec::from(payload))
188    }
189
190    #[instrument]
191    async fn create_secret_finish(
192        &mut self,
193        session: &Session,
194        CreateSecretFinish {
195            item_id,
196            global_id,
197            plan,
198            validity: _,
199        }: CreateSecretFinish,
200    ) -> Result<StageResult<Box<SecretStage>>, AdapterError> {
201        let CreateSecretPlan {
202            name,
203            secret,
204            if_not_exists,
205        } = plan;
206        let secret = Secret {
207            create_sql: secret.create_sql,
208            global_id,
209        };
210
211        let ops = vec![catalog::Op::CreateItem {
212            id: item_id,
213            name: name.clone(),
214            item: CatalogItem::Secret(secret),
215            owner_id: *session.current_role_id(),
216        }];
217
218        let res = match self.catalog_transact(Some(session), ops).await {
219            Ok(()) => Ok(ExecuteResponse::CreatedSecret),
220            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
221                kind: ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
222            })) if if_not_exists => {
223                // Clean up the secret we already persisted, since the catalog
224                // item was not created.
225                if let Err(e) = self.secrets_controller.delete(item_id).await {
226                    warn!(
227                        "Dropping newly created secrets has encountered an error: {}",
228                        e
229                    );
230                } else {
231                    self.caching_secrets_reader.invalidate(item_id);
232                }
233                session.add_notice(AdapterNotice::ObjectAlreadyExists {
234                    name: name.item,
235                    ty: "secret",
236                });
237                Ok(ExecuteResponse::CreatedSecret)
238            }
239            Err(err) => {
240                if let Err(e) = self.secrets_controller.delete(item_id).await {
241                    warn!(
242                        "Dropping newly created secrets has encountered an error: {}",
243                        e
244                    );
245                } else {
246                    self.caching_secrets_reader.invalidate(item_id);
247                }
248                Err(err)
249            }
250        };
251        res.map(StageResult::Response)
252    }
253
254    #[instrument]
255    pub(crate) async fn sequence_alter_secret(
256        &mut self,
257        ctx: ExecuteContext,
258        plan: plan::AlterSecretPlan,
259    ) {
260        // Notably this does not include `plan.id` in `dependency_ids` because `alter_secret()` just
261        // calls `ensure()` and returns a success result to the client. If there's a concurrent
262        // delete of the secret, the persisted secret is in an unknown state (but will be cleaned up
263        // if needed at next envd boot), but we will still return success.
264        let validity = PlanValidity::new(
265            self.catalog().transient_revision(),
266            BTreeSet::new(),
267            None,
268            None,
269            ctx.session().role_metadata().clone(),
270        );
271        let stage = SecretStage::Alter(AlterSecret { validity, plan });
272        self.sequence_staged(ctx, Span::current(), stage).await;
273    }
274
275    #[instrument]
276    fn alter_secret(
277        &self,
278        session: &Session,
279        plan: plan::AlterSecretPlan,
280    ) -> Result<StageResult<Box<SecretStage>>, AdapterError> {
281        let plan::AlterSecretPlan { id, mut secret_as } = plan;
282        let caching_secrets_reader = self.caching_secrets_reader.clone();
283        let secrets_controller = Arc::clone(&self.secrets_controller);
284        let payload = self.extract_secret(session, &mut secret_as)?;
285        let span = Span::current();
286        Ok(StageResult::HandleRetire(mz_ore::task::spawn(
287            || "alter secret ensure",
288            async move {
289                secrets_controller.ensure(id, &payload).await?;
290                caching_secrets_reader.invalidate(id);
291                Ok(ExecuteResponse::AlteredObject(ObjectType::Secret))
292            }
293            .instrument(span),
294        )))
295    }
296
297    #[instrument]
298    pub(crate) async fn sequence_rotate_keys(&mut self, ctx: ExecuteContext, id: CatalogItemId) {
299        // If the secret is deleted from the catalog during
300        // `rotate_keys_ensure()`, this will prevent `rotate_keys_finish()` from
301        // issuing the catalog update for the change. The state of the persisted
302        // secret is unknown, and if the rotate ensure'd after the delete (i.e.,
303        // the secret is persisted to the secret store but not the catalog), the
304        // secret will be cleaned up during next envd boot.
305        let validity = PlanValidity::new(
306            self.catalog().transient_revision(),
307            BTreeSet::from_iter(std::iter::once(id)),
308            None,
309            None,
310            ctx.session().role_metadata().clone(),
311        );
312        let stage = SecretStage::RotateKeysEnsure(RotateKeysSecretEnsure { validity, id });
313        self.sequence_staged(ctx, Span::current(), stage).await;
314    }
315
316    #[instrument]
317    fn rotate_keys_ensure(
318        &self,
319        RotateKeysSecretEnsure { validity, id }: RotateKeysSecretEnsure,
320    ) -> Result<StageResult<Box<SecretStage>>, AdapterError> {
321        let caching_secrets_reader = self.caching_secrets_reader.clone();
322        let secrets_controller = Arc::clone(&self.secrets_controller);
323        let entry = self.catalog().get_entry(&id).clone();
324        let span = Span::current();
325        Ok(StageResult::Handle(mz_ore::task::spawn(
326            || "rotate keys ensure",
327            async move {
328                let secret = secrets_controller.reader().read(id).await?;
329                let previous_key_set = SshKeyPairSet::from_bytes(&secret)?;
330                let new_key_set = previous_key_set.rotate()?;
331                secrets_controller
332                    .ensure(id, &new_key_set.to_bytes())
333                    .await?;
334                caching_secrets_reader.invalidate(id);
335
336                let mut to_item = entry.item;
337                match &mut to_item {
338                    CatalogItem::Connection(c) => {
339                        let mut stmt = match mz_sql::parse::parse(&c.create_sql)
340                            .expect("invalid create sql persisted to catalog")
341                            .into_element()
342                            .ast
343                        {
344                            Statement::CreateConnection(stmt) => stmt,
345                            _ => coord_bail!("internal error: persisted SQL for {id} is invalid"),
346                        };
347
348                        stmt.values.retain(|v| {
349                            v.name != ConnectionOptionName::PublicKey1
350                                && v.name != ConnectionOptionName::PublicKey2
351                        });
352                        stmt.values.push(ConnectionOption {
353                            name: ConnectionOptionName::PublicKey1,
354                            value: Some(WithOptionValue::Value(Value::String(
355                                new_key_set.primary().ssh_public_key(),
356                            ))),
357                        });
358                        stmt.values.push(ConnectionOption {
359                            name: ConnectionOptionName::PublicKey2,
360                            value: Some(WithOptionValue::Value(Value::String(
361                                new_key_set.secondary().ssh_public_key(),
362                            ))),
363                        });
364
365                        c.create_sql = stmt.to_ast_string_stable();
366                    }
367                    _ => coord_bail!(
368                        "internal error: rotate keys called on non-connection object {id}"
369                    ),
370                }
371
372                let ops = vec![catalog::Op::UpdateItem {
373                    id,
374                    name: entry.name,
375                    to_item,
376                }];
377                let stage = SecretStage::RotateKeysFinish(RotateKeysSecretFinish { validity, ops });
378                Ok(Box::new(stage))
379            }
380            .instrument(span),
381        )))
382    }
383
384    #[instrument]
385    async fn rotate_keys_finish(
386        &mut self,
387        session: &Session,
388        RotateKeysSecretFinish { ops, validity: _ }: RotateKeysSecretFinish,
389    ) -> Result<StageResult<Box<SecretStage>>, AdapterError> {
390        self.catalog_transact(Some(session), ops).await?;
391        Ok(StageResult::Response(ExecuteResponse::AlteredObject(
392            ObjectType::Connection,
393        )))
394    }
395}