1use std::borrow::Cow;
67use std::clone::Clone;
68use std::collections::BTreeMap;
69use std::fmt::Debug;
70use std::net::IpAddr;
71use std::num::NonZeroU32;
72use std::string::ToString;
73use std::sync::{Arc, LazyLock};
74use std::time::Duration;
75
76use chrono::{DateTime, Utc};
77use derivative::Derivative;
78use imbl::OrdMap;
79use mz_build_info::BuildInfo;
80use mz_dyncfg::{ConfigSet, ConfigType, ConfigUpdates, ConfigVal};
81use mz_persist_client::cfg::{
82 CRDB_CONNECT_TIMEOUT, CRDB_KEEPALIVES_IDLE, CRDB_KEEPALIVES_INTERVAL, CRDB_KEEPALIVES_RETRIES,
83 CRDB_TCP_USER_TIMEOUT,
84};
85use mz_repr::adt::numeric::Numeric;
86use mz_repr::adt::timestamp::CheckedTimestamp;
87use mz_repr::bytes::ByteSize;
88use mz_repr::user::{ExternalUserMetadata, InternalUserMetadata};
89use mz_tracing::{CloneableEnvFilter, SerializableDirective};
90use serde::Serialize;
91use thiserror::Error;
92use uncased::UncasedStr;
93
94use crate::ast::Ident;
95use crate::session::user::User;
96
97pub(crate) mod constraints;
98pub(crate) mod definitions;
99pub(crate) mod errors;
100pub(crate) mod polyfill;
101pub(crate) mod value;
102
103pub use definitions::*;
104pub use errors::*;
105pub use value::*;
106
/// The action to take when a transaction ends.
///
/// Passed to `SessionVar::end_transaction` and `SessionVars::end_transaction`
/// to decide what happens to values staged during the transaction.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EndTransactionAction {
    /// Commit the transaction.
    Commit,
    /// Roll the transaction back.
    Rollback,
}
118
/// Borrowed, not-yet-parsed input for a variable.
///
/// The input is either a single string or a slice of strings.
#[derive(Debug, Clone, Copy)]
pub enum VarInput<'a> {
    /// A single string.
    Flat(&'a str),
    /// A list of strings.
    SqlSet(&'a [String]),
}

impl<'a> VarInput<'a> {
    /// Copies the input into an owned list of strings.
    ///
    /// A `Flat` input yields a one-element vector; an `SqlSet` input yields a
    /// copy of its elements, in order.
    pub fn to_vec(&self) -> Vec<String> {
        match *self {
            VarInput::Flat(text) => vec![text.to_owned()],
            VarInput::SqlSet(items) => items.to_vec(),
        }
    }
}
142
/// Owned counterpart of [`VarInput`].
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize)]
pub enum OwnedVarInput {
    /// Owned form of [`VarInput::Flat`].
    Flat(String),
    /// Owned form of [`VarInput::SqlSet`].
    SqlSet(Vec<String>),
}
151
152impl OwnedVarInput {
153 pub fn borrow(&self) -> VarInput<'_> {
155 match self {
156 OwnedVarInput::Flat(v) => VarInput::Flat(v),
157 OwnedVarInput::SqlSet(v) => VarInput::SqlSet(v),
158 }
159 }
160}
161
/// A named variable that can be inspected through a common interface.
pub trait Var: Debug {
    /// Returns the name of the variable.
    fn name(&self) -> &'static str;

    /// Returns the variable's current value rendered as a string.
    fn value(&self) -> String;

    /// Returns a description of the variable.
    fn description(&self) -> &'static str;

    /// Returns the name of the variable's type.
    fn type_name(&self) -> Cow<'static, str>;

    /// Checks whether the variable is visible to `user` given `system_vars`.
    ///
    /// Returns `Ok(())` when visible and a [`VarError`] otherwise.
    fn visible(&self, user: &User, system_vars: &SystemVars) -> Result<(), VarError>;

    /// Reports whether the variable is unsafe, which by default means its
    /// name starts with `unsafe_`.
    fn is_unsafe(&self) -> bool {
        self.name().starts_with("unsafe_")
    }

