1use std::collections::BTreeMap;
13use std::sync::Arc;
14use std::time::Duration;
15
16use anyhow::anyhow;
17use mz_dyncfg::ConfigSet;
18use mz_ore::url::SensitiveUrl;
19use tracing::warn;
20
21use mz_postgres_client::PostgresClientKnobs;
22use mz_postgres_client::metrics::PostgresClientMetrics;
23
24use crate::azure::{AzureBlob, AzureBlobConfig};
25use crate::file::{FileBlob, FileBlobConfig};
26#[cfg(feature = "foundationdb")]
27use crate::foundationdb::{FdbConsensus, FdbConsensusConfig};
28use crate::location::{Blob, Consensus, Determinate, ExternalError};
29use crate::mem::{MemBlob, MemBlobConfig, MemConsensus};
30use crate::metrics::S3BlobMetrics;
31use crate::postgres::{PostgresConsensus, PostgresConsensusConfig};
32use crate::s3::{S3Blob, S3BlobConfig};
33
34pub fn all_dyn_configs(configs: ConfigSet) -> ConfigSet {
36 configs
37 .add(&crate::indexed::columnar::arrow::ENABLE_ARROW_LGALLOC_CC_SIZES)
38 .add(&crate::indexed::columnar::arrow::ENABLE_ARROW_LGALLOC_NONCC_SIZES)
39 .add(&crate::postgres::USE_POSTGRES_TUNED_QUERIES)
40}
41
/// Configuration describing which [Blob] implementation to open.
///
/// Values are built from a URL by [`BlobConfig::try_from`] and turned into a
/// usable handle by [`BlobConfig::open`].
#[derive(Debug, Clone)]
pub enum BlobConfig {
    /// Config for a [FileBlob].
    File(FileBlobConfig),
    /// Config for an [S3Blob].
    S3(S3BlobConfig),
    /// Config for a [MemBlob]. The flag is the `tombstone` setting that gets
    /// passed to `MemBlobConfig::new` when the blob is opened.
    Mem(bool),
    /// Config for an [AzureBlob].
    Azure(AzureBlobConfig),
    /// Config for a turmoil-backed blob; only present with the `turmoil`
    /// feature enabled.
    #[cfg(feature = "turmoil")]
    Turmoil(crate::turmoil::BlobConfig),
}
58
/// Tuning knobs handed to blob implementations.
///
/// [`BlobConfig::try_from`] forwards a boxed implementation of this trait to
/// the S3 and Azure config constructors. How each value is applied is decided
/// by those implementations, which are not shown here.
pub trait BlobKnobs: std::fmt::Debug + Send + Sync {
    /// Timeout for a blob operation.
    ///
    /// NOTE(review): presumably the budget for a whole operation, including
    /// any retries — confirm against the code that consumes it.
    fn operation_timeout(&self) -> Duration;
    /// Timeout for an operation attempt.
    ///
    /// NOTE(review): presumably the budget for a single attempt within
    /// [`Self::operation_timeout`] — confirm against the code that consumes it.
    fn operation_attempt_timeout(&self) -> Duration;
    /// Timeout for establishing a connection.
    fn connect_timeout(&self) -> Duration;
    /// Timeout for reads.
    ///
    /// NOTE(review): whether this bounds a whole response or a single read is
    /// not visible here — confirm against the code that consumes it.
    fn read_timeout(&self) -> Duration;
    /// Whether "cc" is active.
    ///
    /// NOTE(review): the meaning of "cc" is not established here; it likely
    /// relates to the `*_CC_SIZES` / `*_NONCC_SIZES` dyncfgs — confirm.
    fn is_cc_active(&self) -> bool;
}
72
73impl BlobConfig {
74 pub async fn open(self) -> Result<Arc<dyn Blob>, ExternalError> {
76 match self {
77 BlobConfig::File(config) => Ok(Arc::new(FileBlob::open(config).await?)),
78 BlobConfig::S3(config) => Ok(Arc::new(S3Blob::open(config).await?)),
79 BlobConfig::Azure(config) => Ok(Arc::new(AzureBlob::open(config).await?)),
80 BlobConfig::Mem(tombstone) => {
81 Ok(Arc::new(MemBlob::open(MemBlobConfig::new(tombstone))))
82 }
83 #[cfg(feature = "turmoil")]
84 BlobConfig::Turmoil(config) => Ok(Arc::new(crate::turmoil::TurmoilBlob::open(config))),
85 }
86 }
87
88 pub async fn try_from(
90 url: &SensitiveUrl,
91 knobs: Box<dyn BlobKnobs>,
92 metrics: S3BlobMetrics,
93 cfg: Arc<ConfigSet>,
94 ) -> Result<Self, ExternalError> {
95 let mut query_params = url.query_pairs().collect::<BTreeMap<_, _>>();
96
97 let config = match url.scheme() {
98 "file" => {
99 let mut config = FileBlobConfig::from(url.path());
100 if query_params.remove("tombstone").is_some() {
101 config.tombstone = true;
102 }
103 Ok(BlobConfig::File(config))
104 }
105 "s3" => {
106 let bucket = url
107 .host()
108 .ok_or_else(|| anyhow!("missing bucket: {}", &url.as_str()))?
109 .to_string();
110 let prefix = url
111 .path()
112 .strip_prefix('/')
113 .unwrap_or_else(|| url.path())
114 .to_string();
115 let role_arn = query_params.remove("role_arn").map(|x| x.into_owned());
116 let endpoint = query_params.remove("endpoint").map(|x| x.into_owned());
117 let region = query_params.remove("region").map(|x| x.into_owned());
118
119 let credentials = match url.password() {
120 None => None,
121 Some(password) => Some((
122 String::from_utf8_lossy(&urlencoding::decode_binary(
123 url.username().as_bytes(),
124 ))
125 .into_owned(),
126 String::from_utf8_lossy(&urlencoding::decode_binary(password.as_bytes()))
127 .into_owned(),
128 )),
129 };
130
131 let config = S3BlobConfig::new(
132 bucket,
133 prefix,
134 role_arn,
135 endpoint,
136 region,
137 credentials,
138 knobs,
139 metrics,
140 )
141 .await?;
142
143 Ok(BlobConfig::S3(config))
144 }
145 "mem" => {
146 if !cfg!(debug_assertions) {
147 warn!("persist unexpectedly using in-mem blob in a release binary");
148 }
149 let tombstone = match query_params.remove("tombstone").as_deref() {
150 None | Some("true") => true,
151 Some("false") => false,
152 Some(other) => Err(Determinate::new(anyhow!(
153 "invalid tombstone param value: {other}"
154 )))?,
155 };
156 query_params.clear();
157 Ok(BlobConfig::Mem(tombstone))
158 }
159 "http" | "https" => match url
160 .host()
161 .ok_or_else(|| anyhow!("missing protocol: {}", &url.as_str()))?
162 .to_string()
163 .split_once('.')
164 {
165 Some((account, root))
167 if account == "devstoreaccount1" || root == "blob.core.windows.net" =>
168 {
169 if let Some(container) = url
170 .path_segments()
171 .expect("azure blob storage container")
172 .next()
173 {
174 query_params.clear();
175 Ok(BlobConfig::Azure(AzureBlobConfig::new(
176 account.to_string(),
177 container.to_string(),
178 "".to_string(),
182 metrics,
183 url.clone().into_redacted(),
184 knobs,
185 cfg,
186 )?))
187 } else {
188 Err(anyhow!("unknown persist blob scheme: {}", url.as_str()))
189 }
190 }
191 _ => Err(anyhow!("unknown persist blob scheme: {}", url.as_str())),
192 },
193 #[cfg(feature = "turmoil")]
194 "turmoil" => {
195 let cfg = crate::turmoil::BlobConfig::new(url);
196 Ok(BlobConfig::Turmoil(cfg))
197 }
198 p => Err(anyhow!(
199 "unknown persist blob scheme {}: {}",
200 p,
201 url.as_str()
202 )),
203 }?;
204
205 if !query_params.is_empty() {
206 return Err(ExternalError::from(anyhow!(
207 "unknown blob location params {}: {}",
208 query_params
209 .keys()
210 .map(|x| x.as_ref())
211 .collect::<Vec<_>>()
212 .join(" "),
213 url.as_str(),
214 )));
215 }
216
217 Ok(config)
218 }
219}
220
/// Configuration describing which [Consensus] implementation to open.
///
/// Values are built from a URL by [`ConsensusConfig::try_from`] and turned
/// into a usable handle by [`ConsensusConfig::open`].
#[derive(Debug, Clone)]
pub enum ConsensusConfig {
    /// Config for an [FdbConsensus]; only present with the `foundationdb`
    /// feature enabled.
    #[cfg(feature = "foundationdb")]
    FoundationDB(FdbConsensusConfig),
    /// Config for a [PostgresConsensus].
    Postgres(PostgresConsensusConfig),
    /// Selects [MemConsensus], which is created via its `Default` impl and
    /// therefore carries no config.
    Mem,
    /// Config for a turmoil-backed consensus; only present with the `turmoil`
    /// feature enabled.
    #[cfg(feature = "turmoil")]
    Turmoil(crate::turmoil::ConsensusConfig),
}
235
236impl ConsensusConfig {
237 pub async fn open(self) -> Result<Arc<dyn Consensus>, ExternalError> {
239 match self {
240 #[cfg(feature = "foundationdb")]
241 ConsensusConfig::FoundationDB(config) => {
242 Ok(Arc::new(FdbConsensus::open(config).await?))
243 }
244 ConsensusConfig::Postgres(config) => {
245 Ok(Arc::new(PostgresConsensus::open(config).await?))
246 }
247 ConsensusConfig::Mem => Ok(Arc::new(MemConsensus::default())),
248 #[cfg(feature = "turmoil")]
249 ConsensusConfig::Turmoil(config) => {
250 Ok(Arc::new(crate::turmoil::TurmoilConsensus::open(config)))
251 }
252 }
253 }
254
255 pub fn try_from(
257 url: &SensitiveUrl,
258 knobs: Box<dyn PostgresClientKnobs>,
259 metrics: PostgresClientMetrics,
260 dyncfg: Arc<ConfigSet>,
261 ) -> Result<Self, ExternalError> {
262 let config = match url.scheme() {
263 #[cfg(feature = "foundationdb")]
264 "foundationdb" => Ok(ConsensusConfig::FoundationDB(FdbConsensusConfig::new(
265 url.clone(),
266 )?)),
267 "postgres" | "postgresql" => Ok(ConsensusConfig::Postgres(
268 PostgresConsensusConfig::new(url, knobs, metrics, dyncfg)?,
269 )),
270 "mem" => {
271 if !cfg!(debug_assertions) {
272 warn!("persist unexpectedly using in-mem consensus in a release binary");
273 }
274 Ok(ConsensusConfig::Mem)
275 }
276 #[cfg(feature = "turmoil")]
277 "turmoil" => {
278 let cfg = crate::turmoil::ConsensusConfig::new(url);
279 Ok(ConsensusConfig::Turmoil(cfg))
280 }
281 p => Err(anyhow!(
282 "unknown persist consensus scheme {}: {}",
283 p,
284 url.as_str()
285 )),
286 }?;
287 Ok(config)
288 }
289}