1use std::borrow::Cow;
67use std::clone::Clone;
68use std::collections::BTreeMap;
69use std::fmt::Debug;
70use std::net::IpAddr;
71use std::num::NonZeroU32;
72use std::string::ToString;
73use std::sync::{Arc, LazyLock};
74use std::time::Duration;
75
76use chrono::{DateTime, Utc};
77use derivative::Derivative;
78use imbl::OrdMap;
79use mz_adapter_types::dyncfgs::{OIDC_GROUP_CLAIM, OIDC_GROUP_ROLE_SYNC_STRICT};
80use mz_build_info::BuildInfo;
81use mz_dyncfg::{ConfigSet, ConfigType, ConfigUpdates, ConfigVal};
82use mz_persist_client::cfg::{
83 CRDB_CONNECT_TIMEOUT, CRDB_KEEPALIVES_IDLE, CRDB_KEEPALIVES_INTERVAL, CRDB_KEEPALIVES_RETRIES,
84 CRDB_TCP_USER_TIMEOUT,
85};
86use mz_repr::adt::numeric::Numeric;
87use mz_repr::adt::timestamp::CheckedTimestamp;
88use mz_repr::bytes::ByteSize;
89use mz_repr::user::{ExternalUserMetadata, InternalUserMetadata};
90use mz_tracing::{CloneableEnvFilter, SerializableDirective};
91use serde::Serialize;
92use thiserror::Error;
93use uncased::UncasedStr;
94
95use crate::ast::Ident;
96use crate::session::user::User;
97
98pub(crate) mod constraints;
99pub(crate) mod definitions;
100pub(crate) mod errors;
101pub(crate) mod polyfill;
102pub(crate) mod value;
103
104pub use definitions::*;
105pub use errors::*;
106pub use value::*;
107
108#[derive(Debug, Clone, Copy, PartialEq, Eq)]
113pub enum EndTransactionAction {
114 Commit,
116 Rollback,
118}
119
120#[derive(Debug, Clone, Copy)]
126pub enum VarInput<'a> {
127 Flat(&'a str),
129 SqlSet(&'a [String]),
132}
133
134impl<'a> VarInput<'a> {
135 pub fn to_vec(&self) -> Vec<String> {
137 match self {
138 VarInput::Flat(v) => vec![v.to_string()],
139 VarInput::SqlSet(values) => values.into_iter().map(|v| v.to_string()).collect(),
140 }
141 }
142}
143
144#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize)]
146pub enum OwnedVarInput {
147 Flat(String),
149 SqlSet(Vec<String>),
151}
152
153impl OwnedVarInput {
154 pub fn borrow(&self) -> VarInput<'_> {
156 match self {
157 OwnedVarInput::Flat(v) => VarInput::Flat(v),
158 OwnedVarInput::SqlSet(v) => VarInput::SqlSet(v),
159 }
160 }
161}
162
163pub trait Var: Debug {
165 fn name(&self) -> &'static str;
167
168 fn value(&self) -> String;
174
175 fn description(&self) -> &'static str;
178
179 fn type_name(&self) -> Cow<'static, str>;
181
182 fn visible(&self, user: &User, system_vars: &SystemVars) -> Result<(), VarError>;
187
188 fn is_unsafe(&self) -> bool {
190 self.name().starts_with("unsafe_")
191 }
192
193 fn as_var(&self) -> &dyn Var
196 where
197 Self: Sized,
198 {
199 self
200 }
201}
202
203#[derive(Debug)]
210pub struct SessionVar {
211 definition: VarDefinition,
212 default_value: Option<Box<dyn Value>>,
214 local_value: Option<Box<dyn Value>>,
216 staged_value: Option<Box<dyn Value>>,
218 session_value: Option<Box<dyn Value>>,
220}
221
222impl Clone for SessionVar {
223 fn clone(&self) -> Self {
224 SessionVar {
225 definition: self.definition.clone(),
226 default_value: self.default_value.as_ref().map(|v| v.box_clone()),
227 local_value: self.local_value.as_ref().map(|v| v.box_clone()),
228 staged_value: self.staged_value.as_ref().map(|v| v.box_clone()),
229 session_value: self.session_value.as_ref().map(|v| v.box_clone()),
230 }
231 }
232}
233
234impl SessionVar {
235 pub const fn new(var: VarDefinition) -> Self {
236 SessionVar {
237 definition: var,
238 default_value: None,
239 local_value: None,
240 staged_value: None,
241 session_value: None,
242 }
243 }
244
245 pub fn check(&self, input: VarInput) -> Result<String, VarError> {
248 let v = self.definition.parse(input)?;
249 self.validate_constraints(v.as_ref())?;
250
251 Ok(v.format())
252 }
253
254 pub fn set(&mut self, input: VarInput, local: bool) -> Result<(), VarError> {
256 let v = self.definition.parse(input)?;
257
258 self.validate_constraints(v.as_ref())?;
260
261 if local {
262 self.local_value = Some(v);
263 } else {
264 self.local_value = None;
265 self.staged_value = Some(v);
266 }
267 Ok(())
268 }
269
270 pub fn set_default(&mut self, input: VarInput) -> Result<(), VarError> {
272 let v = self.definition.parse(input)?;
273 self.validate_constraints(v.as_ref())?;
274 self.default_value = Some(v);
275 Ok(())
276 }
277
278 pub fn reset(&mut self, local: bool) {
280 let value = self
281 .default_value
282 .as_ref()
283 .map(|v| v.as_ref())
284 .unwrap_or_else(|| self.definition.value.value());
285 if local {
286 self.local_value = Some(value.box_clone());
287 } else {
288 self.local_value = None;
289 self.staged_value = Some(value.box_clone());
290 }
291 }
292
293 #[must_use]
295 pub fn end_transaction(&self, action: EndTransactionAction) -> Option<Self> {
296 if !self.is_mutating() {
297 return None;
298 }
299 let mut next: Self = self.clone();
300 next.local_value = None;
301 match action {
302 EndTransactionAction::Commit if next.staged_value.is_some() => {
303 next.session_value = next.staged_value.take()
304 }
305 _ => next.staged_value = None,
306 }
307 Some(next)
308 }
309
310 pub fn is_mutating(&self) -> bool {
312 self.local_value.is_some() || self.staged_value.is_some()
313 }
314
315 pub fn value_dyn(&self) -> &dyn Value {
316 self.local_value
317 .as_deref()
318 .or(self.staged_value.as_deref())
319 .or(self.session_value.as_deref())
320 .or(self.default_value.as_deref())
321 .unwrap_or_else(|| self.definition.value.value())
322 }
323
324 pub fn inspect_session_value(&self) -> Option<&dyn Value> {
329 self.session_value.as_deref()
330 }
331
332 fn validate_constraints(&self, val: &dyn Value) -> Result<(), VarError> {
333 if let Some(constraint) = &self.definition.constraint {
334 constraint.check_constraint(self, self.value_dyn(), val)
335 } else {
336 Ok(())
337 }
338 }
339}
340
341impl Var for SessionVar {
342 fn name(&self) -> &'static str {
343 self.definition.name.as_str()
344 }
345
346 fn value(&self) -> String {
347 self.value_dyn().format()
348 }
349
350 fn description(&self) -> &'static str {
351 self.definition.description
352 }
353
354 fn type_name(&self) -> Cow<'static, str> {
355 self.definition.type_name()
356 }
357
358 fn visible(
359 &self,
360 user: &User,
361 system_vars: &super::vars::SystemVars,
362 ) -> Result<(), super::vars::VarError> {
363 self.definition.visible(user, system_vars)
364 }
365}
366
367#[derive(Debug, Clone, PartialEq, Eq)]
368pub struct MzVersion {
369 build_info: &'static BuildInfo,
371 helm_chart_version: Option<String>,
373}
374
375impl MzVersion {
376 pub fn new(build_info: &'static BuildInfo, helm_chart_version: Option<String>) -> Self {
377 MzVersion {
378 build_info,
379 helm_chart_version,
380 }
381 }
382}
383
384#[derive(Debug, Clone)]
389pub struct SessionVars {
390 vars: OrdMap<&'static UncasedStr, SessionVar>,
392 mz_version: MzVersion,
394 user: User,
396}
397
398impl SessionVars {
399 pub fn new_unchecked(
401 build_info: &'static BuildInfo,
402 user: User,
403 helm_chart_version: Option<String>,
404 ) -> SessionVars {
405 use definitions::*;
406
407 let vars = [
408 &FAILPOINTS,
409 &SERVER_VERSION,
410 &SERVER_VERSION_NUM,
411 &SQL_SAFE_UPDATES,
412 &REAL_TIME_RECENCY,
413 &EMIT_PLAN_INSIGHTS_NOTICE,
414 &EMIT_TIMESTAMP_NOTICE,
415 &EMIT_TRACE_ID_NOTICE,
416 &AUTO_ROUTE_CATALOG_QUERIES,
417 &ENABLE_SESSION_RBAC_CHECKS,
418 &RESTRICT_TO_USER_OBJECTS,
419 &ENABLE_SESSION_CARDINALITY_ESTIMATES,
420 &MAX_IDENTIFIER_LENGTH,
421 &STATEMENT_LOGGING_SAMPLE_RATE,
422 &EMIT_INTROSPECTION_QUERY_NOTICE,
423 &UNSAFE_NEW_TRANSACTION_WALL_TIME,
424 &WELCOME_MESSAGE,
425 ]
426 .into_iter()
427 .chain(SESSION_SYSTEM_VARS.iter().map(|(_name, var)| *var))
428 .map(|var| (var.name, SessionVar::new(var.clone())))
429 .collect();
430
431 SessionVars {
432 vars,
433 mz_version: MzVersion::new(build_info, helm_chart_version),
434 user,
435 }
436 }
437
438 fn expect_value<V: Value>(&self, var: &VarDefinition) -> &V {
439 let var = self
440 .vars
441 .get(var.name)
442 .expect("provided var should be in state");
443 let val = var.value_dyn();
444 val.as_any().downcast_ref::<V>().expect("success")
445 }
446
447 pub fn iter(&self) -> impl Iterator<Item = &dyn Var> {
454 #[allow(clippy::as_conversions)]
455 self.vars
456 .values()
457 .map(|v| v.as_var())
458 .chain([&self.mz_version as &dyn Var, &self.user])
459 }
460
461 pub fn notify_set(&self) -> impl Iterator<Item = &dyn Var> {
465 [
472 &APPLICATION_NAME,
473 &CLIENT_ENCODING,
474 &DATE_STYLE,
475 &INTEGER_DATETIMES,
476 &SERVER_VERSION,
477 &STANDARD_CONFORMING_STRINGS,
478 &TIMEZONE,
479 &INTERVAL_STYLE,
480 &CLUSTER,
485 &CLUSTER_REPLICA,
486 &DEFAULT_CLUSTER_REPLICATION_FACTOR,
487 &DATABASE,
488 &SEARCH_PATH,
489 ]
490 .into_iter()
491 .map(|v| self.vars[v.name].as_var())
492 .chain(std::iter::once(self.mz_version.as_var()))
499 }
500
501 pub fn reset_all(&mut self) {
503 let names: Vec<_> = self.vars.keys().copied().collect();
504 for name in names {
505 self.vars[name].reset(false);
506 }
507 }
508
509 pub fn get(&self, system_vars: &SystemVars, name: &str) -> Result<&dyn Var, VarError> {
520 let name = compat_translate_name(name);
521
522 let name = UncasedStr::new(name);
523 if name == MZ_VERSION_NAME {
524 Ok(&self.mz_version)
525 } else if name == IS_SUPERUSER_NAME {
526 Ok(&self.user)
527 } else {
528 self.vars
529 .get(name)
530 .map(|v| {
531 v.visible(&self.user, system_vars)?;
532 Ok(v.as_var())
533 })
534 .transpose()?
535 .ok_or_else(|| VarError::UnknownParameter(name.to_string()))
536 }
537 }
538
539 pub fn inspect(&self, name: &str) -> Result<&SessionVar, VarError> {
544 let name = compat_translate_name(name);
545
546 self.vars
547 .get(UncasedStr::new(name))
548 .ok_or_else(|| VarError::UnknownParameter(name.to_string()))
549 }
550
551 pub fn set(
564 &mut self,
565 system_vars: &SystemVars,
566 name: &str,
567 input: VarInput,
568 local: bool,
569 ) -> Result<(), VarError> {
570 let (name, input) = compat_translate(name, input);
571
572 let name = UncasedStr::new(name);
573 self.check_read_only(name)?;
574
575 self.vars
576 .get_mut(name)
577 .map(|v| {
578 v.visible(&self.user, system_vars)?;
579 v.set(input, local)
580 })
581 .transpose()?
582 .ok_or_else(|| VarError::UnknownParameter(name.to_string()))
583 }
584
585 pub fn set_default(&mut self, name: &str, input: VarInput) -> Result<(), VarError> {
588 let (name, input) = compat_translate(name, input);
589
590 let name = UncasedStr::new(name);
591
592 if !Self::allow_role_default(name) {
596 self.check_read_only(name)?;
597 }
598
599 self.vars
600 .get_mut(name)
601 .map(|v| v.set_default(input))
603 .transpose()?
604 .ok_or_else(|| VarError::UnknownParameter(name.to_string()))
605 }
606
607 fn allow_role_default(name: &UncasedStr) -> bool {
615 name == RESTRICT_TO_USER_OBJECTS.name
616 }
617
618 pub fn reset(
632 &mut self,
633 system_vars: &SystemVars,
634 name: &str,
635 local: bool,
636 ) -> Result<(), VarError> {
637 let name = compat_translate_name(name);
638
639 let name = UncasedStr::new(name);
640 self.check_read_only(name)?;
641
642 self.vars
643 .get_mut(name)
644 .map(|v| {
645 v.visible(&self.user, system_vars)?;
646 v.reset(local);
647 Ok(())
648 })
649 .transpose()?
650 .ok_or_else(|| VarError::UnknownParameter(name.to_string()))
651 }
652
653 fn check_read_only(&self, name: &UncasedStr) -> Result<(), VarError> {
660 if name == MZ_VERSION_NAME {
661 Err(VarError::ReadOnlyParameter(MZ_VERSION_NAME.as_str()))
662 } else if name == IS_SUPERUSER_NAME {
663 Err(VarError::ReadOnlyParameter(IS_SUPERUSER_NAME.as_str()))
664 } else if name == MAX_IDENTIFIER_LENGTH.name {
665 Err(VarError::ReadOnlyParameter(
666 MAX_IDENTIFIER_LENGTH.name.as_str(),
667 ))
668 } else if name == RESTRICT_TO_USER_OBJECTS.name {
669 Err(VarError::ReadOnlyParameter(
673 RESTRICT_TO_USER_OBJECTS.name.as_str(),
674 ))
675 } else {
676 Ok(())
677 }
678 }
679
680 #[mz_ore::instrument(level = "debug")]
685 pub fn end_transaction(
686 &mut self,
687 action: EndTransactionAction,
688 ) -> BTreeMap<&'static str, String> {
689 let mut changed = BTreeMap::new();
690 let mut updates = Vec::new();
691 for (name, var) in self.vars.iter() {
692 if !var.is_mutating() {
693 continue;
694 }
695 let before = var.value();
696 let next = var.end_transaction(action).expect("must mutate");
697 let after = next.value();
698 updates.push((*name, next));
699
700 if before != after {
702 changed.insert(var.name(), after);
703 }
704 }
705 self.vars.extend(updates);
706 changed
707 }
708
709 pub fn application_name(&self) -> &str {
711 self.expect_value::<String>(&APPLICATION_NAME).as_str()
712 }
713
714 pub fn build_info(&self) -> &'static BuildInfo {
716 self.mz_version.build_info
717 }
718
719 pub fn client_encoding(&self) -> &ClientEncoding {
721 self.expect_value(&CLIENT_ENCODING)
722 }
723
724 pub fn client_min_messages(&self) -> &ClientSeverity {
726 self.expect_value(&CLIENT_MIN_MESSAGES)
727 }
728
729 pub fn cluster(&self) -> &str {
731 self.expect_value::<String>(&CLUSTER).as_str()
732 }
733
734 pub fn cluster_replica(&self) -> Option<&str> {
736 self.expect_value::<Option<String>>(&CLUSTER_REPLICA)
737 .as_deref()
738 }
739
740 pub fn current_object_missing_warnings(&self) -> bool {
743 *self.expect_value::<bool>(&CURRENT_OBJECT_MISSING_WARNINGS)
744 }
745
746 pub fn date_style(&self) -> &[&str] {
748 &self.expect_value::<DateStyle>(&DATE_STYLE).0
749 }
750
751 pub fn database(&self) -> &str {
753 self.expect_value::<String>(&DATABASE).as_str()
754 }
755
756 pub fn extra_float_digits(&self) -> i32 {
758 *self.expect_value(&EXTRA_FLOAT_DIGITS)
759 }
760
761 pub fn integer_datetimes(&self) -> bool {
763 *self.expect_value(&INTEGER_DATETIMES)
764 }
765
766 pub fn intervalstyle(&self) -> &IntervalStyle {
768 self.expect_value(&INTERVAL_STYLE)
769 }
770
771 pub fn mz_version(&self) -> String {
773 self.mz_version.value()
774 }
775
776 pub fn search_path(&self) -> &[Ident] {
778 self.expect_value::<Vec<Ident>>(&SEARCH_PATH).as_slice()
779 }
780
781 pub fn server_version(&self) -> &str {
783 self.expect_value::<String>(&SERVER_VERSION).as_str()
784 }
785
786 pub fn server_version_num(&self) -> i32 {
788 *self.expect_value(&SERVER_VERSION_NUM)
789 }
790
791 pub fn sql_safe_updates(&self) -> bool {
793 *self.expect_value(&SQL_SAFE_UPDATES)
794 }
795
796 pub fn standard_conforming_strings(&self) -> bool {
799 *self.expect_value(&STANDARD_CONFORMING_STRINGS)
800 }
801
802 pub fn statement_timeout(&self) -> &Duration {
804 self.expect_value(&STATEMENT_TIMEOUT)
805 }
806
807 pub fn idle_in_transaction_session_timeout(&self) -> &Duration {
809 self.expect_value(&IDLE_IN_TRANSACTION_SESSION_TIMEOUT)
810 }
811
812 pub fn timezone(&self) -> &TimeZone {
814 self.expect_value(&TIMEZONE)
815 }
816
817 pub fn transaction_isolation(&self) -> &IsolationLevel {
820 self.expect_value(&TRANSACTION_ISOLATION)
821 }
822
823 pub fn real_time_recency(&self) -> bool {
825 *self.expect_value(&REAL_TIME_RECENCY)
826 }
827
828 pub fn real_time_recency_timeout(&self) -> &Duration {
830 self.expect_value(&REAL_TIME_RECENCY_TIMEOUT)
831 }
832
833 pub fn emit_plan_insights_notice(&self) -> bool {
835 *self.expect_value(&EMIT_PLAN_INSIGHTS_NOTICE)
836 }
837
838 pub fn emit_timestamp_notice(&self) -> bool {
840 *self.expect_value(&EMIT_TIMESTAMP_NOTICE)
841 }
842
843 pub fn emit_trace_id_notice(&self) -> bool {
845 *self.expect_value(&EMIT_TRACE_ID_NOTICE)
846 }
847
848 pub fn auto_route_catalog_queries(&self) -> bool {
850 *self.expect_value(&AUTO_ROUTE_CATALOG_QUERIES)
851 }
852
853 pub fn enable_session_rbac_checks(&self) -> bool {
855 *self.expect_value(&ENABLE_SESSION_RBAC_CHECKS)
856 }
857
858 pub fn restrict_to_user_objects(&self) -> bool {
860 *self.expect_value(&RESTRICT_TO_USER_OBJECTS)
861 }
862
863 pub fn enable_session_cardinality_estimates(&self) -> bool {
865 *self.expect_value(&ENABLE_SESSION_CARDINALITY_ESTIMATES)
866 }
867
868 pub fn is_superuser(&self) -> bool {
870 self.user.is_superuser()
871 }
872
873 pub fn user(&self) -> &User {
875 &self.user
876 }
877
878 pub fn max_query_result_size(&self) -> u64 {
880 self.expect_value::<ByteSize>(&MAX_QUERY_RESULT_SIZE)
881 .as_bytes()
882 }
883
884 pub fn set_internal_user_metadata(&mut self, metadata: InternalUserMetadata) {
886 self.user.internal_metadata = Some(metadata);
887 }
888
889 pub fn set_external_user_metadata(&mut self, metadata: ExternalUserMetadata) {
891 self.user.external_metadata = Some(metadata);
892 }
893
894 pub fn set_cluster(&mut self, cluster: String) {
895 let var = self
896 .vars
897 .get_mut(UncasedStr::new(CLUSTER.name()))
898 .expect("cluster variable must exist");
899 var.set(VarInput::Flat(&cluster), false)
900 .expect("setting cluster must succeed");
901 }
902
903 pub fn set_local_transaction_isolation(&mut self, transaction_isolation: IsolationLevel) {
904 let var = self
905 .vars
906 .get_mut(UncasedStr::new(TRANSACTION_ISOLATION.name()))
907 .expect("transaction_isolation variable must exist");
908 var.set(VarInput::Flat(transaction_isolation.as_str()), true)
909 .expect("setting transaction isolation must succeed");
910 }
911
912 pub fn get_statement_logging_sample_rate(&self) -> Numeric {
913 *self.expect_value(&STATEMENT_LOGGING_SAMPLE_RATE)
914 }
915
916 pub fn emit_introspection_query_notice(&self) -> bool {
918 *self.expect_value(&EMIT_INTROSPECTION_QUERY_NOTICE)
919 }
920
921 pub fn unsafe_new_transaction_wall_time(&self) -> Option<CheckedTimestamp<DateTime<Utc>>> {
922 *self.expect_value(&UNSAFE_NEW_TRANSACTION_WALL_TIME)
923 }
924
925 pub fn welcome_message(&self) -> bool {
927 *self.expect_value(&WELCOME_MESSAGE)
928 }
929}
930
931pub const OLD_CATALOG_SERVER_CLUSTER: &str = "mz_introspection";
933pub const OLD_AUTO_ROUTE_CATALOG_QUERIES: &str = "auto_route_introspection_queries";
934
935fn compat_translate<'a, 'b>(name: &'a str, input: VarInput<'b>) -> (&'a str, VarInput<'b>) {
943 if name == CLUSTER.name() {
944 if let Ok(value) = CLUSTER.parse(input) {
945 if value.format() == OLD_CATALOG_SERVER_CLUSTER {
946 tracing::debug!(
947 github_27285 = true,
948 "encountered deprecated `cluster` variable value: {}",
949 OLD_CATALOG_SERVER_CLUSTER,
950 );
951 return (name, VarInput::Flat("mz_catalog_server"));
952 }
953 }
954 }
955
956 if name == OLD_AUTO_ROUTE_CATALOG_QUERIES {
957 tracing::debug!(
958 github_27285 = true,
959 "encountered deprecated `{}` variable name",
960 OLD_AUTO_ROUTE_CATALOG_QUERIES,
961 );
962 return (AUTO_ROUTE_CATALOG_QUERIES.name(), input);
963 }
964
965 (name, input)
966}
967
968fn compat_translate_name(name: &str) -> &str {
969 let (name, _) = compat_translate(name, VarInput::Flat(""));
970 name
971}
972
973#[derive(Debug)]
976pub struct SystemVar {
977 definition: VarDefinition,
978 persisted_value: Option<Box<dyn Value>>,
980 dynamic_default: Option<Box<dyn Value>>,
982}
983
984impl Clone for SystemVar {
985 fn clone(&self) -> Self {
986 SystemVar {
987 definition: self.definition.clone(),
988 persisted_value: self.persisted_value.as_ref().map(|v| v.box_clone()),
989 dynamic_default: self.dynamic_default.as_ref().map(|v| v.box_clone()),
990 }
991 }
992}
993
994impl SystemVar {
995 pub fn new(definition: VarDefinition) -> Self {
996 SystemVar {
997 definition,
998 persisted_value: None,
999 dynamic_default: None,
1000 }
1001 }
1002
1003 fn is_default(&self, input: VarInput) -> Result<bool, VarError> {
1004 let v = self.definition.parse(input)?;
1005 Ok(self.definition.default_value() == v.as_ref())
1006 }
1007
1008 pub fn value_dyn(&self) -> &dyn Value {
1009 self.persisted_value
1010 .as_deref()
1011 .or(self.dynamic_default.as_deref())
1012 .unwrap_or_else(|| self.definition.default_value())
1013 }
1014
1015 pub fn value<V: 'static>(&self) -> &V {
1016 let val = self.value_dyn();
1017 val.as_any().downcast_ref::<V>().expect("success")
1018 }
1019
1020 fn parse(&self, input: VarInput) -> Result<Box<dyn Value>, VarError> {
1021 let v = self.definition.parse(input)?;
1022 self.validate_constraints(v.as_ref())?;
1024 Ok(v)
1025 }
1026
1027 fn set(&mut self, input: VarInput) -> Result<bool, VarError> {
1028 let v = self.parse(input)?;
1029
1030 if self.persisted_value.as_ref() != Some(&v) {
1031 self.persisted_value = Some(v);
1032 Ok(true)
1033 } else {
1034 Ok(false)
1035 }
1036 }
1037
1038 fn reset(&mut self) -> bool {
1039 if self.persisted_value.is_some() {
1040 self.persisted_value = None;
1041 true
1042 } else {
1043 false
1044 }
1045 }
1046
1047 fn set_default(&mut self, input: VarInput) -> Result<(), VarError> {
1048 let v = self.definition.parse(input)?;
1049 self.dynamic_default = Some(v);
1050 Ok(())
1051 }
1052
1053 fn validate_constraints(&self, val: &dyn Value) -> Result<(), VarError> {
1054 if let Some(constraint) = &self.definition.constraint {
1055 constraint.check_constraint(self, self.value_dyn(), val)
1056 } else {
1057 Ok(())
1058 }
1059 }
1060}
1061
1062impl Var for SystemVar {
1063 fn name(&self) -> &'static str {
1064 self.definition.name.as_str()
1065 }
1066
1067 fn value(&self) -> String {
1068 self.value_dyn().format()
1069 }
1070
1071 fn description(&self) -> &'static str {
1072 self.definition.description
1073 }
1074
1075 fn type_name(&self) -> Cow<'static, str> {
1076 self.definition.type_name()
1077 }
1078
1079 fn visible(&self, user: &User, system_vars: &SystemVars) -> Result<(), VarError> {
1080 self.definition.visible(user, system_vars)
1081 }
1082}
1083
1084#[derive(Debug, Error)]
1085pub enum NetworkPolicyError {
1086 #[error("Access denied for address {0}")]
1087 AddressDenied(IpAddr),
1088}
1089
1090#[derive(Derivative, Clone)]
1095#[derivative(Debug)]
1096pub struct SystemVars {
1097 allow_unsafe: bool,
1099 vars: BTreeMap<&'static UncasedStr, SystemVar>,
1101 #[derivative(Debug = "ignore")]
1103 callbacks: BTreeMap<String, Vec<Arc<dyn Fn(&SystemVars) + Send + Sync>>>,
1104
1105 dyncfgs: ConfigSet,
1109}
1110
1111impl Default for SystemVars {
1112 fn default() -> Self {
1113 Self::new()
1114 }
1115}
1116
1117impl SystemVars {
1118 pub fn new() -> Self {
1119 let system_vars = vec![
1120 &MAX_KAFKA_CONNECTIONS,
1121 &MAX_POSTGRES_CONNECTIONS,
1122 &MAX_MYSQL_CONNECTIONS,
1123 &MAX_SQL_SERVER_CONNECTIONS,
1124 &MAX_AWS_PRIVATELINK_CONNECTIONS,
1125 &MAX_TABLES,
1126 &MAX_SOURCES,
1127 &MAX_SINKS,
1128 &MAX_MATERIALIZED_VIEWS,
1129 &MAX_CLUSTERS,
1130 &MAX_REPLICAS_PER_CLUSTER,
1131 &MAX_CREDIT_CONSUMPTION_RATE,
1132 &MAX_DATABASES,
1133 &MAX_SCHEMAS_PER_DATABASE,
1134 &MAX_OBJECTS_PER_SCHEMA,
1135 &MAX_SECRETS,
1136 &MAX_ROLES,
1137 &MAX_NETWORK_POLICIES,
1138 &MAX_RULES_PER_NETWORK_POLICY,
1139 &MAX_RESULT_SIZE,
1140 &MAX_COPY_FROM_ROW_SIZE,
1141 &ALLOWED_CLUSTER_REPLICA_SIZES,
1142 &upsert_rocksdb::UPSERT_ROCKSDB_COMPACTION_STYLE,
1143 &upsert_rocksdb::UPSERT_ROCKSDB_OPTIMIZE_COMPACTION_MEMTABLE_BUDGET,
1144 &upsert_rocksdb::UPSERT_ROCKSDB_LEVEL_COMPACTION_DYNAMIC_LEVEL_BYTES,
1145 &upsert_rocksdb::UPSERT_ROCKSDB_UNIVERSAL_COMPACTION_RATIO,
1146 &upsert_rocksdb::UPSERT_ROCKSDB_PARALLELISM,
1147 &upsert_rocksdb::UPSERT_ROCKSDB_COMPRESSION_TYPE,
1148 &upsert_rocksdb::UPSERT_ROCKSDB_BOTTOMMOST_COMPRESSION_TYPE,
1149 &upsert_rocksdb::UPSERT_ROCKSDB_BATCH_SIZE,
1150 &upsert_rocksdb::UPSERT_ROCKSDB_RETRY_DURATION,
1151 &upsert_rocksdb::UPSERT_ROCKSDB_STATS_LOG_INTERVAL_SECONDS,
1152 &upsert_rocksdb::UPSERT_ROCKSDB_STATS_PERSIST_INTERVAL_SECONDS,
1153 &upsert_rocksdb::UPSERT_ROCKSDB_POINT_LOOKUP_BLOCK_CACHE_SIZE_MB,
1154 &upsert_rocksdb::UPSERT_ROCKSDB_SHRINK_ALLOCATED_BUFFERS_BY_RATIO,
1155 &upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_CLUSTER_MEMORY_FRACTION,
1156 &upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_MEMORY_BYTES,
1157 &upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_ALLOW_STALL,
1158 &STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES,
1159 &STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_TO_CLUSTER_SIZE_FRACTION,
1160 &STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_DISK_ONLY,
1161 &STORAGE_STATISTICS_INTERVAL,
1162 &STORAGE_STATISTICS_COLLECTION_INTERVAL,
1163 &STORAGE_SHRINK_UPSERT_UNUSED_BUFFERS_BY_RATIO,
1164 &STORAGE_RECORD_SOURCE_SINK_NAMESPACED_ERRORS,
1165 &PERSIST_FAST_PATH_LIMIT,
1166 &METRICS_RETENTION,
1167 &UNSAFE_MOCK_AUDIT_EVENT_TIMESTAMP,
1168 &ENABLE_RBAC_CHECKS,
1169 &PG_SOURCE_CONNECT_TIMEOUT,
1170 &PG_SOURCE_TCP_KEEPALIVES_IDLE,
1171 &PG_SOURCE_TCP_KEEPALIVES_INTERVAL,
1172 &PG_SOURCE_TCP_KEEPALIVES_RETRIES,
1173 &PG_SOURCE_TCP_USER_TIMEOUT,
1174 &PG_SOURCE_TCP_CONFIGURE_SERVER,
1175 &PG_SOURCE_SNAPSHOT_STATEMENT_TIMEOUT,
1176 &PG_SOURCE_WAL_SENDER_TIMEOUT,
1177 &PG_SOURCE_SNAPSHOT_COLLECT_STRICT_COUNT,
1178 &MYSQL_SOURCE_TCP_KEEPALIVE,
1179 &MYSQL_SOURCE_SNAPSHOT_MAX_EXECUTION_TIME,
1180 &MYSQL_SOURCE_SNAPSHOT_LOCK_WAIT_TIMEOUT,
1181 &MYSQL_SOURCE_CONNECT_TIMEOUT,
1182 &SSH_CHECK_INTERVAL,
1183 &SSH_CONNECT_TIMEOUT,
1184 &SSH_KEEPALIVES_IDLE,
1185 &KAFKA_SOCKET_KEEPALIVE,
1186 &KAFKA_SOCKET_TIMEOUT,
1187 &KAFKA_TRANSACTION_TIMEOUT,
1188 &KAFKA_SOCKET_CONNECTION_SETUP_TIMEOUT,
1189 &KAFKA_FETCH_METADATA_TIMEOUT,
1190 &KAFKA_PROGRESS_RECORD_FETCH_TIMEOUT,
1191 &ENABLE_LAUNCHDARKLY,
1192 &MAX_CONNECTIONS,
1193 &NETWORK_POLICY,
1194 &SUPERUSER_RESERVED_CONNECTIONS,
1195 &KEEP_N_SOURCE_STATUS_HISTORY_ENTRIES,
1196 &KEEP_N_SINK_STATUS_HISTORY_ENTRIES,
1197 &KEEP_N_PRIVATELINK_STATUS_HISTORY_ENTRIES,
1198 &REPLICA_STATUS_HISTORY_RETENTION_WINDOW,
1199 &ENABLE_STORAGE_SHARD_FINALIZATION,
1200 &ENABLE_DEFAULT_CONNECTION_VALIDATION,
1201 &DEFAULT_TIMESTAMP_INTERVAL,
1202 &MIN_TIMESTAMP_INTERVAL,
1203 &MAX_TIMESTAMP_INTERVAL,
1204 &LOGGING_FILTER,
1205 &OPENTELEMETRY_FILTER,
1206 &LOGGING_FILTER_DEFAULTS,
1207 &OPENTELEMETRY_FILTER_DEFAULTS,
1208 &SENTRY_FILTERS,
1209 &WEBHOOKS_SECRETS_CACHING_TTL_SECS,
1210 &COORD_SLOW_MESSAGE_WARN_THRESHOLD,
1211 &grpc_client::CONNECT_TIMEOUT,
1212 &grpc_client::HTTP2_KEEP_ALIVE_INTERVAL,
1213 &grpc_client::HTTP2_KEEP_ALIVE_TIMEOUT,
1214 &cluster_scheduling::CLUSTER_MULTI_PROCESS_REPLICA_AZ_AFFINITY_WEIGHT,
1215 &cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY,
1216 &cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY_WEIGHT,
1217 &cluster_scheduling::CLUSTER_ENABLE_TOPOLOGY_SPREAD,
1218 &cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_IGNORE_NON_SINGULAR_SCALE,
1219 &cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_MAX_SKEW,
1220 &cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_MIN_DOMAINS,
1221 &cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_SOFT,
1222 &cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY,
1223 &cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY_WEIGHT,
1224 &cluster_scheduling::CLUSTER_ALTER_CHECK_READY_INTERVAL,
1225 &cluster_scheduling::CLUSTER_CHECK_SCHEDULING_POLICIES_INTERVAL,
1226 &cluster_scheduling::CLUSTER_SECURITY_CONTEXT_ENABLED,
1227 &cluster_scheduling::CLUSTER_REFRESH_MV_COMPACTION_ESTIMATE,
1228 &grpc_client::HTTP2_KEEP_ALIVE_TIMEOUT,
1229 &STATEMENT_LOGGING_MAX_SAMPLE_RATE,
1230 &STATEMENT_LOGGING_DEFAULT_SAMPLE_RATE,
1231 &STATEMENT_LOGGING_TARGET_DATA_RATE,
1232 &STATEMENT_LOGGING_MAX_DATA_CREDIT,
1233 &ENABLE_INTERNAL_STATEMENT_LOGGING,
1234 &OPTIMIZER_STATS_TIMEOUT,
1235 &OPTIMIZER_ONESHOT_STATS_TIMEOUT,
1236 &PRIVATELINK_STATUS_UPDATE_QUOTA_PER_MINUTE,
1237 &WEBHOOK_CONCURRENT_REQUEST_LIMIT,
1238 &PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_SIZE,
1239 &PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_WAIT,
1240 &PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL,
1241 &PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL_STAGGER,
1242 &USER_STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION,
1243 &FORCE_SOURCE_TABLE_SYNTAX,
1244 &OPTIMIZER_E2E_LATENCY_WARNING_THRESHOLD,
1245 &SCRAM_ITERATIONS,
1246 ];
1247
1248 let dyncfgs = mz_dyncfgs::all_dyncfgs();
1249 let dyncfg_vars: Vec<_> = dyncfgs
1250 .entries()
1251 .map(|cfg| {
1252 let user_visible = cfg.name() == OIDC_GROUP_CLAIM.name()
1255 || cfg.name() == OIDC_GROUP_ROLE_SYNC_STRICT.name();
1256 match cfg.default() {
1257 ConfigVal::Bool(default) => {
1258 VarDefinition::new_runtime(cfg.name(), *default, cfg.desc(), user_visible)
1259 }
1260 ConfigVal::U32(default) => {
1261 VarDefinition::new_runtime(cfg.name(), *default, cfg.desc(), user_visible)
1262 }
1263 ConfigVal::Usize(default) => {
1264 VarDefinition::new_runtime(cfg.name(), *default, cfg.desc(), user_visible)
1265 }
1266 ConfigVal::OptUsize(default) => {
1267 VarDefinition::new_runtime(cfg.name(), *default, cfg.desc(), user_visible)
1268 }
1269 ConfigVal::F64(default) => {
1270 VarDefinition::new_runtime(cfg.name(), *default, cfg.desc(), user_visible)
1271 }
1272 ConfigVal::String(default) => VarDefinition::new_runtime(
1273 cfg.name(),
1274 default.clone(),
1275 cfg.desc(),
1276 user_visible,
1277 ),
1278 ConfigVal::OptString(default) => VarDefinition::new_runtime(
1279 cfg.name(),
1280 default.clone(),
1281 cfg.desc(),
1282 user_visible,
1283 ),
1284 ConfigVal::Duration(default) => VarDefinition::new_runtime(
1285 cfg.name(),
1286 default.clone(),
1287 cfg.desc(),
1288 user_visible,
1289 ),
1290 ConfigVal::Json(default) => VarDefinition::new_runtime(
1291 cfg.name(),
1292 default.clone(),
1293 cfg.desc(),
1294 user_visible,
1295 ),
1296 }
1297 })
1298 .collect();
1299
1300 let vars: BTreeMap<_, _> = system_vars
1301 .into_iter()
1302 .chain(definitions::FEATURE_FLAGS.iter().copied())
1304 .chain(SESSION_SYSTEM_VARS.values().copied())
1306 .cloned()
1307 .chain(dyncfg_vars)
1309 .map(|var| (var.name, SystemVar::new(var)))
1310 .collect();
1311
1312 let vars = SystemVars {
1313 vars,
1314 callbacks: BTreeMap::new(),
1315 allow_unsafe: false,
1316 dyncfgs,
1317 };
1318
1319 vars
1320 }
1321
1322 pub fn dyncfgs(&self) -> &ConfigSet {
1323 &self.dyncfgs
1324 }
1325
1326 pub fn set_unsafe(mut self, allow_unsafe: bool) -> Self {
1327 self.allow_unsafe = allow_unsafe;
1328 self
1329 }
1330
1331 pub fn allow_unsafe(&self) -> bool {
1332 self.allow_unsafe
1333 }
1334
1335 fn expect_value<V: 'static>(&self, var: &VarDefinition) -> &V {
1336 let val = self
1337 .vars
1338 .get(var.name)
1339 .expect("provided var should be in state");
1340
1341 val.value_dyn()
1342 .as_any()
1343 .downcast_ref::<V>()
1344 .expect("provided var type should matched stored var")
1345 }
1346
1347 fn expect_config_value<V: ConfigType + 'static>(&self, name: &UncasedStr) -> &V {
1348 let val = self
1349 .vars
1350 .get(name)
1351 .unwrap_or_else(|| panic!("provided var {name} should be in state"));
1352
1353 val.value_dyn()
1354 .as_any()
1355 .downcast_ref()
1356 .expect("provided var type should matched stored var")
1357 }
1358
1359 pub fn reset_all(&mut self) {
1362 for (_, var) in &mut self.vars {
1363 var.reset();
1364 }
1365 }
1366
1367 pub fn iter(&self) -> impl Iterator<Item = &dyn Var> {
1370 self.vars
1371 .values()
1372 .map(|v| v.as_var())
1373 .filter(|v| !SESSION_SYSTEM_VARS.contains_key(UncasedStr::new(v.name())))
1374 }
1375
1376 pub fn iter_synced(&self) -> impl Iterator<Item = &dyn Var> {
1380 self.iter().filter(|v| v.name() != ENABLE_LAUNCHDARKLY.name)
1381 }
1382
1383 pub fn iter_session(&self) -> impl Iterator<Item = &dyn Var> {
1385 self.vars
1386 .values()
1387 .map(|v| v.as_var())
1388 .filter(|v| SESSION_SYSTEM_VARS.contains_key(UncasedStr::new(v.name())))
1389 }
1390
1391 pub fn user_modifiable(&self, name: &str) -> bool {
1393 SESSION_SYSTEM_VARS.contains_key(UncasedStr::new(name))
1394 || name == ENABLE_RBAC_CHECKS.name()
1395 || name == NETWORK_POLICY.name()
1396 || name == OIDC_GROUP_CLAIM.name()
1397 || name == OIDC_GROUP_ROLE_SYNC_STRICT.name()
1398 }
1399
1400 pub fn get(&self, name: &str) -> Result<&dyn Var, VarError> {
1421 self.vars
1422 .get(UncasedStr::new(name))
1423 .map(|v| v.as_var())
1424 .ok_or_else(|| VarError::UnknownParameter(name.into()))
1425 }
1426
1427 pub fn is_default(&self, name: &str, input: VarInput) -> Result<bool, VarError> {
1441 self.vars
1442 .get(UncasedStr::new(name))
1443 .ok_or_else(|| VarError::UnknownParameter(name.into()))
1444 .and_then(|v| v.is_default(input))
1445 }
1446
1447 pub fn set(&mut self, name: &str, input: VarInput) -> Result<bool, VarError> {
1470 let result = self
1471 .vars
1472 .get_mut(UncasedStr::new(name))
1473 .ok_or_else(|| VarError::UnknownParameter(name.into()))
1474 .and_then(|v| v.set(input))?;
1475 self.notify_callbacks(name);
1476 Ok(result)
1477 }
1478
1479 pub fn parse(&self, name: &str, input: VarInput) -> Result<Box<dyn Value>, VarError> {
1500 self.vars
1501 .get(UncasedStr::new(name))
1502 .ok_or_else(|| VarError::UnknownParameter(name.into()))
1503 .and_then(|v| v.parse(input))
1504 }
1505
1506 pub fn set_default(&mut self, name: &str, input: VarInput) -> Result<(), VarError> {
1514 self.vars
1515 .get_mut(UncasedStr::new(name))
1516 .ok_or_else(|| VarError::UnknownParameter(name.into()))
1517 .and_then(|v| v.set_default(input))?;
1518 self.notify_callbacks(name);
1519 Ok(())
1520 }
1521
1522 pub fn reset(&mut self, name: &str) -> Result<bool, VarError> {
1540 let result = self
1541 .vars
1542 .get_mut(UncasedStr::new(name))
1543 .ok_or_else(|| VarError::UnknownParameter(name.into()))
1544 .map(|v| v.reset())?;
1545 self.notify_callbacks(name);
1546 Ok(result)
1547 }
1548
1549 pub fn defaults(&self) -> BTreeMap<String, String> {
1551 self.vars
1552 .iter()
1553 .map(|(name, var)| {
1554 let default = var
1555 .dynamic_default
1556 .as_deref()
1557 .unwrap_or_else(|| var.definition.default_value());
1558 (name.as_str().to_owned(), default.format())
1559 })
1560 .collect()
1561 }
1562
1563 pub fn register_callback(
1568 &mut self,
1569 var: &VarDefinition,
1570 callback: Arc<dyn Fn(&SystemVars) + Send + Sync>,
1571 ) {
1572 self.callbacks
1573 .entry(var.name().to_string())
1574 .or_default()
1575 .push(callback);
1576 self.notify_callbacks(var.name());
1577 }
1578
1579 fn notify_callbacks(&self, name: &str) {
1581 if let Some(callbacks) = self.callbacks.get(name) {
1583 for callback in callbacks {
1584 (callback)(self);
1585 }
1586 }
1587 }
1588
1589 pub fn default_cluster(&self) -> String {
1592 self.expect_value::<String>(&CLUSTER).to_owned()
1593 }
1594
1595 pub fn max_kafka_connections(&self) -> u32 {
1597 *self.expect_value(&MAX_KAFKA_CONNECTIONS)
1598 }
1599
1600 pub fn max_postgres_connections(&self) -> u32 {
1602 *self.expect_value(&MAX_POSTGRES_CONNECTIONS)
1603 }
1604
1605 pub fn max_mysql_connections(&self) -> u32 {
1607 *self.expect_value(&MAX_MYSQL_CONNECTIONS)
1608 }
1609
1610 pub fn max_sql_server_connections(&self) -> u32 {
1612 *self.expect_value(&MAX_SQL_SERVER_CONNECTIONS)
1613 }
1614
1615 pub fn max_aws_privatelink_connections(&self) -> u32 {
1617 *self.expect_value(&MAX_AWS_PRIVATELINK_CONNECTIONS)
1618 }
1619
1620 pub fn max_tables(&self) -> u32 {
1622 *self.expect_value(&MAX_TABLES)
1623 }
1624
1625 pub fn max_sources(&self) -> u32 {
1627 *self.expect_value(&MAX_SOURCES)
1628 }
1629
1630 pub fn max_sinks(&self) -> u32 {
1632 *self.expect_value(&MAX_SINKS)
1633 }
1634
1635 pub fn max_materialized_views(&self) -> u32 {
1637 *self.expect_value(&MAX_MATERIALIZED_VIEWS)
1638 }
1639
1640 pub fn max_clusters(&self) -> u32 {
1642 *self.expect_value(&MAX_CLUSTERS)
1643 }
1644
1645 pub fn max_replicas_per_cluster(&self) -> u32 {
1647 *self.expect_value(&MAX_REPLICAS_PER_CLUSTER)
1648 }
1649
1650 pub fn max_credit_consumption_rate(&self) -> Numeric {
1652 *self.expect_value(&MAX_CREDIT_CONSUMPTION_RATE)
1653 }
1654
1655 pub fn max_databases(&self) -> u32 {
1657 *self.expect_value(&MAX_DATABASES)
1658 }
1659
1660 pub fn max_schemas_per_database(&self) -> u32 {
1662 *self.expect_value(&MAX_SCHEMAS_PER_DATABASE)
1663 }
1664
1665 pub fn max_objects_per_schema(&self) -> u32 {
1667 *self.expect_value(&MAX_OBJECTS_PER_SCHEMA)
1668 }
1669
1670 pub fn max_secrets(&self) -> u32 {
1672 *self.expect_value(&MAX_SECRETS)
1673 }
1674
1675 pub fn max_roles(&self) -> u32 {
1677 *self.expect_value(&MAX_ROLES)
1678 }
1679
1680 pub fn max_network_policies(&self) -> u32 {
1682 *self.expect_value(&MAX_NETWORK_POLICIES)
1683 }
1684
1685 pub fn max_rules_per_network_policy(&self) -> u32 {
1687 *self.expect_value(&MAX_RULES_PER_NETWORK_POLICY)
1688 }
1689
1690 pub fn max_result_size(&self) -> u64 {
1692 self.expect_value::<ByteSize>(&MAX_RESULT_SIZE).as_bytes()
1693 }
1694
1695 pub fn max_copy_from_row_size(&self) -> u64 {
1697 self.expect_value::<ByteSize>(&MAX_COPY_FROM_ROW_SIZE)
1698 .as_bytes()
1699 }
1700
1701 pub fn allowed_cluster_replica_sizes(&self) -> Vec<String> {
1703 self.expect_value::<Vec<Ident>>(&ALLOWED_CLUSTER_REPLICA_SIZES)
1704 .into_iter()
1705 .map(|s| s.as_str().into())
1706 .collect()
1707 }
1708
1709 pub fn default_cluster_replication_factor(&self) -> u32 {
1711 *self.expect_value::<u32>(&DEFAULT_CLUSTER_REPLICATION_FACTOR)
1712 }
1713
1714 pub fn upsert_rocksdb_compaction_style(&self) -> mz_rocksdb_types::config::CompactionStyle {
1715 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_COMPACTION_STYLE)
1716 }
1717
1718 pub fn upsert_rocksdb_optimize_compaction_memtable_budget(&self) -> usize {
1719 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_OPTIMIZE_COMPACTION_MEMTABLE_BUDGET)
1720 }
1721
1722 pub fn upsert_rocksdb_level_compaction_dynamic_level_bytes(&self) -> bool {
1723 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_LEVEL_COMPACTION_DYNAMIC_LEVEL_BYTES)
1724 }
1725
1726 pub fn upsert_rocksdb_universal_compaction_ratio(&self) -> i32 {
1727 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_UNIVERSAL_COMPACTION_RATIO)
1728 }
1729
1730 pub fn upsert_rocksdb_parallelism(&self) -> Option<i32> {
1731 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_PARALLELISM)
1732 }
1733
1734 pub fn upsert_rocksdb_compression_type(&self) -> mz_rocksdb_types::config::CompressionType {
1735 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_COMPRESSION_TYPE)
1736 }
1737
1738 pub fn upsert_rocksdb_bottommost_compression_type(
1739 &self,
1740 ) -> mz_rocksdb_types::config::CompressionType {
1741 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_BOTTOMMOST_COMPRESSION_TYPE)
1742 }
1743
1744 pub fn upsert_rocksdb_batch_size(&self) -> usize {
1745 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_BATCH_SIZE)
1746 }
1747
1748 pub fn upsert_rocksdb_retry_duration(&self) -> Duration {
1749 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_RETRY_DURATION)
1750 }
1751
1752 pub fn upsert_rocksdb_stats_log_interval_seconds(&self) -> u32 {
1753 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_STATS_LOG_INTERVAL_SECONDS)
1754 }
1755
1756 pub fn upsert_rocksdb_stats_persist_interval_seconds(&self) -> u32 {
1757 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_STATS_PERSIST_INTERVAL_SECONDS)
1758 }
1759
1760 pub fn upsert_rocksdb_point_lookup_block_cache_size_mb(&self) -> Option<u32> {
1761 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_POINT_LOOKUP_BLOCK_CACHE_SIZE_MB)
1762 }
1763
1764 pub fn upsert_rocksdb_shrink_allocated_buffers_by_ratio(&self) -> usize {
1765 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_SHRINK_ALLOCATED_BUFFERS_BY_RATIO)
1766 }
1767
1768 pub fn upsert_rocksdb_write_buffer_manager_cluster_memory_fraction(&self) -> Option<Numeric> {
1769 *self.expect_value(
1770 &upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_CLUSTER_MEMORY_FRACTION,
1771 )
1772 }
1773
1774 pub fn upsert_rocksdb_write_buffer_manager_memory_bytes(&self) -> Option<usize> {
1775 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_MEMORY_BYTES)
1776 }
1777
1778 pub fn upsert_rocksdb_write_buffer_manager_allow_stall(&self) -> bool {
1779 *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_ALLOW_STALL)
1780 }
1781
1782 pub fn persist_fast_path_limit(&self) -> usize {
1783 *self.expect_value(&PERSIST_FAST_PATH_LIMIT)
1784 }
1785
1786 pub fn pg_source_connect_timeout(&self) -> Duration {
1788 *self.expect_value(&PG_SOURCE_CONNECT_TIMEOUT)
1789 }
1790
1791 pub fn pg_source_tcp_keepalives_retries(&self) -> u32 {
1793 *self.expect_value(&PG_SOURCE_TCP_KEEPALIVES_RETRIES)
1794 }
1795
1796 pub fn pg_source_tcp_keepalives_idle(&self) -> Duration {
1798 *self.expect_value(&PG_SOURCE_TCP_KEEPALIVES_IDLE)
1799 }
1800
1801 pub fn pg_source_tcp_keepalives_interval(&self) -> Duration {
1803 *self.expect_value(&PG_SOURCE_TCP_KEEPALIVES_INTERVAL)
1804 }
1805
1806 pub fn pg_source_tcp_user_timeout(&self) -> Duration {
1808 *self.expect_value(&PG_SOURCE_TCP_USER_TIMEOUT)
1809 }
1810
1811 pub fn pg_source_tcp_configure_server(&self) -> bool {
1813 *self.expect_value(&PG_SOURCE_TCP_CONFIGURE_SERVER)
1814 }
1815
1816 pub fn pg_source_snapshot_statement_timeout(&self) -> Duration {
1818 *self.expect_value(&PG_SOURCE_SNAPSHOT_STATEMENT_TIMEOUT)
1819 }
1820
1821 pub fn pg_source_wal_sender_timeout(&self) -> Option<Duration> {
1823 *self.expect_value(&PG_SOURCE_WAL_SENDER_TIMEOUT)
1824 }
1825
1826 pub fn pg_source_snapshot_collect_strict_count(&self) -> bool {
1828 *self.expect_value(&PG_SOURCE_SNAPSHOT_COLLECT_STRICT_COUNT)
1829 }
1830
1831 pub fn mysql_source_tcp_keepalive(&self) -> Duration {
1833 *self.expect_value(&MYSQL_SOURCE_TCP_KEEPALIVE)
1834 }
1835
1836 pub fn mysql_source_snapshot_max_execution_time(&self) -> Duration {
1838 *self.expect_value(&MYSQL_SOURCE_SNAPSHOT_MAX_EXECUTION_TIME)
1839 }
1840
1841 pub fn mysql_source_snapshot_lock_wait_timeout(&self) -> Duration {
1843 *self.expect_value(&MYSQL_SOURCE_SNAPSHOT_LOCK_WAIT_TIMEOUT)
1844 }
1845
1846 pub fn mysql_source_connect_timeout(&self) -> Duration {
1848 *self.expect_value(&MYSQL_SOURCE_CONNECT_TIMEOUT)
1849 }
1850
1851 pub fn ssh_check_interval(&self) -> Duration {
1853 *self.expect_value(&SSH_CHECK_INTERVAL)
1854 }
1855
1856 pub fn ssh_connect_timeout(&self) -> Duration {
1858 *self.expect_value(&SSH_CONNECT_TIMEOUT)
1859 }
1860
1861 pub fn ssh_keepalives_idle(&self) -> Duration {
1863 *self.expect_value(&SSH_KEEPALIVES_IDLE)
1864 }
1865
1866 pub fn kafka_socket_keepalive(&self) -> bool {
1868 *self.expect_value(&KAFKA_SOCKET_KEEPALIVE)
1869 }
1870
1871 pub fn kafka_socket_timeout(&self) -> Option<Duration> {
1873 *self.expect_value(&KAFKA_SOCKET_TIMEOUT)
1874 }
1875
1876 pub fn kafka_transaction_timeout(&self) -> Duration {
1878 *self.expect_value(&KAFKA_TRANSACTION_TIMEOUT)
1879 }
1880
1881 pub fn kafka_socket_connection_setup_timeout(&self) -> Duration {
1883 *self.expect_value(&KAFKA_SOCKET_CONNECTION_SETUP_TIMEOUT)
1884 }
1885
1886 pub fn kafka_fetch_metadata_timeout(&self) -> Duration {
1888 *self.expect_value(&KAFKA_FETCH_METADATA_TIMEOUT)
1889 }
1890
1891 pub fn kafka_progress_record_fetch_timeout(&self) -> Option<Duration> {
1893 *self.expect_value(&KAFKA_PROGRESS_RECORD_FETCH_TIMEOUT)
1894 }
1895
1896 pub fn crdb_connect_timeout(&self) -> Duration {
1898 *self.expect_config_value(UncasedStr::new(
1899 mz_persist_client::cfg::CRDB_CONNECT_TIMEOUT.name(),
1900 ))
1901 }
1902
1903 pub fn crdb_tcp_user_timeout(&self) -> Duration {
1905 *self.expect_config_value(UncasedStr::new(
1906 mz_persist_client::cfg::CRDB_TCP_USER_TIMEOUT.name(),
1907 ))
1908 }
1909
1910 pub fn crdb_keepalives_idle(&self) -> Duration {
1912 *self.expect_config_value(UncasedStr::new(
1913 mz_persist_client::cfg::CRDB_KEEPALIVES_IDLE.name(),
1914 ))
1915 }
1916
1917 pub fn crdb_keepalives_interval(&self) -> Duration {
1919 *self.expect_config_value(UncasedStr::new(
1920 mz_persist_client::cfg::CRDB_KEEPALIVES_INTERVAL.name(),
1921 ))
1922 }
1923
1924 pub fn crdb_keepalives_retries(&self) -> u32 {
1926 *self.expect_config_value(UncasedStr::new(
1927 mz_persist_client::cfg::CRDB_KEEPALIVES_RETRIES.name(),
1928 ))
1929 }
1930
1931 pub fn storage_dataflow_max_inflight_bytes(&self) -> Option<usize> {
1933 *self.expect_value(&STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES)
1934 }
1935
1936 pub fn storage_dataflow_max_inflight_bytes_to_cluster_size_fraction(&self) -> Option<Numeric> {
1938 *self.expect_value(&STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_TO_CLUSTER_SIZE_FRACTION)
1939 }
1940
1941 pub fn storage_shrink_upsert_unused_buffers_by_ratio(&self) -> usize {
1943 *self.expect_value(&STORAGE_SHRINK_UPSERT_UNUSED_BUFFERS_BY_RATIO)
1944 }
1945
1946 pub fn storage_dataflow_max_inflight_bytes_disk_only(&self) -> bool {
1948 *self.expect_value(&STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_DISK_ONLY)
1949 }
1950
1951 pub fn storage_statistics_interval(&self) -> Duration {
1953 *self.expect_value(&STORAGE_STATISTICS_INTERVAL)
1954 }
1955
1956 pub fn storage_statistics_collection_interval(&self) -> Duration {
1958 *self.expect_value(&STORAGE_STATISTICS_COLLECTION_INTERVAL)
1959 }
1960
1961 pub fn storage_record_source_sink_namespaced_errors(&self) -> bool {
1963 *self.expect_value(&STORAGE_RECORD_SOURCE_SINK_NAMESPACED_ERRORS)
1964 }
1965
1966 pub fn persist_stats_filter_enabled(&self) -> bool {
1968 *self.expect_config_value(UncasedStr::new(
1969 mz_persist_client::stats::STATS_FILTER_ENABLED.name(),
1970 ))
1971 }
1972
1973 pub fn scram_iterations(&self) -> NonZeroU32 {
1974 *self.expect_value(&SCRAM_ITERATIONS)
1975 }
1976
1977 pub fn dyncfg_updates(&self) -> ConfigUpdates {
1978 let mut updates = ConfigUpdates::default();
1979 for entry in self.dyncfgs.entries() {
1980 let name = UncasedStr::new(entry.name());
1981 let val = match entry.val() {
1982 ConfigVal::Bool(_) => ConfigVal::from(*self.expect_config_value::<bool>(name)),
1983 ConfigVal::U32(_) => ConfigVal::from(*self.expect_config_value::<u32>(name)),
1984 ConfigVal::Usize(_) => ConfigVal::from(*self.expect_config_value::<usize>(name)),
1985 ConfigVal::OptUsize(_) => {
1986 ConfigVal::from(*self.expect_config_value::<Option<usize>>(name))
1987 }
1988 ConfigVal::F64(_) => ConfigVal::from(*self.expect_config_value::<f64>(name)),
1989 ConfigVal::String(_) => {
1990 ConfigVal::from(self.expect_config_value::<String>(name).clone())
1991 }
1992 ConfigVal::OptString(_) => {
1993 ConfigVal::from(self.expect_config_value::<Option<String>>(name).clone())
1994 }
1995 ConfigVal::Duration(_) => {
1996 ConfigVal::from(*self.expect_config_value::<Duration>(name))
1997 }
1998 ConfigVal::Json(_) => {
1999 ConfigVal::from(self.expect_config_value::<serde_json::Value>(name).clone())
2000 }
2001 };
2002 updates.add_dynamic(entry.name(), val);
2003 }
2004 updates.apply(&self.dyncfgs);
2005 updates
2006 }
2007
2008 pub fn metrics_retention(&self) -> Duration {
2010 *self.expect_value(&METRICS_RETENTION)
2011 }
2012
2013 pub fn unsafe_mock_audit_event_timestamp(&self) -> Option<mz_repr::Timestamp> {
2015 *self.expect_value(&UNSAFE_MOCK_AUDIT_EVENT_TIMESTAMP)
2016 }
2017
2018 pub fn enable_rbac_checks(&self) -> bool {
2020 *self.expect_value(&ENABLE_RBAC_CHECKS)
2021 }
2022
2023 pub fn max_connections(&self) -> u32 {
2025 *self.expect_value(&MAX_CONNECTIONS)
2026 }
2027
2028 pub fn default_network_policy_name(&self) -> String {
2029 self.expect_value::<String>(&NETWORK_POLICY).clone()
2030 }
2031
2032 pub fn superuser_reserved_connections(&self) -> u32 {
2034 *self.expect_value(&SUPERUSER_RESERVED_CONNECTIONS)
2035 }
2036
2037 pub fn keep_n_source_status_history_entries(&self) -> usize {
2038 *self.expect_value(&KEEP_N_SOURCE_STATUS_HISTORY_ENTRIES)
2039 }
2040
2041 pub fn keep_n_sink_status_history_entries(&self) -> usize {
2042 *self.expect_value(&KEEP_N_SINK_STATUS_HISTORY_ENTRIES)
2043 }
2044
2045 pub fn keep_n_privatelink_status_history_entries(&self) -> usize {
2046 *self.expect_value(&KEEP_N_PRIVATELINK_STATUS_HISTORY_ENTRIES)
2047 }
2048
2049 pub fn replica_status_history_retention_window(&self) -> Duration {
2050 *self.expect_value(&REPLICA_STATUS_HISTORY_RETENTION_WINDOW)
2051 }
2052
2053 pub fn enable_storage_shard_finalization(&self) -> bool {
2055 *self.expect_value(&ENABLE_STORAGE_SHARD_FINALIZATION)
2056 }
2057
2058 pub fn enable_default_connection_validation(&self) -> bool {
2060 *self.expect_value(&ENABLE_DEFAULT_CONNECTION_VALIDATION)
2061 }
2062
2063 pub fn default_timestamp_interval(&self) -> Duration {
2065 *self.expect_value(&DEFAULT_TIMESTAMP_INTERVAL)
2066 }
2067
2068 pub fn min_timestamp_interval(&self) -> Duration {
2070 *self.expect_value(&MIN_TIMESTAMP_INTERVAL)
2071 }
2072 pub fn max_timestamp_interval(&self) -> Duration {
2074 *self.expect_value(&MAX_TIMESTAMP_INTERVAL)
2075 }
2076
2077 pub fn logging_filter(&self) -> CloneableEnvFilter {
2078 self.expect_value::<CloneableEnvFilter>(&LOGGING_FILTER)
2079 .clone()
2080 }
2081
2082 pub fn opentelemetry_filter(&self) -> CloneableEnvFilter {
2083 self.expect_value::<CloneableEnvFilter>(&OPENTELEMETRY_FILTER)
2084 .clone()
2085 }
2086
2087 pub fn logging_filter_defaults(&self) -> Vec<SerializableDirective> {
2088 self.expect_value::<Vec<SerializableDirective>>(&LOGGING_FILTER_DEFAULTS)
2089 .clone()
2090 }
2091
2092 pub fn opentelemetry_filter_defaults(&self) -> Vec<SerializableDirective> {
2093 self.expect_value::<Vec<SerializableDirective>>(&OPENTELEMETRY_FILTER_DEFAULTS)
2094 .clone()
2095 }
2096
2097 pub fn sentry_filters(&self) -> Vec<SerializableDirective> {
2098 self.expect_value::<Vec<SerializableDirective>>(&SENTRY_FILTERS)
2099 .clone()
2100 }
2101
2102 pub fn webhooks_secrets_caching_ttl_secs(&self) -> usize {
2103 *self.expect_value(&WEBHOOKS_SECRETS_CACHING_TTL_SECS)
2104 }
2105
2106 pub fn coord_slow_message_warn_threshold(&self) -> Duration {
2107 *self.expect_value(&COORD_SLOW_MESSAGE_WARN_THRESHOLD)
2108 }
2109
2110 pub fn grpc_client_http2_keep_alive_interval(&self) -> Duration {
2111 *self.expect_value(&grpc_client::HTTP2_KEEP_ALIVE_INTERVAL)
2112 }
2113
2114 pub fn grpc_client_http2_keep_alive_timeout(&self) -> Duration {
2115 *self.expect_value(&grpc_client::HTTP2_KEEP_ALIVE_TIMEOUT)
2116 }
2117
2118 pub fn grpc_connect_timeout(&self) -> Duration {
2119 *self.expect_value(&grpc_client::CONNECT_TIMEOUT)
2120 }
2121
2122 pub fn cluster_multi_process_replica_az_affinity_weight(&self) -> Option<i32> {
2123 *self.expect_value(&cluster_scheduling::CLUSTER_MULTI_PROCESS_REPLICA_AZ_AFFINITY_WEIGHT)
2124 }
2125
2126 pub fn cluster_soften_replication_anti_affinity(&self) -> bool {
2127 *self.expect_value(&cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY)
2128 }
2129
2130 pub fn cluster_soften_replication_anti_affinity_weight(&self) -> i32 {
2131 *self.expect_value(&cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY_WEIGHT)
2132 }
2133
2134 pub fn cluster_enable_topology_spread(&self) -> bool {
2135 *self.expect_value(&cluster_scheduling::CLUSTER_ENABLE_TOPOLOGY_SPREAD)
2136 }
2137
2138 pub fn cluster_topology_spread_ignore_non_singular_scale(&self) -> bool {
2139 *self.expect_value(&cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_IGNORE_NON_SINGULAR_SCALE)
2140 }
2141
2142 pub fn cluster_topology_spread_max_skew(&self) -> i32 {
2143 *self.expect_value(&cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_MAX_SKEW)
2144 }
2145
2146 pub fn cluster_topology_spread_set_min_domains(&self) -> Option<i32> {
2147 *self.expect_value(&cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_MIN_DOMAINS)
2148 }
2149
2150 pub fn cluster_topology_spread_soft(&self) -> bool {
2151 *self.expect_value(&cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_SOFT)
2152 }
2153
2154 pub fn cluster_soften_az_affinity(&self) -> bool {
2155 *self.expect_value(&cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY)
2156 }
2157
2158 pub fn cluster_soften_az_affinity_weight(&self) -> i32 {
2159 *self.expect_value(&cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY_WEIGHT)
2160 }
2161
2162 pub fn cluster_alter_check_ready_interval(&self) -> Duration {
2163 *self.expect_value(&cluster_scheduling::CLUSTER_ALTER_CHECK_READY_INTERVAL)
2164 }
2165
2166 pub fn cluster_check_scheduling_policies_interval(&self) -> Duration {
2167 *self.expect_value(&cluster_scheduling::CLUSTER_CHECK_SCHEDULING_POLICIES_INTERVAL)
2168 }
2169
2170 pub fn cluster_security_context_enabled(&self) -> bool {
2171 *self.expect_value(&cluster_scheduling::CLUSTER_SECURITY_CONTEXT_ENABLED)
2172 }
2173
2174 pub fn cluster_refresh_mv_compaction_estimate(&self) -> Duration {
2175 *self.expect_value(&cluster_scheduling::CLUSTER_REFRESH_MV_COMPACTION_ESTIMATE)
2176 }
2177
2178 pub fn privatelink_status_update_quota_per_minute(&self) -> u32 {
2180 *self.expect_value(&PRIVATELINK_STATUS_UPDATE_QUOTA_PER_MINUTE)
2181 }
2182
2183 pub fn statement_logging_target_data_rate(&self) -> Option<usize> {
2184 *self.expect_value(&STATEMENT_LOGGING_TARGET_DATA_RATE)
2185 }
2186
2187 pub fn statement_logging_max_data_credit(&self) -> Option<usize> {
2188 *self.expect_value(&STATEMENT_LOGGING_MAX_DATA_CREDIT)
2189 }
2190
2191 pub fn statement_logging_max_sample_rate(&self) -> Numeric {
2193 *self.expect_value(&STATEMENT_LOGGING_MAX_SAMPLE_RATE)
2194 }
2195
2196 pub fn statement_logging_default_sample_rate(&self) -> Numeric {
2198 *self.expect_value(&STATEMENT_LOGGING_DEFAULT_SAMPLE_RATE)
2199 }
2200
2201 pub fn enable_internal_statement_logging(&self) -> bool {
2203 *self.expect_value(&ENABLE_INTERNAL_STATEMENT_LOGGING)
2204 }
2205
2206 pub fn optimizer_stats_timeout(&self) -> Duration {
2208 *self.expect_value(&OPTIMIZER_STATS_TIMEOUT)
2209 }
2210
2211 pub fn optimizer_oneshot_stats_timeout(&self) -> Duration {
2213 *self.expect_value(&OPTIMIZER_ONESHOT_STATS_TIMEOUT)
2214 }
2215
2216 pub fn webhook_concurrent_request_limit(&self) -> usize {
2218 *self.expect_value(&WEBHOOK_CONCURRENT_REQUEST_LIMIT)
2219 }
2220
2221 pub fn pg_timestamp_oracle_connection_pool_max_size(&self) -> usize {
2223 *self.expect_value(&PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_SIZE)
2224 }
2225
2226 pub fn pg_timestamp_oracle_connection_pool_max_wait(&self) -> Option<Duration> {
2228 *self.expect_value(&PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_WAIT)
2229 }
2230
2231 pub fn pg_timestamp_oracle_connection_pool_ttl(&self) -> Duration {
2233 *self.expect_value(&PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL)
2234 }
2235
2236 pub fn pg_timestamp_oracle_connection_pool_ttl_stagger(&self) -> Duration {
2238 *self.expect_value(&PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL_STAGGER)
2239 }
2240
2241 pub fn user_storage_managed_collections_batch_duration(&self) -> Duration {
2243 *self.expect_value(&USER_STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION)
2244 }
2245
2246 pub fn force_source_table_syntax(&self) -> bool {
2247 *self.expect_value(&FORCE_SOURCE_TABLE_SYNTAX)
2248 }
2249
2250 pub fn optimizer_e2e_latency_warning_threshold(&self) -> Duration {
2251 *self.expect_value(&OPTIMIZER_E2E_LATENCY_WARNING_THRESHOLD)
2252 }
2253
2254 pub fn is_controller_config_var(&self, name: &str) -> bool {
2256 self.is_dyncfg_var(name)
2257 }
2258
2259 pub fn is_compute_config_var(&self, name: &str) -> bool {
2263 name == MAX_RESULT_SIZE.name() || self.is_dyncfg_var(name) || is_tracing_var(name)
2264 }
2265
2266 pub fn is_metrics_config_var(&self, name: &str) -> bool {
2268 self.is_dyncfg_var(name)
2269 }
2270
2271 pub fn is_storage_config_var(&self, name: &str) -> bool {
2273 name == PG_SOURCE_CONNECT_TIMEOUT.name()
2274 || name == PG_SOURCE_TCP_KEEPALIVES_IDLE.name()
2275 || name == PG_SOURCE_TCP_KEEPALIVES_INTERVAL.name()
2276 || name == PG_SOURCE_TCP_KEEPALIVES_RETRIES.name()
2277 || name == PG_SOURCE_TCP_USER_TIMEOUT.name()
2278 || name == PG_SOURCE_TCP_CONFIGURE_SERVER.name()
2279 || name == PG_SOURCE_SNAPSHOT_STATEMENT_TIMEOUT.name()
2280 || name == PG_SOURCE_WAL_SENDER_TIMEOUT.name()
2281 || name == PG_SOURCE_SNAPSHOT_COLLECT_STRICT_COUNT.name()
2282 || name == MYSQL_SOURCE_TCP_KEEPALIVE.name()
2283 || name == MYSQL_SOURCE_SNAPSHOT_MAX_EXECUTION_TIME.name()
2284 || name == MYSQL_SOURCE_SNAPSHOT_LOCK_WAIT_TIMEOUT.name()
2285 || name == MYSQL_SOURCE_CONNECT_TIMEOUT.name()
2286 || name == ENABLE_STORAGE_SHARD_FINALIZATION.name()
2287 || name == SSH_CHECK_INTERVAL.name()
2288 || name == SSH_CONNECT_TIMEOUT.name()
2289 || name == SSH_KEEPALIVES_IDLE.name()
2290 || name == KAFKA_SOCKET_KEEPALIVE.name()
2291 || name == KAFKA_SOCKET_TIMEOUT.name()
2292 || name == KAFKA_TRANSACTION_TIMEOUT.name()
2293 || name == KAFKA_SOCKET_CONNECTION_SETUP_TIMEOUT.name()
2294 || name == KAFKA_FETCH_METADATA_TIMEOUT.name()
2295 || name == KAFKA_PROGRESS_RECORD_FETCH_TIMEOUT.name()
2296 || name == STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES.name()
2297 || name == STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_TO_CLUSTER_SIZE_FRACTION.name()
2298 || name == STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_DISK_ONLY.name()
2299 || name == STORAGE_SHRINK_UPSERT_UNUSED_BUFFERS_BY_RATIO.name()
2300 || name == STORAGE_RECORD_SOURCE_SINK_NAMESPACED_ERRORS.name()
2301 || name == STORAGE_STATISTICS_INTERVAL.name()
2302 || name == STORAGE_STATISTICS_COLLECTION_INTERVAL.name()
2303 || name == USER_STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION.name()
2304 || is_upsert_rocksdb_config_var(name)
2305 || self.is_dyncfg_var(name)
2306 || is_tracing_var(name)
2307 }
2308
2309 fn is_dyncfg_var(&self, name: &str) -> bool {
2311 self.dyncfgs.entries().any(|e| name == e.name())
2312 }
2313}
2314
2315pub fn is_tracing_var(name: &str) -> bool {
2316 name == LOGGING_FILTER.name()
2317 || name == LOGGING_FILTER_DEFAULTS.name()
2318 || name == OPENTELEMETRY_FILTER.name()
2319 || name == OPENTELEMETRY_FILTER_DEFAULTS.name()
2320 || name == SENTRY_FILTERS.name()
2321}
2322
2323pub fn is_secrets_caching_var(name: &str) -> bool {
2325 name == WEBHOOKS_SECRETS_CACHING_TTL_SECS.name()
2326}
2327
2328fn is_upsert_rocksdb_config_var(name: &str) -> bool {
2329 name == upsert_rocksdb::UPSERT_ROCKSDB_COMPACTION_STYLE.name()
2330 || name == upsert_rocksdb::UPSERT_ROCKSDB_OPTIMIZE_COMPACTION_MEMTABLE_BUDGET.name()
2331 || name == upsert_rocksdb::UPSERT_ROCKSDB_LEVEL_COMPACTION_DYNAMIC_LEVEL_BYTES.name()
2332 || name == upsert_rocksdb::UPSERT_ROCKSDB_UNIVERSAL_COMPACTION_RATIO.name()
2333 || name == upsert_rocksdb::UPSERT_ROCKSDB_PARALLELISM.name()
2334 || name == upsert_rocksdb::UPSERT_ROCKSDB_COMPRESSION_TYPE.name()
2335 || name == upsert_rocksdb::UPSERT_ROCKSDB_BOTTOMMOST_COMPRESSION_TYPE.name()
2336 || name == upsert_rocksdb::UPSERT_ROCKSDB_BATCH_SIZE.name()
2337 || name == upsert_rocksdb::UPSERT_ROCKSDB_STATS_LOG_INTERVAL_SECONDS.name()
2338 || name == upsert_rocksdb::UPSERT_ROCKSDB_STATS_PERSIST_INTERVAL_SECONDS.name()
2339 || name == upsert_rocksdb::UPSERT_ROCKSDB_POINT_LOOKUP_BLOCK_CACHE_SIZE_MB.name()
2340 || name == upsert_rocksdb::UPSERT_ROCKSDB_SHRINK_ALLOCATED_BUFFERS_BY_RATIO.name()
2341}
2342
2343pub fn is_timestamp_oracle_config_var(name: &str) -> bool {
2346 name == PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_SIZE.name()
2347 || name == PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_WAIT.name()
2348 || name == PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL.name()
2349 || name == PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL_STAGGER.name()
2350 || name == CRDB_CONNECT_TIMEOUT.name()
2351 || name == CRDB_TCP_USER_TIMEOUT.name()
2352 || name == CRDB_KEEPALIVES_IDLE.name()
2353 || name == CRDB_KEEPALIVES_INTERVAL.name()
2354 || name == CRDB_KEEPALIVES_RETRIES.name()
2355}
2356
2357pub fn is_cluster_scheduling_var(name: &str) -> bool {
2359 name == cluster_scheduling::CLUSTER_MULTI_PROCESS_REPLICA_AZ_AFFINITY_WEIGHT.name()
2360 || name == cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY.name()
2361 || name == cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY_WEIGHT.name()
2362 || name == cluster_scheduling::CLUSTER_ENABLE_TOPOLOGY_SPREAD.name()
2363 || name == cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_IGNORE_NON_SINGULAR_SCALE.name()
2364 || name == cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_MAX_SKEW.name()
2365 || name == cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_SOFT.name()
2366 || name == cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY.name()
2367 || name == cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY_WEIGHT.name()
2368}
2369
2370pub fn is_http_config_var(name: &str) -> bool {
2372 name == WEBHOOK_CONCURRENT_REQUEST_LIMIT.name()
2373}
2374
2375static SESSION_SYSTEM_VARS: LazyLock<BTreeMap<&'static UncasedStr, &'static VarDefinition>> =
2379 LazyLock::new(|| {
2380 [
2381 &APPLICATION_NAME,
2382 &CLIENT_ENCODING,
2383 &CLIENT_MIN_MESSAGES,
2384 &CLUSTER,
2385 &CLUSTER_REPLICA,
2386 &DEFAULT_CLUSTER_REPLICATION_FACTOR,
2387 &CURRENT_OBJECT_MISSING_WARNINGS,
2388 &DATABASE,
2389 &DATE_STYLE,
2390 &EXTRA_FLOAT_DIGITS,
2391 &INTEGER_DATETIMES,
2392 &INTERVAL_STYLE,
2393 &REAL_TIME_RECENCY_TIMEOUT,
2394 &SEARCH_PATH,
2395 &STANDARD_CONFORMING_STRINGS,
2396 &STATEMENT_TIMEOUT,
2397 &IDLE_IN_TRANSACTION_SESSION_TIMEOUT,
2398 &TIMEZONE,
2399 &TRANSACTION_ISOLATION,
2400 &MAX_QUERY_RESULT_SIZE,
2401 ]
2402 .into_iter()
2403 .map(|var| (UncasedStr::new(var.name()), var))
2404 .collect()
2405 });
2406
2407#[derive(Debug)]
2410pub struct FeatureFlag {
2411 pub flag: &'static VarDefinition,
2412 pub feature_desc: &'static str,
2413}
2414
2415impl FeatureFlag {
2416 pub fn require(&'static self, system_vars: &SystemVars) -> Result<(), VarError> {
2419 match *system_vars.expect_value::<bool>(self.flag) {
2420 true => Ok(()),
2421 false => Err(VarError::RequiresFeatureFlag { feature_flag: self }),
2422 }
2423 }
2424}
2425
2426impl PartialEq for FeatureFlag {
2427 fn eq(&self, other: &FeatureFlag) -> bool {
2428 self.flag.name() == other.flag.name()
2429 }
2430}
2431
2432impl Eq for FeatureFlag {}
2433
2434impl Var for MzVersion {
2435 fn name(&self) -> &'static str {
2436 MZ_VERSION_NAME.as_str()
2437 }
2438
2439 fn value(&self) -> String {
2440 self.build_info
2441 .human_version(self.helm_chart_version.clone())
2442 }
2443
2444 fn description(&self) -> &'static str {
2445 "Shows the Materialize server version (Materialize)."
2446 }
2447
2448 fn type_name(&self) -> Cow<'static, str> {
2449 String::type_name()
2450 }
2451
2452 fn visible(&self, _: &User, _: &SystemVars) -> Result<(), VarError> {
2453 Ok(())
2454 }
2455}
2456
2457impl Var for User {
2458 fn name(&self) -> &'static str {
2459 IS_SUPERUSER_NAME.as_str()
2460 }
2461
2462 fn value(&self) -> String {
2463 self.is_superuser().format()
2464 }
2465
2466 fn description(&self) -> &'static str {
2467 "Reports whether the current session is a superuser (PostgreSQL)."
2468 }
2469
2470 fn type_name(&self) -> Cow<'static, str> {
2471 bool::type_name()
2472 }
2473
2474 fn visible(&self, _: &User, _: &SystemVars) -> Result<(), VarError> {
2475 Ok(())
2476 }
2477}