mz_persist/
cfg.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Configuration for [crate::location] implementations.
11
12use std::collections::BTreeMap;
13use std::sync::Arc;
14use std::time::Duration;
15
16use anyhow::anyhow;
17use mz_dyncfg::ConfigSet;
18use mz_ore::url::SensitiveUrl;
19use tracing::warn;
20
21use mz_postgres_client::PostgresClientKnobs;
22use mz_postgres_client::metrics::PostgresClientMetrics;
23
24use crate::azure::{AzureBlob, AzureBlobConfig};
25use crate::file::{FileBlob, FileBlobConfig};
26use crate::location::{Blob, Consensus, Determinate, ExternalError};
27use crate::mem::{MemBlob, MemBlobConfig, MemConsensus};
28use crate::metrics::S3BlobMetrics;
29use crate::postgres::{PostgresConsensus, PostgresConsensusConfig};
30use crate::s3::{S3Blob, S3BlobConfig};
31
32/// Adds the full set of all mz_persist `Config`s.
33pub fn all_dyn_configs(configs: ConfigSet) -> ConfigSet {
34    configs
35        .add(&crate::indexed::columnar::arrow::ENABLE_ARROW_LGALLOC_CC_SIZES)
36        .add(&crate::indexed::columnar::arrow::ENABLE_ARROW_LGALLOC_NONCC_SIZES)
37        .add(&crate::s3::ENABLE_S3_LGALLOC_CC_SIZES)
38        .add(&crate::s3::ENABLE_S3_LGALLOC_NONCC_SIZES)
39        .add(&crate::postgres::USE_POSTGRES_TUNED_QUERIES)
40}
41
42/// Config for an implementation of [Blob].
43#[derive(Debug, Clone)]
44pub enum BlobConfig {
45    /// Config for [FileBlob].
46    File(FileBlobConfig),
47    /// Config for [S3Blob].
48    S3(S3BlobConfig),
49    /// Config for [MemBlob], only available in testing to prevent
50    /// footguns.
51    Mem(bool),
52    /// Config for [AzureBlob].
53    Azure(AzureBlobConfig),
54    #[cfg(feature = "turmoil")]
55    /// Config for [crate::turmoil::TurmoilBlob].
56    Turmoil(crate::turmoil::BlobConfig),
57}
58
/// Tunable settings consulted by [Blob] implementations.
///
/// Implementations must be shareable across threads and printable with
/// `Debug`, so that a `Box<dyn BlobKnobs>` can be stored inside a config.
pub trait BlobKnobs: Send + Sync + std::fmt::Debug {
    /// Upper bound on the total time spent on one network call, with all of
    /// its retry attempts counted.
    fn operation_timeout(&self) -> Duration;

    /// Upper bound on the time spent on one individual attempt of a network
    /// call.
    fn operation_attempt_timeout(&self) -> Duration;

    /// Upper bound on the time spent waiting for a socket connection to be
    /// established.
    fn connect_timeout(&self) -> Duration;

    /// Upper bound on the time spent waiting for the first byte of a
    /// response; connection time counts toward this limit.
    fn read_timeout(&self) -> Duration;

