1use std::borrow::Cow;
67use std::clone::Clone;
68use std::collections::BTreeMap;
69use std::fmt::Debug;
70use std::net::IpAddr;
71use std::string::ToString;
72use std::sync::{Arc, LazyLock};
73use std::time::Duration;
74
75use chrono::{DateTime, Utc};
76use derivative::Derivative;
77use im::OrdMap;
78use mz_build_info::BuildInfo;
79use mz_dyncfg::{ConfigSet, ConfigType, ConfigUpdates, ConfigVal};
80use mz_persist_client::cfg::{CRDB_CONNECT_TIMEOUT, CRDB_TCP_USER_TIMEOUT};
81use mz_repr::adt::numeric::Numeric;
82use mz_repr::adt::timestamp::CheckedTimestamp;
83use mz_repr::bytes::ByteSize;
84use mz_repr::user::ExternalUserMetadata;
85use mz_tracing::{CloneableEnvFilter, SerializableDirective};
86use serde::Serialize;
87use thiserror::Error;
88use tracing::error;
89use uncased::UncasedStr;
90
91use crate::ast::Ident;
92use crate::session::user::User;
93
94pub(crate) mod constraints;
95pub(crate) mod definitions;
96pub(crate) mod errors;
97pub(crate) mod polyfill;
98pub(crate) mod value;
99
100pub use definitions::*;
101pub use errors::*;
102pub use value::*;
103
104#[derive(Debug, Clone, Copy, PartialEq, Eq)]
109pub enum EndTransactionAction {
110 Commit,
112 Rollback,
114}
115
116#[derive(Debug, Clone, Copy)]
122pub enum VarInput<'a> {
123 Flat(&'a str),
125 SqlSet(&'a [String]),
128}
129
130impl<'a> VarInput<'a> {
131 pub fn to_vec(&self) -> Vec<String> {
133 match self {
134 VarInput::Flat(v) => vec![v.to_string()],
135 VarInput::SqlSet(values) => values.into_iter().map(|v| v.to_string()).collect(),
136 }
137 }
138}
139
140#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize)]
142pub enum OwnedVarInput {
143 Flat(String),
145 SqlSet(Vec<String>),
147}
148
149impl OwnedVarInput {
150 pub fn borrow(&self) -> VarInput {
152 match self {
153 OwnedVarInput::Flat(v) => VarInput::Flat(v),
154 OwnedVarInput::SqlSet(v) => VarInput::SqlSet(v),
155 }
156 }
157}
158
159pub trait Var: Debug {
161 fn name(&self) -> &'static str;
163
164 fn value(&self) -> String;
170
171 fn description(&self) -> &'static str;
174
175 fn type_name(&self) -> Cow<'static, str>;
177
178 fn visible(&self, user: &User, system_vars: &SystemVars) -> Result<(), VarError>;
183
184 fn is_unsafe(&self) -> bool {
186 self.name().starts_with("unsafe_")
187 }
188
189 fn as_var(&self) -> &dyn Var
192 where
193 Self: Sized,
194 {
195 self
196 }
197}
198
199#[derive(Debug)]
206pub struct SessionVar {
207 definition: VarDefinition,
208 default_value: Option<Box<dyn Value>>,
210 local_value: Option<Box<dyn Value>>,
212 staged_value: Option<Box<dyn Value>>,
214 session_value: Option<Box<dyn Value>>,
216}
217
218impl Clone for SessionVar {
219 fn clone(&self) -> Self {
220 SessionVar {
221 definition: self.definition.clone(),
222 default_value: self.default_value.as_ref().map(|v| v.box_clone()),
223 local_value: self.local_value.as_ref().map(|v| v.box_clone()),
224 staged_value: self.staged_value.as_ref().map(|v| v.box_clone()),
225 session_value: self.session_value.as_ref().map(|v| v.box_clone()),
226 }
227 }
228}
229
230impl SessionVar {
231 pub const fn new(var: VarDefinition) -> Self {
232 SessionVar {
233 definition: var,
234 default_value: None,
235 local_value: None,
236 staged_value: None,
237 session_value: None,
238 }
239 }
240
241 pub fn check(&self, input: VarInput) -> Result<String, VarError> {
244 let v = self.definition.parse(input)?;
245 self.validate_constraints(v.as_ref())?;
246
247 Ok(v.format())
248 }
249
250 pub fn set(&mut self, input: VarInput, local: bool) -> Result<(), VarError> {
252 let v = self.definition.parse(input)?;
253
254 self.validate_constraints(v.as_ref())?;
256
257 if local {
258 self.local_value = Some(v);
259 } else {
260 self.local_value = None;
261 self.staged_value = Some(v);
262 }
263 Ok(())
264 }
265
266 pub fn set_default(&mut self, input: VarInput) -> Result<(), VarError> {
268 let v = self.definition.parse(input)?;
269 self.validate_constraints(v.as_ref())?;
270 self.default_value = Some(v);
271 Ok(())
272 }
273
274 pub fn reset(&mut self, local: bool) {
276 let value = self
277 .default_value
278 .as_ref()
279 .map(|v| v.as_ref())
280 .unwrap_or_else(|| self.definition.value.value());
281 if local {
282 self.local_value = Some(value.box_clone());
283 } else {
284 self.local_value = None;
285 self.staged_value = Some(value.box_clone());
286 }
287 }
288
289 #[must_use]
291 pub fn end_transaction(&self, action: EndTransactionAction) -> Option<Self> {
292 if !self.is_mutating() {
293 return None;
294 }
295 let mut next: Self = self.clone();
296 next.local_value = None;
297 match action {
298 EndTransactionAction::Commit if next.staged_value.is_some() => {
299 next.session_value = next.staged_value.take()
300 }
301 _ => next.staged_value = None,
302 }
303 Some(next)
304 }
305
306 pub fn is_mutating(&self) -> bool {
308 self.local_value.is_some() || self.staged_value.is_some()
309 }
310
311 pub fn value_dyn(&self) -> &dyn Value {
312 self.local_value
313 .as_deref()
314 .or(self.staged_value.as_deref())
315 .or(self.session_value.as_deref())
316 .or(self.default_value.as_deref())
317 .unwrap_or_else(|| self.definition.value.value())
318 }
319
320 pub fn inspect_session_value(&self) -> Option<&dyn Value> {
325 self.session_value.as_deref()
326 }
327
328 fn validate_constraints(&self, val: &dyn Value) -> Result<(), VarError> {
329 if let Some(constraint) = &self.definition.constraint {
330 constraint.check_constraint(self, self.value_dyn(), val)
331 } else {
332 Ok(())
333 }
334 }
335}
336
337impl Var for SessionVar {
338 fn name(&self) -> &'static str {
339 self.definition.name.as_str()
340 }
341
342 fn value(&self) -> String {
343 self.value_dyn().format()
344 }
345
346 fn description(&self) -> &'static str {
347 self.definition.description
348 }
349
350 fn type_name(&self) -> Cow<'static, str> {
351 self.definition.type_name()
352 }
353
354 fn visible(
355 &self,
356 user: &User,
357 system_vars: &super::vars::SystemVars,
358 ) -> Result<(), super::vars::VarError> {
359 self.definition.visible(user, system_vars)
360 }
361}
362
363#[derive(Debug, Clone, PartialEq, Eq)]
364pub struct MzVersion {
365 build_info: &'static BuildInfo,
367 helm_chart_version: Option<String>,
369}
370
371impl MzVersion {
372 pub fn new(build_info: &'static BuildInfo, helm_chart_version: Option<String>) -> Self {
373 MzVersion {
374 build_info,
375 helm_chart_version,
376 }
377 }
378}
379
380#[derive(Debug, Clone)]
385pub struct SessionVars {
386 vars: OrdMap<&'static UncasedStr, SessionVar>,
388 mz_version: MzVersion,
390 user: User,
392}
393
394impl SessionVars {
395 pub fn new_unchecked(
397 build_info: &'static BuildInfo,
398 user: User,
399 helm_chart_version: Option<String>,
400 ) -> SessionVars {
401 use definitions::*;
402
403 let vars = [
404 &FAILPOINTS,
405 &SERVER_VERSION,
406 &SERVER_VERSION_NUM,
407 &SQL_SAFE_UPDATES,
408 &REAL_TIME_RECENCY,
409 &EMIT_PLAN_INSIGHTS_NOTICE,
410 &EMIT_TIMESTAMP_NOTICE,
411 &EMIT_TRACE_ID_NOTICE,
412 &AUTO_ROUTE_CATALOG_QUERIES,
413 &ENABLE_SESSION_RBAC_CHECKS,
414 &ENABLE_SESSION_CARDINALITY_ESTIMATES,
415 &MAX_IDENTIFIER_LENGTH,
416 &STATEMENT_LOGGING_SAMPLE_RATE,
417 &EMIT_INTROSPECTION_QUERY_NOTICE,
418 &UNSAFE_NEW_TRANSACTION_WALL_TIME,
419 &WELCOME_MESSAGE,
420 ]
421 .into_iter()
422 .chain(SESSION_SYSTEM_VARS.iter().map(|(_name, var)| *var))
423 .map(|var| (var.name, SessionVar::new(var.clone())))
424 .collect();
425
426 SessionVars {
427 vars,
428 mz_version: MzVersion::new(build_info, helm_chart_version),
429 user,
430 }
431 }
432
433 fn expect_value<V: Value>(&self, var: &VarDefinition) -> &V {
434 let var = self
435 .vars
436 .get(var.name)
437 .expect("provided var should be in state");
438 let val = var.value_dyn();
439 val.as_any().downcast_ref::<V>().expect("success")
440 }
441
442 pub fn iter(&self) -> impl Iterator<Item = &dyn Var> {
449 #[allow(clippy::as_conversions)]
450 self.vars
451 .values()
452 .map(|v| v.as_var())
453 .chain([&self.mz_version as &dyn Var, &self.user])
454 }
455
456 pub fn notify_set(&self) -> impl Iterator<Item = &dyn Var> {
460 [
467 &APPLICATION_NAME,
468 &CLIENT_ENCODING,
469 &DATE_STYLE,
470 &INTEGER_DATETIMES,
471 &SERVER_VERSION,
472 &STANDARD_CONFORMING_STRINGS,
473 &TIMEZONE,
474 &INTERVAL_STYLE,
475 &CLUSTER,
480 &CLUSTER_REPLICA,
481 &DEFAULT_CLUSTER_REPLICATION_FACTOR,
482 &DATABASE,
483 &SEARCH_PATH,
484 ]
485 .into_iter()
486 .map(|v| self.vars[v.name].as_var())
487 .chain(std::iter::once(self.mz_version.as_var()))
494 }
495
496 pub fn reset_all(&mut self) {
498 let names: Vec<_> = self.vars.keys().copied().collect();
499 for name in names {
500 self.vars[name].reset(false);
501 }
502 }
503
504 pub fn get(&self, system_vars: &SystemVars, name: &str) -> Result<&dyn Var, VarError> {
515 let name = compat_translate_name(name);
516
517 let name = UncasedStr::new(name);
518 if name == MZ_VERSION_NAME {
519 Ok(&self.mz_version)
520 } else if name == IS_SUPERUSER_NAME {
521 Ok(&self.user)
522 } else {
523 self.vars
524 .get(name)
525 .map(|v| {
526 v.visible(&self.user, system_vars)?;
527 Ok(v.as_var())
528 })
529 .transpose()?
530 .ok_or_else(|| VarError::UnknownParameter(name.to_string()))
531 }
532 }
533
534 pub fn inspect(&self, name: &str) -> Result<&SessionVar, VarError> {
539 let name = compat_translate_name(name);
540
541 self.vars
542 .get(UncasedStr::new(name))
543 .ok_or_else(|| VarError::UnknownParameter(name.to_string()))
544 }
545
546 pub fn set(
559 &mut self,
560 system_vars: &SystemVars,
561 name: &str,
562 input: VarInput,
563 local: bool,
564 ) -> Result<(), VarError> {
565 let (name, input) = compat_translate(name, input);
566
567 let name = UncasedStr::new(name);
568 self.check_read_only(name)?;
569
570 self.vars
571 .get_mut(name)
572 .map(|v| {
573 v.visible(&self.user, system_vars)?;
574 v.set(input, local)
575 })
576 .transpose()?
577 .ok_or_else(|| VarError::UnknownParameter(name.to_string()))
578 }
579
580 pub fn set_default(&mut self, name: &str, input: VarInput) -> Result<(), VarError> {
583 let (name, input) = compat_translate(name, input);
584
585 let name = UncasedStr::new(name);
586 self.check_read_only(name)?;
587
588 self.vars
589 .get_mut(name)
590 .map(|v| v.set_default(input))
592 .transpose()?
593 .ok_or_else(|| VarError::UnknownParameter(name.to_string()))
594 }
595
596 pub fn reset(
610 &mut self,
611 system_vars: &SystemVars,
612 name: &str,
613 local: bool,
614 ) -> Result<(), VarError> {
615 let name = compat_translate_name(name);
616
617 let name = UncasedStr::new(name);
618 self.check_read_only(name)?;
619
620 self.vars
621 .get_mut(name)
622 .map(|v| {
623 v.visible(&self.user, system_vars)?;
624 v.reset(local);
625 Ok(())
626 })
627 .transpose()?
628 .ok_or_else(|| VarError::UnknownParameter(name.to_string()))
629 }
630
631 fn check_read_only(&self, name: &UncasedStr) -> Result<(), VarError> {
633 if name == MZ_VERSION_NAME {
634 Err(VarError::ReadOnlyParameter(MZ_VERSION_NAME.as_str()))
635 } else if name == IS_SUPERUSER_NAME {
636 Err(VarError::ReadOnlyParameter(IS_SUPERUSER_NAME.as_str()))
637 } else if name == MAX_IDENTIFIER_LENGTH.name {
638 Err(VarError::ReadOnlyParameter(
639 MAX_IDENTIFIER_LENGTH.name.as_str(),
640 ))
641 } else {
642 Ok(())
643 }
644 }
645
646 #[mz_ore::instrument(level = "debug")]
651 pub fn end_transaction(
652 &mut self,
653 action: EndTransactionAction,
654 ) -> BTreeMap<&'static str, String> {
655 let mut changed = BTreeMap::new();
656 let mut updates = Vec::new();
657 for (name, var) in self.vars.iter() {
658 if !var.is_mutating() {
659 continue;
660 }
661 let before = var.value();
662 let next = var.end_transaction(action).expect("must mutate");
663 let after = next.value();
664 updates.push((*name, next));
665
666 if before != after {
668 changed.insert(var.name(), after);
669 }
670 }
671 self.vars.extend(updates);
672 changed
673 }
674
675 pub fn application_name(&self) -> &str {
677 self.expect_value::<String>(&APPLICATION_NAME).as_str()
678 }
679
680 pub fn build_info(&self) -> &'static BuildInfo {
682 self.mz_version.build_info
683 }
684
685 pub fn client_encoding(&self) -> &ClientEncoding {
687 self.expect_value(&CLIENT_ENCODING)
688 }
689
690 pub fn client_min_messages(&self) -> &ClientSeverity {
692 self.expect_value(&CLIENT_MIN_MESSAGES)
693 }
694
695 pub fn cluster(&self) -> &str {
697 self.expect_value::<String>(&CLUSTER).as_str()
698 }
699
700 pub fn cluster_replica(&self) -> Option<&str> {
702 self.expect_value::<Option<String>>(&CLUSTER_REPLICA)
703 .as_deref()
704 }
705
706 pub fn current_object_missing_warnings(&self) -> bool {
709 *self.expect_value::<bool>(&CURRENT_OBJECT_MISSING_WARNINGS)
710 }
711
712 pub fn date_style(&self) -> &[&str] {
714 &self.expect_value::<DateStyle>(&DATE_STYLE).0
715 }
716
717 pub fn database(&self) -> &str {
719 self.expect_value::<String>(&DATABASE).as_str()
720 }
721
722 pub fn extra_float_digits(&self) -> i32 {
724 *self.expect_value(&EXTRA_FLOAT_DIGITS)
725 }
726
727 pub fn integer_datetimes(&self) -> bool {
729 *self.expect_value(&INTEGER_DATETIMES)
730 }
731
732 pub fn intervalstyle(&self) -> &IntervalStyle {
734 self.expect_value(&INTERVAL_STYLE)
735 }
736
737 pub fn mz_version(&self) -> String {
739 self.mz_version.value()
740 }
741
742 pub fn search_path(&self) -> &[Ident] {
744 self.expect_value::<Vec<Ident>>(&SEARCH_PATH).as_slice()
745 }
746
747 pub fn server_version(&self) -> &str {
749 self.expect_value::<String>(&SERVER_VERSION).as_str()
750 }
751
752 pub fn server_version_num(&self) -> i32 {
754 *self.expect_value(&SERVER_VERSION_NUM)
755 }
756
757 pub fn sql_safe_updates(&self) -> bool {
759 *self.expect_value(&SQL_SAFE_UPDATES)
760 }
761
762 pub fn standard_conforming_strings(&self) -> bool {
765 *self.expect_value(&STANDARD_CONFORMING_STRINGS)
766 }
767
768 pub fn statement_timeout(&self) -> &Duration {
770 self.expect_value(&STATEMENT_TIMEOUT)
771 }
772
773 pub fn idle_in_transaction_session_timeout(&self) -> &Duration {
775 self.expect_value(&IDLE_IN_TRANSACTION_SESSION_TIMEOUT)
776 }
777
778 pub fn timezone(&self) -> &TimeZone {
780 self.expect_value(&TIMEZONE)
781 }
782
783 pub fn transaction_isolation(&self) -> &IsolationLevel {
786 self.expect_value(&TRANSACTION_ISOLATION)
787 }
788
789 pub fn real_time_recency(&self) -> bool {
791 *self.expect_value(&REAL_TIME_RECENCY)
792 }
793
794 pub fn real_time_recency_timeout(&self) -> &Duration {
796 self.expect_value(&REAL_TIME_RECENCY_TIMEOUT)
797 }
798
799 pub fn emit_plan_insights_notice(&self) -> bool {
801 *self.expect_value(&EMIT_PLAN_INSIGHTS_NOTICE)
802 }
803
804 pub fn emit_timestamp_notice(&self) -> bool {
806 *self.expect_value(&EMIT_TIMESTAMP_NOTICE)
807 }
808
809 pub fn emit_trace_id_notice(&self) -> bool {
811 *self.expect_value(&EMIT_TRACE_ID_NOTICE)
812 }
813
814 pub fn auto_route_catalog_queries(&self) -> bool {
816 *self.expect_value(&AUTO_ROUTE_CATALOG_QUERIES)
817 }
818
819 pub fn enable_session_rbac_checks(&self) -> bool {
821 *self.expect_value(&ENABLE_SESSION_RBAC_CHECKS)
822 }
823
824 pub fn enable_session_cardinality_estimates(&self) -> bool {
826 *self.expect_value(&ENABLE_SESSION_CARDINALITY_ESTIMATES)
827 }
828
829 pub fn is_superuser(&self) -> bool {
831 self.user.is_superuser()
832 }
833
834 pub fn user(&self) -> &User {
836 &self.user
837 }
838
839 pub fn max_query_result_size(&self) -> u64 {
841 self.expect_value::<ByteSize>(&MAX_QUERY_RESULT_SIZE)
842 .as_bytes()
843 }
844
845 pub fn set_external_user_metadata(&mut self, metadata: ExternalUserMetadata) {
847 self.user.external_metadata = Some(metadata);
848 }
849
850 pub fn set_cluster(&mut self, cluster: String) {
851 let var = self
852 .vars
853 .get_mut(UncasedStr::new(CLUSTER.name()))
854 .expect("cluster variable must exist");
855 var.set(VarInput::Flat(&cluster), false)
856 .expect("setting cluster must succeed");
857 }
858
859 pub fn set_local_transaction_isolation(&mut self, transaction_isolation: IsolationLevel) {
860 let var = self
861 .vars
862 .get_mut(UncasedStr::new(TRANSACTION_ISOLATION.name()))
863 .expect("transaction_isolation variable must exist");
864 var.set(VarInput::Flat(transaction_isolation.as_str()), true)
865 .expect("setting transaction isolation must succeed");
866 }
867
868 pub fn get_statement_logging_sample_rate(&self) -> Numeric {
869 *self.expect_value(&STATEMENT_LOGGING_SAMPLE_RATE)
870 }
871
872 pub fn emit_introspection_query_notice(&self) -> bool {
874 *self.expect_value(&EMIT_INTROSPECTION_QUERY_NOTICE)
875 }
876
877 pub fn unsafe_new_transaction_wall_time(&self) -> Option<CheckedTimestamp<DateTime<Utc>>> {
878 *self.expect_value(&UNSAFE_NEW_TRANSACTION_WALL_TIME)
879 }
880
881 pub fn welcome_message(&self) -> bool {
883 *self.expect_value(&WELCOME_MESSAGE)
884 }
885}
886
887pub const OLD_CATALOG_SERVER_CLUSTER: &str = "mz_introspection";
889pub const OLD_AUTO_ROUTE_CATALOG_QUERIES: &str = "auto_route_introspection_queries";
890
891fn compat_translate<'a, 'b>(name: &'a str, input: VarInput<'b>) -> (&'a str, VarInput<'b>) {
899 if name == CLUSTER.name() {
900 if let Ok(value) = CLUSTER.parse(input) {
901 if value.format() == OLD_CATALOG_SERVER_CLUSTER {
902 tracing::debug!(
903 github_27285 = true,
904 "encountered deprecated `cluster` variable value: {}",
905 OLD_CATALOG_SERVER_CLUSTER,
906 );
907 return (name, VarInput::Flat("mz_catalog_server"));
908 }
909 }
910 }
911
912 if name == OLD_AUTO_ROUTE_CATALOG_QUERIES {
913 tracing::debug!(
914 github_27285 = true,
915 "encountered deprecated `{}` variable name",
916 OLD_AUTO_ROUTE_CATALOG_QUERIES,
917 );
918 return (AUTO_ROUTE_CATALOG_QUERIES.name(), input);
919 }
920
921 (name, input)
922}
923
924fn compat_translate_name(name: &str) -> &str {
925 let (name, _) = compat_translate(name, VarInput::Flat(""));
926 name
927}
928
929#[derive(Debug)]
932pub struct SystemVar {
933 definition: VarDefinition,
934 persisted_value: Option<Box<dyn Value>>,
936 dynamic_default: Option<Box<dyn Value>>,
938}
939
940impl Clone for SystemVar {
941 fn clone(&self) -> Self {
942 SystemVar {
943 definition: self.definition.clone(),
944 persisted_value: self.persisted_value.as_ref().map(|v| v.box_clone()),
945 dynamic_default: self.dynamic_default.as_ref().map(|v| v.box_clone()),
946 }
947 }
948}
949
950impl SystemVar {
951 pub fn new(definition: VarDefinition) -> Self {
952 SystemVar {
953 definition,
954 persisted_value: None,
955 dynamic_default: None,
956 }
957 }
958
959 fn is_default(&self, input: VarInput) -> Result<bool, VarError> {
960 let v = self.definition.parse(input)?;
961 Ok(self.definition.default_value() == v.as_ref())
962 }
963
964 pub fn value_dyn(&self) -> &dyn Value {
965 self.persisted_value
966 .as_deref()
967 .or(self.dynamic_default.as_deref())
968 .unwrap_or_else(|| self.definition.default_value())
969 }
970
971 pub fn value<V: 'static>(&self) -> &V {
972 let val = self.value_dyn();
973 val.as_any().downcast_ref::<V>().expect("success")
974 }
975
976 fn parse(&self, input: VarInput) -> Result<Box<dyn Value>, VarError> {
977 let v = self.definition.parse(input)?;
978 self.validate_constraints(v.as_ref())?;
980 Ok(v)
981 }
982
983 fn set(&mut self, input: VarInput) -> Result<bool, VarError> {
984 let v = self.parse(input)?;
985
986 if self.persisted_value.as_ref() != Some(&v) {
987 self.persisted_value = Some(v);
988 Ok(true)
989 } else {
990 Ok(false)
991 }
992 }
993
994 fn reset(&mut self) -> bool {
995 if self.persisted_value.is_some() {
996 self.persisted_value = None;
997 true
998 } else {
999 false
1000 }
1001 }
1002
1003 fn set_default(&mut self, input: VarInput) -> Result<(), VarError> {
1004 let v = self.definition.parse(input)?;
1005 self.dynamic_default = Some(v);
1006 Ok(())
1007 }
1008
1009 fn validate_constraints(&self, val: &dyn Value) -> Result<(), VarError> {
1010 if let Some(constraint) = &self.definition.constraint {
1011 constraint.check_constraint(self, self.value_dyn(), val)
1012 } else {
1013 Ok(())
1014 }
1015 }
1016}
1017
1018impl Var for SystemVar {
1019 fn name(&self) -> &'static str {
1020 self.definition.name.as_str()
1021 }
1022
1023 fn value(&self) -> String {
1024 self.value_dyn().format()
1025 }
1026
1027 fn description(&self) -> &'static str {
1028 self.definition.description
1029 }
1030
1031 fn type_name(&self) -> Cow<'static, str> {
1032 self.definition.type_name()
1033 }
1034
1035 fn visible(&self, user: &User, system_vars: &SystemVars) -> Result<(), VarError> {
1036 self.definition.visible(user, system_vars)
1037 }
1038}
1039
1040#[derive(Debug, Error)]
1041pub enum NetworkPolicyError {
1042 #[error("Access denied for address {0}")]
1043 AddressDenied(IpAddr),
1044}
1045
1046#[derive(Derivative, Clone)]
1051#[derivative(Debug)]
1052pub struct SystemVars {
1053 allow_unsafe: bool,
1055 vars: BTreeMap<&'static UncasedStr, SystemVar>,
1057 #[derivative(Debug = "ignore")]
1059 callbacks: BTreeMap<String, Vec<Arc<dyn Fn(&SystemVars) + Send + Sync>>>,
1060
1061 dyncfgs: ConfigSet,
1065}
1066
1067impl Default for SystemVars {
1068 fn default() -> Self {
1069 Self::new()
1070 }
1071}
1072
1073impl SystemVars {
1074 pub fn new() -> Self {
1075 let system_vars = vec![
1076 &MAX_KAFKA_CONNECTIONS,
1077 &MAX_POSTGRES_CONNECTIONS,
1078 &MAX_MYSQL_CONNECTIONS,
1079 &MAX_SQL_SERVER_CONNECTIONS,
1080 &MAX_AWS_PRIVATELINK_CONNECTIONS,
1081 &MAX_TABLES,
1082 &MAX_SOURCES,
1083 &MAX_SINKS,
1084 &MAX_MATERIALIZED_VIEWS,
1085 &MAX_CLUSTERS,
1086 &MAX_REPLICAS_PER_CLUSTER,
1087 &MAX_CREDIT_CONSUMPTION_RATE,
1088 &MAX_DATABASES,
1089 &MAX_SCHEMAS_PER_DATABASE,
1090 &MAX_OBJECTS_PER_SCHEMA,
1091 &MAX_SECRETS,
1092 &MAX_ROLES,
1093 &MAX_CONTINUAL_TASKS,
1094 &MAX_NETWORK_POLICIES,
1095 &MAX_RULES_PER_NETWORK_POLICY,
1096 &MAX_RESULT_SIZE,
1097 &MAX_COPY_FROM_SIZE,
1098 &ALLOWED_CLUSTER_REPLICA_SIZES,
1099 &DISK_CLUSTER_REPLICAS_DEFAULT,
1100 &upsert_rocksdb::UPSERT_ROCKSDB_AUTO_SPILL_TO_DISK,
1101 &upsert_rocksdb::UPSERT_ROCKSDB_AUTO_SPILL_THRESHOLD_BYTES,
1102 &upsert_rocksdb::UPSERT_ROCKSDB_COMPACTION_STYLE,
1103 &upsert_rocksdb::UPSERT_ROCKSDB_OPTIMIZE_COMPACTION_MEMTABLE_BUDGET,
1104 &upsert_rocksdb::UPSERT_ROCKSDB_LEVEL_COMPACTION_DYNAMIC_LEVEL_BYTES,
1105 &upsert_rocksdb::UPSERT_ROCKSDB_UNIVERSAL_COMPACTION_RATIO,
1106 &upsert_rocksdb::UPSERT_ROCKSDB_PARALLELISM,
1107 &upsert_rocksdb::UPSERT_ROCKSDB_COMPRESSION_TYPE,
1108 &upsert_rocksdb::UPSERT_ROCKSDB_BOTTOMMOST_COMPRESSION_TYPE,
1109 &upsert_rocksdb::UPSERT_ROCKSDB_BATCH_SIZE,
1110 &upsert_rocksdb::UPSERT_ROCKSDB_RETRY_DURATION,
1111 &upsert_rocksdb::UPSERT_ROCKSDB_STATS_LOG_INTERVAL_SECONDS,
1112 &upsert_rocksdb::UPSERT_ROCKSDB_STATS_PERSIST_INTERVAL_SECONDS,
1113 &upsert_rocksdb::UPSERT_ROCKSDB_POINT_LOOKUP_BLOCK_CACHE_SIZE_MB,
1114 &upsert_rocksdb::UPSERT_ROCKSDB_SHRINK_ALLOCATED_BUFFERS_BY_RATIO,
1115 &upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_CLUSTER_MEMORY_FRACTION,
1116 &upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_MEMORY_BYTES,
1117 &upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_ALLOW_STALL,
1118 &STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES,
1119 &STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_TO_CLUSTER_SIZE_FRACTION,
1120 &STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_DISK_ONLY,
1121 &STORAGE_STATISTICS_INTERVAL,
1122 &STORAGE_STATISTICS_COLLECTION_INTERVAL,
1123 &STORAGE_SHRINK_UPSERT_UNUSED_BUFFERS_BY_RATIO,
1124 &STORAGE_RECORD_SOURCE_SINK_NAMESPACED_ERRORS,
1125 &PERSIST_FAST_PATH_LIMIT,
1126 &METRICS_RETENTION,
1127 &UNSAFE_MOCK_AUDIT_EVENT_TIMESTAMP,
1128 &ENABLE_RBAC_CHECKS,
1129 &PG_SOURCE_CONNECT_TIMEOUT,
1130 &PG_SOURCE_TCP_KEEPALIVES_IDLE,
1131 &PG_SOURCE_TCP_KEEPALIVES_INTERVAL,
1132 &PG_SOURCE_TCP_KEEPALIVES_RETRIES,
1133 &PG_SOURCE_TCP_USER_TIMEOUT,
1134 &PG_SOURCE_TCP_CONFIGURE_SERVER,
1135 &PG_SOURCE_SNAPSHOT_STATEMENT_TIMEOUT,
1136 &PG_SOURCE_WAL_SENDER_TIMEOUT,
1137 &PG_SOURCE_SNAPSHOT_COLLECT_STRICT_COUNT,
1138 &PG_SOURCE_SNAPSHOT_FALLBACK_TO_STRICT_COUNT,
1139 &PG_SOURCE_SNAPSHOT_WAIT_FOR_COUNT,
1140 &MYSQL_SOURCE_TCP_KEEPALIVE,
1141 &MYSQL_SOURCE_SNAPSHOT_MAX_EXECUTION_TIME,
1142 &MYSQL_SOURCE_SNAPSHOT_LOCK_WAIT_TIMEOUT,
1143 &MYSQL_SOURCE_CONNECT_TIMEOUT,
1144 &SSH_CHECK_INTERVAL,
1145 &SSH_CONNECT_TIMEOUT,
1146 &SSH_KEEPALIVES_IDLE,
1147 &KAFKA_SOCKET_KEEPALIVE,
1148 &KAFKA_SOCKET_TIMEOUT,
1149 &KAFKA_TRANSACTION_TIMEOUT,
1150 &KAFKA_SOCKET_CONNECTION_SETUP_TIMEOUT,
1151 &KAFKA_FETCH_METADATA_TIMEOUT,
1152 &KAFKA_PROGRESS_RECORD_FETCH_TIMEOUT,
1153 &ENABLE_LAUNCHDARKLY,
1154 &MAX_CONNECTIONS,
1155 &NETWORK_POLICY,
1156 &SUPERUSER_RESERVED_CONNECTIONS,
1157 &KEEP_N_SOURCE_STATUS_HISTORY_ENTRIES,
1158 &KEEP_N_SINK_STATUS_HISTORY_ENTRIES,
1159 &KEEP_N_PRIVATELINK_STATUS_HISTORY_ENTRIES,
1160 &REPLICA_STATUS_HISTORY_RETENTION_WINDOW,
1161 &ARRANGEMENT_EXERT_PROPORTIONALITY,
1162 &ENABLE_STORAGE_SHARD_FINALIZATION,
1163 &ENABLE_CONSOLIDATE_AFTER_UNION_NEGATE,
1164 &ENABLE_DEFAULT_CONNECTION_VALIDATION,
1165 &ENABLE_REDUCE_REDUCTION,
1166 &MIN_TIMESTAMP_INTERVAL,
1167 &MAX_TIMESTAMP_INTERVAL,
1168 &LOGGING_FILTER,
1169 &OPENTELEMETRY_FILTER,
1170 &LOGGING_FILTER_DEFAULTS,
1171 &OPENTELEMETRY_FILTER_DEFAULTS,
1172 &SENTRY_FILTERS,
1173 &WEBHOOKS_SECRETS_CACHING_TTL_SECS,
1174 &COORD_SLOW_MESSAGE_WARN_THRESHOLD,
1175 &grpc_client::CONNECT_TIMEOUT,
1176 &grpc_client::HTTP2_KEEP_ALIVE_INTERVAL,
1177 &grpc_client::HTTP2_KEEP_ALIVE_TIMEOUT,
1178 &cluster_scheduling::CLUSTER_MULTI_PROCESS_REPLICA_AZ_AFFINITY_WEIGHT,
1179 &cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY,
1180 &cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY_WEIGHT,
1181 &cluster_scheduling::CLUSTER_ENABLE_TOPOLOGY_SPREAD,
1182 &cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_IGNORE_NON_SINGULAR_SCALE,
1183 &cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_MAX_SKEW,
1184 &cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_SOFT,
1185 &cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY,
1186 &cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY_WEIGHT,
1187 &cluster_scheduling::CLUSTER_ALWAYS_USE_DISK,
1188 &cluster_scheduling::CLUSTER_ALTER_CHECK_READY_INTERVAL,
1189 &cluster_scheduling::CLUSTER_CHECK_SCHEDULING_POLICIES_INTERVAL,
1190 &cluster_scheduling::CLUSTER_SECURITY_CONTEXT_ENABLED,
1191 &cluster_scheduling::CLUSTER_REFRESH_MV_COMPACTION_ESTIMATE,
1192 &grpc_client::HTTP2_KEEP_ALIVE_TIMEOUT,
1193 &STATEMENT_LOGGING_MAX_SAMPLE_RATE,
1194 &STATEMENT_LOGGING_DEFAULT_SAMPLE_RATE,
1195 &STATEMENT_LOGGING_TARGET_DATA_RATE,
1196 &STATEMENT_LOGGING_MAX_DATA_CREDIT,
1197 &ENABLE_INTERNAL_STATEMENT_LOGGING,
1198 &OPTIMIZER_STATS_TIMEOUT,
1199 &OPTIMIZER_ONESHOT_STATS_TIMEOUT,
1200 &PRIVATELINK_STATUS_UPDATE_QUOTA_PER_MINUTE,
1201 &WEBHOOK_CONCURRENT_REQUEST_LIMIT,
1202 &PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_SIZE,
1203 &PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_WAIT,
1204 &PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL,
1205 &PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL_STAGGER,
1206 &USER_STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION,
1207 &FORCE_SOURCE_TABLE_SYNTAX,
1208 &OPTIMIZER_E2E_LATENCY_WARNING_THRESHOLD,
1209 ];
1210
1211 let dyncfgs = mz_dyncfgs::all_dyncfgs();
1212 let dyncfg_vars: Vec<_> = dyncfgs
1213 .entries()
1214 .map(|cfg| match cfg.default() {
1215 ConfigVal::Bool(default) => {
1216 VarDefinition::new_runtime(cfg.name(), *default, cfg.desc(), false)
1217 }
1218 ConfigVal::U32(default) => {
1219 VarDefinition::new_runtime(cfg.name(), *default, cfg.desc(), false)
1220 }
1221 ConfigVal::Usize(default) => {
1222 VarDefinition::new_runtime(cfg.name(), *default, cfg.desc(), false)
1223 }
1224 ConfigVal::OptUsize(default) => {
1225 VarDefinition::new_runtime(cfg.name(), *default, cfg.desc(), false)
1226 }
1227 ConfigVal::F64(default) => {
1228 VarDefinition::new_runtime(cfg.name(), *default, cfg.desc(), false)
1229 }
1230 ConfigVal::String(default) => {
1231 VarDefinition::new_runtime(cfg.name(), default.clone(), cfg.desc(), false)
1232 }
1233 ConfigVal::Duration(default) => {
1234 VarDefinition::new_runtime(cfg.name(), default.clone(), cfg.desc(), false)
1235 }
1236 ConfigVal::Json(default) => {
1237 VarDefinition::new_runtime(cfg.name(), default.clone(), cfg.desc(), false)
1238 }
1239 })
1240 .collect();
1241
1242 let vars: BTreeMap<_, _> = system_vars
1243 .into_iter()
1244 .chain(definitions::FEATURE_FLAGS.iter().copied())
1246 .chain(SESSION_SYSTEM_VARS.values().copied())
1248 .cloned()
1249 .chain(dyncfg_vars)
1251 .map(|var| (var.name, SystemVar::new(var)))
1252 .collect();
1253
1254 let vars = SystemVars {
1255 vars,
1256 callbacks: BTreeMap::new(),
1257 allow_unsafe: false,
1258 dyncfgs,
1259 };
1260
1261 vars
1262 }
1263
1264 pub fn dyncfgs(&self) -> &ConfigSet {
1265 &self.dyncfgs
1266 }
1267
1268 pub fn set_unsafe(mut self, allow_unsafe: bool) -> Self {
1269 self.allow_unsafe = allow_unsafe;
1270 self
1271 }
1272
1273 pub fn allow_unsafe(&self) -> bool {
1274 self.allow_unsafe
1275 }
1276
1277 fn expect_value<V: 'static>(&self, var: &VarDefinition) -> &V {
1278 let val = self
1279 .vars
1280 .get(var.name)
1281 .expect("provided var should be in state");
1282
1283 val.value_dyn()
1284 .as_any()
1285 .downcast_ref::<V>()
1286 .expect("provided var type should matched stored var")
1287 }
1288
1289 fn expect_config_value<V: ConfigType + 'static>(&self, name: &UncasedStr) -> &V {
1290 let val = self
1291 .vars
1292 .get(name)
1293 .unwrap_or_else(|| panic!("provided var {name} should be in state"));
1294
1295 val.value_dyn()
1296 .as_any()
1297 .downcast_ref()
1298 .expect("provided var type should matched stored var")
1299 }
1300
1301 pub fn reset_all(&mut self) {
1304 for (_, var) in &mut self.vars {
1305 var.reset();
1306 }
1307 }
1308
1309 pub fn iter(&self) -> impl Iterator<Item = &dyn Var> {
1312 self.vars
1313 .values()
1314 .map(|v| v.as_var())
1315 .filter(|v| !SESSION_SYSTEM_VARS.contains_key(UncasedStr::new(v.name())))
1316 }
1317
1318 pub fn iter_synced(&self) -> impl Iterator<Item = &dyn Var> {
1322 self.iter().filter(|v| v.name() != ENABLE_LAUNCHDARKLY.name)
1323 }
1324
1325 pub fn iter_session(&self) -> impl Iterator<Item = &dyn Var> {
1327 self.vars
1328 .values()
1329 .map(|v| v.as_var())
1330 .filter(|v| SESSION_SYSTEM_VARS.contains_key(UncasedStr::new(v.name())))
1331 }
1332
1333 pub fn user_modifiable(&self, name: &str) -> bool {
1335 SESSION_SYSTEM_VARS.contains_key(UncasedStr::new(name))
1336 || name == ENABLE_RBAC_CHECKS.name()
1337 || name == NETWORK_POLICY.name()
1338 }
1339
1340 pub fn get(&self, name: &str) -> Result<&dyn Var, VarError> {
1361 self.vars
1362 .get(UncasedStr::new(name))
1363 .map(|v| v.as_var())
1364 .ok_or_else(|| VarError::UnknownParameter(name.into()))
1365 }
1366
1367 pub fn is_default(&self, name: &str, input: VarInput) -> Result<bool, VarError> {
1381 self.vars
1382 .get(UncasedStr::new(name))
1383 .ok_or_else(|| VarError::UnknownParameter(name.into()))
1384 .and_then(|v| v.is_default(input))
1385 }
1386
1387 pub fn set(&mut self, name: &str, input: VarInput) -> Result<bool, VarError> {
1410 let result = self
1411 .vars
1412 .get_mut(UncasedStr::new(name))
1413 .ok_or_else(|| VarError::UnknownParameter(name.into()))
1414 .and_then(|v| v.set(input))?;
1415 self.notify_callbacks(name);
1416 Ok(result)
1417 }
1418
1419 pub fn parse(&self, name: &str, input: VarInput) -> Result<Box<dyn Value>, VarError> {
1440 self.vars
1441 .get(UncasedStr::new(name))
1442 .ok_or_else(|| VarError::UnknownParameter(name.into()))
1443 .and_then(|v| v.parse(input))
1444 }
1445
1446 pub fn set_default(&mut self, name: &str, input: VarInput) -> Result<(), VarError> {
1454 let result = self
1455 .vars
1456 .get_mut(UncasedStr::new(name))
1457 .ok_or_else(|| VarError::UnknownParameter(name.into()))
1458 .and_then(|v| v.set_default(input))?;
1459 self.notify_callbacks(name);
1460 Ok(result)
1461 }
1462
1463 pub fn reset(&mut self, name: &str) -> Result<bool, VarError> {
1481 let result = self
1482 .vars
1483 .get_mut(UncasedStr::new(name))
1484 .ok_or_else(|| VarError::UnknownParameter(name.into()))
1485 .map(|v| v.reset())?;
1486 self.notify_callbacks(name);
1487 Ok(result)
1488 }
1489
1490 pub fn defaults(&self) -> BTreeMap<String, String> {
1492 self.vars
1493 .iter()
1494 .map(|(name, var)| {
1495 let default = var
1496 .dynamic_default
1497 .as_deref()
1498 .unwrap_or_else(|| var.definition.default_value());
1499 (name.as_str().to_owned(), default.format())
1500 })
1501 .collect()
1502 }
1503
1504 pub fn register_callback(
1509 &mut self,
1510 var: &VarDefinition,
1511 callback: Arc<dyn Fn(&SystemVars) + Send + Sync>,
1512 ) {
1513 self.callbacks
1514 .entry(var.name().to_string())
1515 .or_default()
1516 .push(callback);
1517 self.notify_callbacks(var.name());
1518 }
1519
1520 fn notify_callbacks(&self, name: &str) {
1522 if let Some(callbacks) = self.callbacks.get(name) {
1524 for callback in callbacks {
1525 (callback)(self);
1526 }
1527 }
1528 }
1529
1530 pub fn default_cluster(&self) -> String {
1533 self.expect_value::<String>(&CLUSTER).to_owned()
1534 }
1535
1536 pub fn max_kafka_connections(&self) -> u32 {
1538 *self.expect_value(&MAX_KAFKA_CONNECTIONS)
1539 }
1540
1541 pub fn max_postgres_connections(&self) -> u32 {
1543 *self.expect_value(&MAX_POSTGRES_CONNECTIONS)
1544 }
1545
1546 pub fn max_mysql_connections(&self) -> u32 {
1548 *self.expect_value(&MAX_MYSQL_CONNECTIONS)
1549 }
1550
1551 pub fn max_sql_server_connections(&self) -> u32 {
1553 *self.expect_value(&MAX_SQL_SERVER_CONNECTIONS)
1554 }
1555
1556 pub fn max_aws_privatelink_connections(&self) -> u32 {
1558 *self.expect_value(&MAX_AWS_PRIVATELINK_CONNECTIONS)
1559 }
1560
1561 pub fn max_tables(&self) -> u32 {
1563 *self.expect_value(&MAX_TABLES)
1564 }
1565
1566 pub fn max_sources(&self) -> u32 {
1568 *self.expect_value(&MAX_SOURCES)
1569 }
1570
1571 pub fn max_sinks(&self) -> u32 {
1573 *self.expect_value(&MAX_SINKS)
1574 }
1575
1576 pub fn max_materialized_views(&self) -> u32 {
1578 *self.expect_value(&MAX_MATERIALIZED_VIEWS)
1579 }
1580
1581 pub fn max_clusters(&self) -> u32 {
1583 *self.expect_value(&MAX_CLUSTERS)
1584 }
1585
1586 pub fn max_replicas_per_cluster(&self) -> u32 {
1588 *self.expect_value(&MAX_REPLICAS_PER_CLUSTER)
1589 }
1590
1591 pub fn max_credit_consumption_rate(&self) -> Numeric {
1593 *self.expect_value(&MAX_CREDIT_CONSUMPTION_RATE)
1594 }
1595
1596 pub fn max_databases(&self) -> u32 {
1598 *self.expect_value(&MAX_DATABASES)
1599 }
1600
1601 pub fn max_schemas_per_database(&self) -> u32 {
1603 *self.expect_value(&MAX_SCHEMAS_PER_DATABASE)
1604 }
1605
1606 pub fn max_objects_per_schema(&self) -> u32 {
1608 *self.expect_value(&MAX_OBJECTS_PER_SCHEMA)
1609 }
1610
1611 pub fn max_secrets(&self) -> u32 {
1613 *self.expect_value(&MAX_SECRETS)
1614 }
1615
1616 pub fn max_roles(&self) -> u32 {
1618 *self.expect_value(&MAX_ROLES)
1619 }
1620
1621 pub fn max_continual_tasks(&self) -> u32 {
1623 *self.expect_value(&MAX_CONTINUAL_TASKS)
1624 }
1625
1626 pub fn max_network_policies(&self) -> u32 {
1628 *self.expect_value(&MAX_NETWORK_POLICIES)
1629 }
1630
1631 pub fn max_rules_per_network_policy(&self) -> u32 {
1633 *self.expect_value(&MAX_RULES_PER_NETWORK_POLICY)
1634 }
1635
1636 pub fn max_result_size(&self) -> u64 {
1638 self.expect_value::<ByteSize>(&MAX_RESULT_SIZE).as_bytes()
1639 }
1640
1641 pub fn max_copy_from_size(&self) -> u32 {
1643 *self.expect_value(&MAX_COPY_FROM_SIZE)
1644 }
1645
1646 pub fn allowed_cluster_replica_sizes(&self) -> Vec<String> {
1648 self.expect_value::<Vec<Ident>>(&ALLOWED_CLUSTER_REPLICA_SIZES)
1649 .into_iter()
1650 .map(|s| s.as_str().into())
1651 .collect()
1652 }
1653
1654 pub fn default_cluster_replication_factor(&self) -> u32 {
1656 *self.expect_value::<u32>(&DEFAULT_CLUSTER_REPLICATION_FACTOR)
1657 }
1658
1659 pub fn disk_cluster_replicas_default(&self) -> bool {
1661 *self.expect_value(&DISK_CLUSTER_REPLICAS_DEFAULT)
1662 }
1663
1664 pub fn upsert_rocksdb_auto_spill_to_disk(&self) -> bool {
1665 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_AUTO_SPILL_TO_DISK)
1666 }
1667
1668 pub fn upsert_rocksdb_auto_spill_threshold_bytes(&self) -> usize {
1669 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_AUTO_SPILL_THRESHOLD_BYTES)
1670 }
1671
1672 pub fn upsert_rocksdb_compaction_style(&self) -> mz_rocksdb_types::config::CompactionStyle {
1673 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_COMPACTION_STYLE)
1674 }
1675
1676 pub fn upsert_rocksdb_optimize_compaction_memtable_budget(&self) -> usize {
1677 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_OPTIMIZE_COMPACTION_MEMTABLE_BUDGET)
1678 }
1679
1680 pub fn upsert_rocksdb_level_compaction_dynamic_level_bytes(&self) -> bool {
1681 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_LEVEL_COMPACTION_DYNAMIC_LEVEL_BYTES)
1682 }
1683
1684 pub fn upsert_rocksdb_universal_compaction_ratio(&self) -> i32 {
1685 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_UNIVERSAL_COMPACTION_RATIO)
1686 }
1687
1688 pub fn upsert_rocksdb_parallelism(&self) -> Option<i32> {
1689 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_PARALLELISM)
1690 }
1691
1692 pub fn upsert_rocksdb_compression_type(&self) -> mz_rocksdb_types::config::CompressionType {
1693 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_COMPRESSION_TYPE)
1694 }
1695
1696 pub fn upsert_rocksdb_bottommost_compression_type(
1697 &self,
1698 ) -> mz_rocksdb_types::config::CompressionType {
1699 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_BOTTOMMOST_COMPRESSION_TYPE)
1700 }
1701
1702 pub fn upsert_rocksdb_batch_size(&self) -> usize {
1703 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_BATCH_SIZE)
1704 }
1705
1706 pub fn upsert_rocksdb_retry_duration(&self) -> Duration {
1707 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_RETRY_DURATION)
1708 }
1709
1710 pub fn upsert_rocksdb_stats_log_interval_seconds(&self) -> u32 {
1711 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_STATS_LOG_INTERVAL_SECONDS)
1712 }
1713
1714 pub fn upsert_rocksdb_stats_persist_interval_seconds(&self) -> u32 {
1715 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_STATS_PERSIST_INTERVAL_SECONDS)
1716 }
1717
1718 pub fn upsert_rocksdb_point_lookup_block_cache_size_mb(&self) -> Option<u32> {
1719 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_POINT_LOOKUP_BLOCK_CACHE_SIZE_MB)
1720 }
1721
1722 pub fn upsert_rocksdb_shrink_allocated_buffers_by_ratio(&self) -> usize {
1723 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_SHRINK_ALLOCATED_BUFFERS_BY_RATIO)
1724 }
1725
1726 pub fn upsert_rocksdb_write_buffer_manager_cluster_memory_fraction(&self) -> Option<Numeric> {
1727 *self.expect_value(
1728 &upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_CLUSTER_MEMORY_FRACTION,
1729 )
1730 }
1731
1732 pub fn upsert_rocksdb_write_buffer_manager_memory_bytes(&self) -> Option<usize> {
1733 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_MEMORY_BYTES)
1734 }
1735
1736 pub fn upsert_rocksdb_write_buffer_manager_allow_stall(&self) -> bool {
1737 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_ALLOW_STALL)
1738 }
1739
1740 pub fn persist_fast_path_limit(&self) -> usize {
1741 *self.expect_value(&PERSIST_FAST_PATH_LIMIT)
1742 }
1743
1744 pub fn pg_source_connect_timeout(&self) -> Duration {
1746 *self.expect_value(&PG_SOURCE_CONNECT_TIMEOUT)
1747 }
1748
1749 pub fn pg_source_tcp_keepalives_retries(&self) -> u32 {
1751 *self.expect_value(&PG_SOURCE_TCP_KEEPALIVES_RETRIES)
1752 }
1753
1754 pub fn pg_source_tcp_keepalives_idle(&self) -> Duration {
1756 *self.expect_value(&PG_SOURCE_TCP_KEEPALIVES_IDLE)
1757 }
1758
1759 pub fn pg_source_tcp_keepalives_interval(&self) -> Duration {
1761 *self.expect_value(&PG_SOURCE_TCP_KEEPALIVES_INTERVAL)
1762 }
1763
1764 pub fn pg_source_tcp_user_timeout(&self) -> Duration {
1766 *self.expect_value(&PG_SOURCE_TCP_USER_TIMEOUT)
1767 }
1768
1769 pub fn pg_source_tcp_configure_server(&self) -> bool {
1771 *self.expect_value(&PG_SOURCE_TCP_CONFIGURE_SERVER)
1772 }
1773
1774 pub fn pg_source_snapshot_statement_timeout(&self) -> Duration {
1776 *self.expect_value(&PG_SOURCE_SNAPSHOT_STATEMENT_TIMEOUT)
1777 }
1778
1779 pub fn pg_source_wal_sender_timeout(&self) -> Option<Duration> {
1781 *self.expect_value(&PG_SOURCE_WAL_SENDER_TIMEOUT)
1782 }
1783
1784 pub fn pg_source_snapshot_collect_strict_count(&self) -> bool {
1786 *self.expect_value(&PG_SOURCE_SNAPSHOT_COLLECT_STRICT_COUNT)
1787 }
1788 pub fn pg_source_snapshot_fallback_to_strict_count(&self) -> bool {
1790 *self.expect_value(&PG_SOURCE_SNAPSHOT_FALLBACK_TO_STRICT_COUNT)
1791 }
1792 pub fn pg_source_snapshot_wait_for_count(&self) -> bool {
1794 *self.expect_value(&PG_SOURCE_SNAPSHOT_WAIT_FOR_COUNT)
1795 }
1796
1797 pub fn mysql_source_tcp_keepalive(&self) -> Duration {
1799 *self.expect_value(&MYSQL_SOURCE_TCP_KEEPALIVE)
1800 }
1801
1802 pub fn mysql_source_snapshot_max_execution_time(&self) -> Duration {
1804 *self.expect_value(&MYSQL_SOURCE_SNAPSHOT_MAX_EXECUTION_TIME)
1805 }
1806
1807 pub fn mysql_source_snapshot_lock_wait_timeout(&self) -> Duration {
1809 *self.expect_value(&MYSQL_SOURCE_SNAPSHOT_LOCK_WAIT_TIMEOUT)
1810 }
1811
1812 pub fn mysql_source_connect_timeout(&self) -> Duration {
1814 *self.expect_value(&MYSQL_SOURCE_CONNECT_TIMEOUT)
1815 }
1816
1817 pub fn ssh_check_interval(&self) -> Duration {
1819 *self.expect_value(&SSH_CHECK_INTERVAL)
1820 }
1821
1822 pub fn ssh_connect_timeout(&self) -> Duration {
1824 *self.expect_value(&SSH_CONNECT_TIMEOUT)
1825 }
1826
1827 pub fn ssh_keepalives_idle(&self) -> Duration {
1829 *self.expect_value(&SSH_KEEPALIVES_IDLE)
1830 }
1831
1832 pub fn kafka_socket_keepalive(&self) -> bool {
1834 *self.expect_value(&KAFKA_SOCKET_KEEPALIVE)
1835 }
1836
1837 pub fn kafka_socket_timeout(&self) -> Option<Duration> {
1839 *self.expect_value(&KAFKA_SOCKET_TIMEOUT)
1840 }
1841
1842 pub fn kafka_transaction_timeout(&self) -> Duration {
1844 *self.expect_value(&KAFKA_TRANSACTION_TIMEOUT)
1845 }
1846
1847 pub fn kafka_socket_connection_setup_timeout(&self) -> Duration {
1849 *self.expect_value(&KAFKA_SOCKET_CONNECTION_SETUP_TIMEOUT)
1850 }
1851
1852 pub fn kafka_fetch_metadata_timeout(&self) -> Duration {
1854 *self.expect_value(&KAFKA_FETCH_METADATA_TIMEOUT)
1855 }
1856
1857 pub fn kafka_progress_record_fetch_timeout(&self) -> Option<Duration> {
1859 *self.expect_value(&KAFKA_PROGRESS_RECORD_FETCH_TIMEOUT)
1860 }
1861
1862 pub fn crdb_connect_timeout(&self) -> Duration {
1864 *self.expect_config_value(UncasedStr::new(
1865 mz_persist_client::cfg::CRDB_CONNECT_TIMEOUT.name(),
1866 ))
1867 }
1868
1869 pub fn crdb_tcp_user_timeout(&self) -> Duration {
1871 *self.expect_config_value(UncasedStr::new(
1872 mz_persist_client::cfg::CRDB_TCP_USER_TIMEOUT.name(),
1873 ))
1874 }
1875
1876 pub fn storage_dataflow_max_inflight_bytes(&self) -> Option<usize> {
1878 *self.expect_value(&STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES)
1879 }
1880
1881 pub fn storage_dataflow_max_inflight_bytes_to_cluster_size_fraction(&self) -> Option<Numeric> {
1883 *self.expect_value(&STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_TO_CLUSTER_SIZE_FRACTION)
1884 }
1885
1886 pub fn storage_shrink_upsert_unused_buffers_by_ratio(&self) -> usize {
1888 *self.expect_value(&STORAGE_SHRINK_UPSERT_UNUSED_BUFFERS_BY_RATIO)
1889 }
1890
1891 pub fn storage_dataflow_max_inflight_bytes_disk_only(&self) -> bool {
1893 *self.expect_value(&STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_DISK_ONLY)
1894 }
1895
1896 pub fn storage_statistics_interval(&self) -> Duration {
1898 *self.expect_value(&STORAGE_STATISTICS_INTERVAL)
1899 }
1900
1901 pub fn storage_statistics_collection_interval(&self) -> Duration {
1903 *self.expect_value(&STORAGE_STATISTICS_COLLECTION_INTERVAL)
1904 }
1905
1906 pub fn storage_record_source_sink_namespaced_errors(&self) -> bool {
1908 *self.expect_value(&STORAGE_RECORD_SOURCE_SINK_NAMESPACED_ERRORS)
1909 }
1910
1911 pub fn persist_stats_filter_enabled(&self) -> bool {
1913 *self.expect_config_value(UncasedStr::new(
1914 mz_persist_client::stats::STATS_FILTER_ENABLED.name(),
1915 ))
1916 }
1917
1918 pub fn dyncfg_updates(&self) -> ConfigUpdates {
1919 let mut updates = ConfigUpdates::default();
1920 for entry in self.dyncfgs.entries() {
1921 let name = UncasedStr::new(entry.name());
1922 let val = match entry.val() {
1923 ConfigVal::Bool(_) => ConfigVal::from(*self.expect_config_value::<bool>(name)),
1924 ConfigVal::U32(_) => ConfigVal::from(*self.expect_config_value::<u32>(name)),
1925 ConfigVal::Usize(_) => ConfigVal::from(*self.expect_config_value::<usize>(name)),
1926 ConfigVal::OptUsize(_) => {
1927 ConfigVal::from(*self.expect_config_value::<Option<usize>>(name))
1928 }
1929 ConfigVal::F64(_) => ConfigVal::from(*self.expect_config_value::<f64>(name)),
1930 ConfigVal::String(_) => {
1931 ConfigVal::from(self.expect_config_value::<String>(name).clone())
1932 }
1933 ConfigVal::Duration(_) => {
1934 ConfigVal::from(*self.expect_config_value::<Duration>(name))
1935 }
1936 ConfigVal::Json(_) => {
1937 ConfigVal::from(self.expect_config_value::<serde_json::Value>(name).clone())
1938 }
1939 };
1940 updates.add_dynamic(entry.name(), val);
1941 }
1942 updates.apply(&self.dyncfgs);
1943 updates
1944 }
1945
1946 pub fn metrics_retention(&self) -> Duration {
1948 *self.expect_value(&METRICS_RETENTION)
1949 }
1950
1951 pub fn unsafe_mock_audit_event_timestamp(&self) -> Option<mz_repr::Timestamp> {
1953 *self.expect_value(&UNSAFE_MOCK_AUDIT_EVENT_TIMESTAMP)
1954 }
1955
1956 pub fn enable_rbac_checks(&self) -> bool {
1958 *self.expect_value(&ENABLE_RBAC_CHECKS)
1959 }
1960
1961 pub fn max_connections(&self) -> u32 {
1963 *self.expect_value(&MAX_CONNECTIONS)
1964 }
1965
1966 pub fn default_network_policy_name(&self) -> String {
1967 self.expect_value::<String>(&NETWORK_POLICY).clone()
1968 }
1969
1970 pub fn superuser_reserved_connections(&self) -> u32 {
1972 *self.expect_value(&SUPERUSER_RESERVED_CONNECTIONS)
1973 }
1974
1975 pub fn keep_n_source_status_history_entries(&self) -> usize {
1976 *self.expect_value(&KEEP_N_SOURCE_STATUS_HISTORY_ENTRIES)
1977 }
1978
1979 pub fn keep_n_sink_status_history_entries(&self) -> usize {
1980 *self.expect_value(&KEEP_N_SINK_STATUS_HISTORY_ENTRIES)
1981 }
1982
1983 pub fn keep_n_privatelink_status_history_entries(&self) -> usize {
1984 *self.expect_value(&KEEP_N_PRIVATELINK_STATUS_HISTORY_ENTRIES)
1985 }
1986
1987 pub fn replica_status_history_retention_window(&self) -> Duration {
1988 *self.expect_value(&REPLICA_STATUS_HISTORY_RETENTION_WINDOW)
1989 }
1990
1991 pub fn arrangement_exert_proportionality(&self) -> u32 {
1993 *self.expect_value(&ARRANGEMENT_EXERT_PROPORTIONALITY)
1994 }
1995
1996 pub fn enable_storage_shard_finalization(&self) -> bool {
1998 *self.expect_value(&ENABLE_STORAGE_SHARD_FINALIZATION)
1999 }
2000
2001 pub fn enable_consolidate_after_union_negate(&self) -> bool {
2002 *self.expect_value(&ENABLE_CONSOLIDATE_AFTER_UNION_NEGATE)
2003 }
2004
2005 pub fn enable_reduce_reduction(&self) -> bool {
2006 *self.expect_value(&ENABLE_REDUCE_REDUCTION)
2007 }
2008
2009 pub fn enable_default_connection_validation(&self) -> bool {
2011 *self.expect_value(&ENABLE_DEFAULT_CONNECTION_VALIDATION)
2012 }
2013
2014 pub fn min_timestamp_interval(&self) -> Duration {
2016 *self.expect_value(&MIN_TIMESTAMP_INTERVAL)
2017 }
2018 pub fn max_timestamp_interval(&self) -> Duration {
2020 *self.expect_value(&MAX_TIMESTAMP_INTERVAL)
2021 }
2022
2023 pub fn logging_filter(&self) -> CloneableEnvFilter {
2024 self.expect_value::<CloneableEnvFilter>(&LOGGING_FILTER)
2025 .clone()
2026 }
2027
2028 pub fn opentelemetry_filter(&self) -> CloneableEnvFilter {
2029 self.expect_value::<CloneableEnvFilter>(&OPENTELEMETRY_FILTER)
2030 .clone()
2031 }
2032
2033 pub fn logging_filter_defaults(&self) -> Vec<SerializableDirective> {
2034 self.expect_value::<Vec<SerializableDirective>>(&LOGGING_FILTER_DEFAULTS)
2035 .clone()
2036 }
2037
2038 pub fn opentelemetry_filter_defaults(&self) -> Vec<SerializableDirective> {
2039 self.expect_value::<Vec<SerializableDirective>>(&OPENTELEMETRY_FILTER_DEFAULTS)
2040 .clone()
2041 }
2042
2043 pub fn sentry_filters(&self) -> Vec<SerializableDirective> {
2044 self.expect_value::<Vec<SerializableDirective>>(&SENTRY_FILTERS)
2045 .clone()
2046 }
2047
2048 pub fn webhooks_secrets_caching_ttl_secs(&self) -> usize {
2049 *self.expect_value(&WEBHOOKS_SECRETS_CACHING_TTL_SECS)
2050 }
2051
2052 pub fn coord_slow_message_warn_threshold(&self) -> Duration {
2053 *self.expect_value(&COORD_SLOW_MESSAGE_WARN_THRESHOLD)
2054 }
2055
2056 pub fn grpc_client_http2_keep_alive_interval(&self) -> Duration {
2057 *self.expect_value(&grpc_client::HTTP2_KEEP_ALIVE_INTERVAL)
2058 }
2059
2060 pub fn grpc_client_http2_keep_alive_timeout(&self) -> Duration {
2061 *self.expect_value(&grpc_client::HTTP2_KEEP_ALIVE_TIMEOUT)
2062 }
2063
2064 pub fn grpc_connect_timeout(&self) -> Duration {
2065 *self.expect_value(&grpc_client::CONNECT_TIMEOUT)
2066 }
2067
2068 pub fn cluster_multi_process_replica_az_affinity_weight(&self) -> Option<i32> {
2069 *self.expect_value(&cluster_scheduling::CLUSTER_MULTI_PROCESS_REPLICA_AZ_AFFINITY_WEIGHT)
2070 }
2071
2072 pub fn cluster_soften_replication_anti_affinity(&self) -> bool {
2073 *self.expect_value(&cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY)
2074 }
2075
2076 pub fn cluster_soften_replication_anti_affinity_weight(&self) -> i32 {
2077 *self.expect_value(&cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY_WEIGHT)
2078 }
2079
2080 pub fn cluster_enable_topology_spread(&self) -> bool {
2081 *self.expect_value(&cluster_scheduling::CLUSTER_ENABLE_TOPOLOGY_SPREAD)
2082 }
2083
2084 pub fn cluster_topology_spread_ignore_non_singular_scale(&self) -> bool {
2085 *self.expect_value(&cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_IGNORE_NON_SINGULAR_SCALE)
2086 }
2087
2088 pub fn cluster_topology_spread_max_skew(&self) -> i32 {
2089 *self.expect_value(&cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_MAX_SKEW)
2090 }
2091
2092 pub fn cluster_topology_spread_soft(&self) -> bool {
2093 *self.expect_value(&cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_SOFT)
2094 }
2095
2096 pub fn cluster_soften_az_affinity(&self) -> bool {
2097 *self.expect_value(&cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY)
2098 }
2099
2100 pub fn cluster_soften_az_affinity_weight(&self) -> i32 {
2101 *self.expect_value(&cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY_WEIGHT)
2102 }
2103
2104 pub fn cluster_always_use_disk(&self) -> bool {
2105 *self.expect_value(&cluster_scheduling::CLUSTER_ALWAYS_USE_DISK)
2106 }
2107
2108 pub fn cluster_alter_check_ready_interval(&self) -> Duration {
2109 *self.expect_value(&cluster_scheduling::CLUSTER_ALTER_CHECK_READY_INTERVAL)
2110 }
2111
2112 pub fn cluster_check_scheduling_policies_interval(&self) -> Duration {
2113 *self.expect_value(&cluster_scheduling::CLUSTER_CHECK_SCHEDULING_POLICIES_INTERVAL)
2114 }
2115
2116 pub fn cluster_security_context_enabled(&self) -> bool {
2117 *self.expect_value(&cluster_scheduling::CLUSTER_SECURITY_CONTEXT_ENABLED)
2118 }
2119
2120 pub fn cluster_refresh_mv_compaction_estimate(&self) -> Duration {
2121 *self.expect_value(&cluster_scheduling::CLUSTER_REFRESH_MV_COMPACTION_ESTIMATE)
2122 }
2123
2124 pub fn privatelink_status_update_quota_per_minute(&self) -> u32 {
2126 *self.expect_value(&PRIVATELINK_STATUS_UPDATE_QUOTA_PER_MINUTE)
2127 }
2128
2129 pub fn statement_logging_target_data_rate(&self) -> Option<usize> {
2130 *self.expect_value(&STATEMENT_LOGGING_TARGET_DATA_RATE)
2131 }
2132
2133 pub fn statement_logging_max_data_credit(&self) -> Option<usize> {
2134 *self.expect_value(&STATEMENT_LOGGING_MAX_DATA_CREDIT)
2135 }
2136
2137 pub fn statement_logging_max_sample_rate(&self) -> Numeric {
2139 *self.expect_value(&STATEMENT_LOGGING_MAX_SAMPLE_RATE)
2140 }
2141
2142 pub fn statement_logging_default_sample_rate(&self) -> Numeric {
2144 *self.expect_value(&STATEMENT_LOGGING_DEFAULT_SAMPLE_RATE)
2145 }
2146
2147 pub fn enable_internal_statement_logging(&self) -> bool {
2149 *self.expect_value(&ENABLE_INTERNAL_STATEMENT_LOGGING)
2150 }
2151
2152 pub fn optimizer_stats_timeout(&self) -> Duration {
2154 *self.expect_value(&OPTIMIZER_STATS_TIMEOUT)
2155 }
2156
2157 pub fn optimizer_oneshot_stats_timeout(&self) -> Duration {
2159 *self.expect_value(&OPTIMIZER_ONESHOT_STATS_TIMEOUT)
2160 }
2161
2162 pub fn webhook_concurrent_request_limit(&self) -> usize {
2164 *self.expect_value(&WEBHOOK_CONCURRENT_REQUEST_LIMIT)
2165 }
2166
2167 pub fn pg_timestamp_oracle_connection_pool_max_size(&self) -> usize {
2169 *self.expect_value(&PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_SIZE)
2170 }
2171
2172 pub fn pg_timestamp_oracle_connection_pool_max_wait(&self) -> Option<Duration> {
2174 *self.expect_value(&PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_WAIT)
2175 }
2176
2177 pub fn pg_timestamp_oracle_connection_pool_ttl(&self) -> Duration {
2179 *self.expect_value(&PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL)
2180 }
2181
2182 pub fn pg_timestamp_oracle_connection_pool_ttl_stagger(&self) -> Duration {
2184 *self.expect_value(&PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL_STAGGER)
2185 }
2186
2187 pub fn user_storage_managed_collections_batch_duration(&self) -> Duration {
2189 *self.expect_value(&USER_STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION)
2190 }
2191
2192 pub fn force_source_table_syntax(&self) -> bool {
2193 *self.expect_value(&FORCE_SOURCE_TABLE_SYNTAX)
2194 }
2195
2196 pub fn optimizer_e2e_latency_warning_threshold(&self) -> Duration {
2197 *self.expect_value(&OPTIMIZER_E2E_LATENCY_WARNING_THRESHOLD)
2198 }
2199
2200 pub fn is_compute_config_var(&self, name: &str) -> bool {
2204 name == MAX_RESULT_SIZE.name() || self.is_dyncfg_var(name) || is_tracing_var(name)
2205 }
2206
2207 pub fn is_metrics_config_var(&self, name: &str) -> bool {
2209 self.is_dyncfg_var(name)
2210 }
2211
2212 pub fn is_storage_config_var(&self, name: &str) -> bool {
2214 name == PG_SOURCE_CONNECT_TIMEOUT.name()
2215 || name == PG_SOURCE_TCP_KEEPALIVES_IDLE.name()
2216 || name == PG_SOURCE_TCP_KEEPALIVES_INTERVAL.name()
2217 || name == PG_SOURCE_TCP_KEEPALIVES_RETRIES.name()
2218 || name == PG_SOURCE_TCP_USER_TIMEOUT.name()
2219 || name == PG_SOURCE_TCP_CONFIGURE_SERVER.name()
2220 || name == PG_SOURCE_SNAPSHOT_STATEMENT_TIMEOUT.name()
2221 || name == PG_SOURCE_WAL_SENDER_TIMEOUT.name()
2222 || name == PG_SOURCE_SNAPSHOT_COLLECT_STRICT_COUNT.name()
2223 || name == PG_SOURCE_SNAPSHOT_FALLBACK_TO_STRICT_COUNT.name()
2224 || name == PG_SOURCE_SNAPSHOT_WAIT_FOR_COUNT.name()
2225 || name == MYSQL_SOURCE_TCP_KEEPALIVE.name()
2226 || name == MYSQL_SOURCE_SNAPSHOT_MAX_EXECUTION_TIME.name()
2227 || name == MYSQL_SOURCE_SNAPSHOT_LOCK_WAIT_TIMEOUT.name()
2228 || name == MYSQL_SOURCE_CONNECT_TIMEOUT.name()
2229 || name == ENABLE_STORAGE_SHARD_FINALIZATION.name()
2230 || name == SSH_CHECK_INTERVAL.name()
2231 || name == SSH_CONNECT_TIMEOUT.name()
2232 || name == SSH_KEEPALIVES_IDLE.name()
2233 || name == KAFKA_SOCKET_KEEPALIVE.name()
2234 || name == KAFKA_SOCKET_TIMEOUT.name()
2235 || name == KAFKA_TRANSACTION_TIMEOUT.name()
2236 || name == KAFKA_SOCKET_CONNECTION_SETUP_TIMEOUT.name()
2237 || name == KAFKA_FETCH_METADATA_TIMEOUT.name()
2238 || name == KAFKA_PROGRESS_RECORD_FETCH_TIMEOUT.name()
2239 || name == STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES.name()
2240 || name == STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_TO_CLUSTER_SIZE_FRACTION.name()
2241 || name == STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_DISK_ONLY.name()
2242 || name == STORAGE_SHRINK_UPSERT_UNUSED_BUFFERS_BY_RATIO.name()
2243 || name == STORAGE_RECORD_SOURCE_SINK_NAMESPACED_ERRORS.name()
2244 || name == STORAGE_STATISTICS_INTERVAL.name()
2245 || name == STORAGE_STATISTICS_COLLECTION_INTERVAL.name()
2246 || name == USER_STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION.name()
2247 || is_upsert_rocksdb_config_var(name)
2248 || self.is_dyncfg_var(name)
2249 || is_tracing_var(name)
2250 }
2251
2252 fn is_dyncfg_var(&self, name: &str) -> bool {
2254 self.dyncfgs.entries().any(|e| name == e.name())
2255 }
2256}
2257
2258pub fn is_tracing_var(name: &str) -> bool {
2259 name == LOGGING_FILTER.name()
2260 || name == LOGGING_FILTER_DEFAULTS.name()
2261 || name == OPENTELEMETRY_FILTER.name()
2262 || name == OPENTELEMETRY_FILTER_DEFAULTS.name()
2263 || name == SENTRY_FILTERS.name()
2264}
2265
2266pub fn is_secrets_caching_var(name: &str) -> bool {
2268 name == WEBHOOKS_SECRETS_CACHING_TTL_SECS.name()
2269}
2270
2271fn is_upsert_rocksdb_config_var(name: &str) -> bool {
2272 name == upsert_rocksdb::UPSERT_ROCKSDB_COMPACTION_STYLE.name()
2273 || name == upsert_rocksdb::UPSERT_ROCKSDB_OPTIMIZE_COMPACTION_MEMTABLE_BUDGET.name()
2274 || name == upsert_rocksdb::UPSERT_ROCKSDB_LEVEL_COMPACTION_DYNAMIC_LEVEL_BYTES.name()
2275 || name == upsert_rocksdb::UPSERT_ROCKSDB_UNIVERSAL_COMPACTION_RATIO.name()
2276 || name == upsert_rocksdb::UPSERT_ROCKSDB_PARALLELISM.name()
2277 || name == upsert_rocksdb::UPSERT_ROCKSDB_COMPRESSION_TYPE.name()
2278 || name == upsert_rocksdb::UPSERT_ROCKSDB_BOTTOMMOST_COMPRESSION_TYPE.name()
2279 || name == upsert_rocksdb::UPSERT_ROCKSDB_BATCH_SIZE.name()
2280 || name == upsert_rocksdb::UPSERT_ROCKSDB_STATS_LOG_INTERVAL_SECONDS.name()
2281 || name == upsert_rocksdb::UPSERT_ROCKSDB_STATS_PERSIST_INTERVAL_SECONDS.name()
2282 || name == upsert_rocksdb::UPSERT_ROCKSDB_POINT_LOOKUP_BLOCK_CACHE_SIZE_MB.name()
2283 || name == upsert_rocksdb::UPSERT_ROCKSDB_SHRINK_ALLOCATED_BUFFERS_BY_RATIO.name()
2284}
2285
2286pub fn is_pg_timestamp_oracle_config_var(name: &str) -> bool {
2289 name == PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_SIZE.name()
2290 || name == PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_WAIT.name()
2291 || name == PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL.name()
2292 || name == PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL_STAGGER.name()
2293 || name == CRDB_CONNECT_TIMEOUT.name()
2294 || name == CRDB_TCP_USER_TIMEOUT.name()
2295}
2296
2297pub fn is_cluster_scheduling_var(name: &str) -> bool {
2299 name == cluster_scheduling::CLUSTER_MULTI_PROCESS_REPLICA_AZ_AFFINITY_WEIGHT.name()
2300 || name == cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY.name()
2301 || name == cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY_WEIGHT.name()
2302 || name == cluster_scheduling::CLUSTER_ENABLE_TOPOLOGY_SPREAD.name()
2303 || name == cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_IGNORE_NON_SINGULAR_SCALE.name()
2304 || name == cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_MAX_SKEW.name()
2305 || name == cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_SOFT.name()
2306 || name == cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY.name()
2307 || name == cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY_WEIGHT.name()
2308 || name == cluster_scheduling::CLUSTER_ALWAYS_USE_DISK.name()
2309}
2310
2311pub fn is_http_config_var(name: &str) -> bool {
2313 name == WEBHOOK_CONCURRENT_REQUEST_LIMIT.name()
2314}
2315
2316static SESSION_SYSTEM_VARS: LazyLock<BTreeMap<&'static UncasedStr, &'static VarDefinition>> =
2320 LazyLock::new(|| {
2321 [
2322 &APPLICATION_NAME,
2323 &CLIENT_ENCODING,
2324 &CLIENT_MIN_MESSAGES,
2325 &CLUSTER,
2326 &CLUSTER_REPLICA,
2327 &DEFAULT_CLUSTER_REPLICATION_FACTOR,
2328 &CURRENT_OBJECT_MISSING_WARNINGS,
2329 &DATABASE,
2330 &DATE_STYLE,
2331 &EXTRA_FLOAT_DIGITS,
2332 &INTEGER_DATETIMES,
2333 &INTERVAL_STYLE,
2334 &REAL_TIME_RECENCY_TIMEOUT,
2335 &SEARCH_PATH,
2336 &STANDARD_CONFORMING_STRINGS,
2337 &STATEMENT_TIMEOUT,
2338 &IDLE_IN_TRANSACTION_SESSION_TIMEOUT,
2339 &TIMEZONE,
2340 &TRANSACTION_ISOLATION,
2341 &MAX_QUERY_RESULT_SIZE,
2342 ]
2343 .into_iter()
2344 .map(|var| (UncasedStr::new(var.name()), var))
2345 .collect()
2346 });
2347
2348#[derive(Debug)]
2351pub struct FeatureFlag {
2352 pub flag: &'static VarDefinition,
2353 pub feature_desc: &'static str,
2354}
2355
2356impl FeatureFlag {
2357 pub fn require(&'static self, system_vars: &SystemVars) -> Result<(), VarError> {
2360 match *system_vars.expect_value::<bool>(self.flag) {
2361 true => Ok(()),
2362 false => Err(VarError::RequiresFeatureFlag { feature_flag: self }),
2363 }
2364 }
2365}
2366
2367impl PartialEq for FeatureFlag {
2368 fn eq(&self, other: &FeatureFlag) -> bool {
2369 self.flag.name() == other.flag.name()
2370 }
2371}
2372
2373impl Eq for FeatureFlag {}
2374
2375impl Var for MzVersion {
2376 fn name(&self) -> &'static str {
2377 MZ_VERSION_NAME.as_str()
2378 }
2379
2380 fn value(&self) -> String {
2381 self.build_info
2382 .human_version(self.helm_chart_version.clone())
2383 }
2384
2385 fn description(&self) -> &'static str {
2386 "Shows the Materialize server version (Materialize)."
2387 }
2388
2389 fn type_name(&self) -> Cow<'static, str> {
2390 String::type_name()
2391 }
2392
2393 fn visible(&self, _: &User, _: &SystemVars) -> Result<(), VarError> {
2394 Ok(())
2395 }
2396}
2397
2398impl Var for User {
2399 fn name(&self) -> &'static str {
2400 IS_SUPERUSER_NAME.as_str()
2401 }
2402
2403 fn value(&self) -> String {
2404 self.is_superuser().format()
2405 }
2406
2407 fn description(&self) -> &'static str {
2408 "Reports whether the current session is a superuser (PostgreSQL)."
2409 }
2410
2411 fn type_name(&self) -> Cow<'static, str> {
2412 bool::type_name()
2413 }
2414
2415 fn visible(&self, _: &User, _: &SystemVars) -> Result<(), VarError> {
2416 Ok(())
2417 }
2418}