Skip to main content

mz_sql/session/
vars.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Run-time configuration parameters
11//!
12//! ## Overview
13//! Materialize roughly follows the PostgreSQL configuration model, which works
14//! as follows. There is a global set of named configuration parameters, like
15//! `DateStyle` and `client_encoding`. These parameters can be set in several
16//! places: in an on-disk configuration file (in Postgres, named
17//! postgresql.conf), in command line arguments when the server is started, or
18//! at runtime via the `ALTER SYSTEM` or `SET` statements. Parameters that are
19//! set in a session take precedence over database defaults, which in turn take
20//! precedence over command line arguments, which in turn take precedence over
21//! settings in the on-disk configuration. Note that changing the value of
22//! parameters obeys transaction semantics: if a transaction fails to commit,
23//! any parameters that were changed in that transaction (i.e., via `SET`) will
24//! be rolled back to their previous value.
25//!
26//! The Materialize configuration hierarchy at the moment is much simpler.
27//! Global defaults are hardcoded into the binary, and a select few parameters
28//! can be overridden per session. A select few parameters can be overridden on
29//! disk.
30//!
31//! The set of variables that can be overridden per session and the set of
32//! variables that can be overridden on disk are currently disjoint. The
33//! infrastructure has been designed with an eye towards merging these two sets
34//! and supporting additional layers to the hierarchy, however, should the need
35//! arise.
36//!
37//! The configuration parameters that exist are driven by compatibility with
38//! PostgreSQL drivers that expect them, not because they are particularly
39//! important.
40//!
41//! ## Structure
42//! The most meaningful exports from this module are:
43//!
44//! - [`SessionVars`] represent per-session parameters, which each user can
45//!   access independently of one another, and are accessed via `SET`.
46//!
47//!   The fields of [`SessionVars`] are either;
48//!     - `SessionVar`, which is preferable and simply requires full support of
49//!       the `SessionVar` impl for its embedded value type.
50//!     - `ServerVar` for types that do not currently support everything
51//!       required by `SessionVar`, e.g. they are fixed-value parameters.
52//!
53//!   In the fullness of time, all fields in [`SessionVars`] should be
54//!   `SessionVar`.
55//!
56//! - [`SystemVars`] represent system-wide configuration settings and are
57//!   accessed via `ALTER SYSTEM SET`.
58//!
59//!   All elements of [`SystemVars`] are `SystemVar`.
60//!
61//! Some [`VarDefinition`] are also marked as a [`FeatureFlag`]; this is just a
62//! wrapper to make working with a set of [`VarDefinition`] easier, primarily from
63//! within SQL planning, where we might want to check if a feature is enabled
64//! before planning it.
65
66use std::borrow::Cow;
67use std::clone::Clone;
68use std::collections::BTreeMap;
69use std::fmt::Debug;
70use std::net::IpAddr;
71use std::num::NonZeroU32;
72use std::string::ToString;
73use std::sync::{Arc, LazyLock};
74use std::time::Duration;
75
76use chrono::{DateTime, Utc};
77use derivative::Derivative;
78use imbl::OrdMap;
79use mz_build_info::BuildInfo;
80use mz_dyncfg::{ConfigSet, ConfigType, ConfigUpdates, ConfigVal, ParameterScope};
81use mz_persist_client::cfg::{
82    CRDB_CONNECT_TIMEOUT, CRDB_KEEPALIVES_IDLE, CRDB_KEEPALIVES_INTERVAL, CRDB_KEEPALIVES_RETRIES,
83    CRDB_TCP_USER_TIMEOUT,
84};
85use mz_repr::adt::numeric::Numeric;
86use mz_repr::adt::timestamp::CheckedTimestamp;
87use mz_repr::bytes::ByteSize;
88use mz_repr::user::{ExternalUserMetadata, InternalUserMetadata};
89use mz_tracing::{CloneableEnvFilter, SerializableDirective};
90use serde::Serialize;
91use thiserror::Error;
92use uncased::UncasedStr;
93
94use crate::ast::Ident;
95use crate::session::user::User;
96
97pub(crate) mod constraints;
98pub(crate) mod definitions;
99pub(crate) mod errors;
100pub(crate) mod polyfill;
101pub(crate) mod value;
102
103pub use definitions::*;
104pub use errors::*;
105pub use value::*;
106
107/// The action to take during end_transaction.
108///
109/// This enum lives here because of convenience: it's more of an adapter
110/// concept but [`SessionVars::end_transaction`] takes it.
111#[derive(Debug, Clone, Copy, PartialEq, Eq)]
112pub enum EndTransactionAction {
113    /// Commit the transaction.
114    Commit,
115    /// Rollback the transaction.
116    Rollback,
117}
118
119/// Represents the input to a variable.
120///
121/// Each variable has different rules for how it handles each style of input.
122/// This type allows us to defer interpretation of the input until the
123/// variable-specific interpretation can be applied.
124#[derive(Debug, Clone, Copy)]
125pub enum VarInput<'a> {
126    /// The input has been flattened into a single string.
127    ///
128    /// NOTE: when adding a new variant here (or in [`OwnedVarInput`]), extend
129    /// the `mz_catalog.mz_role_parameters` materialized view in
130    /// `src/catalog/src/builtin/mz_catalog.rs`. That MV discriminates on the
131    /// externally-tagged JSON shape of [`OwnedVarInput`] to format
132    /// `parameter_value`.
133    Flat(&'a str),
134    /// The input comes from a SQL `SET` statement and is jumbled across
135    /// multiple components.
136    ///
137    /// NOTE: see the doc-comment on [`VarInput::Flat`] — adding a new variant
138    /// requires extending `mz_catalog.mz_role_parameters`.
139    SqlSet(&'a [String]),
140}
141
142impl<'a> VarInput<'a> {
143    /// Converts the variable input to an owned vector of strings.
144    pub fn to_vec(&self) -> Vec<String> {
145        match self {
146            VarInput::Flat(v) => vec![v.to_string()],
147            VarInput::SqlSet(values) => values.into_iter().map(|v| v.to_string()).collect(),
148        }
149    }
150}
151
152/// An owned version of [`VarInput`].
153#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize)]
154pub enum OwnedVarInput {
155    /// See [`VarInput::Flat`].
156    ///
157    /// NOTE: adding a new variant requires extending the
158    /// `mz_catalog.mz_role_parameters` materialized view in
159    /// `src/catalog/src/builtin/mz_catalog.rs`, which discriminates on the
160    /// externally-tagged JSON shape of this enum.
161    Flat(String),
162    /// See [`VarInput::SqlSet`].
163    ///
164    /// NOTE: see the doc-comment on [`OwnedVarInput::Flat`].
165    SqlSet(Vec<String>),
166}
167
168impl OwnedVarInput {
169    /// Converts this owned variable input as a [`VarInput`].
170    pub fn borrow(&self) -> VarInput<'_> {
171        match self {
172            OwnedVarInput::Flat(v) => VarInput::Flat(v),
173            OwnedVarInput::SqlSet(v) => VarInput::SqlSet(v),
174        }
175    }
176}
177
178/// A `Var` represents a configuration parameter of an arbitrary type.
179pub trait Var: Debug {
180    /// Returns the name of the configuration parameter.
181    fn name(&self) -> &'static str;
182
183    /// Constructs a flattened string representation of the current value of the
184    /// configuration parameter.
185    ///
186    /// The resulting string is guaranteed to be parsable if provided to
187    /// `Value::parse` as a [`VarInput::Flat`].
188    fn value(&self) -> String;
189
190    /// Returns a short sentence describing the purpose of the configuration
191    /// parameter.
192    fn description(&self) -> &'static str;
193
194    /// Returns the name of the type of this variable.
195    fn type_name(&self) -> Cow<'static, str>;
196
197    /// Indicates wither the [`Var`] is visible as a function of the `user` and `system_vars`.
198    /// "Invisible" parameters return `VarErrors`.
199    ///
200    /// Variables marked as `internal` are only visible for the system user.
201    fn visible(&self, user: &User, system_vars: &SystemVars) -> Result<(), VarError>;
202
203    /// Reports whether the variable is only visible in unsafe mode.
204    fn is_unsafe(&self) -> bool {
205        self.name().starts_with("unsafe_")
206    }
207
208    /// Returns the [`ParameterScope`] at which this variable's value may be
209    /// overridden by the LaunchDarkly sync loop. Defaults to
210    /// [`ParameterScope::Environment`].
211    fn scope(&self) -> ParameterScope {
212        ParameterScope::Environment
213    }
214
215    /// Upcast `self` to a `dyn Var`, useful when working with multiple different implementors of
216    /// [`Var`].
217    fn as_var(&self) -> &dyn Var
218    where
219        Self: Sized,
220    {
221        self
222    }
223}
224
225/// A `SessionVar` is the session value for a configuration parameter. If unset,
226/// the server default is used instead.
227///
228/// Note: even though all of the different `*_value` fields are `Box<dyn Value>` they are enforced
229/// to be the same type because we use the `definition`s `parse(...)` method. This is guaranteed to
230/// return the same type as the compiled in default.
231#[derive(Debug)]
232pub struct SessionVar {
233    definition: VarDefinition,
234    /// System or Role default value.
235    default_value: Option<Box<dyn Value>>,
236    /// Value `LOCAL` to a transaction, will be unset at the completion of the transaction.
237    local_value: Option<Box<dyn Value>>,
238    /// Value set during a transaction, will be set if the transaction is committed.
239    staged_value: Option<Box<dyn Value>>,
240    /// Value that overrides the default.
241    session_value: Option<Box<dyn Value>>,
242}
243
244impl Clone for SessionVar {
245    fn clone(&self) -> Self {
246        SessionVar {
247            definition: self.definition.clone(),
248            default_value: self.default_value.as_ref().map(|v| v.box_clone()),
249            local_value: self.local_value.as_ref().map(|v| v.box_clone()),
250            staged_value: self.staged_value.as_ref().map(|v| v.box_clone()),
251            session_value: self.session_value.as_ref().map(|v| v.box_clone()),
252        }
253    }
254}
255
256impl SessionVar {
257    pub const fn new(var: VarDefinition) -> Self {
258        SessionVar {
259            definition: var,
260            default_value: None,
261            local_value: None,
262            staged_value: None,
263            session_value: None,
264        }
265    }
266
267    /// Checks if the provided [`VarInput`] is valid for the current session variable, returning
268    /// the formatted output if it's valid.
269    pub fn check(&self, input: VarInput) -> Result<String, VarError> {
270        let v = self.definition.parse(input)?;
271        self.validate_constraints(v.as_ref())?;
272
273        Ok(v.format())
274    }
275
276    /// Parse the input and update the stored value to match.
277    pub fn set(&mut self, input: VarInput, local: bool) -> Result<(), VarError> {
278        let v = self.definition.parse(input)?;
279
280        // Validate our parsed value.
281        self.validate_constraints(v.as_ref())?;
282
283        if local {
284            self.local_value = Some(v);
285        } else {
286            self.local_value = None;
287            self.staged_value = Some(v);
288        }
289        Ok(())
290    }
291
292    /// Sets the default value for the variable.
293    pub fn set_default(&mut self, input: VarInput) -> Result<(), VarError> {
294        let v = self.definition.parse(input)?;
295        self.validate_constraints(v.as_ref())?;
296        self.default_value = Some(v);
297        Ok(())
298    }
299
300    /// Reset the stored value to the default.
301    pub fn reset(&mut self, local: bool) {
302        let value = self
303            .default_value
304            .as_ref()
305            .map(|v| v.as_ref())
306            .unwrap_or_else(|| self.definition.value.value());
307        if local {
308            self.local_value = Some(value.box_clone());
309        } else {
310            self.local_value = None;
311            self.staged_value = Some(value.box_clone());
312        }
313    }
314
315    /// Returns a possibly new SessionVar if this needs to mutate at transaction end.
316    #[must_use]
317    pub fn end_transaction(&self, action: EndTransactionAction) -> Option<Self> {
318        if !self.is_mutating() {
319            return None;
320        }
321        let mut next: Self = self.clone();
322        next.local_value = None;
323        match action {
324            EndTransactionAction::Commit if next.staged_value.is_some() => {
325                next.session_value = next.staged_value.take()
326            }
327            _ => next.staged_value = None,
328        }
329        Some(next)
330    }
331
332    /// Whether this Var needs to mutate at the end of a transaction.
333    pub fn is_mutating(&self) -> bool {
334        self.local_value.is_some() || self.staged_value.is_some()
335    }
336
337    pub fn value_dyn(&self) -> &dyn Value {
338        self.local_value
339            .as_deref()
340            .or(self.staged_value.as_deref())
341            .or(self.session_value.as_deref())
342            .or(self.default_value.as_deref())
343            .unwrap_or_else(|| self.definition.value.value())
344    }
345
346    /// Returns the [`Value`] that is currently stored as the `session_value`.
347    ///
348    /// Note: This should __only__ be used for inspection, if you want to determine the current
349    /// value of this [`SessionVar`] you should use [`SessionVar::value`].
350    pub fn inspect_session_value(&self) -> Option<&dyn Value> {
351        self.session_value.as_deref()
352    }
353
354    fn validate_constraints(&self, val: &dyn Value) -> Result<(), VarError> {
355        if let Some(constraint) = &self.definition.constraint {
356            constraint.check_constraint(self, self.value_dyn(), val)
357        } else {
358            Ok(())
359        }
360    }
361}
362
363impl Var for SessionVar {
364    fn name(&self) -> &'static str {
365        self.definition.name.as_str()
366    }
367
368    fn value(&self) -> String {
369        self.value_dyn().format()
370    }
371
372    fn description(&self) -> &'static str {
373        self.definition.description
374    }
375
376    fn type_name(&self) -> Cow<'static, str> {
377        self.definition.type_name()
378    }
379
380    fn visible(
381        &self,
382        user: &User,
383        system_vars: &super::vars::SystemVars,
384    ) -> Result<(), super::vars::VarError> {
385        self.definition.visible(user, system_vars)
386    }
387}
388
389#[derive(Debug, Clone, PartialEq, Eq)]
390pub struct MzVersion {
391    /// Inputs to computed variables.
392    build_info: &'static BuildInfo,
393    /// Helm chart version
394    helm_chart_version: Option<String>,
395}
396
397impl MzVersion {
398    pub fn new(build_info: &'static BuildInfo, helm_chart_version: Option<String>) -> Self {
399        MzVersion {
400            build_info,
401            helm_chart_version,
402        }
403    }
404}
405
406/// Session variables.
407///
408/// See the [`crate::session::vars`] module documentation for more details on the
409/// Materialize configuration model.
410#[derive(Debug, Clone)]
411pub struct SessionVars {
412    /// The set of all session variables.
413    vars: OrdMap<&'static UncasedStr, SessionVar>,
414    /// Inputs to computed variables.
415    mz_version: MzVersion,
416    /// Information about the user associated with this Session.
417    user: User,
418}
419
420impl SessionVars {
421    /// Creates a new [`SessionVars`] without considering the System or Role defaults.
422    pub fn new_unchecked(
423        build_info: &'static BuildInfo,
424        user: User,
425        helm_chart_version: Option<String>,
426    ) -> SessionVars {
427        use definitions::*;
428
429        let vars = [
430            &FAILPOINTS,
431            &SERVER_VERSION,
432            &SERVER_VERSION_NUM,
433            &SQL_SAFE_UPDATES,
434            &REAL_TIME_RECENCY,
435            &EMIT_PLAN_INSIGHTS_NOTICE,
436            &EMIT_TIMESTAMP_NOTICE,
437            &EMIT_TRACE_ID_NOTICE,
438            &AUTO_ROUTE_CATALOG_QUERIES,
439            &ENABLE_SESSION_RBAC_CHECKS,
440            &RESTRICT_TO_USER_OBJECTS,
441            &ENABLE_SESSION_CARDINALITY_ESTIMATES,
442            &MAX_IDENTIFIER_LENGTH,
443            &STATEMENT_LOGGING_SAMPLE_RATE,
444            &EMIT_INTROSPECTION_QUERY_NOTICE,
445            &UNSAFE_NEW_TRANSACTION_WALL_TIME,
446            &WELCOME_MESSAGE,
447        ]
448        .into_iter()
449        .chain(SESSION_SYSTEM_VARS.iter().map(|(_name, var)| *var))
450        .map(|var| (var.name, SessionVar::new(var.clone())))
451        .collect();
452
453        SessionVars {
454            vars,
455            mz_version: MzVersion::new(build_info, helm_chart_version),
456            user,
457        }
458    }
459
460    fn expect_value<V: Value>(&self, var: &VarDefinition) -> &V {
461        let var = self
462            .vars
463            .get(var.name)
464            .expect("provided var should be in state");
465        let val = var.value_dyn();
466        val.as_any().downcast_ref::<V>().expect("success")
467    }
468
469    /// Returns an iterator over the configuration parameters and their current
470    /// values for this session.
471    ///
472    /// Note that this function does not check that the access variable should
473    /// be visible because of other settings or users. Before or after accessing
474    /// this method, you should call `Var::visible`.
475    pub fn iter(&self) -> impl Iterator<Item = &dyn Var> {
476        #[allow(clippy::as_conversions)]
477        self.vars
478            .values()
479            .map(|v| v.as_var())
480            .chain([&self.mz_version as &dyn Var, &self.user])
481    }
482
483    /// Returns an iterator over configuration parameters (and their current
484    /// values for this session) that are expected to be sent to the client when
485    /// a new connection is established or when their value changes.
486    pub fn notify_set(&self) -> impl Iterator<Item = &dyn Var> {
487        // WARNING: variables in this set are not checked for visibility, and
488        // are assumed to be visible for all sessions.
489        //
490        // This is fixible with some elbow grease, but at the moment it seems
491        // unlikely that we'll have a variable in the notify set that shouldn't
492        // be visible to all sessions.
493        [
494            &APPLICATION_NAME,
495            &CLIENT_ENCODING,
496            &DATE_STYLE,
497            &INTEGER_DATETIMES,
498            &SERVER_VERSION,
499            &STANDARD_CONFORMING_STRINGS,
500            &TIMEZONE,
501            &INTERVAL_STYLE,
502            // Including `cluster`, `cluster_replica`, `database`, and `search_path` in the notify
503            // set is a Materialize extension. Doing so allows users to more easily identify where
504            // their queries will be executing, which is important to know when you consider the
505            // size of a cluster, what indexes are present, etc.
506            &CLUSTER,
507            &CLUSTER_REPLICA,
508            &DEFAULT_CLUSTER_REPLICATION_FACTOR,
509            &DATABASE,
510            &SEARCH_PATH,
511        ]
512        .into_iter()
513        .map(|v| self.vars[v.name].as_var())
514        // Including `mz_version` in the notify set is a Materialize
515        // extension. Doing so allows applications to detect whether they
516        // are talking to Materialize or PostgreSQL without an additional
517        // network roundtrip. This is known to be safe because CockroachDB
518        // has an analogous extension [0].
519        // [0]: https://github.com/cockroachdb/cockroach/blob/369c4057a/pkg/sql/pgwire/conn.go#L1840
520        .chain(std::iter::once(self.mz_version.as_var()))
521    }
522
523    /// Resets all variables to their default value.
524    pub fn reset_all(&mut self) {
525        let names: Vec<_> = self.vars.keys().copied().collect();
526        for name in names {
527            self.vars[name].reset(false);
528        }
529    }
530
531    /// Returns a [`Var`] representing the configuration parameter with the
532    /// specified name.
533    ///
534    /// Configuration parameters are matched case insensitively. If no such
535    /// configuration parameter exists, `get` returns an error.
536    ///
537    /// Note that if `name` is known at compile time, you should instead use the
538    /// named accessor to access the variable with its true Rust type. For
539    /// example, `self.get("sql_safe_updates").value()` returns the string
540    /// `"true"` or `"false"`, while `self.sql_safe_updates()` returns a bool.
541    pub fn get(&self, system_vars: &SystemVars, name: &str) -> Result<&dyn Var, VarError> {
542        let name = compat_translate_name(name);
543
544        let name = UncasedStr::new(name);
545        if name == MZ_VERSION_NAME {
546            Ok(&self.mz_version)
547        } else if name == IS_SUPERUSER_NAME {
548            Ok(&self.user)
549        } else {
550            self.vars
551                .get(name)
552                .map(|v| {
553                    v.visible(&self.user, system_vars)?;
554                    Ok(v.as_var())
555                })
556                .transpose()?
557                .ok_or_else(|| VarError::UnknownParameter(name.to_string()))
558        }
559    }
560
561    /// Returns a [`SessionVar`] for inspection.
562    ///
563    /// Note: If you're trying to determine the value of the variable with `name` you should
564    /// instead use the named accessor, or [`SessionVars::get`].
565    pub fn inspect(&self, name: &str) -> Result<&SessionVar, VarError> {
566        let name = compat_translate_name(name);
567
568        self.vars
569            .get(UncasedStr::new(name))
570            .ok_or_else(|| VarError::UnknownParameter(name.to_string()))
571    }
572
573    /// Sets the configuration parameter named `name` to the value represented
574    /// by `value`.
575    ///
576    /// The new value may be either committed or rolled back by the next call to
577    /// [`SessionVars::end_transaction`]. If `local` is true, the new value is always
578    /// discarded by the next call to [`SessionVars::end_transaction`], even if the
579    /// transaction is marked to commit.
580    ///
581    /// Like with [`SessionVars::get`], configuration parameters are matched case
582    /// insensitively. If `value` is not valid, as determined by the underlying
583    /// configuration parameter, or if the named configuration parameter does
584    /// not exist, an error is returned.
585    pub fn set(
586        &mut self,
587        system_vars: &SystemVars,
588        name: &str,
589        input: VarInput,
590        local: bool,
591    ) -> Result<(), VarError> {
592        let (name, input) = compat_translate(name, input);
593
594        check_transaction_isolation_feature_flag(name, input, system_vars)?;
595
596        let name = UncasedStr::new(name);
597        self.check_read_only(name)?;
598
599        self.vars
600            .get_mut(name)
601            .map(|v| {
602                v.visible(&self.user, system_vars)?;
603                v.set(input, local)
604            })
605            .transpose()?
606            .ok_or_else(|| VarError::UnknownParameter(name.to_string()))
607    }
608
609    /// Sets the default value for the parameter named `name` to the value
610    /// represented by `value`.
611    pub fn set_default(&mut self, name: &str, input: VarInput) -> Result<(), VarError> {
612        let (name, input) = compat_translate(name, input);
613
614        let name = UncasedStr::new(name);
615
616        // Check if this variable is allowed to be set as a role default.
617        // Most read-only variables are blocked, but some (like restrict_to_user_objects)
618        // are specifically designed to be set via ALTER ROLE by superusers.
619        if !Self::allow_role_default(name) {
620            self.check_read_only(name)?;
621        }
622
623        self.vars
624            .get_mut(name)
625            // Note: visibility is checked when persisting a role default.
626            .map(|v| v.set_default(input))
627            .transpose()?
628            .ok_or_else(|| VarError::UnknownParameter(name.to_string()))
629    }
630
631    /// Returns true if the variable can be set as a role default even if it's
632    /// otherwise read-only from direct SET commands.
633    ///
634    /// SECURITY: Any variable listed here must also have a corresponding
635    /// superuser RBAC check in `generate_rbac_requirements` in `rbac.rs`
636    /// (see the `PlannedAlterRoleOption::Variable` match arm). Without that
637    /// check, any role could set the variable on themselves via ALTER ROLE.
638    fn allow_role_default(name: &UncasedStr) -> bool {
639        name == RESTRICT_TO_USER_OBJECTS.name
640    }
641
642    /// Sets the configuration parameter named `name` to its default value.
643    ///
644    /// The new value may be either committed or rolled back by the next call to
645    /// [`SessionVars::end_transaction`]. If `local` is true, the new value is
646    /// always discarded by the next call to [`SessionVars::end_transaction`],
647    /// even if the transaction is marked to commit.
648    ///
649    /// Like with [`SessionVars::get`], configuration parameters are matched
650    /// case insensitively. If the named configuration parameter does not exist,
651    /// an error is returned.
652    ///
653    /// If the variable does not exist or the user does not have the visibility
654    /// requires, this function returns an error.
655    pub fn reset(
656        &mut self,
657        system_vars: &SystemVars,
658        name: &str,
659        local: bool,
660    ) -> Result<(), VarError> {
661        let name = compat_translate_name(name);
662
663        let name = UncasedStr::new(name);
664        self.check_read_only(name)?;
665
666        self.vars
667            .get_mut(name)
668            .map(|v| {
669                v.visible(&self.user, system_vars)?;
670                v.reset(local);
671                Ok(())
672            })
673            .transpose()?
674            .ok_or_else(|| VarError::UnknownParameter(name.to_string()))
675    }
676
677    /// Returns an error if the variable corresponding to `name` is read only.
678    ///
679    /// Note: This is called by `set()` (for SQL SET commands) but NOT by
680    /// `set_default()` (for role defaults). This allows variables like
681    /// `restrict_to_user_objects` to be set via `ALTER ROLE ... SET` by
682    /// superusers while blocking direct `SET` commands from regular users.
683    fn check_read_only(&self, name: &UncasedStr) -> Result<(), VarError> {
684        if name == MZ_VERSION_NAME {
685            Err(VarError::ReadOnlyParameter(MZ_VERSION_NAME.as_str()))
686        } else if name == IS_SUPERUSER_NAME {
687            Err(VarError::ReadOnlyParameter(IS_SUPERUSER_NAME.as_str()))
688        } else if name == MAX_IDENTIFIER_LENGTH.name {
689            Err(VarError::ReadOnlyParameter(
690                MAX_IDENTIFIER_LENGTH.name.as_str(),
691            ))
692        } else if name == RESTRICT_TO_USER_OBJECTS.name {
693            // This variable can only be set via ALTER ROLE ... SET by superusers,
694            // not via direct SET commands. This prevents malicious queries from
695            // bypassing the restriction.
696            Err(VarError::ReadOnlyParameter(
697                RESTRICT_TO_USER_OBJECTS.name.as_str(),
698            ))
699        } else {
700            Ok(())
701        }
702    }
703
704    /// Commits or rolls back configuration parameter updates made via
705    /// [`SessionVars::set`] since the last call to `end_transaction`.
706    ///
707    /// Returns any session parameters that changed because the transaction ended.
708    #[mz_ore::instrument(level = "debug")]
709    pub fn end_transaction(
710        &mut self,
711        action: EndTransactionAction,
712    ) -> BTreeMap<&'static str, String> {
713        let mut changed = BTreeMap::new();
714        let mut updates = Vec::new();
715        for (name, var) in self.vars.iter() {
716            if !var.is_mutating() {
717                continue;
718            }
719            let before = var.value();
720            let next = var.end_transaction(action).expect("must mutate");
721            let after = next.value();
722            updates.push((*name, next));
723
724            // Report the new value of the parameter.
725            if before != after {
726                changed.insert(var.name(), after);
727            }
728        }
729        self.vars.extend(updates);
730        changed
731    }
732
733    /// Returns the value of the `application_name` configuration parameter.
734    pub fn application_name(&self) -> &str {
735        self.expect_value::<String>(&APPLICATION_NAME).as_str()
736    }
737
738    /// Returns the build info.
739    pub fn build_info(&self) -> &'static BuildInfo {
740        self.mz_version.build_info
741    }
742
743    /// Returns the value of the `client_encoding` configuration parameter.
744    pub fn client_encoding(&self) -> &ClientEncoding {
745        self.expect_value(&CLIENT_ENCODING)
746    }
747
748    /// Returns the value of the `client_min_messages` configuration parameter.
749    pub fn client_min_messages(&self) -> &ClientSeverity {
750        self.expect_value(&CLIENT_MIN_MESSAGES)
751    }
752
753    /// Returns the value of the `cluster` configuration parameter.
754    pub fn cluster(&self) -> &str {
755        self.expect_value::<String>(&CLUSTER).as_str()
756    }
757
758    /// Returns the value of the `cluster_replica` configuration parameter.
759    pub fn cluster_replica(&self) -> Option<&str> {
760        self.expect_value::<Option<String>>(&CLUSTER_REPLICA)
761            .as_deref()
762    }
763
764    /// Returns the value of the `current_object_missing_warnings` configuration
765    /// parameter.
766    pub fn current_object_missing_warnings(&self) -> bool {
767        *self.expect_value::<bool>(&CURRENT_OBJECT_MISSING_WARNINGS)
768    }
769
770    /// Returns the value of the `DateStyle` configuration parameter.
771    pub fn date_style(&self) -> &[&str] {
772        &self.expect_value::<DateStyle>(&DATE_STYLE).0
773    }
774
775    /// Returns the value of the `database` configuration parameter.
776    pub fn database(&self) -> &str {
777        self.expect_value::<String>(&DATABASE).as_str()
778    }
779
780    /// Returns the value of the `extra_float_digits` configuration parameter.
781    pub fn extra_float_digits(&self) -> i32 {
782        *self.expect_value(&EXTRA_FLOAT_DIGITS)
783    }
784
785    /// Returns the value of the `integer_datetimes` configuration parameter.
786    pub fn integer_datetimes(&self) -> bool {
787        *self.expect_value(&INTEGER_DATETIMES)
788    }
789
790    /// Returns the value of the `intervalstyle` configuration parameter.
791    pub fn intervalstyle(&self) -> &IntervalStyle {
792        self.expect_value(&INTERVAL_STYLE)
793    }
794
795    /// Returns the value of the `mz_version` configuration parameter.
796    pub fn mz_version(&self) -> String {
797        self.mz_version.value()
798    }
799
800    /// Returns the value of the `search_path` configuration parameter.
801    pub fn search_path(&self) -> &[Ident] {
802        self.expect_value::<Vec<Ident>>(&SEARCH_PATH).as_slice()
803    }
804
805    /// Returns the value of the `server_version` configuration parameter.
806    pub fn server_version(&self) -> &str {
807        self.expect_value::<String>(&SERVER_VERSION).as_str()
808    }
809
810    /// Returns the value of the `server_version_num` configuration parameter.
811    pub fn server_version_num(&self) -> i32 {
812        *self.expect_value(&SERVER_VERSION_NUM)
813    }
814
815    /// Returns the value of the `sql_safe_updates` configuration parameter.
816    pub fn sql_safe_updates(&self) -> bool {
817        *self.expect_value(&SQL_SAFE_UPDATES)
818    }
819
820    /// Returns the value of the `standard_conforming_strings` configuration
821    /// parameter.
822    pub fn standard_conforming_strings(&self) -> bool {
823        *self.expect_value(&STANDARD_CONFORMING_STRINGS)
824    }
825
826    /// Returns the value of the `statement_timeout` configuration parameter.
827    pub fn statement_timeout(&self) -> &Duration {
828        self.expect_value(&STATEMENT_TIMEOUT)
829    }
830
831    /// Returns the value of the `idle_in_transaction_session_timeout` configuration parameter.
832    pub fn idle_in_transaction_session_timeout(&self) -> &Duration {
833        self.expect_value(&IDLE_IN_TRANSACTION_SESSION_TIMEOUT)
834    }
835
836    /// Returns the value of the `timezone` configuration parameter.
837    pub fn timezone(&self) -> &TimeZone {
838        self.expect_value(&TIMEZONE)
839    }
840
841    /// Returns the value of the `transaction_isolation` configuration
842    /// parameter.
843    pub fn transaction_isolation(&self) -> &IsolationLevel {
844        self.expect_value(&TRANSACTION_ISOLATION)
845    }
846
847    /// Returns the value of `real_time_recency` configuration parameter.
848    pub fn real_time_recency(&self) -> bool {
849        *self.expect_value(&REAL_TIME_RECENCY)
850    }
851
852    /// Returns the value of the `real_time_recency_timeout` configuration parameter.
853    pub fn real_time_recency_timeout(&self) -> &Duration {
854        self.expect_value(&REAL_TIME_RECENCY_TIMEOUT)
855    }
856
857    /// Returns the value of `emit_plan_insights_notice` configuration parameter.
858    pub fn emit_plan_insights_notice(&self) -> bool {
859        *self.expect_value(&EMIT_PLAN_INSIGHTS_NOTICE)
860    }
861
862    /// Returns the value of `emit_timestamp_notice` configuration parameter.
863    pub fn emit_timestamp_notice(&self) -> bool {
864        *self.expect_value(&EMIT_TIMESTAMP_NOTICE)
865    }
866
867    /// Returns the value of `emit_trace_id_notice` configuration parameter.
868    pub fn emit_trace_id_notice(&self) -> bool {
869        *self.expect_value(&EMIT_TRACE_ID_NOTICE)
870    }
871
872    /// Returns the value of `auto_route_catalog_queries` configuration parameter.
873    pub fn auto_route_catalog_queries(&self) -> bool {
874        *self.expect_value(&AUTO_ROUTE_CATALOG_QUERIES)
875    }
876
877    /// Returns the value of `enable_session_rbac_checks` configuration parameter.
878    pub fn enable_session_rbac_checks(&self) -> bool {
879        *self.expect_value(&ENABLE_SESSION_RBAC_CHECKS)
880    }
881
882    /// Returns the value of `restrict_to_user_objects` configuration parameter.
883    pub fn restrict_to_user_objects(&self) -> bool {
884        *self.expect_value(&RESTRICT_TO_USER_OBJECTS)
885    }
886
887    /// Returns the value of `enable_session_cardinality_estimates` configuration parameter.
888    pub fn enable_session_cardinality_estimates(&self) -> bool {
889        *self.expect_value(&ENABLE_SESSION_CARDINALITY_ESTIMATES)
890    }
891
892    /// Returns the value of `is_superuser` configuration parameter.
893    pub fn is_superuser(&self) -> bool {
894        self.user.is_superuser()
895    }
896
897    /// Returns the user associated with this `SessionVars` instance.
898    pub fn user(&self) -> &User {
899        &self.user
900    }
901
902    /// Returns the value of the `max_query_result_size` configuration parameter.
903    pub fn max_query_result_size(&self) -> u64 {
904        self.expect_value::<ByteSize>(&MAX_QUERY_RESULT_SIZE)
905            .as_bytes()
906    }
907
908    /// Sets the internal metadata associated with the user.
909    pub fn set_internal_user_metadata(&mut self, metadata: InternalUserMetadata) {
910        self.user.internal_metadata = Some(metadata);
911    }
912
913    /// Sets the external metadata associated with the user.
914    pub fn set_external_user_metadata(&mut self, metadata: ExternalUserMetadata) {
915        self.user.external_metadata = Some(metadata);
916    }
917
918    pub fn set_cluster(&mut self, cluster: String) {
919        let var = self
920            .vars
921            .get_mut(UncasedStr::new(CLUSTER.name()))
922            .expect("cluster variable must exist");
923        var.set(VarInput::Flat(&cluster), false)
924            .expect("setting cluster must succeed");
925    }
926
927    pub fn set_local_transaction_isolation(&mut self, transaction_isolation: IsolationLevel) {
928        let var = self
929            .vars
930            .get_mut(UncasedStr::new(TRANSACTION_ISOLATION.name()))
931            .expect("transaction_isolation variable must exist");
932        var.set(VarInput::Flat(&transaction_isolation.to_string()), true)
933            .expect("setting transaction isolation must succeed");
934    }
935
936    pub fn get_statement_logging_sample_rate(&self) -> Numeric {
937        *self.expect_value(&STATEMENT_LOGGING_SAMPLE_RATE)
938    }
939
940    /// Returns the value of the `emit_introspection_query_notice` configuration parameter.
941    pub fn emit_introspection_query_notice(&self) -> bool {
942        *self.expect_value(&EMIT_INTROSPECTION_QUERY_NOTICE)
943    }
944
945    pub fn unsafe_new_transaction_wall_time(&self) -> Option<CheckedTimestamp<DateTime<Utc>>> {
946        *self.expect_value(&UNSAFE_NEW_TRANSACTION_WALL_TIME)
947    }
948
949    /// Returns the value of the `welcome_message` configuration parameter.
950    pub fn welcome_message(&self) -> bool {
951        *self.expect_value(&WELCOME_MESSAGE)
952    }
953}
954
955// TODO(database-issues#8069) remove together with `compat_translate`
956pub const OLD_CATALOG_SERVER_CLUSTER: &str = "mz_introspection";
957pub const OLD_AUTO_ROUTE_CATALOG_QUERIES: &str = "auto_route_introspection_queries";
958
959/// If the given variable name and/or input is deprecated, return a corresponding updated value,
960/// otherwise return the original.
961///
962/// This method was introduced to gracefully handle the rename of the `mz_introspection` cluster to
963/// `mz_cluster_server`. The plan is to remove it once all users have migrated to the new name. The
964/// debug logs will be helpful for checking this in production.
965// TODO(database-issues#8069) remove this after sufficient time has passed
966fn compat_translate<'a, 'b>(name: &'a str, input: VarInput<'b>) -> (&'a str, VarInput<'b>) {
967    if name == CLUSTER.name() {
968        if let Ok(value) = CLUSTER.parse(input) {
969            if value.format() == OLD_CATALOG_SERVER_CLUSTER {
970                tracing::debug!(
971                    github_27285 = true,
972                    "encountered deprecated `cluster` variable value: {}",
973                    OLD_CATALOG_SERVER_CLUSTER,
974                );
975                return (name, VarInput::Flat("mz_catalog_server"));
976            }
977        }
978    }
979
980    if name == OLD_AUTO_ROUTE_CATALOG_QUERIES {
981        tracing::debug!(
982            github_27285 = true,
983            "encountered deprecated `{}` variable name",
984            OLD_AUTO_ROUTE_CATALOG_QUERIES,
985        );
986        return (AUTO_ROUTE_CATALOG_QUERIES.name(), input);
987    }
988
989    (name, input)
990}
991
992fn compat_translate_name(name: &str) -> &str {
993    let (name, _) = compat_translate(name, VarInput::Flat(""));
994    name
995}
996
997/// Enforces feature-flag gating for `transaction_isolation` levels that sit
998/// behind a flag (`bounded staleness <duration>` and
999/// `strong session serializable`).
1000///
1001/// Returns `Ok(())` for any other variable, and for an unparseable value
1002/// (parse errors surface on the actual set). This is shared by every path that
1003/// assigns `transaction_isolation` — `SET`, `SET TRANSACTION`,
1004/// `ALTER ROLE ... SET`, and connection options — so that the gate cannot be
1005/// bypassed by choosing a different syntax or letter case.
1006pub fn check_transaction_isolation_feature_flag(
1007    name: &str,
1008    input: VarInput,
1009    system_vars: &SystemVars,
1010) -> Result<(), VarError> {
1011    if UncasedStr::new(name) != UncasedStr::new(TRANSACTION_ISOLATION_VAR_NAME) {
1012        return Ok(());
1013    }
1014    // Ignore parse failures here; the actual set surfaces them.
1015    let Ok(level) = IsolationLevel::parse(input) else {
1016        return Ok(());
1017    };
1018    match level {
1019        IsolationLevel::StrongSessionSerializable => ENABLE_SESSION_TIMELINES.require(system_vars),
1020        IsolationLevel::BoundedStaleness(_) => {
1021            ENABLE_BOUNDED_STALENESS_ISOLATION.require(system_vars)
1022        }
1023        _ => Ok(()),
1024    }
1025}
1026
1027/// A `SystemVar` is persisted on disk value for a configuration parameter. If unset,
1028/// the server default is used instead.
1029#[derive(Debug)]
1030pub struct SystemVar {
1031    definition: VarDefinition,
1032    /// Value currently persisted to disk.
1033    persisted_value: Option<Box<dyn Value>>,
1034    /// Current default, not persisted to disk.
1035    dynamic_default: Option<Box<dyn Value>>,
1036}
1037
1038impl Clone for SystemVar {
1039    fn clone(&self) -> Self {
1040        SystemVar {
1041            definition: self.definition.clone(),
1042            persisted_value: self.persisted_value.as_ref().map(|v| v.box_clone()),
1043            dynamic_default: self.dynamic_default.as_ref().map(|v| v.box_clone()),
1044        }
1045    }
1046}
1047
1048impl SystemVar {
1049    pub fn new(definition: VarDefinition) -> Self {
1050        SystemVar {
1051            definition,
1052            persisted_value: None,
1053            dynamic_default: None,
1054        }
1055    }
1056
1057    fn is_default(&self, input: VarInput) -> Result<bool, VarError> {
1058        let v = self.definition.parse(input)?;
1059        Ok(self.definition.default_value() == v.as_ref())
1060    }
1061
1062    pub fn value_dyn(&self) -> &dyn Value {
1063        self.persisted_value
1064            .as_deref()
1065            .or(self.dynamic_default.as_deref())
1066            .unwrap_or_else(|| self.definition.default_value())
1067    }
1068
1069    pub fn value<V: 'static>(&self) -> &V {
1070        let val = self.value_dyn();
1071        val.as_any().downcast_ref::<V>().expect("success")
1072    }
1073
1074    fn parse(&self, input: VarInput) -> Result<Box<dyn Value>, VarError> {
1075        let v = self.definition.parse(input)?;
1076        // Validate our parsed value.
1077        self.validate_constraints(v.as_ref())?;
1078        Ok(v)
1079    }
1080
1081    fn set(&mut self, input: VarInput) -> Result<bool, VarError> {
1082        let v = self.parse(input)?;
1083
1084        if self.persisted_value.as_ref() != Some(&v) {
1085            self.persisted_value = Some(v);
1086            Ok(true)
1087        } else {
1088            Ok(false)
1089        }
1090    }
1091
1092    fn reset(&mut self) -> bool {
1093        if self.persisted_value.is_some() {
1094            self.persisted_value = None;
1095            true
1096        } else {
1097            false
1098        }
1099    }
1100
1101    fn set_default(&mut self, input: VarInput) -> Result<(), VarError> {
1102        let v = self.definition.parse(input)?;
1103        self.dynamic_default = Some(v);
1104        Ok(())
1105    }
1106
1107    fn validate_constraints(&self, val: &dyn Value) -> Result<(), VarError> {
1108        if let Some(constraint) = &self.definition.constraint {
1109            constraint.check_constraint(self, self.value_dyn(), val)
1110        } else {
1111            Ok(())
1112        }
1113    }
1114}
1115
1116impl Var for SystemVar {
1117    fn name(&self) -> &'static str {
1118        self.definition.name.as_str()
1119    }
1120
1121    fn value(&self) -> String {
1122        self.value_dyn().format()
1123    }
1124
1125    fn description(&self) -> &'static str {
1126        self.definition.description
1127    }
1128
1129    fn type_name(&self) -> Cow<'static, str> {
1130        self.definition.type_name()
1131    }
1132
1133    fn scope(&self) -> ParameterScope {
1134        self.definition.scope()
1135    }
1136
1137    fn visible(&self, user: &User, system_vars: &SystemVars) -> Result<(), VarError> {
1138        self.definition.visible(user, system_vars)
1139    }
1140}
1141
1142#[derive(Debug, Error)]
1143pub enum NetworkPolicyError {
1144    #[error("Access denied for address {0}")]
1145    AddressDenied(IpAddr),
1146}
1147
1148/// On disk variables.
1149///
1150/// See the [`crate::session::vars`] module documentation for more details on the
1151/// Materialize configuration model.
1152#[derive(Derivative, Clone)]
1153#[derivative(Debug)]
1154pub struct SystemVars {
1155    /// Allows "unsafe" parameters to be set.
1156    allow_unsafe: bool,
1157    /// Set of all [`SystemVar`]s.
1158    vars: BTreeMap<&'static UncasedStr, SystemVar>,
1159    /// External components interested in when a [`SystemVar`] gets updated.
1160    #[derivative(Debug = "ignore")]
1161    callbacks: BTreeMap<String, Vec<Arc<dyn Fn(&SystemVars) + Send + Sync>>>,
1162
1163    /// NB: This is intentionally disconnected from the one that is plumbed around to persist and
1164    /// the controllers. This is so we can explicitly control and reason about when changes to config
1165    /// values are propagated to the rest of the system.
1166    dyncfgs: ConfigSet,
1167}
1168
1169impl Default for SystemVars {
1170    fn default() -> Self {
1171        Self::new()
1172    }
1173}
1174
1175impl SystemVars {
1176    pub fn new() -> Self {
1177        let system_vars = vec![
1178            &MAX_KAFKA_CONNECTIONS,
1179            &MAX_POSTGRES_CONNECTIONS,
1180            &MAX_MYSQL_CONNECTIONS,
1181            &MAX_SQL_SERVER_CONNECTIONS,
1182            &MAX_AWS_PRIVATELINK_CONNECTIONS,
1183            &MAX_TABLES,
1184            &MAX_SOURCES,
1185            &MAX_SINKS,
1186            &MAX_MATERIALIZED_VIEWS,
1187            &MAX_CLUSTERS,
1188            &MAX_REPLICAS_PER_CLUSTER,
1189            &MAX_CREDIT_CONSUMPTION_RATE,
1190            &MAX_DATABASES,
1191            &MAX_SCHEMAS_PER_DATABASE,
1192            &MAX_OBJECTS_PER_SCHEMA,
1193            &MAX_SECRETS,
1194            &MAX_ROLES,
1195            &MAX_NETWORK_POLICIES,
1196            &MAX_RULES_PER_NETWORK_POLICY,
1197            &MAX_RESULT_SIZE,
1198            &MAX_COPY_FROM_ROW_SIZE,
1199            &ALLOWED_CLUSTER_REPLICA_SIZES,
1200            &upsert_rocksdb::UPSERT_ROCKSDB_COMPACTION_STYLE,
1201            &upsert_rocksdb::UPSERT_ROCKSDB_OPTIMIZE_COMPACTION_MEMTABLE_BUDGET,
1202            &upsert_rocksdb::UPSERT_ROCKSDB_LEVEL_COMPACTION_DYNAMIC_LEVEL_BYTES,
1203            &upsert_rocksdb::UPSERT_ROCKSDB_UNIVERSAL_COMPACTION_RATIO,
1204            &upsert_rocksdb::UPSERT_ROCKSDB_PARALLELISM,
1205            &upsert_rocksdb::UPSERT_ROCKSDB_COMPRESSION_TYPE,
1206            &upsert_rocksdb::UPSERT_ROCKSDB_BOTTOMMOST_COMPRESSION_TYPE,
1207            &upsert_rocksdb::UPSERT_ROCKSDB_BATCH_SIZE,
1208            &upsert_rocksdb::UPSERT_ROCKSDB_RETRY_DURATION,
1209            &upsert_rocksdb::UPSERT_ROCKSDB_STATS_LOG_INTERVAL_SECONDS,
1210            &upsert_rocksdb::UPSERT_ROCKSDB_STATS_PERSIST_INTERVAL_SECONDS,
1211            &upsert_rocksdb::UPSERT_ROCKSDB_POINT_LOOKUP_BLOCK_CACHE_SIZE_MB,
1212            &upsert_rocksdb::UPSERT_ROCKSDB_SHRINK_ALLOCATED_BUFFERS_BY_RATIO,
1213            &upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_CLUSTER_MEMORY_FRACTION,
1214            &upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_MEMORY_BYTES,
1215            &upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_ALLOW_STALL,
1216            &STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES,
1217            &STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_TO_CLUSTER_SIZE_FRACTION,
1218            &STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_DISK_ONLY,
1219            &STORAGE_STATISTICS_INTERVAL,
1220            &STORAGE_STATISTICS_COLLECTION_INTERVAL,
1221            &STORAGE_SHRINK_UPSERT_UNUSED_BUFFERS_BY_RATIO,
1222            &STORAGE_RECORD_SOURCE_SINK_NAMESPACED_ERRORS,
1223            &PERSIST_FAST_PATH_LIMIT,
1224            &METRICS_RETENTION,
1225            &UNSAFE_MOCK_AUDIT_EVENT_TIMESTAMP,
1226            &ENABLE_RBAC_CHECKS,
1227            &PG_SOURCE_CONNECT_TIMEOUT,
1228            &PG_SOURCE_TCP_KEEPALIVES_IDLE,
1229            &PG_SOURCE_TCP_KEEPALIVES_INTERVAL,
1230            &PG_SOURCE_TCP_KEEPALIVES_RETRIES,
1231            &PG_SOURCE_TCP_USER_TIMEOUT,
1232            &PG_SOURCE_TCP_CONFIGURE_SERVER,
1233            &PG_SOURCE_SNAPSHOT_STATEMENT_TIMEOUT,
1234            &PG_SOURCE_WAL_SENDER_TIMEOUT,
1235            &PG_SOURCE_SNAPSHOT_COLLECT_STRICT_COUNT,
1236            &MYSQL_SOURCE_TCP_KEEPALIVE,
1237            &MYSQL_SOURCE_SNAPSHOT_MAX_EXECUTION_TIME,
1238            &MYSQL_SOURCE_SNAPSHOT_LOCK_WAIT_TIMEOUT,
1239            &MYSQL_SOURCE_CONNECT_TIMEOUT,
1240            &SSH_CHECK_INTERVAL,
1241            &SSH_CONNECT_TIMEOUT,
1242            &SSH_KEEPALIVES_IDLE,
1243            &KAFKA_SOCKET_KEEPALIVE,
1244            &KAFKA_SOCKET_TIMEOUT,
1245            &KAFKA_TRANSACTION_TIMEOUT,
1246            &KAFKA_SOCKET_CONNECTION_SETUP_TIMEOUT,
1247            &KAFKA_FETCH_METADATA_TIMEOUT,
1248            &KAFKA_PROGRESS_RECORD_FETCH_TIMEOUT,
1249            &ENABLE_LAUNCHDARKLY,
1250            &MAX_CONNECTIONS,
1251            &NETWORK_POLICY,
1252            &SUPERUSER_RESERVED_CONNECTIONS,
1253            &KEEP_N_SOURCE_STATUS_HISTORY_ENTRIES,
1254            &KEEP_N_SINK_STATUS_HISTORY_ENTRIES,
1255            &KEEP_N_PRIVATELINK_STATUS_HISTORY_ENTRIES,
1256            &REPLICA_STATUS_HISTORY_RETENTION_WINDOW,
1257            &ENABLE_STORAGE_SHARD_FINALIZATION,
1258            &ENABLE_DEFAULT_CONNECTION_VALIDATION,
1259            &DEFAULT_TIMESTAMP_INTERVAL,
1260            &MIN_TIMESTAMP_INTERVAL,
1261            &MAX_TIMESTAMP_INTERVAL,
1262            &LOGGING_FILTER,
1263            &OPENTELEMETRY_FILTER,
1264            &LOGGING_FILTER_DEFAULTS,
1265            &OPENTELEMETRY_FILTER_DEFAULTS,
1266            &SENTRY_FILTERS,
1267            &WEBHOOKS_SECRETS_CACHING_TTL_SECS,
1268            &COORD_SLOW_MESSAGE_WARN_THRESHOLD,
1269            &grpc_client::CONNECT_TIMEOUT,
1270            &grpc_client::HTTP2_KEEP_ALIVE_INTERVAL,
1271            &grpc_client::HTTP2_KEEP_ALIVE_TIMEOUT,
1272            &cluster_scheduling::CLUSTER_MULTI_PROCESS_REPLICA_AZ_AFFINITY_WEIGHT,
1273            &cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY,
1274            &cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY_WEIGHT,
1275            &cluster_scheduling::CLUSTER_ENABLE_TOPOLOGY_SPREAD,
1276            &cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_IGNORE_NON_SINGULAR_SCALE,
1277            &cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_MAX_SKEW,
1278            &cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_MIN_DOMAINS,
1279            &cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_SOFT,
1280            &cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY,
1281            &cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY_WEIGHT,
1282            &cluster_scheduling::CLUSTER_ALTER_CHECK_READY_INTERVAL,
1283            &cluster_scheduling::CLUSTER_CHECK_SCHEDULING_POLICIES_INTERVAL,
1284            &cluster_scheduling::CLUSTER_SECURITY_CONTEXT_ENABLED,
1285            &cluster_scheduling::CLUSTER_REFRESH_MV_COMPACTION_ESTIMATE,
1286            &grpc_client::HTTP2_KEEP_ALIVE_TIMEOUT,
1287            &STATEMENT_LOGGING_MAX_SAMPLE_RATE,
1288            &STATEMENT_LOGGING_DEFAULT_SAMPLE_RATE,
1289            &STATEMENT_LOGGING_TARGET_DATA_RATE,
1290            &STATEMENT_LOGGING_MAX_DATA_CREDIT,
1291            &ENABLE_INTERNAL_STATEMENT_LOGGING,
1292            &OPTIMIZER_STATS_TIMEOUT,
1293            &OPTIMIZER_ONESHOT_STATS_TIMEOUT,
1294            &PRIVATELINK_STATUS_UPDATE_QUOTA_PER_MINUTE,
1295            &WEBHOOK_CONCURRENT_REQUEST_LIMIT,
1296            &PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_SIZE,
1297            &PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_WAIT,
1298            &PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL,
1299            &PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL_STAGGER,
1300            &USER_STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION,
1301            &FORCE_SOURCE_TABLE_SYNTAX,
1302            &OPTIMIZER_E2E_LATENCY_WARNING_THRESHOLD,
1303            &SCRAM_ITERATIONS,
1304        ];
1305
1306        let dyncfgs = mz_dyncfgs::all_dyncfgs();
1307        let dyncfg_vars: Vec<_> = dyncfgs
1308            .entries()
1309            .map(|cfg| {
1310                let var = match cfg.default() {
1311                    ConfigVal::Bool(default) => {
1312                        VarDefinition::new_runtime(cfg.name(), *default, cfg.desc(), false)
1313                    }
1314                    ConfigVal::U32(default) => {
1315                        VarDefinition::new_runtime(cfg.name(), *default, cfg.desc(), false)
1316                    }
1317                    ConfigVal::Usize(default) => {
1318                        VarDefinition::new_runtime(cfg.name(), *default, cfg.desc(), false)
1319                    }
1320                    ConfigVal::OptUsize(default) => {
1321                        VarDefinition::new_runtime(cfg.name(), *default, cfg.desc(), false)
1322                    }
1323                    ConfigVal::F64(default) => {
1324                        VarDefinition::new_runtime(cfg.name(), *default, cfg.desc(), false)
1325                    }
1326                    ConfigVal::String(default) => {
1327                        VarDefinition::new_runtime(cfg.name(), default.clone(), cfg.desc(), false)
1328                    }
1329                    ConfigVal::OptString(default) => {
1330                        VarDefinition::new_runtime(cfg.name(), default.clone(), cfg.desc(), false)
1331                    }
1332                    ConfigVal::Duration(default) => {
1333                        VarDefinition::new_runtime(cfg.name(), default.clone(), cfg.desc(), false)
1334                    }
1335                    ConfigVal::Json(default) => {
1336                        VarDefinition::new_runtime(cfg.name(), default.clone(), cfg.desc(), false)
1337                    }
1338                };
1339                // Carry the dyncfg's declared scope through to the system var,
1340                // so scoped resolution and introspection see it.
1341                var.scoped(cfg.scope())
1342            })
1343            .collect();
1344
1345        let vars: BTreeMap<_, _> = system_vars
1346            .into_iter()
1347            // Include all of our feature flags.
1348            .chain(definitions::FEATURE_FLAGS.iter().copied())
1349            // Include the subset of Session variables we allow system defaults for.
1350            .chain(SESSION_SYSTEM_VARS.values().copied())
1351            .cloned()
1352            // Include Persist configs.
1353            .chain(dyncfg_vars)
1354            .map(|var| (var.name, SystemVar::new(var)))
1355            .collect();
1356
1357        let vars = SystemVars {
1358            vars,
1359            callbacks: BTreeMap::new(),
1360            allow_unsafe: false,
1361            dyncfgs,
1362        };
1363
1364        vars
1365    }
1366
1367    pub fn dyncfgs(&self) -> &ConfigSet {
1368        &self.dyncfgs
1369    }
1370
1371    pub fn set_unsafe(mut self, allow_unsafe: bool) -> Self {
1372        self.allow_unsafe = allow_unsafe;
1373        self
1374    }
1375
1376    pub fn allow_unsafe(&self) -> bool {
1377        self.allow_unsafe
1378    }
1379
1380    fn expect_value<V: 'static>(&self, var: &VarDefinition) -> &V {
1381        let val = self
1382            .vars
1383            .get(var.name)
1384            .expect("provided var should be in state");
1385
1386        val.value_dyn()
1387            .as_any()
1388            .downcast_ref::<V>()
1389            .expect("provided var type should matched stored var")
1390    }
1391
1392    fn expect_config_value<V: ConfigType + 'static>(&self, name: &UncasedStr) -> &V {
1393        let val = self
1394            .vars
1395            .get(name)
1396            .unwrap_or_else(|| panic!("provided var {name} should be in state"));
1397
1398        val.value_dyn()
1399            .as_any()
1400            .downcast_ref()
1401            .expect("provided var type should matched stored var")
1402    }
1403
1404    /// Reset all the values to their defaults (preserving
1405    /// defaults from `VarMut::set_default).
1406    pub fn reset_all(&mut self) {
1407        for (_, var) in &mut self.vars {
1408            var.reset();
1409        }
1410    }
1411
1412    /// Returns an iterator over the configuration parameters and their current
1413    /// values on disk.
1414    pub fn iter(&self) -> impl Iterator<Item = &dyn Var> {
1415        self.vars
1416            .values()
1417            .map(|v| v.as_var())
1418            .filter(|v| !SESSION_SYSTEM_VARS.contains_key(UncasedStr::new(v.name())))
1419    }
1420
1421    /// Returns an iterator over the configuration parameters and their current
1422    /// values on disk. Compared to [`SystemVars::iter`], this should omit vars
1423    /// that shouldn't be synced by SystemParameterFrontend.
1424    pub fn iter_synced(&self) -> impl Iterator<Item = &dyn Var> {
1425        self.iter().filter(|v| v.name() != ENABLE_LAUNCHDARKLY.name)
1426    }
1427
1428    /// Returns an iterator over the configuration parameters that can be overriden per-Session.
1429    pub fn iter_session(&self) -> impl Iterator<Item = &dyn Var> {
1430        self.vars
1431            .values()
1432            .map(|v| v.as_var())
1433            .filter(|v| SESSION_SYSTEM_VARS.contains_key(UncasedStr::new(v.name())))
1434    }
1435
1436    /// Returns whether or not this parameter can be modified by a superuser.
1437    pub fn user_modifiable(&self, name: &str) -> bool {
1438        SESSION_SYSTEM_VARS.contains_key(UncasedStr::new(name))
1439            || name == ENABLE_RBAC_CHECKS.name()
1440            || name == NETWORK_POLICY.name()
1441    }
1442
1443    /// Returns a [`Var`] representing the configuration parameter with the
1444    /// specified name.
1445    ///
1446    /// Configuration parameters are matched case insensitively. If no such
1447    /// configuration parameter exists, `get` returns an error.
1448    ///
1449    /// Note that:
1450    /// - If `name` is known at compile time, you should instead use the named
1451    /// accessor to access the variable with its true Rust type. For example,
1452    /// `self.get("max_tables").value()` returns the string `"25"` or the
1453    /// current value, while `self.max_tables()` returns an i32.
1454    ///
1455    /// - This function does not check that the access variable should be
1456    /// visible because of other settings or users. Before or after accessing
1457    /// this method, you should call `Var::visible`.
1458    ///
1459    /// # Errors
1460    ///
1461    /// The call will return an error:
1462    /// 1. If `name` does not refer to a valid [`SystemVars`] field.
1463    pub fn get(&self, name: &str) -> Result<&dyn Var, VarError> {
1464        self.vars
1465            .get(UncasedStr::new(name))
1466            .map(|v| v.as_var())
1467            .ok_or_else(|| VarError::UnknownParameter(name.into()))
1468    }
1469
1470    /// Check if the given `values` is the default value for the [`Var`]
1471    /// identified by `name`.
1472    ///
1473    /// Note that this function does not check that the access variable should
1474    /// be visible because of other settings or users. Before or after accessing
1475    /// this method, you should call `Var::visible`.
1476    ///
1477    /// # Errors
1478    ///
1479    /// The call will return an error:
1480    /// 1. If `name` does not refer to a valid [`SystemVars`] field.
1481    /// 2. If `values` does not represent a valid [`SystemVars`] value for
1482    ///    `name`.
1483    pub fn is_default(&self, name: &str, input: VarInput) -> Result<bool, VarError> {
1484        self.vars
1485            .get(UncasedStr::new(name))
1486            .ok_or_else(|| VarError::UnknownParameter(name.into()))
1487            .and_then(|v| v.is_default(input))
1488    }
1489
1490    /// Sets the configuration parameter named `name` to the value represented
1491    /// by `input`.
1492    ///
1493    /// Like with [`SystemVars::get`], configuration parameters are matched case
1494    /// insensitively. If `input` is not valid, as determined by the underlying
1495    /// configuration parameter, or if the named configuration parameter does
1496    /// not exist, an error is returned.
1497    ///
1498    /// Return a `bool` value indicating whether the [`Var`] identified by
1499    /// `name` was modified by this call (it won't be if it already had the
1500    /// given `input`).
1501    ///
1502    /// Note that this function does not check that the access variable should
1503    /// be visible because of other settings or users. Before or after accessing
1504    /// this method, you should call `Var::visible`.
1505    ///
1506    /// # Errors
1507    ///
1508    /// The call will return an error:
1509    /// 1. If `name` does not refer to a valid [`SystemVars`] field.
1510    /// 2. If `input` does not represent a valid [`SystemVars`] value for
1511    ///    `name`.
1512    pub fn set(&mut self, name: &str, input: VarInput) -> Result<bool, VarError> {
1513        let result = self
1514            .vars
1515            .get_mut(UncasedStr::new(name))
1516            .ok_or_else(|| VarError::UnknownParameter(name.into()))
1517            .and_then(|v| v.set(input))?;
1518        self.notify_callbacks(name);
1519        Ok(result)
1520    }
1521
1522    /// Parses the configuration parameter value represented by `input` named
1523    /// `name`.
1524    ///
1525    /// Like with [`SystemVars::get`], configuration parameters are matched case
1526    /// insensitively. If `input` is not valid, as determined by the underlying
1527    /// configuration parameter, or if the named configuration parameter does
1528    /// not exist, an error is returned.
1529    ///
1530    /// Return a `Box<dyn Value>` that is the result of parsing `input`.
1531    ///
1532    /// Note that this function does not check that the access variable should
1533    /// be visible because of other settings or users. Before or after accessing
1534    /// this method, you should call `Var::visible`.
1535    ///
1536    /// # Errors
1537    ///
1538    /// The call will return an error:
1539    /// 1. If `name` does not refer to a valid [`SystemVars`] field.
1540    /// 2. If `input` does not represent a valid [`SystemVars`] value for
1541    ///    `name`.
1542    pub fn parse(&self, name: &str, input: VarInput) -> Result<Box<dyn Value>, VarError> {
1543        self.vars
1544            .get(UncasedStr::new(name))
1545            .ok_or_else(|| VarError::UnknownParameter(name.into()))
1546            .and_then(|v| v.parse(input))
1547    }
1548
1549    /// Set the default for this variable. This is the value this
1550    /// variable will be be `reset` to. If no default is set, the static default in the
1551    /// variable definition is used instead.
1552    ///
1553    /// Note that this function does not check that the access variable should
1554    /// be visible because of other settings or users. Before or after accessing
1555    /// this method, you should call `Var::visible`.
1556    pub fn set_default(&mut self, name: &str, input: VarInput) -> Result<(), VarError> {
1557        self.vars
1558            .get_mut(UncasedStr::new(name))
1559            .ok_or_else(|| VarError::UnknownParameter(name.into()))
1560            .and_then(|v| v.set_default(input))?;
1561        self.notify_callbacks(name);
1562        Ok(())
1563    }
1564
1565    /// Sets the configuration parameter named `name` to its default value.
1566    ///
1567    /// Like with [`SystemVars::get`], configuration parameters are matched case
1568    /// insensitively. If the named configuration parameter does not exist, an
1569    /// error is returned.
1570    ///
1571    /// Return a `bool` value indicating whether the [`Var`] identified by
1572    /// `name` was modified by this call (it won't be if was already reset).
1573    ///
1574    /// Note that this function does not check that the access variable should
1575    /// be visible because of other settings or users. Before or after accessing
1576    /// this method, you should call `Var::visible`.
1577    ///
1578    /// # Errors
1579    ///
1580    /// The call will return an error:
1581    /// 1. If `name` does not refer to a valid [`SystemVars`] field.
1582    pub fn reset(&mut self, name: &str) -> Result<bool, VarError> {
1583        let result = self
1584            .vars
1585            .get_mut(UncasedStr::new(name))
1586            .ok_or_else(|| VarError::UnknownParameter(name.into()))
1587            .map(|v| v.reset())?;
1588        self.notify_callbacks(name);
1589        Ok(result)
1590    }
1591
1592    /// Returns a map from each system parameter's name to its default value.
1593    pub fn defaults(&self) -> BTreeMap<String, String> {
1594        self.vars
1595            .iter()
1596            .map(|(name, var)| {
1597                let default = var
1598                    .dynamic_default
1599                    .as_deref()
1600                    .unwrap_or_else(|| var.definition.default_value());
1601                (name.as_str().to_owned(), default.format())
1602            })
1603            .collect()
1604    }
1605
1606    /// Registers a closure that will get called when the value for the
1607    /// specified [`VarDefinition`] changes.
1608    ///
1609    /// The callback is guaranteed to be called at least once.
1610    pub fn register_callback(
1611        &mut self,
1612        var: &VarDefinition,
1613        callback: Arc<dyn Fn(&SystemVars) + Send + Sync>,
1614    ) {
1615        self.callbacks
1616            .entry(var.name().to_string())
1617            .or_default()
1618            .push(callback);
1619        self.notify_callbacks(var.name());
1620    }
1621
1622    /// Notify any external components interested in this variable.
1623    fn notify_callbacks(&self, name: &str) {
1624        // Get the callbacks interested in this variable.
1625        if let Some(callbacks) = self.callbacks.get(name) {
1626            for callback in callbacks {
1627                (callback)(self);
1628            }
1629        }
1630    }
1631
1632    /// Returns the system default for the [`CLUSTER`] session variable. To know the active cluster
1633    /// for the current session, you must check the [`SessionVars`].
1634    pub fn default_cluster(&self) -> String {
1635        self.expect_value::<String>(&CLUSTER).to_owned()
1636    }
1637
1638    /// Returns the value of the `max_kafka_connections` configuration parameter.
1639    pub fn max_kafka_connections(&self) -> u32 {
1640        *self.expect_value(&MAX_KAFKA_CONNECTIONS)
1641    }
1642
1643    /// Returns the value of the `max_postgres_connections` configuration parameter.
1644    pub fn max_postgres_connections(&self) -> u32 {
1645        *self.expect_value(&MAX_POSTGRES_CONNECTIONS)
1646    }
1647
1648    /// Returns the value of the `max_mysql_connections` configuration parameter.
1649    pub fn max_mysql_connections(&self) -> u32 {
1650        *self.expect_value(&MAX_MYSQL_CONNECTIONS)
1651    }
1652
1653    /// Returns the value of the `max_sql_server_connections` configuration parameter.
1654    pub fn max_sql_server_connections(&self) -> u32 {
1655        *self.expect_value(&MAX_SQL_SERVER_CONNECTIONS)
1656    }
1657
1658    /// Returns the value of the `max_aws_privatelink_connections` configuration parameter.
1659    pub fn max_aws_privatelink_connections(&self) -> u32 {
1660        *self.expect_value(&MAX_AWS_PRIVATELINK_CONNECTIONS)
1661    }
1662
1663    /// Returns the value of the `max_tables` configuration parameter.
1664    pub fn max_tables(&self) -> u32 {
1665        *self.expect_value(&MAX_TABLES)
1666    }
1667
1668    /// Returns the value of the `max_sources` configuration parameter.
1669    pub fn max_sources(&self) -> u32 {
1670        *self.expect_value(&MAX_SOURCES)
1671    }
1672
1673    /// Returns the value of the `max_sinks` configuration parameter.
1674    pub fn max_sinks(&self) -> u32 {
1675        *self.expect_value(&MAX_SINKS)
1676    }
1677
1678    /// Returns the value of the `max_materialized_views` configuration parameter.
1679    pub fn max_materialized_views(&self) -> u32 {
1680        *self.expect_value(&MAX_MATERIALIZED_VIEWS)
1681    }
1682
1683    /// Returns the value of the `max_clusters` configuration parameter.
1684    pub fn max_clusters(&self) -> u32 {
1685        *self.expect_value(&MAX_CLUSTERS)
1686    }
1687
1688    /// Returns the value of the `max_replicas_per_cluster` configuration parameter.
1689    pub fn max_replicas_per_cluster(&self) -> u32 {
1690        *self.expect_value(&MAX_REPLICAS_PER_CLUSTER)
1691    }
1692
1693    /// Returns the value of the `max_credit_consumption_rate` configuration parameter.
1694    pub fn max_credit_consumption_rate(&self) -> Numeric {
1695        *self.expect_value(&MAX_CREDIT_CONSUMPTION_RATE)
1696    }
1697
1698    /// Returns the value of the `max_databases` configuration parameter.
1699    pub fn max_databases(&self) -> u32 {
1700        *self.expect_value(&MAX_DATABASES)
1701    }
1702
1703    /// Returns the value of the `max_schemas_per_database` configuration parameter.
1704    pub fn max_schemas_per_database(&self) -> u32 {
1705        *self.expect_value(&MAX_SCHEMAS_PER_DATABASE)
1706    }
1707
1708    /// Returns the value of the `max_objects_per_schema` configuration parameter.
1709    pub fn max_objects_per_schema(&self) -> u32 {
1710        *self.expect_value(&MAX_OBJECTS_PER_SCHEMA)
1711    }
1712
1713    /// Returns the value of the `max_secrets` configuration parameter.
1714    pub fn max_secrets(&self) -> u32 {
1715        *self.expect_value(&MAX_SECRETS)
1716    }
1717
1718    /// Returns the value of the `max_roles` configuration parameter.
1719    pub fn max_roles(&self) -> u32 {
1720        *self.expect_value(&MAX_ROLES)
1721    }
1722
1723    /// Returns the value of the `max_network_policies` configuration parameter.
1724    pub fn max_network_policies(&self) -> u32 {
1725        *self.expect_value(&MAX_NETWORK_POLICIES)
1726    }
1727
1728    /// Returns the value of the `max_network_policies` configuration parameter.
1729    pub fn max_rules_per_network_policy(&self) -> u32 {
1730        *self.expect_value(&MAX_RULES_PER_NETWORK_POLICY)
1731    }
1732
1733    /// Returns the value of the `max_result_size` configuration parameter.
1734    pub fn max_result_size(&self) -> u64 {
1735        self.expect_value::<ByteSize>(&MAX_RESULT_SIZE).as_bytes()
1736    }
1737
1738    /// Returns the value of the `max_copy_from_row_size` configuration parameter.
1739    pub fn max_copy_from_row_size(&self) -> u64 {
1740        self.expect_value::<ByteSize>(&MAX_COPY_FROM_ROW_SIZE)
1741            .as_bytes()
1742    }
1743
1744    /// Returns the value of the `allowed_cluster_replica_sizes` configuration parameter.
1745    pub fn allowed_cluster_replica_sizes(&self) -> Vec<String> {
1746        self.expect_value::<Vec<Ident>>(&ALLOWED_CLUSTER_REPLICA_SIZES)
1747            .into_iter()
1748            .map(|s| s.as_str().into())
1749            .collect()
1750    }
1751
1752    /// Returns the value of the `default_cluster_replication_factor` configuration parameter.
1753    pub fn default_cluster_replication_factor(&self) -> u32 {
1754        *self.expect_value::<u32>(&DEFAULT_CLUSTER_REPLICATION_FACTOR)
1755    }
1756
1757    pub fn upsert_rocksdb_compaction_style(&self) -> mz_rocksdb_types::config::CompactionStyle {
1758        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_COMPACTION_STYLE)
1759    }
1760
1761    pub fn upsert_rocksdb_optimize_compaction_memtable_budget(&self) -> usize {
1762        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_OPTIMIZE_COMPACTION_MEMTABLE_BUDGET)
1763    }
1764
1765    pub fn upsert_rocksdb_level_compaction_dynamic_level_bytes(&self) -> bool {
1766        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_LEVEL_COMPACTION_DYNAMIC_LEVEL_BYTES)
1767    }
1768
1769    pub fn upsert_rocksdb_universal_compaction_ratio(&self) -> i32 {
1770        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_UNIVERSAL_COMPACTION_RATIO)
1771    }
1772
1773    pub fn upsert_rocksdb_parallelism(&self) -> Option<i32> {
1774        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_PARALLELISM)
1775    }
1776
1777    pub fn upsert_rocksdb_compression_type(&self) -> mz_rocksdb_types::config::CompressionType {
1778        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_COMPRESSION_TYPE)
1779    }
1780
1781    pub fn upsert_rocksdb_bottommost_compression_type(
1782        &self,
1783    ) -> mz_rocksdb_types::config::CompressionType {
1784        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_BOTTOMMOST_COMPRESSION_TYPE)
1785    }
1786
1787    pub fn upsert_rocksdb_batch_size(&self) -> usize {
1788        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_BATCH_SIZE)
1789    }
1790
1791    pub fn upsert_rocksdb_retry_duration(&self) -> Duration {
1792        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_RETRY_DURATION)
1793    }
1794
1795    pub fn upsert_rocksdb_stats_log_interval_seconds(&self) -> u32 {
1796        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_STATS_LOG_INTERVAL_SECONDS)
1797    }
1798
1799    pub fn upsert_rocksdb_stats_persist_interval_seconds(&self) -> u32 {
1800        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_STATS_PERSIST_INTERVAL_SECONDS)
1801    }
1802
1803    pub fn upsert_rocksdb_point_lookup_block_cache_size_mb(&self) -> Option<u32> {
1804        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_POINT_LOOKUP_BLOCK_CACHE_SIZE_MB)
1805    }
1806
1807    pub fn upsert_rocksdb_shrink_allocated_buffers_by_ratio(&self) -> usize {
1808        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_SHRINK_ALLOCATED_BUFFERS_BY_RATIO)
1809    }
1810
1811    pub fn upsert_rocksdb_write_buffer_manager_cluster_memory_fraction(&self) -> Option<Numeric> {
1812        *self.expect_value(
1813            &upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_CLUSTER_MEMORY_FRACTION,
1814        )
1815    }
1816
1817    pub fn upsert_rocksdb_write_buffer_manager_memory_bytes(&self) -> Option<usize> {
1818        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_MEMORY_BYTES)
1819    }
1820
1821    pub fn upsert_rocksdb_write_buffer_manager_allow_stall(&self) -> bool {
1822        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_ALLOW_STALL)
1823    }
1824
1825    pub fn persist_fast_path_limit(&self) -> usize {
1826        *self.expect_value(&PERSIST_FAST_PATH_LIMIT)
1827    }
1828
1829    /// Returns the `pg_source_connect_timeout` configuration parameter.
1830    pub fn pg_source_connect_timeout(&self) -> Duration {
1831        *self.expect_value(&PG_SOURCE_CONNECT_TIMEOUT)
1832    }
1833
1834    /// Returns the `pg_source_tcp_keepalives_retries` configuration parameter.
1835    pub fn pg_source_tcp_keepalives_retries(&self) -> u32 {
1836        *self.expect_value(&PG_SOURCE_TCP_KEEPALIVES_RETRIES)
1837    }
1838
1839    /// Returns the `pg_source_tcp_keepalives_idle` configuration parameter.
1840    pub fn pg_source_tcp_keepalives_idle(&self) -> Duration {
1841        *self.expect_value(&PG_SOURCE_TCP_KEEPALIVES_IDLE)
1842    }
1843
1844    /// Returns the `pg_source_tcp_keepalives_interval` configuration parameter.
1845    pub fn pg_source_tcp_keepalives_interval(&self) -> Duration {
1846        *self.expect_value(&PG_SOURCE_TCP_KEEPALIVES_INTERVAL)
1847    }
1848
1849    /// Returns the `pg_source_tcp_user_timeout` configuration parameter.
1850    pub fn pg_source_tcp_user_timeout(&self) -> Duration {
1851        *self.expect_value(&PG_SOURCE_TCP_USER_TIMEOUT)
1852    }
1853
1854    /// Returns the `pg_source_tcp_configure_server` configuration parameter.
1855    pub fn pg_source_tcp_configure_server(&self) -> bool {
1856        *self.expect_value(&PG_SOURCE_TCP_CONFIGURE_SERVER)
1857    }
1858
1859    /// Returns the `pg_source_snapshot_statement_timeout` configuration parameter.
1860    pub fn pg_source_snapshot_statement_timeout(&self) -> Duration {
1861        *self.expect_value(&PG_SOURCE_SNAPSHOT_STATEMENT_TIMEOUT)
1862    }
1863
1864    /// Returns the `pg_source_wal_sender_timeout` configuration parameter.
1865    pub fn pg_source_wal_sender_timeout(&self) -> Option<Duration> {
1866        *self.expect_value(&PG_SOURCE_WAL_SENDER_TIMEOUT)
1867    }
1868
1869    /// Returns the `pg_source_snapshot_collect_strict_count` configuration parameter.
1870    pub fn pg_source_snapshot_collect_strict_count(&self) -> bool {
1871        *self.expect_value(&PG_SOURCE_SNAPSHOT_COLLECT_STRICT_COUNT)
1872    }
1873
1874    /// Returns the `mysql_source_tcp_keepalive` configuration parameter.
1875    pub fn mysql_source_tcp_keepalive(&self) -> Duration {
1876        *self.expect_value(&MYSQL_SOURCE_TCP_KEEPALIVE)
1877    }
1878
1879    /// Returns the `mysql_source_snapshot_max_execution_time` configuration parameter.
1880    pub fn mysql_source_snapshot_max_execution_time(&self) -> Duration {
1881        *self.expect_value(&MYSQL_SOURCE_SNAPSHOT_MAX_EXECUTION_TIME)
1882    }
1883
1884    /// Returns the `mysql_source_snapshot_lock_wait_timeout` configuration parameter.
1885    pub fn mysql_source_snapshot_lock_wait_timeout(&self) -> Duration {
1886        *self.expect_value(&MYSQL_SOURCE_SNAPSHOT_LOCK_WAIT_TIMEOUT)
1887    }
1888
1889    /// Returns the `mysql_source_connect_timeout` configuration parameter.
1890    pub fn mysql_source_connect_timeout(&self) -> Duration {
1891        *self.expect_value(&MYSQL_SOURCE_CONNECT_TIMEOUT)
1892    }
1893
1894    /// Returns the `ssh_check_interval` configuration parameter.
1895    pub fn ssh_check_interval(&self) -> Duration {
1896        *self.expect_value(&SSH_CHECK_INTERVAL)
1897    }
1898
1899    /// Returns the `ssh_connect_timeout` configuration parameter.
1900    pub fn ssh_connect_timeout(&self) -> Duration {
1901        *self.expect_value(&SSH_CONNECT_TIMEOUT)
1902    }
1903
1904    /// Returns the `ssh_keepalives_idle` configuration parameter.
1905    pub fn ssh_keepalives_idle(&self) -> Duration {
1906        *self.expect_value(&SSH_KEEPALIVES_IDLE)
1907    }
1908
1909    /// Returns the `kafka_socket_keepalive` configuration parameter.
1910    pub fn kafka_socket_keepalive(&self) -> bool {
1911        *self.expect_value(&KAFKA_SOCKET_KEEPALIVE)
1912    }
1913
1914    /// Returns the `kafka_socket_timeout` configuration parameter.
1915    pub fn kafka_socket_timeout(&self) -> Option<Duration> {
1916        *self.expect_value(&KAFKA_SOCKET_TIMEOUT)
1917    }
1918
1919    /// Returns the `kafka_transaction_timeout` configuration parameter.
1920    pub fn kafka_transaction_timeout(&self) -> Duration {
1921        *self.expect_value(&KAFKA_TRANSACTION_TIMEOUT)
1922    }
1923
1924    /// Returns the `kafka_socket_connection_setup_timeout` configuration parameter.
1925    pub fn kafka_socket_connection_setup_timeout(&self) -> Duration {
1926        *self.expect_value(&KAFKA_SOCKET_CONNECTION_SETUP_TIMEOUT)
1927    }
1928
1929    /// Returns the `kafka_fetch_metadata_timeout` configuration parameter.
1930    pub fn kafka_fetch_metadata_timeout(&self) -> Duration {
1931        *self.expect_value(&KAFKA_FETCH_METADATA_TIMEOUT)
1932    }
1933
1934    /// Returns the `kafka_progress_record_fetch_timeout` configuration parameter.
1935    pub fn kafka_progress_record_fetch_timeout(&self) -> Option<Duration> {
1936        *self.expect_value(&KAFKA_PROGRESS_RECORD_FETCH_TIMEOUT)
1937    }
1938
1939    /// Returns the `crdb_connect_timeout` configuration parameter.
1940    pub fn crdb_connect_timeout(&self) -> Duration {
1941        *self.expect_config_value(UncasedStr::new(
1942            mz_persist_client::cfg::CRDB_CONNECT_TIMEOUT.name(),
1943        ))
1944    }
1945
1946    /// Returns the `crdb_tcp_user_timeout` configuration parameter.
1947    pub fn crdb_tcp_user_timeout(&self) -> Duration {
1948        *self.expect_config_value(UncasedStr::new(
1949            mz_persist_client::cfg::CRDB_TCP_USER_TIMEOUT.name(),
1950        ))
1951    }
1952
1953    /// Returns the `crdb_keepalives_idle` configuration parameter.
1954    pub fn crdb_keepalives_idle(&self) -> Duration {
1955        *self.expect_config_value(UncasedStr::new(
1956            mz_persist_client::cfg::CRDB_KEEPALIVES_IDLE.name(),
1957        ))
1958    }
1959
1960    /// Returns the `crdb_keepalives_interval` configuration parameter.
1961    pub fn crdb_keepalives_interval(&self) -> Duration {
1962        *self.expect_config_value(UncasedStr::new(
1963            mz_persist_client::cfg::CRDB_KEEPALIVES_INTERVAL.name(),
1964        ))
1965    }
1966
1967    /// Returns the `crdb_keepalives_retries` configuration parameter.
1968    pub fn crdb_keepalives_retries(&self) -> u32 {
1969        *self.expect_config_value(UncasedStr::new(
1970            mz_persist_client::cfg::CRDB_KEEPALIVES_RETRIES.name(),
1971        ))
1972    }
1973
1974    /// Returns the `storage_dataflow_max_inflight_bytes` configuration parameter.
1975    pub fn storage_dataflow_max_inflight_bytes(&self) -> Option<usize> {
1976        *self.expect_value(&STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES)
1977    }
1978
1979    /// Returns the `storage_dataflow_max_inflight_bytes_to_cluster_size_fraction` configuration parameter.
1980    pub fn storage_dataflow_max_inflight_bytes_to_cluster_size_fraction(&self) -> Option<Numeric> {
1981        *self.expect_value(&STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_TO_CLUSTER_SIZE_FRACTION)
1982    }
1983
1984    /// Returns the `storage_shrink_upsert_unused_buffers_by_ratio` configuration parameter.
1985    pub fn storage_shrink_upsert_unused_buffers_by_ratio(&self) -> usize {
1986        *self.expect_value(&STORAGE_SHRINK_UPSERT_UNUSED_BUFFERS_BY_RATIO)
1987    }
1988
1989    /// Returns the `storage_dataflow_max_inflight_bytes_disk_only` configuration parameter.
1990    pub fn storage_dataflow_max_inflight_bytes_disk_only(&self) -> bool {
1991        *self.expect_value(&STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_DISK_ONLY)
1992    }
1993
1994    /// Returns the `storage_statistics_interval` configuration parameter.
1995    pub fn storage_statistics_interval(&self) -> Duration {
1996        *self.expect_value(&STORAGE_STATISTICS_INTERVAL)
1997    }
1998
1999    /// Returns the `storage_statistics_collection_interval` configuration parameter.
2000    pub fn storage_statistics_collection_interval(&self) -> Duration {
2001        *self.expect_value(&STORAGE_STATISTICS_COLLECTION_INTERVAL)
2002    }
2003
2004    /// Returns the `storage_record_source_sink_namespaced_errors` configuration parameter.
2005    pub fn storage_record_source_sink_namespaced_errors(&self) -> bool {
2006        *self.expect_value(&STORAGE_RECORD_SOURCE_SINK_NAMESPACED_ERRORS)
2007    }
2008
2009    /// Returns the `persist_stats_filter_enabled` configuration parameter.
2010    pub fn persist_stats_filter_enabled(&self) -> bool {
2011        *self.expect_config_value(UncasedStr::new(
2012            mz_persist_client::stats::STATS_FILTER_ENABLED.name(),
2013        ))
2014    }
2015
2016    pub fn scram_iterations(&self) -> NonZeroU32 {
2017        *self.expect_value(&SCRAM_ITERATIONS)
2018    }
2019
2020    pub fn dyncfg_updates(&self) -> ConfigUpdates {
2021        let mut updates = ConfigUpdates::default();
2022        for entry in self.dyncfgs.entries() {
2023            let name = UncasedStr::new(entry.name());
2024            let val = match entry.val() {
2025                ConfigVal::Bool(_) => ConfigVal::from(*self.expect_config_value::<bool>(name)),
2026                ConfigVal::U32(_) => ConfigVal::from(*self.expect_config_value::<u32>(name)),
2027                ConfigVal::Usize(_) => ConfigVal::from(*self.expect_config_value::<usize>(name)),
2028                ConfigVal::OptUsize(_) => {
2029                    ConfigVal::from(*self.expect_config_value::<Option<usize>>(name))
2030                }
2031                ConfigVal::F64(_) => ConfigVal::from(*self.expect_config_value::<f64>(name)),
2032                ConfigVal::String(_) => {
2033                    ConfigVal::from(self.expect_config_value::<String>(name).clone())
2034                }
2035                ConfigVal::OptString(_) => {
2036                    ConfigVal::from(self.expect_config_value::<Option<String>>(name).clone())
2037                }
2038                ConfigVal::Duration(_) => {
2039                    ConfigVal::from(*self.expect_config_value::<Duration>(name))
2040                }
2041                ConfigVal::Json(_) => {
2042                    ConfigVal::from(self.expect_config_value::<serde_json::Value>(name).clone())
2043                }
2044            };
2045            updates.add_dynamic(entry.name(), val);
2046        }
2047        updates.apply(&self.dyncfgs);
2048        updates
2049    }
2050
2051    /// Returns the `metrics_retention` configuration parameter.
2052    pub fn metrics_retention(&self) -> Duration {
2053        *self.expect_value(&METRICS_RETENTION)
2054    }
2055
2056    /// Returns the `unsafe_mock_audit_event_timestamp` configuration parameter.
2057    pub fn unsafe_mock_audit_event_timestamp(&self) -> Option<mz_repr::Timestamp> {
2058        *self.expect_value(&UNSAFE_MOCK_AUDIT_EVENT_TIMESTAMP)
2059    }
2060
2061    /// Returns the `enable_rbac_checks` configuration parameter.
2062    pub fn enable_rbac_checks(&self) -> bool {
2063        *self.expect_value(&ENABLE_RBAC_CHECKS)
2064    }
2065
2066    /// Returns the `max_connections` configuration parameter.
2067    pub fn max_connections(&self) -> u32 {
2068        *self.expect_value(&MAX_CONNECTIONS)
2069    }
2070
2071    pub fn default_network_policy_name(&self) -> String {
2072        self.expect_value::<String>(&NETWORK_POLICY).clone()
2073    }
2074
2075    /// Returns the `superuser_reserved_connections` configuration parameter.
2076    pub fn superuser_reserved_connections(&self) -> u32 {
2077        *self.expect_value(&SUPERUSER_RESERVED_CONNECTIONS)
2078    }
2079
2080    pub fn keep_n_source_status_history_entries(&self) -> usize {
2081        *self.expect_value(&KEEP_N_SOURCE_STATUS_HISTORY_ENTRIES)
2082    }
2083
2084    pub fn keep_n_sink_status_history_entries(&self) -> usize {
2085        *self.expect_value(&KEEP_N_SINK_STATUS_HISTORY_ENTRIES)
2086    }
2087
2088    pub fn keep_n_privatelink_status_history_entries(&self) -> usize {
2089        *self.expect_value(&KEEP_N_PRIVATELINK_STATUS_HISTORY_ENTRIES)
2090    }
2091
2092    pub fn replica_status_history_retention_window(&self) -> Duration {
2093        *self.expect_value(&REPLICA_STATUS_HISTORY_RETENTION_WINDOW)
2094    }
2095
2096    /// Returns the `enable_storage_shard_finalization` configuration parameter.
2097    pub fn enable_storage_shard_finalization(&self) -> bool {
2098        *self.expect_value(&ENABLE_STORAGE_SHARD_FINALIZATION)
2099    }
2100
2101    /// Returns the `enable_default_connection_validation` configuration parameter.
2102    pub fn enable_default_connection_validation(&self) -> bool {
2103        *self.expect_value(&ENABLE_DEFAULT_CONNECTION_VALIDATION)
2104    }
2105
2106    /// Returns the `default_timestamp_interval` configuration parameter.
2107    pub fn default_timestamp_interval(&self) -> Duration {
2108        *self.expect_value(&DEFAULT_TIMESTAMP_INTERVAL)
2109    }
2110
2111    /// Returns the `min_timestamp_interval` configuration parameter.
2112    pub fn min_timestamp_interval(&self) -> Duration {
2113        *self.expect_value(&MIN_TIMESTAMP_INTERVAL)
2114    }
2115    /// Returns the `max_timestamp_interval` configuration parameter.
2116    pub fn max_timestamp_interval(&self) -> Duration {
2117        *self.expect_value(&MAX_TIMESTAMP_INTERVAL)
2118    }
2119
2120    pub fn logging_filter(&self) -> CloneableEnvFilter {
2121        self.expect_value::<CloneableEnvFilter>(&LOGGING_FILTER)
2122            .clone()
2123    }
2124
2125    pub fn opentelemetry_filter(&self) -> CloneableEnvFilter {
2126        self.expect_value::<CloneableEnvFilter>(&OPENTELEMETRY_FILTER)
2127            .clone()
2128    }
2129
2130    pub fn logging_filter_defaults(&self) -> Vec<SerializableDirective> {
2131        self.expect_value::<Vec<SerializableDirective>>(&LOGGING_FILTER_DEFAULTS)
2132            .clone()
2133    }
2134
2135    pub fn opentelemetry_filter_defaults(&self) -> Vec<SerializableDirective> {
2136        self.expect_value::<Vec<SerializableDirective>>(&OPENTELEMETRY_FILTER_DEFAULTS)
2137            .clone()
2138    }
2139
2140    pub fn sentry_filters(&self) -> Vec<SerializableDirective> {
2141        self.expect_value::<Vec<SerializableDirective>>(&SENTRY_FILTERS)
2142            .clone()
2143    }
2144
2145    pub fn webhooks_secrets_caching_ttl_secs(&self) -> usize {
2146        *self.expect_value(&WEBHOOKS_SECRETS_CACHING_TTL_SECS)
2147    }
2148
2149    pub fn coord_slow_message_warn_threshold(&self) -> Duration {
2150        *self.expect_value(&COORD_SLOW_MESSAGE_WARN_THRESHOLD)
2151    }
2152
2153    pub fn grpc_client_http2_keep_alive_interval(&self) -> Duration {
2154        *self.expect_value(&grpc_client::HTTP2_KEEP_ALIVE_INTERVAL)
2155    }
2156
2157    pub fn grpc_client_http2_keep_alive_timeout(&self) -> Duration {
2158        *self.expect_value(&grpc_client::HTTP2_KEEP_ALIVE_TIMEOUT)
2159    }
2160
2161    pub fn grpc_connect_timeout(&self) -> Duration {
2162        *self.expect_value(&grpc_client::CONNECT_TIMEOUT)
2163    }
2164
2165    pub fn cluster_multi_process_replica_az_affinity_weight(&self) -> Option<i32> {
2166        *self.expect_value(&cluster_scheduling::CLUSTER_MULTI_PROCESS_REPLICA_AZ_AFFINITY_WEIGHT)
2167    }
2168
2169    pub fn cluster_soften_replication_anti_affinity(&self) -> bool {
2170        *self.expect_value(&cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY)
2171    }
2172
2173    pub fn cluster_soften_replication_anti_affinity_weight(&self) -> i32 {
2174        *self.expect_value(&cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY_WEIGHT)
2175    }
2176
2177    pub fn cluster_enable_topology_spread(&self) -> bool {
2178        *self.expect_value(&cluster_scheduling::CLUSTER_ENABLE_TOPOLOGY_SPREAD)
2179    }
2180
2181    pub fn cluster_topology_spread_ignore_non_singular_scale(&self) -> bool {
2182        *self.expect_value(&cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_IGNORE_NON_SINGULAR_SCALE)
2183    }
2184
2185    pub fn cluster_topology_spread_max_skew(&self) -> i32 {
2186        *self.expect_value(&cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_MAX_SKEW)
2187    }
2188
2189    pub fn cluster_topology_spread_set_min_domains(&self) -> Option<i32> {
2190        *self.expect_value(&cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_MIN_DOMAINS)
2191    }
2192
2193    pub fn cluster_topology_spread_soft(&self) -> bool {
2194        *self.expect_value(&cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_SOFT)
2195    }
2196
2197    pub fn cluster_soften_az_affinity(&self) -> bool {
2198        *self.expect_value(&cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY)
2199    }
2200
2201    pub fn cluster_soften_az_affinity_weight(&self) -> i32 {
2202        *self.expect_value(&cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY_WEIGHT)
2203    }
2204
2205    pub fn cluster_alter_check_ready_interval(&self) -> Duration {
2206        *self.expect_value(&cluster_scheduling::CLUSTER_ALTER_CHECK_READY_INTERVAL)
2207    }
2208
2209    pub fn cluster_check_scheduling_policies_interval(&self) -> Duration {
2210        *self.expect_value(&cluster_scheduling::CLUSTER_CHECK_SCHEDULING_POLICIES_INTERVAL)
2211    }
2212
2213    pub fn cluster_security_context_enabled(&self) -> bool {
2214        *self.expect_value(&cluster_scheduling::CLUSTER_SECURITY_CONTEXT_ENABLED)
2215    }
2216
2217    pub fn cluster_refresh_mv_compaction_estimate(&self) -> Duration {
2218        *self.expect_value(&cluster_scheduling::CLUSTER_REFRESH_MV_COMPACTION_ESTIMATE)
2219    }
2220
2221    /// Returns the `privatelink_status_update_quota_per_minute` configuration parameter.
2222    pub fn privatelink_status_update_quota_per_minute(&self) -> u32 {
2223        *self.expect_value(&PRIVATELINK_STATUS_UPDATE_QUOTA_PER_MINUTE)
2224    }
2225
2226    pub fn statement_logging_target_data_rate(&self) -> Option<usize> {
2227        *self.expect_value(&STATEMENT_LOGGING_TARGET_DATA_RATE)
2228    }
2229
2230    pub fn statement_logging_max_data_credit(&self) -> Option<usize> {
2231        *self.expect_value(&STATEMENT_LOGGING_MAX_DATA_CREDIT)
2232    }
2233
2234    /// Returns the `statement_logging_max_sample_rate` configuration parameter.
2235    pub fn statement_logging_max_sample_rate(&self) -> Numeric {
2236        *self.expect_value(&STATEMENT_LOGGING_MAX_SAMPLE_RATE)
2237    }
2238
2239    /// Returns the `statement_logging_default_sample_rate` configuration parameter.
2240    pub fn statement_logging_default_sample_rate(&self) -> Numeric {
2241        *self.expect_value(&STATEMENT_LOGGING_DEFAULT_SAMPLE_RATE)
2242    }
2243
2244    /// Returns the `enable_internal_statement_logging` configuration parameter.
2245    pub fn enable_internal_statement_logging(&self) -> bool {
2246        *self.expect_value(&ENABLE_INTERNAL_STATEMENT_LOGGING)
2247    }
2248
2249    /// Returns the `optimizer_stats_timeout` configuration parameter.
2250    pub fn optimizer_stats_timeout(&self) -> Duration {
2251        *self.expect_value(&OPTIMIZER_STATS_TIMEOUT)
2252    }
2253
2254    /// Returns the `optimizer_oneshot_stats_timeout` configuration parameter.
2255    pub fn optimizer_oneshot_stats_timeout(&self) -> Duration {
2256        *self.expect_value(&OPTIMIZER_ONESHOT_STATS_TIMEOUT)
2257    }
2258
2259    /// Returns the `webhook_concurrent_request_limit` configuration parameter.
2260    pub fn webhook_concurrent_request_limit(&self) -> usize {
2261        *self.expect_value(&WEBHOOK_CONCURRENT_REQUEST_LIMIT)
2262    }
2263
2264    /// Returns the `pg_timestamp_oracle_connection_pool_max_size` configuration parameter.
2265    pub fn pg_timestamp_oracle_connection_pool_max_size(&self) -> usize {
2266        *self.expect_value(&PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_SIZE)
2267    }
2268
2269    /// Returns the `pg_timestamp_oracle_connection_pool_max_wait` configuration parameter.
2270    pub fn pg_timestamp_oracle_connection_pool_max_wait(&self) -> Option<Duration> {
2271        *self.expect_value(&PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_WAIT)
2272    }
2273
2274    /// Returns the `pg_timestamp_oracle_connection_pool_ttl` configuration parameter.
2275    pub fn pg_timestamp_oracle_connection_pool_ttl(&self) -> Duration {
2276        *self.expect_value(&PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL)
2277    }
2278
2279    /// Returns the `pg_timestamp_oracle_connection_pool_ttl_stagger` configuration parameter.
2280    pub fn pg_timestamp_oracle_connection_pool_ttl_stagger(&self) -> Duration {
2281        *self.expect_value(&PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL_STAGGER)
2282    }
2283
2284    /// Returns the `user_storage_managed_collections_batch_duration` configuration parameter.
2285    pub fn user_storage_managed_collections_batch_duration(&self) -> Duration {
2286        *self.expect_value(&USER_STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION)
2287    }
2288
2289    pub fn force_source_table_syntax(&self) -> bool {
2290        *self.expect_value(&FORCE_SOURCE_TABLE_SYNTAX)
2291    }
2292
2293    pub fn optimizer_e2e_latency_warning_threshold(&self) -> Duration {
2294        *self.expect_value(&OPTIMIZER_E2E_LATENCY_WARNING_THRESHOLD)
2295    }
2296
2297    /// Returns whether the named variable is a controller configuration parameter.
2298    pub fn is_controller_config_var(&self, name: &str) -> bool {
2299        self.is_dyncfg_var(name)
2300    }
2301
2302    /// Returns whether the named variable is a compute configuration parameter
2303    /// (things that go in `ComputeParameters` and are sent to replicas via `UpdateConfiguration`
2304    /// commands).
2305    pub fn is_compute_config_var(&self, name: &str) -> bool {
2306        name == MAX_RESULT_SIZE.name() || self.is_dyncfg_var(name) || is_tracing_var(name)
2307    }
2308
2309    /// Returns whether the named variable is a metrics configuration parameter
2310    pub fn is_metrics_config_var(&self, name: &str) -> bool {
2311        self.is_dyncfg_var(name)
2312    }
2313
2314    /// Returns whether the named variable is a storage configuration parameter.
2315    pub fn is_storage_config_var(&self, name: &str) -> bool {
2316        name == PG_SOURCE_CONNECT_TIMEOUT.name()
2317            || name == PG_SOURCE_TCP_KEEPALIVES_IDLE.name()
2318            || name == PG_SOURCE_TCP_KEEPALIVES_INTERVAL.name()
2319            || name == PG_SOURCE_TCP_KEEPALIVES_RETRIES.name()
2320            || name == PG_SOURCE_TCP_USER_TIMEOUT.name()
2321            || name == PG_SOURCE_TCP_CONFIGURE_SERVER.name()
2322            || name == PG_SOURCE_SNAPSHOT_STATEMENT_TIMEOUT.name()
2323            || name == PG_SOURCE_WAL_SENDER_TIMEOUT.name()
2324            || name == PG_SOURCE_SNAPSHOT_COLLECT_STRICT_COUNT.name()
2325            || name == MYSQL_SOURCE_TCP_KEEPALIVE.name()
2326            || name == MYSQL_SOURCE_SNAPSHOT_MAX_EXECUTION_TIME.name()
2327            || name == MYSQL_SOURCE_SNAPSHOT_LOCK_WAIT_TIMEOUT.name()
2328            || name == MYSQL_SOURCE_CONNECT_TIMEOUT.name()
2329            || name == ENABLE_STORAGE_SHARD_FINALIZATION.name()
2330            || name == SSH_CHECK_INTERVAL.name()
2331            || name == SSH_CONNECT_TIMEOUT.name()
2332            || name == SSH_KEEPALIVES_IDLE.name()
2333            || name == KAFKA_SOCKET_KEEPALIVE.name()
2334            || name == KAFKA_SOCKET_TIMEOUT.name()
2335            || name == KAFKA_TRANSACTION_TIMEOUT.name()
2336            || name == KAFKA_SOCKET_CONNECTION_SETUP_TIMEOUT.name()
2337            || name == KAFKA_FETCH_METADATA_TIMEOUT.name()
2338            || name == KAFKA_PROGRESS_RECORD_FETCH_TIMEOUT.name()
2339            || name == STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES.name()
2340            || name == STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_TO_CLUSTER_SIZE_FRACTION.name()
2341            || name == STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_DISK_ONLY.name()
2342            || name == STORAGE_SHRINK_UPSERT_UNUSED_BUFFERS_BY_RATIO.name()
2343            || name == STORAGE_RECORD_SOURCE_SINK_NAMESPACED_ERRORS.name()
2344            || name == STORAGE_STATISTICS_INTERVAL.name()
2345            || name == STORAGE_STATISTICS_COLLECTION_INTERVAL.name()
2346            || name == USER_STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION.name()
2347            || is_upsert_rocksdb_config_var(name)
2348            || self.is_dyncfg_var(name)
2349            || is_tracing_var(name)
2350    }
2351
2352    /// Returns whether the named variable is a dyncfg configuration parameter.
2353    fn is_dyncfg_var(&self, name: &str) -> bool {
2354        self.dyncfgs.entries().any(|e| name == e.name())
2355    }
2356}
2357
2358pub fn is_tracing_var(name: &str) -> bool {
2359    name == LOGGING_FILTER.name()
2360        || name == LOGGING_FILTER_DEFAULTS.name()
2361        || name == OPENTELEMETRY_FILTER.name()
2362        || name == OPENTELEMETRY_FILTER_DEFAULTS.name()
2363        || name == SENTRY_FILTERS.name()
2364}
2365
2366/// Returns whether the named variable is a caching configuration parameter.
2367pub fn is_secrets_caching_var(name: &str) -> bool {
2368    name == WEBHOOKS_SECRETS_CACHING_TTL_SECS.name()
2369}
2370
2371fn is_upsert_rocksdb_config_var(name: &str) -> bool {
2372    name == upsert_rocksdb::UPSERT_ROCKSDB_COMPACTION_STYLE.name()
2373        || name == upsert_rocksdb::UPSERT_ROCKSDB_OPTIMIZE_COMPACTION_MEMTABLE_BUDGET.name()
2374        || name == upsert_rocksdb::UPSERT_ROCKSDB_LEVEL_COMPACTION_DYNAMIC_LEVEL_BYTES.name()
2375        || name == upsert_rocksdb::UPSERT_ROCKSDB_UNIVERSAL_COMPACTION_RATIO.name()
2376        || name == upsert_rocksdb::UPSERT_ROCKSDB_PARALLELISM.name()
2377        || name == upsert_rocksdb::UPSERT_ROCKSDB_COMPRESSION_TYPE.name()
2378        || name == upsert_rocksdb::UPSERT_ROCKSDB_BOTTOMMOST_COMPRESSION_TYPE.name()
2379        || name == upsert_rocksdb::UPSERT_ROCKSDB_BATCH_SIZE.name()
2380        || name == upsert_rocksdb::UPSERT_ROCKSDB_STATS_LOG_INTERVAL_SECONDS.name()
2381        || name == upsert_rocksdb::UPSERT_ROCKSDB_STATS_PERSIST_INTERVAL_SECONDS.name()
2382        || name == upsert_rocksdb::UPSERT_ROCKSDB_POINT_LOOKUP_BLOCK_CACHE_SIZE_MB.name()
2383        || name == upsert_rocksdb::UPSERT_ROCKSDB_SHRINK_ALLOCATED_BUFFERS_BY_RATIO.name()
2384}
2385
2386/// Returns whether the named variable is a (Postgres/CRDB) timestamp oracle
2387/// configuration parameter.
2388pub fn is_timestamp_oracle_config_var(name: &str) -> bool {
2389    name == PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_SIZE.name()
2390        || name == PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_WAIT.name()
2391        || name == PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL.name()
2392        || name == PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL_STAGGER.name()
2393        || name == CRDB_CONNECT_TIMEOUT.name()
2394        || name == CRDB_TCP_USER_TIMEOUT.name()
2395        || name == CRDB_KEEPALIVES_IDLE.name()
2396        || name == CRDB_KEEPALIVES_INTERVAL.name()
2397        || name == CRDB_KEEPALIVES_RETRIES.name()
2398}
2399
2400/// Returns whether the named variable is a cluster scheduling config
2401pub fn is_cluster_scheduling_var(name: &str) -> bool {
2402    name == cluster_scheduling::CLUSTER_MULTI_PROCESS_REPLICA_AZ_AFFINITY_WEIGHT.name()
2403        || name == cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY.name()
2404        || name == cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY_WEIGHT.name()
2405        || name == cluster_scheduling::CLUSTER_ENABLE_TOPOLOGY_SPREAD.name()
2406        || name == cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_IGNORE_NON_SINGULAR_SCALE.name()
2407        || name == cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_MAX_SKEW.name()
2408        || name == cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_SOFT.name()
2409        || name == cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY.name()
2410        || name == cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY_WEIGHT.name()
2411}
2412
2413/// Returns whether the named variable is an HTTP server related config var.
2414pub fn is_http_config_var(name: &str) -> bool {
2415    name == WEBHOOK_CONCURRENT_REQUEST_LIMIT.name()
2416}
2417
2418/// Set of [`SystemVar`]s that can also get set at a per-Session level.
2419///
2420/// TODO(parkmycar): Instead of a separate list, make this a field on VarDefinition.
2421static SESSION_SYSTEM_VARS: LazyLock<BTreeMap<&'static UncasedStr, &'static VarDefinition>> =
2422    LazyLock::new(|| {
2423        [
2424            &APPLICATION_NAME,
2425            &CLIENT_ENCODING,
2426            &CLIENT_MIN_MESSAGES,
2427            &CLUSTER,
2428            &CLUSTER_REPLICA,
2429            &DEFAULT_CLUSTER_REPLICATION_FACTOR,
2430            &CURRENT_OBJECT_MISSING_WARNINGS,
2431            &DATABASE,
2432            &DATE_STYLE,
2433            &EXTRA_FLOAT_DIGITS,
2434            &INTEGER_DATETIMES,
2435            &INTERVAL_STYLE,
2436            &REAL_TIME_RECENCY_TIMEOUT,
2437            &SEARCH_PATH,
2438            &STANDARD_CONFORMING_STRINGS,
2439            &STATEMENT_TIMEOUT,
2440            &IDLE_IN_TRANSACTION_SESSION_TIMEOUT,
2441            &TIMEZONE,
2442            &TRANSACTION_ISOLATION,
2443            &MAX_QUERY_RESULT_SIZE,
2444        ]
2445        .into_iter()
2446        .map(|var| (UncasedStr::new(var.name()), var))
2447        .collect()
2448    });
2449
2450// Provides a wrapper to express that a particular `ServerVar` is meant to be used as a feature
2451/// flag.
2452#[derive(Debug)]
2453pub struct FeatureFlag {
2454    pub flag: &'static VarDefinition,
2455    pub feature_desc: &'static str,
2456}
2457
2458impl FeatureFlag {
2459    /// Returns an error unless the feature flag is enabled in the provided
2460    /// `system_vars`.
2461    pub fn require(&'static self, system_vars: &SystemVars) -> Result<(), VarError> {
2462        match *system_vars.expect_value::<bool>(self.flag) {
2463            true => Ok(()),
2464            false => Err(VarError::RequiresFeatureFlag { feature_flag: self }),
2465        }
2466    }
2467}
2468
2469impl PartialEq for FeatureFlag {
2470    fn eq(&self, other: &FeatureFlag) -> bool {
2471        self.flag.name() == other.flag.name()
2472    }
2473}
2474
2475impl Eq for FeatureFlag {}
2476
2477impl Var for MzVersion {
2478    fn name(&self) -> &'static str {
2479        MZ_VERSION_NAME.as_str()
2480    }
2481
2482    fn value(&self) -> String {
2483        self.build_info
2484            .human_version(self.helm_chart_version.clone())
2485    }
2486
2487    fn description(&self) -> &'static str {
2488        "Shows the Materialize server version (Materialize)."
2489    }
2490
2491    fn type_name(&self) -> Cow<'static, str> {
2492        String::type_name()
2493    }
2494
2495    fn visible(&self, _: &User, _: &SystemVars) -> Result<(), VarError> {
2496        Ok(())
2497    }
2498}
2499
2500impl Var for User {
2501    fn name(&self) -> &'static str {
2502        IS_SUPERUSER_NAME.as_str()
2503    }
2504
2505    fn value(&self) -> String {
2506        self.is_superuser().format()
2507    }
2508
2509    fn description(&self) -> &'static str {
2510        "Reports whether the current session is a superuser (PostgreSQL)."
2511    }
2512
2513    fn type_name(&self) -> Cow<'static, str> {
2514        bool::type_name()
2515    }
2516
2517    fn visible(&self, _: &User, _: &SystemVars) -> Result<(), VarError> {
2518        Ok(())
2519    }
2520}
2521
2522#[cfg(test)]
2523mod isolation_feature_flag_tests {
2524    use super::*;
2525
2526    #[mz_ore::test]
2527    fn gates_bounded_staleness_value() {
2528        let mut system_vars = SystemVars::new();
2529
2530        // Default-on: the value passes the gate.
2531        check_transaction_isolation_feature_flag(
2532            TRANSACTION_ISOLATION_VAR_NAME,
2533            VarInput::Flat("bounded staleness 5s"),
2534            &system_vars,
2535        )
2536        .expect("flag on by default");
2537
2538        // Turn the flag off: the value is rejected regardless of the letter case
2539        // of the variable name. This covers `SET`, `SET "TRANSACTION_ISOLATION"`,
2540        // `ALTER ROLE ... SET`, and connection options, which all route through
2541        // `SessionVars::set` and this shared check.
2542        system_vars
2543            .set("enable_bounded_staleness_isolation", VarInput::Flat("off"))
2544            .expect("set flag");
2545        for name in ["transaction_isolation", "TRANSACTION_ISOLATION"] {
2546            let err = check_transaction_isolation_feature_flag(
2547                name,
2548                VarInput::Flat("bounded staleness 5s"),
2549                &system_vars,
2550            )
2551            .expect_err("flag off rejects bounded staleness");
2552            assert!(matches!(err, VarError::RequiresFeatureFlag { .. }));
2553        }
2554
2555        // Non-gated levels are unaffected.
2556        check_transaction_isolation_feature_flag(
2557            TRANSACTION_ISOLATION_VAR_NAME,
2558            VarInput::Flat("serializable"),
2559            &system_vars,
2560        )
2561        .expect("serializable always allowed");
2562
2563        // Unrelated variables are ignored, even with a gated-looking value.
2564        check_transaction_isolation_feature_flag(
2565            CLUSTER.name(),
2566            VarInput::Flat("bounded staleness 5s"),
2567            &system_vars,
2568        )
2569        .expect("unrelated var ignored");
2570    }
2571}