1use std::collections::BTreeMap;
13use std::sync::Arc;
14use std::time::Duration;
15
16use anyhow::anyhow;
17use mz_dyncfg::ConfigSet;
18use mz_ore::url::SensitiveUrl;
19use tracing::warn;
20
21use mz_postgres_client::PostgresClientKnobs;
22use mz_postgres_client::metrics::PostgresClientMetrics;
23
24use crate::azure::{AzureBlob, AzureBlobConfig};
25use crate::file::{FileBlob, FileBlobConfig};
26#[cfg(feature = "foundationdb")]
27use crate::foundationdb::{FdbConsensus, FdbConsensusConfig};
28use crate::location::{Blob, Consensus, Determinate, ExternalError};
29use crate::mem::{MemBlob, MemBlobConfig, MemConsensus};
30use crate::metrics::S3BlobMetrics;
31use crate::postgres::{PostgresConsensus, PostgresConsensusConfig};
32use crate::s3::{S3Blob, S3BlobConfig};
33
34pub fn all_dyn_configs(configs: ConfigSet) -> ConfigSet {
36 configs.add(&crate::postgres::USE_POSTGRES_TUNED_QUERIES)
37}
38
39#[derive(Debug, Clone)]
41pub enum BlobConfig {
42 File(FileBlobConfig),
44 S3(S3BlobConfig),
46 Mem(bool),
49 Azure(AzureBlobConfig),
51 #[cfg(feature = "turmoil")]
52 Turmoil(crate::turmoil::BlobConfig),
54}
55
/// Tunable settings handed to blob implementations when they are configured
/// (see [`BlobConfig::try_from`], which takes a `Box<dyn BlobKnobs>`).
///
/// NOTE(review): the per-method descriptions below are inferred from the
/// method names only; confirm the exact semantics against the implementors.
pub trait BlobKnobs: Send + Sync + std::fmt::Debug {
    /// Timeout for an operation as a whole (presumably covering retries).
    fn operation_timeout(&self) -> Duration;
    /// Timeout for a single attempt of an operation.
    fn operation_attempt_timeout(&self) -> Duration;
    /// Timeout for establishing a connection.
    fn connect_timeout(&self) -> Duration;
    /// Timeout for reading a response.
    fn read_timeout(&self) -> Duration;
    /// Whether "cc" is active. TODO confirm what "cc" refers to for callers.
    fn is_cc_active(&self) -> bool;
}
69
70impl BlobConfig {
71 pub async fn open(self) -> Result<Arc<dyn Blob>, ExternalError> {
73 match self {
74 BlobConfig::File(config) => Ok(Arc::new(FileBlob::open(config).await?)),
75 BlobConfig::S3(config) => Ok(Arc::new(S3Blob::open(config).await?)),
76 BlobConfig::Azure(config) => Ok(Arc::new(AzureBlob::open(config).await?)),
77 BlobConfig::Mem(tombstone) => {
78 Ok(Arc::new(MemBlob::open(MemBlobConfig::new(tombstone))))
79 }
80 #[cfg(feature = "turmoil")]
81 BlobConfig::Turmoil(config) => Ok(Arc::new(crate::turmoil::TurmoilBlob::open(config))),
82 }
83 }
84
85 pub async fn try_from(
87 url: &SensitiveUrl,
88 knobs: Box<dyn BlobKnobs>,
89 metrics: S3BlobMetrics,
90 ) -> Result<Self, ExternalError> {
91 let mut query_params = url.query_pairs().collect::<BTreeMap<_, _>>();
92
93 let config = match url.scheme() {
94 "file" => {
95 let mut config = FileBlobConfig::from(url.path());
96 if query_params.remove("tombstone").is_some() {
97 config.tombstone = true;
98 }
99 Ok(BlobConfig::File(config))
100 }
101 "s3" => {
102 let bucket = url
103 .host()
104 .ok_or_else(|| anyhow!("missing bucket: {}", &url.as_str()))?
105 .to_string();
106 let prefix = url
107 .path()
108 .strip_prefix('/')
109 .unwrap_or_else(|| url.path())
110 .to_string();
111 let role_arn = query_params.remove("role_arn").map(|x| x.into_owned());
112 let endpoint = query_params.remove("endpoint").map(|x| x.into_owned());
113 let region = query_params.remove("region").map(|x| x.into_owned());
114
115 let credentials = match url.password() {
116 None => None,
117 Some(password) => Some((
118 String::from_utf8_lossy(&urlencoding::decode_binary(
119 url.username().as_bytes(),
120 ))
121 .into_owned(),
122 String::from_utf8_lossy(&urlencoding::decode_binary(password.as_bytes()))
123 .into_owned(),
124 )),
125 };
126
127 let config = S3BlobConfig::new(
128 bucket,
129 prefix,
130 role_arn,
131 endpoint,
132 region,
133 credentials,
134 knobs,
135 metrics,
136 )
137 .await?;
138
139 Ok(BlobConfig::S3(config))
140 }
141 "mem" => {
142 if !cfg!(debug_assertions) {
143 warn!("persist unexpectedly using in-mem blob in a release binary");
144 }
145 let tombstone = match query_params.remove("tombstone").as_deref() {
146 None | Some("true") => true,
147 Some("false") => false,
148 Some(other) => Err(Determinate::new(anyhow!(
149 "invalid tombstone param value: {other}"
150 )))?,
151 };
152 query_params.clear();
153 Ok(BlobConfig::Mem(tombstone))
154 }
155 "http" | "https" => match url
156 .host()
157 .ok_or_else(|| anyhow!("missing protocol: {}", &url.as_str()))?
158 .to_string()
159 .split_once('.')
160 {
161 Some((account, root))
163 if account == "devstoreaccount1" || root == "blob.core.windows.net" =>
164 {
165 if let Some(container) = url
166 .path_segments()
167 .expect("azure blob storage container")
168 .next()
169 {
170 query_params.clear();
171 Ok(BlobConfig::Azure(AzureBlobConfig::new(
172 account.to_string(),
173 container.to_string(),
174 "".to_string(),
178 metrics,
179 url.clone().into_redacted(),
180 knobs,
181 )?))
182 } else {
183 Err(anyhow!("unknown persist blob scheme: {}", url.as_str()))
184 }
185 }
186 _ => Err(anyhow!("unknown persist blob scheme: {}", url.as_str())),
187 },
188 #[cfg(feature = "turmoil")]
189 "turmoil" => {
190 let cfg = crate::turmoil::BlobConfig::new(url);
191 Ok(BlobConfig::Turmoil(cfg))
192 }
193 p => Err(anyhow!(
194 "unknown persist blob scheme {}: {}",
195 p,
196 url.as_str()
197 )),
198 }?;
199
200 if !query_params.is_empty() {
201 return Err(ExternalError::from(anyhow!(
202 "unknown blob location params {}: {}",
203 query_params
204 .keys()
205 .map(|x| x.as_ref())
206 .collect::<Vec<_>>()
207 .join(" "),
208 url.as_str(),
209 )));
210 }
211
212 Ok(config)
213 }
214}
215
216#[derive(Debug, Clone)]
218pub enum ConsensusConfig {
219 #[cfg(feature = "foundationdb")]
220 FoundationDB(FdbConsensusConfig),
222 Postgres(PostgresConsensusConfig),
224 Mem,
226 #[cfg(feature = "turmoil")]
227 Turmoil(crate::turmoil::ConsensusConfig),
229}
230
231impl ConsensusConfig {
232 pub async fn open(self) -> Result<Arc<dyn Consensus>, ExternalError> {
234 match self {
235 #[cfg(feature = "foundationdb")]
236 ConsensusConfig::FoundationDB(config) => {
237 Ok(Arc::new(FdbConsensus::open(config).await?))
238 }
239 ConsensusConfig::Postgres(config) => {
240 Ok(Arc::new(PostgresConsensus::open(config).await?))
241 }
242 ConsensusConfig::Mem => Ok(Arc::new(MemConsensus::default())),
243 #[cfg(feature = "turmoil")]
244 ConsensusConfig::Turmoil(config) => {
245 Ok(Arc::new(crate::turmoil::TurmoilConsensus::open(config)))
246 }
247 }
248 }
249
250 pub fn try_from(
252 url: &SensitiveUrl,
253 knobs: Box<dyn PostgresClientKnobs>,
254 metrics: PostgresClientMetrics,
255 dyncfg: Arc<ConfigSet>,
256 ) -> Result<Self, ExternalError> {
257 let config = match url.scheme() {
258 #[cfg(feature = "foundationdb")]
259 "foundationdb" => Ok(ConsensusConfig::FoundationDB(FdbConsensusConfig::new(
260 url.clone(),
261 )?)),
262 "postgres" | "postgresql" => Ok(ConsensusConfig::Postgres(
263 PostgresConsensusConfig::new(url, knobs, metrics, dyncfg)?,
264 )),
265 "mem" => {
266 if !cfg!(debug_assertions) {
267 warn!("persist unexpectedly using in-mem consensus in a release binary");
268 }
269 Ok(ConsensusConfig::Mem)
270 }
271 #[cfg(feature = "turmoil")]
272 "turmoil" => {
273 let cfg = crate::turmoil::ConsensusConfig::new(url);
274 Ok(ConsensusConfig::Turmoil(cfg))
275 }
276 p => Err(anyhow!(
277 "unknown persist consensus scheme {}: {}",
278 p,
279 url.as_str()
280 )),
281 }?;
282 Ok(config)
283 }
284}