mz_persist/
cfg.rs
1use std::collections::BTreeMap;
13use std::sync::Arc;
14use std::time::Duration;
15
16use anyhow::anyhow;
17use mz_dyncfg::ConfigSet;
18use mz_ore::url::SensitiveUrl;
19use tracing::warn;
20
21use mz_postgres_client::PostgresClientKnobs;
22use mz_postgres_client::metrics::PostgresClientMetrics;
23
24use crate::azure::{AzureBlob, AzureBlobConfig};
25use crate::file::{FileBlob, FileBlobConfig};
26use crate::location::{Blob, Consensus, Determinate, ExternalError};
27use crate::mem::{MemBlob, MemBlobConfig, MemConsensus};
28use crate::metrics::S3BlobMetrics;
29use crate::postgres::{PostgresConsensus, PostgresConsensusConfig};
30use crate::s3::{S3Blob, S3BlobConfig};
31
32pub fn all_dyn_configs(configs: ConfigSet) -> ConfigSet {
34 configs
35 .add(&crate::indexed::columnar::arrow::ENABLE_ARROW_LGALLOC_CC_SIZES)
36 .add(&crate::indexed::columnar::arrow::ENABLE_ARROW_LGALLOC_NONCC_SIZES)
37 .add(&crate::s3::ENABLE_S3_LGALLOC_CC_SIZES)
38 .add(&crate::s3::ENABLE_S3_LGALLOC_NONCC_SIZES)
39}
40
41#[derive(Debug, Clone)]
43pub enum BlobConfig {
44 File(FileBlobConfig),
46 S3(S3BlobConfig),
48 Mem(bool),
51 Azure(AzureBlobConfig),
53}
54
/// Tuning knobs handed to the S3 and Azure blob configs when they are built.
///
/// Implementors must be `Debug + Send + Sync` so a boxed knob set can be
/// stored in a config and shared across threads.
// NOTE(review): the per-method descriptions below are inferred from the method
// names only; confirm the exact semantics against the implementors.
pub trait BlobKnobs: std::fmt::Debug + Send + Sync {
    /// Timeout for a blob operation (presumably covering all of its attempts).
    fn operation_timeout(&self) -> Duration;

    /// Timeout for a single attempt of a blob operation (presumably one try
    /// within the overall operation timeout).
    fn operation_attempt_timeout(&self) -> Duration;

    /// Timeout for establishing a connection.
    fn connect_timeout(&self) -> Duration;

    /// Timeout for reads.
    fn read_timeout(&self) -> Duration;

