1use std::borrow::Cow;
67use std::clone::Clone;
68use std::collections::BTreeMap;
69use std::fmt::Debug;
70use std::net::IpAddr;
71use std::num::NonZeroU32;
72use std::string::ToString;
73use std::sync::{Arc, LazyLock};
74use std::time::Duration;
75
76use chrono::{DateTime, Utc};
77use derivative::Derivative;
78use im::OrdMap;
79use mz_build_info::BuildInfo;
80use mz_dyncfg::{ConfigSet, ConfigType, ConfigUpdates, ConfigVal};
81use mz_persist_client::cfg::{CRDB_CONNECT_TIMEOUT, CRDB_TCP_USER_TIMEOUT};
82use mz_repr::adt::numeric::Numeric;
83use mz_repr::adt::timestamp::CheckedTimestamp;
84use mz_repr::bytes::ByteSize;
85use mz_repr::user::ExternalUserMetadata;
86use mz_tracing::{CloneableEnvFilter, SerializableDirective};
87use serde::Serialize;
88use thiserror::Error;
89use tracing::error;
90use uncased::UncasedStr;
91
92use crate::ast::Ident;
93use crate::session::user::User;
94
95pub(crate) mod constraints;
96pub(crate) mod definitions;
97pub(crate) mod errors;
98pub(crate) mod polyfill;
99pub(crate) mod value;
100
101pub use definitions::*;
102pub use errors::*;
103pub use value::*;
104
105#[derive(Debug, Clone, Copy, PartialEq, Eq)]
110pub enum EndTransactionAction {
111 Commit,
113 Rollback,
115}
116
117#[derive(Debug, Clone, Copy)]
123pub enum VarInput<'a> {
124 Flat(&'a str),
126 SqlSet(&'a [String]),
129}
130
131impl<'a> VarInput<'a> {
132 pub fn to_vec(&self) -> Vec<String> {
134 match self {
135 VarInput::Flat(v) => vec![v.to_string()],
136 VarInput::SqlSet(values) => values.into_iter().map(|v| v.to_string()).collect(),
137 }
138 }
139}
140
141#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize)]
143pub enum OwnedVarInput {
144 Flat(String),
146 SqlSet(Vec<String>),
148}
149
150impl OwnedVarInput {
151 pub fn borrow(&self) -> VarInput<'_> {
153 match self {
154 OwnedVarInput::Flat(v) => VarInput::Flat(v),
155 OwnedVarInput::SqlSet(v) => VarInput::SqlSet(v),
156 }
157 }
158}
159
160pub trait Var: Debug {
162 fn name(&self) -> &'static str;
164
165 fn value(&self) -> String;
171
172 fn description(&self) -> &'static str;
175
176 fn type_name(&self) -> Cow<'static, str>;
178
179 fn visible(&self, user: &User, system_vars: &SystemVars) -> Result<(), VarError>;
184
185 fn is_unsafe(&self) -> bool {
187 self.name().starts_with("unsafe_")
188 }
189
190 fn as_var(&self) -> &dyn Var
193 where
194 Self: Sized,
195 {
196 self
197 }
198}
199
200#[derive(Debug)]
207pub struct SessionVar {
208 definition: VarDefinition,
209 default_value: Option<Box<dyn Value>>,
211 local_value: Option<Box<dyn Value>>,
213 staged_value: Option<Box<dyn Value>>,
215 session_value: Option<Box<dyn Value>>,
217}
218
219impl Clone for SessionVar {
220 fn clone(&self) -> Self {
221 SessionVar {
222 definition: self.definition.clone(),
223 default_value: self.default_value.as_ref().map(|v| v.box_clone()),
224 local_value: self.local_value.as_ref().map(|v| v.box_clone()),
225 staged_value: self.staged_value.as_ref().map(|v| v.box_clone()),
226 session_value: self.session_value.as_ref().map(|v| v.box_clone()),
227 }
228 }
229}
230
231impl SessionVar {
232 pub const fn new(var: VarDefinition) -> Self {
233 SessionVar {
234 definition: var,
235 default_value: None,
236 local_value: None,
237 staged_value: None,
238 session_value: None,
239 }
240 }
241
242 pub fn check(&self, input: VarInput) -> Result<String, VarError> {
245 let v = self.definition.parse(input)?;
246 self.validate_constraints(v.as_ref())?;
247
248 Ok(v.format())
249 }
250
251 pub fn set(&mut self, input: VarInput, local: bool) -> Result<(), VarError> {
253 let v = self.definition.parse(input)?;
254
255 self.validate_constraints(v.as_ref())?;
257
258 if local {
259 self.local_value = Some(v);
260 } else {
261 self.local_value = None;
262 self.staged_value = Some(v);
263 }
264 Ok(())
265 }
266
267 pub fn set_default(&mut self, input: VarInput) -> Result<(), VarError> {
269 let v = self.definition.parse(input)?;
270 self.validate_constraints(v.as_ref())?;
271 self.default_value = Some(v);
272 Ok(())
273 }
274
275 pub fn reset(&mut self, local: bool) {
277 let value = self
278 .default_value
279 .as_ref()
280 .map(|v| v.as_ref())
281 .unwrap_or_else(|| self.definition.value.value());
282 if local {
283 self.local_value = Some(value.box_clone());
284 } else {
285 self.local_value = None;
286 self.staged_value = Some(value.box_clone());
287 }
288 }
289
290 #[must_use]
292 pub fn end_transaction(&self, action: EndTransactionAction) -> Option<Self> {
293 if !self.is_mutating() {
294 return None;
295 }
296 let mut next: Self = self.clone();
297 next.local_value = None;
298 match action {
299 EndTransactionAction::Commit if next.staged_value.is_some() => {
300 next.session_value = next.staged_value.take()
301 }
302 _ => next.staged_value = None,
303 }
304 Some(next)
305 }
306
307 pub fn is_mutating(&self) -> bool {
309 self.local_value.is_some() || self.staged_value.is_some()
310 }
311
312 pub fn value_dyn(&self) -> &dyn Value {
313 self.local_value
314 .as_deref()
315 .or(self.staged_value.as_deref())
316 .or(self.session_value.as_deref())
317 .or(self.default_value.as_deref())
318 .unwrap_or_else(|| self.definition.value.value())
319 }
320
321 pub fn inspect_session_value(&self) -> Option<&dyn Value> {
326 self.session_value.as_deref()
327 }
328
329 fn validate_constraints(&self, val: &dyn Value) -> Result<(), VarError> {
330 if let Some(constraint) = &self.definition.constraint {
331 constraint.check_constraint(self, self.value_dyn(), val)
332 } else {
333 Ok(())
334 }
335 }
336}
337
338impl Var for SessionVar {
339 fn name(&self) -> &'static str {
340 self.definition.name.as_str()
341 }
342
343 fn value(&self) -> String {
344 self.value_dyn().format()
345 }
346
347 fn description(&self) -> &'static str {
348 self.definition.description
349 }
350
351 fn type_name(&self) -> Cow<'static, str> {
352 self.definition.type_name()
353 }
354
355 fn visible(
356 &self,
357 user: &User,
358 system_vars: &super::vars::SystemVars,
359 ) -> Result<(), super::vars::VarError> {
360 self.definition.visible(user, system_vars)
361 }
362}
363
364#[derive(Debug, Clone, PartialEq, Eq)]
365pub struct MzVersion {
366 build_info: &'static BuildInfo,
368 helm_chart_version: Option<String>,
370}
371
372impl MzVersion {
373 pub fn new(build_info: &'static BuildInfo, helm_chart_version: Option<String>) -> Self {
374 MzVersion {
375 build_info,
376 helm_chart_version,
377 }
378 }
379}
380
381#[derive(Debug, Clone)]
386pub struct SessionVars {
387 vars: OrdMap<&'static UncasedStr, SessionVar>,
389 mz_version: MzVersion,
391 user: User,
393}
394
395impl SessionVars {
396 pub fn new_unchecked(
398 build_info: &'static BuildInfo,
399 user: User,
400 helm_chart_version: Option<String>,
401 ) -> SessionVars {
402 use definitions::*;
403
404 let vars = [
405 &FAILPOINTS,
406 &SERVER_VERSION,
407 &SERVER_VERSION_NUM,
408 &SQL_SAFE_UPDATES,
409 &REAL_TIME_RECENCY,
410 &EMIT_PLAN_INSIGHTS_NOTICE,
411 &EMIT_TIMESTAMP_NOTICE,
412 &EMIT_TRACE_ID_NOTICE,
413 &AUTO_ROUTE_CATALOG_QUERIES,
414 &ENABLE_SESSION_RBAC_CHECKS,
415 &ENABLE_SESSION_CARDINALITY_ESTIMATES,
416 &MAX_IDENTIFIER_LENGTH,
417 &STATEMENT_LOGGING_SAMPLE_RATE,
418 &EMIT_INTROSPECTION_QUERY_NOTICE,
419 &UNSAFE_NEW_TRANSACTION_WALL_TIME,
420 &WELCOME_MESSAGE,
421 ]
422 .into_iter()
423 .chain(SESSION_SYSTEM_VARS.iter().map(|(_name, var)| *var))
424 .map(|var| (var.name, SessionVar::new(var.clone())))
425 .collect();
426
427 SessionVars {
428 vars,
429 mz_version: MzVersion::new(build_info, helm_chart_version),
430 user,
431 }
432 }
433
434 fn expect_value<V: Value>(&self, var: &VarDefinition) -> &V {
435 let var = self
436 .vars
437 .get(var.name)
438 .expect("provided var should be in state");
439 let val = var.value_dyn();
440 val.as_any().downcast_ref::<V>().expect("success")
441 }
442
443 pub fn iter(&self) -> impl Iterator<Item = &dyn Var> {
450 #[allow(clippy::as_conversions)]
451 self.vars
452 .values()
453 .map(|v| v.as_var())
454 .chain([&self.mz_version as &dyn Var, &self.user])
455 }
456
457 pub fn notify_set(&self) -> impl Iterator<Item = &dyn Var> {
461 [
468 &APPLICATION_NAME,
469 &CLIENT_ENCODING,
470 &DATE_STYLE,
471 &INTEGER_DATETIMES,
472 &SERVER_VERSION,
473 &STANDARD_CONFORMING_STRINGS,
474 &TIMEZONE,
475 &INTERVAL_STYLE,
476 &CLUSTER,
481 &CLUSTER_REPLICA,
482 &DEFAULT_CLUSTER_REPLICATION_FACTOR,
483 &DATABASE,
484 &SEARCH_PATH,
485 ]
486 .into_iter()
487 .map(|v| self.vars[v.name].as_var())
488 .chain(std::iter::once(self.mz_version.as_var()))
495 }
496
497 pub fn reset_all(&mut self) {
499 let names: Vec<_> = self.vars.keys().copied().collect();
500 for name in names {
501 self.vars[name].reset(false);
502 }
503 }
504
505 pub fn get(&self, system_vars: &SystemVars, name: &str) -> Result<&dyn Var, VarError> {
516 let name = compat_translate_name(name);
517
518 let name = UncasedStr::new(name);
519 if name == MZ_VERSION_NAME {
520 Ok(&self.mz_version)
521 } else if name == IS_SUPERUSER_NAME {
522 Ok(&self.user)
523 } else {
524 self.vars
525 .get(name)
526 .map(|v| {
527 v.visible(&self.user, system_vars)?;
528 Ok(v.as_var())
529 })
530 .transpose()?
531 .ok_or_else(|| VarError::UnknownParameter(name.to_string()))
532 }
533 }
534
535 pub fn inspect(&self, name: &str) -> Result<&SessionVar, VarError> {
540 let name = compat_translate_name(name);
541
542 self.vars
543 .get(UncasedStr::new(name))
544 .ok_or_else(|| VarError::UnknownParameter(name.to_string()))
545 }
546
547 pub fn set(
560 &mut self,
561 system_vars: &SystemVars,
562 name: &str,
563 input: VarInput,
564 local: bool,
565 ) -> Result<(), VarError> {
566 let (name, input) = compat_translate(name, input);
567
568 let name = UncasedStr::new(name);
569 self.check_read_only(name)?;
570
571 self.vars
572 .get_mut(name)
573 .map(|v| {
574 v.visible(&self.user, system_vars)?;
575 v.set(input, local)
576 })
577 .transpose()?
578 .ok_or_else(|| VarError::UnknownParameter(name.to_string()))
579 }
580
581 pub fn set_default(&mut self, name: &str, input: VarInput) -> Result<(), VarError> {
584 let (name, input) = compat_translate(name, input);
585
586 let name = UncasedStr::new(name);
587 self.check_read_only(name)?;
588
589 self.vars
590 .get_mut(name)
591 .map(|v| v.set_default(input))
593 .transpose()?
594 .ok_or_else(|| VarError::UnknownParameter(name.to_string()))
595 }
596
597 pub fn reset(
611 &mut self,
612 system_vars: &SystemVars,
613 name: &str,
614 local: bool,
615 ) -> Result<(), VarError> {
616 let name = compat_translate_name(name);
617
618 let name = UncasedStr::new(name);
619 self.check_read_only(name)?;
620
621 self.vars
622 .get_mut(name)
623 .map(|v| {
624 v.visible(&self.user, system_vars)?;
625 v.reset(local);
626 Ok(())
627 })
628 .transpose()?
629 .ok_or_else(|| VarError::UnknownParameter(name.to_string()))
630 }
631
632 fn check_read_only(&self, name: &UncasedStr) -> Result<(), VarError> {
634 if name == MZ_VERSION_NAME {
635 Err(VarError::ReadOnlyParameter(MZ_VERSION_NAME.as_str()))
636 } else if name == IS_SUPERUSER_NAME {
637 Err(VarError::ReadOnlyParameter(IS_SUPERUSER_NAME.as_str()))
638 } else if name == MAX_IDENTIFIER_LENGTH.name {
639 Err(VarError::ReadOnlyParameter(
640 MAX_IDENTIFIER_LENGTH.name.as_str(),
641 ))
642 } else {
643 Ok(())
644 }
645 }
646
647 #[mz_ore::instrument(level = "debug")]
652 pub fn end_transaction(
653 &mut self,
654 action: EndTransactionAction,
655 ) -> BTreeMap<&'static str, String> {
656 let mut changed = BTreeMap::new();
657 let mut updates = Vec::new();
658 for (name, var) in self.vars.iter() {
659 if !var.is_mutating() {
660 continue;
661 }
662 let before = var.value();
663 let next = var.end_transaction(action).expect("must mutate");
664 let after = next.value();
665 updates.push((*name, next));
666
667 if before != after {
669 changed.insert(var.name(), after);
670 }
671 }
672 self.vars.extend(updates);
673 changed
674 }
675
676 pub fn application_name(&self) -> &str {
678 self.expect_value::<String>(&APPLICATION_NAME).as_str()
679 }
680
681 pub fn build_info(&self) -> &'static BuildInfo {
683 self.mz_version.build_info
684 }
685
686 pub fn client_encoding(&self) -> &ClientEncoding {
688 self.expect_value(&CLIENT_ENCODING)
689 }
690
691 pub fn client_min_messages(&self) -> &ClientSeverity {
693 self.expect_value(&CLIENT_MIN_MESSAGES)
694 }
695
696 pub fn cluster(&self) -> &str {
698 self.expect_value::<String>(&CLUSTER).as_str()
699 }
700
701 pub fn cluster_replica(&self) -> Option<&str> {
703 self.expect_value::<Option<String>>(&CLUSTER_REPLICA)
704 .as_deref()
705 }
706
707 pub fn current_object_missing_warnings(&self) -> bool {
710 *self.expect_value::<bool>(&CURRENT_OBJECT_MISSING_WARNINGS)
711 }
712
713 pub fn date_style(&self) -> &[&str] {
715 &self.expect_value::<DateStyle>(&DATE_STYLE).0
716 }
717
718 pub fn database(&self) -> &str {
720 self.expect_value::<String>(&DATABASE).as_str()
721 }
722
723 pub fn extra_float_digits(&self) -> i32 {
725 *self.expect_value(&EXTRA_FLOAT_DIGITS)
726 }
727
728 pub fn integer_datetimes(&self) -> bool {
730 *self.expect_value(&INTEGER_DATETIMES)
731 }
732
733 pub fn intervalstyle(&self) -> &IntervalStyle {
735 self.expect_value(&INTERVAL_STYLE)
736 }
737
738 pub fn mz_version(&self) -> String {
740 self.mz_version.value()
741 }
742
743 pub fn search_path(&self) -> &[Ident] {
745 self.expect_value::<Vec<Ident>>(&SEARCH_PATH).as_slice()
746 }
747
748 pub fn server_version(&self) -> &str {
750 self.expect_value::<String>(&SERVER_VERSION).as_str()
751 }
752
753 pub fn server_version_num(&self) -> i32 {
755 *self.expect_value(&SERVER_VERSION_NUM)
756 }
757
758 pub fn sql_safe_updates(&self) -> bool {
760 *self.expect_value(&SQL_SAFE_UPDATES)
761 }
762
763 pub fn standard_conforming_strings(&self) -> bool {
766 *self.expect_value(&STANDARD_CONFORMING_STRINGS)
767 }
768
769 pub fn statement_timeout(&self) -> &Duration {
771 self.expect_value(&STATEMENT_TIMEOUT)
772 }
773
774 pub fn idle_in_transaction_session_timeout(&self) -> &Duration {
776 self.expect_value(&IDLE_IN_TRANSACTION_SESSION_TIMEOUT)
777 }
778
779 pub fn timezone(&self) -> &TimeZone {
781 self.expect_value(&TIMEZONE)
782 }
783
784 pub fn transaction_isolation(&self) -> &IsolationLevel {
787 self.expect_value(&TRANSACTION_ISOLATION)
788 }
789
790 pub fn real_time_recency(&self) -> bool {
792 *self.expect_value(&REAL_TIME_RECENCY)
793 }
794
795 pub fn real_time_recency_timeout(&self) -> &Duration {
797 self.expect_value(&REAL_TIME_RECENCY_TIMEOUT)
798 }
799
800 pub fn emit_plan_insights_notice(&self) -> bool {
802 *self.expect_value(&EMIT_PLAN_INSIGHTS_NOTICE)
803 }
804
805 pub fn emit_timestamp_notice(&self) -> bool {
807 *self.expect_value(&EMIT_TIMESTAMP_NOTICE)
808 }
809
810 pub fn emit_trace_id_notice(&self) -> bool {
812 *self.expect_value(&EMIT_TRACE_ID_NOTICE)
813 }
814
815 pub fn auto_route_catalog_queries(&self) -> bool {
817 *self.expect_value(&AUTO_ROUTE_CATALOG_QUERIES)
818 }
819
820 pub fn enable_session_rbac_checks(&self) -> bool {
822 *self.expect_value(&ENABLE_SESSION_RBAC_CHECKS)
823 }
824
825 pub fn enable_session_cardinality_estimates(&self) -> bool {
827 *self.expect_value(&ENABLE_SESSION_CARDINALITY_ESTIMATES)
828 }
829
830 pub fn is_superuser(&self) -> bool {
832 self.user.is_superuser()
833 }
834
835 pub fn user(&self) -> &User {
837 &self.user
838 }
839
840 pub fn max_query_result_size(&self) -> u64 {
842 self.expect_value::<ByteSize>(&MAX_QUERY_RESULT_SIZE)
843 .as_bytes()
844 }
845
846 pub fn set_external_user_metadata(&mut self, metadata: ExternalUserMetadata) {
848 self.user.external_metadata = Some(metadata);
849 }
850
851 pub fn set_cluster(&mut self, cluster: String) {
852 let var = self
853 .vars
854 .get_mut(UncasedStr::new(CLUSTER.name()))
855 .expect("cluster variable must exist");
856 var.set(VarInput::Flat(&cluster), false)
857 .expect("setting cluster must succeed");
858 }
859
860 pub fn set_local_transaction_isolation(&mut self, transaction_isolation: IsolationLevel) {
861 let var = self
862 .vars
863 .get_mut(UncasedStr::new(TRANSACTION_ISOLATION.name()))
864 .expect("transaction_isolation variable must exist");
865 var.set(VarInput::Flat(transaction_isolation.as_str()), true)
866 .expect("setting transaction isolation must succeed");
867 }
868
869 pub fn get_statement_logging_sample_rate(&self) -> Numeric {
870 *self.expect_value(&STATEMENT_LOGGING_SAMPLE_RATE)
871 }
872
873 pub fn emit_introspection_query_notice(&self) -> bool {
875 *self.expect_value(&EMIT_INTROSPECTION_QUERY_NOTICE)
876 }
877
878 pub fn unsafe_new_transaction_wall_time(&self) -> Option<CheckedTimestamp<DateTime<Utc>>> {
879 *self.expect_value(&UNSAFE_NEW_TRANSACTION_WALL_TIME)
880 }
881
882 pub fn welcome_message(&self) -> bool {
884 *self.expect_value(&WELCOME_MESSAGE)
885 }
886}
887
888pub const OLD_CATALOG_SERVER_CLUSTER: &str = "mz_introspection";
890pub const OLD_AUTO_ROUTE_CATALOG_QUERIES: &str = "auto_route_introspection_queries";
891
892fn compat_translate<'a, 'b>(name: &'a str, input: VarInput<'b>) -> (&'a str, VarInput<'b>) {
900 if name == CLUSTER.name() {
901 if let Ok(value) = CLUSTER.parse(input) {
902 if value.format() == OLD_CATALOG_SERVER_CLUSTER {
903 tracing::debug!(
904 github_27285 = true,
905 "encountered deprecated `cluster` variable value: {}",
906 OLD_CATALOG_SERVER_CLUSTER,
907 );
908 return (name, VarInput::Flat("mz_catalog_server"));
909 }
910 }
911 }
912
913 if name == OLD_AUTO_ROUTE_CATALOG_QUERIES {
914 tracing::debug!(
915 github_27285 = true,
916 "encountered deprecated `{}` variable name",
917 OLD_AUTO_ROUTE_CATALOG_QUERIES,
918 );
919 return (AUTO_ROUTE_CATALOG_QUERIES.name(), input);
920 }
921
922 (name, input)
923}
924
925fn compat_translate_name(name: &str) -> &str {
926 let (name, _) = compat_translate(name, VarInput::Flat(""));
927 name
928}
929
930#[derive(Debug)]
933pub struct SystemVar {
934 definition: VarDefinition,
935 persisted_value: Option<Box<dyn Value>>,
937 dynamic_default: Option<Box<dyn Value>>,
939}
940
941impl Clone for SystemVar {
942 fn clone(&self) -> Self {
943 SystemVar {
944 definition: self.definition.clone(),
945 persisted_value: self.persisted_value.as_ref().map(|v| v.box_clone()),
946 dynamic_default: self.dynamic_default.as_ref().map(|v| v.box_clone()),
947 }
948 }
949}
950
951impl SystemVar {
952 pub fn new(definition: VarDefinition) -> Self {
953 SystemVar {
954 definition,
955 persisted_value: None,
956 dynamic_default: None,
957 }
958 }
959
960 fn is_default(&self, input: VarInput) -> Result<bool, VarError> {
961 let v = self.definition.parse(input)?;
962 Ok(self.definition.default_value() == v.as_ref())
963 }
964
965 pub fn value_dyn(&self) -> &dyn Value {
966 self.persisted_value
967 .as_deref()
968 .or(self.dynamic_default.as_deref())
969 .unwrap_or_else(|| self.definition.default_value())
970 }
971
972 pub fn value<V: 'static>(&self) -> &V {
973 let val = self.value_dyn();
974 val.as_any().downcast_ref::<V>().expect("success")
975 }
976
977 fn parse(&self, input: VarInput) -> Result<Box<dyn Value>, VarError> {
978 let v = self.definition.parse(input)?;
979 self.validate_constraints(v.as_ref())?;
981 Ok(v)
982 }
983
984 fn set(&mut self, input: VarInput) -> Result<bool, VarError> {
985 let v = self.parse(input)?;
986
987 if self.persisted_value.as_ref() != Some(&v) {
988 self.persisted_value = Some(v);
989 Ok(true)
990 } else {
991 Ok(false)
992 }
993 }
994
995 fn reset(&mut self) -> bool {
996 if self.persisted_value.is_some() {
997 self.persisted_value = None;
998 true
999 } else {
1000 false
1001 }
1002 }
1003
1004 fn set_default(&mut self, input: VarInput) -> Result<(), VarError> {
1005 let v = self.definition.parse(input)?;
1006 self.dynamic_default = Some(v);
1007 Ok(())
1008 }
1009
1010 fn validate_constraints(&self, val: &dyn Value) -> Result<(), VarError> {
1011 if let Some(constraint) = &self.definition.constraint {
1012 constraint.check_constraint(self, self.value_dyn(), val)
1013 } else {
1014 Ok(())
1015 }
1016 }
1017}
1018
1019impl Var for SystemVar {
1020 fn name(&self) -> &'static str {
1021 self.definition.name.as_str()
1022 }
1023
1024 fn value(&self) -> String {
1025 self.value_dyn().format()
1026 }
1027
1028 fn description(&self) -> &'static str {
1029 self.definition.description
1030 }
1031
1032 fn type_name(&self) -> Cow<'static, str> {
1033 self.definition.type_name()
1034 }
1035
1036 fn visible(&self, user: &User, system_vars: &SystemVars) -> Result<(), VarError> {
1037 self.definition.visible(user, system_vars)
1038 }
1039}
1040
1041#[derive(Debug, Error)]
1042pub enum NetworkPolicyError {
1043 #[error("Access denied for address {0}")]
1044 AddressDenied(IpAddr),
1045}
1046
1047#[derive(Derivative, Clone)]
1052#[derivative(Debug)]
1053pub struct SystemVars {
1054 allow_unsafe: bool,
1056 vars: BTreeMap<&'static UncasedStr, SystemVar>,
1058 #[derivative(Debug = "ignore")]
1060 callbacks: BTreeMap<String, Vec<Arc<dyn Fn(&SystemVars) + Send + Sync>>>,
1061
1062 dyncfgs: ConfigSet,
1066}
1067
1068impl Default for SystemVars {
1069 fn default() -> Self {
1070 Self::new()
1071 }
1072}
1073
1074impl SystemVars {
1075 pub fn new() -> Self {
1076 let system_vars = vec![
1077 &MAX_KAFKA_CONNECTIONS,
1078 &MAX_POSTGRES_CONNECTIONS,
1079 &MAX_MYSQL_CONNECTIONS,
1080 &MAX_SQL_SERVER_CONNECTIONS,
1081 &MAX_AWS_PRIVATELINK_CONNECTIONS,
1082 &MAX_TABLES,
1083 &MAX_SOURCES,
1084 &MAX_SINKS,
1085 &MAX_MATERIALIZED_VIEWS,
1086 &MAX_CLUSTERS,
1087 &MAX_REPLICAS_PER_CLUSTER,
1088 &MAX_CREDIT_CONSUMPTION_RATE,
1089 &MAX_DATABASES,
1090 &MAX_SCHEMAS_PER_DATABASE,
1091 &MAX_OBJECTS_PER_SCHEMA,
1092 &MAX_SECRETS,
1093 &MAX_ROLES,
1094 &MAX_CONTINUAL_TASKS,
1095 &MAX_NETWORK_POLICIES,
1096 &MAX_RULES_PER_NETWORK_POLICY,
1097 &MAX_RESULT_SIZE,
1098 &MAX_COPY_FROM_SIZE,
1099 &ALLOWED_CLUSTER_REPLICA_SIZES,
1100 &upsert_rocksdb::UPSERT_ROCKSDB_COMPACTION_STYLE,
1101 &upsert_rocksdb::UPSERT_ROCKSDB_OPTIMIZE_COMPACTION_MEMTABLE_BUDGET,
1102 &upsert_rocksdb::UPSERT_ROCKSDB_LEVEL_COMPACTION_DYNAMIC_LEVEL_BYTES,
1103 &upsert_rocksdb::UPSERT_ROCKSDB_UNIVERSAL_COMPACTION_RATIO,
1104 &upsert_rocksdb::UPSERT_ROCKSDB_PARALLELISM,
1105 &upsert_rocksdb::UPSERT_ROCKSDB_COMPRESSION_TYPE,
1106 &upsert_rocksdb::UPSERT_ROCKSDB_BOTTOMMOST_COMPRESSION_TYPE,
1107 &upsert_rocksdb::UPSERT_ROCKSDB_BATCH_SIZE,
1108 &upsert_rocksdb::UPSERT_ROCKSDB_RETRY_DURATION,
1109 &upsert_rocksdb::UPSERT_ROCKSDB_STATS_LOG_INTERVAL_SECONDS,
1110 &upsert_rocksdb::UPSERT_ROCKSDB_STATS_PERSIST_INTERVAL_SECONDS,
1111 &upsert_rocksdb::UPSERT_ROCKSDB_POINT_LOOKUP_BLOCK_CACHE_SIZE_MB,
1112 &upsert_rocksdb::UPSERT_ROCKSDB_SHRINK_ALLOCATED_BUFFERS_BY_RATIO,
1113 &upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_CLUSTER_MEMORY_FRACTION,
1114 &upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_MEMORY_BYTES,
1115 &upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_ALLOW_STALL,
1116 &STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES,
1117 &STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_TO_CLUSTER_SIZE_FRACTION,
1118 &STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_DISK_ONLY,
1119 &STORAGE_STATISTICS_INTERVAL,
1120 &STORAGE_STATISTICS_COLLECTION_INTERVAL,
1121 &STORAGE_SHRINK_UPSERT_UNUSED_BUFFERS_BY_RATIO,
1122 &STORAGE_RECORD_SOURCE_SINK_NAMESPACED_ERRORS,
1123 &PERSIST_FAST_PATH_LIMIT,
1124 &METRICS_RETENTION,
1125 &UNSAFE_MOCK_AUDIT_EVENT_TIMESTAMP,
1126 &ENABLE_RBAC_CHECKS,
1127 &PG_SOURCE_CONNECT_TIMEOUT,
1128 &PG_SOURCE_TCP_KEEPALIVES_IDLE,
1129 &PG_SOURCE_TCP_KEEPALIVES_INTERVAL,
1130 &PG_SOURCE_TCP_KEEPALIVES_RETRIES,
1131 &PG_SOURCE_TCP_USER_TIMEOUT,
1132 &PG_SOURCE_TCP_CONFIGURE_SERVER,
1133 &PG_SOURCE_SNAPSHOT_STATEMENT_TIMEOUT,
1134 &PG_SOURCE_WAL_SENDER_TIMEOUT,
1135 &PG_SOURCE_SNAPSHOT_COLLECT_STRICT_COUNT,
1136 &MYSQL_SOURCE_TCP_KEEPALIVE,
1137 &MYSQL_SOURCE_SNAPSHOT_MAX_EXECUTION_TIME,
1138 &MYSQL_SOURCE_SNAPSHOT_LOCK_WAIT_TIMEOUT,
1139 &MYSQL_SOURCE_CONNECT_TIMEOUT,
1140 &SSH_CHECK_INTERVAL,
1141 &SSH_CONNECT_TIMEOUT,
1142 &SSH_KEEPALIVES_IDLE,
1143 &KAFKA_SOCKET_KEEPALIVE,
1144 &KAFKA_SOCKET_TIMEOUT,
1145 &KAFKA_TRANSACTION_TIMEOUT,
1146 &KAFKA_SOCKET_CONNECTION_SETUP_TIMEOUT,
1147 &KAFKA_FETCH_METADATA_TIMEOUT,
1148 &KAFKA_PROGRESS_RECORD_FETCH_TIMEOUT,
1149 &ENABLE_LAUNCHDARKLY,
1150 &MAX_CONNECTIONS,
1151 &NETWORK_POLICY,
1152 &SUPERUSER_RESERVED_CONNECTIONS,
1153 &KEEP_N_SOURCE_STATUS_HISTORY_ENTRIES,
1154 &KEEP_N_SINK_STATUS_HISTORY_ENTRIES,
1155 &KEEP_N_PRIVATELINK_STATUS_HISTORY_ENTRIES,
1156 &REPLICA_STATUS_HISTORY_RETENTION_WINDOW,
1157 &ENABLE_STORAGE_SHARD_FINALIZATION,
1158 &ENABLE_CONSOLIDATE_AFTER_UNION_NEGATE,
1159 &ENABLE_DEFAULT_CONNECTION_VALIDATION,
1160 &ENABLE_REDUCE_REDUCTION,
1161 &MIN_TIMESTAMP_INTERVAL,
1162 &MAX_TIMESTAMP_INTERVAL,
1163 &LOGGING_FILTER,
1164 &OPENTELEMETRY_FILTER,
1165 &LOGGING_FILTER_DEFAULTS,
1166 &OPENTELEMETRY_FILTER_DEFAULTS,
1167 &SENTRY_FILTERS,
1168 &WEBHOOKS_SECRETS_CACHING_TTL_SECS,
1169 &COORD_SLOW_MESSAGE_WARN_THRESHOLD,
1170 &grpc_client::CONNECT_TIMEOUT,
1171 &grpc_client::HTTP2_KEEP_ALIVE_INTERVAL,
1172 &grpc_client::HTTP2_KEEP_ALIVE_TIMEOUT,
1173 &cluster_scheduling::CLUSTER_MULTI_PROCESS_REPLICA_AZ_AFFINITY_WEIGHT,
1174 &cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY,
1175 &cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY_WEIGHT,
1176 &cluster_scheduling::CLUSTER_ENABLE_TOPOLOGY_SPREAD,
1177 &cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_IGNORE_NON_SINGULAR_SCALE,
1178 &cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_MAX_SKEW,
1179 &cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_MIN_DOMAINS,
1180 &cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_SOFT,
1181 &cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY,
1182 &cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY_WEIGHT,
1183 &cluster_scheduling::CLUSTER_ALTER_CHECK_READY_INTERVAL,
1184 &cluster_scheduling::CLUSTER_CHECK_SCHEDULING_POLICIES_INTERVAL,
1185 &cluster_scheduling::CLUSTER_SECURITY_CONTEXT_ENABLED,
1186 &cluster_scheduling::CLUSTER_REFRESH_MV_COMPACTION_ESTIMATE,
1187 &grpc_client::HTTP2_KEEP_ALIVE_TIMEOUT,
1188 &STATEMENT_LOGGING_MAX_SAMPLE_RATE,
1189 &STATEMENT_LOGGING_DEFAULT_SAMPLE_RATE,
1190 &STATEMENT_LOGGING_TARGET_DATA_RATE,
1191 &STATEMENT_LOGGING_MAX_DATA_CREDIT,
1192 &ENABLE_INTERNAL_STATEMENT_LOGGING,
1193 &OPTIMIZER_STATS_TIMEOUT,
1194 &OPTIMIZER_ONESHOT_STATS_TIMEOUT,
1195 &PRIVATELINK_STATUS_UPDATE_QUOTA_PER_MINUTE,
1196 &WEBHOOK_CONCURRENT_REQUEST_LIMIT,
1197 &PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_SIZE,
1198 &PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_WAIT,
1199 &PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL,
1200 &PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL_STAGGER,
1201 &USER_STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION,
1202 &FORCE_SOURCE_TABLE_SYNTAX,
1203 &OPTIMIZER_E2E_LATENCY_WARNING_THRESHOLD,
1204 &SCRAM_ITERATIONS,
1205 ];
1206
1207 let dyncfgs = mz_dyncfgs::all_dyncfgs();
1208 let dyncfg_vars: Vec<_> = dyncfgs
1209 .entries()
1210 .map(|cfg| match cfg.default() {
1211 ConfigVal::Bool(default) => {
1212 VarDefinition::new_runtime(cfg.name(), *default, cfg.desc(), false)
1213 }
1214 ConfigVal::U32(default) => {
1215 VarDefinition::new_runtime(cfg.name(), *default, cfg.desc(), false)
1216 }
1217 ConfigVal::Usize(default) => {
1218 VarDefinition::new_runtime(cfg.name(), *default, cfg.desc(), false)
1219 }
1220 ConfigVal::OptUsize(default) => {
1221 VarDefinition::new_runtime(cfg.name(), *default, cfg.desc(), false)
1222 }
1223 ConfigVal::F64(default) => {
1224 VarDefinition::new_runtime(cfg.name(), *default, cfg.desc(), false)
1225 }
1226 ConfigVal::String(default) => {
1227 VarDefinition::new_runtime(cfg.name(), default.clone(), cfg.desc(), false)
1228 }
1229 ConfigVal::Duration(default) => {
1230 VarDefinition::new_runtime(cfg.name(), default.clone(), cfg.desc(), false)
1231 }
1232 ConfigVal::Json(default) => {
1233 VarDefinition::new_runtime(cfg.name(), default.clone(), cfg.desc(), false)
1234 }
1235 })
1236 .collect();
1237
1238 let vars: BTreeMap<_, _> = system_vars
1239 .into_iter()
1240 .chain(definitions::FEATURE_FLAGS.iter().copied())
1242 .chain(SESSION_SYSTEM_VARS.values().copied())
1244 .cloned()
1245 .chain(dyncfg_vars)
1247 .map(|var| (var.name, SystemVar::new(var)))
1248 .collect();
1249
1250 let vars = SystemVars {
1251 vars,
1252 callbacks: BTreeMap::new(),
1253 allow_unsafe: false,
1254 dyncfgs,
1255 };
1256
1257 vars
1258 }
1259
1260 pub fn dyncfgs(&self) -> &ConfigSet {
1261 &self.dyncfgs
1262 }
1263
1264 pub fn set_unsafe(mut self, allow_unsafe: bool) -> Self {
1265 self.allow_unsafe = allow_unsafe;
1266 self
1267 }
1268
1269 pub fn allow_unsafe(&self) -> bool {
1270 self.allow_unsafe
1271 }
1272
1273 fn expect_value<V: 'static>(&self, var: &VarDefinition) -> &V {
1274 let val = self
1275 .vars
1276 .get(var.name)
1277 .expect("provided var should be in state");
1278
1279 val.value_dyn()
1280 .as_any()
1281 .downcast_ref::<V>()
1282 .expect("provided var type should matched stored var")
1283 }
1284
1285 fn expect_config_value<V: ConfigType + 'static>(&self, name: &UncasedStr) -> &V {
1286 let val = self
1287 .vars
1288 .get(name)
1289 .unwrap_or_else(|| panic!("provided var {name} should be in state"));
1290
1291 val.value_dyn()
1292 .as_any()
1293 .downcast_ref()
1294 .expect("provided var type should matched stored var")
1295 }
1296
1297 pub fn reset_all(&mut self) {
1300 for (_, var) in &mut self.vars {
1301 var.reset();
1302 }
1303 }
1304
1305 pub fn iter(&self) -> impl Iterator<Item = &dyn Var> {
1308 self.vars
1309 .values()
1310 .map(|v| v.as_var())
1311 .filter(|v| !SESSION_SYSTEM_VARS.contains_key(UncasedStr::new(v.name())))
1312 }
1313
1314 pub fn iter_synced(&self) -> impl Iterator<Item = &dyn Var> {
1318 self.iter().filter(|v| v.name() != ENABLE_LAUNCHDARKLY.name)
1319 }
1320
1321 pub fn iter_session(&self) -> impl Iterator<Item = &dyn Var> {
1323 self.vars
1324 .values()
1325 .map(|v| v.as_var())
1326 .filter(|v| SESSION_SYSTEM_VARS.contains_key(UncasedStr::new(v.name())))
1327 }
1328
1329 pub fn user_modifiable(&self, name: &str) -> bool {
1331 SESSION_SYSTEM_VARS.contains_key(UncasedStr::new(name))
1332 || name == ENABLE_RBAC_CHECKS.name()
1333 || name == NETWORK_POLICY.name()
1334 }
1335
1336 pub fn get(&self, name: &str) -> Result<&dyn Var, VarError> {
1357 self.vars
1358 .get(UncasedStr::new(name))
1359 .map(|v| v.as_var())
1360 .ok_or_else(|| VarError::UnknownParameter(name.into()))
1361 }
1362
1363 pub fn is_default(&self, name: &str, input: VarInput) -> Result<bool, VarError> {
1377 self.vars
1378 .get(UncasedStr::new(name))
1379 .ok_or_else(|| VarError::UnknownParameter(name.into()))
1380 .and_then(|v| v.is_default(input))
1381 }
1382
1383 pub fn set(&mut self, name: &str, input: VarInput) -> Result<bool, VarError> {
1406 let result = self
1407 .vars
1408 .get_mut(UncasedStr::new(name))
1409 .ok_or_else(|| VarError::UnknownParameter(name.into()))
1410 .and_then(|v| v.set(input))?;
1411 self.notify_callbacks(name);
1412 Ok(result)
1413 }
1414
1415 pub fn parse(&self, name: &str, input: VarInput) -> Result<Box<dyn Value>, VarError> {
1436 self.vars
1437 .get(UncasedStr::new(name))
1438 .ok_or_else(|| VarError::UnknownParameter(name.into()))
1439 .and_then(|v| v.parse(input))
1440 }
1441
1442 pub fn set_default(&mut self, name: &str, input: VarInput) -> Result<(), VarError> {
1450 let result = self
1451 .vars
1452 .get_mut(UncasedStr::new(name))
1453 .ok_or_else(|| VarError::UnknownParameter(name.into()))
1454 .and_then(|v| v.set_default(input))?;
1455 self.notify_callbacks(name);
1456 Ok(result)
1457 }
1458
1459 pub fn reset(&mut self, name: &str) -> Result<bool, VarError> {
1477 let result = self
1478 .vars
1479 .get_mut(UncasedStr::new(name))
1480 .ok_or_else(|| VarError::UnknownParameter(name.into()))
1481 .map(|v| v.reset())?;
1482 self.notify_callbacks(name);
1483 Ok(result)
1484 }
1485
1486 pub fn defaults(&self) -> BTreeMap<String, String> {
1488 self.vars
1489 .iter()
1490 .map(|(name, var)| {
1491 let default = var
1492 .dynamic_default
1493 .as_deref()
1494 .unwrap_or_else(|| var.definition.default_value());
1495 (name.as_str().to_owned(), default.format())
1496 })
1497 .collect()
1498 }
1499
1500 pub fn register_callback(
1505 &mut self,
1506 var: &VarDefinition,
1507 callback: Arc<dyn Fn(&SystemVars) + Send + Sync>,
1508 ) {
1509 self.callbacks
1510 .entry(var.name().to_string())
1511 .or_default()
1512 .push(callback);
1513 self.notify_callbacks(var.name());
1514 }
1515
1516 fn notify_callbacks(&self, name: &str) {
1518 if let Some(callbacks) = self.callbacks.get(name) {
1520 for callback in callbacks {
1521 (callback)(self);
1522 }
1523 }
1524 }
1525
1526 pub fn default_cluster(&self) -> String {
1529 self.expect_value::<String>(&CLUSTER).to_owned()
1530 }
1531
1532 pub fn max_kafka_connections(&self) -> u32 {
1534 *self.expect_value(&MAX_KAFKA_CONNECTIONS)
1535 }
1536
1537 pub fn max_postgres_connections(&self) -> u32 {
1539 *self.expect_value(&MAX_POSTGRES_CONNECTIONS)
1540 }
1541
1542 pub fn max_mysql_connections(&self) -> u32 {
1544 *self.expect_value(&MAX_MYSQL_CONNECTIONS)
1545 }
1546
1547 pub fn max_sql_server_connections(&self) -> u32 {
1549 *self.expect_value(&MAX_SQL_SERVER_CONNECTIONS)
1550 }
1551
1552 pub fn max_aws_privatelink_connections(&self) -> u32 {
1554 *self.expect_value(&MAX_AWS_PRIVATELINK_CONNECTIONS)
1555 }
1556
1557 pub fn max_tables(&self) -> u32 {
1559 *self.expect_value(&MAX_TABLES)
1560 }
1561
1562 pub fn max_sources(&self) -> u32 {
1564 *self.expect_value(&MAX_SOURCES)
1565 }
1566
1567 pub fn max_sinks(&self) -> u32 {
1569 *self.expect_value(&MAX_SINKS)
1570 }
1571
1572 pub fn max_materialized_views(&self) -> u32 {
1574 *self.expect_value(&MAX_MATERIALIZED_VIEWS)
1575 }
1576
1577 pub fn max_clusters(&self) -> u32 {
1579 *self.expect_value(&MAX_CLUSTERS)
1580 }
1581
1582 pub fn max_replicas_per_cluster(&self) -> u32 {
1584 *self.expect_value(&MAX_REPLICAS_PER_CLUSTER)
1585 }
1586
1587 pub fn max_credit_consumption_rate(&self) -> Numeric {
1589 *self.expect_value(&MAX_CREDIT_CONSUMPTION_RATE)
1590 }
1591
1592 pub fn max_databases(&self) -> u32 {
1594 *self.expect_value(&MAX_DATABASES)
1595 }
1596
1597 pub fn max_schemas_per_database(&self) -> u32 {
1599 *self.expect_value(&MAX_SCHEMAS_PER_DATABASE)
1600 }
1601
1602 pub fn max_objects_per_schema(&self) -> u32 {
1604 *self.expect_value(&MAX_OBJECTS_PER_SCHEMA)
1605 }
1606
1607 pub fn max_secrets(&self) -> u32 {
1609 *self.expect_value(&MAX_SECRETS)
1610 }
1611
1612 pub fn max_roles(&self) -> u32 {
1614 *self.expect_value(&MAX_ROLES)
1615 }
1616
1617 pub fn max_continual_tasks(&self) -> u32 {
1619 *self.expect_value(&MAX_CONTINUAL_TASKS)
1620 }
1621
1622 pub fn max_network_policies(&self) -> u32 {
1624 *self.expect_value(&MAX_NETWORK_POLICIES)
1625 }
1626
1627 pub fn max_rules_per_network_policy(&self) -> u32 {
1629 *self.expect_value(&MAX_RULES_PER_NETWORK_POLICY)
1630 }
1631
1632 pub fn max_result_size(&self) -> u64 {
1634 self.expect_value::<ByteSize>(&MAX_RESULT_SIZE).as_bytes()
1635 }
1636
1637 pub fn max_copy_from_size(&self) -> u32 {
1639 *self.expect_value(&MAX_COPY_FROM_SIZE)
1640 }
1641
1642 pub fn allowed_cluster_replica_sizes(&self) -> Vec<String> {
1644 self.expect_value::<Vec<Ident>>(&ALLOWED_CLUSTER_REPLICA_SIZES)
1645 .into_iter()
1646 .map(|s| s.as_str().into())
1647 .collect()
1648 }
1649
1650 pub fn default_cluster_replication_factor(&self) -> u32 {
1652 *self.expect_value::<u32>(&DEFAULT_CLUSTER_REPLICATION_FACTOR)
1653 }
1654
1655 pub fn upsert_rocksdb_compaction_style(&self) -> mz_rocksdb_types::config::CompactionStyle {
1656 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_COMPACTION_STYLE)
1657 }
1658
1659 pub fn upsert_rocksdb_optimize_compaction_memtable_budget(&self) -> usize {
1660 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_OPTIMIZE_COMPACTION_MEMTABLE_BUDGET)
1661 }
1662
1663 pub fn upsert_rocksdb_level_compaction_dynamic_level_bytes(&self) -> bool {
1664 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_LEVEL_COMPACTION_DYNAMIC_LEVEL_BYTES)
1665 }
1666
1667 pub fn upsert_rocksdb_universal_compaction_ratio(&self) -> i32 {
1668 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_UNIVERSAL_COMPACTION_RATIO)
1669 }
1670
1671 pub fn upsert_rocksdb_parallelism(&self) -> Option<i32> {
1672 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_PARALLELISM)
1673 }
1674
1675 pub fn upsert_rocksdb_compression_type(&self) -> mz_rocksdb_types::config::CompressionType {
1676 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_COMPRESSION_TYPE)
1677 }
1678
1679 pub fn upsert_rocksdb_bottommost_compression_type(
1680 &self,
1681 ) -> mz_rocksdb_types::config::CompressionType {
1682 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_BOTTOMMOST_COMPRESSION_TYPE)
1683 }
1684
1685 pub fn upsert_rocksdb_batch_size(&self) -> usize {
1686 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_BATCH_SIZE)
1687 }
1688
1689 pub fn upsert_rocksdb_retry_duration(&self) -> Duration {
1690 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_RETRY_DURATION)
1691 }
1692
1693 pub fn upsert_rocksdb_stats_log_interval_seconds(&self) -> u32 {
1694 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_STATS_LOG_INTERVAL_SECONDS)
1695 }
1696
1697 pub fn upsert_rocksdb_stats_persist_interval_seconds(&self) -> u32 {
1698 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_STATS_PERSIST_INTERVAL_SECONDS)
1699 }
1700
1701 pub fn upsert_rocksdb_point_lookup_block_cache_size_mb(&self) -> Option<u32> {
1702 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_POINT_LOOKUP_BLOCK_CACHE_SIZE_MB)
1703 }
1704
1705 pub fn upsert_rocksdb_shrink_allocated_buffers_by_ratio(&self) -> usize {
1706 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_SHRINK_ALLOCATED_BUFFERS_BY_RATIO)
1707 }
1708
1709 pub fn upsert_rocksdb_write_buffer_manager_cluster_memory_fraction(&self) -> Option<Numeric> {
1710 *self.expect_value(
1711 &upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_CLUSTER_MEMORY_FRACTION,
1712 )
1713 }
1714
1715 pub fn upsert_rocksdb_write_buffer_manager_memory_bytes(&self) -> Option<usize> {
1716 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_MEMORY_BYTES)
1717 }
1718
1719 pub fn upsert_rocksdb_write_buffer_manager_allow_stall(&self) -> bool {
1720 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_ALLOW_STALL)
1721 }
1722
1723 pub fn persist_fast_path_limit(&self) -> usize {
1724 *self.expect_value(&PERSIST_FAST_PATH_LIMIT)
1725 }
1726
1727 pub fn pg_source_connect_timeout(&self) -> Duration {
1729 *self.expect_value(&PG_SOURCE_CONNECT_TIMEOUT)
1730 }
1731
1732 pub fn pg_source_tcp_keepalives_retries(&self) -> u32 {
1734 *self.expect_value(&PG_SOURCE_TCP_KEEPALIVES_RETRIES)
1735 }
1736
1737 pub fn pg_source_tcp_keepalives_idle(&self) -> Duration {
1739 *self.expect_value(&PG_SOURCE_TCP_KEEPALIVES_IDLE)
1740 }
1741
1742 pub fn pg_source_tcp_keepalives_interval(&self) -> Duration {
1744 *self.expect_value(&PG_SOURCE_TCP_KEEPALIVES_INTERVAL)
1745 }
1746
1747 pub fn pg_source_tcp_user_timeout(&self) -> Duration {
1749 *self.expect_value(&PG_SOURCE_TCP_USER_TIMEOUT)
1750 }
1751
1752 pub fn pg_source_tcp_configure_server(&self) -> bool {
1754 *self.expect_value(&PG_SOURCE_TCP_CONFIGURE_SERVER)
1755 }
1756
1757 pub fn pg_source_snapshot_statement_timeout(&self) -> Duration {
1759 *self.expect_value(&PG_SOURCE_SNAPSHOT_STATEMENT_TIMEOUT)
1760 }
1761
1762 pub fn pg_source_wal_sender_timeout(&self) -> Option<Duration> {
1764 *self.expect_value(&PG_SOURCE_WAL_SENDER_TIMEOUT)
1765 }
1766
1767 pub fn pg_source_snapshot_collect_strict_count(&self) -> bool {
1769 *self.expect_value(&PG_SOURCE_SNAPSHOT_COLLECT_STRICT_COUNT)
1770 }
1771
1772 pub fn mysql_source_tcp_keepalive(&self) -> Duration {
1774 *self.expect_value(&MYSQL_SOURCE_TCP_KEEPALIVE)
1775 }
1776
1777 pub fn mysql_source_snapshot_max_execution_time(&self) -> Duration {
1779 *self.expect_value(&MYSQL_SOURCE_SNAPSHOT_MAX_EXECUTION_TIME)
1780 }
1781
1782 pub fn mysql_source_snapshot_lock_wait_timeout(&self) -> Duration {
1784 *self.expect_value(&MYSQL_SOURCE_SNAPSHOT_LOCK_WAIT_TIMEOUT)
1785 }
1786
1787 pub fn mysql_source_connect_timeout(&self) -> Duration {
1789 *self.expect_value(&MYSQL_SOURCE_CONNECT_TIMEOUT)
1790 }
1791
1792 pub fn ssh_check_interval(&self) -> Duration {
1794 *self.expect_value(&SSH_CHECK_INTERVAL)
1795 }
1796
1797 pub fn ssh_connect_timeout(&self) -> Duration {
1799 *self.expect_value(&SSH_CONNECT_TIMEOUT)
1800 }
1801
1802 pub fn ssh_keepalives_idle(&self) -> Duration {
1804 *self.expect_value(&SSH_KEEPALIVES_IDLE)
1805 }
1806
1807 pub fn kafka_socket_keepalive(&self) -> bool {
1809 *self.expect_value(&KAFKA_SOCKET_KEEPALIVE)
1810 }
1811
1812 pub fn kafka_socket_timeout(&self) -> Option<Duration> {
1814 *self.expect_value(&KAFKA_SOCKET_TIMEOUT)
1815 }
1816
1817 pub fn kafka_transaction_timeout(&self) -> Duration {
1819 *self.expect_value(&KAFKA_TRANSACTION_TIMEOUT)
1820 }
1821
1822 pub fn kafka_socket_connection_setup_timeout(&self) -> Duration {
1824 *self.expect_value(&KAFKA_SOCKET_CONNECTION_SETUP_TIMEOUT)
1825 }
1826
1827 pub fn kafka_fetch_metadata_timeout(&self) -> Duration {
1829 *self.expect_value(&KAFKA_FETCH_METADATA_TIMEOUT)
1830 }
1831
1832 pub fn kafka_progress_record_fetch_timeout(&self) -> Option<Duration> {
1834 *self.expect_value(&KAFKA_PROGRESS_RECORD_FETCH_TIMEOUT)
1835 }
1836
1837 pub fn crdb_connect_timeout(&self) -> Duration {
1839 *self.expect_config_value(UncasedStr::new(
1840 mz_persist_client::cfg::CRDB_CONNECT_TIMEOUT.name(),
1841 ))
1842 }
1843
1844 pub fn crdb_tcp_user_timeout(&self) -> Duration {
1846 *self.expect_config_value(UncasedStr::new(
1847 mz_persist_client::cfg::CRDB_TCP_USER_TIMEOUT.name(),
1848 ))
1849 }
1850
1851 pub fn storage_dataflow_max_inflight_bytes(&self) -> Option<usize> {
1853 *self.expect_value(&STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES)
1854 }
1855
1856 pub fn storage_dataflow_max_inflight_bytes_to_cluster_size_fraction(&self) -> Option<Numeric> {
1858 *self.expect_value(&STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_TO_CLUSTER_SIZE_FRACTION)
1859 }
1860
1861 pub fn storage_shrink_upsert_unused_buffers_by_ratio(&self) -> usize {
1863 *self.expect_value(&STORAGE_SHRINK_UPSERT_UNUSED_BUFFERS_BY_RATIO)
1864 }
1865
1866 pub fn storage_dataflow_max_inflight_bytes_disk_only(&self) -> bool {
1868 *self.expect_value(&STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_DISK_ONLY)
1869 }
1870
1871 pub fn storage_statistics_interval(&self) -> Duration {
1873 *self.expect_value(&STORAGE_STATISTICS_INTERVAL)
1874 }
1875
1876 pub fn storage_statistics_collection_interval(&self) -> Duration {
1878 *self.expect_value(&STORAGE_STATISTICS_COLLECTION_INTERVAL)
1879 }
1880
1881 pub fn storage_record_source_sink_namespaced_errors(&self) -> bool {
1883 *self.expect_value(&STORAGE_RECORD_SOURCE_SINK_NAMESPACED_ERRORS)
1884 }
1885
1886 pub fn persist_stats_filter_enabled(&self) -> bool {
1888 *self.expect_config_value(UncasedStr::new(
1889 mz_persist_client::stats::STATS_FILTER_ENABLED.name(),
1890 ))
1891 }
1892
1893 pub fn scram_iterations(&self) -> NonZeroU32 {
1894 *self.expect_value(&SCRAM_ITERATIONS)
1895 }
1896
1897 pub fn dyncfg_updates(&self) -> ConfigUpdates {
1898 let mut updates = ConfigUpdates::default();
1899 for entry in self.dyncfgs.entries() {
1900 let name = UncasedStr::new(entry.name());
1901 let val = match entry.val() {
1902 ConfigVal::Bool(_) => ConfigVal::from(*self.expect_config_value::<bool>(name)),
1903 ConfigVal::U32(_) => ConfigVal::from(*self.expect_config_value::<u32>(name)),
1904 ConfigVal::Usize(_) => ConfigVal::from(*self.expect_config_value::<usize>(name)),
1905 ConfigVal::OptUsize(_) => {
1906 ConfigVal::from(*self.expect_config_value::<Option<usize>>(name))
1907 }
1908 ConfigVal::F64(_) => ConfigVal::from(*self.expect_config_value::<f64>(name)),
1909 ConfigVal::String(_) => {
1910 ConfigVal::from(self.expect_config_value::<String>(name).clone())
1911 }
1912 ConfigVal::Duration(_) => {
1913 ConfigVal::from(*self.expect_config_value::<Duration>(name))
1914 }
1915 ConfigVal::Json(_) => {
1916 ConfigVal::from(self.expect_config_value::<serde_json::Value>(name).clone())
1917 }
1918 };
1919 updates.add_dynamic(entry.name(), val);
1920 }
1921 updates.apply(&self.dyncfgs);
1922 updates
1923 }
1924
1925 pub fn metrics_retention(&self) -> Duration {
1927 *self.expect_value(&METRICS_RETENTION)
1928 }
1929
1930 pub fn unsafe_mock_audit_event_timestamp(&self) -> Option<mz_repr::Timestamp> {
1932 *self.expect_value(&UNSAFE_MOCK_AUDIT_EVENT_TIMESTAMP)
1933 }
1934
1935 pub fn enable_rbac_checks(&self) -> bool {
1937 *self.expect_value(&ENABLE_RBAC_CHECKS)
1938 }
1939
1940 pub fn max_connections(&self) -> u32 {
1942 *self.expect_value(&MAX_CONNECTIONS)
1943 }
1944
1945 pub fn default_network_policy_name(&self) -> String {
1946 self.expect_value::<String>(&NETWORK_POLICY).clone()
1947 }
1948
1949 pub fn superuser_reserved_connections(&self) -> u32 {
1951 *self.expect_value(&SUPERUSER_RESERVED_CONNECTIONS)
1952 }
1953
1954 pub fn keep_n_source_status_history_entries(&self) -> usize {
1955 *self.expect_value(&KEEP_N_SOURCE_STATUS_HISTORY_ENTRIES)
1956 }
1957
1958 pub fn keep_n_sink_status_history_entries(&self) -> usize {
1959 *self.expect_value(&KEEP_N_SINK_STATUS_HISTORY_ENTRIES)
1960 }
1961
1962 pub fn keep_n_privatelink_status_history_entries(&self) -> usize {
1963 *self.expect_value(&KEEP_N_PRIVATELINK_STATUS_HISTORY_ENTRIES)
1964 }
1965
1966 pub fn replica_status_history_retention_window(&self) -> Duration {
1967 *self.expect_value(&REPLICA_STATUS_HISTORY_RETENTION_WINDOW)
1968 }
1969
1970 pub fn enable_storage_shard_finalization(&self) -> bool {
1972 *self.expect_value(&ENABLE_STORAGE_SHARD_FINALIZATION)
1973 }
1974
1975 pub fn enable_consolidate_after_union_negate(&self) -> bool {
1976 *self.expect_value(&ENABLE_CONSOLIDATE_AFTER_UNION_NEGATE)
1977 }
1978
1979 pub fn enable_reduce_reduction(&self) -> bool {
1980 *self.expect_value(&ENABLE_REDUCE_REDUCTION)
1981 }
1982
1983 pub fn enable_default_connection_validation(&self) -> bool {
1985 *self.expect_value(&ENABLE_DEFAULT_CONNECTION_VALIDATION)
1986 }
1987
1988 pub fn min_timestamp_interval(&self) -> Duration {
1990 *self.expect_value(&MIN_TIMESTAMP_INTERVAL)
1991 }
1992 pub fn max_timestamp_interval(&self) -> Duration {
1994 *self.expect_value(&MAX_TIMESTAMP_INTERVAL)
1995 }
1996
1997 pub fn logging_filter(&self) -> CloneableEnvFilter {
1998 self.expect_value::<CloneableEnvFilter>(&LOGGING_FILTER)
1999 .clone()
2000 }
2001
2002 pub fn opentelemetry_filter(&self) -> CloneableEnvFilter {
2003 self.expect_value::<CloneableEnvFilter>(&OPENTELEMETRY_FILTER)
2004 .clone()
2005 }
2006
2007 pub fn logging_filter_defaults(&self) -> Vec<SerializableDirective> {
2008 self.expect_value::<Vec<SerializableDirective>>(&LOGGING_FILTER_DEFAULTS)
2009 .clone()
2010 }
2011
2012 pub fn opentelemetry_filter_defaults(&self) -> Vec<SerializableDirective> {
2013 self.expect_value::<Vec<SerializableDirective>>(&OPENTELEMETRY_FILTER_DEFAULTS)
2014 .clone()
2015 }
2016
2017 pub fn sentry_filters(&self) -> Vec<SerializableDirective> {
2018 self.expect_value::<Vec<SerializableDirective>>(&SENTRY_FILTERS)
2019 .clone()
2020 }
2021
2022 pub fn webhooks_secrets_caching_ttl_secs(&self) -> usize {
2023 *self.expect_value(&WEBHOOKS_SECRETS_CACHING_TTL_SECS)
2024 }
2025
2026 pub fn coord_slow_message_warn_threshold(&self) -> Duration {
2027 *self.expect_value(&COORD_SLOW_MESSAGE_WARN_THRESHOLD)
2028 }
2029
2030 pub fn grpc_client_http2_keep_alive_interval(&self) -> Duration {
2031 *self.expect_value(&grpc_client::HTTP2_KEEP_ALIVE_INTERVAL)
2032 }
2033
2034 pub fn grpc_client_http2_keep_alive_timeout(&self) -> Duration {
2035 *self.expect_value(&grpc_client::HTTP2_KEEP_ALIVE_TIMEOUT)
2036 }
2037
2038 pub fn grpc_connect_timeout(&self) -> Duration {
2039 *self.expect_value(&grpc_client::CONNECT_TIMEOUT)
2040 }
2041
2042 pub fn cluster_multi_process_replica_az_affinity_weight(&self) -> Option<i32> {
2043 *self.expect_value(&cluster_scheduling::CLUSTER_MULTI_PROCESS_REPLICA_AZ_AFFINITY_WEIGHT)
2044 }
2045
2046 pub fn cluster_soften_replication_anti_affinity(&self) -> bool {
2047 *self.expect_value(&cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY)
2048 }
2049
2050 pub fn cluster_soften_replication_anti_affinity_weight(&self) -> i32 {
2051 *self.expect_value(&cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY_WEIGHT)
2052 }
2053
2054 pub fn cluster_enable_topology_spread(&self) -> bool {
2055 *self.expect_value(&cluster_scheduling::CLUSTER_ENABLE_TOPOLOGY_SPREAD)
2056 }
2057
2058 pub fn cluster_topology_spread_ignore_non_singular_scale(&self) -> bool {
2059 *self.expect_value(&cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_IGNORE_NON_SINGULAR_SCALE)
2060 }
2061
2062 pub fn cluster_topology_spread_max_skew(&self) -> i32 {
2063 *self.expect_value(&cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_MAX_SKEW)
2064 }
2065
2066 pub fn cluster_topology_spread_set_min_domains(&self) -> Option<i32> {
2067 *self.expect_value(&cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_MIN_DOMAINS)
2068 }
2069
2070 pub fn cluster_topology_spread_soft(&self) -> bool {
2071 *self.expect_value(&cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_SOFT)
2072 }
2073
2074 pub fn cluster_soften_az_affinity(&self) -> bool {
2075 *self.expect_value(&cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY)
2076 }
2077
2078 pub fn cluster_soften_az_affinity_weight(&self) -> i32 {
2079 *self.expect_value(&cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY_WEIGHT)
2080 }
2081
2082 pub fn cluster_alter_check_ready_interval(&self) -> Duration {
2083 *self.expect_value(&cluster_scheduling::CLUSTER_ALTER_CHECK_READY_INTERVAL)
2084 }
2085
2086 pub fn cluster_check_scheduling_policies_interval(&self) -> Duration {
2087 *self.expect_value(&cluster_scheduling::CLUSTER_CHECK_SCHEDULING_POLICIES_INTERVAL)
2088 }
2089
2090 pub fn cluster_security_context_enabled(&self) -> bool {
2091 *self.expect_value(&cluster_scheduling::CLUSTER_SECURITY_CONTEXT_ENABLED)
2092 }
2093
2094 pub fn cluster_refresh_mv_compaction_estimate(&self) -> Duration {
2095 *self.expect_value(&cluster_scheduling::CLUSTER_REFRESH_MV_COMPACTION_ESTIMATE)
2096 }
2097
2098 pub fn privatelink_status_update_quota_per_minute(&self) -> u32 {
2100 *self.expect_value(&PRIVATELINK_STATUS_UPDATE_QUOTA_PER_MINUTE)
2101 }
2102
2103 pub fn statement_logging_target_data_rate(&self) -> Option<usize> {
2104 *self.expect_value(&STATEMENT_LOGGING_TARGET_DATA_RATE)
2105 }
2106
2107 pub fn statement_logging_max_data_credit(&self) -> Option<usize> {
2108 *self.expect_value(&STATEMENT_LOGGING_MAX_DATA_CREDIT)
2109 }
2110
2111 pub fn statement_logging_max_sample_rate(&self) -> Numeric {
2113 *self.expect_value(&STATEMENT_LOGGING_MAX_SAMPLE_RATE)
2114 }
2115
2116 pub fn statement_logging_default_sample_rate(&self) -> Numeric {
2118 *self.expect_value(&STATEMENT_LOGGING_DEFAULT_SAMPLE_RATE)
2119 }
2120
2121 pub fn enable_internal_statement_logging(&self) -> bool {
2123 *self.expect_value(&ENABLE_INTERNAL_STATEMENT_LOGGING)
2124 }
2125
2126 pub fn optimizer_stats_timeout(&self) -> Duration {
2128 *self.expect_value(&OPTIMIZER_STATS_TIMEOUT)
2129 }
2130
2131 pub fn optimizer_oneshot_stats_timeout(&self) -> Duration {
2133 *self.expect_value(&OPTIMIZER_ONESHOT_STATS_TIMEOUT)
2134 }
2135
2136 pub fn webhook_concurrent_request_limit(&self) -> usize {
2138 *self.expect_value(&WEBHOOK_CONCURRENT_REQUEST_LIMIT)
2139 }
2140
2141 pub fn pg_timestamp_oracle_connection_pool_max_size(&self) -> usize {
2143 *self.expect_value(&PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_SIZE)
2144 }
2145
2146 pub fn pg_timestamp_oracle_connection_pool_max_wait(&self) -> Option<Duration> {
2148 *self.expect_value(&PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_WAIT)
2149 }
2150
2151 pub fn pg_timestamp_oracle_connection_pool_ttl(&self) -> Duration {
2153 *self.expect_value(&PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL)
2154 }
2155
2156 pub fn pg_timestamp_oracle_connection_pool_ttl_stagger(&self) -> Duration {
2158 *self.expect_value(&PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL_STAGGER)
2159 }
2160
2161 pub fn user_storage_managed_collections_batch_duration(&self) -> Duration {
2163 *self.expect_value(&USER_STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION)
2164 }
2165
2166 pub fn force_source_table_syntax(&self) -> bool {
2167 *self.expect_value(&FORCE_SOURCE_TABLE_SYNTAX)
2168 }
2169
2170 pub fn optimizer_e2e_latency_warning_threshold(&self) -> Duration {
2171 *self.expect_value(&OPTIMIZER_E2E_LATENCY_WARNING_THRESHOLD)
2172 }
2173
2174 pub fn is_controller_config_var(&self, name: &str) -> bool {
2176 self.is_dyncfg_var(name)
2177 }
2178
2179 pub fn is_compute_config_var(&self, name: &str) -> bool {
2183 name == MAX_RESULT_SIZE.name() || self.is_dyncfg_var(name) || is_tracing_var(name)
2184 }
2185
2186 pub fn is_metrics_config_var(&self, name: &str) -> bool {
2188 self.is_dyncfg_var(name)
2189 }
2190
2191 pub fn is_storage_config_var(&self, name: &str) -> bool {
2193 name == PG_SOURCE_CONNECT_TIMEOUT.name()
2194 || name == PG_SOURCE_TCP_KEEPALIVES_IDLE.name()
2195 || name == PG_SOURCE_TCP_KEEPALIVES_INTERVAL.name()
2196 || name == PG_SOURCE_TCP_KEEPALIVES_RETRIES.name()
2197 || name == PG_SOURCE_TCP_USER_TIMEOUT.name()
2198 || name == PG_SOURCE_TCP_CONFIGURE_SERVER.name()
2199 || name == PG_SOURCE_SNAPSHOT_STATEMENT_TIMEOUT.name()
2200 || name == PG_SOURCE_WAL_SENDER_TIMEOUT.name()
2201 || name == PG_SOURCE_SNAPSHOT_COLLECT_STRICT_COUNT.name()
2202 || name == MYSQL_SOURCE_TCP_KEEPALIVE.name()
2203 || name == MYSQL_SOURCE_SNAPSHOT_MAX_EXECUTION_TIME.name()
2204 || name == MYSQL_SOURCE_SNAPSHOT_LOCK_WAIT_TIMEOUT.name()
2205 || name == MYSQL_SOURCE_CONNECT_TIMEOUT.name()
2206 || name == ENABLE_STORAGE_SHARD_FINALIZATION.name()
2207 || name == SSH_CHECK_INTERVAL.name()
2208 || name == SSH_CONNECT_TIMEOUT.name()
2209 || name == SSH_KEEPALIVES_IDLE.name()
2210 || name == KAFKA_SOCKET_KEEPALIVE.name()
2211 || name == KAFKA_SOCKET_TIMEOUT.name()
2212 || name == KAFKA_TRANSACTION_TIMEOUT.name()
2213 || name == KAFKA_SOCKET_CONNECTION_SETUP_TIMEOUT.name()
2214 || name == KAFKA_FETCH_METADATA_TIMEOUT.name()
2215 || name == KAFKA_PROGRESS_RECORD_FETCH_TIMEOUT.name()
2216 || name == STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES.name()
2217 || name == STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_TO_CLUSTER_SIZE_FRACTION.name()
2218 || name == STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_DISK_ONLY.name()
2219 || name == STORAGE_SHRINK_UPSERT_UNUSED_BUFFERS_BY_RATIO.name()
2220 || name == STORAGE_RECORD_SOURCE_SINK_NAMESPACED_ERRORS.name()
2221 || name == STORAGE_STATISTICS_INTERVAL.name()
2222 || name == STORAGE_STATISTICS_COLLECTION_INTERVAL.name()
2223 || name == USER_STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION.name()
2224 || is_upsert_rocksdb_config_var(name)
2225 || self.is_dyncfg_var(name)
2226 || is_tracing_var(name)
2227 }
2228
2229 fn is_dyncfg_var(&self, name: &str) -> bool {
2231 self.dyncfgs.entries().any(|e| name == e.name())
2232 }
2233}
2234
2235pub fn is_tracing_var(name: &str) -> bool {
2236 name == LOGGING_FILTER.name()
2237 || name == LOGGING_FILTER_DEFAULTS.name()
2238 || name == OPENTELEMETRY_FILTER.name()
2239 || name == OPENTELEMETRY_FILTER_DEFAULTS.name()
2240 || name == SENTRY_FILTERS.name()
2241}
2242
2243pub fn is_secrets_caching_var(name: &str) -> bool {
2245 name == WEBHOOKS_SECRETS_CACHING_TTL_SECS.name()
2246}
2247
2248fn is_upsert_rocksdb_config_var(name: &str) -> bool {
2249 name == upsert_rocksdb::UPSERT_ROCKSDB_COMPACTION_STYLE.name()
2250 || name == upsert_rocksdb::UPSERT_ROCKSDB_OPTIMIZE_COMPACTION_MEMTABLE_BUDGET.name()
2251 || name == upsert_rocksdb::UPSERT_ROCKSDB_LEVEL_COMPACTION_DYNAMIC_LEVEL_BYTES.name()
2252 || name == upsert_rocksdb::UPSERT_ROCKSDB_UNIVERSAL_COMPACTION_RATIO.name()
2253 || name == upsert_rocksdb::UPSERT_ROCKSDB_PARALLELISM.name()
2254 || name == upsert_rocksdb::UPSERT_ROCKSDB_COMPRESSION_TYPE.name()
2255 || name == upsert_rocksdb::UPSERT_ROCKSDB_BOTTOMMOST_COMPRESSION_TYPE.name()
2256 || name == upsert_rocksdb::UPSERT_ROCKSDB_BATCH_SIZE.name()
2257 || name == upsert_rocksdb::UPSERT_ROCKSDB_STATS_LOG_INTERVAL_SECONDS.name()
2258 || name == upsert_rocksdb::UPSERT_ROCKSDB_STATS_PERSIST_INTERVAL_SECONDS.name()
2259 || name == upsert_rocksdb::UPSERT_ROCKSDB_POINT_LOOKUP_BLOCK_CACHE_SIZE_MB.name()
2260 || name == upsert_rocksdb::UPSERT_ROCKSDB_SHRINK_ALLOCATED_BUFFERS_BY_RATIO.name()
2261}
2262
2263pub fn is_pg_timestamp_oracle_config_var(name: &str) -> bool {
2266 name == PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_SIZE.name()
2267 || name == PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_WAIT.name()
2268 || name == PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL.name()
2269 || name == PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL_STAGGER.name()
2270 || name == CRDB_CONNECT_TIMEOUT.name()
2271 || name == CRDB_TCP_USER_TIMEOUT.name()
2272}
2273
2274pub fn is_cluster_scheduling_var(name: &str) -> bool {
2276 name == cluster_scheduling::CLUSTER_MULTI_PROCESS_REPLICA_AZ_AFFINITY_WEIGHT.name()
2277 || name == cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY.name()
2278 || name == cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY_WEIGHT.name()
2279 || name == cluster_scheduling::CLUSTER_ENABLE_TOPOLOGY_SPREAD.name()
2280 || name == cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_IGNORE_NON_SINGULAR_SCALE.name()
2281 || name == cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_MAX_SKEW.name()
2282 || name == cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_SOFT.name()
2283 || name == cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY.name()
2284 || name == cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY_WEIGHT.name()
2285}
2286
2287pub fn is_http_config_var(name: &str) -> bool {
2289 name == WEBHOOK_CONCURRENT_REQUEST_LIMIT.name()
2290}
2291
2292static SESSION_SYSTEM_VARS: LazyLock<BTreeMap<&'static UncasedStr, &'static VarDefinition>> =
2296 LazyLock::new(|| {
2297 [
2298 &APPLICATION_NAME,
2299 &CLIENT_ENCODING,
2300 &CLIENT_MIN_MESSAGES,
2301 &CLUSTER,
2302 &CLUSTER_REPLICA,
2303 &DEFAULT_CLUSTER_REPLICATION_FACTOR,
2304 &CURRENT_OBJECT_MISSING_WARNINGS,
2305 &DATABASE,
2306 &DATE_STYLE,
2307 &EXTRA_FLOAT_DIGITS,
2308 &INTEGER_DATETIMES,
2309 &INTERVAL_STYLE,
2310 &REAL_TIME_RECENCY_TIMEOUT,
2311 &SEARCH_PATH,
2312 &STANDARD_CONFORMING_STRINGS,
2313 &STATEMENT_TIMEOUT,
2314 &IDLE_IN_TRANSACTION_SESSION_TIMEOUT,
2315 &TIMEZONE,
2316 &TRANSACTION_ISOLATION,
2317 &MAX_QUERY_RESULT_SIZE,
2318 ]
2319 .into_iter()
2320 .map(|var| (UncasedStr::new(var.name()), var))
2321 .collect()
2322 });
2323
2324#[derive(Debug)]
2327pub struct FeatureFlag {
2328 pub flag: &'static VarDefinition,
2329 pub feature_desc: &'static str,
2330}
2331
2332impl FeatureFlag {
2333 pub fn require(&'static self, system_vars: &SystemVars) -> Result<(), VarError> {
2336 match *system_vars.expect_value::<bool>(self.flag) {
2337 true => Ok(()),
2338 false => Err(VarError::RequiresFeatureFlag { feature_flag: self }),
2339 }
2340 }
2341}
2342
2343impl PartialEq for FeatureFlag {
2344 fn eq(&self, other: &FeatureFlag) -> bool {
2345 self.flag.name() == other.flag.name()
2346 }
2347}
2348
2349impl Eq for FeatureFlag {}
2350
2351impl Var for MzVersion {
2352 fn name(&self) -> &'static str {
2353 MZ_VERSION_NAME.as_str()
2354 }
2355
2356 fn value(&self) -> String {
2357 self.build_info
2358 .human_version(self.helm_chart_version.clone())
2359 }
2360
2361 fn description(&self) -> &'static str {
2362 "Shows the Materialize server version (Materialize)."
2363 }
2364
2365 fn type_name(&self) -> Cow<'static, str> {
2366 String::type_name()
2367 }
2368
2369 fn visible(&self, _: &User, _: &SystemVars) -> Result<(), VarError> {
2370 Ok(())
2371 }
2372}
2373
2374impl Var for User {
2375 fn name(&self) -> &'static str {
2376 IS_SUPERUSER_NAME.as_str()
2377 }
2378
2379 fn value(&self) -> String {
2380 self.is_superuser().format()
2381 }
2382
2383 fn description(&self) -> &'static str {
2384 "Reports whether the current session is a superuser (PostgreSQL)."
2385 }
2386
2387 fn type_name(&self) -> Cow<'static, str> {
2388 bool::type_name()
2389 }
2390
2391 fn visible(&self, _: &User, _: &SystemVars) -> Result<(), VarError> {
2392 Ok(())
2393 }
2394}