mz_persist/
cfg.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Configuration for [crate::location] implementations.
11
12use std::collections::BTreeMap;
13use std::sync::Arc;
14use std::time::Duration;
15
16use anyhow::anyhow;
17use mz_dyncfg::ConfigSet;
18use mz_ore::url::SensitiveUrl;
19use tracing::warn;
20
21use mz_postgres_client::PostgresClientKnobs;
22use mz_postgres_client::metrics::PostgresClientMetrics;
23
24use crate::azure::{AzureBlob, AzureBlobConfig};
25use crate::file::{FileBlob, FileBlobConfig};
26use crate::location::{Blob, Consensus, Determinate, ExternalError};
27use crate::mem::{MemBlob, MemBlobConfig, MemConsensus};
28use crate::metrics::S3BlobMetrics;
29use crate::postgres::{PostgresConsensus, PostgresConsensusConfig};
30use crate::s3::{S3Blob, S3BlobConfig};
31
32/// Adds the full set of all mz_persist `Config`s.
33pub fn all_dyn_configs(configs: ConfigSet) -> ConfigSet {
34    configs
35        .add(&crate::indexed::columnar::arrow::ENABLE_ARROW_LGALLOC_CC_SIZES)
36        .add(&crate::indexed::columnar::arrow::ENABLE_ARROW_LGALLOC_NONCC_SIZES)
37        .add(&crate::s3::ENABLE_S3_LGALLOC_CC_SIZES)
38        .add(&crate::s3::ENABLE_S3_LGALLOC_NONCC_SIZES)
39        .add(&crate::postgres::USE_POSTGRES_TUNED_QUERIES)
40}
41
/// Config for an implementation of [Blob].
///
/// Built from a location URL by [BlobConfig::try_from] and turned into a live
/// [Blob] by [BlobConfig::open].
#[derive(Debug, Clone)]
pub enum BlobConfig {
    /// Config for [FileBlob].
    File(FileBlobConfig),
    /// Config for [S3Blob].
    S3(S3BlobConfig),
    /// Config for [MemBlob], intended only for testing to prevent footguns.
    ///
    /// Parsing a `mem` URL in a release build is not rejected; it logs a
    /// warning instead. The `bool` comes from the `tombstone` URL query
    /// parameter and is forwarded to [MemBlobConfig::new] when the blob is
    /// opened.
    Mem(bool),
    /// Config for [AzureBlob].
    Azure(AzureBlobConfig),
}
55
/// Configuration knobs for [Blob].
///
/// A boxed implementation is accepted by [BlobConfig::try_from]. Only the S3
/// and Azure configs built there receive it; the file and in-memory configs
/// do not use these knobs.
pub trait BlobKnobs: std::fmt::Debug + Send + Sync {
    /// Maximum time allowed for a network call, including retry attempts.
    fn operation_timeout(&self) -> Duration;
    /// Maximum time allowed for a single network call.
    fn operation_attempt_timeout(&self) -> Duration;
    /// Maximum time to wait for a socket connection to be made.
    fn connect_timeout(&self) -> Duration;
    /// Maximum time to wait to read the first byte of a response, including connection time.
    fn read_timeout(&self) -> Duration;
    /// Whether this is running in a "cc" sized cluster.
    fn is_cc_active(&self) -> bool;
}
69
70impl BlobConfig {
71    /// Opens the associated implementation of [Blob].
72    pub async fn open(self) -> Result<Arc<dyn Blob>, ExternalError> {
73        match self {
74            BlobConfig::File(config) => Ok(Arc::new(FileBlob::open(config).await?)),
75            BlobConfig::S3(config) => Ok(Arc::new(S3Blob::open(config).await?)),
76            BlobConfig::Azure(config) => Ok(Arc::new(AzureBlob::open(config).await?)),
77            BlobConfig::Mem(tombstone) => {
78                Ok(Arc::new(MemBlob::open(MemBlobConfig::new(tombstone))))
79            }
80        }
81    }
82
83    /// Parses a [Blob] config from a uri string.
84    pub async fn try_from(
85        url: &SensitiveUrl,
86        knobs: Box<dyn BlobKnobs>,
87        metrics: S3BlobMetrics,
88        cfg: Arc<ConfigSet>,
89    ) -> Result<Self, ExternalError> {
90        let mut query_params = url.query_pairs().collect::<BTreeMap<_, _>>();
91
92        let config = match url.scheme() {
93            "file" => {
94                let mut config = FileBlobConfig::from(url.path());
95                if query_params.remove("tombstone").is_some() {
96                    config.tombstone = true;
97                }
98                Ok(BlobConfig::File(config))
99            }
100            "s3" => {
101                let bucket = url
102                    .host()
103                    .ok_or_else(|| anyhow!("missing bucket: {}", &url.as_str()))?
104                    .to_string();
105                let prefix = url
106                    .path()
107                    .strip_prefix('/')
108                    .unwrap_or_else(|| url.path())
109                    .to_string();
110                let role_arn = query_params.remove("role_arn").map(|x| x.into_owned());
111                let endpoint = query_params.remove("endpoint").map(|x| x.into_owned());
112                let region = query_params.remove("region").map(|x| x.into_owned());
113
114                let credentials = match url.password() {
115                    None => None,
116                    Some(password) => Some((
117                        String::from_utf8_lossy(&urlencoding::decode_binary(
118                            url.username().as_bytes(),
119                        ))
120                        .into_owned(),
121                        String::from_utf8_lossy(&urlencoding::decode_binary(password.as_bytes()))
122                            .into_owned(),
123                    )),
124                };
125
126                let config = S3BlobConfig::new(
127                    bucket,
128                    prefix,
129                    role_arn,
130                    endpoint,
131                    region,
132                    credentials,
133                    knobs,
134                    metrics,
135                    cfg,
136                )
137                .await?;
138
139                Ok(BlobConfig::S3(config))
140            }
141            "mem" => {
142                if !cfg!(debug_assertions) {
143                    warn!("persist unexpectedly using in-mem blob in a release binary");
144                }
145                let tombstone = match query_params.remove("tombstone").as_deref() {
146                    None | Some("true") => true,
147                    Some("false") => false,
148                    Some(other) => Err(Determinate::new(anyhow!(
149                        "invalid tombstone param value: {other}"
150                    )))?,
151                };
152                query_params.clear();
153                Ok(BlobConfig::Mem(tombstone))
154            }
155            "http" | "https" => match url
156                .host()
157                .ok_or_else(|| anyhow!("missing protocol: {}", &url.as_str()))?
158                .to_string()
159                .split_once('.')
160            {
161                // The Azurite emulator always uses the well-known account name devstoreaccount1
162                Some((account, root))
163                    if account == "devstoreaccount1" || root == "blob.core.windows.net" =>
164                {
165                    if let Some(container) = url
166                        .path_segments()
167                        .expect("azure blob storage container")
168                        .next()
169                    {
170                        query_params.clear();
171                        Ok(BlobConfig::Azure(AzureBlobConfig::new(
172                            account.to_string(),
173                            container.to_string(),
174                            // Azure doesn't support prefixes in the way S3 does.
175                            // This is always empty, but we leave the field for
176                            // compatibility with our existing test suite.
177                            "".to_string(),
178                            metrics,
179                            url.clone().into_redacted(),
180                            knobs,
181                            cfg,
182                        )?))
183                    } else {
184                        Err(anyhow!("unknown persist blob scheme: {}", url.as_str()))
185                    }
186                }
187                _ => Err(anyhow!("unknown persist blob scheme: {}", url.as_str())),
188            },
189            p => Err(anyhow!(
190                "unknown persist blob scheme {}: {}",
191                p,
192                url.as_str()
193            )),
194        }?;
195
196        if !query_params.is_empty() {
197            return Err(ExternalError::from(anyhow!(
198                "unknown blob location params {}: {}",
199                query_params
200                    .keys()
201                    .map(|x| x.as_ref())
202                    .collect::<Vec<_>>()
203                    .join(" "),
204                url.as_str(),
205            )));
206        }
207
208        Ok(config)
209    }
210}
211
/// Config for an implementation of [Consensus].
///
/// Built from a location URL by [ConsensusConfig::try_from] and turned into a
/// live [Consensus] by [ConsensusConfig::open].
#[derive(Debug, Clone)]
pub enum ConsensusConfig {
    /// Config for [PostgresConsensus], selected by `postgres://` or
    /// `postgresql://` URLs.
    Postgres(PostgresConsensusConfig),
    /// Config for [MemConsensus], intended only for testing. Parsing a `mem`
    /// URL in a release build is not rejected; it logs a warning instead.
    Mem,
}
220
221impl ConsensusConfig {
222    /// Opens the associated implementation of [Consensus].
223    pub async fn open(self) -> Result<Arc<dyn Consensus>, ExternalError> {
224        match self {
225            ConsensusConfig::Postgres(config) => {
226                Ok(Arc::new(PostgresConsensus::open(config).await?))
227            }
228            ConsensusConfig::Mem => Ok(Arc::new(MemConsensus::default())),
229        }
230    }
231
232    /// Parses a [Consensus] config from a uri string.
233    pub fn try_from(
234        url: &SensitiveUrl,
235        knobs: Box<dyn PostgresClientKnobs>,
236        metrics: PostgresClientMetrics,
237        dyncfg: Arc<ConfigSet>,
238    ) -> Result<Self, ExternalError> {
239        let config = match url.scheme() {
240            "postgres" | "postgresql" => Ok(ConsensusConfig::Postgres(
241                PostgresConsensusConfig::new(url, knobs, metrics, dyncfg)?,
242            )),
243            "mem" => {
244                if !cfg!(debug_assertions) {
245                    warn!("persist unexpectedly using in-mem consensus in a release binary");
246                }
247                Ok(ConsensusConfig::Mem)
248            }
249            p => Err(anyhow!(
250                "unknown persist consensus scheme {}: {}",
251                p,
252                url.as_str()
253            )),
254        }?;
255        Ok(config)
256    }
257}