    /// Whether "cc" is active for this process (meaning of "cc" is not
    /// visible from this file — TODO confirm).
    fn is_cc_active(&self) -> bool;
}
68
69impl BlobConfig {
70 pub async fn open(self) -> Result<Arc<dyn Blob>, ExternalError> {
72 match self {
73 BlobConfig::File(config) => Ok(Arc::new(FileBlob::open(config).await?)),
74 BlobConfig::S3(config) => Ok(Arc::new(S3Blob::open(config).await?)),
75 BlobConfig::Azure(config) => Ok(Arc::new(AzureBlob::open(config).await?)),
76 BlobConfig::Mem(tombstone) => {
77 Ok(Arc::new(MemBlob::open(MemBlobConfig::new(tombstone))))
78 }
79 }
80 }
81
82 pub async fn try_from(
84 url: &SensitiveUrl,
85 knobs: Box<dyn BlobKnobs>,
86 metrics: S3BlobMetrics,
87 cfg: Arc<ConfigSet>,
88 ) -> Result<Self, ExternalError> {
89 let mut query_params = url.query_pairs().collect::<BTreeMap<_, _>>();
90
91 let config = match url.scheme() {
92 "file" => {
93 let mut config = FileBlobConfig::from(url.path());
94 if query_params.remove("tombstone").is_some() {
95 config.tombstone = true;
96 }
97 Ok(BlobConfig::File(config))
98 }
99 "s3" => {
100 let bucket = url
101 .host()
102 .ok_or_else(|| anyhow!("missing bucket: {}", &url.as_str()))?
103 .to_string();
104 let prefix = url
105 .path()
106 .strip_prefix('/')
107 .unwrap_or_else(|| url.path())
108 .to_string();
109 let role_arn = query_params.remove("role_arn").map(|x| x.into_owned());
110 let endpoint = query_params.remove("endpoint").map(|x| x.into_owned());
111 let region = query_params.remove("region").map(|x| x.into_owned());
112
113 let credentials = match url.password() {
114 None => None,
115 Some(password) => Some((
116 String::from_utf8_lossy(&urlencoding::decode_binary(
117 url.username().as_bytes(),
118 ))
119 .into_owned(),
120 String::from_utf8_lossy(&urlencoding::decode_binary(password.as_bytes()))
121 .into_owned(),
122 )),
123 };
124
125 let config = S3BlobConfig::new(
126 bucket,
127 prefix,
128 role_arn,
129 endpoint,
130 region,
131 credentials,
132 knobs,
133 metrics,
134 cfg,
135 )
136 .await?;
137
138 Ok(BlobConfig::S3(config))
139 }
140 "mem" => {
141 if !cfg!(debug_assertions) {
142 warn!("persist unexpectedly using in-mem blob in a release binary");
143 }
144 let tombstone = match query_params.remove("tombstone").as_deref() {
145 None | Some("true") => true,
146 Some("false") => false,
147 Some(other) => Err(Determinate::new(anyhow!(
148 "invalid tombstone param value: {other}"
149 )))?,
150 };
151 query_params.clear();
152 Ok(BlobConfig::Mem(tombstone))
153 }
154 "http" | "https" => match url
155 .host()
156 .ok_or_else(|| anyhow!("missing protocol: {}", &url.as_str()))?
157 .to_string()
158 .split_once('.')
159 {
160 Some((account, root))
162 if account == "devstoreaccount1" || root == "blob.core.windows.net" =>
163 {
164 if let Some(container) = url
165 .path_segments()
166 .expect("azure blob storage container")
167 .next()
168 {
169 query_params.clear();
170 Ok(BlobConfig::Azure(AzureBlobConfig::new(
171 account.to_string(),
172 container.to_string(),
173 "".to_string(),
177 metrics,
178 url.clone().into_redacted(),
179 knobs,
180 cfg,
181 )?))
182 } else {
183 Err(anyhow!("unknown persist blob scheme: {}", url.as_str()))
184 }
185 }
186 _ => Err(anyhow!("unknown persist blob scheme: {}", url.as_str())),
187 },
188 p => Err(anyhow!(
189 "unknown persist blob scheme {}: {}",
190 p,
191 url.as_str()
192 )),
193 }?;
194
195 if !query_params.is_empty() {
196 return Err(ExternalError::from(anyhow!(
197 "unknown blob location params {}: {}",
198 query_params
199 .keys()
200 .map(|x| x.as_ref())
201 .collect::<Vec<_>>()
202 .join(" "),
203 url.as_str(),
204 )));
205 }
206
207 Ok(config)
208 }
209}
210
211#[derive(Debug, Clone)]
213pub enum ConsensusConfig {
214 Postgres(PostgresConsensusConfig),
216 Mem,
218}
219
220impl ConsensusConfig {
221 pub async fn open(self) -> Result<Arc<dyn Consensus>, ExternalError> {
223 match self {
224 ConsensusConfig::Postgres(config) => {
225 Ok(Arc::new(PostgresConsensus::open(config).await?))
226 }
227 ConsensusConfig::Mem => Ok(Arc::new(MemConsensus::default())),
228 }
229 }
230
231 pub fn try_from(
233 url: &SensitiveUrl,
234 knobs: Box<dyn PostgresClientKnobs>,
235 metrics: PostgresClientMetrics,
236 ) -> Result<Self, ExternalError> {
237 let config = match url.scheme() {
238 "postgres" | "postgresql" => Ok(ConsensusConfig::Postgres(
239 PostgresConsensusConfig::new(url, knobs, metrics)?,
240 )),
241 "mem" => {
242 if !cfg!(debug_assertions) {
243 warn!("persist unexpectedly using in-mem consensus in a release binary");
244 }
245 Ok(ConsensusConfig::Mem)
246 }
247 p => Err(anyhow!(
248 "unknown persist consensus scheme {}: {}",
249 p,
250 url.as_str()
251 )),
252 }?;
253 Ok(config)
254 }
255}