1use std::borrow::Cow;
67use std::clone::Clone;
68use std::collections::BTreeMap;
69use std::fmt::Debug;
70use std::net::IpAddr;
71use std::num::NonZeroU32;
72use std::string::ToString;
73use std::sync::{Arc, LazyLock};
74use std::time::Duration;
75
76use chrono::{DateTime, Utc};
77use derivative::Derivative;
78use imbl::OrdMap;
79use mz_build_info::BuildInfo;
80use mz_dyncfg::{ConfigSet, ConfigType, ConfigUpdates, ConfigVal, ParameterScope};
81use mz_persist_client::cfg::{
82 CRDB_CONNECT_TIMEOUT, CRDB_KEEPALIVES_IDLE, CRDB_KEEPALIVES_INTERVAL, CRDB_KEEPALIVES_RETRIES,
83 CRDB_TCP_USER_TIMEOUT,
84};
85use mz_repr::adt::numeric::Numeric;
86use mz_repr::adt::timestamp::CheckedTimestamp;
87use mz_repr::bytes::ByteSize;
88use mz_repr::user::{ExternalUserMetadata, InternalUserMetadata};
89use mz_tracing::{CloneableEnvFilter, SerializableDirective};
90use serde::Serialize;
91use thiserror::Error;
92use uncased::UncasedStr;
93
94use crate::ast::Ident;
95use crate::session::user::User;
96
97pub(crate) mod constraints;
98pub(crate) mod definitions;
99pub(crate) mod errors;
100pub(crate) mod polyfill;
101pub(crate) mod value;
102
103pub use definitions::*;
104pub use errors::*;
105pub use value::*;
106
107#[derive(Debug, Clone, Copy, PartialEq, Eq)]
112pub enum EndTransactionAction {
113 Commit,
115 Rollback,
117}
118
119#[derive(Debug, Clone, Copy)]
125pub enum VarInput<'a> {
126 Flat(&'a str),
134 SqlSet(&'a [String]),
140}
141
142impl<'a> VarInput<'a> {
143 pub fn to_vec(&self) -> Vec<String> {
145 match self {
146 VarInput::Flat(v) => vec![v.to_string()],
147 VarInput::SqlSet(values) => values.into_iter().map(|v| v.to_string()).collect(),
148 }
149 }
150}
151
152#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize)]
154pub enum OwnedVarInput {
155 Flat(String),
162 SqlSet(Vec<String>),
166}
167
168impl OwnedVarInput {
169 pub fn borrow(&self) -> VarInput<'_> {
171 match self {
172 OwnedVarInput::Flat(v) => VarInput::Flat(v),
173 OwnedVarInput::SqlSet(v) => VarInput::SqlSet(v),
174 }
175 }
176}
177
178pub trait Var: Debug {
180 fn name(&self) -> &'static str;
182
183 fn value(&self) -> String;
189
190 fn description(&self) -> &'static str;
193
194 fn type_name(&self) -> Cow<'static, str>;
196
197 fn visible(&self, user: &User, system_vars: &SystemVars) -> Result<(), VarError>;
202
203 fn is_unsafe(&self) -> bool {
205 self.name().starts_with("unsafe_")
206 }
207
208 fn scope(&self) -> ParameterScope {
212 ParameterScope::Environment
213 }
214
215 fn as_var(&self) -> &dyn Var
218 where
219 Self: Sized,
220 {
221 self
222 }
223}
224
225#[derive(Debug)]
232pub struct SessionVar {
233 definition: VarDefinition,
234 default_value: Option<Box<dyn Value>>,
236 local_value: Option<Box<dyn Value>>,
238 staged_value: Option<Box<dyn Value>>,
240 session_value: Option<Box<dyn Value>>,
242}
243
244impl Clone for SessionVar {
245 fn clone(&self) -> Self {
246 SessionVar {
247 definition: self.definition.clone(),
248 default_value: self.default_value.as_ref().map(|v| v.box_clone()),
249 local_value: self.local_value.as_ref().map(|v| v.box_clone()),
250 staged_value: self.staged_value.as_ref().map(|v| v.box_clone()),
251 session_value: self.session_value.as_ref().map(|v| v.box_clone()),
252 }
253 }
254}
255
256impl SessionVar {
257 pub const fn new(var: VarDefinition) -> Self {
258 SessionVar {
259 definition: var,
260 default_value: None,
261 local_value: None,
262 staged_value: None,
263 session_value: None,
264 }
265 }
266
267 pub fn check(&self, input: VarInput) -> Result<String, VarError> {
270 let v = self.definition.parse(input)?;
271 self.validate_constraints(v.as_ref())?;
272
273 Ok(v.format())
274 }
275
276 pub fn set(&mut self, input: VarInput, local: bool) -> Result<(), VarError> {
278 let v = self.definition.parse(input)?;
279
280 self.validate_constraints(v.as_ref())?;
282
283 if local {
284 self.local_value = Some(v);
285 } else {
286 self.local_value = None;
287 self.staged_value = Some(v);
288 }
289 Ok(())
290 }
291
292 pub fn set_default(&mut self, input: VarInput) -> Result<(), VarError> {
294 let v = self.definition.parse(input)?;
295 self.validate_constraints(v.as_ref())?;
296 self.default_value = Some(v);
297 Ok(())
298 }
299
300 pub fn reset(&mut self, local: bool) {
302 let value = self
303 .default_value
304 .as_ref()
305 .map(|v| v.as_ref())
306 .unwrap_or_else(|| self.definition.value.value());
307 if local {
308 self.local_value = Some(value.box_clone());
309 } else {
310 self.local_value = None;
311 self.staged_value = Some(value.box_clone());
312 }
313 }
314
315 #[must_use]
317 pub fn end_transaction(&self, action: EndTransactionAction) -> Option<Self> {
318 if !self.is_mutating() {
319 return None;
320 }
321 let mut next: Self = self.clone();
322 next.local_value = None;
323 match action {
324 EndTransactionAction::Commit if next.staged_value.is_some() => {
325 next.session_value = next.staged_value.take()
326 }
327 _ => next.staged_value = None,
328 }
329 Some(next)
330 }
331
332 pub fn is_mutating(&self) -> bool {
334 self.local_value.is_some() || self.staged_value.is_some()
335 }
336
337 pub fn value_dyn(&self) -> &dyn Value {
338 self.local_value
339 .as_deref()
340 .or(self.staged_value.as_deref())
341 .or(self.session_value.as_deref())
342 .or(self.default_value.as_deref())
343 .unwrap_or_else(|| self.definition.value.value())
344 }
345
346 pub fn inspect_session_value(&self) -> Option<&dyn Value> {
351 self.session_value.as_deref()
352 }
353
354 fn validate_constraints(&self, val: &dyn Value) -> Result<(), VarError> {
355 if let Some(constraint) = &self.definition.constraint {
356 constraint.check_constraint(self, self.value_dyn(), val)
357 } else {
358 Ok(())
359 }
360 }
361}
362
363impl Var for SessionVar {
364 fn name(&self) -> &'static str {
365 self.definition.name.as_str()
366 }
367
368 fn value(&self) -> String {
369 self.value_dyn().format()
370 }
371
372 fn description(&self) -> &'static str {
373 self.definition.description
374 }
375
376 fn type_name(&self) -> Cow<'static, str> {
377 self.definition.type_name()
378 }
379
380 fn visible(
381 &self,
382 user: &User,
383 system_vars: &super::vars::SystemVars,
384 ) -> Result<(), super::vars::VarError> {
385 self.definition.visible(user, system_vars)
386 }
387}
388
389#[derive(Debug, Clone, PartialEq, Eq)]
390pub struct MzVersion {
391 build_info: &'static BuildInfo,
393 helm_chart_version: Option<String>,
395}
396
397impl MzVersion {
398 pub fn new(build_info: &'static BuildInfo, helm_chart_version: Option<String>) -> Self {
399 MzVersion {
400 build_info,
401 helm_chart_version,
402 }
403 }
404}
405
406#[derive(Debug, Clone)]
411pub struct SessionVars {
412 vars: OrdMap<&'static UncasedStr, SessionVar>,
414 mz_version: MzVersion,
416 user: User,
418}
419
420impl SessionVars {
421 pub fn new_unchecked(
423 build_info: &'static BuildInfo,
424 user: User,
425 helm_chart_version: Option<String>,
426 ) -> SessionVars {
427 use definitions::*;
428
429 let vars = [
430 &FAILPOINTS,
431 &SERVER_VERSION,
432 &SERVER_VERSION_NUM,
433 &SQL_SAFE_UPDATES,
434 &REAL_TIME_RECENCY,
435 &EMIT_PLAN_INSIGHTS_NOTICE,
436 &EMIT_TIMESTAMP_NOTICE,
437 &EMIT_TRACE_ID_NOTICE,
438 &AUTO_ROUTE_CATALOG_QUERIES,
439 &ENABLE_SESSION_RBAC_CHECKS,
440 &RESTRICT_TO_USER_OBJECTS,
441 &ENABLE_SESSION_CARDINALITY_ESTIMATES,
442 &MAX_IDENTIFIER_LENGTH,
443 &STATEMENT_LOGGING_SAMPLE_RATE,
444 &EMIT_INTROSPECTION_QUERY_NOTICE,
445 &UNSAFE_NEW_TRANSACTION_WALL_TIME,
446 &WELCOME_MESSAGE,
447 ]
448 .into_iter()
449 .chain(SESSION_SYSTEM_VARS.iter().map(|(_name, var)| *var))
450 .map(|var| (var.name, SessionVar::new(var.clone())))
451 .collect();
452
453 SessionVars {
454 vars,
455 mz_version: MzVersion::new(build_info, helm_chart_version),
456 user,
457 }
458 }
459
460 fn expect_value<V: Value>(&self, var: &VarDefinition) -> &V {
461 let var = self
462 .vars
463 .get(var.name)
464 .expect("provided var should be in state");
465 let val = var.value_dyn();
466 val.as_any().downcast_ref::<V>().expect("success")
467 }
468
469 pub fn iter(&self) -> impl Iterator<Item = &dyn Var> {
476 #[allow(clippy::as_conversions)]
477 self.vars
478 .values()
479 .map(|v| v.as_var())
480 .chain([&self.mz_version as &dyn Var, &self.user])
481 }
482
483 pub fn notify_set(&self) -> impl Iterator<Item = &dyn Var> {
487 [
494 &APPLICATION_NAME,
495 &CLIENT_ENCODING,
496 &DATE_STYLE,
497 &INTEGER_DATETIMES,
498 &SERVER_VERSION,
499 &STANDARD_CONFORMING_STRINGS,
500 &TIMEZONE,
501 &INTERVAL_STYLE,
502 &CLUSTER,
507 &CLUSTER_REPLICA,
508 &DEFAULT_CLUSTER_REPLICATION_FACTOR,
509 &DATABASE,
510 &SEARCH_PATH,
511 ]
512 .into_iter()
513 .map(|v| self.vars[v.name].as_var())
514 .chain(std::iter::once(self.mz_version.as_var()))
521 }
522
523 pub fn reset_all(&mut self) {
525 let names: Vec<_> = self.vars.keys().copied().collect();
526 for name in names {
527 self.vars[name].reset(false);
528 }
529 }
530
531 pub fn get(&self, system_vars: &SystemVars, name: &str) -> Result<&dyn Var, VarError> {
542 let name = compat_translate_name(name);
543
544 let name = UncasedStr::new(name);
545 if name == MZ_VERSION_NAME {
546 Ok(&self.mz_version)
547 } else if name == IS_SUPERUSER_NAME {
548 Ok(&self.user)
549 } else {
550 self.vars
551 .get(name)
552 .map(|v| {
553 v.visible(&self.user, system_vars)?;
554 Ok(v.as_var())
555 })
556 .transpose()?
557 .ok_or_else(|| VarError::UnknownParameter(name.to_string()))
558 }
559 }
560
561 pub fn inspect(&self, name: &str) -> Result<&SessionVar, VarError> {
566 let name = compat_translate_name(name);
567
568 self.vars
569 .get(UncasedStr::new(name))
570 .ok_or_else(|| VarError::UnknownParameter(name.to_string()))
571 }
572
573 pub fn set(
586 &mut self,
587 system_vars: &SystemVars,
588 name: &str,
589 input: VarInput,
590 local: bool,
591 ) -> Result<(), VarError> {
592 let (name, input) = compat_translate(name, input);
593
594 check_transaction_isolation_feature_flag(name, input, system_vars)?;
595
596 let name = UncasedStr::new(name);
597 self.check_read_only(name)?;
598
599 self.vars
600 .get_mut(name)
601 .map(|v| {
602 v.visible(&self.user, system_vars)?;
603 v.set(input, local)
604 })
605 .transpose()?
606 .ok_or_else(|| VarError::UnknownParameter(name.to_string()))
607 }
608
609 pub fn set_default(&mut self, name: &str, input: VarInput) -> Result<(), VarError> {
612 let (name, input) = compat_translate(name, input);
613
614 let name = UncasedStr::new(name);
615
616 if !Self::allow_role_default(name) {
620 self.check_read_only(name)?;
621 }
622
623 self.vars
624 .get_mut(name)
625 .map(|v| v.set_default(input))
627 .transpose()?
628 .ok_or_else(|| VarError::UnknownParameter(name.to_string()))
629 }
630
631 fn allow_role_default(name: &UncasedStr) -> bool {
639 name == RESTRICT_TO_USER_OBJECTS.name
640 }
641
642 pub fn reset(
656 &mut self,
657 system_vars: &SystemVars,
658 name: &str,
659 local: bool,
660 ) -> Result<(), VarError> {
661 let name = compat_translate_name(name);
662
663 let name = UncasedStr::new(name);
664 self.check_read_only(name)?;
665
666 self.vars
667 .get_mut(name)
668 .map(|v| {
669 v.visible(&self.user, system_vars)?;
670 v.reset(local);
671 Ok(())
672 })
673 .transpose()?
674 .ok_or_else(|| VarError::UnknownParameter(name.to_string()))
675 }
676
677 fn check_read_only(&self, name: &UncasedStr) -> Result<(), VarError> {
684 if name == MZ_VERSION_NAME {
685 Err(VarError::ReadOnlyParameter(MZ_VERSION_NAME.as_str()))
686 } else if name == IS_SUPERUSER_NAME {
687 Err(VarError::ReadOnlyParameter(IS_SUPERUSER_NAME.as_str()))
688 } else if name == MAX_IDENTIFIER_LENGTH.name {
689 Err(VarError::ReadOnlyParameter(
690 MAX_IDENTIFIER_LENGTH.name.as_str(),
691 ))
692 } else if name == RESTRICT_TO_USER_OBJECTS.name {
693 Err(VarError::ReadOnlyParameter(
697 RESTRICT_TO_USER_OBJECTS.name.as_str(),
698 ))
699 } else {
700 Ok(())
701 }
702 }
703
704 #[mz_ore::instrument(level = "debug")]
709 pub fn end_transaction(
710 &mut self,
711 action: EndTransactionAction,
712 ) -> BTreeMap<&'static str, String> {
713 let mut changed = BTreeMap::new();
714 let mut updates = Vec::new();
715 for (name, var) in self.vars.iter() {
716 if !var.is_mutating() {
717 continue;
718 }
719 let before = var.value();
720 let next = var.end_transaction(action).expect("must mutate");
721 let after = next.value();
722 updates.push((*name, next));
723
724 if before != after {
726 changed.insert(var.name(), after);
727 }
728 }
729 self.vars.extend(updates);
730 changed
731 }
732
733 pub fn application_name(&self) -> &str {
735 self.expect_value::<String>(&APPLICATION_NAME).as_str()
736 }
737
738 pub fn build_info(&self) -> &'static BuildInfo {
740 self.mz_version.build_info
741 }
742
743 pub fn client_encoding(&self) -> &ClientEncoding {
745 self.expect_value(&CLIENT_ENCODING)
746 }
747
748 pub fn client_min_messages(&self) -> &ClientSeverity {
750 self.expect_value(&CLIENT_MIN_MESSAGES)
751 }
752
753 pub fn cluster(&self) -> &str {
755 self.expect_value::<String>(&CLUSTER).as_str()
756 }
757
758 pub fn cluster_replica(&self) -> Option<&str> {
760 self.expect_value::<Option<String>>(&CLUSTER_REPLICA)
761 .as_deref()
762 }
763
764 pub fn current_object_missing_warnings(&self) -> bool {
767 *self.expect_value::<bool>(&CURRENT_OBJECT_MISSING_WARNINGS)
768 }
769
770 pub fn date_style(&self) -> &[&str] {
772 &self.expect_value::<DateStyle>(&DATE_STYLE).0
773 }
774
775 pub fn database(&self) -> &str {
777 self.expect_value::<String>(&DATABASE).as_str()
778 }
779
780 pub fn extra_float_digits(&self) -> i32 {
782 *self.expect_value(&EXTRA_FLOAT_DIGITS)
783 }
784
785 pub fn integer_datetimes(&self) -> bool {
787 *self.expect_value(&INTEGER_DATETIMES)
788 }
789
790 pub fn intervalstyle(&self) -> &IntervalStyle {
792 self.expect_value(&INTERVAL_STYLE)
793 }
794
795 pub fn mz_version(&self) -> String {
797 self.mz_version.value()
798 }
799
800 pub fn search_path(&self) -> &[Ident] {
802 self.expect_value::<Vec<Ident>>(&SEARCH_PATH).as_slice()
803 }
804
805 pub fn server_version(&self) -> &str {
807 self.expect_value::<String>(&SERVER_VERSION).as_str()
808 }
809
810 pub fn server_version_num(&self) -> i32 {
812 *self.expect_value(&SERVER_VERSION_NUM)
813 }
814
815 pub fn sql_safe_updates(&self) -> bool {
817 *self.expect_value(&SQL_SAFE_UPDATES)
818 }
819
820 pub fn standard_conforming_strings(&self) -> bool {
823 *self.expect_value(&STANDARD_CONFORMING_STRINGS)
824 }
825
826 pub fn statement_timeout(&self) -> &Duration {
828 self.expect_value(&STATEMENT_TIMEOUT)
829 }
830
831 pub fn idle_in_transaction_session_timeout(&self) -> &Duration {
833 self.expect_value(&IDLE_IN_TRANSACTION_SESSION_TIMEOUT)
834 }
835
836 pub fn timezone(&self) -> &TimeZone {
838 self.expect_value(&TIMEZONE)
839 }
840
841 pub fn transaction_isolation(&self) -> &IsolationLevel {
844 self.expect_value(&TRANSACTION_ISOLATION)
845 }
846
847 pub fn real_time_recency(&self) -> bool {
849 *self.expect_value(&REAL_TIME_RECENCY)
850 }
851
852 pub fn real_time_recency_timeout(&self) -> &Duration {
854 self.expect_value(&REAL_TIME_RECENCY_TIMEOUT)
855 }
856
857 pub fn emit_plan_insights_notice(&self) -> bool {
859 *self.expect_value(&EMIT_PLAN_INSIGHTS_NOTICE)
860 }
861
862 pub fn emit_timestamp_notice(&self) -> bool {
864 *self.expect_value(&EMIT_TIMESTAMP_NOTICE)
865 }
866
867 pub fn emit_trace_id_notice(&self) -> bool {
869 *self.expect_value(&EMIT_TRACE_ID_NOTICE)
870 }
871
872 pub fn auto_route_catalog_queries(&self) -> bool {
874 *self.expect_value(&AUTO_ROUTE_CATALOG_QUERIES)
875 }
876
877 pub fn enable_session_rbac_checks(&self) -> bool {
879 *self.expect_value(&ENABLE_SESSION_RBAC_CHECKS)
880 }
881
882 pub fn restrict_to_user_objects(&self) -> bool {
884 *self.expect_value(&RESTRICT_TO_USER_OBJECTS)
885 }
886
887 pub fn enable_session_cardinality_estimates(&self) -> bool {
889 *self.expect_value(&ENABLE_SESSION_CARDINALITY_ESTIMATES)
890 }
891
892 pub fn is_superuser(&self) -> bool {
894 self.user.is_superuser()
895 }
896
897 pub fn user(&self) -> &User {
899 &self.user
900 }
901
902 pub fn max_query_result_size(&self) -> u64 {
904 self.expect_value::<ByteSize>(&MAX_QUERY_RESULT_SIZE)
905 .as_bytes()
906 }
907
908 pub fn set_internal_user_metadata(&mut self, metadata: InternalUserMetadata) {
910 self.user.internal_metadata = Some(metadata);
911 }
912
913 pub fn set_external_user_metadata(&mut self, metadata: ExternalUserMetadata) {
915 self.user.external_metadata = Some(metadata);
916 }
917
918 pub fn set_cluster(&mut self, cluster: String) {
919 let var = self
920 .vars
921 .get_mut(UncasedStr::new(CLUSTER.name()))
922 .expect("cluster variable must exist");
923 var.set(VarInput::Flat(&cluster), false)
924 .expect("setting cluster must succeed");
925 }
926
927 pub fn set_local_transaction_isolation(&mut self, transaction_isolation: IsolationLevel) {
928 let var = self
929 .vars
930 .get_mut(UncasedStr::new(TRANSACTION_ISOLATION.name()))
931 .expect("transaction_isolation variable must exist");
932 var.set(VarInput::Flat(&transaction_isolation.to_string()), true)
933 .expect("setting transaction isolation must succeed");
934 }
935
936 pub fn get_statement_logging_sample_rate(&self) -> Numeric {
937 *self.expect_value(&STATEMENT_LOGGING_SAMPLE_RATE)
938 }
939
940 pub fn emit_introspection_query_notice(&self) -> bool {
942 *self.expect_value(&EMIT_INTROSPECTION_QUERY_NOTICE)
943 }
944
945 pub fn unsafe_new_transaction_wall_time(&self) -> Option<CheckedTimestamp<DateTime<Utc>>> {
946 *self.expect_value(&UNSAFE_NEW_TRANSACTION_WALL_TIME)
947 }
948
949 pub fn welcome_message(&self) -> bool {
951 *self.expect_value(&WELCOME_MESSAGE)
952 }
953}
954
955pub const OLD_CATALOG_SERVER_CLUSTER: &str = "mz_introspection";
957pub const OLD_AUTO_ROUTE_CATALOG_QUERIES: &str = "auto_route_introspection_queries";
958
959fn compat_translate<'a, 'b>(name: &'a str, input: VarInput<'b>) -> (&'a str, VarInput<'b>) {
967 if name == CLUSTER.name() {
968 if let Ok(value) = CLUSTER.parse(input) {
969 if value.format() == OLD_CATALOG_SERVER_CLUSTER {
970 tracing::debug!(
971 github_27285 = true,
972 "encountered deprecated `cluster` variable value: {}",
973 OLD_CATALOG_SERVER_CLUSTER,
974 );
975 return (name, VarInput::Flat("mz_catalog_server"));
976 }
977 }
978 }
979
980 if name == OLD_AUTO_ROUTE_CATALOG_QUERIES {
981 tracing::debug!(
982 github_27285 = true,
983 "encountered deprecated `{}` variable name",
984 OLD_AUTO_ROUTE_CATALOG_QUERIES,
985 );
986 return (AUTO_ROUTE_CATALOG_QUERIES.name(), input);
987 }
988
989 (name, input)
990}
991
992fn compat_translate_name(name: &str) -> &str {
993 let (name, _) = compat_translate(name, VarInput::Flat(""));
994 name
995}
996
997pub fn check_transaction_isolation_feature_flag(
1007 name: &str,
1008 input: VarInput,
1009 system_vars: &SystemVars,
1010) -> Result<(), VarError> {
1011 if UncasedStr::new(name) != UncasedStr::new(TRANSACTION_ISOLATION_VAR_NAME) {
1012 return Ok(());
1013 }
1014 let Ok(level) = IsolationLevel::parse(input) else {
1016 return Ok(());
1017 };
1018 match level {
1019 IsolationLevel::StrongSessionSerializable => ENABLE_SESSION_TIMELINES.require(system_vars),
1020 IsolationLevel::BoundedStaleness(_) => {
1021 ENABLE_BOUNDED_STALENESS_ISOLATION.require(system_vars)
1022 }
1023 _ => Ok(()),
1024 }
1025}
1026
1027#[derive(Debug)]
1030pub struct SystemVar {
1031 definition: VarDefinition,
1032 persisted_value: Option<Box<dyn Value>>,
1034 dynamic_default: Option<Box<dyn Value>>,
1036}
1037
1038impl Clone for SystemVar {
1039 fn clone(&self) -> Self {
1040 SystemVar {
1041 definition: self.definition.clone(),
1042 persisted_value: self.persisted_value.as_ref().map(|v| v.box_clone()),
1043 dynamic_default: self.dynamic_default.as_ref().map(|v| v.box_clone()),
1044 }
1045 }
1046}
1047
1048impl SystemVar {
1049 pub fn new(definition: VarDefinition) -> Self {
1050 SystemVar {
1051 definition,
1052 persisted_value: None,
1053 dynamic_default: None,
1054 }
1055 }
1056
1057 fn is_default(&self, input: VarInput) -> Result<bool, VarError> {
1058 let v = self.definition.parse(input)?;
1059 Ok(self.definition.default_value() == v.as_ref())
1060 }
1061
1062 pub fn value_dyn(&self) -> &dyn Value {
1063 self.persisted_value
1064 .as_deref()
1065 .or(self.dynamic_default.as_deref())
1066 .unwrap_or_else(|| self.definition.default_value())
1067 }
1068
1069 pub fn value<V: 'static>(&self) -> &V {
1070 let val = self.value_dyn();
1071 val.as_any().downcast_ref::<V>().expect("success")
1072 }
1073
1074 fn parse(&self, input: VarInput) -> Result<Box<dyn Value>, VarError> {
1075 let v = self.definition.parse(input)?;
1076 self.validate_constraints(v.as_ref())?;
1078 Ok(v)
1079 }
1080
1081 fn set(&mut self, input: VarInput) -> Result<bool, VarError> {
1082 let v = self.parse(input)?;
1083
1084 if self.persisted_value.as_ref() != Some(&v) {
1085 self.persisted_value = Some(v);
1086 Ok(true)
1087 } else {
1088 Ok(false)
1089 }
1090 }
1091
1092 fn reset(&mut self) -> bool {
1093 if self.persisted_value.is_some() {
1094 self.persisted_value = None;
1095 true
1096 } else {
1097 false
1098 }
1099 }
1100
1101 fn set_default(&mut self, input: VarInput) -> Result<(), VarError> {
1102 let v = self.definition.parse(input)?;
1103 self.dynamic_default = Some(v);
1104 Ok(())
1105 }
1106
1107 fn validate_constraints(&self, val: &dyn Value) -> Result<(), VarError> {
1108 if let Some(constraint) = &self.definition.constraint {
1109 constraint.check_constraint(self, self.value_dyn(), val)
1110 } else {
1111 Ok(())
1112 }
1113 }
1114}
1115
1116impl Var for SystemVar {
1117 fn name(&self) -> &'static str {
1118 self.definition.name.as_str()
1119 }
1120
1121 fn value(&self) -> String {
1122 self.value_dyn().format()
1123 }
1124
1125 fn description(&self) -> &'static str {
1126 self.definition.description
1127 }
1128
1129 fn type_name(&self) -> Cow<'static, str> {
1130 self.definition.type_name()
1131 }
1132
1133 fn scope(&self) -> ParameterScope {
1134 self.definition.scope()
1135 }
1136
1137 fn visible(&self, user: &User, system_vars: &SystemVars) -> Result<(), VarError> {
1138 self.definition.visible(user, system_vars)
1139 }
1140}
1141
1142#[derive(Debug, Error)]
1143pub enum NetworkPolicyError {
1144 #[error("Access denied for address {0}")]
1145 AddressDenied(IpAddr),
1146}
1147
1148#[derive(Derivative, Clone)]
1153#[derivative(Debug)]
1154pub struct SystemVars {
1155 allow_unsafe: bool,
1157 vars: BTreeMap<&'static UncasedStr, SystemVar>,
1159 #[derivative(Debug = "ignore")]
1161 callbacks: BTreeMap<String, Vec<Arc<dyn Fn(&SystemVars) + Send + Sync>>>,
1162
1163 dyncfgs: ConfigSet,
1167}
1168
1169impl Default for SystemVars {
1170 fn default() -> Self {
1171 Self::new()
1172 }
1173}
1174
1175impl SystemVars {
1176 pub fn new() -> Self {
1177 let system_vars = vec![
1178 &MAX_KAFKA_CONNECTIONS,
1179 &MAX_POSTGRES_CONNECTIONS,
1180 &MAX_MYSQL_CONNECTIONS,
1181 &MAX_SQL_SERVER_CONNECTIONS,
1182 &MAX_AWS_PRIVATELINK_CONNECTIONS,
1183 &MAX_TABLES,
1184 &MAX_SOURCES,
1185 &MAX_SINKS,
1186 &MAX_MATERIALIZED_VIEWS,
1187 &MAX_CLUSTERS,
1188 &MAX_REPLICAS_PER_CLUSTER,
1189 &MAX_CREDIT_CONSUMPTION_RATE,
1190 &MAX_DATABASES,
1191 &MAX_SCHEMAS_PER_DATABASE,
1192 &MAX_OBJECTS_PER_SCHEMA,
1193 &MAX_SECRETS,
1194 &MAX_ROLES,
1195 &MAX_NETWORK_POLICIES,
1196 &MAX_RULES_PER_NETWORK_POLICY,
1197 &MAX_RESULT_SIZE,
1198 &MAX_COPY_FROM_ROW_SIZE,
1199 &ALLOWED_CLUSTER_REPLICA_SIZES,
1200 &upsert_rocksdb::UPSERT_ROCKSDB_COMPACTION_STYLE,
1201 &upsert_rocksdb::UPSERT_ROCKSDB_OPTIMIZE_COMPACTION_MEMTABLE_BUDGET,
1202 &upsert_rocksdb::UPSERT_ROCKSDB_LEVEL_COMPACTION_DYNAMIC_LEVEL_BYTES,
1203 &upsert_rocksdb::UPSERT_ROCKSDB_UNIVERSAL_COMPACTION_RATIO,
1204 &upsert_rocksdb::UPSERT_ROCKSDB_PARALLELISM,
1205 &upsert_rocksdb::UPSERT_ROCKSDB_COMPRESSION_TYPE,
1206 &upsert_rocksdb::UPSERT_ROCKSDB_BOTTOMMOST_COMPRESSION_TYPE,
1207 &upsert_rocksdb::UPSERT_ROCKSDB_BATCH_SIZE,
1208 &upsert_rocksdb::UPSERT_ROCKSDB_RETRY_DURATION,
1209 &upsert_rocksdb::UPSERT_ROCKSDB_STATS_LOG_INTERVAL_SECONDS,
1210 &upsert_rocksdb::UPSERT_ROCKSDB_STATS_PERSIST_INTERVAL_SECONDS,
1211 &upsert_rocksdb::UPSERT_ROCKSDB_POINT_LOOKUP_BLOCK_CACHE_SIZE_MB,
1212 &upsert_rocksdb::UPSERT_ROCKSDB_SHRINK_ALLOCATED_BUFFERS_BY_RATIO,
1213 &upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_CLUSTER_MEMORY_FRACTION,
1214 &upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_MEMORY_BYTES,
1215 &upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_ALLOW_STALL,
1216 &STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES,
1217 &STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_TO_CLUSTER_SIZE_FRACTION,
1218 &STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_DISK_ONLY,
1219 &STORAGE_STATISTICS_INTERVAL,
1220 &STORAGE_STATISTICS_COLLECTION_INTERVAL,
1221 &STORAGE_SHRINK_UPSERT_UNUSED_BUFFERS_BY_RATIO,
1222 &STORAGE_RECORD_SOURCE_SINK_NAMESPACED_ERRORS,
1223 &PERSIST_FAST_PATH_LIMIT,
1224 &METRICS_RETENTION,
1225 &UNSAFE_MOCK_AUDIT_EVENT_TIMESTAMP,
1226 &ENABLE_RBAC_CHECKS,
1227 &PG_SOURCE_CONNECT_TIMEOUT,
1228 &PG_SOURCE_TCP_KEEPALIVES_IDLE,
1229 &PG_SOURCE_TCP_KEEPALIVES_INTERVAL,
1230 &PG_SOURCE_TCP_KEEPALIVES_RETRIES,
1231 &PG_SOURCE_TCP_USER_TIMEOUT,
1232 &PG_SOURCE_TCP_CONFIGURE_SERVER,
1233 &PG_SOURCE_SNAPSHOT_STATEMENT_TIMEOUT,
1234 &PG_SOURCE_WAL_SENDER_TIMEOUT,
1235 &PG_SOURCE_SNAPSHOT_COLLECT_STRICT_COUNT,
1236 &MYSQL_SOURCE_TCP_KEEPALIVE,
1237 &MYSQL_SOURCE_SNAPSHOT_MAX_EXECUTION_TIME,
1238 &MYSQL_SOURCE_SNAPSHOT_LOCK_WAIT_TIMEOUT,
1239 &MYSQL_SOURCE_CONNECT_TIMEOUT,
1240 &SSH_CHECK_INTERVAL,
1241 &SSH_CONNECT_TIMEOUT,
1242 &SSH_KEEPALIVES_IDLE,
1243 &KAFKA_SOCKET_KEEPALIVE,
1244 &KAFKA_SOCKET_TIMEOUT,
1245 &KAFKA_TRANSACTION_TIMEOUT,
1246 &KAFKA_SOCKET_CONNECTION_SETUP_TIMEOUT,
1247 &KAFKA_FETCH_METADATA_TIMEOUT,
1248 &KAFKA_PROGRESS_RECORD_FETCH_TIMEOUT,
1249 &ENABLE_LAUNCHDARKLY,
1250 &MAX_CONNECTIONS,
1251 &NETWORK_POLICY,
1252 &SUPERUSER_RESERVED_CONNECTIONS,
1253 &KEEP_N_SOURCE_STATUS_HISTORY_ENTRIES,
1254 &KEEP_N_SINK_STATUS_HISTORY_ENTRIES,
1255 &KEEP_N_PRIVATELINK_STATUS_HISTORY_ENTRIES,
1256 &REPLICA_STATUS_HISTORY_RETENTION_WINDOW,
1257 &ENABLE_STORAGE_SHARD_FINALIZATION,
1258 &ENABLE_DEFAULT_CONNECTION_VALIDATION,
1259 &DEFAULT_TIMESTAMP_INTERVAL,
1260 &MIN_TIMESTAMP_INTERVAL,
1261 &MAX_TIMESTAMP_INTERVAL,
1262 &LOGGING_FILTER,
1263 &OPENTELEMETRY_FILTER,
1264 &LOGGING_FILTER_DEFAULTS,
1265 &OPENTELEMETRY_FILTER_DEFAULTS,
1266 &SENTRY_FILTERS,
1267 &WEBHOOKS_SECRETS_CACHING_TTL_SECS,
1268 &COORD_SLOW_MESSAGE_WARN_THRESHOLD,
1269 &grpc_client::CONNECT_TIMEOUT,
1270 &grpc_client::HTTP2_KEEP_ALIVE_INTERVAL,
1271 &grpc_client::HTTP2_KEEP_ALIVE_TIMEOUT,
1272 &cluster_scheduling::CLUSTER_MULTI_PROCESS_REPLICA_AZ_AFFINITY_WEIGHT,
1273 &cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY,
1274 &cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY_WEIGHT,
1275 &cluster_scheduling::CLUSTER_ENABLE_TOPOLOGY_SPREAD,
1276 &cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_IGNORE_NON_SINGULAR_SCALE,
1277 &cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_MAX_SKEW,
1278 &cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_MIN_DOMAINS,
1279 &cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_SOFT,
1280 &cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY,
1281 &cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY_WEIGHT,
1282 &cluster_scheduling::CLUSTER_ALTER_CHECK_READY_INTERVAL,
1283 &cluster_scheduling::CLUSTER_CHECK_SCHEDULING_POLICIES_INTERVAL,
1284 &cluster_scheduling::CLUSTER_SECURITY_CONTEXT_ENABLED,
1285 &cluster_scheduling::CLUSTER_REFRESH_MV_COMPACTION_ESTIMATE,
1286 &grpc_client::HTTP2_KEEP_ALIVE_TIMEOUT,
1287 &STATEMENT_LOGGING_MAX_SAMPLE_RATE,
1288 &STATEMENT_LOGGING_DEFAULT_SAMPLE_RATE,
1289 &STATEMENT_LOGGING_TARGET_DATA_RATE,
1290 &STATEMENT_LOGGING_MAX_DATA_CREDIT,
1291 &ENABLE_INTERNAL_STATEMENT_LOGGING,
1292 &OPTIMIZER_STATS_TIMEOUT,
1293 &OPTIMIZER_ONESHOT_STATS_TIMEOUT,
1294 &PRIVATELINK_STATUS_UPDATE_QUOTA_PER_MINUTE,
1295 &WEBHOOK_CONCURRENT_REQUEST_LIMIT,
1296 &PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_SIZE,
1297 &PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_WAIT,
1298 &PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL,
1299 &PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL_STAGGER,
1300 &USER_STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION,
1301 &FORCE_SOURCE_TABLE_SYNTAX,
1302 &OPTIMIZER_E2E_LATENCY_WARNING_THRESHOLD,
1303 &SCRAM_ITERATIONS,
1304 ];
1305
1306 let dyncfgs = mz_dyncfgs::all_dyncfgs();
1307 let dyncfg_vars: Vec<_> = dyncfgs
1308 .entries()
1309 .map(|cfg| {
1310 let var = match cfg.default() {
1311 ConfigVal::Bool(default) => {
1312 VarDefinition::new_runtime(cfg.name(), *default, cfg.desc(), false)
1313 }
1314 ConfigVal::U32(default) => {
1315 VarDefinition::new_runtime(cfg.name(), *default, cfg.desc(), false)
1316 }
1317 ConfigVal::Usize(default) => {
1318 VarDefinition::new_runtime(cfg.name(), *default, cfg.desc(), false)
1319 }
1320 ConfigVal::OptUsize(default) => {
1321 VarDefinition::new_runtime(cfg.name(), *default, cfg.desc(), false)
1322 }
1323 ConfigVal::F64(default) => {
1324 VarDefinition::new_runtime(cfg.name(), *default, cfg.desc(), false)
1325 }
1326 ConfigVal::String(default) => {
1327 VarDefinition::new_runtime(cfg.name(), default.clone(), cfg.desc(), false)
1328 }
1329 ConfigVal::OptString(default) => {
1330 VarDefinition::new_runtime(cfg.name(), default.clone(), cfg.desc(), false)
1331 }
1332 ConfigVal::Duration(default) => {
1333 VarDefinition::new_runtime(cfg.name(), default.clone(), cfg.desc(), false)
1334 }
1335 ConfigVal::Json(default) => {
1336 VarDefinition::new_runtime(cfg.name(), default.clone(), cfg.desc(), false)
1337 }
1338 };
1339 var.scoped(cfg.scope())
1342 })
1343 .collect();
1344
1345 let vars: BTreeMap<_, _> = system_vars
1346 .into_iter()
1347 .chain(definitions::FEATURE_FLAGS.iter().copied())
1349 .chain(SESSION_SYSTEM_VARS.values().copied())
1351 .cloned()
1352 .chain(dyncfg_vars)
1354 .map(|var| (var.name, SystemVar::new(var)))
1355 .collect();
1356
1357 let vars = SystemVars {
1358 vars,
1359 callbacks: BTreeMap::new(),
1360 allow_unsafe: false,
1361 dyncfgs,
1362 };
1363
1364 vars
1365 }
1366
1367 pub fn dyncfgs(&self) -> &ConfigSet {
1368 &self.dyncfgs
1369 }
1370
1371 pub fn set_unsafe(mut self, allow_unsafe: bool) -> Self {
1372 self.allow_unsafe = allow_unsafe;
1373 self
1374 }
1375
1376 pub fn allow_unsafe(&self) -> bool {
1377 self.allow_unsafe
1378 }
1379
1380 fn expect_value<V: 'static>(&self, var: &VarDefinition) -> &V {
1381 let val = self
1382 .vars
1383 .get(var.name)
1384 .expect("provided var should be in state");
1385
1386 val.value_dyn()
1387 .as_any()
1388 .downcast_ref::<V>()
1389 .expect("provided var type should matched stored var")
1390 }
1391
1392 fn expect_config_value<V: ConfigType + 'static>(&self, name: &UncasedStr) -> &V {
1393 let val = self
1394 .vars
1395 .get(name)
1396 .unwrap_or_else(|| panic!("provided var {name} should be in state"));
1397
1398 val.value_dyn()
1399 .as_any()
1400 .downcast_ref()
1401 .expect("provided var type should matched stored var")
1402 }
1403
1404 pub fn reset_all(&mut self) {
1407 for (_, var) in &mut self.vars {
1408 var.reset();
1409 }
1410 }
1411
1412 pub fn iter(&self) -> impl Iterator<Item = &dyn Var> {
1415 self.vars
1416 .values()
1417 .map(|v| v.as_var())
1418 .filter(|v| !SESSION_SYSTEM_VARS.contains_key(UncasedStr::new(v.name())))
1419 }
1420
1421 pub fn iter_synced(&self) -> impl Iterator<Item = &dyn Var> {
1425 self.iter().filter(|v| v.name() != ENABLE_LAUNCHDARKLY.name)
1426 }
1427
1428 pub fn iter_session(&self) -> impl Iterator<Item = &dyn Var> {
1430 self.vars
1431 .values()
1432 .map(|v| v.as_var())
1433 .filter(|v| SESSION_SYSTEM_VARS.contains_key(UncasedStr::new(v.name())))
1434 }
1435
1436 pub fn user_modifiable(&self, name: &str) -> bool {
1438 SESSION_SYSTEM_VARS.contains_key(UncasedStr::new(name))
1439 || name == ENABLE_RBAC_CHECKS.name()
1440 || name == NETWORK_POLICY.name()
1441 }
1442
1443 pub fn get(&self, name: &str) -> Result<&dyn Var, VarError> {
1464 self.vars
1465 .get(UncasedStr::new(name))
1466 .map(|v| v.as_var())
1467 .ok_or_else(|| VarError::UnknownParameter(name.into()))
1468 }
1469
1470 pub fn is_default(&self, name: &str, input: VarInput) -> Result<bool, VarError> {
1484 self.vars
1485 .get(UncasedStr::new(name))
1486 .ok_or_else(|| VarError::UnknownParameter(name.into()))
1487 .and_then(|v| v.is_default(input))
1488 }
1489
1490 pub fn set(&mut self, name: &str, input: VarInput) -> Result<bool, VarError> {
1513 let result = self
1514 .vars
1515 .get_mut(UncasedStr::new(name))
1516 .ok_or_else(|| VarError::UnknownParameter(name.into()))
1517 .and_then(|v| v.set(input))?;
1518 self.notify_callbacks(name);
1519 Ok(result)
1520 }
1521
1522 pub fn parse(&self, name: &str, input: VarInput) -> Result<Box<dyn Value>, VarError> {
1543 self.vars
1544 .get(UncasedStr::new(name))
1545 .ok_or_else(|| VarError::UnknownParameter(name.into()))
1546 .and_then(|v| v.parse(input))
1547 }
1548
1549 pub fn set_default(&mut self, name: &str, input: VarInput) -> Result<(), VarError> {
1557 self.vars
1558 .get_mut(UncasedStr::new(name))
1559 .ok_or_else(|| VarError::UnknownParameter(name.into()))
1560 .and_then(|v| v.set_default(input))?;
1561 self.notify_callbacks(name);
1562 Ok(())
1563 }
1564
1565 pub fn reset(&mut self, name: &str) -> Result<bool, VarError> {
1583 let result = self
1584 .vars
1585 .get_mut(UncasedStr::new(name))
1586 .ok_or_else(|| VarError::UnknownParameter(name.into()))
1587 .map(|v| v.reset())?;
1588 self.notify_callbacks(name);
1589 Ok(result)
1590 }
1591
1592 pub fn defaults(&self) -> BTreeMap<String, String> {
1594 self.vars
1595 .iter()
1596 .map(|(name, var)| {
1597 let default = var
1598 .dynamic_default
1599 .as_deref()
1600 .unwrap_or_else(|| var.definition.default_value());
1601 (name.as_str().to_owned(), default.format())
1602 })
1603 .collect()
1604 }
1605
1606 pub fn register_callback(
1611 &mut self,
1612 var: &VarDefinition,
1613 callback: Arc<dyn Fn(&SystemVars) + Send + Sync>,
1614 ) {
1615 self.callbacks
1616 .entry(var.name().to_string())
1617 .or_default()
1618 .push(callback);
1619 self.notify_callbacks(var.name());
1620 }
1621
1622 fn notify_callbacks(&self, name: &str) {
1624 if let Some(callbacks) = self.callbacks.get(name) {
1626 for callback in callbacks {
1627 (callback)(self);
1628 }
1629 }
1630 }
1631
1632 pub fn default_cluster(&self) -> String {
1635 self.expect_value::<String>(&CLUSTER).to_owned()
1636 }
1637
1638 pub fn max_kafka_connections(&self) -> u32 {
1640 *self.expect_value(&MAX_KAFKA_CONNECTIONS)
1641 }
1642
1643 pub fn max_postgres_connections(&self) -> u32 {
1645 *self.expect_value(&MAX_POSTGRES_CONNECTIONS)
1646 }
1647
1648 pub fn max_mysql_connections(&self) -> u32 {
1650 *self.expect_value(&MAX_MYSQL_CONNECTIONS)
1651 }
1652
1653 pub fn max_sql_server_connections(&self) -> u32 {
1655 *self.expect_value(&MAX_SQL_SERVER_CONNECTIONS)
1656 }
1657
1658 pub fn max_aws_privatelink_connections(&self) -> u32 {
1660 *self.expect_value(&MAX_AWS_PRIVATELINK_CONNECTIONS)
1661 }
1662
1663 pub fn max_tables(&self) -> u32 {
1665 *self.expect_value(&MAX_TABLES)
1666 }
1667
1668 pub fn max_sources(&self) -> u32 {
1670 *self.expect_value(&MAX_SOURCES)
1671 }
1672
1673 pub fn max_sinks(&self) -> u32 {
1675 *self.expect_value(&MAX_SINKS)
1676 }
1677
1678 pub fn max_materialized_views(&self) -> u32 {
1680 *self.expect_value(&MAX_MATERIALIZED_VIEWS)
1681 }
1682
1683 pub fn max_clusters(&self) -> u32 {
1685 *self.expect_value(&MAX_CLUSTERS)
1686 }
1687
1688 pub fn max_replicas_per_cluster(&self) -> u32 {
1690 *self.expect_value(&MAX_REPLICAS_PER_CLUSTER)
1691 }
1692
1693 pub fn max_credit_consumption_rate(&self) -> Numeric {
1695 *self.expect_value(&MAX_CREDIT_CONSUMPTION_RATE)
1696 }
1697
1698 pub fn max_databases(&self) -> u32 {
1700 *self.expect_value(&MAX_DATABASES)
1701 }
1702
1703 pub fn max_schemas_per_database(&self) -> u32 {
1705 *self.expect_value(&MAX_SCHEMAS_PER_DATABASE)
1706 }
1707
1708 pub fn max_objects_per_schema(&self) -> u32 {
1710 *self.expect_value(&MAX_OBJECTS_PER_SCHEMA)
1711 }
1712
1713 pub fn max_secrets(&self) -> u32 {
1715 *self.expect_value(&MAX_SECRETS)
1716 }
1717
1718 pub fn max_roles(&self) -> u32 {
1720 *self.expect_value(&MAX_ROLES)
1721 }
1722
1723 pub fn max_network_policies(&self) -> u32 {
1725 *self.expect_value(&MAX_NETWORK_POLICIES)
1726 }
1727
1728 pub fn max_rules_per_network_policy(&self) -> u32 {
1730 *self.expect_value(&MAX_RULES_PER_NETWORK_POLICY)
1731 }
1732
1733 pub fn max_result_size(&self) -> u64 {
1735 self.expect_value::<ByteSize>(&MAX_RESULT_SIZE).as_bytes()
1736 }
1737
1738 pub fn max_copy_from_row_size(&self) -> u64 {
1740 self.expect_value::<ByteSize>(&MAX_COPY_FROM_ROW_SIZE)
1741 .as_bytes()
1742 }
1743
1744 pub fn allowed_cluster_replica_sizes(&self) -> Vec<String> {
1746 self.expect_value::<Vec<Ident>>(&ALLOWED_CLUSTER_REPLICA_SIZES)
1747 .into_iter()
1748 .map(|s| s.as_str().into())
1749 .collect()
1750 }
1751
1752 pub fn default_cluster_replication_factor(&self) -> u32 {
1754 *self.expect_value::<u32>(&DEFAULT_CLUSTER_REPLICATION_FACTOR)
1755 }
1756
1757 pub fn upsert_rocksdb_compaction_style(&self) -> mz_rocksdb_types::config::CompactionStyle {
1758 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_COMPACTION_STYLE)
1759 }
1760
1761 pub fn upsert_rocksdb_optimize_compaction_memtable_budget(&self) -> usize {
1762 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_OPTIMIZE_COMPACTION_MEMTABLE_BUDGET)
1763 }
1764
1765 pub fn upsert_rocksdb_level_compaction_dynamic_level_bytes(&self) -> bool {
1766 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_LEVEL_COMPACTION_DYNAMIC_LEVEL_BYTES)
1767 }
1768
1769 pub fn upsert_rocksdb_universal_compaction_ratio(&self) -> i32 {
1770 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_UNIVERSAL_COMPACTION_RATIO)
1771 }
1772
1773 pub fn upsert_rocksdb_parallelism(&self) -> Option<i32> {
1774 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_PARALLELISM)
1775 }
1776
1777 pub fn upsert_rocksdb_compression_type(&self) -> mz_rocksdb_types::config::CompressionType {
1778 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_COMPRESSION_TYPE)
1779 }
1780
1781 pub fn upsert_rocksdb_bottommost_compression_type(
1782 &self,
1783 ) -> mz_rocksdb_types::config::CompressionType {
1784 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_BOTTOMMOST_COMPRESSION_TYPE)
1785 }
1786
1787 pub fn upsert_rocksdb_batch_size(&self) -> usize {
1788 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_BATCH_SIZE)
1789 }
1790
1791 pub fn upsert_rocksdb_retry_duration(&self) -> Duration {
1792 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_RETRY_DURATION)
1793 }
1794
1795 pub fn upsert_rocksdb_stats_log_interval_seconds(&self) -> u32 {
1796 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_STATS_LOG_INTERVAL_SECONDS)
1797 }
1798
1799 pub fn upsert_rocksdb_stats_persist_interval_seconds(&self) -> u32 {
1800 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_STATS_PERSIST_INTERVAL_SECONDS)
1801 }
1802
1803 pub fn upsert_rocksdb_point_lookup_block_cache_size_mb(&self) -> Option<u32> {
1804 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_POINT_LOOKUP_BLOCK_CACHE_SIZE_MB)
1805 }
1806
1807 pub fn upsert_rocksdb_shrink_allocated_buffers_by_ratio(&self) -> usize {
1808 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_SHRINK_ALLOCATED_BUFFERS_BY_RATIO)
1809 }
1810
1811 pub fn upsert_rocksdb_write_buffer_manager_cluster_memory_fraction(&self) -> Option<Numeric> {
1812 *self.expect_value(
1813 &upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_CLUSTER_MEMORY_FRACTION,
1814 )
1815 }
1816
1817 pub fn upsert_rocksdb_write_buffer_manager_memory_bytes(&self) -> Option<usize> {
1818 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_MEMORY_BYTES)
1819 }
1820
1821 pub fn upsert_rocksdb_write_buffer_manager_allow_stall(&self) -> bool {
1822 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_ALLOW_STALL)
1823 }
1824
1825 pub fn persist_fast_path_limit(&self) -> usize {
1826 *self.expect_value(&PERSIST_FAST_PATH_LIMIT)
1827 }
1828
1829 pub fn pg_source_connect_timeout(&self) -> Duration {
1831 *self.expect_value(&PG_SOURCE_CONNECT_TIMEOUT)
1832 }
1833
1834 pub fn pg_source_tcp_keepalives_retries(&self) -> u32 {
1836 *self.expect_value(&PG_SOURCE_TCP_KEEPALIVES_RETRIES)
1837 }
1838
1839 pub fn pg_source_tcp_keepalives_idle(&self) -> Duration {
1841 *self.expect_value(&PG_SOURCE_TCP_KEEPALIVES_IDLE)
1842 }
1843
1844 pub fn pg_source_tcp_keepalives_interval(&self) -> Duration {
1846 *self.expect_value(&PG_SOURCE_TCP_KEEPALIVES_INTERVAL)
1847 }
1848
1849 pub fn pg_source_tcp_user_timeout(&self) -> Duration {
1851 *self.expect_value(&PG_SOURCE_TCP_USER_TIMEOUT)
1852 }
1853
1854 pub fn pg_source_tcp_configure_server(&self) -> bool {
1856 *self.expect_value(&PG_SOURCE_TCP_CONFIGURE_SERVER)
1857 }
1858
1859 pub fn pg_source_snapshot_statement_timeout(&self) -> Duration {
1861 *self.expect_value(&PG_SOURCE_SNAPSHOT_STATEMENT_TIMEOUT)
1862 }
1863
1864 pub fn pg_source_wal_sender_timeout(&self) -> Option<Duration> {
1866 *self.expect_value(&PG_SOURCE_WAL_SENDER_TIMEOUT)
1867 }
1868
1869 pub fn pg_source_snapshot_collect_strict_count(&self) -> bool {
1871 *self.expect_value(&PG_SOURCE_SNAPSHOT_COLLECT_STRICT_COUNT)
1872 }
1873
1874 pub fn mysql_source_tcp_keepalive(&self) -> Duration {
1876 *self.expect_value(&MYSQL_SOURCE_TCP_KEEPALIVE)
1877 }
1878
1879 pub fn mysql_source_snapshot_max_execution_time(&self) -> Duration {
1881 *self.expect_value(&MYSQL_SOURCE_SNAPSHOT_MAX_EXECUTION_TIME)
1882 }
1883
1884 pub fn mysql_source_snapshot_lock_wait_timeout(&self) -> Duration {
1886 *self.expect_value(&MYSQL_SOURCE_SNAPSHOT_LOCK_WAIT_TIMEOUT)
1887 }
1888
1889 pub fn mysql_source_connect_timeout(&self) -> Duration {
1891 *self.expect_value(&MYSQL_SOURCE_CONNECT_TIMEOUT)
1892 }
1893
1894 pub fn ssh_check_interval(&self) -> Duration {
1896 *self.expect_value(&SSH_CHECK_INTERVAL)
1897 }
1898
1899 pub fn ssh_connect_timeout(&self) -> Duration {
1901 *self.expect_value(&SSH_CONNECT_TIMEOUT)
1902 }
1903
1904 pub fn ssh_keepalives_idle(&self) -> Duration {
1906 *self.expect_value(&SSH_KEEPALIVES_IDLE)
1907 }
1908
1909 pub fn kafka_socket_keepalive(&self) -> bool {
1911 *self.expect_value(&KAFKA_SOCKET_KEEPALIVE)
1912 }
1913
1914 pub fn kafka_socket_timeout(&self) -> Option<Duration> {
1916 *self.expect_value(&KAFKA_SOCKET_TIMEOUT)
1917 }
1918
1919 pub fn kafka_transaction_timeout(&self) -> Duration {
1921 *self.expect_value(&KAFKA_TRANSACTION_TIMEOUT)
1922 }
1923
1924 pub fn kafka_socket_connection_setup_timeout(&self) -> Duration {
1926 *self.expect_value(&KAFKA_SOCKET_CONNECTION_SETUP_TIMEOUT)
1927 }
1928
1929 pub fn kafka_fetch_metadata_timeout(&self) -> Duration {
1931 *self.expect_value(&KAFKA_FETCH_METADATA_TIMEOUT)
1932 }
1933
1934 pub fn kafka_progress_record_fetch_timeout(&self) -> Option<Duration> {
1936 *self.expect_value(&KAFKA_PROGRESS_RECORD_FETCH_TIMEOUT)
1937 }
1938
1939 pub fn crdb_connect_timeout(&self) -> Duration {
1941 *self.expect_config_value(UncasedStr::new(
1942 mz_persist_client::cfg::CRDB_CONNECT_TIMEOUT.name(),
1943 ))
1944 }
1945
1946 pub fn crdb_tcp_user_timeout(&self) -> Duration {
1948 *self.expect_config_value(UncasedStr::new(
1949 mz_persist_client::cfg::CRDB_TCP_USER_TIMEOUT.name(),
1950 ))
1951 }
1952
1953 pub fn crdb_keepalives_idle(&self) -> Duration {
1955 *self.expect_config_value(UncasedStr::new(
1956 mz_persist_client::cfg::CRDB_KEEPALIVES_IDLE.name(),
1957 ))
1958 }
1959
1960 pub fn crdb_keepalives_interval(&self) -> Duration {
1962 *self.expect_config_value(UncasedStr::new(
1963 mz_persist_client::cfg::CRDB_KEEPALIVES_INTERVAL.name(),
1964 ))
1965 }
1966
1967 pub fn crdb_keepalives_retries(&self) -> u32 {
1969 *self.expect_config_value(UncasedStr::new(
1970 mz_persist_client::cfg::CRDB_KEEPALIVES_RETRIES.name(),
1971 ))
1972 }
1973
1974 pub fn storage_dataflow_max_inflight_bytes(&self) -> Option<usize> {
1976 *self.expect_value(&STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES)
1977 }
1978
1979 pub fn storage_dataflow_max_inflight_bytes_to_cluster_size_fraction(&self) -> Option<Numeric> {
1981 *self.expect_value(&STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_TO_CLUSTER_SIZE_FRACTION)
1982 }
1983
1984 pub fn storage_shrink_upsert_unused_buffers_by_ratio(&self) -> usize {
1986 *self.expect_value(&STORAGE_SHRINK_UPSERT_UNUSED_BUFFERS_BY_RATIO)
1987 }
1988
1989 pub fn storage_dataflow_max_inflight_bytes_disk_only(&self) -> bool {
1991 *self.expect_value(&STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_DISK_ONLY)
1992 }
1993
1994 pub fn storage_statistics_interval(&self) -> Duration {
1996 *self.expect_value(&STORAGE_STATISTICS_INTERVAL)
1997 }
1998
1999 pub fn storage_statistics_collection_interval(&self) -> Duration {
2001 *self.expect_value(&STORAGE_STATISTICS_COLLECTION_INTERVAL)
2002 }
2003
2004 pub fn storage_record_source_sink_namespaced_errors(&self) -> bool {
2006 *self.expect_value(&STORAGE_RECORD_SOURCE_SINK_NAMESPACED_ERRORS)
2007 }
2008
2009 pub fn persist_stats_filter_enabled(&self) -> bool {
2011 *self.expect_config_value(UncasedStr::new(
2012 mz_persist_client::stats::STATS_FILTER_ENABLED.name(),
2013 ))
2014 }
2015
2016 pub fn scram_iterations(&self) -> NonZeroU32 {
2017 *self.expect_value(&SCRAM_ITERATIONS)
2018 }
2019
2020 pub fn dyncfg_updates(&self) -> ConfigUpdates {
2021 let mut updates = ConfigUpdates::default();
2022 for entry in self.dyncfgs.entries() {
2023 let name = UncasedStr::new(entry.name());
2024 let val = match entry.val() {
2025 ConfigVal::Bool(_) => ConfigVal::from(*self.expect_config_value::<bool>(name)),
2026 ConfigVal::U32(_) => ConfigVal::from(*self.expect_config_value::<u32>(name)),
2027 ConfigVal::Usize(_) => ConfigVal::from(*self.expect_config_value::<usize>(name)),
2028 ConfigVal::OptUsize(_) => {
2029 ConfigVal::from(*self.expect_config_value::<Option<usize>>(name))
2030 }
2031 ConfigVal::F64(_) => ConfigVal::from(*self.expect_config_value::<f64>(name)),
2032 ConfigVal::String(_) => {
2033 ConfigVal::from(self.expect_config_value::<String>(name).clone())
2034 }
2035 ConfigVal::OptString(_) => {
2036 ConfigVal::from(self.expect_config_value::<Option<String>>(name).clone())
2037 }
2038 ConfigVal::Duration(_) => {
2039 ConfigVal::from(*self.expect_config_value::<Duration>(name))
2040 }
2041 ConfigVal::Json(_) => {
2042 ConfigVal::from(self.expect_config_value::<serde_json::Value>(name).clone())
2043 }
2044 };
2045 updates.add_dynamic(entry.name(), val);
2046 }
2047 updates.apply(&self.dyncfgs);
2048 updates
2049 }
2050
2051 pub fn metrics_retention(&self) -> Duration {
2053 *self.expect_value(&METRICS_RETENTION)
2054 }
2055
2056 pub fn unsafe_mock_audit_event_timestamp(&self) -> Option<mz_repr::Timestamp> {
2058 *self.expect_value(&UNSAFE_MOCK_AUDIT_EVENT_TIMESTAMP)
2059 }
2060
2061 pub fn enable_rbac_checks(&self) -> bool {
2063 *self.expect_value(&ENABLE_RBAC_CHECKS)
2064 }
2065
2066 pub fn max_connections(&self) -> u32 {
2068 *self.expect_value(&MAX_CONNECTIONS)
2069 }
2070
2071 pub fn default_network_policy_name(&self) -> String {
2072 self.expect_value::<String>(&NETWORK_POLICY).clone()
2073 }
2074
2075 pub fn superuser_reserved_connections(&self) -> u32 {
2077 *self.expect_value(&SUPERUSER_RESERVED_CONNECTIONS)
2078 }
2079
2080 pub fn keep_n_source_status_history_entries(&self) -> usize {
2081 *self.expect_value(&KEEP_N_SOURCE_STATUS_HISTORY_ENTRIES)
2082 }
2083
2084 pub fn keep_n_sink_status_history_entries(&self) -> usize {
2085 *self.expect_value(&KEEP_N_SINK_STATUS_HISTORY_ENTRIES)
2086 }
2087
2088 pub fn keep_n_privatelink_status_history_entries(&self) -> usize {
2089 *self.expect_value(&KEEP_N_PRIVATELINK_STATUS_HISTORY_ENTRIES)
2090 }
2091
2092 pub fn replica_status_history_retention_window(&self) -> Duration {
2093 *self.expect_value(&REPLICA_STATUS_HISTORY_RETENTION_WINDOW)
2094 }
2095
2096 pub fn enable_storage_shard_finalization(&self) -> bool {
2098 *self.expect_value(&ENABLE_STORAGE_SHARD_FINALIZATION)
2099 }
2100
2101 pub fn enable_default_connection_validation(&self) -> bool {
2103 *self.expect_value(&ENABLE_DEFAULT_CONNECTION_VALIDATION)
2104 }
2105
2106 pub fn default_timestamp_interval(&self) -> Duration {
2108 *self.expect_value(&DEFAULT_TIMESTAMP_INTERVAL)
2109 }
2110
2111 pub fn min_timestamp_interval(&self) -> Duration {
2113 *self.expect_value(&MIN_TIMESTAMP_INTERVAL)
2114 }
2115 pub fn max_timestamp_interval(&self) -> Duration {
2117 *self.expect_value(&MAX_TIMESTAMP_INTERVAL)
2118 }
2119
2120 pub fn logging_filter(&self) -> CloneableEnvFilter {
2121 self.expect_value::<CloneableEnvFilter>(&LOGGING_FILTER)
2122 .clone()
2123 }
2124
2125 pub fn opentelemetry_filter(&self) -> CloneableEnvFilter {
2126 self.expect_value::<CloneableEnvFilter>(&OPENTELEMETRY_FILTER)
2127 .clone()
2128 }
2129
2130 pub fn logging_filter_defaults(&self) -> Vec<SerializableDirective> {
2131 self.expect_value::<Vec<SerializableDirective>>(&LOGGING_FILTER_DEFAULTS)
2132 .clone()
2133 }
2134
2135 pub fn opentelemetry_filter_defaults(&self) -> Vec<SerializableDirective> {
2136 self.expect_value::<Vec<SerializableDirective>>(&OPENTELEMETRY_FILTER_DEFAULTS)
2137 .clone()
2138 }
2139
2140 pub fn sentry_filters(&self) -> Vec<SerializableDirective> {
2141 self.expect_value::<Vec<SerializableDirective>>(&SENTRY_FILTERS)
2142 .clone()
2143 }
2144
2145 pub fn webhooks_secrets_caching_ttl_secs(&self) -> usize {
2146 *self.expect_value(&WEBHOOKS_SECRETS_CACHING_TTL_SECS)
2147 }
2148
2149 pub fn coord_slow_message_warn_threshold(&self) -> Duration {
2150 *self.expect_value(&COORD_SLOW_MESSAGE_WARN_THRESHOLD)
2151 }
2152
2153 pub fn grpc_client_http2_keep_alive_interval(&self) -> Duration {
2154 *self.expect_value(&grpc_client::HTTP2_KEEP_ALIVE_INTERVAL)
2155 }
2156
2157 pub fn grpc_client_http2_keep_alive_timeout(&self) -> Duration {
2158 *self.expect_value(&grpc_client::HTTP2_KEEP_ALIVE_TIMEOUT)
2159 }
2160
2161 pub fn grpc_connect_timeout(&self) -> Duration {
2162 *self.expect_value(&grpc_client::CONNECT_TIMEOUT)
2163 }
2164
2165 pub fn cluster_multi_process_replica_az_affinity_weight(&self) -> Option<i32> {
2166 *self.expect_value(&cluster_scheduling::CLUSTER_MULTI_PROCESS_REPLICA_AZ_AFFINITY_WEIGHT)
2167 }
2168
2169 pub fn cluster_soften_replication_anti_affinity(&self) -> bool {
2170 *self.expect_value(&cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY)
2171 }
2172
2173 pub fn cluster_soften_replication_anti_affinity_weight(&self) -> i32 {
2174 *self.expect_value(&cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY_WEIGHT)
2175 }
2176
2177 pub fn cluster_enable_topology_spread(&self) -> bool {
2178 *self.expect_value(&cluster_scheduling::CLUSTER_ENABLE_TOPOLOGY_SPREAD)
2179 }
2180
2181 pub fn cluster_topology_spread_ignore_non_singular_scale(&self) -> bool {
2182 *self.expect_value(&cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_IGNORE_NON_SINGULAR_SCALE)
2183 }
2184
2185 pub fn cluster_topology_spread_max_skew(&self) -> i32 {
2186 *self.expect_value(&cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_MAX_SKEW)
2187 }
2188
2189 pub fn cluster_topology_spread_set_min_domains(&self) -> Option<i32> {
2190 *self.expect_value(&cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_MIN_DOMAINS)
2191 }
2192
2193 pub fn cluster_topology_spread_soft(&self) -> bool {
2194 *self.expect_value(&cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_SOFT)
2195 }
2196
2197 pub fn cluster_soften_az_affinity(&self) -> bool {
2198 *self.expect_value(&cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY)
2199 }
2200
2201 pub fn cluster_soften_az_affinity_weight(&self) -> i32 {
2202 *self.expect_value(&cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY_WEIGHT)
2203 }
2204
2205 pub fn cluster_alter_check_ready_interval(&self) -> Duration {
2206 *self.expect_value(&cluster_scheduling::CLUSTER_ALTER_CHECK_READY_INTERVAL)
2207 }
2208
2209 pub fn cluster_check_scheduling_policies_interval(&self) -> Duration {
2210 *self.expect_value(&cluster_scheduling::CLUSTER_CHECK_SCHEDULING_POLICIES_INTERVAL)
2211 }
2212
2213 pub fn cluster_security_context_enabled(&self) -> bool {
2214 *self.expect_value(&cluster_scheduling::CLUSTER_SECURITY_CONTEXT_ENABLED)
2215 }
2216
2217 pub fn cluster_refresh_mv_compaction_estimate(&self) -> Duration {
2218 *self.expect_value(&cluster_scheduling::CLUSTER_REFRESH_MV_COMPACTION_ESTIMATE)
2219 }
2220
2221 pub fn privatelink_status_update_quota_per_minute(&self) -> u32 {
2223 *self.expect_value(&PRIVATELINK_STATUS_UPDATE_QUOTA_PER_MINUTE)
2224 }
2225
2226 pub fn statement_logging_target_data_rate(&self) -> Option<usize> {
2227 *self.expect_value(&STATEMENT_LOGGING_TARGET_DATA_RATE)
2228 }
2229
2230 pub fn statement_logging_max_data_credit(&self) -> Option<usize> {
2231 *self.expect_value(&STATEMENT_LOGGING_MAX_DATA_CREDIT)
2232 }
2233
2234 pub fn statement_logging_max_sample_rate(&self) -> Numeric {
2236 *self.expect_value(&STATEMENT_LOGGING_MAX_SAMPLE_RATE)
2237 }
2238
2239 pub fn statement_logging_default_sample_rate(&self) -> Numeric {
2241 *self.expect_value(&STATEMENT_LOGGING_DEFAULT_SAMPLE_RATE)
2242 }
2243
2244 pub fn enable_internal_statement_logging(&self) -> bool {
2246 *self.expect_value(&ENABLE_INTERNAL_STATEMENT_LOGGING)
2247 }
2248
2249 pub fn optimizer_stats_timeout(&self) -> Duration {
2251 *self.expect_value(&OPTIMIZER_STATS_TIMEOUT)
2252 }
2253
2254 pub fn optimizer_oneshot_stats_timeout(&self) -> Duration {
2256 *self.expect_value(&OPTIMIZER_ONESHOT_STATS_TIMEOUT)
2257 }
2258
2259 pub fn webhook_concurrent_request_limit(&self) -> usize {
2261 *self.expect_value(&WEBHOOK_CONCURRENT_REQUEST_LIMIT)
2262 }
2263
2264 pub fn pg_timestamp_oracle_connection_pool_max_size(&self) -> usize {
2266 *self.expect_value(&PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_SIZE)
2267 }
2268
2269 pub fn pg_timestamp_oracle_connection_pool_max_wait(&self) -> Option<Duration> {
2271 *self.expect_value(&PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_WAIT)
2272 }
2273
2274 pub fn pg_timestamp_oracle_connection_pool_ttl(&self) -> Duration {
2276 *self.expect_value(&PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL)
2277 }
2278
2279 pub fn pg_timestamp_oracle_connection_pool_ttl_stagger(&self) -> Duration {
2281 *self.expect_value(&PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL_STAGGER)
2282 }
2283
2284 pub fn user_storage_managed_collections_batch_duration(&self) -> Duration {
2286 *self.expect_value(&USER_STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION)
2287 }
2288
2289 pub fn force_source_table_syntax(&self) -> bool {
2290 *self.expect_value(&FORCE_SOURCE_TABLE_SYNTAX)
2291 }
2292
2293 pub fn optimizer_e2e_latency_warning_threshold(&self) -> Duration {
2294 *self.expect_value(&OPTIMIZER_E2E_LATENCY_WARNING_THRESHOLD)
2295 }
2296
2297 pub fn is_controller_config_var(&self, name: &str) -> bool {
2299 self.is_dyncfg_var(name)
2300 }
2301
2302 pub fn is_compute_config_var(&self, name: &str) -> bool {
2306 name == MAX_RESULT_SIZE.name() || self.is_dyncfg_var(name) || is_tracing_var(name)
2307 }
2308
2309 pub fn is_metrics_config_var(&self, name: &str) -> bool {
2311 self.is_dyncfg_var(name)
2312 }
2313
2314 pub fn is_storage_config_var(&self, name: &str) -> bool {
2316 name == PG_SOURCE_CONNECT_TIMEOUT.name()
2317 || name == PG_SOURCE_TCP_KEEPALIVES_IDLE.name()
2318 || name == PG_SOURCE_TCP_KEEPALIVES_INTERVAL.name()
2319 || name == PG_SOURCE_TCP_KEEPALIVES_RETRIES.name()
2320 || name == PG_SOURCE_TCP_USER_TIMEOUT.name()
2321 || name == PG_SOURCE_TCP_CONFIGURE_SERVER.name()
2322 || name == PG_SOURCE_SNAPSHOT_STATEMENT_TIMEOUT.name()
2323 || name == PG_SOURCE_WAL_SENDER_TIMEOUT.name()
2324 || name == PG_SOURCE_SNAPSHOT_COLLECT_STRICT_COUNT.name()
2325 || name == MYSQL_SOURCE_TCP_KEEPALIVE.name()
2326 || name == MYSQL_SOURCE_SNAPSHOT_MAX_EXECUTION_TIME.name()
2327 || name == MYSQL_SOURCE_SNAPSHOT_LOCK_WAIT_TIMEOUT.name()
2328 || name == MYSQL_SOURCE_CONNECT_TIMEOUT.name()
2329 || name == ENABLE_STORAGE_SHARD_FINALIZATION.name()
2330 || name == SSH_CHECK_INTERVAL.name()
2331 || name == SSH_CONNECT_TIMEOUT.name()
2332 || name == SSH_KEEPALIVES_IDLE.name()
2333 || name == KAFKA_SOCKET_KEEPALIVE.name()
2334 || name == KAFKA_SOCKET_TIMEOUT.name()
2335 || name == KAFKA_TRANSACTION_TIMEOUT.name()
2336 || name == KAFKA_SOCKET_CONNECTION_SETUP_TIMEOUT.name()
2337 || name == KAFKA_FETCH_METADATA_TIMEOUT.name()
2338 || name == KAFKA_PROGRESS_RECORD_FETCH_TIMEOUT.name()
2339 || name == STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES.name()
2340 || name == STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_TO_CLUSTER_SIZE_FRACTION.name()
2341 || name == STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_DISK_ONLY.name()
2342 || name == STORAGE_SHRINK_UPSERT_UNUSED_BUFFERS_BY_RATIO.name()
2343 || name == STORAGE_RECORD_SOURCE_SINK_NAMESPACED_ERRORS.name()
2344 || name == STORAGE_STATISTICS_INTERVAL.name()
2345 || name == STORAGE_STATISTICS_COLLECTION_INTERVAL.name()
2346 || name == USER_STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION.name()
2347 || is_upsert_rocksdb_config_var(name)
2348 || self.is_dyncfg_var(name)
2349 || is_tracing_var(name)
2350 }
2351
2352 fn is_dyncfg_var(&self, name: &str) -> bool {
2354 self.dyncfgs.entries().any(|e| name == e.name())
2355 }
2356}
2357
2358pub fn is_tracing_var(name: &str) -> bool {
2359 name == LOGGING_FILTER.name()
2360 || name == LOGGING_FILTER_DEFAULTS.name()
2361 || name == OPENTELEMETRY_FILTER.name()
2362 || name == OPENTELEMETRY_FILTER_DEFAULTS.name()
2363 || name == SENTRY_FILTERS.name()
2364}
2365
2366pub fn is_secrets_caching_var(name: &str) -> bool {
2368 name == WEBHOOKS_SECRETS_CACHING_TTL_SECS.name()
2369}
2370
2371fn is_upsert_rocksdb_config_var(name: &str) -> bool {
2372 name == upsert_rocksdb::UPSERT_ROCKSDB_COMPACTION_STYLE.name()
2373 || name == upsert_rocksdb::UPSERT_ROCKSDB_OPTIMIZE_COMPACTION_MEMTABLE_BUDGET.name()
2374 || name == upsert_rocksdb::UPSERT_ROCKSDB_LEVEL_COMPACTION_DYNAMIC_LEVEL_BYTES.name()
2375 || name == upsert_rocksdb::UPSERT_ROCKSDB_UNIVERSAL_COMPACTION_RATIO.name()
2376 || name == upsert_rocksdb::UPSERT_ROCKSDB_PARALLELISM.name()
2377 || name == upsert_rocksdb::UPSERT_ROCKSDB_COMPRESSION_TYPE.name()
2378 || name == upsert_rocksdb::UPSERT_ROCKSDB_BOTTOMMOST_COMPRESSION_TYPE.name()
2379 || name == upsert_rocksdb::UPSERT_ROCKSDB_BATCH_SIZE.name()
2380 || name == upsert_rocksdb::UPSERT_ROCKSDB_STATS_LOG_INTERVAL_SECONDS.name()
2381 || name == upsert_rocksdb::UPSERT_ROCKSDB_STATS_PERSIST_INTERVAL_SECONDS.name()
2382 || name == upsert_rocksdb::UPSERT_ROCKSDB_POINT_LOOKUP_BLOCK_CACHE_SIZE_MB.name()
2383 || name == upsert_rocksdb::UPSERT_ROCKSDB_SHRINK_ALLOCATED_BUFFERS_BY_RATIO.name()
2384}
2385
2386pub fn is_timestamp_oracle_config_var(name: &str) -> bool {
2389 name == PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_SIZE.name()
2390 || name == PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_WAIT.name()
2391 || name == PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL.name()
2392 || name == PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL_STAGGER.name()
2393 || name == CRDB_CONNECT_TIMEOUT.name()
2394 || name == CRDB_TCP_USER_TIMEOUT.name()
2395 || name == CRDB_KEEPALIVES_IDLE.name()
2396 || name == CRDB_KEEPALIVES_INTERVAL.name()
2397 || name == CRDB_KEEPALIVES_RETRIES.name()
2398}
2399
2400pub fn is_cluster_scheduling_var(name: &str) -> bool {
2402 name == cluster_scheduling::CLUSTER_MULTI_PROCESS_REPLICA_AZ_AFFINITY_WEIGHT.name()
2403 || name == cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY.name()
2404 || name == cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY_WEIGHT.name()
2405 || name == cluster_scheduling::CLUSTER_ENABLE_TOPOLOGY_SPREAD.name()
2406 || name == cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_IGNORE_NON_SINGULAR_SCALE.name()
2407 || name == cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_MAX_SKEW.name()
2408 || name == cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_SOFT.name()
2409 || name == cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY.name()
2410 || name == cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY_WEIGHT.name()
2411}
2412
2413pub fn is_http_config_var(name: &str) -> bool {
2415 name == WEBHOOK_CONCURRENT_REQUEST_LIMIT.name()
2416}
2417
2418static SESSION_SYSTEM_VARS: LazyLock<BTreeMap<&'static UncasedStr, &'static VarDefinition>> =
2422 LazyLock::new(|| {
2423 [
2424 &APPLICATION_NAME,
2425 &CLIENT_ENCODING,
2426 &CLIENT_MIN_MESSAGES,
2427 &CLUSTER,
2428 &CLUSTER_REPLICA,
2429 &DEFAULT_CLUSTER_REPLICATION_FACTOR,
2430 &CURRENT_OBJECT_MISSING_WARNINGS,
2431 &DATABASE,
2432 &DATE_STYLE,
2433 &EXTRA_FLOAT_DIGITS,
2434 &INTEGER_DATETIMES,
2435 &INTERVAL_STYLE,
2436 &REAL_TIME_RECENCY_TIMEOUT,
2437 &SEARCH_PATH,
2438 &STANDARD_CONFORMING_STRINGS,
2439 &STATEMENT_TIMEOUT,
2440 &IDLE_IN_TRANSACTION_SESSION_TIMEOUT,
2441 &TIMEZONE,
2442 &TRANSACTION_ISOLATION,
2443 &MAX_QUERY_RESULT_SIZE,
2444 ]
2445 .into_iter()
2446 .map(|var| (UncasedStr::new(var.name()), var))
2447 .collect()
2448 });
2449
2450#[derive(Debug)]
2453pub struct FeatureFlag {
2454 pub flag: &'static VarDefinition,
2455 pub feature_desc: &'static str,
2456}
2457
2458impl FeatureFlag {
2459 pub fn require(&'static self, system_vars: &SystemVars) -> Result<(), VarError> {
2462 match *system_vars.expect_value::<bool>(self.flag) {
2463 true => Ok(()),
2464 false => Err(VarError::RequiresFeatureFlag { feature_flag: self }),
2465 }
2466 }
2467}
2468
2469impl PartialEq for FeatureFlag {
2470 fn eq(&self, other: &FeatureFlag) -> bool {
2471 self.flag.name() == other.flag.name()
2472 }
2473}
2474
2475impl Eq for FeatureFlag {}
2476
2477impl Var for MzVersion {
2478 fn name(&self) -> &'static str {
2479 MZ_VERSION_NAME.as_str()
2480 }
2481
2482 fn value(&self) -> String {
2483 self.build_info
2484 .human_version(self.helm_chart_version.clone())
2485 }
2486
2487 fn description(&self) -> &'static str {
2488 "Shows the Materialize server version (Materialize)."
2489 }
2490
2491 fn type_name(&self) -> Cow<'static, str> {
2492 String::type_name()
2493 }
2494
2495 fn visible(&self, _: &User, _: &SystemVars) -> Result<(), VarError> {
2496 Ok(())
2497 }
2498}
2499
2500impl Var for User {
2501 fn name(&self) -> &'static str {
2502 IS_SUPERUSER_NAME.as_str()
2503 }
2504
2505 fn value(&self) -> String {
2506 self.is_superuser().format()
2507 }
2508
2509 fn description(&self) -> &'static str {
2510 "Reports whether the current session is a superuser (PostgreSQL)."
2511 }
2512
2513 fn type_name(&self) -> Cow<'static, str> {
2514 bool::type_name()
2515 }
2516
2517 fn visible(&self, _: &User, _: &SystemVars) -> Result<(), VarError> {
2518 Ok(())
2519 }
2520}
2521
2522#[cfg(test)]
2523mod isolation_feature_flag_tests {
2524 use super::*;
2525
2526 #[mz_ore::test]
2527 fn gates_bounded_staleness_value() {
2528 let mut system_vars = SystemVars::new();
2529
2530 check_transaction_isolation_feature_flag(
2532 TRANSACTION_ISOLATION_VAR_NAME,
2533 VarInput::Flat("bounded staleness 5s"),
2534 &system_vars,
2535 )
2536 .expect("flag on by default");
2537
2538 system_vars
2543 .set("enable_bounded_staleness_isolation", VarInput::Flat("off"))
2544 .expect("set flag");
2545 for name in ["transaction_isolation", "TRANSACTION_ISOLATION"] {
2546 let err = check_transaction_isolation_feature_flag(
2547 name,
2548 VarInput::Flat("bounded staleness 5s"),
2549 &system_vars,
2550 )
2551 .expect_err("flag off rejects bounded staleness");
2552 assert!(matches!(err, VarError::RequiresFeatureFlag { .. }));
2553 }
2554
2555 check_transaction_isolation_feature_flag(
2557 TRANSACTION_ISOLATION_VAR_NAME,
2558 VarInput::Flat("serializable"),
2559 &system_vars,
2560 )
2561 .expect("serializable always allowed");
2562
2563 check_transaction_isolation_feature_flag(
2565 CLUSTER.name(),
2566 VarInput::Flat("bounded staleness 5s"),
2567 &system_vars,
2568 )
2569 .expect("unrelated var ignored");
2570 }
2571}