1use std::collections::BTreeMap;
13use std::sync::Arc;
14use std::time::Duration;
15
16use anyhow::anyhow;
17use mz_dyncfg::ConfigSet;
18use mz_ore::url::SensitiveUrl;
19use tracing::warn;
20
21use mz_postgres_client::PostgresClientKnobs;
22use mz_postgres_client::metrics::PostgresClientMetrics;
23
24use crate::azure::{AzureBlob, AzureBlobConfig};
25use crate::file::{FileBlob, FileBlobConfig};
26use crate::location::{Blob, Consensus, Determinate, ExternalError};
27use crate::mem::{MemBlob, MemBlobConfig, MemConsensus};
28use crate::metrics::S3BlobMetrics;
29use crate::postgres::{PostgresConsensus, PostgresConsensusConfig};
30use crate::s3::{S3Blob, S3BlobConfig};
31
32pub fn all_dyn_configs(configs: ConfigSet) -> ConfigSet {
34 configs
35 .add(&crate::indexed::columnar::arrow::ENABLE_ARROW_LGALLOC_CC_SIZES)
36 .add(&crate::indexed::columnar::arrow::ENABLE_ARROW_LGALLOC_NONCC_SIZES)
37 .add(&crate::s3::ENABLE_S3_LGALLOC_CC_SIZES)
38 .add(&crate::s3::ENABLE_S3_LGALLOC_NONCC_SIZES)
39 .add(&crate::postgres::USE_POSTGRES_TUNED_QUERIES)
40}
41
/// Config for one of the [Blob] implementations.
///
/// Built from a location URL by `BlobConfig::try_from`, and turned into a
/// live blob by `BlobConfig::open`.
#[derive(Debug, Clone)]
pub enum BlobConfig {
    /// Config for [FileBlob].
    File(FileBlobConfig),
    /// Config for [S3Blob].
    S3(S3BlobConfig),
    /// Config for [MemBlob]. The flag is passed to `MemBlobConfig::new`; it
    /// comes from the `tombstone` query param in `try_from`.
    Mem(bool),
    /// Config for [AzureBlob].
    Azure(AzureBlobConfig),
    /// Config for the turmoil blob (only with the `turmoil` feature).
    #[cfg(feature = "turmoil")]
    Turmoil(crate::turmoil::BlobConfig),
}
58
/// Tuning knobs handed to blob implementations.
///
/// `BlobConfig::try_from` passes a boxed implementation to the S3 and Azure
/// configs. Implementors must be `Debug + Send + Sync`.
pub trait BlobKnobs: std::fmt::Debug + Send + Sync {
    /// Timeout for a whole blob operation.
    /// NOTE(review): presumably this includes retries, as distinct from
    /// `operation_attempt_timeout` — confirm against implementations.
    fn operation_timeout(&self) -> Duration;
    /// Timeout for a single attempt of a blob operation.
    /// NOTE(review): presumably one network call — confirm.
    fn operation_attempt_timeout(&self) -> Duration;
    /// Timeout for establishing a connection.
    /// NOTE(review): presumably the socket connect — confirm.
    fn connect_timeout(&self) -> Duration;
    /// Timeout for reading a response.
    /// NOTE(review): the exact scope (first byte vs. whole body, with or
    /// without connect time) is defined by implementations — confirm.
    fn read_timeout(&self) -> Duration;
    /// Whether "cc" mode is active.
    /// NOTE(review): presumably related to the `*_LGALLOC_CC_SIZES` vs
    /// `*_LGALLOC_NONCC_SIZES` knobs registered in `all_dyn_configs` —
    /// confirm what "cc" denotes.
    fn is_cc_active(&self) -> bool;
}
72
73impl BlobConfig {
74 pub async fn open(self) -> Result<Arc<dyn Blob>, ExternalError> {
76 match self {
77 BlobConfig::File(config) => Ok(Arc::new(FileBlob::open(config).await?)),
78 BlobConfig::S3(config) => Ok(Arc::new(S3Blob::open(config).await?)),
79 BlobConfig::Azure(config) => Ok(Arc::new(AzureBlob::open(config).await?)),
80 BlobConfig::Mem(tombstone) => {
81 Ok(Arc::new(MemBlob::open(MemBlobConfig::new(tombstone))))
82 }
83 #[cfg(feature = "turmoil")]
84 BlobConfig::Turmoil(config) => Ok(Arc::new(crate::turmoil::TurmoilBlob::open(config))),
85 }
86 }
87
88 pub async fn try_from(
90 url: &SensitiveUrl,
91 knobs: Box<dyn BlobKnobs>,
92 metrics: S3BlobMetrics,
93 cfg: Arc<ConfigSet>,
94 ) -> Result<Self, ExternalError> {
95 let mut query_params = url.query_pairs().collect::<BTreeMap<_, _>>();
96
97 let config = match url.scheme() {
98 "file" => {
99 let mut config = FileBlobConfig::from(url.path());
100 if query_params.remove("tombstone").is_some() {
101 config.tombstone = true;
102 }
103 Ok(BlobConfig::File(config))
104 }
105 "s3" => {
106 let bucket = url
107 .host()
108 .ok_or_else(|| anyhow!("missing bucket: {}", &url.as_str()))?
109 .to_string();
110 let prefix = url
111 .path()
112 .strip_prefix('/')
113 .unwrap_or_else(|| url.path())
114 .to_string();
115 let role_arn = query_params.remove("role_arn").map(|x| x.into_owned());
116 let endpoint = query_params.remove("endpoint").map(|x| x.into_owned());
117 let region = query_params.remove("region").map(|x| x.into_owned());
118
119 let credentials = match url.password() {
120 None => None,
121 Some(password) => Some((
122 String::from_utf8_lossy(&urlencoding::decode_binary(
123 url.username().as_bytes(),
124 ))
125 .into_owned(),
126 String::from_utf8_lossy(&urlencoding::decode_binary(password.as_bytes()))
127 .into_owned(),
128 )),
129 };
130
131 let config = S3BlobConfig::new(
132 bucket,
133 prefix,
134 role_arn,
135 endpoint,
136 region,
137 credentials,
138 knobs,
139 metrics,
140 cfg,
141 )
142 .await?;
143
144 Ok(BlobConfig::S3(config))
145 }
146 "mem" => {
147 if !cfg!(debug_assertions) {
148 warn!("persist unexpectedly using in-mem blob in a release binary");
149 }
150 let tombstone = match query_params.remove("tombstone").as_deref() {
151 None | Some("true") => true,
152 Some("false") => false,
153 Some(other) => Err(Determinate::new(anyhow!(
154 "invalid tombstone param value: {other}"
155 )))?,
156 };
157 query_params.clear();
158 Ok(BlobConfig::Mem(tombstone))
159 }
160 "http" | "https" => match url
161 .host()
162 .ok_or_else(|| anyhow!("missing protocol: {}", &url.as_str()))?
163 .to_string()
164 .split_once('.')
165 {
166 Some((account, root))
168 if account == "devstoreaccount1" || root == "blob.core.windows.net" =>
169 {
170 if let Some(container) = url
171 .path_segments()
172 .expect("azure blob storage container")
173 .next()
174 {
175 query_params.clear();
176 Ok(BlobConfig::Azure(AzureBlobConfig::new(
177 account.to_string(),
178 container.to_string(),
179 "".to_string(),
183 metrics,
184 url.clone().into_redacted(),
185 knobs,
186 cfg,
187 )?))
188 } else {
189 Err(anyhow!("unknown persist blob scheme: {}", url.as_str()))
190 }
191 }
192 _ => Err(anyhow!("unknown persist blob scheme: {}", url.as_str())),
193 },
194 #[cfg(feature = "turmoil")]
195 "turmoil" => {
196 let cfg = crate::turmoil::BlobConfig::new(url);
197 Ok(BlobConfig::Turmoil(cfg))
198 }
199 p => Err(anyhow!(
200 "unknown persist blob scheme {}: {}",
201 p,
202 url.as_str()
203 )),
204 }?;
205
206 if !query_params.is_empty() {
207 return Err(ExternalError::from(anyhow!(
208 "unknown blob location params {}: {}",
209 query_params
210 .keys()
211 .map(|x| x.as_ref())
212 .collect::<Vec<_>>()
213 .join(" "),
214 url.as_str(),
215 )));
216 }
217
218 Ok(config)
219 }
220}
221
/// Config for one of the [Consensus] implementations.
///
/// Built from a location URL by `ConsensusConfig::try_from`, and turned into
/// a live consensus by `ConsensusConfig::open`.
#[derive(Debug, Clone)]
pub enum ConsensusConfig {
    /// Config for [PostgresConsensus].
    Postgres(PostgresConsensusConfig),
    /// Selects [MemConsensus] (created via `MemConsensus::default()`).
    Mem,
    /// Config for the turmoil consensus (only with the `turmoil` feature).
    #[cfg(feature = "turmoil")]
    Turmoil(crate::turmoil::ConsensusConfig),
}
233
234impl ConsensusConfig {
235 pub async fn open(self) -> Result<Arc<dyn Consensus>, ExternalError> {
237 match self {
238 ConsensusConfig::Postgres(config) => {
239 Ok(Arc::new(PostgresConsensus::open(config).await?))
240 }
241 ConsensusConfig::Mem => Ok(Arc::new(MemConsensus::default())),
242 #[cfg(feature = "turmoil")]
243 ConsensusConfig::Turmoil(config) => {
244 Ok(Arc::new(crate::turmoil::TurmoilConsensus::open(config)))
245 }
246 }
247 }
248
249 pub fn try_from(
251 url: &SensitiveUrl,
252 knobs: Box<dyn PostgresClientKnobs>,
253 metrics: PostgresClientMetrics,
254 dyncfg: Arc<ConfigSet>,
255 ) -> Result<Self, ExternalError> {
256 let config = match url.scheme() {
257 "postgres" | "postgresql" => Ok(ConsensusConfig::Postgres(
258 PostgresConsensusConfig::new(url, knobs, metrics, dyncfg)?,
259 )),
260 "mem" => {
261 if !cfg!(debug_assertions) {
262 warn!("persist unexpectedly using in-mem consensus in a release binary");
263 }
264 Ok(ConsensusConfig::Mem)
265 }
266 #[cfg(feature = "turmoil")]
267 "turmoil" => {
268 let cfg = crate::turmoil::ConsensusConfig::new(url);
269 Ok(ConsensusConfig::Turmoil(cfg))
270 }
271 p => Err(anyhow!(
272 "unknown persist consensus scheme {}: {}",
273 p,
274 url.as_str()
275 )),
276 }?;
277 Ok(config)
278 }
279}