use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::Duration;
use anyhow::anyhow;
use mz_dyncfg::ConfigSet;
use mz_ore::url::SensitiveUrl;
use tracing::warn;
use mz_postgres_client::metrics::PostgresClientMetrics;
use mz_postgres_client::PostgresClientKnobs;
use crate::file::{FileBlob, FileBlobConfig};
use crate::location::{Blob, Consensus, Determinate, ExternalError};
use crate::mem::{MemBlob, MemBlobConfig, MemConsensus};
use crate::metrics::S3BlobMetrics;
use crate::postgres::{PostgresConsensus, PostgresConsensusConfig};
use crate::s3::{S3Blob, S3BlobConfig};
/// Registers every dynamic config owned by this crate on `configs` and
/// returns the extended set.
pub fn all_dyn_configs(configs: ConfigSet) -> ConfigSet {
    // Configs defined in `crate::indexed::columnar::arrow`.
    let with_arrow = configs
        .add(&crate::indexed::columnar::arrow::ENABLE_ARROW_LGALLOC_CC_SIZES)
        .add(&crate::indexed::columnar::arrow::ENABLE_ARROW_LGALLOC_NONCC_SIZES);

    // Configs defined in `crate::s3`.
    with_arrow
        .add(&crate::s3::ENABLE_S3_LGALLOC_CC_SIZES)
        .add(&crate::s3::ENABLE_S3_LGALLOC_NONCC_SIZES)
        .add(&crate::s3::ENABLE_ONE_ALLOC_PER_REQUEST)
}
/// Configuration for opening a [Blob] implementation.
///
/// Can be parsed from a location URL with `BlobConfig::try_from` and turned
/// into a live `Arc<dyn Blob>` with `BlobConfig::open`.
#[derive(Debug, Clone)]
pub enum BlobConfig {
    /// Opens a [FileBlob] with the given config.
    File(FileBlobConfig),
    /// Opens an [S3Blob] with the given config.
    S3(S3BlobConfig),
    /// Opens an in-memory [MemBlob]. The flag is the `tombstone` setting
    /// handed to `MemBlobConfig::new`.
    /// NOTE(review): exact tombstone semantics live in `crate::mem`; confirm there.
    Mem(bool),
}
/// Tuning knobs supplied to blob implementations.
///
/// A boxed implementation is threaded through `BlobConfig::try_from` into
/// `S3BlobConfig::new`. The per-method descriptions below are inferred from
/// the method names; the consuming code lives outside this file, so confirm
/// there before relying on the details.
pub trait BlobKnobs: std::fmt::Debug + Send + Sync {
    /// Timeout for a whole blob operation.
    /// NOTE(review): presumably spans all retry attempts — confirm.
    fn operation_timeout(&self) -> Duration;
    /// Timeout for a single attempt of a blob operation.
    fn operation_attempt_timeout(&self) -> Duration;
    /// Timeout for establishing a connection.
    fn connect_timeout(&self) -> Duration;
    /// Timeout for reading a response.
    fn read_timeout(&self) -> Duration;
    /// Whether "cc" mode is active.
    /// NOTE(review): meaning of "cc" is not visible here; possibly related to
    /// the `*_LGALLOC_CC_SIZES` dyncfgs registered in this file — confirm.
    fn is_cc_active(&self) -> bool;
}
impl BlobConfig {
    /// Opens the [Blob] implementation described by this config.
    ///
    /// # Errors
    ///
    /// Propagates any error from opening a file or S3 blob. Opening an
    /// in-memory blob cannot fail.
    pub async fn open(self) -> Result<Arc<dyn Blob>, ExternalError> {
        match self {
            BlobConfig::File(config) => Ok(Arc::new(FileBlob::open(config).await?)),
            BlobConfig::S3(config) => Ok(Arc::new(S3Blob::open(config).await?)),
            BlobConfig::Mem(tombstone) => {
                Ok(Arc::new(MemBlob::open(MemBlobConfig::new(tombstone))))
            }
        }
    }

