mz_persist/
cfg.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Configuration for [crate::location] implementations.
11
12use std::collections::BTreeMap;
13use std::sync::Arc;
14use std::time::Duration;
15
16use anyhow::anyhow;
17use mz_dyncfg::ConfigSet;
18use mz_ore::url::SensitiveUrl;
19use tracing::warn;
20
21use mz_postgres_client::PostgresClientKnobs;
22use mz_postgres_client::metrics::PostgresClientMetrics;
23
24use crate::azure::{AzureBlob, AzureBlobConfig};
25use crate::file::{FileBlob, FileBlobConfig};
26use crate::location::{Blob, Consensus, Determinate, ExternalError};
27use crate::mem::{MemBlob, MemBlobConfig, MemConsensus};
28use crate::metrics::S3BlobMetrics;
29use crate::postgres::{PostgresConsensus, PostgresConsensusConfig};
30use crate::s3::{S3Blob, S3BlobConfig};
31
32/// Adds the full set of all mz_persist `Config`s.
33pub fn all_dyn_configs(configs: ConfigSet) -> ConfigSet {
34    configs
35        .add(&crate::indexed::columnar::arrow::ENABLE_ARROW_LGALLOC_CC_SIZES)
36        .add(&crate::indexed::columnar::arrow::ENABLE_ARROW_LGALLOC_NONCC_SIZES)
37        .add(&crate::s3::ENABLE_S3_LGALLOC_CC_SIZES)
38        .add(&crate::s3::ENABLE_S3_LGALLOC_NONCC_SIZES)
39}
40
/// Config for an implementation of [Blob].
///
/// A value can be parsed from a URL with [BlobConfig::try_from] and turned
/// into a usable [Blob] with [BlobConfig::open].
#[derive(Debug, Clone)]
pub enum BlobConfig {
    /// Config for [FileBlob].
    File(FileBlobConfig),
    /// Config for [S3Blob].
    S3(S3BlobConfig),
    /// Config for [MemBlob], intended for testing only.
    ///
    /// The `bool` is the tombstone setting that is handed to
    /// [MemBlobConfig::new] when the blob is opened. Note that parsing a
    /// `mem` URL in a build without debug assertions is not rejected; it only
    /// logs a warning.
    Mem(bool),
    /// Config for [AzureBlob].
    Azure(AzureBlobConfig),
}
54
/// Configuration knobs for [Blob].
///
/// An implementation is passed (boxed) to [BlobConfig::try_from], which
/// forwards it to the S3 or Azure config constructor.
pub trait BlobKnobs: std::fmt::Debug + Send + Sync {
    /// Maximum time allowed for a network call, including retry attempts.
    fn operation_timeout(&self) -> Duration;
    /// Maximum time allowed for a single network call.
    fn operation_attempt_timeout(&self) -> Duration;
    /// Maximum time to wait for a socket connection to be made.
    fn connect_timeout(&self) -> Duration;
    /// Maximum time to wait to read the first byte of a response, including connection time.
    fn read_timeout(&self) -> Duration;
    /// Whether this is running in a "cc" sized cluster.
    fn is_cc_active(&self) -> bool;
}
68
69impl BlobConfig {
70    /// Opens the associated implementation of [Blob].
71    pub async fn open(self) -> Result<Arc<dyn Blob>, ExternalError> {
72        match self {
73            BlobConfig::File(config) => Ok(Arc::new(FileBlob::open(config).await?)),
74            BlobConfig::S3(config) => Ok(Arc::new(S3Blob::open(config).await?)),
75            BlobConfig::Azure(config) => Ok(Arc::new(AzureBlob::open(config).await?)),
76            BlobConfig::Mem(tombstone) => {
77                Ok(Arc::new(MemBlob::open(MemBlobConfig::new(tombstone))))
78            }
79        }
80    }
81
82    /// Parses a [Blob] config from a uri string.
83    pub async fn try_from(
84        url: &SensitiveUrl,
85        knobs: Box<dyn BlobKnobs>,
86        metrics: S3BlobMetrics,
87        cfg: Arc<ConfigSet>,
88    ) -> Result<Self, ExternalError> {
89        let mut query_params = url.query_pairs().collect::<BTreeMap<_, _>>();
90
91        let config = match url.scheme() {
92            "file" => {
93                let mut config = FileBlobConfig::from(url.path());
94                if query_params.remove("tombstone").is_some() {
95                    config.tombstone = true;
96                }
97                Ok(BlobConfig::File(config))
98            }
99            "s3" => {
100                let bucket = url
101                    .host()
102                    .ok_or_else(|| anyhow!("missing bucket: {}", &url.as_str()))?
103                    .to_string();
104                let prefix = url
105                    .path()
106                    .strip_prefix('/')
107                    .unwrap_or_else(|| url.path())
108                    .to_string();
109                let role_arn = query_params.remove("role_arn").map(|x| x.into_owned());
110                let endpoint = query_params.remove("endpoint").map(|x| x.into_owned());
111                let region = query_params.remove("region").map(|x| x.into_owned());
112
113                let credentials = match url.password() {
114                    None => None,
115                    Some(password) => Some((
116                        String::from_utf8_lossy(&urlencoding::decode_binary(
117                            url.username().as_bytes(),
118                        ))
119                        .into_owned(),
120                        String::from_utf8_lossy(&urlencoding::decode_binary(password.as_bytes()))
121                            .into_owned(),
122                    )),
123                };
124
125                let config = S3BlobConfig::new(
126                    bucket,
127                    prefix,
128                    role_arn,
129                    endpoint,
130                    region,
131                    credentials,
132                    knobs,
133                    metrics,
134                    cfg,
135                )
136                .await?;
137
138                Ok(BlobConfig::S3(config))
139            }
140            "mem" => {
141                if !cfg!(debug_assertions) {
142                    warn!("persist unexpectedly using in-mem blob in a release binary");
143                }
144                let tombstone = match query_params.remove("tombstone").as_deref() {
145                    None | Some("true") => true,
146                    Some("false") => false,
147                    Some(other) => Err(Determinate::new(anyhow!(
148                        "invalid tombstone param value: {other}"
149                    )))?,
150                };
151                query_params.clear();
152                Ok(BlobConfig::Mem(tombstone))
153            }
154            "http" | "https" => match url
155                .host()
156                .ok_or_else(|| anyhow!("missing protocol: {}", &url.as_str()))?
157                .to_string()
158                .split_once('.')
159            {
160                // The Azurite emulator always uses the well-known account name devstoreaccount1
161                Some((account, root))
162                    if account == "devstoreaccount1" || root == "blob.core.windows.net" =>
163                {
164                    if let Some(container) = url
165                        .path_segments()
166                        .expect("azure blob storage container")
167                        .next()
168                    {
169                        query_params.clear();
170                        Ok(BlobConfig::Azure(AzureBlobConfig::new(
171                            account.to_string(),
172                            container.to_string(),
173                            // Azure doesn't support prefixes in the way S3 does.
174                            // This is always empty, but we leave the field for
175                            // compatibility with our existing test suite.
176                            "".to_string(),
177                            metrics,
178                            url.clone().into_redacted(),
179                            knobs,
180                            cfg,
181                        )?))
182                    } else {
183                        Err(anyhow!("unknown persist blob scheme: {}", url.as_str()))
184                    }
185                }
186                _ => Err(anyhow!("unknown persist blob scheme: {}", url.as_str())),
187            },
188            p => Err(anyhow!(
189                "unknown persist blob scheme {}: {}",
190                p,
191                url.as_str()
192            )),
193        }?;
194
195        if !query_params.is_empty() {
196            return Err(ExternalError::from(anyhow!(
197                "unknown blob location params {}: {}",
198                query_params
199                    .keys()
200                    .map(|x| x.as_ref())
201                    .collect::<Vec<_>>()
202                    .join(" "),
203                url.as_str(),
204            )));
205        }
206
207        Ok(config)
208    }
209}
210
/// Config for an implementation of [Consensus].
///
/// A value can be parsed from a URL with [ConsensusConfig::try_from] and
/// turned into a usable [Consensus] with [ConsensusConfig::open].
#[derive(Debug, Clone)]
pub enum ConsensusConfig {
    /// Config for [PostgresConsensus].
    Postgres(PostgresConsensusConfig),
    /// Config for [MemConsensus], intended for testing only.
    ///
    /// Note that parsing a `mem` URL in a build without debug assertions is
    /// not rejected; it only logs a warning.
    Mem,
}
219
220impl ConsensusConfig {
221    /// Opens the associated implementation of [Consensus].
222    pub async fn open(self) -> Result<Arc<dyn Consensus>, ExternalError> {
223        match self {
224            ConsensusConfig::Postgres(config) => {
225                Ok(Arc::new(PostgresConsensus::open(config).await?))
226            }
227            ConsensusConfig::Mem => Ok(Arc::new(MemConsensus::default())),
228        }
229    }
230
231    /// Parses a [Consensus] config from a uri string.
232    pub fn try_from(
233        url: &SensitiveUrl,
234        knobs: Box<dyn PostgresClientKnobs>,
235        metrics: PostgresClientMetrics,
236    ) -> Result<Self, ExternalError> {
237        let config = match url.scheme() {
238            "postgres" | "postgresql" => Ok(ConsensusConfig::Postgres(
239                PostgresConsensusConfig::new(url, knobs, metrics)?,
240            )),
241            "mem" => {
242                if !cfg!(debug_assertions) {
243                    warn!("persist unexpectedly using in-mem consensus in a release binary");
244                }
245                Ok(ConsensusConfig::Mem)
246            }
247            p => Err(anyhow!(
248                "unknown persist consensus scheme {}: {}",
249                p,
250                url.as_str()
251            )),
252        }?;
253        Ok(config)
254    }
255}