    /// Upcasts `self` to a `&dyn Var` trait object.
    fn as_var(&self) -> &dyn Var
    where
        Self: Sized,
    {
        self
    }
}
201
/// A variable scoped to a single session.
///
/// The effective value is resolved in this order: `local_value`,
/// `staged_value`, `session_value`, `default_value`, and finally the value
/// carried by the definition itself (see `value_dyn`).
#[derive(Debug)]
pub struct SessionVar {
    /// The definition this variable was created from.
    definition: VarDefinition,
    /// Default installed via `set_default`; overrides the definition's value.
    default_value: Option<Box<dyn Value>>,
    /// Value written by a local `set`/`reset`; cleared when a transaction ends.
    local_value: Option<Box<dyn Value>>,
    /// Value written by a non-local `set`/`reset`; promoted to
    /// `session_value` on commit and discarded otherwise.
    staged_value: Option<Box<dyn Value>>,
    /// Value that was staged and then committed.
    session_value: Option<Box<dyn Value>>,
}
220
221impl Clone for SessionVar {
222 fn clone(&self) -> Self {
223 SessionVar {
224 definition: self.definition.clone(),
225 default_value: self.default_value.as_ref().map(|v| v.box_clone()),
226 local_value: self.local_value.as_ref().map(|v| v.box_clone()),
227 staged_value: self.staged_value.as_ref().map(|v| v.box_clone()),
228 session_value: self.session_value.as_ref().map(|v| v.box_clone()),
229 }
230 }
231}
232
233impl SessionVar {
234 pub const fn new(var: VarDefinition) -> Self {
235 SessionVar {
236 definition: var,
237 default_value: None,
238 local_value: None,
239 staged_value: None,
240 session_value: None,
241 }
242 }
243
244 pub fn check(&self, input: VarInput) -> Result<String, VarError> {
247 let v = self.definition.parse(input)?;
248 self.validate_constraints(v.as_ref())?;
249
250 Ok(v.format())
251 }
252
253 pub fn set(&mut self, input: VarInput, local: bool) -> Result<(), VarError> {
255 let v = self.definition.parse(input)?;
256
257 self.validate_constraints(v.as_ref())?;
259
260 if local {
261 self.local_value = Some(v);
262 } else {
263 self.local_value = None;
264 self.staged_value = Some(v);
265 }
266 Ok(())
267 }
268
269 pub fn set_default(&mut self, input: VarInput) -> Result<(), VarError> {
271 let v = self.definition.parse(input)?;
272 self.validate_constraints(v.as_ref())?;
273 self.default_value = Some(v);
274 Ok(())
275 }
276
277 pub fn reset(&mut self, local: bool) {
279 let value = self
280 .default_value
281 .as_ref()
282 .map(|v| v.as_ref())
283 .unwrap_or_else(|| self.definition.value.value());
284 if local {
285 self.local_value = Some(value.box_clone());
286 } else {
287 self.local_value = None;
288 self.staged_value = Some(value.box_clone());
289 }
290 }
291
292 #[must_use]
294 pub fn end_transaction(&self, action: EndTransactionAction) -> Option<Self> {
295 if !self.is_mutating() {
296 return None;
297 }
298 let mut next: Self = self.clone();
299 next.local_value = None;
300 match action {
301 EndTransactionAction::Commit if next.staged_value.is_some() => {
302 next.session_value = next.staged_value.take()
303 }
304 _ => next.staged_value = None,
305 }
306 Some(next)
307 }
308
309 pub fn is_mutating(&self) -> bool {
311 self.local_value.is_some() || self.staged_value.is_some()
312 }
313
314 pub fn value_dyn(&self) -> &dyn Value {
315 self.local_value
316 .as_deref()
317 .or(self.staged_value.as_deref())
318 .or(self.session_value.as_deref())
319 .or(self.default_value.as_deref())
320 .unwrap_or_else(|| self.definition.value.value())
321 }
322
323 pub fn inspect_session_value(&self) -> Option<&dyn Value> {
328 self.session_value.as_deref()
329 }
330
331 fn validate_constraints(&self, val: &dyn Value) -> Result<(), VarError> {
332 if let Some(constraint) = &self.definition.constraint {
333 constraint.check_constraint(self, self.value_dyn(), val)
334 } else {
335 Ok(())
336 }
337 }
338}
339
340impl Var for SessionVar {
341 fn name(&self) -> &'static str {
342 self.definition.name.as_str()
343 }
344
345 fn value(&self) -> String {
346 self.value_dyn().format()
347 }
348
349 fn description(&self) -> &'static str {
350 self.definition.description
351 }
352
353 fn type_name(&self) -> Cow<'static, str> {
354 self.definition.type_name()
355 }
356
357 fn visible(
358 &self,
359 user: &User,
360 system_vars: &super::vars::SystemVars,
361 ) -> Result<(), super::vars::VarError> {
362 self.definition.visible(user, system_vars)
363 }
364}
365
/// Version information held by a session.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MzVersion {
    /// Build information for this process.
    build_info: &'static BuildInfo,
    /// Helm chart version, if one was supplied at construction.
    helm_chart_version: Option<String>,
}
373
374impl MzVersion {
375 pub fn new(build_info: &'static BuildInfo, helm_chart_version: Option<String>) -> Self {
376 MzVersion {
377 build_info,
378 helm_chart_version,
379 }
380 }
381}
382
/// The variables of a single session.
///
/// Besides the entries in `vars`, `mz_version` and `user` are exposed through
/// the same `Var` interface by `iter` and `get`.
#[derive(Debug, Clone)]
pub struct SessionVars {
    /// The session variables, keyed by their `UncasedStr` name.
    vars: OrdMap<&'static UncasedStr, SessionVar>,
    /// Version information, exposed as a variable.
    mz_version: MzVersion,
    /// The session's user, exposed as a variable.
    user: User,
}
396
397impl SessionVars {
398 pub fn new_unchecked(
400 build_info: &'static BuildInfo,
401 user: User,
402 helm_chart_version: Option<String>,
403 ) -> SessionVars {
404 use definitions::*;
405
406 let vars = [
407 &FAILPOINTS,
408 &SERVER_VERSION,
409 &SERVER_VERSION_NUM,
410 &SQL_SAFE_UPDATES,
411 &REAL_TIME_RECENCY,
412 &EMIT_PLAN_INSIGHTS_NOTICE,
413 &EMIT_TIMESTAMP_NOTICE,
414 &EMIT_TRACE_ID_NOTICE,
415 &AUTO_ROUTE_CATALOG_QUERIES,
416 &ENABLE_SESSION_RBAC_CHECKS,
417 &RESTRICT_TO_USER_OBJECTS,
418 &ENABLE_SESSION_CARDINALITY_ESTIMATES,
419 &MAX_IDENTIFIER_LENGTH,
420 &STATEMENT_LOGGING_SAMPLE_RATE,
421 &EMIT_INTROSPECTION_QUERY_NOTICE,
422 &UNSAFE_NEW_TRANSACTION_WALL_TIME,
423 &WELCOME_MESSAGE,
424 ]
425 .into_iter()
426 .chain(SESSION_SYSTEM_VARS.iter().map(|(_name, var)| *var))
427 .map(|var| (var.name, SessionVar::new(var.clone())))
428 .collect();
429
430 SessionVars {
431 vars,
432 mz_version: MzVersion::new(build_info, helm_chart_version),
433 user,
434 }
435 }
436
437 fn expect_value<V: Value>(&self, var: &VarDefinition) -> &V {
438 let var = self
439 .vars
440 .get(var.name)
441 .expect("provided var should be in state");
442 let val = var.value_dyn();
443 val.as_any().downcast_ref::<V>().expect("success")
444 }
445
446 pub fn iter(&self) -> impl Iterator<Item = &dyn Var> {
453 #[allow(clippy::as_conversions)]
454 self.vars
455 .values()
456 .map(|v| v.as_var())
457 .chain([&self.mz_version as &dyn Var, &self.user])
458 }
459
460 pub fn notify_set(&self) -> impl Iterator<Item = &dyn Var> {
464 [
471 &APPLICATION_NAME,
472 &CLIENT_ENCODING,
473 &DATE_STYLE,
474 &INTEGER_DATETIMES,
475 &SERVER_VERSION,
476 &STANDARD_CONFORMING_STRINGS,
477 &TIMEZONE,
478 &INTERVAL_STYLE,
479 &CLUSTER,
484 &CLUSTER_REPLICA,
485 &DEFAULT_CLUSTER_REPLICATION_FACTOR,
486 &DATABASE,
487 &SEARCH_PATH,
488 ]
489 .into_iter()
490 .map(|v| self.vars[v.name].as_var())
491 .chain(std::iter::once(self.mz_version.as_var()))
498 }
499
500 pub fn reset_all(&mut self) {
502 let names: Vec<_> = self.vars.keys().copied().collect();
503 for name in names {
504 self.vars[name].reset(false);
505 }
506 }
507
508 pub fn get(&self, system_vars: &SystemVars, name: &str) -> Result<&dyn Var, VarError> {
519 let name = compat_translate_name(name);
520
521 let name = UncasedStr::new(name);
522 if name == MZ_VERSION_NAME {
523 Ok(&self.mz_version)
524 } else if name == IS_SUPERUSER_NAME {
525 Ok(&self.user)
526 } else {
527 self.vars
528 .get(name)
529 .map(|v| {
530 v.visible(&self.user, system_vars)?;
531 Ok(v.as_var())
532 })
533 .transpose()?
534 .ok_or_else(|| VarError::UnknownParameter(name.to_string()))
535 }
536 }
537
538 pub fn inspect(&self, name: &str) -> Result<&SessionVar, VarError> {
543 let name = compat_translate_name(name);
544
545 self.vars
546 .get(UncasedStr::new(name))
547 .ok_or_else(|| VarError::UnknownParameter(name.to_string()))
548 }
549
550 pub fn set(
563 &mut self,
564 system_vars: &SystemVars,
565 name: &str,
566 input: VarInput,
567 local: bool,
568 ) -> Result<(), VarError> {
569 let (name, input) = compat_translate(name, input);
570
571 let name = UncasedStr::new(name);
572 self.check_read_only(name)?;
573
574 self.vars
575 .get_mut(name)
576 .map(|v| {
577 v.visible(&self.user, system_vars)?;
578 v.set(input, local)
579 })
580 .transpose()?
581 .ok_or_else(|| VarError::UnknownParameter(name.to_string()))
582 }
583
584 pub fn set_default(&mut self, name: &str, input: VarInput) -> Result<(), VarError> {
587 let (name, input) = compat_translate(name, input);
588
589 let name = UncasedStr::new(name);
590
591 if !Self::allow_role_default(name) {
595 self.check_read_only(name)?;
596 }
597
598 self.vars
599 .get_mut(name)
600 .map(|v| v.set_default(input))
602 .transpose()?
603 .ok_or_else(|| VarError::UnknownParameter(name.to_string()))
604 }
605
606 fn allow_role_default(name: &UncasedStr) -> bool {
614 name == RESTRICT_TO_USER_OBJECTS.name
615 }
616
617 pub fn reset(
631 &mut self,
632 system_vars: &SystemVars,
633 name: &str,
634 local: bool,
635 ) -> Result<(), VarError> {
636 let name = compat_translate_name(name);
637
638 let name = UncasedStr::new(name);
639 self.check_read_only(name)?;
640
641 self.vars
642 .get_mut(name)
643 .map(|v| {
644 v.visible(&self.user, system_vars)?;
645 v.reset(local);
646 Ok(())
647 })
648 .transpose()?
649 .ok_or_else(|| VarError::UnknownParameter(name.to_string()))
650 }
651
652 fn check_read_only(&self, name: &UncasedStr) -> Result<(), VarError> {
659 if name == MZ_VERSION_NAME {
660 Err(VarError::ReadOnlyParameter(MZ_VERSION_NAME.as_str()))
661 } else if name == IS_SUPERUSER_NAME {
662 Err(VarError::ReadOnlyParameter(IS_SUPERUSER_NAME.as_str()))
663 } else if name == MAX_IDENTIFIER_LENGTH.name {
664 Err(VarError::ReadOnlyParameter(
665 MAX_IDENTIFIER_LENGTH.name.as_str(),
666 ))
667 } else if name == RESTRICT_TO_USER_OBJECTS.name {
668 Err(VarError::ReadOnlyParameter(
672 RESTRICT_TO_USER_OBJECTS.name.as_str(),
673 ))
674 } else {
675 Ok(())
676 }
677 }
678
679 #[mz_ore::instrument(level = "debug")]
684 pub fn end_transaction(
685 &mut self,
686 action: EndTransactionAction,
687 ) -> BTreeMap<&'static str, String> {
688 let mut changed = BTreeMap::new();
689 let mut updates = Vec::new();
690 for (name, var) in self.vars.iter() {
691 if !var.is_mutating() {
692 continue;
693 }
694 let before = var.value();
695 let next = var.end_transaction(action).expect("must mutate");
696 let after = next.value();
697 updates.push((*name, next));
698
699 if before != after {
701 changed.insert(var.name(), after);
702 }
703 }
704 self.vars.extend(updates);
705 changed
706 }
707
708 pub fn application_name(&self) -> &str {
710 self.expect_value::<String>(&APPLICATION_NAME).as_str()
711 }
712
713 pub fn build_info(&self) -> &'static BuildInfo {
715 self.mz_version.build_info
716 }
717
718 pub fn client_encoding(&self) -> &ClientEncoding {
720 self.expect_value(&CLIENT_ENCODING)
721 }
722
723 pub fn client_min_messages(&self) -> &ClientSeverity {
725 self.expect_value(&CLIENT_MIN_MESSAGES)
726 }
727
728 pub fn cluster(&self) -> &str {
730 self.expect_value::<String>(&CLUSTER).as_str()
731 }
732
733 pub fn cluster_replica(&self) -> Option<&str> {
735 self.expect_value::<Option<String>>(&CLUSTER_REPLICA)
736 .as_deref()
737 }
738
739 pub fn current_object_missing_warnings(&self) -> bool {
742 *self.expect_value::<bool>(&CURRENT_OBJECT_MISSING_WARNINGS)
743 }
744
745 pub fn date_style(&self) -> &[&str] {
747 &self.expect_value::<DateStyle>(&DATE_STYLE).0
748 }
749
750 pub fn database(&self) -> &str {
752 self.expect_value::<String>(&DATABASE).as_str()
753 }
754
755 pub fn extra_float_digits(&self) -> i32 {
757 *self.expect_value(&EXTRA_FLOAT_DIGITS)
758 }
759
760 pub fn integer_datetimes(&self) -> bool {
762 *self.expect_value(&INTEGER_DATETIMES)
763 }
764
765 pub fn intervalstyle(&self) -> &IntervalStyle {
767 self.expect_value(&INTERVAL_STYLE)
768 }
769
770 pub fn mz_version(&self) -> String {
772 self.mz_version.value()
773 }
774
775 pub fn search_path(&self) -> &[Ident] {
777 self.expect_value::<Vec<Ident>>(&SEARCH_PATH).as_slice()
778 }
779
780 pub fn server_version(&self) -> &str {
782 self.expect_value::<String>(&SERVER_VERSION).as_str()
783 }
784
785 pub fn server_version_num(&self) -> i32 {
787 *self.expect_value(&SERVER_VERSION_NUM)
788 }
789
790 pub fn sql_safe_updates(&self) -> bool {
792 *self.expect_value(&SQL_SAFE_UPDATES)
793 }
794
795 pub fn standard_conforming_strings(&self) -> bool {
798 *self.expect_value(&STANDARD_CONFORMING_STRINGS)
799 }
800
801 pub fn statement_timeout(&self) -> &Duration {
803 self.expect_value(&STATEMENT_TIMEOUT)
804 }
805
806 pub fn idle_in_transaction_session_timeout(&self) -> &Duration {
808 self.expect_value(&IDLE_IN_TRANSACTION_SESSION_TIMEOUT)
809 }
810
811 pub fn timezone(&self) -> &TimeZone {
813 self.expect_value(&TIMEZONE)
814 }
815
816 pub fn transaction_isolation(&self) -> &IsolationLevel {
819 self.expect_value(&TRANSACTION_ISOLATION)
820 }
821
822 pub fn real_time_recency(&self) -> bool {
824 *self.expect_value(&REAL_TIME_RECENCY)
825 }
826
827 pub fn real_time_recency_timeout(&self) -> &Duration {
829 self.expect_value(&REAL_TIME_RECENCY_TIMEOUT)
830 }
831
832 pub fn emit_plan_insights_notice(&self) -> bool {
834 *self.expect_value(&EMIT_PLAN_INSIGHTS_NOTICE)
835 }
836
837 pub fn emit_timestamp_notice(&self) -> bool {
839 *self.expect_value(&EMIT_TIMESTAMP_NOTICE)
840 }
841
842 pub fn emit_trace_id_notice(&self) -> bool {
844 *self.expect_value(&EMIT_TRACE_ID_NOTICE)
845 }
846
847 pub fn auto_route_catalog_queries(&self) -> bool {
849 *self.expect_value(&AUTO_ROUTE_CATALOG_QUERIES)
850 }
851
852 pub fn enable_session_rbac_checks(&self) -> bool {
854 *self.expect_value(&ENABLE_SESSION_RBAC_CHECKS)
855 }
856
857 pub fn restrict_to_user_objects(&self) -> bool {
859 *self.expect_value(&RESTRICT_TO_USER_OBJECTS)
860 }
861
862 pub fn enable_session_cardinality_estimates(&self) -> bool {
864 *self.expect_value(&ENABLE_SESSION_CARDINALITY_ESTIMATES)
865 }
866
867 pub fn is_superuser(&self) -> bool {
869 self.user.is_superuser()
870 }
871
872 pub fn user(&self) -> &User {
874 &self.user
875 }
876
877 pub fn max_query_result_size(&self) -> u64 {
879 self.expect_value::<ByteSize>(&MAX_QUERY_RESULT_SIZE)
880 .as_bytes()
881 }
882
883 pub fn set_internal_user_metadata(&mut self, metadata: InternalUserMetadata) {
885 self.user.internal_metadata = Some(metadata);
886 }
887
888 pub fn set_external_user_metadata(&mut self, metadata: ExternalUserMetadata) {
890 self.user.external_metadata = Some(metadata);
891 }
892
893 pub fn set_cluster(&mut self, cluster: String) {
894 let var = self
895 .vars
896 .get_mut(UncasedStr::new(CLUSTER.name()))
897 .expect("cluster variable must exist");
898 var.set(VarInput::Flat(&cluster), false)
899 .expect("setting cluster must succeed");
900 }
901
902 pub fn set_local_transaction_isolation(&mut self, transaction_isolation: IsolationLevel) {
903 let var = self
904 .vars
905 .get_mut(UncasedStr::new(TRANSACTION_ISOLATION.name()))
906 .expect("transaction_isolation variable must exist");
907 var.set(VarInput::Flat(transaction_isolation.as_str()), true)
908 .expect("setting transaction isolation must succeed");
909 }
910
911 pub fn get_statement_logging_sample_rate(&self) -> Numeric {
912 *self.expect_value(&STATEMENT_LOGGING_SAMPLE_RATE)
913 }
914
915 pub fn emit_introspection_query_notice(&self) -> bool {
917 *self.expect_value(&EMIT_INTROSPECTION_QUERY_NOTICE)
918 }
919
920 pub fn unsafe_new_transaction_wall_time(&self) -> Option<CheckedTimestamp<DateTime<Utc>>> {
921 *self.expect_value(&UNSAFE_NEW_TRANSACTION_WALL_TIME)
922 }
923
924 pub fn welcome_message(&self) -> bool {
926 *self.expect_value(&WELCOME_MESSAGE)
927 }
928}
929
/// Deprecated value of the `cluster` variable; `compat_translate` rewrites it
/// to `mz_catalog_server`.
pub const OLD_CATALOG_SERVER_CLUSTER: &str = "mz_introspection";
/// Deprecated variable name; `compat_translate` rewrites it to the name of
/// `AUTO_ROUTE_CATALOG_QUERIES`.
pub const OLD_AUTO_ROUTE_CATALOG_QUERIES: &str = "auto_route_introspection_queries";
933
934fn compat_translate<'a, 'b>(name: &'a str, input: VarInput<'b>) -> (&'a str, VarInput<'b>) {
942 if name == CLUSTER.name() {
943 if let Ok(value) = CLUSTER.parse(input) {
944 if value.format() == OLD_CATALOG_SERVER_CLUSTER {
945 tracing::debug!(
946 github_27285 = true,
947 "encountered deprecated `cluster` variable value: {}",
948 OLD_CATALOG_SERVER_CLUSTER,
949 );
950 return (name, VarInput::Flat("mz_catalog_server"));
951 }
952 }
953 }
954
955 if name == OLD_AUTO_ROUTE_CATALOG_QUERIES {
956 tracing::debug!(
957 github_27285 = true,
958 "encountered deprecated `{}` variable name",
959 OLD_AUTO_ROUTE_CATALOG_QUERIES,
960 );
961 return (AUTO_ROUTE_CATALOG_QUERIES.name(), input);
962 }
963
964 (name, input)
965}
966
967fn compat_translate_name(name: &str) -> &str {
968 let (name, _) = compat_translate(name, VarInput::Flat(""));
969 name
970}
971
/// A system-wide variable.
///
/// The effective value is `persisted_value` if set, then `dynamic_default`
/// if set, and otherwise the definition's default (see `value_dyn`).
#[derive(Debug)]
pub struct SystemVar {
    /// The definition this variable was created from.
    definition: VarDefinition,
    /// Value written by `set` and cleared by `reset`.
    persisted_value: Option<Box<dyn Value>>,
    /// Default installed by `set_default`; overrides the definition's default.
    dynamic_default: Option<Box<dyn Value>>,
}
982
983impl Clone for SystemVar {
984 fn clone(&self) -> Self {
985 SystemVar {
986 definition: self.definition.clone(),
987 persisted_value: self.persisted_value.as_ref().map(|v| v.box_clone()),
988 dynamic_default: self.dynamic_default.as_ref().map(|v| v.box_clone()),
989 }
990 }
991}
992
993impl SystemVar {
994 pub fn new(definition: VarDefinition) -> Self {
995 SystemVar {
996 definition,
997 persisted_value: None,
998 dynamic_default: None,
999 }
1000 }
1001
1002 fn is_default(&self, input: VarInput) -> Result<bool, VarError> {
1003 let v = self.definition.parse(input)?;
1004 Ok(self.definition.default_value() == v.as_ref())
1005 }
1006
1007 pub fn value_dyn(&self) -> &dyn Value {
1008 self.persisted_value
1009 .as_deref()
1010 .or(self.dynamic_default.as_deref())
1011 .unwrap_or_else(|| self.definition.default_value())
1012 }
1013
1014 pub fn value<V: 'static>(&self) -> &V {
1015 let val = self.value_dyn();
1016 val.as_any().downcast_ref::<V>().expect("success")
1017 }
1018
1019 fn parse(&self, input: VarInput) -> Result<Box<dyn Value>, VarError> {
1020 let v = self.definition.parse(input)?;
1021 self.validate_constraints(v.as_ref())?;
1023 Ok(v)
1024 }
1025
1026 fn set(&mut self, input: VarInput) -> Result<bool, VarError> {
1027 let v = self.parse(input)?;
1028
1029 if self.persisted_value.as_ref() != Some(&v) {
1030 self.persisted_value = Some(v);
1031 Ok(true)
1032 } else {
1033 Ok(false)
1034 }
1035 }
1036
1037 fn reset(&mut self) -> bool {
1038 if self.persisted_value.is_some() {
1039 self.persisted_value = None;
1040 true
1041 } else {
1042 false
1043 }
1044 }
1045
1046 fn set_default(&mut self, input: VarInput) -> Result<(), VarError> {
1047 let v = self.definition.parse(input)?;
1048 self.dynamic_default = Some(v);
1049 Ok(())
1050 }
1051
1052 fn validate_constraints(&self, val: &dyn Value) -> Result<(), VarError> {
1053 if let Some(constraint) = &self.definition.constraint {
1054 constraint.check_constraint(self, self.value_dyn(), val)
1055 } else {
1056 Ok(())
1057 }
1058 }
1059}
1060
1061impl Var for SystemVar {
1062 fn name(&self) -> &'static str {
1063 self.definition.name.as_str()
1064 }
1065
1066 fn value(&self) -> String {
1067 self.value_dyn().format()
1068 }
1069
1070 fn description(&self) -> &'static str {
1071 self.definition.description
1072 }
1073
1074 fn type_name(&self) -> Cow<'static, str> {
1075 self.definition.type_name()
1076 }
1077
1078 fn visible(&self, user: &User, system_vars: &SystemVars) -> Result<(), VarError> {
1079 self.definition.visible(user, system_vars)
1080 }
1081}
1082
/// Errors produced when a network policy is applied.
#[derive(Debug, Error)]
pub enum NetworkPolicyError {
    /// The given address was denied access.
    #[error("Access denied for address {0}")]
    AddressDenied(IpAddr),
}
1088
/// The set of system-wide variables.
#[derive(Derivative, Clone)]
#[derivative(Debug)]
pub struct SystemVars {
    /// Flag written by `set_unsafe` and read by `allow_unsafe`.
    allow_unsafe: bool,
    /// All system variables, keyed by their `UncasedStr` name.
    vars: BTreeMap<&'static UncasedStr, SystemVar>,
    /// Callbacks keyed by variable name, invoked by `notify_callbacks`.
    /// Left out of the `Debug` output.
    #[derivative(Debug = "ignore")]
    callbacks: BTreeMap<String, Vec<Arc<dyn Fn(&SystemVars) + Send + Sync>>>,

