1use std::collections::BTreeMap;
13use std::sync::Arc;
14use std::time::Duration;
15
16use anyhow::anyhow;
17use mz_dyncfg::ConfigSet;
18use mz_ore::url::SensitiveUrl;
19use tracing::warn;
20
21use mz_postgres_client::PostgresClientKnobs;
22use mz_postgres_client::metrics::PostgresClientMetrics;
23
24use crate::azure::{AzureBlob, AzureBlobConfig};
25use crate::file::{FileBlob, FileBlobConfig};
26use crate::location::{Blob, Consensus, Determinate, ExternalError};
27use crate::mem::{MemBlob, MemBlobConfig, MemConsensus};
28use crate::metrics::S3BlobMetrics;
29use crate::postgres::{PostgresConsensus, PostgresConsensusConfig};
30use crate::s3::{S3Blob, S3BlobConfig};
31
32pub fn all_dyn_configs(configs: ConfigSet) -> ConfigSet {
34 configs
35 .add(&crate::indexed::columnar::arrow::ENABLE_ARROW_LGALLOC_CC_SIZES)
36 .add(&crate::indexed::columnar::arrow::ENABLE_ARROW_LGALLOC_NONCC_SIZES)
37 .add(&crate::s3::ENABLE_S3_LGALLOC_CC_SIZES)
38 .add(&crate::s3::ENABLE_S3_LGALLOC_NONCC_SIZES)
39 .add(&crate::postgres::USE_POSTGRES_TUNED_QUERIES)
40}
41
42#[derive(Debug, Clone)]
44pub enum BlobConfig {
45 File(FileBlobConfig),
47 S3(S3BlobConfig),
49 Mem(bool),
52 Azure(AzureBlobConfig),
54}
55
/// Tuning knobs handed to blob backends when their configs are built.
///
/// Implementors must be `Send + Sync` and `Debug`; the trait is used as a
/// `Box<dyn BlobKnobs>` trait object.
pub trait BlobKnobs: Send + Sync + std::fmt::Debug {
    /// Timeout for a whole blob operation.
    /// NOTE(review): presumably spans all retry attempts — confirm with implementors.
    fn operation_timeout(&self) -> Duration;
    /// Timeout for a single attempt of a blob operation.
    fn operation_attempt_timeout(&self) -> Duration;
    /// Timeout for establishing a connection.
    fn connect_timeout(&self) -> Duration;
    /// Timeout for reading a response.
    fn read_timeout(&self) -> Duration;
    /// Whether the "cc" mode is active.
    /// NOTE(review): exact meaning is defined by the implementor — confirm.
    fn is_cc_active(&self) -> bool;
}
69
70impl BlobConfig {
71 pub async fn open(self) -> Result<Arc<dyn Blob>, ExternalError> {
73 match self {
74 BlobConfig::File(config) => Ok(Arc::new(FileBlob::open(config).await?)),
75 BlobConfig::S3(config) => Ok(Arc::new(S3Blob::open(config).await?)),
76 BlobConfig::Azure(config) => Ok(Arc::new(AzureBlob::open(config).await?)),
77 BlobConfig::Mem(tombstone) => {
78 Ok(Arc::new(MemBlob::open(MemBlobConfig::new(tombstone))))
79 }
80 }
81 }
82
83 pub async fn try_from(
85 url: &SensitiveUrl,
86 knobs: Box<dyn BlobKnobs>,
87 metrics: S3BlobMetrics,
88 cfg: Arc<ConfigSet>,
89 ) -> Result<Self, ExternalError> {
90 let mut query_params = url.query_pairs().collect::<BTreeMap<_, _>>();
91
92 let config = match url.scheme() {
93 "file" => {
94 let mut config = FileBlobConfig::from(url.path());
95 if query_params.remove("tombstone").is_some() {
96 config.tombstone = true;
97 }
98 Ok(BlobConfig::File(config))
99 }
100 "s3" => {
101 let bucket = url
102 .host()
103 .ok_or_else(|| anyhow!("missing bucket: {}", &url.as_str()))?
104 .to_string();
105 let prefix = url
106 .path()
107 .strip_prefix('/')
108 .unwrap_or_else(|| url.path())
109 .to_string();
110 let role_arn = query_params.remove("role_arn").map(|x| x.into_owned());
111 let endpoint = query_params.remove("endpoint").map(|x| x.into_owned());
112 let region = query_params.remove("region").map(|x| x.into_owned());
113
114 let credentials = match url.password() {
115 None => None,
116 Some(password) => Some((
117 String::from_utf8_lossy(&urlencoding::decode_binary(
118 url.username().as_bytes(),
119 ))
120 .into_owned(),
121 String::from_utf8_lossy(&urlencoding::decode_binary(password.as_bytes()))
122 .into_owned(),
123 )),
124 };
125
126 let config = S3BlobConfig::new(
127 bucket,
128 prefix,
129 role_arn,
130 endpoint,
131 region,
132 credentials,
133 knobs,
134 metrics,
135 cfg,
136 )
137 .await?;
138
139 Ok(BlobConfig::S3(config))
140 }
141 "mem" => {
142 if !cfg!(debug_assertions) {
143 warn!("persist unexpectedly using in-mem blob in a release binary");
144 }
145 let tombstone = match query_params.remove("tombstone").as_deref() {
146 None | Some("true") => true,
147 Some("false") => false,
148 Some(other) => Err(Determinate::new(anyhow!(
149 "invalid tombstone param value: {other}"
150 )))?,
151 };
152 query_params.clear();
153 Ok(BlobConfig::Mem(tombstone))
154 }
155 "http" | "https" => match url
156 .host()
157 .ok_or_else(|| anyhow!("missing protocol: {}", &url.as_str()))?
158 .to_string()
159 .split_once('.')
160 {
161 Some((account, root))
163 if account == "devstoreaccount1" || root == "blob.core.windows.net" =>
164 {
165 if let Some(container) = url
166 .path_segments()
167 .expect("azure blob storage container")
168 .next()
169 {
170 query_params.clear();
171 Ok(BlobConfig::Azure(AzureBlobConfig::new(
172 account.to_string(),
173 container.to_string(),
174 "".to_string(),
178 metrics,
179 url.clone().into_redacted(),
180 knobs,
181 cfg,
182 )?))
183 } else {
184 Err(anyhow!("unknown persist blob scheme: {}", url.as_str()))
185 }
186 }
187 _ => Err(anyhow!("unknown persist blob scheme: {}", url.as_str())),
188 },
189 p => Err(anyhow!(
190 "unknown persist blob scheme {}: {}",
191 p,
192 url.as_str()
193 )),
194 }?;
195
196 if !query_params.is_empty() {
197 return Err(ExternalError::from(anyhow!(
198 "unknown blob location params {}: {}",
199 query_params
200 .keys()
201 .map(|x| x.as_ref())
202 .collect::<Vec<_>>()
203 .join(" "),
204 url.as_str(),
205 )));
206 }
207
208 Ok(config)
209 }
210}
211
212#[derive(Debug, Clone)]
214pub enum ConsensusConfig {
215 Postgres(PostgresConsensusConfig),
217 Mem,
219}
220
221impl ConsensusConfig {
222 pub async fn open(self) -> Result<Arc<dyn Consensus>, ExternalError> {
224 match self {
225 ConsensusConfig::Postgres(config) => {
226 Ok(Arc::new(PostgresConsensus::open(config).await?))
227 }
228 ConsensusConfig::Mem => Ok(Arc::new(MemConsensus::default())),
229 }
230 }
231
232 pub fn try_from(
234 url: &SensitiveUrl,
235 knobs: Box<dyn PostgresClientKnobs>,
236 metrics: PostgresClientMetrics,
237 dyncfg: Arc<ConfigSet>,
238 ) -> Result<Self, ExternalError> {
239 let config = match url.scheme() {
240 "postgres" | "postgresql" => Ok(ConsensusConfig::Postgres(
241 PostgresConsensusConfig::new(url, knobs, metrics, dyncfg)?,
242 )),
243 "mem" => {
244 if !cfg!(debug_assertions) {
245 warn!("persist unexpectedly using in-mem consensus in a release binary");
246 }
247 Ok(ConsensusConfig::Mem)
248 }
249 p => Err(anyhow!(
250 "unknown persist consensus scheme {}: {}",
251 p,
252 url.as_str()
253 )),
254 }?;
255 Ok(config)
256 }
257}