1use std::borrow::Cow;
67use std::clone::Clone;
68use std::collections::BTreeMap;
69use std::fmt::Debug;
70use std::net::IpAddr;
71use std::num::NonZeroU32;
72use std::string::ToString;
73use std::sync::{Arc, LazyLock};
74use std::time::Duration;
75
76use chrono::{DateTime, Utc};
77use derivative::Derivative;
78use imbl::OrdMap;
79use mz_build_info::BuildInfo;
80use mz_dyncfg::{ConfigSet, ConfigType, ConfigUpdates, ConfigVal};
81use mz_persist_client::cfg::{
82 CRDB_CONNECT_TIMEOUT, CRDB_KEEPALIVES_IDLE, CRDB_KEEPALIVES_INTERVAL, CRDB_KEEPALIVES_RETRIES,
83 CRDB_TCP_USER_TIMEOUT,
84};
85use mz_repr::adt::numeric::Numeric;
86use mz_repr::adt::timestamp::CheckedTimestamp;
87use mz_repr::bytes::ByteSize;
88use mz_repr::user::{ExternalUserMetadata, InternalUserMetadata};
89use mz_tracing::{CloneableEnvFilter, SerializableDirective};
90use serde::Serialize;
91use thiserror::Error;
92use uncased::UncasedStr;
93
94use crate::ast::Ident;
95use crate::session::user::User;
96
97pub(crate) mod constraints;
98pub(crate) mod definitions;
99pub(crate) mod errors;
100pub(crate) mod polyfill;
101pub(crate) mod value;
102
103pub use definitions::*;
104pub use errors::*;
105pub use value::*;
106
/// How a transaction is being concluded.
///
/// Passed to [`SessionVar::end_transaction`] and
/// [`SessionVars::end_transaction`] to decide the fate of staged values.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum EndTransactionAction {
    /// Staged values are promoted to session values.
    Commit,
    /// Staged values are discarded.
    Rollback,
}
118
/// Borrowed, not-yet-parsed input for a configuration variable.
#[derive(Clone, Copy, Debug)]
pub enum VarInput<'a> {
    /// A single string value.
    Flat(&'a str),
    /// A list of string values.
    // NOTE(review): presumably the value list of a SQL `SET` statement —
    // confirm against callers.
    SqlSet(&'a [String]),
}

impl<'a> VarInput<'a> {
    /// Copies the input into an owned list of strings.
    ///
    /// A `Flat` input yields a one-element vector; an `SqlSet` input yields a
    /// clone of every element, in order.
    pub fn to_vec(&self) -> Vec<String> {
        match *self {
            VarInput::Flat(value) => vec![value.to_owned()],
            VarInput::SqlSet(values) => values.to_vec(),
        }
    }
}
142
143#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize)]
145pub enum OwnedVarInput {
146 Flat(String),
148 SqlSet(Vec<String>),
150}
151
152impl OwnedVarInput {
153 pub fn borrow(&self) -> VarInput<'_> {
155 match self {
156 OwnedVarInput::Flat(v) => VarInput::Flat(v),
157 OwnedVarInput::SqlSet(v) => VarInput::SqlSet(v),
158 }
159 }
160}
161
162pub trait Var: Debug {
164 fn name(&self) -> &'static str;
166
167 fn value(&self) -> String;
173
174 fn description(&self) -> &'static str;
177
178 fn type_name(&self) -> Cow<'static, str>;
180
181 fn visible(&self, user: &User, system_vars: &SystemVars) -> Result<(), VarError>;
186
187 fn is_unsafe(&self) -> bool {
189 self.name().starts_with("unsafe_")
190 }
191
192 fn as_var(&self) -> &dyn Var
195 where
196 Self: Sized,
197 {
198 self
199 }
200}
201
202#[derive(Debug)]
209pub struct SessionVar {
210 definition: VarDefinition,
211 default_value: Option<Box<dyn Value>>,
213 local_value: Option<Box<dyn Value>>,
215 staged_value: Option<Box<dyn Value>>,
217 session_value: Option<Box<dyn Value>>,
219}
220
221impl Clone for SessionVar {
222 fn clone(&self) -> Self {
223 SessionVar {
224 definition: self.definition.clone(),
225 default_value: self.default_value.as_ref().map(|v| v.box_clone()),
226 local_value: self.local_value.as_ref().map(|v| v.box_clone()),
227 staged_value: self.staged_value.as_ref().map(|v| v.box_clone()),
228 session_value: self.session_value.as_ref().map(|v| v.box_clone()),
229 }
230 }
231}
232
233impl SessionVar {
234 pub const fn new(var: VarDefinition) -> Self {
235 SessionVar {
236 definition: var,
237 default_value: None,
238 local_value: None,
239 staged_value: None,
240 session_value: None,
241 }
242 }
243
244 pub fn check(&self, input: VarInput) -> Result<String, VarError> {
247 let v = self.definition.parse(input)?;
248 self.validate_constraints(v.as_ref())?;
249
250 Ok(v.format())
251 }
252
253 pub fn set(&mut self, input: VarInput, local: bool) -> Result<(), VarError> {
255 let v = self.definition.parse(input)?;
256
257 self.validate_constraints(v.as_ref())?;
259
260 if local {
261 self.local_value = Some(v);
262 } else {
263 self.local_value = None;
264 self.staged_value = Some(v);
265 }
266 Ok(())
267 }
268
269 pub fn set_default(&mut self, input: VarInput) -> Result<(), VarError> {
271 let v = self.definition.parse(input)?;
272 self.validate_constraints(v.as_ref())?;
273 self.default_value = Some(v);
274 Ok(())
275 }
276
277 pub fn reset(&mut self, local: bool) {
279 let value = self
280 .default_value
281 .as_ref()
282 .map(|v| v.as_ref())
283 .unwrap_or_else(|| self.definition.value.value());
284 if local {
285 self.local_value = Some(value.box_clone());
286 } else {
287 self.local_value = None;
288 self.staged_value = Some(value.box_clone());
289 }
290 }
291
292 #[must_use]
294 pub fn end_transaction(&self, action: EndTransactionAction) -> Option<Self> {
295 if !self.is_mutating() {
296 return None;
297 }
298 let mut next: Self = self.clone();
299 next.local_value = None;
300 match action {
301 EndTransactionAction::Commit if next.staged_value.is_some() => {
302 next.session_value = next.staged_value.take()
303 }
304 _ => next.staged_value = None,
305 }
306 Some(next)
307 }
308
309 pub fn is_mutating(&self) -> bool {
311 self.local_value.is_some() || self.staged_value.is_some()
312 }
313
314 pub fn value_dyn(&self) -> &dyn Value {
315 self.local_value
316 .as_deref()
317 .or(self.staged_value.as_deref())
318 .or(self.session_value.as_deref())
319 .or(self.default_value.as_deref())
320 .unwrap_or_else(|| self.definition.value.value())
321 }
322
323 pub fn inspect_session_value(&self) -> Option<&dyn Value> {
328 self.session_value.as_deref()
329 }
330
331 fn validate_constraints(&self, val: &dyn Value) -> Result<(), VarError> {
332 if let Some(constraint) = &self.definition.constraint {
333 constraint.check_constraint(self, self.value_dyn(), val)
334 } else {
335 Ok(())
336 }
337 }
338}
339
340impl Var for SessionVar {
341 fn name(&self) -> &'static str {
342 self.definition.name.as_str()
343 }
344
345 fn value(&self) -> String {
346 self.value_dyn().format()
347 }
348
349 fn description(&self) -> &'static str {
350 self.definition.description
351 }
352
353 fn type_name(&self) -> Cow<'static, str> {
354 self.definition.type_name()
355 }
356
357 fn visible(
358 &self,
359 user: &User,
360 system_vars: &super::vars::SystemVars,
361 ) -> Result<(), super::vars::VarError> {
362 self.definition.visible(user, system_vars)
363 }
364}
365
366#[derive(Debug, Clone, PartialEq, Eq)]
367pub struct MzVersion {
368 build_info: &'static BuildInfo,
370 helm_chart_version: Option<String>,
372}
373
374impl MzVersion {
375 pub fn new(build_info: &'static BuildInfo, helm_chart_version: Option<String>) -> Self {
376 MzVersion {
377 build_info,
378 helm_chart_version,
379 }
380 }
381}
382
383#[derive(Debug, Clone)]
388pub struct SessionVars {
389 vars: OrdMap<&'static UncasedStr, SessionVar>,
391 mz_version: MzVersion,
393 user: User,
395}
396
397impl SessionVars {
398 pub fn new_unchecked(
400 build_info: &'static BuildInfo,
401 user: User,
402 helm_chart_version: Option<String>,
403 ) -> SessionVars {
404 use definitions::*;
405
406 let vars = [
407 &FAILPOINTS,
408 &SERVER_VERSION,
409 &SERVER_VERSION_NUM,
410 &SQL_SAFE_UPDATES,
411 &REAL_TIME_RECENCY,
412 &EMIT_PLAN_INSIGHTS_NOTICE,
413 &EMIT_TIMESTAMP_NOTICE,
414 &EMIT_TRACE_ID_NOTICE,
415 &AUTO_ROUTE_CATALOG_QUERIES,
416 &ENABLE_SESSION_RBAC_CHECKS,
417 &ENABLE_SESSION_CARDINALITY_ESTIMATES,
418 &MAX_IDENTIFIER_LENGTH,
419 &STATEMENT_LOGGING_SAMPLE_RATE,
420 &EMIT_INTROSPECTION_QUERY_NOTICE,
421 &UNSAFE_NEW_TRANSACTION_WALL_TIME,
422 &WELCOME_MESSAGE,
423 ]
424 .into_iter()
425 .chain(SESSION_SYSTEM_VARS.iter().map(|(_name, var)| *var))
426 .map(|var| (var.name, SessionVar::new(var.clone())))
427 .collect();
428
429 SessionVars {
430 vars,
431 mz_version: MzVersion::new(build_info, helm_chart_version),
432 user,
433 }
434 }
435
436 fn expect_value<V: Value>(&self, var: &VarDefinition) -> &V {
437 let var = self
438 .vars
439 .get(var.name)
440 .expect("provided var should be in state");
441 let val = var.value_dyn();
442 val.as_any().downcast_ref::<V>().expect("success")
443 }
444
445 pub fn iter(&self) -> impl Iterator<Item = &dyn Var> {
452 #[allow(clippy::as_conversions)]
453 self.vars
454 .values()
455 .map(|v| v.as_var())
456 .chain([&self.mz_version as &dyn Var, &self.user])
457 }
458
459 pub fn notify_set(&self) -> impl Iterator<Item = &dyn Var> {
463 [
470 &APPLICATION_NAME,
471 &CLIENT_ENCODING,
472 &DATE_STYLE,
473 &INTEGER_DATETIMES,
474 &SERVER_VERSION,
475 &STANDARD_CONFORMING_STRINGS,
476 &TIMEZONE,
477 &INTERVAL_STYLE,
478 &CLUSTER,
483 &CLUSTER_REPLICA,
484 &DEFAULT_CLUSTER_REPLICATION_FACTOR,
485 &DATABASE,
486 &SEARCH_PATH,
487 ]
488 .into_iter()
489 .map(|v| self.vars[v.name].as_var())
490 .chain(std::iter::once(self.mz_version.as_var()))
497 }
498
499 pub fn reset_all(&mut self) {
501 let names: Vec<_> = self.vars.keys().copied().collect();
502 for name in names {
503 self.vars[name].reset(false);
504 }
505 }
506
507 pub fn get(&self, system_vars: &SystemVars, name: &str) -> Result<&dyn Var, VarError> {
518 let name = compat_translate_name(name);
519
520 let name = UncasedStr::new(name);
521 if name == MZ_VERSION_NAME {
522 Ok(&self.mz_version)
523 } else if name == IS_SUPERUSER_NAME {
524 Ok(&self.user)
525 } else {
526 self.vars
527 .get(name)
528 .map(|v| {
529 v.visible(&self.user, system_vars)?;
530 Ok(v.as_var())
531 })
532 .transpose()?
533 .ok_or_else(|| VarError::UnknownParameter(name.to_string()))
534 }
535 }
536
537 pub fn inspect(&self, name: &str) -> Result<&SessionVar, VarError> {
542 let name = compat_translate_name(name);
543
544 self.vars
545 .get(UncasedStr::new(name))
546 .ok_or_else(|| VarError::UnknownParameter(name.to_string()))
547 }
548
549 pub fn set(
562 &mut self,
563 system_vars: &SystemVars,
564 name: &str,
565 input: VarInput,
566 local: bool,
567 ) -> Result<(), VarError> {
568 let (name, input) = compat_translate(name, input);
569
570 let name = UncasedStr::new(name);
571 self.check_read_only(name)?;
572
573 self.vars
574 .get_mut(name)
575 .map(|v| {
576 v.visible(&self.user, system_vars)?;
577 v.set(input, local)
578 })
579 .transpose()?
580 .ok_or_else(|| VarError::UnknownParameter(name.to_string()))
581 }
582
583 pub fn set_default(&mut self, name: &str, input: VarInput) -> Result<(), VarError> {
586 let (name, input) = compat_translate(name, input);
587
588 let name = UncasedStr::new(name);
589 self.check_read_only(name)?;
590
591 self.vars
592 .get_mut(name)
593 .map(|v| v.set_default(input))
595 .transpose()?
596 .ok_or_else(|| VarError::UnknownParameter(name.to_string()))
597 }
598
599 pub fn reset(
613 &mut self,
614 system_vars: &SystemVars,
615 name: &str,
616 local: bool,
617 ) -> Result<(), VarError> {
618 let name = compat_translate_name(name);
619
620 let name = UncasedStr::new(name);
621 self.check_read_only(name)?;
622
623 self.vars
624 .get_mut(name)
625 .map(|v| {
626 v.visible(&self.user, system_vars)?;
627 v.reset(local);
628 Ok(())
629 })
630 .transpose()?
631 .ok_or_else(|| VarError::UnknownParameter(name.to_string()))
632 }
633
634 fn check_read_only(&self, name: &UncasedStr) -> Result<(), VarError> {
636 if name == MZ_VERSION_NAME {
637 Err(VarError::ReadOnlyParameter(MZ_VERSION_NAME.as_str()))
638 } else if name == IS_SUPERUSER_NAME {
639 Err(VarError::ReadOnlyParameter(IS_SUPERUSER_NAME.as_str()))
640 } else if name == MAX_IDENTIFIER_LENGTH.name {
641 Err(VarError::ReadOnlyParameter(
642 MAX_IDENTIFIER_LENGTH.name.as_str(),
643 ))
644 } else {
645 Ok(())
646 }
647 }
648
649 #[mz_ore::instrument(level = "debug")]
654 pub fn end_transaction(
655 &mut self,
656 action: EndTransactionAction,
657 ) -> BTreeMap<&'static str, String> {
658 let mut changed = BTreeMap::new();
659 let mut updates = Vec::new();
660 for (name, var) in self.vars.iter() {
661 if !var.is_mutating() {
662 continue;
663 }
664 let before = var.value();
665 let next = var.end_transaction(action).expect("must mutate");
666 let after = next.value();
667 updates.push((*name, next));
668
669 if before != after {
671 changed.insert(var.name(), after);
672 }
673 }
674 self.vars.extend(updates);
675 changed
676 }
677
678 pub fn application_name(&self) -> &str {
680 self.expect_value::<String>(&APPLICATION_NAME).as_str()
681 }
682
683 pub fn build_info(&self) -> &'static BuildInfo {
685 self.mz_version.build_info
686 }
687
688 pub fn client_encoding(&self) -> &ClientEncoding {
690 self.expect_value(&CLIENT_ENCODING)
691 }
692
693 pub fn client_min_messages(&self) -> &ClientSeverity {
695 self.expect_value(&CLIENT_MIN_MESSAGES)
696 }
697
698 pub fn cluster(&self) -> &str {
700 self.expect_value::<String>(&CLUSTER).as_str()
701 }
702
703 pub fn cluster_replica(&self) -> Option<&str> {
705 self.expect_value::<Option<String>>(&CLUSTER_REPLICA)
706 .as_deref()
707 }
708
709 pub fn current_object_missing_warnings(&self) -> bool {
712 *self.expect_value::<bool>(&CURRENT_OBJECT_MISSING_WARNINGS)
713 }
714
715 pub fn date_style(&self) -> &[&str] {
717 &self.expect_value::<DateStyle>(&DATE_STYLE).0
718 }
719
720 pub fn database(&self) -> &str {
722 self.expect_value::<String>(&DATABASE).as_str()
723 }
724
725 pub fn extra_float_digits(&self) -> i32 {
727 *self.expect_value(&EXTRA_FLOAT_DIGITS)
728 }
729
730 pub fn integer_datetimes(&self) -> bool {
732 *self.expect_value(&INTEGER_DATETIMES)
733 }
734
735 pub fn intervalstyle(&self) -> &IntervalStyle {
737 self.expect_value(&INTERVAL_STYLE)
738 }
739
740 pub fn mz_version(&self) -> String {
742 self.mz_version.value()
743 }
744
745 pub fn search_path(&self) -> &[Ident] {
747 self.expect_value::<Vec<Ident>>(&SEARCH_PATH).as_slice()
748 }
749
750 pub fn server_version(&self) -> &str {
752 self.expect_value::<String>(&SERVER_VERSION).as_str()
753 }
754
755 pub fn server_version_num(&self) -> i32 {
757 *self.expect_value(&SERVER_VERSION_NUM)
758 }
759
760 pub fn sql_safe_updates(&self) -> bool {
762 *self.expect_value(&SQL_SAFE_UPDATES)
763 }
764
765 pub fn standard_conforming_strings(&self) -> bool {
768 *self.expect_value(&STANDARD_CONFORMING_STRINGS)
769 }
770
771 pub fn statement_timeout(&self) -> &Duration {
773 self.expect_value(&STATEMENT_TIMEOUT)
774 }
775
776 pub fn idle_in_transaction_session_timeout(&self) -> &Duration {
778 self.expect_value(&IDLE_IN_TRANSACTION_SESSION_TIMEOUT)
779 }
780
781 pub fn timezone(&self) -> &TimeZone {
783 self.expect_value(&TIMEZONE)
784 }
785
786 pub fn transaction_isolation(&self) -> &IsolationLevel {
789 self.expect_value(&TRANSACTION_ISOLATION)
790 }
791
792 pub fn real_time_recency(&self) -> bool {
794 *self.expect_value(&REAL_TIME_RECENCY)
795 }
796
797 pub fn real_time_recency_timeout(&self) -> &Duration {
799 self.expect_value(&REAL_TIME_RECENCY_TIMEOUT)
800 }
801
802 pub fn emit_plan_insights_notice(&self) -> bool {
804 *self.expect_value(&EMIT_PLAN_INSIGHTS_NOTICE)
805 }
806
807 pub fn emit_timestamp_notice(&self) -> bool {
809 *self.expect_value(&EMIT_TIMESTAMP_NOTICE)
810 }
811
812 pub fn emit_trace_id_notice(&self) -> bool {
814 *self.expect_value(&EMIT_TRACE_ID_NOTICE)
815 }
816
817 pub fn auto_route_catalog_queries(&self) -> bool {
819 *self.expect_value(&AUTO_ROUTE_CATALOG_QUERIES)
820 }
821
822 pub fn enable_session_rbac_checks(&self) -> bool {
824 *self.expect_value(&ENABLE_SESSION_RBAC_CHECKS)
825 }
826
827 pub fn enable_session_cardinality_estimates(&self) -> bool {
829 *self.expect_value(&ENABLE_SESSION_CARDINALITY_ESTIMATES)
830 }
831
832 pub fn is_superuser(&self) -> bool {
834 self.user.is_superuser()
835 }
836
837 pub fn user(&self) -> &User {
839 &self.user
840 }
841
842 pub fn max_query_result_size(&self) -> u64 {
844 self.expect_value::<ByteSize>(&MAX_QUERY_RESULT_SIZE)
845 .as_bytes()
846 }
847
848 pub fn set_internal_user_metadata(&mut self, metadata: InternalUserMetadata) {
850 self.user.internal_metadata = Some(metadata);
851 }
852
853 pub fn set_external_user_metadata(&mut self, metadata: ExternalUserMetadata) {
855 self.user.external_metadata = Some(metadata);
856 }
857
858 pub fn set_cluster(&mut self, cluster: String) {
859 let var = self
860 .vars
861 .get_mut(UncasedStr::new(CLUSTER.name()))
862 .expect("cluster variable must exist");
863 var.set(VarInput::Flat(&cluster), false)
864 .expect("setting cluster must succeed");
865 }
866
867 pub fn set_local_transaction_isolation(&mut self, transaction_isolation: IsolationLevel) {
868 let var = self
869 .vars
870 .get_mut(UncasedStr::new(TRANSACTION_ISOLATION.name()))
871 .expect("transaction_isolation variable must exist");
872 var.set(VarInput::Flat(transaction_isolation.as_str()), true)
873 .expect("setting transaction isolation must succeed");
874 }
875
876 pub fn get_statement_logging_sample_rate(&self) -> Numeric {
877 *self.expect_value(&STATEMENT_LOGGING_SAMPLE_RATE)
878 }
879
880 pub fn emit_introspection_query_notice(&self) -> bool {
882 *self.expect_value(&EMIT_INTROSPECTION_QUERY_NOTICE)
883 }
884
885 pub fn unsafe_new_transaction_wall_time(&self) -> Option<CheckedTimestamp<DateTime<Utc>>> {
886 *self.expect_value(&UNSAFE_NEW_TRANSACTION_WALL_TIME)
887 }
888
889 pub fn welcome_message(&self) -> bool {
891 *self.expect_value(&WELCOME_MESSAGE)
892 }
893}
894
/// Deprecated value of the `cluster` variable; `compat_translate` rewrites it
/// to `mz_catalog_server`.
pub const OLD_CATALOG_SERVER_CLUSTER: &str = "mz_introspection";
/// Deprecated variable name; `compat_translate` rewrites it to the name of
/// `AUTO_ROUTE_CATALOG_QUERIES`.
pub const OLD_AUTO_ROUTE_CATALOG_QUERIES: &str = "auto_route_introspection_queries";
898
899fn compat_translate<'a, 'b>(name: &'a str, input: VarInput<'b>) -> (&'a str, VarInput<'b>) {
907 if name == CLUSTER.name() {
908 if let Ok(value) = CLUSTER.parse(input) {
909 if value.format() == OLD_CATALOG_SERVER_CLUSTER {
910 tracing::debug!(
911 github_27285 = true,
912 "encountered deprecated `cluster` variable value: {}",
913 OLD_CATALOG_SERVER_CLUSTER,
914 );
915 return (name, VarInput::Flat("mz_catalog_server"));
916 }
917 }
918 }
919
920 if name == OLD_AUTO_ROUTE_CATALOG_QUERIES {
921 tracing::debug!(
922 github_27285 = true,
923 "encountered deprecated `{}` variable name",
924 OLD_AUTO_ROUTE_CATALOG_QUERIES,
925 );
926 return (AUTO_ROUTE_CATALOG_QUERIES.name(), input);
927 }
928
929 (name, input)
930}
931
932fn compat_translate_name(name: &str) -> &str {
933 let (name, _) = compat_translate(name, VarInput::Flat(""));
934 name
935}
936
937#[derive(Debug)]
940pub struct SystemVar {
941 definition: VarDefinition,
942 persisted_value: Option<Box<dyn Value>>,
944 dynamic_default: Option<Box<dyn Value>>,
946}
947
948impl Clone for SystemVar {
949 fn clone(&self) -> Self {
950 SystemVar {
951 definition: self.definition.clone(),
952 persisted_value: self.persisted_value.as_ref().map(|v| v.box_clone()),
953 dynamic_default: self.dynamic_default.as_ref().map(|v| v.box_clone()),
954 }
955 }
956}
957
958impl SystemVar {
959 pub fn new(definition: VarDefinition) -> Self {
960 SystemVar {
961 definition,
962 persisted_value: None,
963 dynamic_default: None,
964 }
965 }
966
967 fn is_default(&self, input: VarInput) -> Result<bool, VarError> {
968 let v = self.definition.parse(input)?;
969 Ok(self.definition.default_value() == v.as_ref())
970 }
971
972 pub fn value_dyn(&self) -> &dyn Value {
973 self.persisted_value
974 .as_deref()
975 .or(self.dynamic_default.as_deref())
976 .unwrap_or_else(|| self.definition.default_value())
977 }
978
979 pub fn value<V: 'static>(&self) -> &V {
980 let val = self.value_dyn();
981 val.as_any().downcast_ref::<V>().expect("success")
982 }
983
984 fn parse(&self, input: VarInput) -> Result<Box<dyn Value>, VarError> {
985 let v = self.definition.parse(input)?;
986 self.validate_constraints(v.as_ref())?;
988 Ok(v)
989 }
990
991 fn set(&mut self, input: VarInput) -> Result<bool, VarError> {
992 let v = self.parse(input)?;
993
994 if self.persisted_value.as_ref() != Some(&v) {
995 self.persisted_value = Some(v);
996 Ok(true)
997 } else {
998 Ok(false)
999 }
1000 }
1001
1002 fn reset(&mut self) -> bool {
1003 if self.persisted_value.is_some() {
1004 self.persisted_value = None;
1005 true
1006 } else {
1007 false
1008 }
1009 }
1010
1011 fn set_default(&mut self, input: VarInput) -> Result<(), VarError> {
1012 let v = self.definition.parse(input)?;
1013 self.dynamic_default = Some(v);
1014 Ok(())
1015 }
1016
1017 fn validate_constraints(&self, val: &dyn Value) -> Result<(), VarError> {
1018 if let Some(constraint) = &self.definition.constraint {
1019 constraint.check_constraint(self, self.value_dyn(), val)
1020 } else {
1021 Ok(())
1022 }
1023 }
1024}
1025
1026impl Var for SystemVar {
1027 fn name(&self) -> &'static str {
1028 self.definition.name.as_str()
1029 }
1030
1031 fn value(&self) -> String {
1032 self.value_dyn().format()
1033 }
1034
1035 fn description(&self) -> &'static str {
1036 self.definition.description
1037 }
1038
1039 fn type_name(&self) -> Cow<'static, str> {
1040 self.definition.type_name()
1041 }
1042
1043 fn visible(&self, user: &User, system_vars: &SystemVars) -> Result<(), VarError> {
1044 self.definition.visible(user, system_vars)
1045 }
1046}
1047
1048#[derive(Debug, Error)]
1049pub enum NetworkPolicyError {
1050 #[error("Access denied for address {0}")]
1051 AddressDenied(IpAddr),
1052}
1053
1054#[derive(Derivative, Clone)]
1059#[derivative(Debug)]
1060pub struct SystemVars {
1061 allow_unsafe: bool,
1063 vars: BTreeMap<&'static UncasedStr, SystemVar>,
1065 #[derivative(Debug = "ignore")]
1067 callbacks: BTreeMap<String, Vec<Arc<dyn Fn(&SystemVars) + Send + Sync>>>,
1068
1069 dyncfgs: ConfigSet,
1073}
1074
1075impl Default for SystemVars {
1076 fn default() -> Self {
1077 Self::new()
1078 }
1079}
1080
1081impl SystemVars {
1082 pub fn new() -> Self {
1083 let system_vars = vec![
1084 &MAX_KAFKA_CONNECTIONS,
1085 &MAX_POSTGRES_CONNECTIONS,
1086 &MAX_MYSQL_CONNECTIONS,
1087 &MAX_SQL_SERVER_CONNECTIONS,
1088 &MAX_AWS_PRIVATELINK_CONNECTIONS,
1089 &MAX_TABLES,
1090 &MAX_SOURCES,
1091 &MAX_SINKS,
1092 &MAX_MATERIALIZED_VIEWS,
1093 &MAX_CLUSTERS,
1094 &MAX_REPLICAS_PER_CLUSTER,
1095 &MAX_CREDIT_CONSUMPTION_RATE,
1096 &MAX_DATABASES,
1097 &MAX_SCHEMAS_PER_DATABASE,
1098 &MAX_OBJECTS_PER_SCHEMA,
1099 &MAX_SECRETS,
1100 &MAX_ROLES,
1101 &MAX_CONTINUAL_TASKS,
1102 &MAX_NETWORK_POLICIES,
1103 &MAX_RULES_PER_NETWORK_POLICY,
1104 &MAX_RESULT_SIZE,
1105 &MAX_COPY_FROM_ROW_SIZE,
1106 &ALLOWED_CLUSTER_REPLICA_SIZES,
1107 &upsert_rocksdb::UPSERT_ROCKSDB_COMPACTION_STYLE,
1108 &upsert_rocksdb::UPSERT_ROCKSDB_OPTIMIZE_COMPACTION_MEMTABLE_BUDGET,
1109 &upsert_rocksdb::UPSERT_ROCKSDB_LEVEL_COMPACTION_DYNAMIC_LEVEL_BYTES,
1110 &upsert_rocksdb::UPSERT_ROCKSDB_UNIVERSAL_COMPACTION_RATIO,
1111 &upsert_rocksdb::UPSERT_ROCKSDB_PARALLELISM,
1112 &upsert_rocksdb::UPSERT_ROCKSDB_COMPRESSION_TYPE,
1113 &upsert_rocksdb::UPSERT_ROCKSDB_BOTTOMMOST_COMPRESSION_TYPE,
1114 &upsert_rocksdb::UPSERT_ROCKSDB_BATCH_SIZE,
1115 &upsert_rocksdb::UPSERT_ROCKSDB_RETRY_DURATION,
1116 &upsert_rocksdb::UPSERT_ROCKSDB_STATS_LOG_INTERVAL_SECONDS,
1117 &upsert_rocksdb::UPSERT_ROCKSDB_STATS_PERSIST_INTERVAL_SECONDS,
1118 &upsert_rocksdb::UPSERT_ROCKSDB_POINT_LOOKUP_BLOCK_CACHE_SIZE_MB,
1119 &upsert_rocksdb::UPSERT_ROCKSDB_SHRINK_ALLOCATED_BUFFERS_BY_RATIO,
1120 &upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_CLUSTER_MEMORY_FRACTION,
1121 &upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_MEMORY_BYTES,
1122 &upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_ALLOW_STALL,
1123 &STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES,
1124 &STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_TO_CLUSTER_SIZE_FRACTION,
1125 &STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_DISK_ONLY,
1126 &STORAGE_STATISTICS_INTERVAL,
1127 &STORAGE_STATISTICS_COLLECTION_INTERVAL,
1128 &STORAGE_SHRINK_UPSERT_UNUSED_BUFFERS_BY_RATIO,
1129 &STORAGE_RECORD_SOURCE_SINK_NAMESPACED_ERRORS,
1130 &PERSIST_FAST_PATH_LIMIT,
1131 &METRICS_RETENTION,
1132 &UNSAFE_MOCK_AUDIT_EVENT_TIMESTAMP,
1133 &ENABLE_RBAC_CHECKS,
1134 &PG_SOURCE_CONNECT_TIMEOUT,
1135 &PG_SOURCE_TCP_KEEPALIVES_IDLE,
1136 &PG_SOURCE_TCP_KEEPALIVES_INTERVAL,
1137 &PG_SOURCE_TCP_KEEPALIVES_RETRIES,
1138 &PG_SOURCE_TCP_USER_TIMEOUT,
1139 &PG_SOURCE_TCP_CONFIGURE_SERVER,
1140 &PG_SOURCE_SNAPSHOT_STATEMENT_TIMEOUT,
1141 &PG_SOURCE_WAL_SENDER_TIMEOUT,
1142 &PG_SOURCE_SNAPSHOT_COLLECT_STRICT_COUNT,
1143 &MYSQL_SOURCE_TCP_KEEPALIVE,
1144 &MYSQL_SOURCE_SNAPSHOT_MAX_EXECUTION_TIME,
1145 &MYSQL_SOURCE_SNAPSHOT_LOCK_WAIT_TIMEOUT,
1146 &MYSQL_SOURCE_CONNECT_TIMEOUT,
1147 &SSH_CHECK_INTERVAL,
1148 &SSH_CONNECT_TIMEOUT,
1149 &SSH_KEEPALIVES_IDLE,
1150 &KAFKA_SOCKET_KEEPALIVE,
1151 &KAFKA_SOCKET_TIMEOUT,
1152 &KAFKA_TRANSACTION_TIMEOUT,
1153 &KAFKA_SOCKET_CONNECTION_SETUP_TIMEOUT,
1154 &KAFKA_FETCH_METADATA_TIMEOUT,
1155 &KAFKA_PROGRESS_RECORD_FETCH_TIMEOUT,
1156 &ENABLE_LAUNCHDARKLY,
1157 &MAX_CONNECTIONS,
1158 &NETWORK_POLICY,
1159 &SUPERUSER_RESERVED_CONNECTIONS,
1160 &KEEP_N_SOURCE_STATUS_HISTORY_ENTRIES,
1161 &KEEP_N_SINK_STATUS_HISTORY_ENTRIES,
1162 &KEEP_N_PRIVATELINK_STATUS_HISTORY_ENTRIES,
1163 &REPLICA_STATUS_HISTORY_RETENTION_WINDOW,
1164 &ENABLE_STORAGE_SHARD_FINALIZATION,
1165 &ENABLE_CONSOLIDATE_AFTER_UNION_NEGATE,
1166 &ENABLE_DEFAULT_CONNECTION_VALIDATION,
1167 &DEFAULT_TIMESTAMP_INTERVAL,
1168 &MIN_TIMESTAMP_INTERVAL,
1169 &MAX_TIMESTAMP_INTERVAL,
1170 &LOGGING_FILTER,
1171 &OPENTELEMETRY_FILTER,
1172 &LOGGING_FILTER_DEFAULTS,
1173 &OPENTELEMETRY_FILTER_DEFAULTS,
1174 &SENTRY_FILTERS,
1175 &WEBHOOKS_SECRETS_CACHING_TTL_SECS,
1176 &COORD_SLOW_MESSAGE_WARN_THRESHOLD,
1177 &grpc_client::CONNECT_TIMEOUT,
1178 &grpc_client::HTTP2_KEEP_ALIVE_INTERVAL,
1179 &grpc_client::HTTP2_KEEP_ALIVE_TIMEOUT,
1180 &cluster_scheduling::CLUSTER_MULTI_PROCESS_REPLICA_AZ_AFFINITY_WEIGHT,
1181 &cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY,
1182 &cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY_WEIGHT,
1183 &cluster_scheduling::CLUSTER_ENABLE_TOPOLOGY_SPREAD,
1184 &cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_IGNORE_NON_SINGULAR_SCALE,
1185 &cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_MAX_SKEW,
1186 &cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_MIN_DOMAINS,
1187 &cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_SOFT,
1188 &cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY,
1189 &cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY_WEIGHT,
1190 &cluster_scheduling::CLUSTER_ALTER_CHECK_READY_INTERVAL,
1191 &cluster_scheduling::CLUSTER_CHECK_SCHEDULING_POLICIES_INTERVAL,
1192 &cluster_scheduling::CLUSTER_SECURITY_CONTEXT_ENABLED,
1193 &cluster_scheduling::CLUSTER_REFRESH_MV_COMPACTION_ESTIMATE,
1194 &grpc_client::HTTP2_KEEP_ALIVE_TIMEOUT,
1195 &STATEMENT_LOGGING_MAX_SAMPLE_RATE,
1196 &STATEMENT_LOGGING_DEFAULT_SAMPLE_RATE,
1197 &STATEMENT_LOGGING_TARGET_DATA_RATE,
1198 &STATEMENT_LOGGING_MAX_DATA_CREDIT,
1199 &ENABLE_INTERNAL_STATEMENT_LOGGING,
1200 &OPTIMIZER_STATS_TIMEOUT,
1201 &OPTIMIZER_ONESHOT_STATS_TIMEOUT,
1202 &PRIVATELINK_STATUS_UPDATE_QUOTA_PER_MINUTE,
1203 &WEBHOOK_CONCURRENT_REQUEST_LIMIT,
1204 &PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_SIZE,
1205 &PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_WAIT,
1206 &PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL,
1207 &PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL_STAGGER,
1208 &USER_STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION,
1209 &FORCE_SOURCE_TABLE_SYNTAX,
1210 &OPTIMIZER_E2E_LATENCY_WARNING_THRESHOLD,
1211 &SCRAM_ITERATIONS,
1212 ];
1213
1214 let dyncfgs = mz_dyncfgs::all_dyncfgs();
1215 let dyncfg_vars: Vec<_> = dyncfgs
1216 .entries()
1217 .map(|cfg| match cfg.default() {
1218 ConfigVal::Bool(default) => {
1219 VarDefinition::new_runtime(cfg.name(), *default, cfg.desc(), false)
1220 }
1221 ConfigVal::U32(default) => {
1222 VarDefinition::new_runtime(cfg.name(), *default, cfg.desc(), false)
1223 }
1224 ConfigVal::Usize(default) => {
1225 VarDefinition::new_runtime(cfg.name(), *default, cfg.desc(), false)
1226 }
1227 ConfigVal::OptUsize(default) => {
1228 VarDefinition::new_runtime(cfg.name(), *default, cfg.desc(), false)
1229 }
1230 ConfigVal::F64(default) => {
1231 VarDefinition::new_runtime(cfg.name(), *default, cfg.desc(), false)
1232 }
1233 ConfigVal::String(default) => {
1234 VarDefinition::new_runtime(cfg.name(), default.clone(), cfg.desc(), false)
1235 }
1236 ConfigVal::OptString(default) => {
1237 VarDefinition::new_runtime(cfg.name(), default.clone(), cfg.desc(), false)
1238 }
1239 ConfigVal::Duration(default) => {
1240 VarDefinition::new_runtime(cfg.name(), default.clone(), cfg.desc(), false)
1241 }
1242 ConfigVal::Json(default) => {
1243 VarDefinition::new_runtime(cfg.name(), default.clone(), cfg.desc(), false)
1244 }
1245 })
1246 .collect();
1247
1248 let vars: BTreeMap<_, _> = system_vars
1249 .into_iter()
1250 .chain(definitions::FEATURE_FLAGS.iter().copied())
1252 .chain(SESSION_SYSTEM_VARS.values().copied())
1254 .cloned()
1255 .chain(dyncfg_vars)
1257 .map(|var| (var.name, SystemVar::new(var)))
1258 .collect();
1259
1260 let vars = SystemVars {
1261 vars,
1262 callbacks: BTreeMap::new(),
1263 allow_unsafe: false,
1264 dyncfgs,
1265 };
1266
1267 vars
1268 }
1269
1270 pub fn dyncfgs(&self) -> &ConfigSet {
1271 &self.dyncfgs
1272 }
1273
1274 pub fn set_unsafe(mut self, allow_unsafe: bool) -> Self {
1275 self.allow_unsafe = allow_unsafe;
1276 self
1277 }
1278
1279 pub fn allow_unsafe(&self) -> bool {
1280 self.allow_unsafe
1281 }
1282
1283 fn expect_value<V: 'static>(&self, var: &VarDefinition) -> &V {
1284 let val = self
1285 .vars
1286 .get(var.name)
1287 .expect("provided var should be in state");
1288
1289 val.value_dyn()
1290 .as_any()
1291 .downcast_ref::<V>()
1292 .expect("provided var type should matched stored var")
1293 }
1294
1295 fn expect_config_value<V: ConfigType + 'static>(&self, name: &UncasedStr) -> &V {
1296 let val = self
1297 .vars
1298 .get(name)
1299 .unwrap_or_else(|| panic!("provided var {name} should be in state"));
1300
1301 val.value_dyn()
1302 .as_any()
1303 .downcast_ref()
1304 .expect("provided var type should matched stored var")
1305 }
1306
1307 pub fn reset_all(&mut self) {
1310 for (_, var) in &mut self.vars {
1311 var.reset();
1312 }
1313 }
1314
1315 pub fn iter(&self) -> impl Iterator<Item = &dyn Var> {
1318 self.vars
1319 .values()
1320 .map(|v| v.as_var())
1321 .filter(|v| !SESSION_SYSTEM_VARS.contains_key(UncasedStr::new(v.name())))
1322 }
1323
1324 pub fn iter_synced(&self) -> impl Iterator<Item = &dyn Var> {
1328 self.iter().filter(|v| v.name() != ENABLE_LAUNCHDARKLY.name)
1329 }
1330
1331 pub fn iter_session(&self) -> impl Iterator<Item = &dyn Var> {
1333 self.vars
1334 .values()
1335 .map(|v| v.as_var())
1336 .filter(|v| SESSION_SYSTEM_VARS.contains_key(UncasedStr::new(v.name())))
1337 }
1338
1339 pub fn user_modifiable(&self, name: &str) -> bool {
1341 SESSION_SYSTEM_VARS.contains_key(UncasedStr::new(name))
1342 || name == ENABLE_RBAC_CHECKS.name()
1343 || name == NETWORK_POLICY.name()
1344 }
1345
1346 pub fn get(&self, name: &str) -> Result<&dyn Var, VarError> {
1367 self.vars
1368 .get(UncasedStr::new(name))
1369 .map(|v| v.as_var())
1370 .ok_or_else(|| VarError::UnknownParameter(name.into()))
1371 }
1372
1373 pub fn is_default(&self, name: &str, input: VarInput) -> Result<bool, VarError> {
1387 self.vars
1388 .get(UncasedStr::new(name))
1389 .ok_or_else(|| VarError::UnknownParameter(name.into()))
1390 .and_then(|v| v.is_default(input))
1391 }
1392
1393 pub fn set(&mut self, name: &str, input: VarInput) -> Result<bool, VarError> {
1416 let result = self
1417 .vars
1418 .get_mut(UncasedStr::new(name))
1419 .ok_or_else(|| VarError::UnknownParameter(name.into()))
1420 .and_then(|v| v.set(input))?;
1421 self.notify_callbacks(name);
1422 Ok(result)
1423 }
1424
1425 pub fn parse(&self, name: &str, input: VarInput) -> Result<Box<dyn Value>, VarError> {
1446 self.vars
1447 .get(UncasedStr::new(name))
1448 .ok_or_else(|| VarError::UnknownParameter(name.into()))
1449 .and_then(|v| v.parse(input))
1450 }
1451
1452 pub fn set_default(&mut self, name: &str, input: VarInput) -> Result<(), VarError> {
1460 let result = self
1461 .vars
1462 .get_mut(UncasedStr::new(name))
1463 .ok_or_else(|| VarError::UnknownParameter(name.into()))
1464 .and_then(|v| v.set_default(input))?;
1465 self.notify_callbacks(name);
1466 Ok(result)
1467 }
1468
1469 pub fn reset(&mut self, name: &str) -> Result<bool, VarError> {
1487 let result = self
1488 .vars
1489 .get_mut(UncasedStr::new(name))
1490 .ok_or_else(|| VarError::UnknownParameter(name.into()))
1491 .map(|v| v.reset())?;
1492 self.notify_callbacks(name);
1493 Ok(result)
1494 }
1495
1496 pub fn defaults(&self) -> BTreeMap<String, String> {
1498 self.vars
1499 .iter()
1500 .map(|(name, var)| {
1501 let default = var
1502 .dynamic_default
1503 .as_deref()
1504 .unwrap_or_else(|| var.definition.default_value());
1505 (name.as_str().to_owned(), default.format())
1506 })
1507 .collect()
1508 }
1509
1510 pub fn register_callback(
1515 &mut self,
1516 var: &VarDefinition,
1517 callback: Arc<dyn Fn(&SystemVars) + Send + Sync>,
1518 ) {
1519 self.callbacks
1520 .entry(var.name().to_string())
1521 .or_default()
1522 .push(callback);
1523 self.notify_callbacks(var.name());
1524 }
1525
1526 fn notify_callbacks(&self, name: &str) {
1528 if let Some(callbacks) = self.callbacks.get(name) {
1530 for callback in callbacks {
1531 (callback)(self);
1532 }
1533 }
1534 }
1535
1536 pub fn default_cluster(&self) -> String {
1539 self.expect_value::<String>(&CLUSTER).to_owned()
1540 }
1541
1542 pub fn max_kafka_connections(&self) -> u32 {
1544 *self.expect_value(&MAX_KAFKA_CONNECTIONS)
1545 }
1546
1547 pub fn max_postgres_connections(&self) -> u32 {
1549 *self.expect_value(&MAX_POSTGRES_CONNECTIONS)
1550 }
1551
1552 pub fn max_mysql_connections(&self) -> u32 {
1554 *self.expect_value(&MAX_MYSQL_CONNECTIONS)
1555 }
1556
1557 pub fn max_sql_server_connections(&self) -> u32 {
1559 *self.expect_value(&MAX_SQL_SERVER_CONNECTIONS)
1560 }
1561
1562 pub fn max_aws_privatelink_connections(&self) -> u32 {
1564 *self.expect_value(&MAX_AWS_PRIVATELINK_CONNECTIONS)
1565 }
1566
1567 pub fn max_tables(&self) -> u32 {
1569 *self.expect_value(&MAX_TABLES)
1570 }
1571
1572 pub fn max_sources(&self) -> u32 {
1574 *self.expect_value(&MAX_SOURCES)
1575 }
1576
1577 pub fn max_sinks(&self) -> u32 {
1579 *self.expect_value(&MAX_SINKS)
1580 }
1581
1582 pub fn max_materialized_views(&self) -> u32 {
1584 *self.expect_value(&MAX_MATERIALIZED_VIEWS)
1585 }
1586
1587 pub fn max_clusters(&self) -> u32 {
1589 *self.expect_value(&MAX_CLUSTERS)
1590 }
1591
1592 pub fn max_replicas_per_cluster(&self) -> u32 {
1594 *self.expect_value(&MAX_REPLICAS_PER_CLUSTER)
1595 }
1596
1597 pub fn max_credit_consumption_rate(&self) -> Numeric {
1599 *self.expect_value(&MAX_CREDIT_CONSUMPTION_RATE)
1600 }
1601
1602 pub fn max_databases(&self) -> u32 {
1604 *self.expect_value(&MAX_DATABASES)
1605 }
1606
1607 pub fn max_schemas_per_database(&self) -> u32 {
1609 *self.expect_value(&MAX_SCHEMAS_PER_DATABASE)
1610 }
1611
1612 pub fn max_objects_per_schema(&self) -> u32 {
1614 *self.expect_value(&MAX_OBJECTS_PER_SCHEMA)
1615 }
1616
1617 pub fn max_secrets(&self) -> u32 {
1619 *self.expect_value(&MAX_SECRETS)
1620 }
1621
1622 pub fn max_roles(&self) -> u32 {
1624 *self.expect_value(&MAX_ROLES)
1625 }
1626
1627 pub fn max_continual_tasks(&self) -> u32 {
1629 *self.expect_value(&MAX_CONTINUAL_TASKS)
1630 }
1631
1632 pub fn max_network_policies(&self) -> u32 {
1634 *self.expect_value(&MAX_NETWORK_POLICIES)
1635 }
1636
1637 pub fn max_rules_per_network_policy(&self) -> u32 {
1639 *self.expect_value(&MAX_RULES_PER_NETWORK_POLICY)
1640 }
1641
1642 pub fn max_result_size(&self) -> u64 {
1644 self.expect_value::<ByteSize>(&MAX_RESULT_SIZE).as_bytes()
1645 }
1646
1647 pub fn max_copy_from_row_size(&self) -> u64 {
1649 self.expect_value::<ByteSize>(&MAX_COPY_FROM_ROW_SIZE)
1650 .as_bytes()
1651 }
1652
1653 pub fn allowed_cluster_replica_sizes(&self) -> Vec<String> {
1655 self.expect_value::<Vec<Ident>>(&ALLOWED_CLUSTER_REPLICA_SIZES)
1656 .into_iter()
1657 .map(|s| s.as_str().into())
1658 .collect()
1659 }
1660
1661 pub fn default_cluster_replication_factor(&self) -> u32 {
1663 *self.expect_value::<u32>(&DEFAULT_CLUSTER_REPLICATION_FACTOR)
1664 }
1665
1666 pub fn upsert_rocksdb_compaction_style(&self) -> mz_rocksdb_types::config::CompactionStyle {
1667 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_COMPACTION_STYLE)
1668 }
1669
1670 pub fn upsert_rocksdb_optimize_compaction_memtable_budget(&self) -> usize {
1671 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_OPTIMIZE_COMPACTION_MEMTABLE_BUDGET)
1672 }
1673
1674 pub fn upsert_rocksdb_level_compaction_dynamic_level_bytes(&self) -> bool {
1675 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_LEVEL_COMPACTION_DYNAMIC_LEVEL_BYTES)
1676 }
1677
1678 pub fn upsert_rocksdb_universal_compaction_ratio(&self) -> i32 {
1679 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_UNIVERSAL_COMPACTION_RATIO)
1680 }
1681
1682 pub fn upsert_rocksdb_parallelism(&self) -> Option<i32> {
1683 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_PARALLELISM)
1684 }
1685
1686 pub fn upsert_rocksdb_compression_type(&self) -> mz_rocksdb_types::config::CompressionType {
1687 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_COMPRESSION_TYPE)
1688 }
1689
1690 pub fn upsert_rocksdb_bottommost_compression_type(
1691 &self,
1692 ) -> mz_rocksdb_types::config::CompressionType {
1693 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_BOTTOMMOST_COMPRESSION_TYPE)
1694 }
1695
1696 pub fn upsert_rocksdb_batch_size(&self) -> usize {
1697 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_BATCH_SIZE)
1698 }
1699
1700 pub fn upsert_rocksdb_retry_duration(&self) -> Duration {
1701 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_RETRY_DURATION)
1702 }
1703
1704 pub fn upsert_rocksdb_stats_log_interval_seconds(&self) -> u32 {
1705 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_STATS_LOG_INTERVAL_SECONDS)
1706 }
1707
1708 pub fn upsert_rocksdb_stats_persist_interval_seconds(&self) -> u32 {
1709 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_STATS_PERSIST_INTERVAL_SECONDS)
1710 }
1711
1712 pub fn upsert_rocksdb_point_lookup_block_cache_size_mb(&self) -> Option<u32> {
1713 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_POINT_LOOKUP_BLOCK_CACHE_SIZE_MB)
1714 }
1715
1716 pub fn upsert_rocksdb_shrink_allocated_buffers_by_ratio(&self) -> usize {
1717 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_SHRINK_ALLOCATED_BUFFERS_BY_RATIO)
1718 }
1719
1720 pub fn upsert_rocksdb_write_buffer_manager_cluster_memory_fraction(&self) -> Option<Numeric> {
1721 *self.expect_value(
1722 &upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_CLUSTER_MEMORY_FRACTION,
1723 )
1724 }
1725
1726 pub fn upsert_rocksdb_write_buffer_manager_memory_bytes(&self) -> Option<usize> {
1727 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_MEMORY_BYTES)
1728 }
1729
1730 pub fn upsert_rocksdb_write_buffer_manager_allow_stall(&self) -> bool {
1731 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_ALLOW_STALL)
1732 }
1733
1734 pub fn persist_fast_path_limit(&self) -> usize {
1735 *self.expect_value(&PERSIST_FAST_PATH_LIMIT)
1736 }
1737
1738 pub fn pg_source_connect_timeout(&self) -> Duration {
1740 *self.expect_value(&PG_SOURCE_CONNECT_TIMEOUT)
1741 }
1742
1743 pub fn pg_source_tcp_keepalives_retries(&self) -> u32 {
1745 *self.expect_value(&PG_SOURCE_TCP_KEEPALIVES_RETRIES)
1746 }
1747
1748 pub fn pg_source_tcp_keepalives_idle(&self) -> Duration {
1750 *self.expect_value(&PG_SOURCE_TCP_KEEPALIVES_IDLE)
1751 }
1752
1753 pub fn pg_source_tcp_keepalives_interval(&self) -> Duration {
1755 *self.expect_value(&PG_SOURCE_TCP_KEEPALIVES_INTERVAL)
1756 }
1757
1758 pub fn pg_source_tcp_user_timeout(&self) -> Duration {
1760 *self.expect_value(&PG_SOURCE_TCP_USER_TIMEOUT)
1761 }
1762
1763 pub fn pg_source_tcp_configure_server(&self) -> bool {
1765 *self.expect_value(&PG_SOURCE_TCP_CONFIGURE_SERVER)
1766 }
1767
1768 pub fn pg_source_snapshot_statement_timeout(&self) -> Duration {
1770 *self.expect_value(&PG_SOURCE_SNAPSHOT_STATEMENT_TIMEOUT)
1771 }
1772
1773 pub fn pg_source_wal_sender_timeout(&self) -> Option<Duration> {
1775 *self.expect_value(&PG_SOURCE_WAL_SENDER_TIMEOUT)
1776 }
1777
1778 pub fn pg_source_snapshot_collect_strict_count(&self) -> bool {
1780 *self.expect_value(&PG_SOURCE_SNAPSHOT_COLLECT_STRICT_COUNT)
1781 }
1782
1783 pub fn mysql_source_tcp_keepalive(&self) -> Duration {
1785 *self.expect_value(&MYSQL_SOURCE_TCP_KEEPALIVE)
1786 }
1787
1788 pub fn mysql_source_snapshot_max_execution_time(&self) -> Duration {
1790 *self.expect_value(&MYSQL_SOURCE_SNAPSHOT_MAX_EXECUTION_TIME)
1791 }
1792
1793 pub fn mysql_source_snapshot_lock_wait_timeout(&self) -> Duration {
1795 *self.expect_value(&MYSQL_SOURCE_SNAPSHOT_LOCK_WAIT_TIMEOUT)
1796 }
1797
1798 pub fn mysql_source_connect_timeout(&self) -> Duration {
1800 *self.expect_value(&MYSQL_SOURCE_CONNECT_TIMEOUT)
1801 }
1802
1803 pub fn ssh_check_interval(&self) -> Duration {
1805 *self.expect_value(&SSH_CHECK_INTERVAL)
1806 }
1807
1808 pub fn ssh_connect_timeout(&self) -> Duration {
1810 *self.expect_value(&SSH_CONNECT_TIMEOUT)
1811 }
1812
1813 pub fn ssh_keepalives_idle(&self) -> Duration {
1815 *self.expect_value(&SSH_KEEPALIVES_IDLE)
1816 }
1817
1818 pub fn kafka_socket_keepalive(&self) -> bool {
1820 *self.expect_value(&KAFKA_SOCKET_KEEPALIVE)
1821 }
1822
1823 pub fn kafka_socket_timeout(&self) -> Option<Duration> {
1825 *self.expect_value(&KAFKA_SOCKET_TIMEOUT)
1826 }
1827
1828 pub fn kafka_transaction_timeout(&self) -> Duration {
1830 *self.expect_value(&KAFKA_TRANSACTION_TIMEOUT)
1831 }
1832
1833 pub fn kafka_socket_connection_setup_timeout(&self) -> Duration {
1835 *self.expect_value(&KAFKA_SOCKET_CONNECTION_SETUP_TIMEOUT)
1836 }
1837
1838 pub fn kafka_fetch_metadata_timeout(&self) -> Duration {
1840 *self.expect_value(&KAFKA_FETCH_METADATA_TIMEOUT)
1841 }
1842
1843 pub fn kafka_progress_record_fetch_timeout(&self) -> Option<Duration> {
1845 *self.expect_value(&KAFKA_PROGRESS_RECORD_FETCH_TIMEOUT)
1846 }
1847
1848 pub fn crdb_connect_timeout(&self) -> Duration {
1850 *self.expect_config_value(UncasedStr::new(
1851 mz_persist_client::cfg::CRDB_CONNECT_TIMEOUT.name(),
1852 ))
1853 }
1854
1855 pub fn crdb_tcp_user_timeout(&self) -> Duration {
1857 *self.expect_config_value(UncasedStr::new(
1858 mz_persist_client::cfg::CRDB_TCP_USER_TIMEOUT.name(),
1859 ))
1860 }
1861
1862 pub fn crdb_keepalives_idle(&self) -> Duration {
1864 *self.expect_config_value(UncasedStr::new(
1865 mz_persist_client::cfg::CRDB_KEEPALIVES_IDLE.name(),
1866 ))
1867 }
1868
1869 pub fn crdb_keepalives_interval(&self) -> Duration {
1871 *self.expect_config_value(UncasedStr::new(
1872 mz_persist_client::cfg::CRDB_KEEPALIVES_INTERVAL.name(),
1873 ))
1874 }
1875
1876 pub fn crdb_keepalives_retries(&self) -> u32 {
1878 *self.expect_config_value(UncasedStr::new(
1879 mz_persist_client::cfg::CRDB_KEEPALIVES_RETRIES.name(),
1880 ))
1881 }
1882
1883 pub fn storage_dataflow_max_inflight_bytes(&self) -> Option<usize> {
1885 *self.expect_value(&STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES)
1886 }
1887
1888 pub fn storage_dataflow_max_inflight_bytes_to_cluster_size_fraction(&self) -> Option<Numeric> {
1890 *self.expect_value(&STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_TO_CLUSTER_SIZE_FRACTION)
1891 }
1892
1893 pub fn storage_shrink_upsert_unused_buffers_by_ratio(&self) -> usize {
1895 *self.expect_value(&STORAGE_SHRINK_UPSERT_UNUSED_BUFFERS_BY_RATIO)
1896 }
1897
1898 pub fn storage_dataflow_max_inflight_bytes_disk_only(&self) -> bool {
1900 *self.expect_value(&STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_DISK_ONLY)
1901 }
1902
1903 pub fn storage_statistics_interval(&self) -> Duration {
1905 *self.expect_value(&STORAGE_STATISTICS_INTERVAL)
1906 }
1907
1908 pub fn storage_statistics_collection_interval(&self) -> Duration {
1910 *self.expect_value(&STORAGE_STATISTICS_COLLECTION_INTERVAL)
1911 }
1912
1913 pub fn storage_record_source_sink_namespaced_errors(&self) -> bool {
1915 *self.expect_value(&STORAGE_RECORD_SOURCE_SINK_NAMESPACED_ERRORS)
1916 }
1917
1918 pub fn persist_stats_filter_enabled(&self) -> bool {
1920 *self.expect_config_value(UncasedStr::new(
1921 mz_persist_client::stats::STATS_FILTER_ENABLED.name(),
1922 ))
1923 }
1924
1925 pub fn scram_iterations(&self) -> NonZeroU32 {
1926 *self.expect_value(&SCRAM_ITERATIONS)
1927 }
1928
1929 pub fn dyncfg_updates(&self) -> ConfigUpdates {
1930 let mut updates = ConfigUpdates::default();
1931 for entry in self.dyncfgs.entries() {
1932 let name = UncasedStr::new(entry.name());
1933 let val = match entry.val() {
1934 ConfigVal::Bool(_) => ConfigVal::from(*self.expect_config_value::<bool>(name)),
1935 ConfigVal::U32(_) => ConfigVal::from(*self.expect_config_value::<u32>(name)),
1936 ConfigVal::Usize(_) => ConfigVal::from(*self.expect_config_value::<usize>(name)),
1937 ConfigVal::OptUsize(_) => {
1938 ConfigVal::from(*self.expect_config_value::<Option<usize>>(name))
1939 }
1940 ConfigVal::F64(_) => ConfigVal::from(*self.expect_config_value::<f64>(name)),
1941 ConfigVal::String(_) => {
1942 ConfigVal::from(self.expect_config_value::<String>(name).clone())
1943 }
1944 ConfigVal::OptString(_) => {
1945 ConfigVal::from(self.expect_config_value::<Option<String>>(name).clone())
1946 }
1947 ConfigVal::Duration(_) => {
1948 ConfigVal::from(*self.expect_config_value::<Duration>(name))
1949 }
1950 ConfigVal::Json(_) => {
1951 ConfigVal::from(self.expect_config_value::<serde_json::Value>(name).clone())
1952 }
1953 };
1954 updates.add_dynamic(entry.name(), val);
1955 }
1956 updates.apply(&self.dyncfgs);
1957 updates
1958 }
1959
1960 pub fn metrics_retention(&self) -> Duration {
1962 *self.expect_value(&METRICS_RETENTION)
1963 }
1964
1965 pub fn unsafe_mock_audit_event_timestamp(&self) -> Option<mz_repr::Timestamp> {
1967 *self.expect_value(&UNSAFE_MOCK_AUDIT_EVENT_TIMESTAMP)
1968 }
1969
1970 pub fn enable_rbac_checks(&self) -> bool {
1972 *self.expect_value(&ENABLE_RBAC_CHECKS)
1973 }
1974
1975 pub fn max_connections(&self) -> u32 {
1977 *self.expect_value(&MAX_CONNECTIONS)
1978 }
1979
1980 pub fn default_network_policy_name(&self) -> String {
1981 self.expect_value::<String>(&NETWORK_POLICY).clone()
1982 }
1983
1984 pub fn superuser_reserved_connections(&self) -> u32 {
1986 *self.expect_value(&SUPERUSER_RESERVED_CONNECTIONS)
1987 }
1988
1989 pub fn keep_n_source_status_history_entries(&self) -> usize {
1990 *self.expect_value(&KEEP_N_SOURCE_STATUS_HISTORY_ENTRIES)
1991 }
1992
1993 pub fn keep_n_sink_status_history_entries(&self) -> usize {
1994 *self.expect_value(&KEEP_N_SINK_STATUS_HISTORY_ENTRIES)
1995 }
1996
1997 pub fn keep_n_privatelink_status_history_entries(&self) -> usize {
1998 *self.expect_value(&KEEP_N_PRIVATELINK_STATUS_HISTORY_ENTRIES)
1999 }
2000
2001 pub fn replica_status_history_retention_window(&self) -> Duration {
2002 *self.expect_value(&REPLICA_STATUS_HISTORY_RETENTION_WINDOW)
2003 }
2004
2005 pub fn enable_storage_shard_finalization(&self) -> bool {
2007 *self.expect_value(&ENABLE_STORAGE_SHARD_FINALIZATION)
2008 }
2009
2010 pub fn enable_consolidate_after_union_negate(&self) -> bool {
2011 *self.expect_value(&ENABLE_CONSOLIDATE_AFTER_UNION_NEGATE)
2012 }
2013
2014 pub fn enable_default_connection_validation(&self) -> bool {
2016 *self.expect_value(&ENABLE_DEFAULT_CONNECTION_VALIDATION)
2017 }
2018
2019 pub fn default_timestamp_interval(&self) -> Duration {
2021 *self.expect_value(&DEFAULT_TIMESTAMP_INTERVAL)
2022 }
2023
2024 pub fn min_timestamp_interval(&self) -> Duration {
2026 *self.expect_value(&MIN_TIMESTAMP_INTERVAL)
2027 }
2028 pub fn max_timestamp_interval(&self) -> Duration {
2030 *self.expect_value(&MAX_TIMESTAMP_INTERVAL)
2031 }
2032
2033 pub fn logging_filter(&self) -> CloneableEnvFilter {
2034 self.expect_value::<CloneableEnvFilter>(&LOGGING_FILTER)
2035 .clone()
2036 }
2037
2038 pub fn opentelemetry_filter(&self) -> CloneableEnvFilter {
2039 self.expect_value::<CloneableEnvFilter>(&OPENTELEMETRY_FILTER)
2040 .clone()
2041 }
2042
2043 pub fn logging_filter_defaults(&self) -> Vec<SerializableDirective> {
2044 self.expect_value::<Vec<SerializableDirective>>(&LOGGING_FILTER_DEFAULTS)
2045 .clone()
2046 }
2047
2048 pub fn opentelemetry_filter_defaults(&self) -> Vec<SerializableDirective> {
2049 self.expect_value::<Vec<SerializableDirective>>(&OPENTELEMETRY_FILTER_DEFAULTS)
2050 .clone()
2051 }
2052
2053 pub fn sentry_filters(&self) -> Vec<SerializableDirective> {
2054 self.expect_value::<Vec<SerializableDirective>>(&SENTRY_FILTERS)
2055 .clone()
2056 }
2057
2058 pub fn webhooks_secrets_caching_ttl_secs(&self) -> usize {
2059 *self.expect_value(&WEBHOOKS_SECRETS_CACHING_TTL_SECS)
2060 }
2061
2062 pub fn coord_slow_message_warn_threshold(&self) -> Duration {
2063 *self.expect_value(&COORD_SLOW_MESSAGE_WARN_THRESHOLD)
2064 }
2065
2066 pub fn grpc_client_http2_keep_alive_interval(&self) -> Duration {
2067 *self.expect_value(&grpc_client::HTTP2_KEEP_ALIVE_INTERVAL)
2068 }
2069
2070 pub fn grpc_client_http2_keep_alive_timeout(&self) -> Duration {
2071 *self.expect_value(&grpc_client::HTTP2_KEEP_ALIVE_TIMEOUT)
2072 }
2073
2074 pub fn grpc_connect_timeout(&self) -> Duration {
2075 *self.expect_value(&grpc_client::CONNECT_TIMEOUT)
2076 }
2077
2078 pub fn cluster_multi_process_replica_az_affinity_weight(&self) -> Option<i32> {
2079 *self.expect_value(&cluster_scheduling::CLUSTER_MULTI_PROCESS_REPLICA_AZ_AFFINITY_WEIGHT)
2080 }
2081
2082 pub fn cluster_soften_replication_anti_affinity(&self) -> bool {
2083 *self.expect_value(&cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY)
2084 }
2085
2086 pub fn cluster_soften_replication_anti_affinity_weight(&self) -> i32 {
2087 *self.expect_value(&cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY_WEIGHT)
2088 }
2089
2090 pub fn cluster_enable_topology_spread(&self) -> bool {
2091 *self.expect_value(&cluster_scheduling::CLUSTER_ENABLE_TOPOLOGY_SPREAD)
2092 }
2093
2094 pub fn cluster_topology_spread_ignore_non_singular_scale(&self) -> bool {
2095 *self.expect_value(&cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_IGNORE_NON_SINGULAR_SCALE)
2096 }
2097
2098 pub fn cluster_topology_spread_max_skew(&self) -> i32 {
2099 *self.expect_value(&cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_MAX_SKEW)
2100 }
2101
2102 pub fn cluster_topology_spread_set_min_domains(&self) -> Option<i32> {
2103 *self.expect_value(&cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_MIN_DOMAINS)
2104 }
2105
2106 pub fn cluster_topology_spread_soft(&self) -> bool {
2107 *self.expect_value(&cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_SOFT)
2108 }
2109
2110 pub fn cluster_soften_az_affinity(&self) -> bool {
2111 *self.expect_value(&cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY)
2112 }
2113
2114 pub fn cluster_soften_az_affinity_weight(&self) -> i32 {
2115 *self.expect_value(&cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY_WEIGHT)
2116 }
2117
2118 pub fn cluster_alter_check_ready_interval(&self) -> Duration {
2119 *self.expect_value(&cluster_scheduling::CLUSTER_ALTER_CHECK_READY_INTERVAL)
2120 }
2121
2122 pub fn cluster_check_scheduling_policies_interval(&self) -> Duration {
2123 *self.expect_value(&cluster_scheduling::CLUSTER_CHECK_SCHEDULING_POLICIES_INTERVAL)
2124 }
2125
2126 pub fn cluster_security_context_enabled(&self) -> bool {
2127 *self.expect_value(&cluster_scheduling::CLUSTER_SECURITY_CONTEXT_ENABLED)
2128 }
2129
2130 pub fn cluster_refresh_mv_compaction_estimate(&self) -> Duration {
2131 *self.expect_value(&cluster_scheduling::CLUSTER_REFRESH_MV_COMPACTION_ESTIMATE)
2132 }
2133
2134 pub fn privatelink_status_update_quota_per_minute(&self) -> u32 {
2136 *self.expect_value(&PRIVATELINK_STATUS_UPDATE_QUOTA_PER_MINUTE)
2137 }
2138
2139 pub fn statement_logging_target_data_rate(&self) -> Option<usize> {
2140 *self.expect_value(&STATEMENT_LOGGING_TARGET_DATA_RATE)
2141 }
2142
2143 pub fn statement_logging_max_data_credit(&self) -> Option<usize> {
2144 *self.expect_value(&STATEMENT_LOGGING_MAX_DATA_CREDIT)
2145 }
2146
2147 pub fn statement_logging_max_sample_rate(&self) -> Numeric {
2149 *self.expect_value(&STATEMENT_LOGGING_MAX_SAMPLE_RATE)
2150 }
2151
2152 pub fn statement_logging_default_sample_rate(&self) -> Numeric {
2154 *self.expect_value(&STATEMENT_LOGGING_DEFAULT_SAMPLE_RATE)
2155 }
2156
2157 pub fn enable_internal_statement_logging(&self) -> bool {
2159 *self.expect_value(&ENABLE_INTERNAL_STATEMENT_LOGGING)
2160 }
2161
2162 pub fn optimizer_stats_timeout(&self) -> Duration {
2164 *self.expect_value(&OPTIMIZER_STATS_TIMEOUT)
2165 }
2166
2167 pub fn optimizer_oneshot_stats_timeout(&self) -> Duration {
2169 *self.expect_value(&OPTIMIZER_ONESHOT_STATS_TIMEOUT)
2170 }
2171
2172 pub fn webhook_concurrent_request_limit(&self) -> usize {
2174 *self.expect_value(&WEBHOOK_CONCURRENT_REQUEST_LIMIT)
2175 }
2176
2177 pub fn pg_timestamp_oracle_connection_pool_max_size(&self) -> usize {
2179 *self.expect_value(&PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_SIZE)
2180 }
2181
2182 pub fn pg_timestamp_oracle_connection_pool_max_wait(&self) -> Option<Duration> {
2184 *self.expect_value(&PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_WAIT)
2185 }
2186
2187 pub fn pg_timestamp_oracle_connection_pool_ttl(&self) -> Duration {
2189 *self.expect_value(&PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL)
2190 }
2191
2192 pub fn pg_timestamp_oracle_connection_pool_ttl_stagger(&self) -> Duration {
2194 *self.expect_value(&PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL_STAGGER)
2195 }
2196
2197 pub fn user_storage_managed_collections_batch_duration(&self) -> Duration {
2199 *self.expect_value(&USER_STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION)
2200 }
2201
2202 pub fn force_source_table_syntax(&self) -> bool {
2203 *self.expect_value(&FORCE_SOURCE_TABLE_SYNTAX)
2204 }
2205
2206 pub fn optimizer_e2e_latency_warning_threshold(&self) -> Duration {
2207 *self.expect_value(&OPTIMIZER_E2E_LATENCY_WARNING_THRESHOLD)
2208 }
2209
2210 pub fn is_controller_config_var(&self, name: &str) -> bool {
2212 self.is_dyncfg_var(name)
2213 }
2214
2215 pub fn is_compute_config_var(&self, name: &str) -> bool {
2219 name == MAX_RESULT_SIZE.name() || self.is_dyncfg_var(name) || is_tracing_var(name)
2220 }
2221
2222 pub fn is_metrics_config_var(&self, name: &str) -> bool {
2224 self.is_dyncfg_var(name)
2225 }
2226
2227 pub fn is_storage_config_var(&self, name: &str) -> bool {
2229 name == PG_SOURCE_CONNECT_TIMEOUT.name()
2230 || name == PG_SOURCE_TCP_KEEPALIVES_IDLE.name()
2231 || name == PG_SOURCE_TCP_KEEPALIVES_INTERVAL.name()
2232 || name == PG_SOURCE_TCP_KEEPALIVES_RETRIES.name()
2233 || name == PG_SOURCE_TCP_USER_TIMEOUT.name()
2234 || name == PG_SOURCE_TCP_CONFIGURE_SERVER.name()
2235 || name == PG_SOURCE_SNAPSHOT_STATEMENT_TIMEOUT.name()
2236 || name == PG_SOURCE_WAL_SENDER_TIMEOUT.name()
2237 || name == PG_SOURCE_SNAPSHOT_COLLECT_STRICT_COUNT.name()
2238 || name == MYSQL_SOURCE_TCP_KEEPALIVE.name()
2239 || name == MYSQL_SOURCE_SNAPSHOT_MAX_EXECUTION_TIME.name()
2240 || name == MYSQL_SOURCE_SNAPSHOT_LOCK_WAIT_TIMEOUT.name()
2241 || name == MYSQL_SOURCE_CONNECT_TIMEOUT.name()
2242 || name == ENABLE_STORAGE_SHARD_FINALIZATION.name()
2243 || name == SSH_CHECK_INTERVAL.name()
2244 || name == SSH_CONNECT_TIMEOUT.name()
2245 || name == SSH_KEEPALIVES_IDLE.name()
2246 || name == KAFKA_SOCKET_KEEPALIVE.name()
2247 || name == KAFKA_SOCKET_TIMEOUT.name()
2248 || name == KAFKA_TRANSACTION_TIMEOUT.name()
2249 || name == KAFKA_SOCKET_CONNECTION_SETUP_TIMEOUT.name()
2250 || name == KAFKA_FETCH_METADATA_TIMEOUT.name()
2251 || name == KAFKA_PROGRESS_RECORD_FETCH_TIMEOUT.name()
2252 || name == STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES.name()
2253 || name == STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_TO_CLUSTER_SIZE_FRACTION.name()
2254 || name == STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_DISK_ONLY.name()
2255 || name == STORAGE_SHRINK_UPSERT_UNUSED_BUFFERS_BY_RATIO.name()
2256 || name == STORAGE_RECORD_SOURCE_SINK_NAMESPACED_ERRORS.name()
2257 || name == STORAGE_STATISTICS_INTERVAL.name()
2258 || name == STORAGE_STATISTICS_COLLECTION_INTERVAL.name()
2259 || name == USER_STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION.name()
2260 || is_upsert_rocksdb_config_var(name)
2261 || self.is_dyncfg_var(name)
2262 || is_tracing_var(name)
2263 }
2264
2265 fn is_dyncfg_var(&self, name: &str) -> bool {
2267 self.dyncfgs.entries().any(|e| name == e.name())
2268 }
2269}
2270
2271pub fn is_tracing_var(name: &str) -> bool {
2272 name == LOGGING_FILTER.name()
2273 || name == LOGGING_FILTER_DEFAULTS.name()
2274 || name == OPENTELEMETRY_FILTER.name()
2275 || name == OPENTELEMETRY_FILTER_DEFAULTS.name()
2276 || name == SENTRY_FILTERS.name()
2277}
2278
2279pub fn is_secrets_caching_var(name: &str) -> bool {
2281 name == WEBHOOKS_SECRETS_CACHING_TTL_SECS.name()
2282}
2283
2284fn is_upsert_rocksdb_config_var(name: &str) -> bool {
2285 name == upsert_rocksdb::UPSERT_ROCKSDB_COMPACTION_STYLE.name()
2286 || name == upsert_rocksdb::UPSERT_ROCKSDB_OPTIMIZE_COMPACTION_MEMTABLE_BUDGET.name()
2287 || name == upsert_rocksdb::UPSERT_ROCKSDB_LEVEL_COMPACTION_DYNAMIC_LEVEL_BYTES.name()
2288 || name == upsert_rocksdb::UPSERT_ROCKSDB_UNIVERSAL_COMPACTION_RATIO.name()
2289 || name == upsert_rocksdb::UPSERT_ROCKSDB_PARALLELISM.name()
2290 || name == upsert_rocksdb::UPSERT_ROCKSDB_COMPRESSION_TYPE.name()
2291 || name == upsert_rocksdb::UPSERT_ROCKSDB_BOTTOMMOST_COMPRESSION_TYPE.name()
2292 || name == upsert_rocksdb::UPSERT_ROCKSDB_BATCH_SIZE.name()
2293 || name == upsert_rocksdb::UPSERT_ROCKSDB_STATS_LOG_INTERVAL_SECONDS.name()
2294 || name == upsert_rocksdb::UPSERT_ROCKSDB_STATS_PERSIST_INTERVAL_SECONDS.name()
2295 || name == upsert_rocksdb::UPSERT_ROCKSDB_POINT_LOOKUP_BLOCK_CACHE_SIZE_MB.name()
2296 || name == upsert_rocksdb::UPSERT_ROCKSDB_SHRINK_ALLOCATED_BUFFERS_BY_RATIO.name()
2297}
2298
2299pub fn is_timestamp_oracle_config_var(name: &str) -> bool {
2302 name == PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_SIZE.name()
2303 || name == PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_WAIT.name()
2304 || name == PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL.name()
2305 || name == PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL_STAGGER.name()
2306 || name == CRDB_CONNECT_TIMEOUT.name()
2307 || name == CRDB_TCP_USER_TIMEOUT.name()
2308 || name == CRDB_KEEPALIVES_IDLE.name()
2309 || name == CRDB_KEEPALIVES_INTERVAL.name()
2310 || name == CRDB_KEEPALIVES_RETRIES.name()
2311}
2312
2313pub fn is_cluster_scheduling_var(name: &str) -> bool {
2315 name == cluster_scheduling::CLUSTER_MULTI_PROCESS_REPLICA_AZ_AFFINITY_WEIGHT.name()
2316 || name == cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY.name()
2317 || name == cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY_WEIGHT.name()
2318 || name == cluster_scheduling::CLUSTER_ENABLE_TOPOLOGY_SPREAD.name()
2319 || name == cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_IGNORE_NON_SINGULAR_SCALE.name()
2320 || name == cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_MAX_SKEW.name()
2321 || name == cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_SOFT.name()
2322 || name == cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY.name()
2323 || name == cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY_WEIGHT.name()
2324}
2325
2326pub fn is_http_config_var(name: &str) -> bool {
2328 name == WEBHOOK_CONCURRENT_REQUEST_LIMIT.name()
2329}
2330
2331static SESSION_SYSTEM_VARS: LazyLock<BTreeMap<&'static UncasedStr, &'static VarDefinition>> =
2335 LazyLock::new(|| {
2336 [
2337 &APPLICATION_NAME,
2338 &CLIENT_ENCODING,
2339 &CLIENT_MIN_MESSAGES,
2340 &CLUSTER,
2341 &CLUSTER_REPLICA,
2342 &DEFAULT_CLUSTER_REPLICATION_FACTOR,
2343 &CURRENT_OBJECT_MISSING_WARNINGS,
2344 &DATABASE,
2345 &DATE_STYLE,
2346 &EXTRA_FLOAT_DIGITS,
2347 &INTEGER_DATETIMES,
2348 &INTERVAL_STYLE,
2349 &REAL_TIME_RECENCY_TIMEOUT,
2350 &SEARCH_PATH,
2351 &STANDARD_CONFORMING_STRINGS,
2352 &STATEMENT_TIMEOUT,
2353 &IDLE_IN_TRANSACTION_SESSION_TIMEOUT,
2354 &TIMEZONE,
2355 &TRANSACTION_ISOLATION,
2356 &MAX_QUERY_RESULT_SIZE,
2357 ]
2358 .into_iter()
2359 .map(|var| (UncasedStr::new(var.name()), var))
2360 .collect()
2361 });
2362
/// A variable definition paired with a description of the feature it controls.
///
/// `FeatureFlag::require` reads the variable as a `bool`, and two flags compare
/// equal when the names of their `flag` definitions are equal.
#[derive(Debug)]
pub struct FeatureFlag {
    /// Definition of the variable backing this flag.
    pub flag: &'static VarDefinition,
    /// Description of the feature. NOTE(review): presumably shown to users when
    /// the flag is off; confirm at the use sites.
    pub feature_desc: &'static str,
}
2370
2371impl FeatureFlag {
2372 pub fn require(&'static self, system_vars: &SystemVars) -> Result<(), VarError> {
2375 match *system_vars.expect_value::<bool>(self.flag) {
2376 true => Ok(()),
2377 false => Err(VarError::RequiresFeatureFlag { feature_flag: self }),
2378 }
2379 }
2380}
2381
2382impl PartialEq for FeatureFlag {
2383 fn eq(&self, other: &FeatureFlag) -> bool {
2384 self.flag.name() == other.flag.name()
2385 }
2386}
2387
// Marker impl: declares the name-based equality of `FeatureFlag` to be a full
// equivalence relation.
impl Eq for FeatureFlag {}
2389
2390impl Var for MzVersion {
2391 fn name(&self) -> &'static str {
2392 MZ_VERSION_NAME.as_str()
2393 }
2394
2395 fn value(&self) -> String {
2396 self.build_info
2397 .human_version(self.helm_chart_version.clone())
2398 }
2399
2400 fn description(&self) -> &'static str {
2401 "Shows the Materialize server version (Materialize)."
2402 }
2403
2404 fn type_name(&self) -> Cow<'static, str> {
2405 String::type_name()
2406 }
2407
2408 fn visible(&self, _: &User, _: &SystemVars) -> Result<(), VarError> {
2409 Ok(())
2410 }
2411}
2412
2413impl Var for User {
2414 fn name(&self) -> &'static str {
2415 IS_SUPERUSER_NAME.as_str()
2416 }
2417
2418 fn value(&self) -> String {
2419 self.is_superuser().format()
2420 }
2421
2422 fn description(&self) -> &'static str {
2423 "Reports whether the current session is a superuser (PostgreSQL)."
2424 }
2425
2426 fn type_name(&self) -> Cow<'static, str> {
2427 bool::type_name()
2428 }
2429
2430 fn visible(&self, _: &User, _: &SystemVars) -> Result<(), VarError> {
2431 Ok(())
2432 }
2433}