Skip to main content

mz_persist/
cfg.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Configuration for [crate::location] implementations.
11
12use std::collections::BTreeMap;
13use std::sync::Arc;
14use std::time::Duration;
15
16use anyhow::anyhow;
17use mz_dyncfg::ConfigSet;
18use mz_ore::url::SensitiveUrl;
19use tracing::warn;
20
21use mz_postgres_client::PostgresClientKnobs;
22use mz_postgres_client::metrics::PostgresClientMetrics;
23
24use crate::azure::{AzureBlob, AzureBlobConfig};
25use crate::file::{FileBlob, FileBlobConfig};
26#[cfg(any(feature = "foundationdb", target_os = "linux"))]
27use crate::foundationdb::{FdbConsensus, FdbConsensusConfig};
28use crate::location::{Blob, Consensus, Determinate, ExternalError};
29use crate::mem::{MemBlob, MemBlobConfig, MemConsensus};
30use crate::metrics::S3BlobMetrics;
31use crate::postgres::{PostgresConsensus, PostgresConsensusConfig};
32use crate::s3::{S3Blob, S3BlobConfig};
33
34/// Adds the full set of all mz_persist `Config`s.
35pub fn all_dyn_configs(configs: ConfigSet) -> ConfigSet {
36    configs
37        .add(&crate::indexed::columnar::arrow::ENABLE_ARROW_LGALLOC_CC_SIZES)
38        .add(&crate::indexed::columnar::arrow::ENABLE_ARROW_LGALLOC_NONCC_SIZES)
39        .add(&crate::s3::ENABLE_S3_LGALLOC_CC_SIZES)
40        .add(&crate::s3::ENABLE_S3_LGALLOC_NONCC_SIZES)
41        .add(&crate::postgres::USE_POSTGRES_TUNED_QUERIES)
42}
43
44/// Config for an implementation of [Blob].
45#[derive(Debug, Clone)]
46pub enum BlobConfig {
47    /// Config for [FileBlob].
48    File(FileBlobConfig),
49    /// Config for [S3Blob].
50    S3(S3BlobConfig),
51    /// Config for [MemBlob], only available in testing to prevent
52    /// footguns.
53    Mem(bool),
54    /// Config for [AzureBlob].
55    Azure(AzureBlobConfig),
56    #[cfg(feature = "turmoil")]
57    /// Config for [crate::turmoil::TurmoilBlob].
58    Turmoil(crate::turmoil::BlobConfig),
59}
60
/// Tunable settings consulted by [Blob] implementations.
///
/// Implementors must be shareable across threads and printable for
/// diagnostics, hence the `Send + Sync + Debug` supertraits.
pub trait BlobKnobs: Send + Sync + std::fmt::Debug {
    /// Upper bound on the total time spent on one network call, with all of
    /// its retry attempts included.
    fn operation_timeout(&self) -> Duration;

    /// Upper bound on the time spent on a single attempt of a network call.
    fn operation_attempt_timeout(&self) -> Duration;

    /// Upper bound on the time spent waiting for a socket connection to be
    /// established.
    fn connect_timeout(&self) -> Duration;

    /// Upper bound on the time spent waiting for the first byte of a
    /// response; connection time counts towards this limit.
    fn read_timeout(&self) -> Duration;

