mz_orchestrator_kubernetes/
secrets.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Management of user secrets via Kubernetes.
11
12use std::iter;
13use std::sync::Arc;
14
15use anyhow::anyhow;
16use async_trait::async_trait;
17use k8s_openapi::ByteString;
18use k8s_openapi::api::core::v1::Secret;
19use kube::Api;
20use kube::api::{DeleteParams, ListParams, ObjectMeta, Patch, PatchParams};
21use mz_repr::CatalogItemId;
22use mz_secrets::{SecretsController, SecretsReader};
23
24use crate::{FIELD_MANAGER, KubernetesOrchestrator, util};
25
26#[async_trait]
27impl SecretsController for KubernetesOrchestrator {
28    async fn ensure(&self, id: CatalogItemId, contents: &[u8]) -> Result<(), anyhow::Error> {
29        let name = secret_name(id, &self.config.name_prefix());
30        let data = iter::once(("contents".into(), ByteString(contents.into())));
31        let secret = Secret {
32            metadata: ObjectMeta {
33                name: Some(name.clone()),
34                ..Default::default()
35            },
36            data: Some(data.collect()),
37            ..Default::default()
38        };
39        self.secret_api
40            .patch(
41                &name,
42                &PatchParams::apply(FIELD_MANAGER).force(),
43                &Patch::Apply(secret),
44            )
45            .await?;
46        Ok(())
47    }
48
49    async fn delete(&self, id: CatalogItemId) -> Result<(), anyhow::Error> {
50        // We intentionally don't wait for the secret to be deleted; our
51        // obligation is only to initiate the deletion. Garbage collecting
52        // secrets that fail to delete will be the responsibility of a future
53        // garbage collection task.
54        match self
55            .secret_api
56            .delete(
57                &secret_name(id, &self.config.name_prefix()),
58                &DeleteParams::default(),
59            )
60            .await
61        {
62            Ok(_) => Ok(()),
63            // Secret is already deleted.
64            Err(kube::Error::Api(e)) if e.code == 404 => Ok(()),
65            Err(e) => return Err(e.into()),
66        }
67    }
68
69    async fn list(&self) -> Result<Vec<CatalogItemId>, anyhow::Error> {
70        let objs = self.secret_api.list(&ListParams::default()).await?;
71        let mut ids = Vec::new();
72        for item in objs.items {
73            // Ignore unnamed objects.
74            let Some(name) = item.metadata.name else {
75                continue;
76            };
77            // Ignore invalidly named objects.
78            let Some(id) = from_secret_name(&name, &self.config.name_prefix()) else {
79                continue;
80            };
81            ids.push(id);
82        }
83        Ok(ids)
84    }
85
86    fn reader(&self) -> Arc<dyn SecretsReader> {
87        Arc::new(KubernetesSecretsReader {
88            secret_api: self.secret_api.clone(),
89            name_prefix: self.config.name_prefix(),
90        })
91    }
92}
93
/// Reads secrets managed by a [`KubernetesOrchestrator`].
#[derive(Debug)]
pub struct KubernetesSecretsReader {
    /// Kubernetes API handle used to fetch `Secret` objects.
    secret_api: Api<Secret>,
    /// Prefix prepended to every secret name before it is looked up.
    name_prefix: String,
}
100
101impl KubernetesSecretsReader {
102    /// Constructs a new Kubernetes secrets reader.
103    ///
104    /// The `context` parameter works like
105    /// [`KubernetesOrchestratorConfig::context`](crate::KubernetesOrchestratorConfig::context).
106    pub async fn new(
107        context: String,
108        name_prefix: Option<String>,
109    ) -> Result<KubernetesSecretsReader, anyhow::Error> {
110        let (client, _) = util::create_client(context).await?;
111        let secret_api: Api<Secret> = Api::default_namespaced(client);
112        let name_prefix = name_prefix.clone().unwrap_or_default();
113        Ok(KubernetesSecretsReader {
114            secret_api,
115            name_prefix,
116        })
117    }
118}
119
120#[async_trait]
121impl SecretsReader for KubernetesSecretsReader {
122    async fn read(&self, id: CatalogItemId) -> Result<Vec<u8>, anyhow::Error> {
123        let secret = self
124            .secret_api
125            .get(&secret_name(id, &self.name_prefix))
126            .await?;
127        let mut data = secret
128            .data
129            .ok_or_else(|| anyhow!("internal error: secret missing data field"))?;
130        let contents = data
131            .remove("contents")
132            .ok_or_else(|| anyhow!("internal error: secret missing contents field"))?;
133        Ok(contents.0)
134    }
135}
136
/// Fixed marker that `secret_name` inserts between the caller-supplied name
/// prefix and the item ID, and that `from_secret_name` strips when parsing.
const SECRET_NAME_PREFIX: &str = "user-managed-";
138
139fn secret_name(id: CatalogItemId, name_prefix: &str) -> String {
140    format!("{name_prefix}{SECRET_NAME_PREFIX}{id}")
141}
142
143fn from_secret_name(name: &str, name_prefix: &str) -> Option<CatalogItemId> {
144    name.strip_prefix(&name_prefix)
145        .and_then(|name| name.strip_prefix(SECRET_NAME_PREFIX))
146        .and_then(|id| id.parse().ok())
147}