Skip to main content

mz_persist/
cfg.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Configuration for [crate::location] implementations.
11
12use std::collections::BTreeMap;
13use std::sync::Arc;
14use std::time::Duration;
15
16use anyhow::anyhow;
17use mz_dyncfg::ConfigSet;
18use mz_ore::url::SensitiveUrl;
19use tracing::warn;
20
21use mz_postgres_client::PostgresClientKnobs;
22use mz_postgres_client::metrics::PostgresClientMetrics;
23
24use crate::azure::{AzureBlob, AzureBlobConfig};
25use crate::file::{FileBlob, FileBlobConfig};
26#[cfg(feature = "foundationdb")]
27use crate::foundationdb::{FdbConsensus, FdbConsensusConfig};
28use crate::location::{Blob, Consensus, Determinate, ExternalError};
29use crate::mem::{MemBlob, MemBlobConfig, MemConsensus};
30use crate::metrics::S3BlobMetrics;
31use crate::postgres::{PostgresConsensus, PostgresConsensusConfig};
32use crate::s3::{S3Blob, S3BlobConfig};
33
34/// Adds the full set of all mz_persist `Config`s.
35pub fn all_dyn_configs(configs: ConfigSet) -> ConfigSet {
36    configs
37        .add(&crate::indexed::columnar::arrow::ENABLE_ARROW_LGALLOC_CC_SIZES)
38        .add(&crate::indexed::columnar::arrow::ENABLE_ARROW_LGALLOC_NONCC_SIZES)
39        .add(&crate::postgres::USE_POSTGRES_TUNED_QUERIES)
40}
41
/// Config for an implementation of [Blob].
///
/// A value is parsed from a URL by [BlobConfig::try_from] and turned into a
/// usable [Blob] by [BlobConfig::open]. Each variant carries the config of one
/// backing implementation.
#[derive(Debug, Clone)]
pub enum BlobConfig {
    /// Config for [FileBlob].
    File(FileBlobConfig),
    /// Config for [S3Blob].
    S3(S3BlobConfig),
    /// Config for [MemBlob], only available in testing to prevent
    /// footguns.
    ///
    /// The `bool` is the tombstone setting that [BlobConfig::open] passes to
    /// [MemBlobConfig::new]. Parsing a `mem` URL in a build without debug
    /// assertions is still accepted, but logs a warning.
    Mem(bool),
    /// Config for [AzureBlob].
    Azure(AzureBlobConfig),
    #[cfg(feature = "turmoil")]
    /// Config for [crate::turmoil::TurmoilBlob].
    ///
    /// Only compiled in when the `turmoil` feature is enabled.
    Turmoil(crate::turmoil::BlobConfig),
}
58
/// Configuration knobs for [Blob].
///
/// A boxed implementation is handed to [BlobConfig::try_from], which forwards
/// it to the S3 or Azure config constructor depending on the URL scheme.
/// Implementors must also be [std::fmt::Debug], [Send] and [Sync].
pub trait BlobKnobs: std::fmt::Debug + Send + Sync {
    /// Maximum time allowed for a network call, including retry attempts.
    fn operation_timeout(&self) -> Duration;
    /// Maximum time allowed for a single network call.
    fn operation_attempt_timeout(&self) -> Duration;
    /// Maximum time to wait for a socket connection to be made.
    fn connect_timeout(&self) -> Duration;
    /// Maximum time to wait to read the first byte of a response, including connection time.
    fn read_timeout(&self) -> Duration;
    /// Whether this is running in a "cc" sized cluster.
    fn is_cc_active(&self) -> bool;
}
72
73impl BlobConfig {
74    /// Opens the associated implementation of [Blob].
75    pub async fn open(self) -> Result<Arc<dyn Blob>, ExternalError> {
76        match self {
77            BlobConfig::File(config) => Ok(Arc::new(FileBlob::open(config).await?)),
78            BlobConfig::S3(config) => Ok(Arc::new(S3Blob::open(config).await?)),
79            BlobConfig::Azure(config) => Ok(Arc::new(AzureBlob::open(config).await?)),
80            BlobConfig::Mem(tombstone) => {
81                Ok(Arc::new(MemBlob::open(MemBlobConfig::new(tombstone))))
82            }
83            #[cfg(feature = "turmoil")]
84            BlobConfig::Turmoil(config) => Ok(Arc::new(crate::turmoil::TurmoilBlob::open(config))),
85        }
86    }
87
88    /// Parses a [Blob] config from a uri string.
89    pub async fn try_from(
90        url: &SensitiveUrl,
91        knobs: Box<dyn BlobKnobs>,
92        metrics: S3BlobMetrics,
93        cfg: Arc<ConfigSet>,
94    ) -> Result<Self, ExternalError> {
95        let mut query_params = url.query_pairs().collect::<BTreeMap<_, _>>();
96
97        let config = match url.scheme() {
98            "file" => {
99                let mut config = FileBlobConfig::from(url.path());
100                if query_params.remove("tombstone").is_some() {
101                    config.tombstone = true;
102                }
103                Ok(BlobConfig::File(config))
104            }
105            "s3" => {
106                let bucket = url
107                    .host()
108                    .ok_or_else(|| anyhow!("missing bucket: {}", &url.as_str()))?
109                    .to_string();
110                let prefix = url
111                    .path()
112                    .strip_prefix('/')
113                    .unwrap_or_else(|| url.path())
114                    .to_string();
115                let role_arn = query_params.remove("role_arn").map(|x| x.into_owned());
116                let endpoint = query_params.remove("endpoint").map(|x| x.into_owned());
117                let region = query_params.remove("region").map(|x| x.into_owned());
118
119                let credentials = match url.password() {
120                    None => None,
121                    Some(password) => Some((
122                        String::from_utf8_lossy(&urlencoding::decode_binary(
123                            url.username().as_bytes(),
124                        ))
125                        .into_owned(),
126                        String::from_utf8_lossy(&urlencoding::decode_binary(password.as_bytes()))
127                            .into_owned(),
128                    )),
129                };
130
131                let config = S3BlobConfig::new(
132                    bucket,
133                    prefix,
134                    role_arn,
135                    endpoint,
136                    region,
137                    credentials,
138                    knobs,
139                    metrics,
140                )
141                .await?;
142
143                Ok(BlobConfig::S3(config))
144            }
145            "mem" => {
146                if !cfg!(debug_assertions) {
147                    warn!("persist unexpectedly using in-mem blob in a release binary");
148                }
149                let tombstone = match query_params.remove("tombstone").as_deref() {
150                    None | Some("true") => true,
151                    Some("false") => false,
152                    Some(other) => Err(Determinate::new(anyhow!(
153                        "invalid tombstone param value: {other}"
154                    )))?,
155                };
156                query_params.clear();
157                Ok(BlobConfig::Mem(tombstone))
158            }
159            "http" | "https" => match url
160                .host()
161                .ok_or_else(|| anyhow!("missing protocol: {}", &url.as_str()))?
162                .to_string()
163                .split_once('.')
164            {
165                // The Azurite emulator always uses the well-known account name devstoreaccount1
166                Some((account, root))
167                    if account == "devstoreaccount1" || root == "blob.core.windows.net" =>
168                {
169                    if let Some(container) = url
170                        .path_segments()
171                        .expect("azure blob storage container")
172                        .next()
173                    {
174                        query_params.clear();
175                        Ok(BlobConfig::Azure(AzureBlobConfig::new(
176                            account.to_string(),
177                            container.to_string(),
178                            // Azure doesn't support prefixes in the way S3 does.
179                            // This is always empty, but we leave the field for
180                            // compatibility with our existing test suite.
181                            "".to_string(),
182                            metrics,
183                            url.clone().into_redacted(),
184                            knobs,
185                            cfg,
186                        )?))
187                    } else {
188                        Err(anyhow!("unknown persist blob scheme: {}", url.as_str()))
189                    }
190                }
191                _ => Err(anyhow!("unknown persist blob scheme: {}", url.as_str())),
192            },
193            #[cfg(feature = "turmoil")]
194            "turmoil" => {
195                let cfg = crate::turmoil::BlobConfig::new(url);
196                Ok(BlobConfig::Turmoil(cfg))
197            }
198            p => Err(anyhow!(
199                "unknown persist blob scheme {}: {}",
200                p,
201                url.as_str()
202            )),
203        }?;
204
205        if !query_params.is_empty() {
206            return Err(ExternalError::from(anyhow!(
207                "unknown blob location params {}: {}",
208                query_params
209                    .keys()
210                    .map(|x| x.as_ref())
211                    .collect::<Vec<_>>()
212                    .join(" "),
213                url.as_str(),
214            )));
215        }
216
217        Ok(config)
218    }
219}
220
/// Config for an implementation of [Consensus].
///
/// A value is parsed from a URL by [ConsensusConfig::try_from] and turned into
/// a usable [Consensus] by [ConsensusConfig::open].
#[derive(Debug, Clone)]
pub enum ConsensusConfig {
    #[cfg(feature = "foundationdb")]
    /// Config for FoundationDB.
    ///
    /// Only compiled in when the `foundationdb` feature is enabled.
    FoundationDB(FdbConsensusConfig),
    /// Config for [PostgresConsensus].
    Postgres(PostgresConsensusConfig),
    /// Config for [MemConsensus], only available in testing.
    ///
    /// Parsing a `mem` URL in a build without debug assertions is still
    /// accepted, but logs a warning.
    Mem,
    #[cfg(feature = "turmoil")]
    /// Config for [crate::turmoil::TurmoilConsensus].
    ///
    /// Only compiled in when the `turmoil` feature is enabled.
    Turmoil(crate::turmoil::ConsensusConfig),
}
235
236impl ConsensusConfig {
237    /// Opens the associated implementation of [Consensus].
238    pub async fn open(self) -> Result<Arc<dyn Consensus>, ExternalError> {
239        match self {
240            #[cfg(feature = "foundationdb")]
241            ConsensusConfig::FoundationDB(config) => {
242                Ok(Arc::new(FdbConsensus::open(config).await?))
243            }
244            ConsensusConfig::Postgres(config) => {
245                Ok(Arc::new(PostgresConsensus::open(config).await?))
246            }
247            ConsensusConfig::Mem => Ok(Arc::new(MemConsensus::default())),
248            #[cfg(feature = "turmoil")]
249            ConsensusConfig::Turmoil(config) => {
250                Ok(Arc::new(crate::turmoil::TurmoilConsensus::open(config)))
251            }
252        }
253    }
254
255    /// Parses a [Consensus] config from a uri string.
256    pub fn try_from(
257        url: &SensitiveUrl,
258        knobs: Box<dyn PostgresClientKnobs>,
259        metrics: PostgresClientMetrics,
260        dyncfg: Arc<ConfigSet>,
261    ) -> Result<Self, ExternalError> {
262        let config = match url.scheme() {
263            #[cfg(feature = "foundationdb")]
264            "foundationdb" => Ok(ConsensusConfig::FoundationDB(FdbConsensusConfig::new(
265                url.clone(),
266            )?)),
267            "postgres" | "postgresql" => Ok(ConsensusConfig::Postgres(
268                PostgresConsensusConfig::new(url, knobs, metrics, dyncfg)?,
269            )),
270            "mem" => {
271                if !cfg!(debug_assertions) {
272                    warn!("persist unexpectedly using in-mem consensus in a release binary");
273                }
274                Ok(ConsensusConfig::Mem)
275            }
276            #[cfg(feature = "turmoil")]
277            "turmoil" => {
278                let cfg = crate::turmoil::ConsensusConfig::new(url);
279                Ok(ConsensusConfig::Turmoil(cfg))
280            }
281            p => Err(anyhow!(
282                "unknown persist consensus scheme {}: {}",
283                p,
284                url.as_str()
285            )),
286        }?;
287        Ok(config)
288    }
289}