use std::borrow::Cow;
use std::clone::Clone;
use std::collections::BTreeMap;
use std::fmt::Debug;
use std::net::IpAddr;
use std::string::ToString;
use std::sync::{Arc, LazyLock};
use std::time::Duration;

use chrono::{DateTime, Utc};
use derivative::Derivative;
use im::OrdMap;
use mz_build_info::BuildInfo;
use mz_dyncfg::{ConfigSet, ConfigType, ConfigUpdates, ConfigVal};
use mz_persist_client::cfg::{CRDB_CONNECT_TIMEOUT, CRDB_TCP_USER_TIMEOUT};
use mz_repr::adt::numeric::Numeric;
use mz_repr::adt::timestamp::CheckedTimestamp;
use mz_repr::bytes::ByteSize;
use mz_repr::user::ExternalUserMetadata;
use mz_tracing::{CloneableEnvFilter, SerializableDirective};
use serde::Serialize;
use thiserror::Error;
use tracing::error;
use uncased::UncasedStr;

use crate::ast::Ident;
use crate::session::user::User;

pub(crate) mod constraints;
pub(crate) mod definitions;
pub(crate) mod errors;
pub(crate) mod polyfill;
pub(crate) mod value;

pub use definitions::*;
pub use errors::*;
pub use value::*;
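/// Whether a transaction ends in a commit or a rollback. This determines
/// whether session variable values staged by `SET` take effect.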
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EndTransactionAction {
Commit,
Rollback,
}
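/// A variable value provided by a client: either a single flat string or the
/// list of values from a SQL `SET` statement.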
#[derive(Debug, Clone, Copy)]
pub enum VarInput<'a> {
Flat(&'a str),
SqlSet(&'a [String]),
}
impl<'a> VarInput<'a> {
pub fn to_vec(&self) -> Vec<String> {
match self {
VarInput::Flat(v) => vec![v.to_string()],
VarInput::SqlSet(values) => values.into_iter().map(|v| v.to_string()).collect(),
}
}
}
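/// An owned version of [`VarInput`].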
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize)]
pub enum OwnedVarInput {
Flat(String),
SqlSet(Vec<String>),
}
impl OwnedVarInput {
pub fn borrow(&self) -> VarInput {
match self {
OwnedVarInput::Flat(v) => VarInput::Flat(v),
OwnedVarInput::SqlSet(v) => VarInput::SqlSet(v),
}
}
}
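/// A read-only interface to a configuration variable: its name, current value
/// rendered as a string, description, and type name, plus a check of whether
/// the variable is visible to a given user.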
pub trait Var: Debug {
fn name(&self) -> &'static str;
fn value(&self) -> String;
fn description(&self) -> &'static str;
fn type_name(&self) -> Cow<'static, str>;
fn visible(&self, user: &User, system_vars: Option<&SystemVars>) -> Result<(), VarError>;
fn as_var(&self) -> &dyn Var
where
Self: Sized,
{
self
}
}
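/// A session-scoped variable with up to four layered values: an overridable
/// default, a transaction-local value (`SET LOCAL`), a value staged by `SET`
/// that becomes the session value at commit, and the committed session value.
/// The definition's compiled-in default is the final fallback.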
#[derive(Debug)]
pub struct SessionVar {
definition: VarDefinition,
default_value: Option<Box<dyn Value>>,
local_value: Option<Box<dyn Value>>,
staged_value: Option<Box<dyn Value>>,
session_value: Option<Box<dyn Value>>,
}
impl Clone for SessionVar {
fn clone(&self) -> Self {
SessionVar {
definition: self.definition.clone(),
default_value: self.default_value.as_ref().map(|v| v.box_clone()),
local_value: self.local_value.as_ref().map(|v| v.box_clone()),
staged_value: self.staged_value.as_ref().map(|v| v.box_clone()),
session_value: self.session_value.as_ref().map(|v| v.box_clone()),
}
}
}
impl SessionVar {
pub const fn new(var: VarDefinition) -> Self {
SessionVar {
definition: var,
default_value: None,
local_value: None,
staged_value: None,
session_value: None,
}
}
pub fn check(&self, input: VarInput) -> Result<String, VarError> {
let v = self.definition.parse(input)?;
self.validate_constraints(v.as_ref())?;
Ok(v.format())
}
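/// Parses and constraint-checks `input`, then records it: as the
/// transaction-local value if `local` is true, otherwise as a staged value
/// that becomes the session value when the transaction commits.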
pub fn set(&mut self, input: VarInput, local: bool) -> Result<(), VarError> {
let v = self.definition.parse(input)?;
self.validate_constraints(v.as_ref())?;
if local {
self.local_value = Some(v);
} else {
self.local_value = None;
self.staged_value = Some(v);
}
Ok(())
}
pub fn set_default(&mut self, input: VarInput) -> Result<(), VarError> {
let v = self.definition.parse(input)?;
self.validate_constraints(v.as_ref())?;
self.default_value = Some(v);
Ok(())
}
pub fn reset(&mut self, local: bool) {
let value = self
.default_value
.as_ref()
.map(|v| v.as_ref())
.unwrap_or(self.definition.value.value());
if local {
self.local_value = Some(value.box_clone());
} else {
self.local_value = None;
self.staged_value = Some(value.box_clone());
}
}
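/// Applies `action` to any in-flight change: on commit a staged value becomes
/// the session value, otherwise staged and local values are discarded.
/// Returns the updated variable, or `None` if nothing was in flight.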
#[must_use]
pub fn end_transaction(&self, action: EndTransactionAction) -> Option<Self> {
if !self.is_mutating() {
return None;
}
let mut next: Self = self.clone();
next.local_value = None;
match action {
EndTransactionAction::Commit if next.staged_value.is_some() => {
next.session_value = next.staged_value.take()
}
_ => next.staged_value = None,
}
Some(next)
}
pub fn is_mutating(&self) -> bool {
self.local_value.is_some() || self.staged_value.is_some()
}
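/// Returns the effective value, preferring (in order) the transaction-local
/// value, the staged value, the session value, the overridden default, and
/// finally the definition's compiled-in default.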
pub fn value_dyn(&self) -> &dyn Value {
self.local_value
.as_deref()
.or(self.staged_value.as_deref())
.or(self.session_value.as_deref())
.or(self.default_value.as_deref())
.unwrap_or(self.definition.value.value())
}
pub fn inspect_session_value(&self) -> Option<&dyn Value> {
self.session_value.as_deref()
}
fn validate_constraints(&self, val: &dyn Value) -> Result<(), VarError> {
if let Some(constraint) = &self.definition.constraint {
constraint.check_constraint(self, self.value_dyn(), val)
} else {
Ok(())
}
}
}
impl Var for SessionVar {
fn name(&self) -> &'static str {
self.definition.name.as_str()
}
fn value(&self) -> String {
self.value_dyn().format()
}
fn description(&self) -> &'static str {
self.definition.description
}
fn type_name(&self) -> Cow<'static, str> {
self.definition.type_name()
}
fn visible(
&self,
user: &User,
system_vars: Option<&SystemVars>,
) -> Result<(), VarError> {
self.definition.visible(user, system_vars)
}
}
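/// The backing data for the `mz_version` variable: the server's build
/// information and, when known, the Helm chart version.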
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MzVersion {
build_info: &'static BuildInfo,
helm_chart_version: Option<String>,
}
impl MzVersion {
pub fn new(build_info: &'static BuildInfo, helm_chart_version: Option<String>) -> Self {
MzVersion {
build_info,
helm_chart_version,
}
}
}
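/// All configuration variables for a single session, keyed by case-insensitive
/// name, together with the server version and the session's user, which back
/// the read-only `mz_version` and `is_superuser` variables.
///
/// A minimal usage sketch (the `BUILD_INFO` and `user` bindings are assumed to
/// be in scope; error handling is elided):
///
/// ```ignore
/// let mut vars = SessionVars::new_unchecked(&BUILD_INFO, user, None);
/// // Stage a new value; it is visible to reads immediately ...
/// vars.set(None, "application_name", VarInput::Flat("my_app"), false)
///     .expect("known variable and valid value");
/// assert_eq!(vars.application_name(), "my_app");
/// // ... and survives the end of the transaction only on commit.
/// vars.end_transaction(EndTransactionAction::Commit);
/// assert_eq!(vars.application_name(), "my_app");
/// ```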
#[derive(Debug, Clone)]
pub struct SessionVars {
vars: OrdMap<&'static UncasedStr, SessionVar>,
mz_version: MzVersion,
user: User,
}
impl SessionVars {
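/// Creates a fresh set of session variables for `user`, initialized to their
/// compiled-in defaults; system-configured defaults are not consulted here.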
pub fn new_unchecked(
build_info: &'static BuildInfo,
user: User,
helm_chart_version: Option<String>,
) -> SessionVars {
use definitions::*;
let vars = [
&FAILPOINTS,
&SERVER_VERSION,
&SERVER_VERSION_NUM,
&SQL_SAFE_UPDATES,
&REAL_TIME_RECENCY,
&EMIT_PLAN_INSIGHTS_NOTICE,
&EMIT_TIMESTAMP_NOTICE,
&EMIT_TRACE_ID_NOTICE,
&AUTO_ROUTE_CATALOG_QUERIES,
&ENABLE_SESSION_RBAC_CHECKS,
&ENABLE_SESSION_CARDINALITY_ESTIMATES,
&MAX_IDENTIFIER_LENGTH,
&STATEMENT_LOGGING_SAMPLE_RATE,
&EMIT_INTROSPECTION_QUERY_NOTICE,
&UNSAFE_NEW_TRANSACTION_WALL_TIME,
&WELCOME_MESSAGE,
]
.into_iter()
.chain(SystemVars::SESSION_VARS.iter().map(|(_name, var)| *var))
.map(|var| (var.name, SessionVar::new(var.clone())))
.collect();
SessionVars {
vars,
mz_version: MzVersion::new(build_info, helm_chart_version),
user,
}
}
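/// Returns the value of `var` downcast to `V`, panicking if the variable is
/// not present or holds a value of a different type.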
fn expect_value<V: Value>(&self, var: &VarDefinition) -> &V {
let var = self
.vars
.get(var.name)
.expect("provided var should be in state");
let val = var.value_dyn();
val.as_any().downcast_ref::<V>().expect("success")
}
pub fn iter(&self) -> impl Iterator<Item = &dyn Var> {
#[allow(clippy::as_conversions)]
self.vars
.values()
.map(|v| v.as_var())
.chain([&self.mz_version as &dyn Var, &self.user])
}
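/// Returns the set of variables that clients are notified about when their
/// values change.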
pub fn notify_set(&self) -> impl Iterator<Item = &dyn Var> {
[
&APPLICATION_NAME,
&CLIENT_ENCODING,
&DATE_STYLE,
&INTEGER_DATETIMES,
&SERVER_VERSION,
&STANDARD_CONFORMING_STRINGS,
&TIMEZONE,
&INTERVAL_STYLE,
&CLUSTER,
&CLUSTER_REPLICA,
&DATABASE,
&SEARCH_PATH,
]
.into_iter()
.map(|p| self.get(None, p.name()).expect("SystemVars known to exist"))
.chain(std::iter::once(self.mz_version.as_var()))
}
pub fn reset_all(&mut self) {
let names: Vec<_> = self.vars.keys().copied().collect();
for name in names {
self.vars[name].reset(false);
}
}
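/// Returns the named variable, applying the compatibility renames from
/// `compat_translate` and handling the read-only `mz_version` and
/// `is_superuser` pseudo-variables. Errors if the variable is unknown or not
/// visible to the session's user.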
pub fn get(&self, system_vars: Option<&SystemVars>, name: &str) -> Result<&dyn Var, VarError> {
let name = compat_translate_name(name);
let name = UncasedStr::new(name);
if name == MZ_VERSION_NAME {
Ok(&self.mz_version)
} else if name == IS_SUPERUSER_NAME {
Ok(&self.user)
} else {
self.vars
.get(name)
.map(|v| {
v.visible(&self.user, system_vars)?;
Ok(v.as_var())
})
.transpose()?
.ok_or_else(|| VarError::UnknownParameter(name.to_string()))
}
}
pub fn inspect(&self, name: &str) -> Result<&SessionVar, VarError> {
let name = compat_translate_name(name);
self.vars
.get(UncasedStr::new(name))
.ok_or_else(|| VarError::UnknownParameter(name.to_string()))
}
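/// Parses `input` for the named variable and stages it, after checking that
/// the variable is neither read-only nor hidden from the session's user. With
/// `local`, the value lasts only until the end of the current transaction.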
pub fn set(
&mut self,
system_vars: Option<&SystemVars>,
name: &str,
input: VarInput,
local: bool,
) -> Result<(), VarError> {
let (name, input) = compat_translate(name, input);
let name = UncasedStr::new(name);
self.check_read_only(name)?;
self.vars
.get_mut(name)
.map(|v| {
v.visible(&self.user, system_vars)?;
v.set(input, local)
})
.transpose()?
.ok_or_else(|| VarError::UnknownParameter(name.to_string()))
}
pub fn set_default(&mut self, name: &str, input: VarInput) -> Result<(), VarError> {
let (name, input) = compat_translate(name, input);
let name = UncasedStr::new(name);
self.check_read_only(name)?;
self.vars
.get_mut(name)
.map(|v| v.set_default(input))
.transpose()?
.ok_or_else(|| VarError::UnknownParameter(name.to_string()))
}
pub fn reset(
&mut self,
system_vars: Option<&SystemVars>,
name: &str,
local: bool,
) -> Result<(), VarError> {
let name = compat_translate_name(name);
let name = UncasedStr::new(name);
self.check_read_only(name)?;
self.vars
.get_mut(name)
.map(|v| {
v.visible(&self.user, system_vars)?;
v.reset(local);
Ok(())
})
.transpose()?
.ok_or_else(|| VarError::UnknownParameter(name.to_string()))
}
fn check_read_only(&self, name: &UncasedStr) -> Result<(), VarError> {
if name == MZ_VERSION_NAME {
Err(VarError::ReadOnlyParameter(MZ_VERSION_NAME.as_str()))
} else if name == IS_SUPERUSER_NAME {
Err(VarError::ReadOnlyParameter(IS_SUPERUSER_NAME.as_str()))
} else if name == MAX_IDENTIFIER_LENGTH.name {
Err(VarError::ReadOnlyParameter(
MAX_IDENTIFIER_LENGTH.name.as_str(),
))
} else {
Ok(())
}
}
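/// Commits or rolls back every variable with an in-flight change and returns
/// the variables whose visible value changed as a result, mapped to their new
/// values.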
#[mz_ore::instrument(level = "debug")]
pub fn end_transaction(
&mut self,
action: EndTransactionAction,
) -> BTreeMap<&'static str, String> {
let mut changed = BTreeMap::new();
let mut updates = Vec::new();
for (name, var) in self.vars.iter() {
if !var.is_mutating() {
continue;
}
let before = var.value();
let next = var.end_transaction(action).expect("must mutate");
let after = next.value();
updates.push((*name, next));
if before != after {
changed.insert(var.name(), after);
}
}
self.vars.extend(updates);
changed
}
pub fn application_name(&self) -> &str {
self.expect_value::<String>(&APPLICATION_NAME).as_str()
}
pub fn build_info(&self) -> &'static BuildInfo {
self.mz_version.build_info
}
pub fn client_encoding(&self) -> &ClientEncoding {
self.expect_value(&CLIENT_ENCODING)
}
pub fn client_min_messages(&self) -> &ClientSeverity {
self.expect_value(&CLIENT_MIN_MESSAGES)
}
pub fn cluster(&self) -> &str {
self.expect_value::<String>(&CLUSTER).as_str()
}
pub fn cluster_replica(&self) -> Option<&str> {
self.expect_value::<Option<String>>(&CLUSTER_REPLICA)
.as_deref()
}
pub fn current_object_missing_warnings(&self) -> bool {
*self.expect_value::<bool>(&CURRENT_OBJECT_MISSING_WARNINGS)
}
pub fn date_style(&self) -> &[&str] {
&self.expect_value::<DateStyle>(&DATE_STYLE).0
}
pub fn database(&self) -> &str {
self.expect_value::<String>(&DATABASE).as_str()
}
pub fn extra_float_digits(&self) -> i32 {
*self.expect_value(&EXTRA_FLOAT_DIGITS)
}
pub fn integer_datetimes(&self) -> bool {
*self.expect_value(&INTEGER_DATETIMES)
}
pub fn intervalstyle(&self) -> &IntervalStyle {
self.expect_value(&INTERVAL_STYLE)
}
pub fn mz_version(&self) -> String {
self.mz_version.value()
}
pub fn search_path(&self) -> &[Ident] {
self.expect_value::<Vec<Ident>>(&SEARCH_PATH).as_slice()
}
pub fn server_version(&self) -> &str {
self.expect_value::<String>(&SERVER_VERSION).as_str()
}
pub fn server_version_num(&self) -> i32 {
*self.expect_value(&SERVER_VERSION_NUM)
}
pub fn sql_safe_updates(&self) -> bool {
*self.expect_value(&SQL_SAFE_UPDATES)
}
pub fn standard_conforming_strings(&self) -> bool {
*self.expect_value(&STANDARD_CONFORMING_STRINGS)
}
pub fn statement_timeout(&self) -> &Duration {
self.expect_value(&STATEMENT_TIMEOUT)
}
pub fn idle_in_transaction_session_timeout(&self) -> &Duration {
self.expect_value(&IDLE_IN_TRANSACTION_SESSION_TIMEOUT)
}
pub fn timezone(&self) -> &TimeZone {
self.expect_value(&TIMEZONE)
}
pub fn transaction_isolation(&self) -> &IsolationLevel {
self.expect_value(&TRANSACTION_ISOLATION)
}
pub fn real_time_recency(&self) -> bool {
*self.expect_value(&REAL_TIME_RECENCY)
}
pub fn real_time_recency_timeout(&self) -> &Duration {
self.expect_value(&REAL_TIME_RECENCY_TIMEOUT)
}
pub fn emit_plan_insights_notice(&self) -> bool {
*self.expect_value(&EMIT_PLAN_INSIGHTS_NOTICE)
}
pub fn emit_timestamp_notice(&self) -> bool {
*self.expect_value(&EMIT_TIMESTAMP_NOTICE)
}
pub fn emit_trace_id_notice(&self) -> bool {
*self.expect_value(&EMIT_TRACE_ID_NOTICE)
}
pub fn auto_route_catalog_queries(&self) -> bool {
*self.expect_value(&AUTO_ROUTE_CATALOG_QUERIES)
}
pub fn enable_session_rbac_checks(&self) -> bool {
*self.expect_value(&ENABLE_SESSION_RBAC_CHECKS)
}
pub fn enable_session_cardinality_estimates(&self) -> bool {
*self.expect_value(&ENABLE_SESSION_CARDINALITY_ESTIMATES)
}
pub fn is_superuser(&self) -> bool {
self.user.is_superuser()
}
pub fn user(&self) -> &User {
&self.user
}
pub fn max_query_result_size(&self) -> u64 {
self.expect_value::<ByteSize>(&MAX_QUERY_RESULT_SIZE)
.as_bytes()
}
pub fn set_external_user_metadata(&mut self, metadata: ExternalUserMetadata) {
self.user.external_metadata = Some(metadata);
}
pub fn set_cluster(&mut self, cluster: String) {
self.set(None, CLUSTER.name(), VarInput::Flat(&cluster), false)
.expect("setting cluster from string succeeds");
}
pub fn get_statement_logging_sample_rate(&self) -> Numeric {
*self.expect_value(&STATEMENT_LOGGING_SAMPLE_RATE)
}
pub fn emit_introspection_query_notice(&self) -> bool {
*self.expect_value(&EMIT_INTROSPECTION_QUERY_NOTICE)
}
pub fn unsafe_new_transaction_wall_time(&self) -> Option<CheckedTimestamp<DateTime<Utc>>> {
*self.expect_value(&UNSAFE_NEW_TRANSACTION_WALL_TIME)
}
pub fn welcome_message(&self) -> bool {
*self.expect_value(&WELCOME_MESSAGE)
}
}
pub const OLD_CATALOG_SERVER_CLUSTER: &str = "mz_introspection";
pub const OLD_AUTO_ROUTE_CATALOG_QUERIES: &str = "auto_route_introspection_queries";
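/// Rewrites deprecated variable names and values to their current equivalents,
/// logging at debug level whenever a translation is applied.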
fn compat_translate<'a, 'b>(name: &'a str, input: VarInput<'b>) -> (&'a str, VarInput<'b>) {
if name == CLUSTER.name() {
if let Ok(value) = CLUSTER.parse(input) {
if value.format() == OLD_CATALOG_SERVER_CLUSTER {
tracing::debug!(
github_27285 = true,
"encountered deprecated `cluster` variable value: {}",
OLD_CATALOG_SERVER_CLUSTER,
);
return (name, VarInput::Flat("mz_catalog_server"));
}
}
}
if name == OLD_AUTO_ROUTE_CATALOG_QUERIES {
tracing::debug!(
github_27285 = true,
"encountered deprecated `{}` variable name",
OLD_AUTO_ROUTE_CATALOG_QUERIES,
);
return (AUTO_ROUTE_CATALOG_QUERIES.name(), input);
}
(name, input)
}
fn compat_translate_name(name: &str) -> &str {
let (name, _) = compat_translate(name, VarInput::Flat(""));
name
}
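/// A system-wide configuration variable: its definition, an optional persisted
/// value, and an optional dynamically computed default that takes precedence
/// over the definition's compiled-in default.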
#[derive(Debug)]
pub struct SystemVar {
definition: VarDefinition,
persisted_value: Option<Box<dyn Value>>,
dynamic_default: Option<Box<dyn Value>>,
}
impl Clone for SystemVar {
fn clone(&self) -> Self {
SystemVar {
definition: self.definition.clone(),
persisted_value: self.persisted_value.as_ref().map(|v| v.box_clone()),
dynamic_default: self.dynamic_default.as_ref().map(|v| v.box_clone()),
}
}
}
impl SystemVar {
pub fn new(definition: VarDefinition) -> Self {
SystemVar {
definition,
persisted_value: None,
dynamic_default: None,
}
}
fn is_default(&self, input: VarInput) -> Result<bool, VarError> {
let v = self.definition.parse(input)?;
Ok(self.definition.default_value() == v.as_ref())
}
pub fn value_dyn(&self) -> &dyn Value {
self.persisted_value
.as_deref()
.or(self.dynamic_default.as_deref())
.unwrap_or(self.definition.default_value())
}
pub fn value<V: 'static>(&self) -> &V {
let val = self.value_dyn();
val.as_any().downcast_ref::<V>().expect("success")
}
fn parse(&self, input: VarInput) -> Result<Box<dyn Value>, VarError> {
let v = self.definition.parse(input)?;
self.validate_constraints(v.as_ref())?;
Ok(v)
}
fn set(&mut self, input: VarInput) -> Result<bool, VarError> {
let v = self.parse(input)?;
if self.persisted_value.as_ref() != Some(&v) {
self.persisted_value = Some(v);
Ok(true)
} else {
Ok(false)
}
}
fn reset(&mut self) -> bool {
if self.persisted_value.is_some() {
self.persisted_value = None;
true
} else {
false
}
}
fn set_default(&mut self, input: VarInput) -> Result<(), VarError> {
let v = self.definition.parse(input)?;
self.dynamic_default = Some(v);
Ok(())
}
fn validate_constraints(&self, val: &dyn Value) -> Result<(), VarError> {
if let Some(constraint) = &self.definition.constraint {
constraint.check_constraint(self, self.value_dyn(), val)
} else {
Ok(())
}
}
}
impl Var for SystemVar {
fn name(&self) -> &'static str {
self.definition.name.as_str()
}
fn value(&self) -> String {
self.value_dyn().format()
}
fn description(&self) -> &'static str {
self.definition.description
}
fn type_name(&self) -> Cow<'static, str> {
self.definition.type_name()
}
fn visible(
&self,
user: &User,
system_vars: Option<&SystemVars>,
) -> Result<(), VarError> {
self.definition.visible(user, system_vars)
}
}
#[derive(Debug, Error)]
pub enum NetworkPolicyError {
#[error("Access denied for address {0}")]
AddressDenied(IpAddr),
}
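/// The complete set of system configuration variables: limits, feature flags,
/// system defaults for session variables, and dynamic configs, together with
/// change callbacks and whether unsafe variables may be set.
///
/// A minimal usage sketch (error handling elided):
///
/// ```ignore
/// let mut vars = SystemVars::new();
/// vars.set("max_tables", VarInput::Flat("100")).expect("valid value");
/// assert_eq!(vars.max_tables(), 100);
/// ```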
#[derive(Derivative, Clone)]
#[derivative(Debug)]
pub struct SystemVars {
allow_unsafe: bool,
vars: BTreeMap<&'static UncasedStr, SystemVar>,
#[derivative(Debug = "ignore")]
callbacks: BTreeMap<String, Vec<Arc<dyn Fn(&SystemVars) + Send + Sync>>>,
dyncfgs: ConfigSet,
}
impl Default for SystemVars {
fn default() -> Self {
Self::new()
}
}
impl SystemVars {
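/// Definitions of the session variables for which a system-wide default is
/// also maintained.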
const SESSION_VARS: LazyLock<BTreeMap<&'static UncasedStr, &'static VarDefinition>> =
LazyLock::new(|| {
[
&APPLICATION_NAME,
&CLIENT_ENCODING,
&CLIENT_MIN_MESSAGES,
&CLUSTER,
&CLUSTER_REPLICA,
&CURRENT_OBJECT_MISSING_WARNINGS,
&DATABASE,
&DATE_STYLE,
&EXTRA_FLOAT_DIGITS,
&INTEGER_DATETIMES,
&INTERVAL_STYLE,
&REAL_TIME_RECENCY_TIMEOUT,
&SEARCH_PATH,
&STANDARD_CONFORMING_STRINGS,
&STATEMENT_TIMEOUT,
&IDLE_IN_TRANSACTION_SESSION_TIMEOUT,
&TIMEZONE,
&TRANSACTION_ISOLATION,
&MAX_QUERY_RESULT_SIZE,
]
.into_iter()
.map(|var| (UncasedStr::new(var.name()), var))
.collect()
});
pub fn new() -> Self {
let system_vars = vec![
&MAX_KAFKA_CONNECTIONS,
&MAX_POSTGRES_CONNECTIONS,
&MAX_MYSQL_CONNECTIONS,
&MAX_AWS_PRIVATELINK_CONNECTIONS,
&MAX_TABLES,
&MAX_SOURCES,
&MAX_SINKS,
&MAX_MATERIALIZED_VIEWS,
&MAX_CLUSTERS,
&MAX_REPLICAS_PER_CLUSTER,
&MAX_CREDIT_CONSUMPTION_RATE,
&MAX_DATABASES,
&MAX_SCHEMAS_PER_DATABASE,
&MAX_OBJECTS_PER_SCHEMA,
&MAX_SECRETS,
&MAX_ROLES,
&MAX_CONTINUAL_TASKS,
&MAX_NETWORK_POLICIES,
&MAX_RULES_PER_NETWORK_POLICY,
&MAX_RESULT_SIZE,
&MAX_COPY_FROM_SIZE,
&ALLOWED_CLUSTER_REPLICA_SIZES,
&DISK_CLUSTER_REPLICAS_DEFAULT,
&upsert_rocksdb::UPSERT_ROCKSDB_AUTO_SPILL_TO_DISK,
&upsert_rocksdb::UPSERT_ROCKSDB_AUTO_SPILL_THRESHOLD_BYTES,
&upsert_rocksdb::UPSERT_ROCKSDB_COMPACTION_STYLE,
&upsert_rocksdb::UPSERT_ROCKSDB_OPTIMIZE_COMPACTION_MEMTABLE_BUDGET,
&upsert_rocksdb::UPSERT_ROCKSDB_LEVEL_COMPACTION_DYNAMIC_LEVEL_BYTES,
&upsert_rocksdb::UPSERT_ROCKSDB_UNIVERSAL_COMPACTION_RATIO,
&upsert_rocksdb::UPSERT_ROCKSDB_PARALLELISM,
&upsert_rocksdb::UPSERT_ROCKSDB_COMPRESSION_TYPE,
&upsert_rocksdb::UPSERT_ROCKSDB_BOTTOMMOST_COMPRESSION_TYPE,
&upsert_rocksdb::UPSERT_ROCKSDB_BATCH_SIZE,
&upsert_rocksdb::UPSERT_ROCKSDB_RETRY_DURATION,
&upsert_rocksdb::UPSERT_ROCKSDB_STATS_LOG_INTERVAL_SECONDS,
&upsert_rocksdb::UPSERT_ROCKSDB_STATS_PERSIST_INTERVAL_SECONDS,
&upsert_rocksdb::UPSERT_ROCKSDB_POINT_LOOKUP_BLOCK_CACHE_SIZE_MB,
&upsert_rocksdb::UPSERT_ROCKSDB_SHRINK_ALLOCATED_BUFFERS_BY_RATIO,
&upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_CLUSTER_MEMORY_FRACTION,
&upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_MEMORY_BYTES,
&upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_ALLOW_STALL,
&STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES,
&STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_TO_CLUSTER_SIZE_FRACTION,
&STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_DISK_ONLY,
&STORAGE_STATISTICS_INTERVAL,
&STORAGE_STATISTICS_COLLECTION_INTERVAL,
&STORAGE_SHRINK_UPSERT_UNUSED_BUFFERS_BY_RATIO,
&STORAGE_RECORD_SOURCE_SINK_NAMESPACED_ERRORS,
&PERSIST_FAST_PATH_LIMIT,
&METRICS_RETENTION,
&UNSAFE_MOCK_AUDIT_EVENT_TIMESTAMP,
&ENABLE_RBAC_CHECKS,
&PG_SOURCE_CONNECT_TIMEOUT,
&PG_SOURCE_TCP_KEEPALIVES_IDLE,
&PG_SOURCE_TCP_KEEPALIVES_INTERVAL,
&PG_SOURCE_TCP_KEEPALIVES_RETRIES,
&PG_SOURCE_TCP_USER_TIMEOUT,
&PG_SOURCE_TCP_CONFIGURE_SERVER,
&PG_SOURCE_SNAPSHOT_STATEMENT_TIMEOUT,
&PG_SOURCE_WAL_SENDER_TIMEOUT,
&PG_SOURCE_SNAPSHOT_COLLECT_STRICT_COUNT,
&PG_SOURCE_SNAPSHOT_FALLBACK_TO_STRICT_COUNT,
&PG_SOURCE_SNAPSHOT_WAIT_FOR_COUNT,
&MYSQL_SOURCE_TCP_KEEPALIVE,
&MYSQL_SOURCE_SNAPSHOT_MAX_EXECUTION_TIME,
&MYSQL_SOURCE_SNAPSHOT_LOCK_WAIT_TIMEOUT,
&SSH_CHECK_INTERVAL,
&SSH_CONNECT_TIMEOUT,
&SSH_KEEPALIVES_IDLE,
&KAFKA_SOCKET_KEEPALIVE,
&KAFKA_SOCKET_TIMEOUT,
&KAFKA_TRANSACTION_TIMEOUT,
&KAFKA_SOCKET_CONNECTION_SETUP_TIMEOUT,
&KAFKA_FETCH_METADATA_TIMEOUT,
&KAFKA_PROGRESS_RECORD_FETCH_TIMEOUT,
&KAFKA_DEFAULT_METADATA_FETCH_INTERVAL,
&ENABLE_LAUNCHDARKLY,
&MAX_CONNECTIONS,
&NETWORK_POLICY,
&SUPERUSER_RESERVED_CONNECTIONS,
&KEEP_N_SOURCE_STATUS_HISTORY_ENTRIES,
&KEEP_N_SINK_STATUS_HISTORY_ENTRIES,
&KEEP_N_PRIVATELINK_STATUS_HISTORY_ENTRIES,
&REPLICA_STATUS_HISTORY_RETENTION_WINDOW,
&ARRANGEMENT_EXERT_PROPORTIONALITY,
&ENABLE_STORAGE_SHARD_FINALIZATION,
&ENABLE_CONSOLIDATE_AFTER_UNION_NEGATE,
&ENABLE_DEFAULT_CONNECTION_VALIDATION,
&ENABLE_REDUCE_REDUCTION,
&MIN_TIMESTAMP_INTERVAL,
&MAX_TIMESTAMP_INTERVAL,
&LOGGING_FILTER,
&OPENTELEMETRY_FILTER,
&LOGGING_FILTER_DEFAULTS,
&OPENTELEMETRY_FILTER_DEFAULTS,
&SENTRY_FILTERS,
&WEBHOOKS_SECRETS_CACHING_TTL_SECS,
&COORD_SLOW_MESSAGE_WARN_THRESHOLD,
&grpc_client::CONNECT_TIMEOUT,
&grpc_client::HTTP2_KEEP_ALIVE_INTERVAL,
&grpc_client::HTTP2_KEEP_ALIVE_TIMEOUT,
&cluster_scheduling::CLUSTER_MULTI_PROCESS_REPLICA_AZ_AFFINITY_WEIGHT,
&cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY,
&cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY_WEIGHT,
&cluster_scheduling::CLUSTER_ENABLE_TOPOLOGY_SPREAD,
&cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_IGNORE_NON_SINGULAR_SCALE,
&cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_MAX_SKEW,
&cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_SOFT,
&cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY,
&cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY_WEIGHT,
&cluster_scheduling::CLUSTER_ALWAYS_USE_DISK,
&cluster_scheduling::CLUSTER_ALTER_CHECK_READY_INTERVAL,
&cluster_scheduling::CLUSTER_CHECK_SCHEDULING_POLICIES_INTERVAL,
&cluster_scheduling::CLUSTER_SECURITY_CONTEXT_ENABLED,
&cluster_scheduling::CLUSTER_REFRESH_MV_COMPACTION_ESTIMATE,
&grpc_client::HTTP2_KEEP_ALIVE_TIMEOUT,
&STATEMENT_LOGGING_MAX_SAMPLE_RATE,
&STATEMENT_LOGGING_DEFAULT_SAMPLE_RATE,
&STATEMENT_LOGGING_TARGET_DATA_RATE,
&STATEMENT_LOGGING_MAX_DATA_CREDIT,
&ENABLE_INTERNAL_STATEMENT_LOGGING,
&OPTIMIZER_STATS_TIMEOUT,
&OPTIMIZER_ONESHOT_STATS_TIMEOUT,
&PRIVATELINK_STATUS_UPDATE_QUOTA_PER_MINUTE,
&WEBHOOK_CONCURRENT_REQUEST_LIMIT,
&PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_SIZE,
&PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_WAIT,
&PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL,
&PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL_STAGGER,
&USER_STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION,
&FORCE_SOURCE_TABLE_SYNTAX,
&OPTIMIZER_E2E_LATENCY_WARNING_THRESHOLD,
];
let dyncfgs = mz_dyncfgs::all_dyncfgs();
let dyncfg_vars: Vec<_> = dyncfgs
.entries()
.map(|cfg| match cfg.default() {
ConfigVal::Bool(default) => {
VarDefinition::new_runtime(cfg.name(), *default, cfg.desc(), false)
}
ConfigVal::U32(default) => {
VarDefinition::new_runtime(cfg.name(), *default, cfg.desc(), false)
}
ConfigVal::Usize(default) => {
VarDefinition::new_runtime(cfg.name(), *default, cfg.desc(), false)
}
ConfigVal::OptUsize(default) => {
VarDefinition::new_runtime(cfg.name(), *default, cfg.desc(), false)
}
ConfigVal::F64(default) => {
VarDefinition::new_runtime(cfg.name(), *default, cfg.desc(), false)
}
ConfigVal::String(default) => {
VarDefinition::new_runtime(cfg.name(), default.clone(), cfg.desc(), false)
}
ConfigVal::Duration(default) => {
VarDefinition::new_runtime(cfg.name(), default.clone(), cfg.desc(), false)
}
ConfigVal::Json(default) => {
VarDefinition::new_runtime(cfg.name(), default.clone(), cfg.desc(), false)
}
})
.collect();
let vars: BTreeMap<_, _> = system_vars
.into_iter()
.chain(definitions::FEATURE_FLAGS.iter().copied())
.chain(Self::SESSION_VARS.values().copied())
.cloned()
.chain(dyncfg_vars)
.map(|var| (var.name, SystemVar::new(var)))
.collect();
let vars = SystemVars {
vars,
callbacks: BTreeMap::new(),
allow_unsafe: false,
dyncfgs,
};
vars
}
pub fn dyncfgs(&self) -> &ConfigSet {
&self.dyncfgs
}
pub fn set_unsafe(mut self, allow_unsafe: bool) -> Self {
self.allow_unsafe = allow_unsafe;
self
}
pub fn allow_unsafe(&self) -> bool {
self.allow_unsafe
}
fn expect_value<V: 'static>(&self, var: &VarDefinition) -> &V {
let val = self
.vars
.get(var.name)
.expect("provided var should be in state");
val.value_dyn()
.as_any()
.downcast_ref::<V>()
.expect("provided var type should matched stored var")
}
fn expect_config_value<V: ConfigType + 'static>(&self, name: &UncasedStr) -> &V {
let val = self
.vars
.get(name)
.unwrap_or_else(|| panic!("provided var {name} should be in state"));
val.value_dyn()
.as_any()
.downcast_ref()
.expect("provided var type should matched stored var")
}
pub fn reset_all(&mut self) {
for (_, var) in &mut self.vars {
var.reset();
}
}
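/// Iterates over all system variables except the system defaults for session
/// variables; see `iter_session` for those.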
pub fn iter(&self) -> impl Iterator<Item = &dyn Var> {
self.vars
.values()
.map(|v| v.as_var())
.filter(|v| !Self::SESSION_VARS.contains_key(UncasedStr::new(v.name())))
}
pub fn iter_synced(&self) -> impl Iterator<Item = &dyn Var> {
self.iter().filter(|v| v.name() != ENABLE_LAUNCHDARKLY.name)
}
pub fn iter_session(&self) -> impl Iterator<Item = &dyn Var> {
self.vars
.values()
.map(|v| v.as_var())
.filter(|v| Self::SESSION_VARS.contains_key(UncasedStr::new(v.name())))
}
pub fn user_modifiable(&self, name: &str) -> bool {
Self::SESSION_VARS.contains_key(UncasedStr::new(name))
|| name == ENABLE_RBAC_CHECKS.name()
|| name == NETWORK_POLICY.name()
}
pub fn get(&self, name: &str) -> Result<&dyn Var, VarError> {
self.vars
.get(UncasedStr::new(name))
.map(|v| v.as_var())
.ok_or_else(|| VarError::UnknownParameter(name.into()))
}
pub fn is_default(&self, name: &str, input: VarInput) -> Result<bool, VarError> {
self.vars
.get(UncasedStr::new(name))
.ok_or_else(|| VarError::UnknownParameter(name.into()))
.and_then(|v| v.is_default(input))
}
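/// Parses `input` and sets the named variable, returning whether the stored
/// value actually changed. Callbacks registered for the variable are notified.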
pub fn set(&mut self, name: &str, input: VarInput) -> Result<bool, VarError> {
let result = self
.vars
.get_mut(UncasedStr::new(name))
.ok_or_else(|| VarError::UnknownParameter(name.into()))
.and_then(|v| v.set(input))?;
self.notify_callbacks(name);
Ok(result)
}
pub fn parse(&self, name: &str, input: VarInput) -> Result<Box<dyn Value>, VarError> {
self.vars
.get(UncasedStr::new(name))
.ok_or_else(|| VarError::UnknownParameter(name.into()))
.and_then(|v| v.parse(input))
}
pub fn set_default(&mut self, name: &str, input: VarInput) -> Result<(), VarError> {
let result = self
.vars
.get_mut(UncasedStr::new(name))
.ok_or_else(|| VarError::UnknownParameter(name.into()))
.and_then(|v| v.set_default(input))?;
self.notify_callbacks(name);
Ok(result)
}
pub fn reset(&mut self, name: &str) -> Result<bool, VarError> {
let result = self
.vars
.get_mut(UncasedStr::new(name))
.ok_or_else(|| VarError::UnknownParameter(name.into()))
.map(|v| v.reset())?;
self.notify_callbacks(name);
Ok(result)
}
pub fn defaults(&self) -> BTreeMap<String, String> {
self.vars
.iter()
.map(|(name, var)| {
let default = var
.dynamic_default
.as_deref()
.unwrap_or(var.definition.default_value());
(name.as_str().to_owned(), default.format())
})
.collect()
}
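/// Registers a callback to be invoked whenever `var` changes. The callbacks
/// registered for `var` are also invoked immediately with the current state.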
pub fn register_callback(
&mut self,
var: &VarDefinition,
callback: Arc<dyn Fn(&SystemVars) + Send + Sync>,
) {
self.callbacks
.entry(var.name().to_string())
.or_default()
.push(callback);
self.notify_callbacks(var.name());
}
fn notify_callbacks(&self, name: &str) {
if let Some(callbacks) = self.callbacks.get(name) {
for callback in callbacks {
(callback)(self);
}
}
}
pub fn default_cluster(&self) -> String {
self.expect_value::<String>(&CLUSTER).to_owned()
}
pub fn max_kafka_connections(&self) -> u32 {
*self.expect_value(&MAX_KAFKA_CONNECTIONS)
}
pub fn max_postgres_connections(&self) -> u32 {
*self.expect_value(&MAX_POSTGRES_CONNECTIONS)
}
pub fn max_mysql_connections(&self) -> u32 {
*self.expect_value(&MAX_MYSQL_CONNECTIONS)
}
pub fn max_aws_privatelink_connections(&self) -> u32 {
*self.expect_value(&MAX_AWS_PRIVATELINK_CONNECTIONS)
}
pub fn max_tables(&self) -> u32 {
*self.expect_value(&MAX_TABLES)
}
pub fn max_sources(&self) -> u32 {
*self.expect_value(&MAX_SOURCES)
}
pub fn max_sinks(&self) -> u32 {
*self.expect_value(&MAX_SINKS)
}
pub fn max_materialized_views(&self) -> u32 {
*self.expect_value(&MAX_MATERIALIZED_VIEWS)
}
pub fn max_clusters(&self) -> u32 {
*self.expect_value(&MAX_CLUSTERS)
}
pub fn max_replicas_per_cluster(&self) -> u32 {
*self.expect_value(&MAX_REPLICAS_PER_CLUSTER)
}
pub fn max_credit_consumption_rate(&self) -> Numeric {
*self.expect_value(&MAX_CREDIT_CONSUMPTION_RATE)
}
pub fn max_databases(&self) -> u32 {
*self.expect_value(&MAX_DATABASES)
}
pub fn max_schemas_per_database(&self) -> u32 {
*self.expect_value(&MAX_SCHEMAS_PER_DATABASE)
}
pub fn max_objects_per_schema(&self) -> u32 {
*self.expect_value(&MAX_OBJECTS_PER_SCHEMA)
}
pub fn max_secrets(&self) -> u32 {
*self.expect_value(&MAX_SECRETS)
}
pub fn max_roles(&self) -> u32 {
*self.expect_value(&MAX_ROLES)
}
pub fn max_continual_tasks(&self) -> u32 {
*self.expect_value(&MAX_CONTINUAL_TASKS)
}
pub fn max_network_policies(&self) -> u32 {
*self.expect_value(&MAX_NETWORK_POLICIES)
}
pub fn max_rules_per_network_policy(&self) -> u32 {
*self.expect_value(&MAX_RULES_PER_NETWORK_POLICY)
}
pub fn max_result_size(&self) -> u64 {
self.expect_value::<ByteSize>(&MAX_RESULT_SIZE).as_bytes()
}
pub fn max_copy_from_size(&self) -> u32 {
*self.expect_value(&MAX_COPY_FROM_SIZE)
}
pub fn allowed_cluster_replica_sizes(&self) -> Vec<String> {
self.expect_value::<Vec<Ident>>(&ALLOWED_CLUSTER_REPLICA_SIZES)
.into_iter()
.map(|s| s.as_str().into())
.collect()
}
pub fn disk_cluster_replicas_default(&self) -> bool {
*self.expect_value(&DISK_CLUSTER_REPLICAS_DEFAULT)
}
pub fn upsert_rocksdb_auto_spill_to_disk(&self) -> bool {
*self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_AUTO_SPILL_TO_DISK)
}
pub fn upsert_rocksdb_auto_spill_threshold_bytes(&self) -> usize {
*self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_AUTO_SPILL_THRESHOLD_BYTES)
}
pub fn upsert_rocksdb_compaction_style(&self) -> mz_rocksdb_types::config::CompactionStyle {
*self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_COMPACTION_STYLE)
}
pub fn upsert_rocksdb_optimize_compaction_memtable_budget(&self) -> usize {
*self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_OPTIMIZE_COMPACTION_MEMTABLE_BUDGET)
}
pub fn upsert_rocksdb_level_compaction_dynamic_level_bytes(&self) -> bool {
*self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_LEVEL_COMPACTION_DYNAMIC_LEVEL_BYTES)
}
pub fn upsert_rocksdb_universal_compaction_ratio(&self) -> i32 {
*self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_UNIVERSAL_COMPACTION_RATIO)
}
pub fn upsert_rocksdb_parallelism(&self) -> Option<i32> {
*self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_PARALLELISM)
}
pub fn upsert_rocksdb_compression_type(&self) -> mz_rocksdb_types::config::CompressionType {
*self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_COMPRESSION_TYPE)
}
pub fn upsert_rocksdb_bottommost_compression_type(
&self,
) -> mz_rocksdb_types::config::CompressionType {
*self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_BOTTOMMOST_COMPRESSION_TYPE)
}
pub fn upsert_rocksdb_batch_size(&self) -> usize {
*self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_BATCH_SIZE)
}
pub fn upsert_rocksdb_retry_duration(&self) -> Duration {
*self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_RETRY_DURATION)
}
pub fn upsert_rocksdb_stats_log_interval_seconds(&self) -> u32 {
*self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_STATS_LOG_INTERVAL_SECONDS)
}
pub fn upsert_rocksdb_stats_persist_interval_seconds(&self) -> u32 {
*self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_STATS_PERSIST_INTERVAL_SECONDS)
}
pub fn upsert_rocksdb_point_lookup_block_cache_size_mb(&self) -> Option<u32> {
*self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_POINT_LOOKUP_BLOCK_CACHE_SIZE_MB)
}
pub fn upsert_rocksdb_shrink_allocated_buffers_by_ratio(&self) -> usize {
*self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_SHRINK_ALLOCATED_BUFFERS_BY_RATIO)
}
pub fn upsert_rocksdb_write_buffer_manager_cluster_memory_fraction(&self) -> Option<Numeric> {
*self.expect_value(
&upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_CLUSTER_MEMORY_FRACTION,
)
}
pub fn upsert_rocksdb_write_buffer_manager_memory_bytes(&self) -> Option<usize> {
*self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_MEMORY_BYTES)
}
pub fn upsert_rocksdb_write_buffer_manager_allow_stall(&self) -> bool {
*self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_ALLOW_STALL)
}
pub fn persist_fast_path_limit(&self) -> usize {
*self.expect_value(&PERSIST_FAST_PATH_LIMIT)
}
pub fn pg_source_connect_timeout(&self) -> Duration {
*self.expect_value(&PG_SOURCE_CONNECT_TIMEOUT)
}
pub fn pg_source_tcp_keepalives_retries(&self) -> u32 {
*self.expect_value(&PG_SOURCE_TCP_KEEPALIVES_RETRIES)
}
pub fn pg_source_tcp_keepalives_idle(&self) -> Duration {
*self.expect_value(&PG_SOURCE_TCP_KEEPALIVES_IDLE)
}
pub fn pg_source_tcp_keepalives_interval(&self) -> Duration {
*self.expect_value(&PG_SOURCE_TCP_KEEPALIVES_INTERVAL)
}
pub fn pg_source_tcp_user_timeout(&self) -> Duration {
*self.expect_value(&PG_SOURCE_TCP_USER_TIMEOUT)
}
pub fn pg_source_tcp_configure_server(&self) -> bool {
*self.expect_value(&PG_SOURCE_TCP_CONFIGURE_SERVER)
}
pub fn pg_source_snapshot_statement_timeout(&self) -> Duration {
*self.expect_value(&PG_SOURCE_SNAPSHOT_STATEMENT_TIMEOUT)
}
pub fn pg_source_wal_sender_timeout(&self) -> Option<Duration> {
*self.expect_value(&PG_SOURCE_WAL_SENDER_TIMEOUT)
}
pub fn pg_source_snapshot_collect_strict_count(&self) -> bool {
*self.expect_value(&PG_SOURCE_SNAPSHOT_COLLECT_STRICT_COUNT)
}
pub fn pg_source_snapshot_fallback_to_strict_count(&self) -> bool {
*self.expect_value(&PG_SOURCE_SNAPSHOT_FALLBACK_TO_STRICT_COUNT)
}
pub fn pg_source_snapshot_wait_for_count(&self) -> bool {
*self.expect_value(&PG_SOURCE_SNAPSHOT_WAIT_FOR_COUNT)
}
pub fn mysql_source_tcp_keepalive(&self) -> Duration {
*self.expect_value(&MYSQL_SOURCE_TCP_KEEPALIVE)
}
pub fn mysql_source_snapshot_max_execution_time(&self) -> Duration {
*self.expect_value(&MYSQL_SOURCE_SNAPSHOT_MAX_EXECUTION_TIME)
}
pub fn mysql_source_snapshot_lock_wait_timeout(&self) -> Duration {
*self.expect_value(&MYSQL_SOURCE_SNAPSHOT_LOCK_WAIT_TIMEOUT)
}
pub fn ssh_check_interval(&self) -> Duration {
*self.expect_value(&SSH_CHECK_INTERVAL)
}
pub fn ssh_connect_timeout(&self) -> Duration {
*self.expect_value(&SSH_CONNECT_TIMEOUT)
}
pub fn ssh_keepalives_idle(&self) -> Duration {
*self.expect_value(&SSH_KEEPALIVES_IDLE)
}
pub fn kafka_socket_keepalive(&self) -> bool {
*self.expect_value(&KAFKA_SOCKET_KEEPALIVE)
}
pub fn kafka_socket_timeout(&self) -> Option<Duration> {
*self.expect_value(&KAFKA_SOCKET_TIMEOUT)
}
pub fn kafka_transaction_timeout(&self) -> Duration {
*self.expect_value(&KAFKA_TRANSACTION_TIMEOUT)
}
pub fn kafka_socket_connection_setup_timeout(&self) -> Duration {
*self.expect_value(&KAFKA_SOCKET_CONNECTION_SETUP_TIMEOUT)
}
pub fn kafka_fetch_metadata_timeout(&self) -> Duration {
*self.expect_value(&KAFKA_FETCH_METADATA_TIMEOUT)
}
pub fn kafka_progress_record_fetch_timeout(&self) -> Option<Duration> {
*self.expect_value(&KAFKA_PROGRESS_RECORD_FETCH_TIMEOUT)
}
pub fn kafka_default_metadata_fetch_interval(&self) -> Duration {
*self.expect_value(&KAFKA_DEFAULT_METADATA_FETCH_INTERVAL)
}
pub fn crdb_connect_timeout(&self) -> Duration {
*self.expect_config_value(UncasedStr::new(
mz_persist_client::cfg::CRDB_CONNECT_TIMEOUT.name(),
))
}
pub fn crdb_tcp_user_timeout(&self) -> Duration {
*self.expect_config_value(UncasedStr::new(
mz_persist_client::cfg::CRDB_TCP_USER_TIMEOUT.name(),
))
}
pub fn storage_dataflow_max_inflight_bytes(&self) -> Option<usize> {
*self.expect_value(&STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES)
}
pub fn storage_dataflow_max_inflight_bytes_to_cluster_size_fraction(&self) -> Option<Numeric> {
*self.expect_value(&STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_TO_CLUSTER_SIZE_FRACTION)
}
pub fn storage_shrink_upsert_unused_buffers_by_ratio(&self) -> usize {
*self.expect_value(&STORAGE_SHRINK_UPSERT_UNUSED_BUFFERS_BY_RATIO)
}
pub fn storage_dataflow_max_inflight_bytes_disk_only(&self) -> bool {
*self.expect_value(&STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_DISK_ONLY)
}
pub fn storage_statistics_interval(&self) -> Duration {
*self.expect_value(&STORAGE_STATISTICS_INTERVAL)
}
pub fn storage_statistics_collection_interval(&self) -> Duration {
*self.expect_value(&STORAGE_STATISTICS_COLLECTION_INTERVAL)
}
pub fn storage_record_source_sink_namespaced_errors(&self) -> bool {
*self.expect_value(&STORAGE_RECORD_SOURCE_SINK_NAMESPACED_ERRORS)
}
pub fn persist_stats_filter_enabled(&self) -> bool {
*self.expect_config_value(UncasedStr::new(
mz_persist_client::stats::STATS_FILTER_ENABLED.name(),
))
}
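/// Collects the current values of all dynamic configs into a `ConfigUpdates`,
/// applies them to the local `ConfigSet`, and returns them.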
pub fn dyncfg_updates(&self) -> ConfigUpdates {
let mut updates = ConfigUpdates::default();
for entry in self.dyncfgs.entries() {
let name = UncasedStr::new(entry.name());
let val = match entry.val() {
ConfigVal::Bool(_) => ConfigVal::from(*self.expect_config_value::<bool>(name)),
ConfigVal::U32(_) => ConfigVal::from(*self.expect_config_value::<u32>(name)),
ConfigVal::Usize(_) => ConfigVal::from(*self.expect_config_value::<usize>(name)),
ConfigVal::OptUsize(_) => {
ConfigVal::from(*self.expect_config_value::<Option<usize>>(name))
}
ConfigVal::F64(_) => ConfigVal::from(*self.expect_config_value::<f64>(name)),
ConfigVal::String(_) => {
ConfigVal::from(self.expect_config_value::<String>(name).clone())
}
ConfigVal::Duration(_) => {
ConfigVal::from(*self.expect_config_value::<Duration>(name))
}
ConfigVal::Json(_) => {
ConfigVal::from(self.expect_config_value::<serde_json::Value>(name).clone())
}
};
updates.add_dynamic(entry.name(), val);
}
updates.apply(&self.dyncfgs);
updates
}
pub fn metrics_retention(&self) -> Duration {
*self.expect_value(&METRICS_RETENTION)
}
pub fn unsafe_mock_audit_event_timestamp(&self) -> Option<mz_repr::Timestamp> {
*self.expect_value(&UNSAFE_MOCK_AUDIT_EVENT_TIMESTAMP)
}
pub fn enable_rbac_checks(&self) -> bool {
*self.expect_value(&ENABLE_RBAC_CHECKS)
}
pub fn max_connections(&self) -> u32 {
*self.expect_value(&MAX_CONNECTIONS)
}
pub fn default_network_policy_name(&self) -> String {
self.expect_value::<String>(&NETWORK_POLICY).clone()
}
pub fn superuser_reserved_connections(&self) -> u32 {
*self.expect_value(&SUPERUSER_RESERVED_CONNECTIONS)
}
pub fn keep_n_source_status_history_entries(&self) -> usize {
*self.expect_value(&KEEP_N_SOURCE_STATUS_HISTORY_ENTRIES)
}
pub fn keep_n_sink_status_history_entries(&self) -> usize {
*self.expect_value(&KEEP_N_SINK_STATUS_HISTORY_ENTRIES)
}
pub fn keep_n_privatelink_status_history_entries(&self) -> usize {
*self.expect_value(&KEEP_N_PRIVATELINK_STATUS_HISTORY_ENTRIES)
}
pub fn replica_status_history_retention_window(&self) -> Duration {
*self.expect_value(&REPLICA_STATUS_HISTORY_RETENTION_WINDOW)
}
pub fn arrangement_exert_proportionality(&self) -> u32 {
*self.expect_value(&ARRANGEMENT_EXERT_PROPORTIONALITY)
}
pub fn enable_storage_shard_finalization(&self) -> bool {
*self.expect_value(&ENABLE_STORAGE_SHARD_FINALIZATION)
}
pub fn enable_consolidate_after_union_negate(&self) -> bool {
*self.expect_value(&ENABLE_CONSOLIDATE_AFTER_UNION_NEGATE)
}
pub fn enable_reduce_reduction(&self) -> bool {
*self.expect_value(&ENABLE_REDUCE_REDUCTION)
}
pub fn enable_default_connection_validation(&self) -> bool {
*self.expect_value(&ENABLE_DEFAULT_CONNECTION_VALIDATION)
}
pub fn min_timestamp_interval(&self) -> Duration {
*self.expect_value(&MIN_TIMESTAMP_INTERVAL)
}
pub fn max_timestamp_interval(&self) -> Duration {
*self.expect_value(&MAX_TIMESTAMP_INTERVAL)
}
pub fn logging_filter(&self) -> CloneableEnvFilter {
self.expect_value::<CloneableEnvFilter>(&LOGGING_FILTER)
.clone()
}
pub fn opentelemetry_filter(&self) -> CloneableEnvFilter {
self.expect_value::<CloneableEnvFilter>(&OPENTELEMETRY_FILTER)
.clone()
}
pub fn logging_filter_defaults(&self) -> Vec<SerializableDirective> {
self.expect_value::<Vec<SerializableDirective>>(&LOGGING_FILTER_DEFAULTS)
.clone()
}
pub fn opentelemetry_filter_defaults(&self) -> Vec<SerializableDirective> {
self.expect_value::<Vec<SerializableDirective>>(&OPENTELEMETRY_FILTER_DEFAULTS)
.clone()
}
pub fn sentry_filters(&self) -> Vec<SerializableDirective> {
self.expect_value::<Vec<SerializableDirective>>(&SENTRY_FILTERS)
.clone()
}
pub fn webhooks_secrets_caching_ttl_secs(&self) -> usize {
*self.expect_value(&WEBHOOKS_SECRETS_CACHING_TTL_SECS)
}
pub fn coord_slow_message_warn_threshold(&self) -> Duration {
*self.expect_value(&COORD_SLOW_MESSAGE_WARN_THRESHOLD)
}
pub fn grpc_client_http2_keep_alive_interval(&self) -> Duration {
*self.expect_value(&grpc_client::HTTP2_KEEP_ALIVE_INTERVAL)
}
pub fn grpc_client_http2_keep_alive_timeout(&self) -> Duration {
*self.expect_value(&grpc_client::HTTP2_KEEP_ALIVE_TIMEOUT)
}
pub fn grpc_connect_timeout(&self) -> Duration {
*self.expect_value(&grpc_client::CONNECT_TIMEOUT)
}
pub fn cluster_multi_process_replica_az_affinity_weight(&self) -> Option<i32> {
*self.expect_value(&cluster_scheduling::CLUSTER_MULTI_PROCESS_REPLICA_AZ_AFFINITY_WEIGHT)
}
pub fn cluster_soften_replication_anti_affinity(&self) -> bool {
*self.expect_value(&cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY)
}
pub fn cluster_soften_replication_anti_affinity_weight(&self) -> i32 {
*self.expect_value(&cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY_WEIGHT)
}
pub fn cluster_enable_topology_spread(&self) -> bool {
*self.expect_value(&cluster_scheduling::CLUSTER_ENABLE_TOPOLOGY_SPREAD)
}
pub fn cluster_topology_spread_ignore_non_singular_scale(&self) -> bool {
*self.expect_value(&cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_IGNORE_NON_SINGULAR_SCALE)
}
pub fn cluster_topology_spread_max_skew(&self) -> i32 {
*self.expect_value(&cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_MAX_SKEW)
}
pub fn cluster_topology_spread_soft(&self) -> bool {
*self.expect_value(&cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_SOFT)
}
pub fn cluster_soften_az_affinity(&self) -> bool {
*self.expect_value(&cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY)
}
pub fn cluster_soften_az_affinity_weight(&self) -> i32 {
*self.expect_value(&cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY_WEIGHT)
}
pub fn cluster_always_use_disk(&self) -> bool {
*self.expect_value(&cluster_scheduling::CLUSTER_ALWAYS_USE_DISK)
}
pub fn cluster_alter_check_ready_interval(&self) -> Duration {
*self.expect_value(&cluster_scheduling::CLUSTER_ALTER_CHECK_READY_INTERVAL)
}
pub fn cluster_check_scheduling_policies_interval(&self) -> Duration {
*self.expect_value(&cluster_scheduling::CLUSTER_CHECK_SCHEDULING_POLICIES_INTERVAL)
}
pub fn cluster_security_context_enabled(&self) -> bool {
*self.expect_value(&cluster_scheduling::CLUSTER_SECURITY_CONTEXT_ENABLED)
}
pub fn cluster_refresh_mv_compaction_estimate(&self) -> Duration {
*self.expect_value(&cluster_scheduling::CLUSTER_REFRESH_MV_COMPACTION_ESTIMATE)
}
pub fn privatelink_status_update_quota_per_minute(&self) -> u32 {
*self.expect_value(&PRIVATELINK_STATUS_UPDATE_QUOTA_PER_MINUTE)
}
pub fn statement_logging_target_data_rate(&self) -> Option<usize> {
*self.expect_value(&STATEMENT_LOGGING_TARGET_DATA_RATE)
}
pub fn statement_logging_max_data_credit(&self) -> Option<usize> {
*self.expect_value(&STATEMENT_LOGGING_MAX_DATA_CREDIT)
}
pub fn statement_logging_max_sample_rate(&self) -> Numeric {
*self.expect_value(&STATEMENT_LOGGING_MAX_SAMPLE_RATE)
}
pub fn statement_logging_default_sample_rate(&self) -> Numeric {
*self.expect_value(&STATEMENT_LOGGING_DEFAULT_SAMPLE_RATE)
}
pub fn enable_internal_statement_logging(&self) -> bool {
*self.expect_value(&ENABLE_INTERNAL_STATEMENT_LOGGING)
}
pub fn optimizer_stats_timeout(&self) -> Duration {
*self.expect_value(&OPTIMIZER_STATS_TIMEOUT)
}
pub fn optimizer_oneshot_stats_timeout(&self) -> Duration {
*self.expect_value(&OPTIMIZER_ONESHOT_STATS_TIMEOUT)
}
pub fn webhook_concurrent_request_limit(&self) -> usize {
*self.expect_value(&WEBHOOK_CONCURRENT_REQUEST_LIMIT)
}
pub fn pg_timestamp_oracle_connection_pool_max_size(&self) -> usize {
*self.expect_value(&PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_SIZE)
}
pub fn pg_timestamp_oracle_connection_pool_max_wait(&self) -> Option<Duration> {
*self.expect_value(&PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_WAIT)
}
pub fn pg_timestamp_oracle_connection_pool_ttl(&self) -> Duration {
*self.expect_value(&PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL)
}
pub fn pg_timestamp_oracle_connection_pool_ttl_stagger(&self) -> Duration {
*self.expect_value(&PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL_STAGGER)
}
pub fn user_storage_managed_collections_batch_duration(&self) -> Duration {
*self.expect_value(&USER_STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION)
}
pub fn force_source_table_syntax(&self) -> bool {
*self.expect_value(&FORCE_SOURCE_TABLE_SYNTAX)
}
pub fn optimizer_e2e_latency_warning_threshold(&self) -> Duration {
*self.expect_value(&OPTIMIZER_E2E_LATENCY_WARNING_THRESHOLD)
}
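/// Reports whether the named variable is part of the compute configuration:
/// the result size limit, the tracing variables, or any dynamic config.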
pub fn is_compute_config_var(&self, name: &str) -> bool {
name == MAX_RESULT_SIZE.name() || self.is_dyncfg_var(name) || is_tracing_var(name)
}
pub fn is_storage_config_var(&self, name: &str) -> bool {
name == PG_SOURCE_CONNECT_TIMEOUT.name()
|| name == PG_SOURCE_TCP_KEEPALIVES_IDLE.name()
|| name == PG_SOURCE_TCP_KEEPALIVES_INTERVAL.name()
|| name == PG_SOURCE_TCP_KEEPALIVES_RETRIES.name()
|| name == PG_SOURCE_TCP_USER_TIMEOUT.name()
|| name == PG_SOURCE_TCP_CONFIGURE_SERVER.name()
|| name == PG_SOURCE_SNAPSHOT_STATEMENT_TIMEOUT.name()
|| name == PG_SOURCE_WAL_SENDER_TIMEOUT.name()
|| name == PG_SOURCE_SNAPSHOT_COLLECT_STRICT_COUNT.name()
|| name == PG_SOURCE_SNAPSHOT_FALLBACK_TO_STRICT_COUNT.name()
|| name == PG_SOURCE_SNAPSHOT_WAIT_FOR_COUNT.name()
|| name == MYSQL_SOURCE_TCP_KEEPALIVE.name()
|| name == MYSQL_SOURCE_SNAPSHOT_MAX_EXECUTION_TIME.name()
|| name == MYSQL_SOURCE_SNAPSHOT_LOCK_WAIT_TIMEOUT.name()
|| name == ENABLE_STORAGE_SHARD_FINALIZATION.name()
|| name == SSH_CHECK_INTERVAL.name()
|| name == SSH_CONNECT_TIMEOUT.name()
|| name == SSH_KEEPALIVES_IDLE.name()
|| name == KAFKA_SOCKET_KEEPALIVE.name()
|| name == KAFKA_SOCKET_TIMEOUT.name()
|| name == KAFKA_TRANSACTION_TIMEOUT.name()
|| name == KAFKA_SOCKET_CONNECTION_SETUP_TIMEOUT.name()
|| name == KAFKA_FETCH_METADATA_TIMEOUT.name()
|| name == KAFKA_PROGRESS_RECORD_FETCH_TIMEOUT.name()
|| name == KAFKA_DEFAULT_METADATA_FETCH_INTERVAL.name()
|| name == STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES.name()
|| name == STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_TO_CLUSTER_SIZE_FRACTION.name()
|| name == STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_DISK_ONLY.name()
|| name == STORAGE_SHRINK_UPSERT_UNUSED_BUFFERS_BY_RATIO.name()
|| name == STORAGE_RECORD_SOURCE_SINK_NAMESPACED_ERRORS.name()
|| name == STORAGE_STATISTICS_INTERVAL.name()
|| name == STORAGE_STATISTICS_COLLECTION_INTERVAL.name()
|| name == USER_STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION.name()
|| is_upsert_rocksdb_config_var(name)
|| self.is_dyncfg_var(name)
|| is_tracing_var(name)
}
fn is_dyncfg_var(&self, name: &str) -> bool {
self.dyncfgs.entries().any(|e| name == e.name())
}
}
pub fn is_tracing_var(name: &str) -> bool {
name == LOGGING_FILTER.name()
|| name == LOGGING_FILTER_DEFAULTS.name()
|| name == OPENTELEMETRY_FILTER.name()
|| name == OPENTELEMETRY_FILTER_DEFAULTS.name()
|| name == SENTRY_FILTERS.name()
}
pub fn is_secrets_caching_var(name: &str) -> bool {
name == WEBHOOKS_SECRETS_CACHING_TTL_SECS.name()
}
fn is_upsert_rocksdb_config_var(name: &str) -> bool {
name == upsert_rocksdb::UPSERT_ROCKSDB_COMPACTION_STYLE.name()
|| name == upsert_rocksdb::UPSERT_ROCKSDB_OPTIMIZE_COMPACTION_MEMTABLE_BUDGET.name()
|| name == upsert_rocksdb::UPSERT_ROCKSDB_LEVEL_COMPACTION_DYNAMIC_LEVEL_BYTES.name()
|| name == upsert_rocksdb::UPSERT_ROCKSDB_UNIVERSAL_COMPACTION_RATIO.name()
|| name == upsert_rocksdb::UPSERT_ROCKSDB_PARALLELISM.name()
|| name == upsert_rocksdb::UPSERT_ROCKSDB_COMPRESSION_TYPE.name()
|| name == upsert_rocksdb::UPSERT_ROCKSDB_BOTTOMMOST_COMPRESSION_TYPE.name()
|| name == upsert_rocksdb::UPSERT_ROCKSDB_BATCH_SIZE.name()
|| name == upsert_rocksdb::UPSERT_ROCKSDB_STATS_LOG_INTERVAL_SECONDS.name()
|| name == upsert_rocksdb::UPSERT_ROCKSDB_STATS_PERSIST_INTERVAL_SECONDS.name()
|| name == upsert_rocksdb::UPSERT_ROCKSDB_POINT_LOOKUP_BLOCK_CACHE_SIZE_MB.name()
|| name == upsert_rocksdb::UPSERT_ROCKSDB_SHRINK_ALLOCATED_BUFFERS_BY_RATIO.name()
}
pub fn is_pg_timestamp_oracle_config_var(name: &str) -> bool {
name == PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_SIZE.name()
|| name == PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_WAIT.name()
|| name == PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL.name()
|| name == PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL_STAGGER.name()
|| name == CRDB_CONNECT_TIMEOUT.name()
|| name == CRDB_TCP_USER_TIMEOUT.name()
}
pub fn is_cluster_scheduling_var(name: &str) -> bool {
name == cluster_scheduling::CLUSTER_MULTI_PROCESS_REPLICA_AZ_AFFINITY_WEIGHT.name()
|| name == cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY.name()
|| name == cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY_WEIGHT.name()
|| name == cluster_scheduling::CLUSTER_ENABLE_TOPOLOGY_SPREAD.name()
|| name == cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_IGNORE_NON_SINGULAR_SCALE.name()
|| name == cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_MAX_SKEW.name()
|| name == cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_SOFT.name()
|| name == cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY.name()
|| name == cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY_WEIGHT.name()
|| name == cluster_scheduling::CLUSTER_ALWAYS_USE_DISK.name()
}
pub fn is_http_config_var(name: &str) -> bool {
name == WEBHOOK_CONCURRENT_REQUEST_LIMIT.name()
}
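/// A variable that gates access to a feature, paired with a human-readable
/// description of that feature for error messages.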
#[derive(Debug)]
pub struct FeatureFlag {
pub flag: &'static VarDefinition,
pub feature_desc: &'static str,
}
impl Var for FeatureFlag {
fn name(&self) -> &'static str {
self.flag.name()
}
fn value(&self) -> String {
self.flag.value()
}
fn description(&self) -> &'static str {
self.flag.description()
}
fn type_name(&self) -> Cow<'static, str> {
self.flag.type_name()
}
fn visible(&self, user: &User, system_vars: Option<&SystemVars>) -> Result<(), VarError> {
self.flag.visible(user, system_vars)
}
}
impl FeatureFlag {
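/// Returns `Ok(())` if the flag is enabled in `system_vars`, and a
/// `RequiresFeatureFlag` error otherwise. The flag's name is included as a
/// hint only when unsafe variables are allowed.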
pub fn enabled(
&self,
system_vars: Option<&SystemVars>,
feature: Option<String>,
detail: Option<String>,
) -> Result<(), VarError> {
match system_vars {
Some(system_vars) if *system_vars.expect_value::<bool>(self.flag) => Ok(()),
_ => Err(VarError::RequiresFeatureFlag {
feature: feature.unwrap_or(self.feature_desc.to_string()),
detail,
name_hint: system_vars.and_then(|s| {
if s.allow_unsafe {
Some(self.flag.name)
} else {
None
}
}),
}),
}
}
}
impl Var for MzVersion {
fn name(&self) -> &'static str {
MZ_VERSION_NAME.as_str()
}
fn value(&self) -> String {
self.build_info
.human_version(self.helm_chart_version.clone())
}
fn description(&self) -> &'static str {
"Shows the Materialize server version (Materialize)."
}
fn type_name(&self) -> Cow<'static, str> {
String::type_name()
}
fn visible(&self, _: &User, _: Option<&SystemVars>) -> Result<(), VarError> {
Ok(())
}
}
impl Var for User {
fn name(&self) -> &'static str {
IS_SUPERUSER_NAME.as_str()
}
fn value(&self) -> String {
self.is_superuser().format()
}
fn description(&self) -> &'static str {
"Reports whether the current session is a superuser (PostgreSQL)."
}
fn type_name(&self) -> Cow<'static, str> {
bool::type_name()
}
fn visible(&self, _: &User, _: Option<&SystemVars>) -> Result<(), VarError> {
Ok(())
}
}