1use std::collections::BTreeMap;
13use std::sync::Arc;
14use std::time::Duration;
15
16use anyhow::anyhow;
17use mz_dyncfg::ConfigSet;
18use mz_ore::url::SensitiveUrl;
19use tracing::warn;
20
21use mz_postgres_client::PostgresClientKnobs;
22use mz_postgres_client::metrics::PostgresClientMetrics;
23
24use crate::azure::{AzureBlob, AzureBlobConfig};
25use crate::file::{FileBlob, FileBlobConfig};
26#[cfg(any(feature = "foundationdb", target_os = "linux"))]
27use crate::foundationdb::{FdbConsensus, FdbConsensusConfig};
28use crate::location::{Blob, Consensus, Determinate, ExternalError};
29use crate::mem::{MemBlob, MemBlobConfig, MemConsensus};
30use crate::metrics::S3BlobMetrics;
31use crate::postgres::{PostgresConsensus, PostgresConsensusConfig};
32use crate::s3::{S3Blob, S3BlobConfig};
33
34pub fn all_dyn_configs(configs: ConfigSet) -> ConfigSet {
36 configs
37 .add(&crate::indexed::columnar::arrow::ENABLE_ARROW_LGALLOC_CC_SIZES)
38 .add(&crate::indexed::columnar::arrow::ENABLE_ARROW_LGALLOC_NONCC_SIZES)
39 .add(&crate::s3::ENABLE_S3_LGALLOC_CC_SIZES)
40 .add(&crate::s3::ENABLE_S3_LGALLOC_NONCC_SIZES)
41 .add(&crate::postgres::USE_POSTGRES_TUNED_QUERIES)
42}
43
/// Configuration describing which [`Blob`] implementation to open and how.
///
/// Values are built from a URL by [`BlobConfig::try_from`] and turned into a
/// live handle by [`BlobConfig::open`].
#[derive(Debug, Clone)]
pub enum BlobConfig {
    /// Config for a [`FileBlob`].
    File(FileBlobConfig),
    /// Config for an [`S3Blob`].
    S3(S3BlobConfig),
    /// Config for a [`MemBlob`]. The `bool` is the tombstone flag handed to
    /// [`MemBlobConfig::new`].
    Mem(bool),
    /// Config for an [`AzureBlob`].
    Azure(AzureBlobConfig),
    /// Config for the turmoil-backed blob; only compiled with the `turmoil`
    /// feature.
    #[cfg(feature = "turmoil")]
    Turmoil(crate::turmoil::BlobConfig),
}
60
/// Tunable knobs handed to blob implementations when they are configured.
///
/// In this file a boxed `BlobKnobs` is forwarded to `S3BlobConfig::new` and
/// `AzureBlobConfig::new`; how each implementation uses the values is not
/// visible here.
pub trait BlobKnobs: std::fmt::Debug + Send + Sync {
    /// Timeout for an operation.
    ///
    /// NOTE(review): presumably the overall budget for a call including any
    /// retries — confirm against implementors.
    fn operation_timeout(&self) -> Duration;
    /// Timeout for a single attempt of an operation.
    ///
    /// NOTE(review): presumably bounds one try within `operation_timeout` —
    /// confirm against implementors.
    fn operation_attempt_timeout(&self) -> Duration;
    /// Timeout for establishing a connection.
    ///
    /// NOTE(review): exact semantics depend on the client that consumes it.
    fn connect_timeout(&self) -> Duration;
    /// Timeout for reading a response.
    ///
    /// NOTE(review): whether this covers the first byte or the full body is
    /// not visible here — confirm against implementors.
    fn read_timeout(&self) -> Duration;
    /// Whether "cc" is active.
    ///
    /// NOTE(review): presumably selects between the `*_CC_SIZES` and
    /// `*_NONCC_SIZES` dyncfgs registered in `all_dyn_configs` — confirm.
    fn is_cc_active(&self) -> bool;
}
74
75impl BlobConfig {
76 pub async fn open(self) -> Result<Arc<dyn Blob>, ExternalError> {
78 match self {
79 BlobConfig::File(config) => Ok(Arc::new(FileBlob::open(config).await?)),
80 BlobConfig::S3(config) => Ok(Arc::new(S3Blob::open(config).await?)),
81 BlobConfig::Azure(config) => Ok(Arc::new(AzureBlob::open(config).await?)),
82 BlobConfig::Mem(tombstone) => {
83 Ok(Arc::new(MemBlob::open(MemBlobConfig::new(tombstone))))
84 }
85 #[cfg(feature = "turmoil")]
86 BlobConfig::Turmoil(config) => Ok(Arc::new(crate::turmoil::TurmoilBlob::open(config))),
87 }
88 }
89
90 pub async fn try_from(
92 url: &SensitiveUrl,
93 knobs: Box<dyn BlobKnobs>,
94 metrics: S3BlobMetrics,
95 cfg: Arc<ConfigSet>,
96 ) -> Result<Self, ExternalError> {
97 let mut query_params = url.query_pairs().collect::<BTreeMap<_, _>>();
98
99 let config = match url.scheme() {
100 "file" => {
101 let mut config = FileBlobConfig::from(url.path());
102 if query_params.remove("tombstone").is_some() {
103 config.tombstone = true;
104 }
105 Ok(BlobConfig::File(config))
106 }
107 "s3" => {
108 let bucket = url
109 .host()
110 .ok_or_else(|| anyhow!("missing bucket: {}", &url.as_str()))?
111 .to_string();
112 let prefix = url
113 .path()
114 .strip_prefix('/')
115 .unwrap_or_else(|| url.path())
116 .to_string();
117 let role_arn = query_params.remove("role_arn").map(|x| x.into_owned());
118 let endpoint = query_params.remove("endpoint").map(|x| x.into_owned());
119 let region = query_params.remove("region").map(|x| x.into_owned());
120
121 let credentials = match url.password() {
122 None => None,
123 Some(password) => Some((
124 String::from_utf8_lossy(&urlencoding::decode_binary(
125 url.username().as_bytes(),
126 ))
127 .into_owned(),
128 String::from_utf8_lossy(&urlencoding::decode_binary(password.as_bytes()))
129 .into_owned(),
130 )),
131 };
132
133 let config = S3BlobConfig::new(
134 bucket,
135 prefix,
136 role_arn,
137 endpoint,
138 region,
139 credentials,
140 knobs,
141 metrics,
142 cfg,
143 )
144 .await?;
145
146 Ok(BlobConfig::S3(config))
147 }
148 "mem" => {
149 if !cfg!(debug_assertions) {
150 warn!("persist unexpectedly using in-mem blob in a release binary");
151 }
152 let tombstone = match query_params.remove("tombstone").as_deref() {
153 None | Some("true") => true,
154 Some("false") => false,
155 Some(other) => Err(Determinate::new(anyhow!(
156 "invalid tombstone param value: {other}"
157 )))?,
158 };
159 query_params.clear();
160 Ok(BlobConfig::Mem(tombstone))
161 }
162 "http" | "https" => match url
163 .host()
164 .ok_or_else(|| anyhow!("missing protocol: {}", &url.as_str()))?
165 .to_string()
166 .split_once('.')
167 {
168 Some((account, root))
170 if account == "devstoreaccount1" || root == "blob.core.windows.net" =>
171 {
172 if let Some(container) = url
173 .path_segments()
174 .expect("azure blob storage container")
175 .next()
176 {
177 query_params.clear();
178 Ok(BlobConfig::Azure(AzureBlobConfig::new(
179 account.to_string(),
180 container.to_string(),
181 "".to_string(),
185 metrics,
186 url.clone().into_redacted(),
187 knobs,
188 cfg,
189 )?))
190 } else {
191 Err(anyhow!("unknown persist blob scheme: {}", url.as_str()))
192 }
193 }
194 _ => Err(anyhow!("unknown persist blob scheme: {}", url.as_str())),
195 },
196 #[cfg(feature = "turmoil")]
197 "turmoil" => {
198 let cfg = crate::turmoil::BlobConfig::new(url);
199 Ok(BlobConfig::Turmoil(cfg))
200 }
201 p => Err(anyhow!(
202 "unknown persist blob scheme {}: {}",
203 p,
204 url.as_str()
205 )),
206 }?;
207
208 if !query_params.is_empty() {
209 return Err(ExternalError::from(anyhow!(
210 "unknown blob location params {}: {}",
211 query_params
212 .keys()
213 .map(|x| x.as_ref())
214 .collect::<Vec<_>>()
215 .join(" "),
216 url.as_str(),
217 )));
218 }
219
220 Ok(config)
221 }
222}
223
/// Configuration describing which [`Consensus`] implementation to open and
/// how.
///
/// Values are built from a URL by [`ConsensusConfig::try_from`] and turned
/// into a live handle by [`ConsensusConfig::open`].
#[derive(Debug, Clone)]
pub enum ConsensusConfig {
    /// Config for an [`FdbConsensus`]; only compiled with the `foundationdb`
    /// feature or when targeting Linux.
    #[cfg(any(feature = "foundationdb", target_os = "linux"))]
    FoundationDB(FdbConsensusConfig),
    /// Config for a [`PostgresConsensus`].
    Postgres(PostgresConsensusConfig),
    /// Selects [`MemConsensus`], which is created via `Default` and needs no
    /// further configuration.
    Mem,
    /// Config for the turmoil-backed consensus; only compiled with the
    /// `turmoil` feature.
    #[cfg(feature = "turmoil")]
    Turmoil(crate::turmoil::ConsensusConfig),
}
238
239impl ConsensusConfig {
240 pub async fn open(self) -> Result<Arc<dyn Consensus>, ExternalError> {
242 match self {
243 #[cfg(any(feature = "foundationdb", target_os = "linux"))]
244 ConsensusConfig::FoundationDB(config) => {
245 Ok(Arc::new(FdbConsensus::open(config).await?))
246 }
247 ConsensusConfig::Postgres(config) => {
248 Ok(Arc::new(PostgresConsensus::open(config).await?))
249 }
250 ConsensusConfig::Mem => Ok(Arc::new(MemConsensus::default())),
251 #[cfg(feature = "turmoil")]
252 ConsensusConfig::Turmoil(config) => {
253 Ok(Arc::new(crate::turmoil::TurmoilConsensus::open(config)))
254 }
255 }
256 }
257
258 pub fn try_from(
260 url: &SensitiveUrl,
261 knobs: Box<dyn PostgresClientKnobs>,
262 metrics: PostgresClientMetrics,
263 dyncfg: Arc<ConfigSet>,
264 ) -> Result<Self, ExternalError> {
265 let config = match url.scheme() {
266 #[cfg(any(feature = "foundationdb", target_os = "linux"))]
267 "foundationdb" => Ok(ConsensusConfig::FoundationDB(FdbConsensusConfig::new(
268 url.clone(),
269 )?)),
270 "postgres" | "postgresql" => Ok(ConsensusConfig::Postgres(
271 PostgresConsensusConfig::new(url, knobs, metrics, dyncfg)?,
272 )),
273 "mem" => {
274 if !cfg!(debug_assertions) {
275 warn!("persist unexpectedly using in-mem consensus in a release binary");
276 }
277 Ok(ConsensusConfig::Mem)
278 }
279 #[cfg(feature = "turmoil")]
280 "turmoil" => {
281 let cfg = crate::turmoil::ConsensusConfig::new(url);
282 Ok(ConsensusConfig::Turmoil(cfg))
283 }
284 p => Err(anyhow!(
285 "unknown persist consensus scheme {}: {}",
286 p,
287 url.as_str()
288 )),
289 }?;
290 Ok(config)
291 }
292}