Skip to main content

mz_persist/
cfg.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Configuration for [crate::location] implementations.
11
12use std::collections::BTreeMap;
13use std::sync::Arc;
14use std::time::Duration;
15
16use anyhow::anyhow;
17use mz_dyncfg::ConfigSet;
18use mz_ore::url::SensitiveUrl;
19use tracing::warn;
20
21use mz_postgres_client::PostgresClientKnobs;
22use mz_postgres_client::metrics::PostgresClientMetrics;
23
24use crate::azure::{AzureBlob, AzureBlobConfig};
25use crate::file::{FileBlob, FileBlobConfig};
26#[cfg(feature = "foundationdb")]
27use crate::foundationdb::{FdbConsensus, FdbConsensusConfig};
28use crate::location::{Blob, Consensus, Determinate, ExternalError};
29use crate::mem::{MemBlob, MemBlobConfig, MemConsensus};
30use crate::metrics::S3BlobMetrics;
31use crate::postgres::{PostgresConsensus, PostgresConsensusConfig};
32use crate::s3::{S3Blob, S3BlobConfig};
33
34/// Adds the full set of all mz_persist `Config`s.
35pub fn all_dyn_configs(configs: ConfigSet) -> ConfigSet {
36    configs.add(&crate::postgres::USE_POSTGRES_TUNED_QUERIES)
37}
38
39/// Config for an implementation of [Blob].
40#[derive(Debug, Clone)]
41pub enum BlobConfig {
42    /// Config for [FileBlob].
43    File(FileBlobConfig),
44    /// Config for [S3Blob].
45    S3(S3BlobConfig),
46    /// Config for [MemBlob], only available in testing to prevent
47    /// footguns.
48    Mem(bool),
49    /// Config for [AzureBlob].
50    Azure(AzureBlobConfig),
51    #[cfg(feature = "turmoil")]
52    /// Config for [crate::turmoil::TurmoilBlob].
53    Turmoil(crate::turmoil::BlobConfig),
54}
55
/// Tunables consulted by [Blob] implementations.
///
/// Implementors must be debuggable and shareable across threads, because the
/// knobs are handed around as trait objects.
pub trait BlobKnobs: std::fmt::Debug + Send + Sync {
    /// Upper bound on how long establishing a socket connection may take.
    fn connect_timeout(&self) -> Duration;
    /// Upper bound on the wait for the first byte of a response; this budget
    /// also covers the time spent connecting.
    fn read_timeout(&self) -> Duration;
    /// Upper bound on the duration of one individual network call.
    fn operation_attempt_timeout(&self) -> Duration;
    /// Upper bound on the total duration of a network call, counting every
    /// retry attempt.
    fn operation_timeout(&self) -> Duration;
    /// Reports whether the process runs in a "cc" sized cluster.
    fn is_cc_active(&self) -> bool;
}
69
70impl BlobConfig {
71    /// Opens the associated implementation of [Blob].
72    pub async fn open(self) -> Result<Arc<dyn Blob>, ExternalError> {
73        match self {
74            BlobConfig::File(config) => Ok(Arc::new(FileBlob::open(config).await?)),
75            BlobConfig::S3(config) => Ok(Arc::new(S3Blob::open(config).await?)),
76            BlobConfig::Azure(config) => Ok(Arc::new(AzureBlob::open(config).await?)),
77            BlobConfig::Mem(tombstone) => {
78                Ok(Arc::new(MemBlob::open(MemBlobConfig::new(tombstone))))
79            }
80            #[cfg(feature = "turmoil")]
81            BlobConfig::Turmoil(config) => Ok(Arc::new(crate::turmoil::TurmoilBlob::open(config))),
82        }
83    }
84
85    /// Parses a [Blob] config from a uri string.
86    pub async fn try_from(
87        url: &SensitiveUrl,
88        knobs: Box<dyn BlobKnobs>,
89        metrics: S3BlobMetrics,
90    ) -> Result<Self, ExternalError> {
91        let mut query_params = url.query_pairs().collect::<BTreeMap<_, _>>();
92
93        let config = match url.scheme() {
94            "file" => {
95                let mut config = FileBlobConfig::from(url.path());
96                if query_params.remove("tombstone").is_some() {
97                    config.tombstone = true;
98                }
99                Ok(BlobConfig::File(config))
100            }
101            "s3" => {
102                let bucket = url
103                    .host()
104                    .ok_or_else(|| anyhow!("missing bucket: {}", &url.as_str()))?
105                    .to_string();
106                let prefix = url
107                    .path()
108                    .strip_prefix('/')
109                    .unwrap_or_else(|| url.path())
110                    .to_string();
111                let role_arn = query_params.remove("role_arn").map(|x| x.into_owned());
112                let endpoint = query_params.remove("endpoint").map(|x| x.into_owned());
113                let region = query_params.remove("region").map(|x| x.into_owned());
114
115                let credentials = match url.password() {
116                    None => None,
117                    Some(password) => Some((
118                        String::from_utf8_lossy(&urlencoding::decode_binary(
119                            url.username().as_bytes(),
120                        ))
121                        .into_owned(),
122                        String::from_utf8_lossy(&urlencoding::decode_binary(password.as_bytes()))
123                            .into_owned(),
124                    )),
125                };
126
127                let config = S3BlobConfig::new(
128                    bucket,
129                    prefix,
130                    role_arn,
131                    endpoint,
132                    region,
133                    credentials,
134                    knobs,
135                    metrics,
136                )
137                .await?;
138
139                Ok(BlobConfig::S3(config))
140            }
141            "mem" => {
142                if !cfg!(debug_assertions) {
143                    warn!("persist unexpectedly using in-mem blob in a release binary");
144                }
145                let tombstone = match query_params.remove("tombstone").as_deref() {
146                    None | Some("true") => true,
147                    Some("false") => false,
148                    Some(other) => Err(Determinate::new(anyhow!(
149                        "invalid tombstone param value: {other}"
150                    )))?,
151                };
152                query_params.clear();
153                Ok(BlobConfig::Mem(tombstone))
154            }
155            "http" | "https" => match url
156                .host()
157                .ok_or_else(|| anyhow!("missing protocol: {}", &url.as_str()))?
158                .to_string()
159                .split_once('.')
160            {
161                // The Azurite emulator always uses the well-known account name devstoreaccount1
162                Some((account, root))
163                    if account == "devstoreaccount1" || root == "blob.core.windows.net" =>
164                {
165                    if let Some(container) = url
166                        .path_segments()
167                        .expect("azure blob storage container")
168                        .next()
169                    {
170                        query_params.clear();
171                        Ok(BlobConfig::Azure(AzureBlobConfig::new(
172                            account.to_string(),
173                            container.to_string(),
174                            // Azure doesn't support prefixes in the way S3 does.
175                            // This is always empty, but we leave the field for
176                            // compatibility with our existing test suite.
177                            "".to_string(),
178                            metrics,
179                            url.clone().into_redacted(),
180                            knobs,
181                        )?))
182                    } else {
183                        Err(anyhow!("unknown persist blob scheme: {}", url.as_str()))
184                    }
185                }
186                _ => Err(anyhow!("unknown persist blob scheme: {}", url.as_str())),
187            },
188            #[cfg(feature = "turmoil")]
189            "turmoil" => {
190                let cfg = crate::turmoil::BlobConfig::new(url);
191                Ok(BlobConfig::Turmoil(cfg))
192            }
193            p => Err(anyhow!(
194                "unknown persist blob scheme {}: {}",
195                p,
196                url.as_str()
197            )),
198        }?;
199
200        if !query_params.is_empty() {
201            return Err(ExternalError::from(anyhow!(
202                "unknown blob location params {}: {}",
203                query_params
204                    .keys()
205                    .map(|x| x.as_ref())
206                    .collect::<Vec<_>>()
207                    .join(" "),
208                url.as_str(),
209            )));
210        }
211
212        Ok(config)
213    }
214}
215
216/// Config for an implementation of [Consensus].
217#[derive(Debug, Clone)]
218pub enum ConsensusConfig {
219    #[cfg(feature = "foundationdb")]
220    /// Config for FoundationDB.
221    FoundationDB(FdbConsensusConfig),
222    /// Config for [PostgresConsensus].
223    Postgres(PostgresConsensusConfig),
224    /// Config for [MemConsensus], only available in testing.
225    Mem,
226    #[cfg(feature = "turmoil")]
227    /// Config for [crate::turmoil::TurmoilConsensus].
228    Turmoil(crate::turmoil::ConsensusConfig),
229}
230
231impl ConsensusConfig {
232    /// Opens the associated implementation of [Consensus].
233    pub async fn open(self) -> Result<Arc<dyn Consensus>, ExternalError> {
234        match self {
235            #[cfg(feature = "foundationdb")]
236            ConsensusConfig::FoundationDB(config) => {
237                Ok(Arc::new(FdbConsensus::open(config).await?))
238            }
239            ConsensusConfig::Postgres(config) => {
240                Ok(Arc::new(PostgresConsensus::open(config).await?))
241            }
242            ConsensusConfig::Mem => Ok(Arc::new(MemConsensus::default())),
243            #[cfg(feature = "turmoil")]
244            ConsensusConfig::Turmoil(config) => {
245                Ok(Arc::new(crate::turmoil::TurmoilConsensus::open(config)))
246            }
247        }
248    }
249
250    /// Parses a [Consensus] config from a uri string.
251    pub fn try_from(
252        url: &SensitiveUrl,
253        knobs: Box<dyn PostgresClientKnobs>,
254        metrics: PostgresClientMetrics,
255        dyncfg: Arc<ConfigSet>,
256    ) -> Result<Self, ExternalError> {
257        let config = match url.scheme() {
258            #[cfg(feature = "foundationdb")]
259            "foundationdb" => Ok(ConsensusConfig::FoundationDB(FdbConsensusConfig::new(
260                url.clone(),
261            )?)),
262            "postgres" | "postgresql" => Ok(ConsensusConfig::Postgres(
263                PostgresConsensusConfig::new(url, knobs, metrics, dyncfg)?,
264            )),
265            "mem" => {
266                if !cfg!(debug_assertions) {
267                    warn!("persist unexpectedly using in-mem consensus in a release binary");
268                }
269                Ok(ConsensusConfig::Mem)
270            }
271            #[cfg(feature = "turmoil")]
272            "turmoil" => {
273                let cfg = crate::turmoil::ConsensusConfig::new(url);
274                Ok(ConsensusConfig::Turmoil(cfg))
275            }
276            p => Err(anyhow!(
277                "unknown persist consensus scheme {}: {}",
278                p,
279                url.as_str()
280            )),
281        }?;
282        Ok(config)
283    }
284}