mz_adapter/coord/sequencer/inner/secret.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeSet;
11use std::sync::Arc;
12
13use mz_catalog::memory::objects::{CatalogItem, Secret};
14use mz_expr::MirScalarExpr;
15use mz_ore::collections::CollectionExt;
16use mz_ore::instrument;
17use mz_repr::{CatalogItemId, Datum, RowArena};
18use mz_sql::ast::display::AstDisplay;
19use mz_sql::ast::{ConnectionOption, ConnectionOptionName, Statement, Value, WithOptionValue};
20use mz_sql::catalog::{CatalogError, ObjectType};
21use mz_sql::plan::{self, CreateSecretPlan};
22use mz_sql::session::metadata::SessionMetadata;
23use mz_ssh_util::keys::SshKeyPairSet;
24use tracing::{Instrument, Span, warn};
25
26use crate::coord::sequencer::inner::return_if_err;
27use crate::coord::{
28    AlterSecret, Coordinator, CreateSecretEnsure, CreateSecretFinish, Message, PlanValidity,
29    RotateKeysSecretEnsure, RotateKeysSecretFinish, SecretStage, StageResult, Staged,
30};
31use crate::optimize::dataflows::{EvalTime, ExprPrepStyle, prep_scalar_expr};
32use crate::session::Session;
33use crate::{AdapterError, AdapterNotice, ExecuteContext, ExecuteResponse, catalog};
34
35impl Staged for SecretStage {
36    type Ctx = ExecuteContext;
37
38    fn validity(&mut self) -> &mut crate::coord::PlanValidity {
39        match self {
40            SecretStage::CreateFinish(stage) => &mut stage.validity,
41            SecretStage::RotateKeysFinish(stage) => &mut stage.validity,
42            SecretStage::RotateKeysEnsure(stage) => &mut stage.validity,
43            SecretStage::CreateEnsure(stage) => &mut stage.validity,
44            SecretStage::Alter(stage) => &mut stage.validity,
45        }
46    }
47
48    async fn stage(
49        self,
50        coord: &mut Coordinator,
51        ctx: &mut ExecuteContext,
52    ) -> Result<crate::coord::StageResult<Box<Self>>, AdapterError> {
53        match self {
54            SecretStage::CreateEnsure(stage) => {
55                coord.create_secret_ensure(ctx.session(), stage).await
56            }
57            SecretStage::CreateFinish(stage) => {
58                coord.create_secret_finish(ctx.session(), stage).await
59            }
60            SecretStage::RotateKeysEnsure(stage) => coord.rotate_keys_ensure(stage),
61            SecretStage::RotateKeysFinish(stage) => {
62                coord.rotate_keys_finish(ctx.session(), stage).await
63            }
64            SecretStage::Alter(stage) => coord.alter_secret(ctx.session(), stage.plan),
65        }
66    }
67
68    fn message(self, ctx: ExecuteContext, span: tracing::Span) -> Message {
69        Message::SecretStageReady {
70            ctx,
71            span,
72            stage: self,
73        }
74    }
75
76    fn cancel_enabled(&self) -> bool {
77        // Because secrets operations call out to external services and transact the catalog
78        // separately, disable cancellation.
79        false
80    }
81}
82
83impl Coordinator {
84    #[instrument]
85    pub(crate) async fn sequence_create_secret(
86        &mut self,
87        ctx: ExecuteContext,
88        plan: plan::CreateSecretPlan,
89    ) {
90        let stage = return_if_err!(self.create_secret_validate(ctx.session(), plan).await, ctx);
91        self.sequence_staged(ctx, Span::current(), stage).await;
92    }
93
94    #[instrument]
95    async fn create_secret_validate(
96        &mut self,
97        session: &Session,
98        plan: plan::CreateSecretPlan,
99    ) -> Result<SecretStage, AdapterError> {
100        // No dependencies.
101        let validity = PlanValidity::new(
102            self.catalog().transient_revision(),
103            BTreeSet::new(),
104            None,
105            None,
106            session.role_metadata().clone(),
107        );
108        Ok(SecretStage::CreateEnsure(CreateSecretEnsure {
109            validity,
110            plan,
111        }))
112    }
113
114    #[instrument]
115    async fn create_secret_ensure(
116        &mut self,
117        session: &Session,
118        CreateSecretEnsure { validity, mut plan }: CreateSecretEnsure,
119    ) -> Result<StageResult<Box<SecretStage>>, AdapterError> {
120        let id_ts = self.get_catalog_write_ts().await;
121        let (item_id, global_id) = self.catalog_mut().allocate_user_id(id_ts).await?;
122
123        let secrets_controller = Arc::clone(&self.secrets_controller);
124        let payload = self.extract_secret(session, &mut plan.secret.secret_as)?;
125        let span = Span::current();
126        Ok(StageResult::Handle(mz_ore::task::spawn(
127            || "create secret ensure",
128            async move {
129                secrets_controller.ensure(item_id, &payload).await?;
130                let stage = SecretStage::CreateFinish(CreateSecretFinish {
131                    validity,
132                    item_id,
133                    global_id,
134                    plan,
135                });
136                Ok(Box::new(stage))
137            }
138            .instrument(span),
139        )))
140    }
141
142    fn extract_secret(
143        &self,
144        session: &Session,
145        secret_as: &mut MirScalarExpr,
146    ) -> Result<Vec<u8>, AdapterError> {
147        let temp_storage = RowArena::new();
148        prep_scalar_expr(
149            secret_as,
150            ExprPrepStyle::OneShot {
151                logical_time: EvalTime::NotAvailable,
152                session,
153                catalog_state: self.catalog().state(),
154            },
155        )?;
156        let evaled = secret_as.eval(&[], &temp_storage)?;
157
158        if evaled == Datum::Null {
159            coord_bail!("secret value can not be null");
160        }
161
162        let payload = evaled.unwrap_bytes();
163
164        // Limit the size of a secret to 512 KiB
165        // This is the largest size of a single secret in Consul/Kubernetes
166        // We are enforcing this limit across all types of Secrets Controllers
167        // Most secrets are expected to be roughly 75B
168        if payload.len() > 1024 * 512 {
169            coord_bail!("secrets can not be bigger than 512KiB")
170        }
171
172        // Enforce that all secrets are valid UTF-8 for now. We expect to lift
173        // this restriction in the future, when we discover a connection type
174        // that requires binary secrets, but for now it is convenient to ensure
175        // here that `SecretsReader::read_string` can never fail due to invalid
176        // UTF-8.
177        //
178        // If you want to remove this line, verify that no caller of
179        // `SecretsReader::read_string` will panic if the secret contains
180        // invalid UTF-8.
181        if std::str::from_utf8(payload).is_err() {
182            // Intentionally produce a vague error message (rather than
183            // including the invalid bytes, for example), to avoid including
184            // secret material in the error message, which might end up in a log
185            // file somewhere.
186            coord_bail!("secret value must be valid UTF-8");
187        }
188
189        Ok(Vec::from(payload))
190    }
191
192    #[instrument]
193    async fn create_secret_finish(
194        &mut self,
195        session: &Session,
196        CreateSecretFinish {
197            item_id,
198            global_id,
199            plan,
200            validity: _,
201        }: CreateSecretFinish,
202    ) -> Result<StageResult<Box<SecretStage>>, AdapterError> {
203        let CreateSecretPlan {
204            name,
205            secret,
206            if_not_exists,
207        } = plan;
208        let secret = Secret {
209            create_sql: secret.create_sql,
210            global_id,
211        };
212
213        let ops = vec![catalog::Op::CreateItem {
214            id: item_id,
215            name: name.clone(),
216            item: CatalogItem::Secret(secret),
217            owner_id: *session.current_role_id(),
218        }];
219
220        let res = match self.catalog_transact(Some(session), ops).await {
221            Ok(()) => Ok(ExecuteResponse::CreatedSecret),
222            Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
223                kind:
224                    mz_catalog::memory::error::ErrorKind::Sql(CatalogError::ItemAlreadyExists(_, _)),
225            })) if if_not_exists => {
226                session.add_notice(AdapterNotice::ObjectAlreadyExists {
227                    name: name.item,
228                    ty: "secret",
229                });
230                Ok(ExecuteResponse::CreatedSecret)
231            }
232            Err(err) => {
233                if let Err(e) = self.secrets_controller.delete(item_id).await {
234                    warn!(
235                        "Dropping newly created secrets has encountered an error: {}",
236                        e
237                    );
238                }
239                Err(err)
240            }
241        };
242        res.map(StageResult::Response)
243    }
244
245    #[instrument]
246    pub(crate) async fn sequence_alter_secret(
247        &mut self,
248        ctx: ExecuteContext,
249        plan: plan::AlterSecretPlan,
250    ) {
251        // Notably this does not include `plan.id` in `dependency_ids` because `alter_secret()` just
252        // calls `ensure()` and returns a success result to the client. If there's a concurrent
253        // delete of the secret, the persisted secret is in an unknown state (but will be cleaned up
254        // if needed at next envd boot), but we will still return success.
255        let validity = PlanValidity::new(
256            self.catalog().transient_revision(),
257            BTreeSet::new(),
258            None,
259            None,
260            ctx.session().role_metadata().clone(),
261        );
262        let stage = SecretStage::Alter(AlterSecret { validity, plan });
263        self.sequence_staged(ctx, Span::current(), stage).await;
264    }
265
266    #[instrument]
267    fn alter_secret(
268        &mut self,
269        session: &Session,
270        plan: plan::AlterSecretPlan,
271    ) -> Result<StageResult<Box<SecretStage>>, AdapterError> {
272        let plan::AlterSecretPlan { id, mut secret_as } = plan;
273        let secrets_controller = Arc::clone(&self.secrets_controller);
274        let payload = self.extract_secret(session, &mut secret_as)?;
275        let span = Span::current();
276        Ok(StageResult::HandleRetire(mz_ore::task::spawn(
277            || "alter secret ensure",
278            async move {
279                secrets_controller.ensure(id, &payload).await?;
280                Ok(ExecuteResponse::AlteredObject(ObjectType::Secret))
281            }
282            .instrument(span),
283        )))
284    }
285
286    #[instrument]
287    pub(crate) async fn sequence_rotate_keys(&mut self, ctx: ExecuteContext, id: CatalogItemId) {
288        // If the secret is deleted from the catalog during
289        // `rotate_keys_ensure()`, this will prevent `rotate_keys_finish()` from
290        // issuing the catalog update for the change. The state of the persisted
291        // secret is unknown, and if the rotate ensure'd after the delete (i.e.,
292        // the secret is persisted to the secret store but not the catalog), the
293        // secret will be cleaned up during next envd boot.
294        let validity = PlanValidity::new(
295            self.catalog().transient_revision(),
296            BTreeSet::from_iter(std::iter::once(id)),
297            None,
298            None,
299            ctx.session().role_metadata().clone(),
300        );
301        let stage = SecretStage::RotateKeysEnsure(RotateKeysSecretEnsure { validity, id });
302        self.sequence_staged(ctx, Span::current(), stage).await;
303    }
304
305    #[instrument]
306    fn rotate_keys_ensure(
307        &mut self,
308        RotateKeysSecretEnsure { validity, id }: RotateKeysSecretEnsure,
309    ) -> Result<StageResult<Box<SecretStage>>, AdapterError> {
310        let secrets_controller = Arc::clone(&self.secrets_controller);
311        let entry = self.catalog().get_entry(&id).clone();
312        let span = Span::current();
313        Ok(StageResult::Handle(mz_ore::task::spawn(
314            || "rotate keys ensure",
315            async move {
316                let secret = secrets_controller.reader().read(id).await?;
317                let previous_key_set = SshKeyPairSet::from_bytes(&secret)?;
318                let new_key_set = previous_key_set.rotate()?;
319                secrets_controller
320                    .ensure(id, &new_key_set.to_bytes())
321                    .await?;
322
323                let mut to_item = entry.item;
324                match &mut to_item {
325                    CatalogItem::Connection(c) => {
326                        let mut stmt = match mz_sql::parse::parse(&c.create_sql)
327                            .expect("invalid create sql persisted to catalog")
328                            .into_element()
329                            .ast
330                        {
331                            Statement::CreateConnection(stmt) => stmt,
332                            _ => coord_bail!("internal error: persisted SQL for {id} is invalid"),
333                        };
334
335                        stmt.values.retain(|v| {
336                            v.name != ConnectionOptionName::PublicKey1
337                                && v.name != ConnectionOptionName::PublicKey2
338                        });
339                        stmt.values.push(ConnectionOption {
340                            name: ConnectionOptionName::PublicKey1,
341                            value: Some(WithOptionValue::Value(Value::String(
342                                new_key_set.primary().ssh_public_key(),
343                            ))),
344                        });
345                        stmt.values.push(ConnectionOption {
346                            name: ConnectionOptionName::PublicKey2,
347                            value: Some(WithOptionValue::Value(Value::String(
348                                new_key_set.secondary().ssh_public_key(),
349                            ))),
350                        });
351
352                        c.create_sql = stmt.to_ast_string_stable();
353                    }
354                    _ => coord_bail!(
355                        "internal error: rotate keys called on non-connection object {id}"
356                    ),
357                }
358
359                let ops = vec![catalog::Op::UpdateItem {
360                    id,
361                    name: entry.name,
362                    to_item,
363                }];
364                let stage = SecretStage::RotateKeysFinish(RotateKeysSecretFinish { validity, ops });
365                Ok(Box::new(stage))
366            }
367            .instrument(span),
368        )))
369    }
370
371    #[instrument]
372    async fn rotate_keys_finish(
373        &mut self,
374        session: &Session,
375        RotateKeysSecretFinish { ops, validity: _ }: RotateKeysSecretFinish,
376    ) -> Result<StageResult<Box<SecretStage>>, AdapterError> {
377        self.catalog_transact(Some(session), ops).await?;
378        Ok(StageResult::Response(ExecuteResponse::AlteredObject(
379            ObjectType::Connection,
380        )))
381    }
382}