mz_orchestrator_process/
secrets.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Management of user secrets via the local file system.
11
12use std::path::PathBuf;
13use std::sync::Arc;
14
15use anyhow::Context;
16use async_trait::async_trait;
17use mz_repr::CatalogItemId;
18use mz_secrets::{SecretsController, SecretsReader};
19use tokio::fs::{self, OpenOptions};
20use tokio::io::AsyncWriteExt;
21
22use crate::ProcessOrchestrator;
23
24#[async_trait]
25impl SecretsController for ProcessOrchestrator {
26    async fn ensure(&self, id: CatalogItemId, contents: &[u8]) -> Result<(), anyhow::Error> {
27        let file_path = self.secrets_dir.join(id.to_string());
28        let mut file = OpenOptions::new()
29            .mode(0o600)
30            .create(true)
31            .write(true)
32            .truncate(true)
33            .open(file_path)
34            .await
35            .with_context(|| format!("writing secret {id}"))?;
36        file.write_all(contents)
37            .await
38            .with_context(|| format!("writing secret {id}"))?;
39        file.sync_all()
40            .await
41            .with_context(|| format!("writing secret {id}"))?;
42        Ok(())
43    }
44
45    async fn delete(&self, id: CatalogItemId) -> Result<(), anyhow::Error> {
46        fs::remove_file(self.secrets_dir.join(id.to_string()))
47            .await
48            .with_context(|| format!("deleting secret {id}"))?;
49        Ok(())
50    }
51
52    async fn list(&self) -> Result<Vec<CatalogItemId>, anyhow::Error> {
53        let mut ids = Vec::new();
54        let mut entries = fs::read_dir(&self.secrets_dir)
55            .await
56            .context("listing secrets")?;
57        while let Some(dir) = entries.next_entry().await? {
58            let id: CatalogItemId = dir.file_name().to_string_lossy().parse()?;
59            ids.push(id);
60        }
61        Ok(ids)
62    }
63
64    fn reader(&self) -> Arc<dyn SecretsReader> {
65        Arc::new(ProcessSecretsReader {
66            secrets_dir: self.secrets_dir.clone(),
67        })
68    }
69}
70
71/// A secrets reader associated with a [`ProcessOrchestrator`].
72#[derive(Debug)]
73pub struct ProcessSecretsReader {
74    secrets_dir: PathBuf,
75}
76
77impl ProcessSecretsReader {
78    /// Constructs a new [`ProcessSecretsReader`] that reads secrets out of the
79    /// specified directory.
80    pub fn new(secrets_dir: PathBuf) -> ProcessSecretsReader {
81        ProcessSecretsReader { secrets_dir }
82    }
83}
84
85#[async_trait]
86impl SecretsReader for ProcessSecretsReader {
87    async fn read(&self, id: CatalogItemId) -> Result<Vec<u8>, anyhow::Error> {
88        let contents = fs::read(self.secrets_dir.join(id.to_string()))
89            .await
90            .with_context(|| format!("reading secret {id}"))?;
91        Ok(contents)
92    }
93}