    /// Reports whether this is running in a "cc" sized cluster.
    fn is_cc_active(&self) -> bool;
}
74
75impl BlobConfig {
76    /// Opens the associated implementation of [Blob].
77    pub async fn open(self) -> Result<Arc<dyn Blob>, ExternalError> {
78        match self {
79            BlobConfig::File(config) => Ok(Arc::new(FileBlob::open(config).await?)),
80            BlobConfig::S3(config) => Ok(Arc::new(S3Blob::open(config).await?)),
81            BlobConfig::Azure(config) => Ok(Arc::new(AzureBlob::open(config).await?)),
82            BlobConfig::Mem(tombstone) => {
83                Ok(Arc::new(MemBlob::open(MemBlobConfig::new(tombstone))))
84            }
85            #[cfg(feature = "turmoil")]
86            BlobConfig::Turmoil(config) => Ok(Arc::new(crate::turmoil::TurmoilBlob::open(config))),
87        }
88    }
89
90    /// Parses a [Blob] config from a uri string.
91    pub async fn try_from(
92        url: &SensitiveUrl,
93        knobs: Box<dyn BlobKnobs>,
94        metrics: S3BlobMetrics,
95        cfg: Arc<ConfigSet>,
96    ) -> Result<Self, ExternalError> {
97        let mut query_params = url.query_pairs().collect::<BTreeMap<_, _>>();
98
99        let config = match url.scheme() {
100            "file" => {
101                let mut config = FileBlobConfig::from(url.path());
102                if query_params.remove("tombstone").is_some() {
103                    config.tombstone = true;
104                }
105                Ok(BlobConfig::File(config))
106            }
107            "s3" => {
108                let bucket = url
109                    .host()
110                    .ok_or_else(|| anyhow!("missing bucket: {}", &url.as_str()))?
111                    .to_string();
112                let prefix = url
113                    .path()
114                    .strip_prefix('/')
115                    .unwrap_or_else(|| url.path())
116                    .to_string();
117                let role_arn = query_params.remove("role_arn").map(|x| x.into_owned());
118                let endpoint = query_params.remove("endpoint").map(|x| x.into_owned());
119                let region = query_params.remove("region").map(|x| x.into_owned());
120
121                let credentials = match url.password() {
122                    None => None,
123                    Some(password) => Some((
124                        String::from_utf8_lossy(&urlencoding::decode_binary(
125                            url.username().as_bytes(),
126                        ))
127                        .into_owned(),
128                        String::from_utf8_lossy(&urlencoding::decode_binary(password.as_bytes()))
129                            .into_owned(),
130                    )),
131                };
132
133                let config = S3BlobConfig::new(
134                    bucket,
135                    prefix,
136                    role_arn,
137                    endpoint,
138                    region,
139                    credentials,
140                    knobs,
141                    metrics,
142                    cfg,
143                )
144                .await?;
145
146                Ok(BlobConfig::S3(config))
147            }
148            "mem" => {
149                if !cfg!(debug_assertions) {
150                    warn!("persist unexpectedly using in-mem blob in a release binary");
151                }
152                let tombstone = match query_params.remove("tombstone").as_deref() {
153                    None | Some("true") => true,
154                    Some("false") => false,
155                    Some(other) => Err(Determinate::new(anyhow!(
156                        "invalid tombstone param value: {other}"
157                    )))?,
158                };
159                query_params.clear();
160                Ok(BlobConfig::Mem(tombstone))
161            }
162            "http" | "https" => match url
163                .host()
164                .ok_or_else(|| anyhow!("missing protocol: {}", &url.as_str()))?
165                .to_string()
166                .split_once('.')
167            {
168                // The Azurite emulator always uses the well-known account name devstoreaccount1
169                Some((account, root))
170                    if account == "devstoreaccount1" || root == "blob.core.windows.net" =>
171                {
172                    if let Some(container) = url
173                        .path_segments()
174                        .expect("azure blob storage container")
175                        .next()
176                    {
177                        query_params.clear();
178                        Ok(BlobConfig::Azure(AzureBlobConfig::new(
179                            account.to_string(),
180                            container.to_string(),
181                            // Azure doesn't support prefixes in the way S3 does.
182                            // This is always empty, but we leave the field for
183                            // compatibility with our existing test suite.
184                            "".to_string(),
185                            metrics,
186                            url.clone().into_redacted(),
187                            knobs,
188                            cfg,
189                        )?))
190                    } else {
191                        Err(anyhow!("unknown persist blob scheme: {}", url.as_str()))
192                    }
193                }
194                _ => Err(anyhow!("unknown persist blob scheme: {}", url.as_str())),
195            },
196            #[cfg(feature = "turmoil")]
197            "turmoil" => {
198                let cfg = crate::turmoil::BlobConfig::new(url);
199                Ok(BlobConfig::Turmoil(cfg))
200            }
201            p => Err(anyhow!(
202                "unknown persist blob scheme {}: {}",
203                p,
204                url.as_str()
205            )),
206        }?;
207
208        if !query_params.is_empty() {
209            return Err(ExternalError::from(anyhow!(
210                "unknown blob location params {}: {}",
211                query_params
212                    .keys()
213                    .map(|x| x.as_ref())
214                    .collect::<Vec<_>>()
215                    .join(" "),
216                url.as_str(),
217            )));
218        }
219
220        Ok(config)
221    }
222}
223
224/// Config for an implementation of [Consensus].
225#[derive(Debug, Clone)]
226pub enum ConsensusConfig {
227    #[cfg(any(feature = "foundationdb", target_os = "linux"))]
228    /// Config for FoundationDB.
229    FoundationDB(FdbConsensusConfig),
230    /// Config for [PostgresConsensus].
231    Postgres(PostgresConsensusConfig),
232    /// Config for [MemConsensus], only available in testing.
233    Mem,
234    #[cfg(feature = "turmoil")]
235    /// Config for [crate::turmoil::TurmoilConsensus].
236    Turmoil(crate::turmoil::ConsensusConfig),
237}
238
239impl ConsensusConfig {
240    /// Opens the associated implementation of [Consensus].
241    pub async fn open(self) -> Result<Arc<dyn Consensus>, ExternalError> {
242        match self {
243            #[cfg(any(feature = "foundationdb", target_os = "linux"))]
244            ConsensusConfig::FoundationDB(config) => {
245                Ok(Arc::new(FdbConsensus::open(config).await?))
246            }
247            ConsensusConfig::Postgres(config) => {
248                Ok(Arc::new(PostgresConsensus::open(config).await?))
249            }
250            ConsensusConfig::Mem => Ok(Arc::new(MemConsensus::default())),
251            #[cfg(feature = "turmoil")]
252            ConsensusConfig::Turmoil(config) => {
253                Ok(Arc::new(crate::turmoil::TurmoilConsensus::open(config)))
254            }
255        }
256    }
257
258    /// Parses a [Consensus] config from a uri string.
259    pub fn try_from(
260        url: &SensitiveUrl,
261        knobs: Box<dyn PostgresClientKnobs>,
262        metrics: PostgresClientMetrics,
263        dyncfg: Arc<ConfigSet>,
264    ) -> Result<Self, ExternalError> {
265        let config = match url.scheme() {
266            #[cfg(any(feature = "foundationdb", target_os = "linux"))]
267            "foundationdb" => Ok(ConsensusConfig::FoundationDB(FdbConsensusConfig::new(
268                url.clone(),
269            )?)),
270            "postgres" | "postgresql" => Ok(ConsensusConfig::Postgres(
271                PostgresConsensusConfig::new(url, knobs, metrics, dyncfg)?,
272            )),
273            "mem" => {
274                if !cfg!(debug_assertions) {
275                    warn!("persist unexpectedly using in-mem consensus in a release binary");
276                }
277                Ok(ConsensusConfig::Mem)
278            }
279            #[cfg(feature = "turmoil")]
280            "turmoil" => {
281                let cfg = crate::turmoil::ConsensusConfig::new(url);
282                Ok(ConsensusConfig::Turmoil(cfg))
283            }
284            p => Err(anyhow!(
285                "unknown persist consensus scheme {}: {}",
286                p,
287                url.as_str()
288            )),
289        }?;
290        Ok(config)
291    }
292}