    /// The dyncfg set whose entries were registered as variables in `new`.
    dyncfgs: ConfigSet,
}
1109
1110impl Default for SystemVars {
1111 fn default() -> Self {
1112 Self::new()
1113 }
1114}
1115
1116impl SystemVars {
1117 pub fn new() -> Self {
1118 let system_vars = vec![
1119 &MAX_KAFKA_CONNECTIONS,
1120 &MAX_POSTGRES_CONNECTIONS,
1121 &MAX_MYSQL_CONNECTIONS,
1122 &MAX_SQL_SERVER_CONNECTIONS,
1123 &MAX_AWS_PRIVATELINK_CONNECTIONS,
1124 &MAX_TABLES,
1125 &MAX_SOURCES,
1126 &MAX_SINKS,
1127 &MAX_MATERIALIZED_VIEWS,
1128 &MAX_CLUSTERS,
1129 &MAX_REPLICAS_PER_CLUSTER,
1130 &MAX_CREDIT_CONSUMPTION_RATE,
1131 &MAX_DATABASES,
1132 &MAX_SCHEMAS_PER_DATABASE,
1133 &MAX_OBJECTS_PER_SCHEMA,
1134 &MAX_SECRETS,
1135 &MAX_ROLES,
1136 &MAX_NETWORK_POLICIES,
1137 &MAX_RULES_PER_NETWORK_POLICY,
1138 &MAX_RESULT_SIZE,
1139 &MAX_COPY_FROM_ROW_SIZE,
1140 &ALLOWED_CLUSTER_REPLICA_SIZES,
1141 &upsert_rocksdb::UPSERT_ROCKSDB_COMPACTION_STYLE,
1142 &upsert_rocksdb::UPSERT_ROCKSDB_OPTIMIZE_COMPACTION_MEMTABLE_BUDGET,
1143 &upsert_rocksdb::UPSERT_ROCKSDB_LEVEL_COMPACTION_DYNAMIC_LEVEL_BYTES,
1144 &upsert_rocksdb::UPSERT_ROCKSDB_UNIVERSAL_COMPACTION_RATIO,
1145 &upsert_rocksdb::UPSERT_ROCKSDB_PARALLELISM,
1146 &upsert_rocksdb::UPSERT_ROCKSDB_COMPRESSION_TYPE,
1147 &upsert_rocksdb::UPSERT_ROCKSDB_BOTTOMMOST_COMPRESSION_TYPE,
1148 &upsert_rocksdb::UPSERT_ROCKSDB_BATCH_SIZE,
1149 &upsert_rocksdb::UPSERT_ROCKSDB_RETRY_DURATION,
1150 &upsert_rocksdb::UPSERT_ROCKSDB_STATS_LOG_INTERVAL_SECONDS,
1151 &upsert_rocksdb::UPSERT_ROCKSDB_STATS_PERSIST_INTERVAL_SECONDS,
1152 &upsert_rocksdb::UPSERT_ROCKSDB_POINT_LOOKUP_BLOCK_CACHE_SIZE_MB,
1153 &upsert_rocksdb::UPSERT_ROCKSDB_SHRINK_ALLOCATED_BUFFERS_BY_RATIO,
1154 &upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_CLUSTER_MEMORY_FRACTION,
1155 &upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_MEMORY_BYTES,
1156 &upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_ALLOW_STALL,
1157 &STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES,
1158 &STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_TO_CLUSTER_SIZE_FRACTION,
1159 &STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_DISK_ONLY,
1160 &STORAGE_STATISTICS_INTERVAL,
1161 &STORAGE_STATISTICS_COLLECTION_INTERVAL,
1162 &STORAGE_SHRINK_UPSERT_UNUSED_BUFFERS_BY_RATIO,
1163 &STORAGE_RECORD_SOURCE_SINK_NAMESPACED_ERRORS,
1164 &PERSIST_FAST_PATH_LIMIT,
1165 &METRICS_RETENTION,
1166 &UNSAFE_MOCK_AUDIT_EVENT_TIMESTAMP,
1167 &ENABLE_RBAC_CHECKS,
1168 &PG_SOURCE_CONNECT_TIMEOUT,
1169 &PG_SOURCE_TCP_KEEPALIVES_IDLE,
1170 &PG_SOURCE_TCP_KEEPALIVES_INTERVAL,
1171 &PG_SOURCE_TCP_KEEPALIVES_RETRIES,
1172 &PG_SOURCE_TCP_USER_TIMEOUT,
1173 &PG_SOURCE_TCP_CONFIGURE_SERVER,
1174 &PG_SOURCE_SNAPSHOT_STATEMENT_TIMEOUT,
1175 &PG_SOURCE_WAL_SENDER_TIMEOUT,
1176 &PG_SOURCE_SNAPSHOT_COLLECT_STRICT_COUNT,
1177 &MYSQL_SOURCE_TCP_KEEPALIVE,
1178 &MYSQL_SOURCE_SNAPSHOT_MAX_EXECUTION_TIME,
1179 &MYSQL_SOURCE_SNAPSHOT_LOCK_WAIT_TIMEOUT,
1180 &MYSQL_SOURCE_CONNECT_TIMEOUT,
1181 &SSH_CHECK_INTERVAL,
1182 &SSH_CONNECT_TIMEOUT,
1183 &SSH_KEEPALIVES_IDLE,
1184 &KAFKA_SOCKET_KEEPALIVE,
1185 &KAFKA_SOCKET_TIMEOUT,
1186 &KAFKA_TRANSACTION_TIMEOUT,
1187 &KAFKA_SOCKET_CONNECTION_SETUP_TIMEOUT,
1188 &KAFKA_FETCH_METADATA_TIMEOUT,
1189 &KAFKA_PROGRESS_RECORD_FETCH_TIMEOUT,
1190 &ENABLE_LAUNCHDARKLY,
1191 &MAX_CONNECTIONS,
1192 &NETWORK_POLICY,
1193 &SUPERUSER_RESERVED_CONNECTIONS,
1194 &KEEP_N_SOURCE_STATUS_HISTORY_ENTRIES,
1195 &KEEP_N_SINK_STATUS_HISTORY_ENTRIES,
1196 &KEEP_N_PRIVATELINK_STATUS_HISTORY_ENTRIES,
1197 &REPLICA_STATUS_HISTORY_RETENTION_WINDOW,
1198 &ENABLE_STORAGE_SHARD_FINALIZATION,
1199 &ENABLE_CONSOLIDATE_AFTER_UNION_NEGATE,
1200 &ENABLE_DEFAULT_CONNECTION_VALIDATION,
1201 &DEFAULT_TIMESTAMP_INTERVAL,
1202 &MIN_TIMESTAMP_INTERVAL,
1203 &MAX_TIMESTAMP_INTERVAL,
1204 &LOGGING_FILTER,
1205 &OPENTELEMETRY_FILTER,
1206 &LOGGING_FILTER_DEFAULTS,
1207 &OPENTELEMETRY_FILTER_DEFAULTS,
1208 &SENTRY_FILTERS,
1209 &WEBHOOKS_SECRETS_CACHING_TTL_SECS,
1210 &COORD_SLOW_MESSAGE_WARN_THRESHOLD,
1211 &grpc_client::CONNECT_TIMEOUT,
1212 &grpc_client::HTTP2_KEEP_ALIVE_INTERVAL,
1213 &grpc_client::HTTP2_KEEP_ALIVE_TIMEOUT,
1214 &cluster_scheduling::CLUSTER_MULTI_PROCESS_REPLICA_AZ_AFFINITY_WEIGHT,
1215 &cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY,
1216 &cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY_WEIGHT,
1217 &cluster_scheduling::CLUSTER_ENABLE_TOPOLOGY_SPREAD,
1218 &cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_IGNORE_NON_SINGULAR_SCALE,
1219 &cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_MAX_SKEW,
1220 &cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_MIN_DOMAINS,
1221 &cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_SOFT,
1222 &cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY,
1223 &cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY_WEIGHT,
1224 &cluster_scheduling::CLUSTER_ALTER_CHECK_READY_INTERVAL,
1225 &cluster_scheduling::CLUSTER_CHECK_SCHEDULING_POLICIES_INTERVAL,
1226 &cluster_scheduling::CLUSTER_SECURITY_CONTEXT_ENABLED,
1227 &cluster_scheduling::CLUSTER_REFRESH_MV_COMPACTION_ESTIMATE,
1228 &grpc_client::HTTP2_KEEP_ALIVE_TIMEOUT,
1229 &STATEMENT_LOGGING_MAX_SAMPLE_RATE,
1230 &STATEMENT_LOGGING_DEFAULT_SAMPLE_RATE,
1231 &STATEMENT_LOGGING_TARGET_DATA_RATE,
1232 &STATEMENT_LOGGING_MAX_DATA_CREDIT,
1233 &ENABLE_INTERNAL_STATEMENT_LOGGING,
1234 &OPTIMIZER_STATS_TIMEOUT,
1235 &OPTIMIZER_ONESHOT_STATS_TIMEOUT,
1236 &PRIVATELINK_STATUS_UPDATE_QUOTA_PER_MINUTE,
1237 &WEBHOOK_CONCURRENT_REQUEST_LIMIT,
1238 &PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_SIZE,
1239 &PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_WAIT,
1240 &PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL,
1241 &PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL_STAGGER,
1242 &USER_STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION,
1243 &FORCE_SOURCE_TABLE_SYNTAX,
1244 &OPTIMIZER_E2E_LATENCY_WARNING_THRESHOLD,
1245 &SCRAM_ITERATIONS,
1246 ];
1247
1248 let dyncfgs = mz_dyncfgs::all_dyncfgs();
1249 let dyncfg_vars: Vec<_> = dyncfgs
1250 .entries()
1251 .map(|cfg| match cfg.default() {
1252 ConfigVal::Bool(default) => {
1253 VarDefinition::new_runtime(cfg.name(), *default, cfg.desc(), false)
1254 }
1255 ConfigVal::U32(default) => {
1256 VarDefinition::new_runtime(cfg.name(), *default, cfg.desc(), false)
1257 }
1258 ConfigVal::Usize(default) => {
1259 VarDefinition::new_runtime(cfg.name(), *default, cfg.desc(), false)
1260 }
1261 ConfigVal::OptUsize(default) => {
1262 VarDefinition::new_runtime(cfg.name(), *default, cfg.desc(), false)
1263 }
1264 ConfigVal::F64(default) => {
1265 VarDefinition::new_runtime(cfg.name(), *default, cfg.desc(), false)
1266 }
1267 ConfigVal::String(default) => {
1268 VarDefinition::new_runtime(cfg.name(), default.clone(), cfg.desc(), false)
1269 }
1270 ConfigVal::OptString(default) => {
1271 VarDefinition::new_runtime(cfg.name(), default.clone(), cfg.desc(), false)
1272 }
1273 ConfigVal::Duration(default) => {
1274 VarDefinition::new_runtime(cfg.name(), default.clone(), cfg.desc(), false)
1275 }
1276 ConfigVal::Json(default) => {
1277 VarDefinition::new_runtime(cfg.name(), default.clone(), cfg.desc(), false)
1278 }
1279 })
1280 .collect();
1281
1282 let vars: BTreeMap<_, _> = system_vars
1283 .into_iter()
1284 .chain(definitions::FEATURE_FLAGS.iter().copied())
1286 .chain(SESSION_SYSTEM_VARS.values().copied())
1288 .cloned()
1289 .chain(dyncfg_vars)
1291 .map(|var| (var.name, SystemVar::new(var)))
1292 .collect();
1293
1294 let vars = SystemVars {
1295 vars,
1296 callbacks: BTreeMap::new(),
1297 allow_unsafe: false,
1298 dyncfgs,
1299 };
1300
1301 vars
1302 }
1303
/// Returns the dyncfg set captured when this `SystemVars` was created.
pub fn dyncfgs(&self) -> &ConfigSet {
    &self.dyncfgs
}
1307
/// Builder-style setter for the `allow_unsafe` flag; consumes and returns
/// `self`.
pub fn set_unsafe(mut self, allow_unsafe: bool) -> Self {
    self.allow_unsafe = allow_unsafe;
    self
}
1312
/// Returns the `allow_unsafe` flag.
pub fn allow_unsafe(&self) -> bool {
    self.allow_unsafe
}
1316
1317 fn expect_value<V: 'static>(&self, var: &VarDefinition) -> &V {
1318 let val = self
1319 .vars
1320 .get(var.name)
1321 .expect("provided var should be in state");
1322
1323 val.value_dyn()
1324 .as_any()
1325 .downcast_ref::<V>()
1326 .expect("provided var type should matched stored var")
1327 }
1328
1329 fn expect_config_value<V: ConfigType + 'static>(&self, name: &UncasedStr) -> &V {
1330 let val = self
1331 .vars
1332 .get(name)
1333 .unwrap_or_else(|| panic!("provided var {name} should be in state"));
1334
1335 val.value_dyn()
1336 .as_any()
1337 .downcast_ref()
1338 .expect("provided var type should matched stored var")
1339 }
1340
1341 pub fn reset_all(&mut self) {
1344 for (_, var) in &mut self.vars {
1345 var.reset();
1346 }
1347 }
1348
1349 pub fn iter(&self) -> impl Iterator<Item = &dyn Var> {
1352 self.vars
1353 .values()
1354 .map(|v| v.as_var())
1355 .filter(|v| !SESSION_SYSTEM_VARS.contains_key(UncasedStr::new(v.name())))
1356 }
1357
1358 pub fn iter_synced(&self) -> impl Iterator<Item = &dyn Var> {
1362 self.iter().filter(|v| v.name() != ENABLE_LAUNCHDARKLY.name)
1363 }
1364
1365 pub fn iter_session(&self) -> impl Iterator<Item = &dyn Var> {
1367 self.vars
1368 .values()
1369 .map(|v| v.as_var())
1370 .filter(|v| SESSION_SYSTEM_VARS.contains_key(UncasedStr::new(v.name())))
1371 }
1372
1373 pub fn user_modifiable(&self, name: &str) -> bool {
1375 SESSION_SYSTEM_VARS.contains_key(UncasedStr::new(name))
1376 || name == ENABLE_RBAC_CHECKS.name()
1377 || name == NETWORK_POLICY.name()
1378 }
1379
1380 pub fn get(&self, name: &str) -> Result<&dyn Var, VarError> {
1401 self.vars
1402 .get(UncasedStr::new(name))
1403 .map(|v| v.as_var())
1404 .ok_or_else(|| VarError::UnknownParameter(name.into()))
1405 }
1406
1407 pub fn is_default(&self, name: &str, input: VarInput) -> Result<bool, VarError> {
1421 self.vars
1422 .get(UncasedStr::new(name))
1423 .ok_or_else(|| VarError::UnknownParameter(name.into()))
1424 .and_then(|v| v.is_default(input))
1425 }
1426
1427 pub fn set(&mut self, name: &str, input: VarInput) -> Result<bool, VarError> {
1450 let result = self
1451 .vars
1452 .get_mut(UncasedStr::new(name))
1453 .ok_or_else(|| VarError::UnknownParameter(name.into()))
1454 .and_then(|v| v.set(input))?;
1455 self.notify_callbacks(name);
1456 Ok(result)
1457 }
1458
1459 pub fn parse(&self, name: &str, input: VarInput) -> Result<Box<dyn Value>, VarError> {
1480 self.vars
1481 .get(UncasedStr::new(name))
1482 .ok_or_else(|| VarError::UnknownParameter(name.into()))
1483 .and_then(|v| v.parse(input))
1484 }
1485
1486 pub fn set_default(&mut self, name: &str, input: VarInput) -> Result<(), VarError> {
1494 self.vars
1495 .get_mut(UncasedStr::new(name))
1496 .ok_or_else(|| VarError::UnknownParameter(name.into()))
1497 .and_then(|v| v.set_default(input))?;
1498 self.notify_callbacks(name);
1499 Ok(())
1500 }
1501
1502 pub fn reset(&mut self, name: &str) -> Result<bool, VarError> {
1520 let result = self
1521 .vars
1522 .get_mut(UncasedStr::new(name))
1523 .ok_or_else(|| VarError::UnknownParameter(name.into()))
1524 .map(|v| v.reset())?;
1525 self.notify_callbacks(name);
1526 Ok(result)
1527 }
1528
1529 pub fn defaults(&self) -> BTreeMap<String, String> {
1531 self.vars
1532 .iter()
1533 .map(|(name, var)| {
1534 let default = var
1535 .dynamic_default
1536 .as_deref()
1537 .unwrap_or_else(|| var.definition.default_value());
1538 (name.as_str().to_owned(), default.format())
1539 })
1540 .collect()
1541 }
1542
1543 pub fn register_callback(
1548 &mut self,
1549 var: &VarDefinition,
1550 callback: Arc<dyn Fn(&SystemVars) + Send + Sync>,
1551 ) {
1552 self.callbacks
1553 .entry(var.name().to_string())
1554 .or_default()
1555 .push(callback);
1556 self.notify_callbacks(var.name());
1557 }
1558
1559 fn notify_callbacks(&self, name: &str) {
1561 if let Some(callbacks) = self.callbacks.get(name) {
1563 for callback in callbacks {
1564 (callback)(self);
1565 }
1566 }
1567 }
1568
1569 pub fn default_cluster(&self) -> String {
1572 self.expect_value::<String>(&CLUSTER).to_owned()
1573 }
1574
1575 pub fn max_kafka_connections(&self) -> u32 {
1577 *self.expect_value(&MAX_KAFKA_CONNECTIONS)
1578 }
1579
1580 pub fn max_postgres_connections(&self) -> u32 {
1582 *self.expect_value(&MAX_POSTGRES_CONNECTIONS)
1583 }
1584
1585 pub fn max_mysql_connections(&self) -> u32 {
1587 *self.expect_value(&MAX_MYSQL_CONNECTIONS)
1588 }
1589
1590 pub fn max_sql_server_connections(&self) -> u32 {
1592 *self.expect_value(&MAX_SQL_SERVER_CONNECTIONS)
1593 }
1594
1595 pub fn max_aws_privatelink_connections(&self) -> u32 {
1597 *self.expect_value(&MAX_AWS_PRIVATELINK_CONNECTIONS)
1598 }
1599
1600 pub fn max_tables(&self) -> u32 {
1602 *self.expect_value(&MAX_TABLES)
1603 }
1604
1605 pub fn max_sources(&self) -> u32 {
1607 *self.expect_value(&MAX_SOURCES)
1608 }
1609
1610 pub fn max_sinks(&self) -> u32 {
1612 *self.expect_value(&MAX_SINKS)
1613 }
1614
1615 pub fn max_materialized_views(&self) -> u32 {
1617 *self.expect_value(&MAX_MATERIALIZED_VIEWS)
1618 }
1619
1620 pub fn max_clusters(&self) -> u32 {
1622 *self.expect_value(&MAX_CLUSTERS)
1623 }
1624
1625 pub fn max_replicas_per_cluster(&self) -> u32 {
1627 *self.expect_value(&MAX_REPLICAS_PER_CLUSTER)
1628 }
1629
1630 pub fn max_credit_consumption_rate(&self) -> Numeric {
1632 *self.expect_value(&MAX_CREDIT_CONSUMPTION_RATE)
1633 }
1634
1635 pub fn max_databases(&self) -> u32 {
1637 *self.expect_value(&MAX_DATABASES)
1638 }
1639
1640 pub fn max_schemas_per_database(&self) -> u32 {
1642 *self.expect_value(&MAX_SCHEMAS_PER_DATABASE)
1643 }
1644
1645 pub fn max_objects_per_schema(&self) -> u32 {
1647 *self.expect_value(&MAX_OBJECTS_PER_SCHEMA)
1648 }
1649
1650 pub fn max_secrets(&self) -> u32 {
1652 *self.expect_value(&MAX_SECRETS)
1653 }
1654
1655 pub fn max_roles(&self) -> u32 {
1657 *self.expect_value(&MAX_ROLES)
1658 }
1659
1660 pub fn max_network_policies(&self) -> u32 {
1662 *self.expect_value(&MAX_NETWORK_POLICIES)
1663 }
1664
1665 pub fn max_rules_per_network_policy(&self) -> u32 {
1667 *self.expect_value(&MAX_RULES_PER_NETWORK_POLICY)
1668 }
1669
1670 pub fn max_result_size(&self) -> u64 {
1672 self.expect_value::<ByteSize>(&MAX_RESULT_SIZE).as_bytes()
1673 }
1674
1675 pub fn max_copy_from_row_size(&self) -> u64 {
1677 self.expect_value::<ByteSize>(&MAX_COPY_FROM_ROW_SIZE)
1678 .as_bytes()
1679 }
1680
1681 pub fn allowed_cluster_replica_sizes(&self) -> Vec<String> {
1683 self.expect_value::<Vec<Ident>>(&ALLOWED_CLUSTER_REPLICA_SIZES)
1684 .into_iter()
1685 .map(|s| s.as_str().into())
1686 .collect()
1687 }
1688
1689 pub fn default_cluster_replication_factor(&self) -> u32 {
1691 *self.expect_value::<u32>(&DEFAULT_CLUSTER_REPLICATION_FACTOR)
1692 }
1693
1694 pub fn upsert_rocksdb_compaction_style(&self) -> mz_rocksdb_types::config::CompactionStyle {
1695 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_COMPACTION_STYLE)
1696 }
1697
1698 pub fn upsert_rocksdb_optimize_compaction_memtable_budget(&self) -> usize {
1699 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_OPTIMIZE_COMPACTION_MEMTABLE_BUDGET)
1700 }
1701
1702 pub fn upsert_rocksdb_level_compaction_dynamic_level_bytes(&self) -> bool {
1703 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_LEVEL_COMPACTION_DYNAMIC_LEVEL_BYTES)
1704 }
1705
1706 pub fn upsert_rocksdb_universal_compaction_ratio(&self) -> i32 {
1707 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_UNIVERSAL_COMPACTION_RATIO)
1708 }
1709
1710 pub fn upsert_rocksdb_parallelism(&self) -> Option<i32> {
1711 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_PARALLELISM)
1712 }
1713
1714 pub fn upsert_rocksdb_compression_type(&self) -> mz_rocksdb_types::config::CompressionType {
1715 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_COMPRESSION_TYPE)
1716 }
1717
1718 pub fn upsert_rocksdb_bottommost_compression_type(
1719 &self,
1720 ) -> mz_rocksdb_types::config::CompressionType {
1721 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_BOTTOMMOST_COMPRESSION_TYPE)
1722 }
1723
1724 pub fn upsert_rocksdb_batch_size(&self) -> usize {
1725 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_BATCH_SIZE)
1726 }
1727
1728 pub fn upsert_rocksdb_retry_duration(&self) -> Duration {
1729 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_RETRY_DURATION)
1730 }
1731
1732 pub fn upsert_rocksdb_stats_log_interval_seconds(&self) -> u32 {
1733 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_STATS_LOG_INTERVAL_SECONDS)
1734 }
1735
1736 pub fn upsert_rocksdb_stats_persist_interval_seconds(&self) -> u32 {
1737 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_STATS_PERSIST_INTERVAL_SECONDS)
1738 }
1739
1740 pub fn upsert_rocksdb_point_lookup_block_cache_size_mb(&self) -> Option<u32> {
1741 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_POINT_LOOKUP_BLOCK_CACHE_SIZE_MB)
1742 }
1743
1744 pub fn upsert_rocksdb_shrink_allocated_buffers_by_ratio(&self) -> usize {
1745 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_SHRINK_ALLOCATED_BUFFERS_BY_RATIO)
1746 }
1747
1748 pub fn upsert_rocksdb_write_buffer_manager_cluster_memory_fraction(&self) -> Option<Numeric> {
1749 *self.expect_value(
1750 &upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_CLUSTER_MEMORY_FRACTION,
1751 )
1752 }
1753
1754 pub fn upsert_rocksdb_write_buffer_manager_memory_bytes(&self) -> Option<usize> {
1755 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_MEMORY_BYTES)
1756 }
1757
1758 pub fn upsert_rocksdb_write_buffer_manager_allow_stall(&self) -> bool {
1759 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_ALLOW_STALL)
1760 }
1761
1762 pub fn persist_fast_path_limit(&self) -> usize {
1763 *self.expect_value(&PERSIST_FAST_PATH_LIMIT)
1764 }
1765
1766 pub fn pg_source_connect_timeout(&self) -> Duration {
1768 *self.expect_value(&PG_SOURCE_CONNECT_TIMEOUT)
1769 }
1770
1771 pub fn pg_source_tcp_keepalives_retries(&self) -> u32 {
1773 *self.expect_value(&PG_SOURCE_TCP_KEEPALIVES_RETRIES)
1774 }
1775
1776 pub fn pg_source_tcp_keepalives_idle(&self) -> Duration {
1778 *self.expect_value(&PG_SOURCE_TCP_KEEPALIVES_IDLE)
1779 }
1780
1781 pub fn pg_source_tcp_keepalives_interval(&self) -> Duration {
1783 *self.expect_value(&PG_SOURCE_TCP_KEEPALIVES_INTERVAL)
1784 }
1785
1786 pub fn pg_source_tcp_user_timeout(&self) -> Duration {
1788 *self.expect_value(&PG_SOURCE_TCP_USER_TIMEOUT)
1789 }
1790
1791 pub fn pg_source_tcp_configure_server(&self) -> bool {
1793 *self.expect_value(&PG_SOURCE_TCP_CONFIGURE_SERVER)
1794 }
1795
1796 pub fn pg_source_snapshot_statement_timeout(&self) -> Duration {
1798 *self.expect_value(&PG_SOURCE_SNAPSHOT_STATEMENT_TIMEOUT)
1799 }
1800
1801 pub fn pg_source_wal_sender_timeout(&self) -> Option<Duration> {
1803 *self.expect_value(&PG_SOURCE_WAL_SENDER_TIMEOUT)
1804 }
1805
1806 pub fn pg_source_snapshot_collect_strict_count(&self) -> bool {
1808 *self.expect_value(&PG_SOURCE_SNAPSHOT_COLLECT_STRICT_COUNT)
1809 }
1810
1811 pub fn mysql_source_tcp_keepalive(&self) -> Duration {
1813 *self.expect_value(&MYSQL_SOURCE_TCP_KEEPALIVE)
1814 }
1815
1816 pub fn mysql_source_snapshot_max_execution_time(&self) -> Duration {
1818 *self.expect_value(&MYSQL_SOURCE_SNAPSHOT_MAX_EXECUTION_TIME)
1819 }
1820
1821 pub fn mysql_source_snapshot_lock_wait_timeout(&self) -> Duration {
1823 *self.expect_value(&MYSQL_SOURCE_SNAPSHOT_LOCK_WAIT_TIMEOUT)
1824 }
1825
1826 pub fn mysql_source_connect_timeout(&self) -> Duration {
1828 *self.expect_value(&MYSQL_SOURCE_CONNECT_TIMEOUT)
1829 }
1830
1831 pub fn ssh_check_interval(&self) -> Duration {
1833 *self.expect_value(&SSH_CHECK_INTERVAL)
1834 }
1835
1836 pub fn ssh_connect_timeout(&self) -> Duration {
1838 *self.expect_value(&SSH_CONNECT_TIMEOUT)
1839 }
1840
1841 pub fn ssh_keepalives_idle(&self) -> Duration {
1843 *self.expect_value(&SSH_KEEPALIVES_IDLE)
1844 }
1845
1846 pub fn kafka_socket_keepalive(&self) -> bool {
1848 *self.expect_value(&KAFKA_SOCKET_KEEPALIVE)
1849 }
1850
1851 pub fn kafka_socket_timeout(&self) -> Option<Duration> {
1853 *self.expect_value(&KAFKA_SOCKET_TIMEOUT)
1854 }
1855
1856 pub fn kafka_transaction_timeout(&self) -> Duration {
1858 *self.expect_value(&KAFKA_TRANSACTION_TIMEOUT)
1859 }
1860
1861 pub fn kafka_socket_connection_setup_timeout(&self) -> Duration {
1863 *self.expect_value(&KAFKA_SOCKET_CONNECTION_SETUP_TIMEOUT)
1864 }
1865
1866 pub fn kafka_fetch_metadata_timeout(&self) -> Duration {
1868 *self.expect_value(&KAFKA_FETCH_METADATA_TIMEOUT)
1869 }
1870
1871 pub fn kafka_progress_record_fetch_timeout(&self) -> Option<Duration> {
1873 *self.expect_value(&KAFKA_PROGRESS_RECORD_FETCH_TIMEOUT)
1874 }
1875
1876 pub fn crdb_connect_timeout(&self) -> Duration {
1878 *self.expect_config_value(UncasedStr::new(
1879 mz_persist_client::cfg::CRDB_CONNECT_TIMEOUT.name(),
1880 ))
1881 }
1882
1883 pub fn crdb_tcp_user_timeout(&self) -> Duration {
1885 *self.expect_config_value(UncasedStr::new(
1886 mz_persist_client::cfg::CRDB_TCP_USER_TIMEOUT.name(),
1887 ))
1888 }
1889
1890 pub fn crdb_keepalives_idle(&self) -> Duration {
1892 *self.expect_config_value(UncasedStr::new(
1893 mz_persist_client::cfg::CRDB_KEEPALIVES_IDLE.name(),
1894 ))
1895 }
1896
1897 pub fn crdb_keepalives_interval(&self) -> Duration {
1899 *self.expect_config_value(UncasedStr::new(
1900 mz_persist_client::cfg::CRDB_KEEPALIVES_INTERVAL.name(),
1901 ))
1902 }
1903
1904 pub fn crdb_keepalives_retries(&self) -> u32 {
1906 *self.expect_config_value(UncasedStr::new(
1907 mz_persist_client::cfg::CRDB_KEEPALIVES_RETRIES.name(),
1908 ))
1909 }
1910
1911 pub fn storage_dataflow_max_inflight_bytes(&self) -> Option<usize> {
1913 *self.expect_value(&STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES)
1914 }
1915
1916 pub fn storage_dataflow_max_inflight_bytes_to_cluster_size_fraction(&self) -> Option<Numeric> {
1918 *self.expect_value(&STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_TO_CLUSTER_SIZE_FRACTION)
1919 }
1920
1921 pub fn storage_shrink_upsert_unused_buffers_by_ratio(&self) -> usize {
1923 *self.expect_value(&STORAGE_SHRINK_UPSERT_UNUSED_BUFFERS_BY_RATIO)
1924 }
1925
1926 pub fn storage_dataflow_max_inflight_bytes_disk_only(&self) -> bool {
1928 *self.expect_value(&STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_DISK_ONLY)
1929 }
1930
1931 pub fn storage_statistics_interval(&self) -> Duration {
1933 *self.expect_value(&STORAGE_STATISTICS_INTERVAL)
1934 }
1935
1936 pub fn storage_statistics_collection_interval(&self) -> Duration {
1938 *self.expect_value(&STORAGE_STATISTICS_COLLECTION_INTERVAL)
1939 }
1940
1941 pub fn storage_record_source_sink_namespaced_errors(&self) -> bool {
1943 *self.expect_value(&STORAGE_RECORD_SOURCE_SINK_NAMESPACED_ERRORS)
1944 }
1945
1946 pub fn persist_stats_filter_enabled(&self) -> bool {
1948 *self.expect_config_value(UncasedStr::new(
1949 mz_persist_client::stats::STATS_FILTER_ENABLED.name(),
1950 ))
1951 }
1952
1953 pub fn scram_iterations(&self) -> NonZeroU32 {
1954 *self.expect_value(&SCRAM_ITERATIONS)
1955 }
1956
1957 pub fn dyncfg_updates(&self) -> ConfigUpdates {
1958 let mut updates = ConfigUpdates::default();
1959 for entry in self.dyncfgs.entries() {
1960 let name = UncasedStr::new(entry.name());
1961 let val = match entry.val() {
1962 ConfigVal::Bool(_) => ConfigVal::from(*self.expect_config_value::<bool>(name)),
1963 ConfigVal::U32(_) => ConfigVal::from(*self.expect_config_value::<u32>(name)),
1964 ConfigVal::Usize(_) => ConfigVal::from(*self.expect_config_value::<usize>(name)),
1965 ConfigVal::OptUsize(_) => {
1966 ConfigVal::from(*self.expect_config_value::<Option<usize>>(name))
1967 }
1968 ConfigVal::F64(_) => ConfigVal::from(*self.expect_config_value::<f64>(name)),
1969 ConfigVal::String(_) => {
1970 ConfigVal::from(self.expect_config_value::<String>(name).clone())
1971 }
1972 ConfigVal::OptString(_) => {
1973 ConfigVal::from(self.expect_config_value::<Option<String>>(name).clone())
1974 }
1975 ConfigVal::Duration(_) => {
1976 ConfigVal::from(*self.expect_config_value::<Duration>(name))
1977 }
1978 ConfigVal::Json(_) => {
1979 ConfigVal::from(self.expect_config_value::<serde_json::Value>(name).clone())
1980 }
1981 };
1982 updates.add_dynamic(entry.name(), val);
1983 }
1984 updates.apply(&self.dyncfgs);
1985 updates
1986 }
1987
1988 pub fn metrics_retention(&self) -> Duration {
1990 *self.expect_value(&METRICS_RETENTION)
1991 }
1992
1993 pub fn unsafe_mock_audit_event_timestamp(&self) -> Option<mz_repr::Timestamp> {
1995 *self.expect_value(&UNSAFE_MOCK_AUDIT_EVENT_TIMESTAMP)
1996 }
1997
1998 pub fn enable_rbac_checks(&self) -> bool {
2000 *self.expect_value(&ENABLE_RBAC_CHECKS)
2001 }
2002
2003 pub fn max_connections(&self) -> u32 {
2005 *self.expect_value(&MAX_CONNECTIONS)
2006 }
2007
2008 pub fn default_network_policy_name(&self) -> String {
2009 self.expect_value::<String>(&NETWORK_POLICY).clone()
2010 }
2011
2012 pub fn superuser_reserved_connections(&self) -> u32 {
2014 *self.expect_value(&SUPERUSER_RESERVED_CONNECTIONS)
2015 }
2016
2017 pub fn keep_n_source_status_history_entries(&self) -> usize {
2018 *self.expect_value(&KEEP_N_SOURCE_STATUS_HISTORY_ENTRIES)
2019 }
2020
2021 pub fn keep_n_sink_status_history_entries(&self) -> usize {
2022 *self.expect_value(&KEEP_N_SINK_STATUS_HISTORY_ENTRIES)
2023 }
2024
2025 pub fn keep_n_privatelink_status_history_entries(&self) -> usize {
2026 *self.expect_value(&KEEP_N_PRIVATELINK_STATUS_HISTORY_ENTRIES)
2027 }
2028
2029 pub fn replica_status_history_retention_window(&self) -> Duration {
2030 *self.expect_value(&REPLICA_STATUS_HISTORY_RETENTION_WINDOW)
2031 }
2032
2033 pub fn enable_storage_shard_finalization(&self) -> bool {
2035 *self.expect_value(&ENABLE_STORAGE_SHARD_FINALIZATION)
2036 }
2037
2038 pub fn enable_consolidate_after_union_negate(&self) -> bool {
2039 *self.expect_value(&ENABLE_CONSOLIDATE_AFTER_UNION_NEGATE)
2040 }
2041
2042 pub fn enable_default_connection_validation(&self) -> bool {
2044 *self.expect_value(&ENABLE_DEFAULT_CONNECTION_VALIDATION)
2045 }
2046
2047 pub fn default_timestamp_interval(&self) -> Duration {
2049 *self.expect_value(&DEFAULT_TIMESTAMP_INTERVAL)
2050 }
2051
2052 pub fn min_timestamp_interval(&self) -> Duration {
2054 *self.expect_value(&MIN_TIMESTAMP_INTERVAL)
2055 }
2056 pub fn max_timestamp_interval(&self) -> Duration {
2058 *self.expect_value(&MAX_TIMESTAMP_INTERVAL)
2059 }
2060
2061 pub fn logging_filter(&self) -> CloneableEnvFilter {
2062 self.expect_value::<CloneableEnvFilter>(&LOGGING_FILTER)
2063 .clone()
2064 }
2065
2066 pub fn opentelemetry_filter(&self) -> CloneableEnvFilter {
2067 self.expect_value::<CloneableEnvFilter>(&OPENTELEMETRY_FILTER)
2068 .clone()
2069 }
2070
2071 pub fn logging_filter_defaults(&self) -> Vec<SerializableDirective> {
2072 self.expect_value::<Vec<SerializableDirective>>(&LOGGING_FILTER_DEFAULTS)
2073 .clone()
2074 }
2075
2076 pub fn opentelemetry_filter_defaults(&self) -> Vec<SerializableDirective> {
2077 self.expect_value::<Vec<SerializableDirective>>(&OPENTELEMETRY_FILTER_DEFAULTS)
2078 .clone()
2079 }
2080
2081 pub fn sentry_filters(&self) -> Vec<SerializableDirective> {
2082 self.expect_value::<Vec<SerializableDirective>>(&SENTRY_FILTERS)
2083 .clone()
2084 }
2085
2086 pub fn webhooks_secrets_caching_ttl_secs(&self) -> usize {
2087 *self.expect_value(&WEBHOOKS_SECRETS_CACHING_TTL_SECS)
2088 }
2089
2090 pub fn coord_slow_message_warn_threshold(&self) -> Duration {
2091 *self.expect_value(&COORD_SLOW_MESSAGE_WARN_THRESHOLD)
2092 }
2093
2094 pub fn grpc_client_http2_keep_alive_interval(&self) -> Duration {
2095 *self.expect_value(&grpc_client::HTTP2_KEEP_ALIVE_INTERVAL)
2096 }
2097
2098 pub fn grpc_client_http2_keep_alive_timeout(&self) -> Duration {
2099 *self.expect_value(&grpc_client::HTTP2_KEEP_ALIVE_TIMEOUT)
2100 }
2101
2102 pub fn grpc_connect_timeout(&self) -> Duration {
2103 *self.expect_value(&grpc_client::CONNECT_TIMEOUT)
2104 }
2105
2106 pub fn cluster_multi_process_replica_az_affinity_weight(&self) -> Option<i32> {
2107 *self.expect_value(&cluster_scheduling::CLUSTER_MULTI_PROCESS_REPLICA_AZ_AFFINITY_WEIGHT)
2108 }
2109
2110 pub fn cluster_soften_replication_anti_affinity(&self) -> bool {
2111 *self.expect_value(&cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY)
2112 }
2113
2114 pub fn cluster_soften_replication_anti_affinity_weight(&self) -> i32 {
2115 *self.expect_value(&cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY_WEIGHT)
2116 }
2117
2118 pub fn cluster_enable_topology_spread(&self) -> bool {
2119 *self.expect_value(&cluster_scheduling::CLUSTER_ENABLE_TOPOLOGY_SPREAD)
2120 }
2121
2122 pub fn cluster_topology_spread_ignore_non_singular_scale(&self) -> bool {
2123 *self.expect_value(&cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_IGNORE_NON_SINGULAR_SCALE)
2124 }
2125
2126 pub fn cluster_topology_spread_max_skew(&self) -> i32 {
2127 *self.expect_value(&cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_MAX_SKEW)
2128 }
2129
2130 pub fn cluster_topology_spread_set_min_domains(&self) -> Option<i32> {
2131 *self.expect_value(&cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_MIN_DOMAINS)
2132 }
2133
2134 pub fn cluster_topology_spread_soft(&self) -> bool {
2135 *self.expect_value(&cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_SOFT)
2136 }
2137
2138 pub fn cluster_soften_az_affinity(&self) -> bool {
2139 *self.expect_value(&cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY)
2140 }
2141
2142 pub fn cluster_soften_az_affinity_weight(&self) -> i32 {
2143 *self.expect_value(&cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY_WEIGHT)
2144 }
2145
2146 pub fn cluster_alter_check_ready_interval(&self) -> Duration {
2147 *self.expect_value(&cluster_scheduling::CLUSTER_ALTER_CHECK_READY_INTERVAL)
2148 }
2149
2150 pub fn cluster_check_scheduling_policies_interval(&self) -> Duration {
2151 *self.expect_value(&cluster_scheduling::CLUSTER_CHECK_SCHEDULING_POLICIES_INTERVAL)
2152 }
2153
2154 pub fn cluster_security_context_enabled(&self) -> bool {
2155 *self.expect_value(&cluster_scheduling::CLUSTER_SECURITY_CONTEXT_ENABLED)
2156 }
2157
2158 pub fn cluster_refresh_mv_compaction_estimate(&self) -> Duration {
2159 *self.expect_value(&cluster_scheduling::CLUSTER_REFRESH_MV_COMPACTION_ESTIMATE)
2160 }
2161
2162 pub fn privatelink_status_update_quota_per_minute(&self) -> u32 {
2164 *self.expect_value(&PRIVATELINK_STATUS_UPDATE_QUOTA_PER_MINUTE)
2165 }
2166
2167 pub fn statement_logging_target_data_rate(&self) -> Option<usize> {
2168 *self.expect_value(&STATEMENT_LOGGING_TARGET_DATA_RATE)
2169 }
2170
2171 pub fn statement_logging_max_data_credit(&self) -> Option<usize> {
2172 *self.expect_value(&STATEMENT_LOGGING_MAX_DATA_CREDIT)
2173 }
2174
2175 pub fn statement_logging_max_sample_rate(&self) -> Numeric {
2177 *self.expect_value(&STATEMENT_LOGGING_MAX_SAMPLE_RATE)
2178 }
2179
2180 pub fn statement_logging_default_sample_rate(&self) -> Numeric {
2182 *self.expect_value(&STATEMENT_LOGGING_DEFAULT_SAMPLE_RATE)
2183 }
2184
2185 pub fn enable_internal_statement_logging(&self) -> bool {
2187 *self.expect_value(&ENABLE_INTERNAL_STATEMENT_LOGGING)
2188 }
2189
2190 pub fn optimizer_stats_timeout(&self) -> Duration {
2192 *self.expect_value(&OPTIMIZER_STATS_TIMEOUT)
2193 }
2194
2195 pub fn optimizer_oneshot_stats_timeout(&self) -> Duration {
2197 *self.expect_value(&OPTIMIZER_ONESHOT_STATS_TIMEOUT)
2198 }
2199
2200 pub fn webhook_concurrent_request_limit(&self) -> usize {
2202 *self.expect_value(&WEBHOOK_CONCURRENT_REQUEST_LIMIT)
2203 }
2204
2205 pub fn pg_timestamp_oracle_connection_pool_max_size(&self) -> usize {
2207 *self.expect_value(&PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_SIZE)
2208 }
2209
2210 pub fn pg_timestamp_oracle_connection_pool_max_wait(&self) -> Option<Duration> {
2212 *self.expect_value(&PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_WAIT)
2213 }
2214
2215 pub fn pg_timestamp_oracle_connection_pool_ttl(&self) -> Duration {
2217 *self.expect_value(&PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL)
2218 }
2219
2220 pub fn pg_timestamp_oracle_connection_pool_ttl_stagger(&self) -> Duration {
2222 *self.expect_value(&PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL_STAGGER)
2223 }
2224
2225 pub fn user_storage_managed_collections_batch_duration(&self) -> Duration {
2227 *self.expect_value(&USER_STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION)
2228 }
2229
2230 pub fn force_source_table_syntax(&self) -> bool {
2231 *self.expect_value(&FORCE_SOURCE_TABLE_SYNTAX)
2232 }
2233
2234 pub fn optimizer_e2e_latency_warning_threshold(&self) -> Duration {
2235 *self.expect_value(&OPTIMIZER_E2E_LATENCY_WARNING_THRESHOLD)
2236 }
2237
2238 pub fn is_controller_config_var(&self, name: &str) -> bool {
2240 self.is_dyncfg_var(name)
2241 }
2242
2243 pub fn is_compute_config_var(&self, name: &str) -> bool {
2247 name == MAX_RESULT_SIZE.name() || self.is_dyncfg_var(name) || is_tracing_var(name)
2248 }
2249
2250 pub fn is_metrics_config_var(&self, name: &str) -> bool {
2252 self.is_dyncfg_var(name)
2253 }
2254
2255 pub fn is_storage_config_var(&self, name: &str) -> bool {
2257 name == PG_SOURCE_CONNECT_TIMEOUT.name()
2258 || name == PG_SOURCE_TCP_KEEPALIVES_IDLE.name()
2259 || name == PG_SOURCE_TCP_KEEPALIVES_INTERVAL.name()
2260 || name == PG_SOURCE_TCP_KEEPALIVES_RETRIES.name()
2261 || name == PG_SOURCE_TCP_USER_TIMEOUT.name()
2262 || name == PG_SOURCE_TCP_CONFIGURE_SERVER.name()
2263 || name == PG_SOURCE_SNAPSHOT_STATEMENT_TIMEOUT.name()
2264 || name == PG_SOURCE_WAL_SENDER_TIMEOUT.name()
2265 || name == PG_SOURCE_SNAPSHOT_COLLECT_STRICT_COUNT.name()
2266 || name == MYSQL_SOURCE_TCP_KEEPALIVE.name()
2267 || name == MYSQL_SOURCE_SNAPSHOT_MAX_EXECUTION_TIME.name()
2268 || name == MYSQL_SOURCE_SNAPSHOT_LOCK_WAIT_TIMEOUT.name()
2269 || name == MYSQL_SOURCE_CONNECT_TIMEOUT.name()
2270 || name == ENABLE_STORAGE_SHARD_FINALIZATION.name()
2271 || name == SSH_CHECK_INTERVAL.name()
2272 || name == SSH_CONNECT_TIMEOUT.name()
2273 || name == SSH_KEEPALIVES_IDLE.name()
2274 || name == KAFKA_SOCKET_KEEPALIVE.name()
2275 || name == KAFKA_SOCKET_TIMEOUT.name()
2276 || name == KAFKA_TRANSACTION_TIMEOUT.name()
2277 || name == KAFKA_SOCKET_CONNECTION_SETUP_TIMEOUT.name()
2278 || name == KAFKA_FETCH_METADATA_TIMEOUT.name()
2279 || name == KAFKA_PROGRESS_RECORD_FETCH_TIMEOUT.name()
2280 || name == STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES.name()
2281 || name == STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_TO_CLUSTER_SIZE_FRACTION.name()
2282 || name == STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_DISK_ONLY.name()
2283 || name == STORAGE_SHRINK_UPSERT_UNUSED_BUFFERS_BY_RATIO.name()
2284 || name == STORAGE_RECORD_SOURCE_SINK_NAMESPACED_ERRORS.name()
2285 || name == STORAGE_STATISTICS_INTERVAL.name()
2286 || name == STORAGE_STATISTICS_COLLECTION_INTERVAL.name()
2287 || name == USER_STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION.name()
2288 || is_upsert_rocksdb_config_var(name)
2289 || self.is_dyncfg_var(name)
2290 || is_tracing_var(name)
2291 }
2292
2293 fn is_dyncfg_var(&self, name: &str) -> bool {
2295 self.dyncfgs.entries().any(|e| name == e.name())
2296 }
2297}
2298
2299pub fn is_tracing_var(name: &str) -> bool {
2300 name == LOGGING_FILTER.name()
2301 || name == LOGGING_FILTER_DEFAULTS.name()
2302 || name == OPENTELEMETRY_FILTER.name()
2303 || name == OPENTELEMETRY_FILTER_DEFAULTS.name()
2304 || name == SENTRY_FILTERS.name()
2305}
2306
2307pub fn is_secrets_caching_var(name: &str) -> bool {
2309 name == WEBHOOKS_SECRETS_CACHING_TTL_SECS.name()
2310}
2311
2312fn is_upsert_rocksdb_config_var(name: &str) -> bool {
2313 name == upsert_rocksdb::UPSERT_ROCKSDB_COMPACTION_STYLE.name()
2314 || name == upsert_rocksdb::UPSERT_ROCKSDB_OPTIMIZE_COMPACTION_MEMTABLE_BUDGET.name()
2315 || name == upsert_rocksdb::UPSERT_ROCKSDB_LEVEL_COMPACTION_DYNAMIC_LEVEL_BYTES.name()
2316 || name == upsert_rocksdb::UPSERT_ROCKSDB_UNIVERSAL_COMPACTION_RATIO.name()
2317 || name == upsert_rocksdb::UPSERT_ROCKSDB_PARALLELISM.name()
2318 || name == upsert_rocksdb::UPSERT_ROCKSDB_COMPRESSION_TYPE.name()
2319 || name == upsert_rocksdb::UPSERT_ROCKSDB_BOTTOMMOST_COMPRESSION_TYPE.name()
2320 || name == upsert_rocksdb::UPSERT_ROCKSDB_BATCH_SIZE.name()
2321 || name == upsert_rocksdb::UPSERT_ROCKSDB_STATS_LOG_INTERVAL_SECONDS.name()
2322 || name == upsert_rocksdb::UPSERT_ROCKSDB_STATS_PERSIST_INTERVAL_SECONDS.name()
2323 || name == upsert_rocksdb::UPSERT_ROCKSDB_POINT_LOOKUP_BLOCK_CACHE_SIZE_MB.name()
2324 || name == upsert_rocksdb::UPSERT_ROCKSDB_SHRINK_ALLOCATED_BUFFERS_BY_RATIO.name()
2325}
2326
2327pub fn is_timestamp_oracle_config_var(name: &str) -> bool {
2330 name == PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_SIZE.name()
2331 || name == PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_WAIT.name()
2332 || name == PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL.name()
2333 || name == PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL_STAGGER.name()
2334 || name == CRDB_CONNECT_TIMEOUT.name()
2335 || name == CRDB_TCP_USER_TIMEOUT.name()
2336 || name == CRDB_KEEPALIVES_IDLE.name()
2337 || name == CRDB_KEEPALIVES_INTERVAL.name()
2338 || name == CRDB_KEEPALIVES_RETRIES.name()
2339}
2340
2341pub fn is_cluster_scheduling_var(name: &str) -> bool {
2343 name == cluster_scheduling::CLUSTER_MULTI_PROCESS_REPLICA_AZ_AFFINITY_WEIGHT.name()
2344 || name == cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY.name()
2345 || name == cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY_WEIGHT.name()
2346 || name == cluster_scheduling::CLUSTER_ENABLE_TOPOLOGY_SPREAD.name()
2347 || name == cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_IGNORE_NON_SINGULAR_SCALE.name()
2348 || name == cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_MAX_SKEW.name()
2349 || name == cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_SOFT.name()
2350 || name == cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY.name()
2351 || name == cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY_WEIGHT.name()
2352}
2353
2354pub fn is_http_config_var(name: &str) -> bool {
2356 name == WEBHOOK_CONCURRENT_REQUEST_LIMIT.name()
2357}
2358
2359static SESSION_SYSTEM_VARS: LazyLock<BTreeMap<&'static UncasedStr, &'static VarDefinition>> =
2363 LazyLock::new(|| {
2364 [
2365 &APPLICATION_NAME,
2366 &CLIENT_ENCODING,
2367 &CLIENT_MIN_MESSAGES,
2368 &CLUSTER,
2369 &CLUSTER_REPLICA,
2370 &DEFAULT_CLUSTER_REPLICATION_FACTOR,
2371 &CURRENT_OBJECT_MISSING_WARNINGS,
2372 &DATABASE,
2373 &DATE_STYLE,
2374 &EXTRA_FLOAT_DIGITS,
2375 &INTEGER_DATETIMES,
2376 &INTERVAL_STYLE,
2377 &REAL_TIME_RECENCY_TIMEOUT,
2378 &SEARCH_PATH,
2379 &STANDARD_CONFORMING_STRINGS,
2380 &STATEMENT_TIMEOUT,
2381 &IDLE_IN_TRANSACTION_SESSION_TIMEOUT,
2382 &TIMEZONE,
2383 &TRANSACTION_ISOLATION,
2384 &MAX_QUERY_RESULT_SIZE,
2385 ]
2386 .into_iter()
2387 .map(|var| (UncasedStr::new(var.name()), var))
2388 .collect()
2389 });
2390
/// Pairs a boolean variable definition with a description of the feature it
/// gates. `FeatureFlag::require` checks the flag against a `SystemVars`.
#[derive(Debug)]
pub struct FeatureFlag {
    /// Definition of the variable whose boolean value `require` reads.
    pub flag: &'static VarDefinition,
    /// Text describing the gated feature. It is not read in this part of the
    /// file; presumably used when reporting `VarError::RequiresFeatureFlag`.
    pub feature_desc: &'static str,
}
2398
2399impl FeatureFlag {
2400 pub fn require(&'static self, system_vars: &SystemVars) -> Result<(), VarError> {
2403 match *system_vars.expect_value::<bool>(self.flag) {
2404 true => Ok(()),
2405 false => Err(VarError::RequiresFeatureFlag { feature_flag: self }),
2406 }
2407 }
2408}
2409
2410impl PartialEq for FeatureFlag {
2411 fn eq(&self, other: &FeatureFlag) -> bool {
2412 self.flag.name() == other.flag.name()
2413 }
2414}
2415
// Marker impl: the `PartialEq` impl for `FeatureFlag` compares flag names only.
impl Eq for FeatureFlag {}
2417
2418impl Var for MzVersion {
2419 fn name(&self) -> &'static str {
2420 MZ_VERSION_NAME.as_str()
2421 }
2422
2423 fn value(&self) -> String {
2424 self.build_info
2425 .human_version(self.helm_chart_version.clone())
2426 }
2427
2428 fn description(&self) -> &'static str {
2429 "Shows the Materialize server version (Materialize)."
2430 }
2431
2432 fn type_name(&self) -> Cow<'static, str> {
2433 String::type_name()
2434 }
2435
2436 fn visible(&self, _: &User, _: &SystemVars) -> Result<(), VarError> {
2437 Ok(())
2438 }
2439}
2440
2441impl Var for User {
2442 fn name(&self) -> &'static str {
2443 IS_SUPERUSER_NAME.as_str()
2444 }
2445
2446 fn value(&self) -> String {
2447 self.is_superuser().format()
2448 }
2449
2450 fn description(&self) -> &'static str {
2451 "Reports whether the current session is a superuser (PostgreSQL)."
2452 }
2453
2454 fn type_name(&self) -> Cow<'static, str> {
2455 bool::type_name()
2456 }
2457
2458 fn visible(&self, _: &User, _: &SystemVars) -> Result<(), VarError> {
2459 Ok(())
2460 }
2461}