    /// Reports whether this process is running in a "cc" sized cluster.
    fn is_cc_active(&self) -> bool;
}
72
73impl BlobConfig {
74    /// Opens the associated implementation of [Blob].
75    pub async fn open(self) -> Result<Arc<dyn Blob>, ExternalError> {
76        match self {
77            BlobConfig::File(config) => Ok(Arc::new(FileBlob::open(config).await?)),
78            BlobConfig::S3(config) => Ok(Arc::new(S3Blob::open(config).await?)),
79            BlobConfig::Azure(config) => Ok(Arc::new(AzureBlob::open(config).await?)),
80            BlobConfig::Mem(tombstone) => {
81                Ok(Arc::new(MemBlob::open(MemBlobConfig::new(tombstone))))
82            }
83            #[cfg(feature = "turmoil")]
84            BlobConfig::Turmoil(config) => Ok(Arc::new(crate::turmoil::TurmoilBlob::open(config))),
85        }
86    }
87
88    /// Parses a [Blob] config from a uri string.
89    pub async fn try_from(
90        url: &SensitiveUrl,
91        knobs: Box<dyn BlobKnobs>,
92        metrics: S3BlobMetrics,
93        cfg: Arc<ConfigSet>,
94    ) -> Result<Self, ExternalError> {
95        let mut query_params = url.query_pairs().collect::<BTreeMap<_, _>>();
96
97        let config = match url.scheme() {
98            "file" => {
99                let mut config = FileBlobConfig::from(url.path());
100                if query_params.remove("tombstone").is_some() {
101                    config.tombstone = true;
102                }
103                Ok(BlobConfig::File(config))
104            }
105            "s3" => {
106                let bucket = url
107                    .host()
108                    .ok_or_else(|| anyhow!("missing bucket: {}", &url.as_str()))?
109                    .to_string();
110                let prefix = url
111                    .path()
112                    .strip_prefix('/')
113                    .unwrap_or_else(|| url.path())
114                    .to_string();
115                let role_arn = query_params.remove("role_arn").map(|x| x.into_owned());
116                let endpoint = query_params.remove("endpoint").map(|x| x.into_owned());
117                let region = query_params.remove("region").map(|x| x.into_owned());
118
119                let credentials = match url.password() {
120                    None => None,
121                    Some(password) => Some((
122                        String::from_utf8_lossy(&urlencoding::decode_binary(
123                            url.username().as_bytes(),
124                        ))
125                        .into_owned(),
126                        String::from_utf8_lossy(&urlencoding::decode_binary(password.as_bytes()))
127                            .into_owned(),
128                    )),
129                };
130
131                let config = S3BlobConfig::new(
132                    bucket,
133                    prefix,
134                    role_arn,
135                    endpoint,
136                    region,
137                    credentials,
138                    knobs,
139                    metrics,
140                    cfg,
141                )
142                .await?;
143
144                Ok(BlobConfig::S3(config))
145            }
146            "mem" => {
147                if !cfg!(debug_assertions) {
148                    warn!("persist unexpectedly using in-mem blob in a release binary");
149                }
150                let tombstone = match query_params.remove("tombstone").as_deref() {
151                    None | Some("true") => true,
152                    Some("false") => false,
153                    Some(other) => Err(Determinate::new(anyhow!(
154                        "invalid tombstone param value: {other}"
155                    )))?,
156                };
157                query_params.clear();
158                Ok(BlobConfig::Mem(tombstone))
159            }
160            "http" | "https" => match url
161                .host()
162                .ok_or_else(|| anyhow!("missing protocol: {}", &url.as_str()))?
163                .to_string()
164                .split_once('.')
165            {
166                // The Azurite emulator always uses the well-known account name devstoreaccount1
167                Some((account, root))
168                    if account == "devstoreaccount1" || root == "blob.core.windows.net" =>
169                {
170                    if let Some(container) = url
171                        .path_segments()
172                        .expect("azure blob storage container")
173                        .next()
174                    {
175                        query_params.clear();
176                        Ok(BlobConfig::Azure(AzureBlobConfig::new(
177                            account.to_string(),
178                            container.to_string(),
179                            // Azure doesn't support prefixes in the way S3 does.
180                            // This is always empty, but we leave the field for
181                            // compatibility with our existing test suite.
182                            "".to_string(),
183                            metrics,
184                            url.clone().into_redacted(),
185                            knobs,
186                            cfg,
187                        )?))
188                    } else {
189                        Err(anyhow!("unknown persist blob scheme: {}", url.as_str()))
190                    }
191                }
192                _ => Err(anyhow!("unknown persist blob scheme: {}", url.as_str())),
193            },
194            #[cfg(feature = "turmoil")]
195            "turmoil" => {
196                let cfg = crate::turmoil::BlobConfig::new(url);
197                Ok(BlobConfig::Turmoil(cfg))
198            }
199            p => Err(anyhow!(
200                "unknown persist blob scheme {}: {}",
201                p,
202                url.as_str()
203            )),
204        }?;
205
206        if !query_params.is_empty() {
207            return Err(ExternalError::from(anyhow!(
208                "unknown blob location params {}: {}",
209                query_params
210                    .keys()
211                    .map(|x| x.as_ref())
212                    .collect::<Vec<_>>()
213                    .join(" "),
214                url.as_str(),
215            )));
216        }
217
218        Ok(config)
219    }
220}
221
222/// Config for an implementation of [Consensus].
223#[derive(Debug, Clone)]
224pub enum ConsensusConfig {
225    /// Config for [PostgresConsensus].
226    Postgres(PostgresConsensusConfig),
227    /// Config for [MemConsensus], only available in testing.
228    Mem,
229    #[cfg(feature = "turmoil")]
230    /// Config for [crate::turmoil::TurmoilConsensus].
231    Turmoil(crate::turmoil::ConsensusConfig),
232}
233
234impl ConsensusConfig {
235    /// Opens the associated implementation of [Consensus].
236    pub async fn open(self) -> Result<Arc<dyn Consensus>, ExternalError> {
237        match self {
238            ConsensusConfig::Postgres(config) => {
239                Ok(Arc::new(PostgresConsensus::open(config).await?))
240            }
241            ConsensusConfig::Mem => Ok(Arc::new(MemConsensus::default())),
242            #[cfg(feature = "turmoil")]
243            ConsensusConfig::Turmoil(config) => {
244                Ok(Arc::new(crate::turmoil::TurmoilConsensus::open(config)))
245            }
246        }
247    }
248
249    /// Parses a [Consensus] config from a uri string.
250    pub fn try_from(
251        url: &SensitiveUrl,
252        knobs: Box<dyn PostgresClientKnobs>,
253        metrics: PostgresClientMetrics,
254        dyncfg: Arc<ConfigSet>,
255    ) -> Result<Self, ExternalError> {
256        let config = match url.scheme() {
257            "postgres" | "postgresql" => Ok(ConsensusConfig::Postgres(
258                PostgresConsensusConfig::new(url, knobs, metrics, dyncfg)?,
259            )),
260            "mem" => {
261                if !cfg!(debug_assertions) {
262                    warn!("persist unexpectedly using in-mem consensus in a release binary");
263                }
264                Ok(ConsensusConfig::Mem)
265            }
266            #[cfg(feature = "turmoil")]
267            "turmoil" => {
268                let cfg = crate::turmoil::ConsensusConfig::new(url);
269                Ok(ConsensusConfig::Turmoil(cfg))
270            }
271            p => Err(anyhow!(
272                "unknown persist consensus scheme {}: {}",
273                p,
274                url.as_str()
275            )),
276        }?;
277        Ok(config)
278    }
279}