    /// Parses a [BlobConfig] from a blob location URL.
    ///
    /// Supported schemes:
    /// - `file://<path>`: an optional `tombstone` query param (`true`, `false`,
    ///   or a bare `tombstone` meaning `true`) overrides the `tombstone` field
    ///   of the [FileBlobConfig] built from the path; when absent, the value
    ///   from `FileBlobConfig::from` is kept.
    /// - `s3://<bucket>/<prefix>`: optional `role_arn`, `endpoint`, and `region`
    ///   query params. If the URL carries a password, the percent-decoded
    ///   `username:password` userinfo is used as credentials.
    /// - `mem://`: an optional `tombstone` query param, defaulting to `true`.
    ///   All other query params are discarded.
    ///
    /// # Errors
    ///
    /// Returns an error for an unknown scheme, an `s3` URL without a host
    /// (bucket), an invalid `tombstone` value (as a [Determinate] error),
    /// unrecognized query params on `file`/`s3` URLs, or any error from
    /// [S3BlobConfig::new].
    pub async fn try_from(
        url: &SensitiveUrl,
        knobs: Box<dyn BlobKnobs>,
        metrics: S3BlobMetrics,
        cfg: Arc<ConfigSet>,
    ) -> Result<Self, ExternalError> {
        // Every recognized param is removed from this map as it is consumed,
        // so whatever remains at the end is reported as unknown.
        let mut query_params = url.query_pairs().collect::<BTreeMap<_, _>>();

        let config = match url.scheme() {
            "file" => {
                let mut config = FileBlobConfig::from(url.path());
                // Previously any present value (even `false`) enabled
                // tombstoning; parse it the same way as the `mem` scheme.
                if let Some(value) = query_params.remove("tombstone") {
                    config.tombstone = Self::parse_tombstone(&value)?;
                }
                Ok(BlobConfig::File(config))
            }
            "s3" => {
                let bucket = url
                    .host()
                    .ok_or_else(|| anyhow!("missing bucket: {}", url.as_str()))?
                    .to_string();
                let prefix = url
                    .path()
                    .strip_prefix('/')
                    .unwrap_or_else(|| url.path())
                    .to_string();
                let role_arn = query_params.remove("role_arn").map(|x| x.into_owned());
                let endpoint = query_params.remove("endpoint").map(|x| x.into_owned());
                let region = query_params.remove("region").map(|x| x.into_owned());

                // URL userinfo is percent-encoded; decode it (lossily, as
                // UTF-8) before use. Credentials are only supplied when a
                // password is present.
                let decode = |s: &str| {
                    String::from_utf8_lossy(&urlencoding::decode_binary(s.as_bytes()))
                        .into_owned()
                };
                let credentials = url
                    .password()
                    .map(|password| (decode(url.username()), decode(password)));

                let config = S3BlobConfig::new(
                    bucket,
                    prefix,
                    role_arn,
                    endpoint,
                    region,
                    credentials,
                    knobs,
                    metrics,
                    cfg,
                )
                .await?;
                Ok(BlobConfig::S3(config))
            }
            "mem" => {
                if !cfg!(debug_assertions) {
                    warn!("persist unexpectedly using in-mem blob in a release binary");
                }
                let tombstone = match query_params.remove("tombstone") {
                    None => true,
                    Some(value) => Self::parse_tombstone(&value)?,
                };
                // Discard any remaining params so they are not reported as
                // unknown below.
                query_params.clear();
                Ok(BlobConfig::Mem(tombstone))
            }
            p => Err(anyhow!(
                "unknown persist blob scheme {}: {}",
                p,
                url.as_str()
            )),
        }?;

        if !query_params.is_empty() {
            return Err(ExternalError::from(anyhow!(
                "unknown blob location params {}: {}",
                query_params
                    .keys()
                    .map(|x| x.as_ref())
                    .collect::<Vec<_>>()
                    .join(" "),
                url.as_str(),
            )));
        }
        Ok(config)
    }

    /// Parses the value of a `tombstone` URL query param.
    ///
    /// A bare `tombstone` (empty value) or `tombstone=true` yields `true`,
    /// `tombstone=false` yields `false`, and anything else is rejected with a
    /// [Determinate] error.
    fn parse_tombstone(value: &str) -> Result<bool, ExternalError> {
        match value {
            "" | "true" => Ok(true),
            "false" => Ok(false),
            other => Err(ExternalError::from(Determinate::new(anyhow!(
                "invalid tombstone param value: {other}"
            )))),
        }
    }
}
/// Configuration for opening a [Consensus] implementation.
///
/// Can be parsed from a location URL with `ConsensusConfig::try_from` and
/// turned into a live `Arc<dyn Consensus>` with `ConsensusConfig::open`.
#[derive(Debug, Clone)]
pub enum ConsensusConfig {
    /// Opens a [PostgresConsensus] with the given config.
    Postgres(PostgresConsensusConfig),
    /// Opens a default-constructed in-memory [MemConsensus].
    Mem,
}
impl ConsensusConfig {
pub async fn open(self) -> Result<Arc<dyn Consensus>, ExternalError> {
match self {
ConsensusConfig::Postgres(config) => {
Ok(Arc::new(PostgresConsensus::open(config).await?))
}
ConsensusConfig::Mem => Ok(Arc::new(MemConsensus::default())),
}
}
pub fn try_from(
url: &SensitiveUrl,
knobs: Box<dyn PostgresClientKnobs>,
metrics: PostgresClientMetrics,
) -> Result<Self, ExternalError> {
let config = match url.scheme() {
"postgres" | "postgresql" => Ok(ConsensusConfig::Postgres(
PostgresConsensusConfig::new(url, knobs, metrics)?,
)),
"mem" => {
if !cfg!(debug_assertions) {
warn!("persist unexpectedly using in-mem consensus in a release binary");
}
Ok(ConsensusConfig::Mem)
}
p => Err(anyhow!(
"unknown persist consensus scheme {}: {}",
p,
url.as_str()
)),
}?;
Ok(config)
}
}