Skip to main content

mz_sql/session/
vars.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Run-time configuration parameters
11//!
12//! ## Overview
13//! Materialize roughly follows the PostgreSQL configuration model, which works
14//! as follows. There is a global set of named configuration parameters, like
15//! `DateStyle` and `client_encoding`. These parameters can be set in several
16//! places: in an on-disk configuration file (in Postgres, named
17//! postgresql.conf), in command line arguments when the server is started, or
18//! at runtime via the `ALTER SYSTEM` or `SET` statements. Parameters that are
19//! set in a session take precedence over database defaults, which in turn take
20//! precedence over command line arguments, which in turn take precedence over
21//! settings in the on-disk configuration. Note that changing the value of
22//! parameters obeys transaction semantics: if a transaction fails to commit,
23//! any parameters that were changed in that transaction (i.e., via `SET`) will
24//! be rolled back to their previous value.
25//!
26//! The Materialize configuration hierarchy at the moment is much simpler.
27//! Global defaults are hardcoded into the binary, and a select few parameters
28//! can be overridden per session. A select few parameters can be overridden on
29//! disk.
30//!
31//! The set of variables that can be overridden per session and the set of
32//! variables that can be overridden on disk are currently disjoint. The
33//! infrastructure has been designed with an eye towards merging these two sets
34//! and supporting additional layers to the hierarchy, however, should the need
35//! arise.
36//!
37//! The configuration parameters that exist are driven by compatibility with
38//! PostgreSQL drivers that expect them, not because they are particularly
39//! important.
40//!
41//! ## Structure
42//! The most meaningful exports from this module are:
43//!
44//! - [`SessionVars`] represent per-session parameters, which each user can
45//!   access independently of one another, and are accessed via `SET`.
46//!
//!   The fields of [`SessionVars`] are either:
48//!     - `SessionVar`, which is preferable and simply requires full support of
49//!       the `SessionVar` impl for its embedded value type.
50//!     - `ServerVar` for types that do not currently support everything
51//!       required by `SessionVar`, e.g. they are fixed-value parameters.
52//!
53//!   In the fullness of time, all fields in [`SessionVars`] should be
54//!   `SessionVar`.
55//!
56//! - [`SystemVars`] represent system-wide configuration settings and are
57//!   accessed via `ALTER SYSTEM SET`.
58//!
59//!   All elements of [`SystemVars`] are `SystemVar`.
60//!
61//! Some [`VarDefinition`] are also marked as a [`FeatureFlag`]; this is just a
62//! wrapper to make working with a set of [`VarDefinition`] easier, primarily from
63//! within SQL planning, where we might want to check if a feature is enabled
64//! before planning it.
65
66use std::borrow::Cow;
67use std::clone::Clone;
68use std::collections::BTreeMap;
69use std::fmt::Debug;
70use std::net::IpAddr;
71use std::num::NonZeroU32;
72use std::string::ToString;
73use std::sync::{Arc, LazyLock};
74use std::time::Duration;
75
76use chrono::{DateTime, Utc};
77use derivative::Derivative;
78use imbl::OrdMap;
79use mz_adapter_types::dyncfgs::{OIDC_GROUP_CLAIM, OIDC_GROUP_ROLE_SYNC_STRICT};
80use mz_build_info::BuildInfo;
81use mz_dyncfg::{ConfigSet, ConfigType, ConfigUpdates, ConfigVal};
82use mz_persist_client::cfg::{
83    CRDB_CONNECT_TIMEOUT, CRDB_KEEPALIVES_IDLE, CRDB_KEEPALIVES_INTERVAL, CRDB_KEEPALIVES_RETRIES,
84    CRDB_TCP_USER_TIMEOUT,
85};
86use mz_repr::adt::numeric::Numeric;
87use mz_repr::adt::timestamp::CheckedTimestamp;
88use mz_repr::bytes::ByteSize;
89use mz_repr::user::{ExternalUserMetadata, InternalUserMetadata};
90use mz_tracing::{CloneableEnvFilter, SerializableDirective};
91use serde::Serialize;
92use thiserror::Error;
93use uncased::UncasedStr;
94
95use crate::ast::Ident;
96use crate::session::user::User;
97
98pub(crate) mod constraints;
99pub(crate) mod definitions;
100pub(crate) mod errors;
101pub(crate) mod polyfill;
102pub(crate) mod value;
103
104pub use definitions::*;
105pub use errors::*;
106pub use value::*;
107
/// How a transaction finished.
///
/// Conceptually this belongs to the adapter layer; it is defined here purely
/// for convenience, because [`SessionVars::end_transaction`] takes it as an
/// argument.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum EndTransactionAction {
    /// The transaction is committed.
    Commit,
    /// The transaction is rolled back.
    Rollback,
}
119
/// Represents the input to a variable.
///
/// Each variable has its own rules for handling each style of input, so this
/// type carries the raw input around until the variable-specific
/// interpretation can be applied to it.
#[derive(Debug, Clone, Copy)]
pub enum VarInput<'a> {
    /// The input has been flattened into a single string.
    Flat(&'a str),
    /// The input comes from a SQL `SET` statement and is jumbled across
    /// multiple components.
    SqlSet(&'a [String]),
}

impl<'a> VarInput<'a> {
    /// Converts the variable input to an owned vector of strings.
    ///
    /// A [`VarInput::Flat`] input yields a single-element vector; a
    /// [`VarInput::SqlSet`] input yields a copy of its components, in order.
    pub fn to_vec(&self) -> Vec<String> {
        match *self {
            VarInput::Flat(v) => vec![v.to_owned()],
            // Clone the slice in one step instead of calling `into_iter` on a
            // reference and cloning each element by hand.
            VarInput::SqlSet(values) => values.to_vec(),
        }
    }
}
143
144/// An owned version of [`VarInput`].
145#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize)]
146pub enum OwnedVarInput {
147    /// See [`VarInput::Flat`].
148    Flat(String),
149    /// See [`VarInput::SqlSet`].
150    SqlSet(Vec<String>),
151}
152
153impl OwnedVarInput {
154    /// Converts this owned variable input as a [`VarInput`].
155    pub fn borrow(&self) -> VarInput<'_> {
156        match self {
157            OwnedVarInput::Flat(v) => VarInput::Flat(v),
158            OwnedVarInput::SqlSet(v) => VarInput::SqlSet(v),
159        }
160    }
161}
162
163/// A `Var` represents a configuration parameter of an arbitrary type.
164pub trait Var: Debug {
165    /// Returns the name of the configuration parameter.
166    fn name(&self) -> &'static str;
167
168    /// Constructs a flattened string representation of the current value of the
169    /// configuration parameter.
170    ///
171    /// The resulting string is guaranteed to be parsable if provided to
172    /// `Value::parse` as a [`VarInput::Flat`].
173    fn value(&self) -> String;
174
175    /// Returns a short sentence describing the purpose of the configuration
176    /// parameter.
177    fn description(&self) -> &'static str;
178
179    /// Returns the name of the type of this variable.
180    fn type_name(&self) -> Cow<'static, str>;
181
182    /// Indicates wither the [`Var`] is visible as a function of the `user` and `system_vars`.
183    /// "Invisible" parameters return `VarErrors`.
184    ///
185    /// Variables marked as `internal` are only visible for the system user.
186    fn visible(&self, user: &User, system_vars: &SystemVars) -> Result<(), VarError>;
187
188    /// Reports whether the variable is only visible in unsafe mode.
189    fn is_unsafe(&self) -> bool {
190        self.name().starts_with("unsafe_")
191    }
192
193    /// Upcast `self` to a `dyn Var`, useful when working with multiple different implementors of
194    /// [`Var`].
195    fn as_var(&self) -> &dyn Var
196    where
197        Self: Sized,
198    {
199        self
200    }
201}
202
203/// A `SessionVar` is the session value for a configuration parameter. If unset,
204/// the server default is used instead.
205///
206/// Note: even though all of the different `*_value` fields are `Box<dyn Value>` they are enforced
207/// to be the same type because we use the `definition`s `parse(...)` method. This is guaranteed to
208/// return the same type as the compiled in default.
209#[derive(Debug)]
210pub struct SessionVar {
211    definition: VarDefinition,
212    /// System or Role default value.
213    default_value: Option<Box<dyn Value>>,
214    /// Value `LOCAL` to a transaction, will be unset at the completion of the transaction.
215    local_value: Option<Box<dyn Value>>,
216    /// Value set during a transaction, will be set if the transaction is committed.
217    staged_value: Option<Box<dyn Value>>,
218    /// Value that overrides the default.
219    session_value: Option<Box<dyn Value>>,
220}
221
222impl Clone for SessionVar {
223    fn clone(&self) -> Self {
224        SessionVar {
225            definition: self.definition.clone(),
226            default_value: self.default_value.as_ref().map(|v| v.box_clone()),
227            local_value: self.local_value.as_ref().map(|v| v.box_clone()),
228            staged_value: self.staged_value.as_ref().map(|v| v.box_clone()),
229            session_value: self.session_value.as_ref().map(|v| v.box_clone()),
230        }
231    }
232}
233
234impl SessionVar {
235    pub const fn new(var: VarDefinition) -> Self {
236        SessionVar {
237            definition: var,
238            default_value: None,
239            local_value: None,
240            staged_value: None,
241            session_value: None,
242        }
243    }
244
245    /// Checks if the provided [`VarInput`] is valid for the current session variable, returning
246    /// the formatted output if it's valid.
247    pub fn check(&self, input: VarInput) -> Result<String, VarError> {
248        let v = self.definition.parse(input)?;
249        self.validate_constraints(v.as_ref())?;
250
251        Ok(v.format())
252    }
253
254    /// Parse the input and update the stored value to match.
255    pub fn set(&mut self, input: VarInput, local: bool) -> Result<(), VarError> {
256        let v = self.definition.parse(input)?;
257
258        // Validate our parsed value.
259        self.validate_constraints(v.as_ref())?;
260
261        if local {
262            self.local_value = Some(v);
263        } else {
264            self.local_value = None;
265            self.staged_value = Some(v);
266        }
267        Ok(())
268    }
269
270    /// Sets the default value for the variable.
271    pub fn set_default(&mut self, input: VarInput) -> Result<(), VarError> {
272        let v = self.definition.parse(input)?;
273        self.validate_constraints(v.as_ref())?;
274        self.default_value = Some(v);
275        Ok(())
276    }
277
278    /// Reset the stored value to the default.
279    pub fn reset(&mut self, local: bool) {
280        let value = self
281            .default_value
282            .as_ref()
283            .map(|v| v.as_ref())
284            .unwrap_or_else(|| self.definition.value.value());
285        if local {
286            self.local_value = Some(value.box_clone());
287        } else {
288            self.local_value = None;
289            self.staged_value = Some(value.box_clone());
290        }
291    }
292
293    /// Returns a possibly new SessionVar if this needs to mutate at transaction end.
294    #[must_use]
295    pub fn end_transaction(&self, action: EndTransactionAction) -> Option<Self> {
296        if !self.is_mutating() {
297            return None;
298        }
299        let mut next: Self = self.clone();
300        next.local_value = None;
301        match action {
302            EndTransactionAction::Commit if next.staged_value.is_some() => {
303                next.session_value = next.staged_value.take()
304            }
305            _ => next.staged_value = None,
306        }
307        Some(next)
308    }
309
310    /// Whether this Var needs to mutate at the end of a transaction.
311    pub fn is_mutating(&self) -> bool {
312        self.local_value.is_some() || self.staged_value.is_some()
313    }
314
315    pub fn value_dyn(&self) -> &dyn Value {
316        self.local_value
317            .as_deref()
318            .or(self.staged_value.as_deref())
319            .or(self.session_value.as_deref())
320            .or(self.default_value.as_deref())
321            .unwrap_or_else(|| self.definition.value.value())
322    }
323
324    /// Returns the [`Value`] that is currently stored as the `session_value`.
325    ///
326    /// Note: This should __only__ be used for inspection, if you want to determine the current
327    /// value of this [`SessionVar`] you should use [`SessionVar::value`].
328    pub fn inspect_session_value(&self) -> Option<&dyn Value> {
329        self.session_value.as_deref()
330    }
331
332    fn validate_constraints(&self, val: &dyn Value) -> Result<(), VarError> {
333        if let Some(constraint) = &self.definition.constraint {
334            constraint.check_constraint(self, self.value_dyn(), val)
335        } else {
336            Ok(())
337        }
338    }
339}
340
341impl Var for SessionVar {
342    fn name(&self) -> &'static str {
343        self.definition.name.as_str()
344    }
345
346    fn value(&self) -> String {
347        self.value_dyn().format()
348    }
349
350    fn description(&self) -> &'static str {
351        self.definition.description
352    }
353
354    fn type_name(&self) -> Cow<'static, str> {
355        self.definition.type_name()
356    }
357
358    fn visible(
359        &self,
360        user: &User,
361        system_vars: &super::vars::SystemVars,
362    ) -> Result<(), super::vars::VarError> {
363        self.definition.visible(user, system_vars)
364    }
365}
366
367#[derive(Debug, Clone, PartialEq, Eq)]
368pub struct MzVersion {
369    /// Inputs to computed variables.
370    build_info: &'static BuildInfo,
371    /// Helm chart version
372    helm_chart_version: Option<String>,
373}
374
375impl MzVersion {
376    pub fn new(build_info: &'static BuildInfo, helm_chart_version: Option<String>) -> Self {
377        MzVersion {
378            build_info,
379            helm_chart_version,
380        }
381    }
382}
383
384/// Session variables.
385///
386/// See the [`crate::session::vars`] module documentation for more details on the
387/// Materialize configuration model.
388#[derive(Debug, Clone)]
389pub struct SessionVars {
390    /// The set of all session variables.
391    vars: OrdMap<&'static UncasedStr, SessionVar>,
392    /// Inputs to computed variables.
393    mz_version: MzVersion,
394    /// Information about the user associated with this Session.
395    user: User,
396}
397
398impl SessionVars {
399    /// Creates a new [`SessionVars`] without considering the System or Role defaults.
400    pub fn new_unchecked(
401        build_info: &'static BuildInfo,
402        user: User,
403        helm_chart_version: Option<String>,
404    ) -> SessionVars {
405        use definitions::*;
406
407        let vars = [
408            &FAILPOINTS,
409            &SERVER_VERSION,
410            &SERVER_VERSION_NUM,
411            &SQL_SAFE_UPDATES,
412            &REAL_TIME_RECENCY,
413            &EMIT_PLAN_INSIGHTS_NOTICE,
414            &EMIT_TIMESTAMP_NOTICE,
415            &EMIT_TRACE_ID_NOTICE,
416            &AUTO_ROUTE_CATALOG_QUERIES,
417            &ENABLE_SESSION_RBAC_CHECKS,
418            &RESTRICT_TO_USER_OBJECTS,
419            &ENABLE_SESSION_CARDINALITY_ESTIMATES,
420            &MAX_IDENTIFIER_LENGTH,
421            &STATEMENT_LOGGING_SAMPLE_RATE,
422            &EMIT_INTROSPECTION_QUERY_NOTICE,
423            &UNSAFE_NEW_TRANSACTION_WALL_TIME,
424            &WELCOME_MESSAGE,
425        ]
426        .into_iter()
427        .chain(SESSION_SYSTEM_VARS.iter().map(|(_name, var)| *var))
428        .map(|var| (var.name, SessionVar::new(var.clone())))
429        .collect();
430
431        SessionVars {
432            vars,
433            mz_version: MzVersion::new(build_info, helm_chart_version),
434            user,
435        }
436    }
437
438    fn expect_value<V: Value>(&self, var: &VarDefinition) -> &V {
439        let var = self
440            .vars
441            .get(var.name)
442            .expect("provided var should be in state");
443        let val = var.value_dyn();
444        val.as_any().downcast_ref::<V>().expect("success")
445    }
446
447    /// Returns an iterator over the configuration parameters and their current
448    /// values for this session.
449    ///
450    /// Note that this function does not check that the access variable should
451    /// be visible because of other settings or users. Before or after accessing
452    /// this method, you should call `Var::visible`.
453    pub fn iter(&self) -> impl Iterator<Item = &dyn Var> {
454        #[allow(clippy::as_conversions)]
455        self.vars
456            .values()
457            .map(|v| v.as_var())
458            .chain([&self.mz_version as &dyn Var, &self.user])
459    }
460
461    /// Returns an iterator over configuration parameters (and their current
462    /// values for this session) that are expected to be sent to the client when
463    /// a new connection is established or when their value changes.
464    pub fn notify_set(&self) -> impl Iterator<Item = &dyn Var> {
465        // WARNING: variables in this set are not checked for visibility, and
466        // are assumed to be visible for all sessions.
467        //
468        // This is fixible with some elbow grease, but at the moment it seems
469        // unlikely that we'll have a variable in the notify set that shouldn't
470        // be visible to all sessions.
471        [
472            &APPLICATION_NAME,
473            &CLIENT_ENCODING,
474            &DATE_STYLE,
475            &INTEGER_DATETIMES,
476            &SERVER_VERSION,
477            &STANDARD_CONFORMING_STRINGS,
478            &TIMEZONE,
479            &INTERVAL_STYLE,
480            // Including `cluster`, `cluster_replica`, `database`, and `search_path` in the notify
481            // set is a Materialize extension. Doing so allows users to more easily identify where
482            // their queries will be executing, which is important to know when you consider the
483            // size of a cluster, what indexes are present, etc.
484            &CLUSTER,
485            &CLUSTER_REPLICA,
486            &DEFAULT_CLUSTER_REPLICATION_FACTOR,
487            &DATABASE,
488            &SEARCH_PATH,
489        ]
490        .into_iter()
491        .map(|v| self.vars[v.name].as_var())
492        // Including `mz_version` in the notify set is a Materialize
493        // extension. Doing so allows applications to detect whether they
494        // are talking to Materialize or PostgreSQL without an additional
495        // network roundtrip. This is known to be safe because CockroachDB
496        // has an analogous extension [0].
497        // [0]: https://github.com/cockroachdb/cockroach/blob/369c4057a/pkg/sql/pgwire/conn.go#L1840
498        .chain(std::iter::once(self.mz_version.as_var()))
499    }
500
501    /// Resets all variables to their default value.
502    pub fn reset_all(&mut self) {
503        let names: Vec<_> = self.vars.keys().copied().collect();
504        for name in names {
505            self.vars[name].reset(false);
506        }
507    }
508
509    /// Returns a [`Var`] representing the configuration parameter with the
510    /// specified name.
511    ///
512    /// Configuration parameters are matched case insensitively. If no such
513    /// configuration parameter exists, `get` returns an error.
514    ///
515    /// Note that if `name` is known at compile time, you should instead use the
516    /// named accessor to access the variable with its true Rust type. For
517    /// example, `self.get("sql_safe_updates").value()` returns the string
518    /// `"true"` or `"false"`, while `self.sql_safe_updates()` returns a bool.
519    pub fn get(&self, system_vars: &SystemVars, name: &str) -> Result<&dyn Var, VarError> {
520        let name = compat_translate_name(name);
521
522        let name = UncasedStr::new(name);
523        if name == MZ_VERSION_NAME {
524            Ok(&self.mz_version)
525        } else if name == IS_SUPERUSER_NAME {
526            Ok(&self.user)
527        } else {
528            self.vars
529                .get(name)
530                .map(|v| {
531                    v.visible(&self.user, system_vars)?;
532                    Ok(v.as_var())
533                })
534                .transpose()?
535                .ok_or_else(|| VarError::UnknownParameter(name.to_string()))
536        }
537    }
538
539    /// Returns a [`SessionVar`] for inspection.
540    ///
541    /// Note: If you're trying to determine the value of the variable with `name` you should
542    /// instead use the named accessor, or [`SessionVars::get`].
543    pub fn inspect(&self, name: &str) -> Result<&SessionVar, VarError> {
544        let name = compat_translate_name(name);
545
546        self.vars
547            .get(UncasedStr::new(name))
548            .ok_or_else(|| VarError::UnknownParameter(name.to_string()))
549    }
550
551    /// Sets the configuration parameter named `name` to the value represented
552    /// by `value`.
553    ///
554    /// The new value may be either committed or rolled back by the next call to
555    /// [`SessionVars::end_transaction`]. If `local` is true, the new value is always
556    /// discarded by the next call to [`SessionVars::end_transaction`], even if the
557    /// transaction is marked to commit.
558    ///
559    /// Like with [`SessionVars::get`], configuration parameters are matched case
560    /// insensitively. If `value` is not valid, as determined by the underlying
561    /// configuration parameter, or if the named configuration parameter does
562    /// not exist, an error is returned.
563    pub fn set(
564        &mut self,
565        system_vars: &SystemVars,
566        name: &str,
567        input: VarInput,
568        local: bool,
569    ) -> Result<(), VarError> {
570        let (name, input) = compat_translate(name, input);
571
572        let name = UncasedStr::new(name);
573        self.check_read_only(name)?;
574
575        self.vars
576            .get_mut(name)
577            .map(|v| {
578                v.visible(&self.user, system_vars)?;
579                v.set(input, local)
580            })
581            .transpose()?
582            .ok_or_else(|| VarError::UnknownParameter(name.to_string()))
583    }
584
585    /// Sets the default value for the parameter named `name` to the value
586    /// represented by `value`.
587    pub fn set_default(&mut self, name: &str, input: VarInput) -> Result<(), VarError> {
588        let (name, input) = compat_translate(name, input);
589
590        let name = UncasedStr::new(name);
591
592        // Check if this variable is allowed to be set as a role default.
593        // Most read-only variables are blocked, but some (like restrict_to_user_objects)
594        // are specifically designed to be set via ALTER ROLE by superusers.
595        if !Self::allow_role_default(name) {
596            self.check_read_only(name)?;
597        }
598
599        self.vars
600            .get_mut(name)
601            // Note: visibility is checked when persisting a role default.
602            .map(|v| v.set_default(input))
603            .transpose()?
604            .ok_or_else(|| VarError::UnknownParameter(name.to_string()))
605    }
606
607    /// Returns true if the variable can be set as a role default even if it's
608    /// otherwise read-only from direct SET commands.
609    ///
610    /// SECURITY: Any variable listed here must also have a corresponding
611    /// superuser RBAC check in `generate_rbac_requirements` in `rbac.rs`
612    /// (see the `PlannedAlterRoleOption::Variable` match arm). Without that
613    /// check, any role could set the variable on themselves via ALTER ROLE.
614    fn allow_role_default(name: &UncasedStr) -> bool {
615        name == RESTRICT_TO_USER_OBJECTS.name
616    }
617
618    /// Sets the configuration parameter named `name` to its default value.
619    ///
620    /// The new value may be either committed or rolled back by the next call to
621    /// [`SessionVars::end_transaction`]. If `local` is true, the new value is
622    /// always discarded by the next call to [`SessionVars::end_transaction`],
623    /// even if the transaction is marked to commit.
624    ///
625    /// Like with [`SessionVars::get`], configuration parameters are matched
626    /// case insensitively. If the named configuration parameter does not exist,
627    /// an error is returned.
628    ///
629    /// If the variable does not exist or the user does not have the visibility
630    /// requires, this function returns an error.
631    pub fn reset(
632        &mut self,
633        system_vars: &SystemVars,
634        name: &str,
635        local: bool,
636    ) -> Result<(), VarError> {
637        let name = compat_translate_name(name);
638
639        let name = UncasedStr::new(name);
640        self.check_read_only(name)?;
641
642        self.vars
643            .get_mut(name)
644            .map(|v| {
645                v.visible(&self.user, system_vars)?;
646                v.reset(local);
647                Ok(())
648            })
649            .transpose()?
650            .ok_or_else(|| VarError::UnknownParameter(name.to_string()))
651    }
652
653    /// Returns an error if the variable corresponding to `name` is read only.
654    ///
655    /// Note: This is called by `set()` (for SQL SET commands) but NOT by
656    /// `set_default()` (for role defaults). This allows variables like
657    /// `restrict_to_user_objects` to be set via `ALTER ROLE ... SET` by
658    /// superusers while blocking direct `SET` commands from regular users.
659    fn check_read_only(&self, name: &UncasedStr) -> Result<(), VarError> {
660        if name == MZ_VERSION_NAME {
661            Err(VarError::ReadOnlyParameter(MZ_VERSION_NAME.as_str()))
662        } else if name == IS_SUPERUSER_NAME {
663            Err(VarError::ReadOnlyParameter(IS_SUPERUSER_NAME.as_str()))
664        } else if name == MAX_IDENTIFIER_LENGTH.name {
665            Err(VarError::ReadOnlyParameter(
666                MAX_IDENTIFIER_LENGTH.name.as_str(),
667            ))
668        } else if name == RESTRICT_TO_USER_OBJECTS.name {
669            // This variable can only be set via ALTER ROLE ... SET by superusers,
670            // not via direct SET commands. This prevents malicious queries from
671            // bypassing the restriction.
672            Err(VarError::ReadOnlyParameter(
673                RESTRICT_TO_USER_OBJECTS.name.as_str(),
674            ))
675        } else {
676            Ok(())
677        }
678    }
679
680    /// Commits or rolls back configuration parameter updates made via
681    /// [`SessionVars::set`] since the last call to `end_transaction`.
682    ///
683    /// Returns any session parameters that changed because the transaction ended.
684    #[mz_ore::instrument(level = "debug")]
685    pub fn end_transaction(
686        &mut self,
687        action: EndTransactionAction,
688    ) -> BTreeMap<&'static str, String> {
689        let mut changed = BTreeMap::new();
690        let mut updates = Vec::new();
691        for (name, var) in self.vars.iter() {
692            if !var.is_mutating() {
693                continue;
694            }
695            let before = var.value();
696            let next = var.end_transaction(action).expect("must mutate");
697            let after = next.value();
698            updates.push((*name, next));
699
700            // Report the new value of the parameter.
701            if before != after {
702                changed.insert(var.name(), after);
703            }
704        }
705        self.vars.extend(updates);
706        changed
707    }
708
709    /// Returns the value of the `application_name` configuration parameter.
710    pub fn application_name(&self) -> &str {
711        self.expect_value::<String>(&APPLICATION_NAME).as_str()
712    }
713
714    /// Returns the build info.
715    pub fn build_info(&self) -> &'static BuildInfo {
716        self.mz_version.build_info
717    }
718
719    /// Returns the value of the `client_encoding` configuration parameter.
720    pub fn client_encoding(&self) -> &ClientEncoding {
721        self.expect_value(&CLIENT_ENCODING)
722    }
723
724    /// Returns the value of the `client_min_messages` configuration parameter.
725    pub fn client_min_messages(&self) -> &ClientSeverity {
726        self.expect_value(&CLIENT_MIN_MESSAGES)
727    }
728
729    /// Returns the value of the `cluster` configuration parameter.
730    pub fn cluster(&self) -> &str {
731        self.expect_value::<String>(&CLUSTER).as_str()
732    }
733
734    /// Returns the value of the `cluster_replica` configuration parameter.
735    pub fn cluster_replica(&self) -> Option<&str> {
736        self.expect_value::<Option<String>>(&CLUSTER_REPLICA)
737            .as_deref()
738    }
739
740    /// Returns the value of the `current_object_missing_warnings` configuration
741    /// parameter.
742    pub fn current_object_missing_warnings(&self) -> bool {
743        *self.expect_value::<bool>(&CURRENT_OBJECT_MISSING_WARNINGS)
744    }
745
746    /// Returns the value of the `DateStyle` configuration parameter.
747    pub fn date_style(&self) -> &[&str] {
748        &self.expect_value::<DateStyle>(&DATE_STYLE).0
749    }
750
751    /// Returns the value of the `database` configuration parameter.
752    pub fn database(&self) -> &str {
753        self.expect_value::<String>(&DATABASE).as_str()
754    }
755
756    /// Returns the value of the `extra_float_digits` configuration parameter.
757    pub fn extra_float_digits(&self) -> i32 {
758        *self.expect_value(&EXTRA_FLOAT_DIGITS)
759    }
760
761    /// Returns the value of the `integer_datetimes` configuration parameter.
762    pub fn integer_datetimes(&self) -> bool {
763        *self.expect_value(&INTEGER_DATETIMES)
764    }
765
766    /// Returns the value of the `intervalstyle` configuration parameter.
767    pub fn intervalstyle(&self) -> &IntervalStyle {
768        self.expect_value(&INTERVAL_STYLE)
769    }
770
771    /// Returns the value of the `mz_version` configuration parameter.
772    pub fn mz_version(&self) -> String {
773        self.mz_version.value()
774    }
775
776    /// Returns the value of the `search_path` configuration parameter.
777    pub fn search_path(&self) -> &[Ident] {
778        self.expect_value::<Vec<Ident>>(&SEARCH_PATH).as_slice()
779    }
780
781    /// Returns the value of the `server_version` configuration parameter.
782    pub fn server_version(&self) -> &str {
783        self.expect_value::<String>(&SERVER_VERSION).as_str()
784    }
785
786    /// Returns the value of the `server_version_num` configuration parameter.
787    pub fn server_version_num(&self) -> i32 {
788        *self.expect_value(&SERVER_VERSION_NUM)
789    }
790
791    /// Returns the value of the `sql_safe_updates` configuration parameter.
792    pub fn sql_safe_updates(&self) -> bool {
793        *self.expect_value(&SQL_SAFE_UPDATES)
794    }
795
796    /// Returns the value of the `standard_conforming_strings` configuration
797    /// parameter.
798    pub fn standard_conforming_strings(&self) -> bool {
799        *self.expect_value(&STANDARD_CONFORMING_STRINGS)
800    }
801
802    /// Returns the value of the `statement_timeout` configuration parameter.
803    pub fn statement_timeout(&self) -> &Duration {
804        self.expect_value(&STATEMENT_TIMEOUT)
805    }
806
807    /// Returns the value of the `idle_in_transaction_session_timeout` configuration parameter.
808    pub fn idle_in_transaction_session_timeout(&self) -> &Duration {
809        self.expect_value(&IDLE_IN_TRANSACTION_SESSION_TIMEOUT)
810    }
811
812    /// Returns the value of the `timezone` configuration parameter.
813    pub fn timezone(&self) -> &TimeZone {
814        self.expect_value(&TIMEZONE)
815    }
816
817    /// Returns the value of the `transaction_isolation` configuration
818    /// parameter.
819    pub fn transaction_isolation(&self) -> &IsolationLevel {
820        self.expect_value(&TRANSACTION_ISOLATION)
821    }
822
823    /// Returns the value of `real_time_recency` configuration parameter.
824    pub fn real_time_recency(&self) -> bool {
825        *self.expect_value(&REAL_TIME_RECENCY)
826    }
827
828    /// Returns the value of the `real_time_recency_timeout` configuration parameter.
829    pub fn real_time_recency_timeout(&self) -> &Duration {
830        self.expect_value(&REAL_TIME_RECENCY_TIMEOUT)
831    }
832
833    /// Returns the value of `emit_plan_insights_notice` configuration parameter.
834    pub fn emit_plan_insights_notice(&self) -> bool {
835        *self.expect_value(&EMIT_PLAN_INSIGHTS_NOTICE)
836    }
837
838    /// Returns the value of `emit_timestamp_notice` configuration parameter.
839    pub fn emit_timestamp_notice(&self) -> bool {
840        *self.expect_value(&EMIT_TIMESTAMP_NOTICE)
841    }
842
843    /// Returns the value of `emit_trace_id_notice` configuration parameter.
844    pub fn emit_trace_id_notice(&self) -> bool {
845        *self.expect_value(&EMIT_TRACE_ID_NOTICE)
846    }
847
848    /// Returns the value of `auto_route_catalog_queries` configuration parameter.
849    pub fn auto_route_catalog_queries(&self) -> bool {
850        *self.expect_value(&AUTO_ROUTE_CATALOG_QUERIES)
851    }
852
853    /// Returns the value of `enable_session_rbac_checks` configuration parameter.
854    pub fn enable_session_rbac_checks(&self) -> bool {
855        *self.expect_value(&ENABLE_SESSION_RBAC_CHECKS)
856    }
857
858    /// Returns the value of `restrict_to_user_objects` configuration parameter.
859    pub fn restrict_to_user_objects(&self) -> bool {
860        *self.expect_value(&RESTRICT_TO_USER_OBJECTS)
861    }
862
863    /// Returns the value of `enable_session_cardinality_estimates` configuration parameter.
864    pub fn enable_session_cardinality_estimates(&self) -> bool {
865        *self.expect_value(&ENABLE_SESSION_CARDINALITY_ESTIMATES)
866    }
867
868    /// Returns the value of `is_superuser` configuration parameter.
869    pub fn is_superuser(&self) -> bool {
870        self.user.is_superuser()
871    }
872
873    /// Returns the user associated with this `SessionVars` instance.
874    pub fn user(&self) -> &User {
875        &self.user
876    }
877
878    /// Returns the value of the `max_query_result_size` configuration parameter.
879    pub fn max_query_result_size(&self) -> u64 {
880        self.expect_value::<ByteSize>(&MAX_QUERY_RESULT_SIZE)
881            .as_bytes()
882    }
883
884    /// Sets the internal metadata associated with the user.
885    pub fn set_internal_user_metadata(&mut self, metadata: InternalUserMetadata) {
886        self.user.internal_metadata = Some(metadata);
887    }
888
889    /// Sets the external metadata associated with the user.
890    pub fn set_external_user_metadata(&mut self, metadata: ExternalUserMetadata) {
891        self.user.external_metadata = Some(metadata);
892    }
893
894    pub fn set_cluster(&mut self, cluster: String) {
895        let var = self
896            .vars
897            .get_mut(UncasedStr::new(CLUSTER.name()))
898            .expect("cluster variable must exist");
899        var.set(VarInput::Flat(&cluster), false)
900            .expect("setting cluster must succeed");
901    }
902
903    pub fn set_local_transaction_isolation(&mut self, transaction_isolation: IsolationLevel) {
904        let var = self
905            .vars
906            .get_mut(UncasedStr::new(TRANSACTION_ISOLATION.name()))
907            .expect("transaction_isolation variable must exist");
908        var.set(VarInput::Flat(transaction_isolation.as_str()), true)
909            .expect("setting transaction isolation must succeed");
910    }
911
912    pub fn get_statement_logging_sample_rate(&self) -> Numeric {
913        *self.expect_value(&STATEMENT_LOGGING_SAMPLE_RATE)
914    }
915
916    /// Returns the value of the `emit_introspection_query_notice` configuration parameter.
917    pub fn emit_introspection_query_notice(&self) -> bool {
918        *self.expect_value(&EMIT_INTROSPECTION_QUERY_NOTICE)
919    }
920
921    pub fn unsafe_new_transaction_wall_time(&self) -> Option<CheckedTimestamp<DateTime<Utc>>> {
922        *self.expect_value(&UNSAFE_NEW_TRANSACTION_WALL_TIME)
923    }
924
925    /// Returns the value of the `welcome_message` configuration parameter.
926    pub fn welcome_message(&self) -> bool {
927        *self.expect_value(&WELCOME_MESSAGE)
928    }
929}
930
// TODO(database-issues#8069) remove together with `compat_translate`
/// Deprecated value of the `cluster` variable; `compat_translate` rewrites it
/// to `mz_catalog_server`.
pub const OLD_CATALOG_SERVER_CLUSTER: &str = "mz_introspection";
/// Deprecated variable name; `compat_translate` maps it to the name of
/// `AUTO_ROUTE_CATALOG_QUERIES`.
pub const OLD_AUTO_ROUTE_CATALOG_QUERIES: &str = "auto_route_introspection_queries";
934
935/// If the given variable name and/or input is deprecated, return a corresponding updated value,
936/// otherwise return the original.
937///
938/// This method was introduced to gracefully handle the rename of the `mz_introspection` cluster to
939/// `mz_cluster_server`. The plan is to remove it once all users have migrated to the new name. The
940/// debug logs will be helpful for checking this in production.
941// TODO(database-issues#8069) remove this after sufficient time has passed
942fn compat_translate<'a, 'b>(name: &'a str, input: VarInput<'b>) -> (&'a str, VarInput<'b>) {
943    if name == CLUSTER.name() {
944        if let Ok(value) = CLUSTER.parse(input) {
945            if value.format() == OLD_CATALOG_SERVER_CLUSTER {
946                tracing::debug!(
947                    github_27285 = true,
948                    "encountered deprecated `cluster` variable value: {}",
949                    OLD_CATALOG_SERVER_CLUSTER,
950                );
951                return (name, VarInput::Flat("mz_catalog_server"));
952            }
953        }
954    }
955
956    if name == OLD_AUTO_ROUTE_CATALOG_QUERIES {
957        tracing::debug!(
958            github_27285 = true,
959            "encountered deprecated `{}` variable name",
960            OLD_AUTO_ROUTE_CATALOG_QUERIES,
961        );
962        return (AUTO_ROUTE_CATALOG_QUERIES.name(), input);
963    }
964
965    (name, input)
966}
967
968fn compat_translate_name(name: &str) -> &str {
969    let (name, _) = compat_translate(name, VarInput::Flat(""));
970    name
971}
972
/// A `SystemVar` holds the value persisted on disk for a configuration
/// parameter. If unset, the server default is used instead.
///
/// The effective value is resolved in this order: the persisted value, then
/// the dynamic default, then the static default from the definition.
#[derive(Debug)]
pub struct SystemVar {
    /// Static definition of the parameter (name, default, constraint, ...).
    definition: VarDefinition,
    /// Value currently persisted to disk.
    persisted_value: Option<Box<dyn Value>>,
    /// Current default, not persisted to disk.
    dynamic_default: Option<Box<dyn Value>>,
}
983
984impl Clone for SystemVar {
985    fn clone(&self) -> Self {
986        SystemVar {
987            definition: self.definition.clone(),
988            persisted_value: self.persisted_value.as_ref().map(|v| v.box_clone()),
989            dynamic_default: self.dynamic_default.as_ref().map(|v| v.box_clone()),
990        }
991    }
992}
993
994impl SystemVar {
995    pub fn new(definition: VarDefinition) -> Self {
996        SystemVar {
997            definition,
998            persisted_value: None,
999            dynamic_default: None,
1000        }
1001    }
1002
1003    fn is_default(&self, input: VarInput) -> Result<bool, VarError> {
1004        let v = self.definition.parse(input)?;
1005        Ok(self.definition.default_value() == v.as_ref())
1006    }
1007
1008    pub fn value_dyn(&self) -> &dyn Value {
1009        self.persisted_value
1010            .as_deref()
1011            .or(self.dynamic_default.as_deref())
1012            .unwrap_or_else(|| self.definition.default_value())
1013    }
1014
1015    pub fn value<V: 'static>(&self) -> &V {
1016        let val = self.value_dyn();
1017        val.as_any().downcast_ref::<V>().expect("success")
1018    }
1019
1020    fn parse(&self, input: VarInput) -> Result<Box<dyn Value>, VarError> {
1021        let v = self.definition.parse(input)?;
1022        // Validate our parsed value.
1023        self.validate_constraints(v.as_ref())?;
1024        Ok(v)
1025    }
1026
1027    fn set(&mut self, input: VarInput) -> Result<bool, VarError> {
1028        let v = self.parse(input)?;
1029
1030        if self.persisted_value.as_ref() != Some(&v) {
1031            self.persisted_value = Some(v);
1032            Ok(true)
1033        } else {
1034            Ok(false)
1035        }
1036    }
1037
1038    fn reset(&mut self) -> bool {
1039        if self.persisted_value.is_some() {
1040            self.persisted_value = None;
1041            true
1042        } else {
1043            false
1044        }
1045    }
1046
1047    fn set_default(&mut self, input: VarInput) -> Result<(), VarError> {
1048        let v = self.definition.parse(input)?;
1049        self.dynamic_default = Some(v);
1050        Ok(())
1051    }
1052
1053    fn validate_constraints(&self, val: &dyn Value) -> Result<(), VarError> {
1054        if let Some(constraint) = &self.definition.constraint {
1055            constraint.check_constraint(self, self.value_dyn(), val)
1056        } else {
1057            Ok(())
1058        }
1059    }
1060}
1061
1062impl Var for SystemVar {
1063    fn name(&self) -> &'static str {
1064        self.definition.name.as_str()
1065    }
1066
1067    fn value(&self) -> String {
1068        self.value_dyn().format()
1069    }
1070
1071    fn description(&self) -> &'static str {
1072        self.definition.description
1073    }
1074
1075    fn type_name(&self) -> Cow<'static, str> {
1076        self.definition.type_name()
1077    }
1078
1079    fn visible(&self, user: &User, system_vars: &SystemVars) -> Result<(), VarError> {
1080        self.definition.visible(user, system_vars)
1081    }
1082}
1083
/// Error reporting that access was denied under a network policy.
#[derive(Debug, Error)]
pub enum NetworkPolicyError {
    /// Access was denied for the contained IP address.
    #[error("Access denied for address {0}")]
    AddressDenied(IpAddr),
}
1089
/// On disk variables.
///
/// See the [`crate::session::vars`] module documentation for more details on the
/// Materialize configuration model.
#[derive(Derivative, Clone)]
#[derivative(Debug)]
pub struct SystemVars {
    /// Allows "unsafe" parameters to be set.
    allow_unsafe: bool,
    /// Set of all [`SystemVar`]s, keyed by case-insensitive name.
    vars: BTreeMap<&'static UncasedStr, SystemVar>,
    /// External components interested in when a [`SystemVar`] gets updated,
    /// keyed by variable name. Excluded from the `Debug` output.
    #[derivative(Debug = "ignore")]
    callbacks: BTreeMap<String, Vec<Arc<dyn Fn(&SystemVars) + Send + Sync>>>,

    /// NB: This is intentionally disconnected from the one that is plumbed around to persist and
    /// the controllers. This is so we can explicitly control and reason about when changes to config
    /// values are propagated to the rest of the system.
    dyncfgs: ConfigSet,
}
1110
1111impl Default for SystemVars {
1112    fn default() -> Self {
1113        Self::new()
1114    }
1115}
1116
1117impl SystemVars {
1118    pub fn new() -> Self {
1119        let system_vars = vec![
1120            &MAX_KAFKA_CONNECTIONS,
1121            &MAX_POSTGRES_CONNECTIONS,
1122            &MAX_MYSQL_CONNECTIONS,
1123            &MAX_SQL_SERVER_CONNECTIONS,
1124            &MAX_AWS_PRIVATELINK_CONNECTIONS,
1125            &MAX_TABLES,
1126            &MAX_SOURCES,
1127            &MAX_SINKS,
1128            &MAX_MATERIALIZED_VIEWS,
1129            &MAX_CLUSTERS,
1130            &MAX_REPLICAS_PER_CLUSTER,
1131            &MAX_CREDIT_CONSUMPTION_RATE,
1132            &MAX_DATABASES,
1133            &MAX_SCHEMAS_PER_DATABASE,
1134            &MAX_OBJECTS_PER_SCHEMA,
1135            &MAX_SECRETS,
1136            &MAX_ROLES,
1137            &MAX_NETWORK_POLICIES,
1138            &MAX_RULES_PER_NETWORK_POLICY,
1139            &MAX_RESULT_SIZE,
1140            &MAX_COPY_FROM_ROW_SIZE,
1141            &ALLOWED_CLUSTER_REPLICA_SIZES,
1142            &upsert_rocksdb::UPSERT_ROCKSDB_COMPACTION_STYLE,
1143            &upsert_rocksdb::UPSERT_ROCKSDB_OPTIMIZE_COMPACTION_MEMTABLE_BUDGET,
1144            &upsert_rocksdb::UPSERT_ROCKSDB_LEVEL_COMPACTION_DYNAMIC_LEVEL_BYTES,
1145            &upsert_rocksdb::UPSERT_ROCKSDB_UNIVERSAL_COMPACTION_RATIO,
1146            &upsert_rocksdb::UPSERT_ROCKSDB_PARALLELISM,
1147            &upsert_rocksdb::UPSERT_ROCKSDB_COMPRESSION_TYPE,
1148            &upsert_rocksdb::UPSERT_ROCKSDB_BOTTOMMOST_COMPRESSION_TYPE,
1149            &upsert_rocksdb::UPSERT_ROCKSDB_BATCH_SIZE,
1150            &upsert_rocksdb::UPSERT_ROCKSDB_RETRY_DURATION,
1151            &upsert_rocksdb::UPSERT_ROCKSDB_STATS_LOG_INTERVAL_SECONDS,
1152            &upsert_rocksdb::UPSERT_ROCKSDB_STATS_PERSIST_INTERVAL_SECONDS,
1153            &upsert_rocksdb::UPSERT_ROCKSDB_POINT_LOOKUP_BLOCK_CACHE_SIZE_MB,
1154            &upsert_rocksdb::UPSERT_ROCKSDB_SHRINK_ALLOCATED_BUFFERS_BY_RATIO,
1155            &upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_CLUSTER_MEMORY_FRACTION,
1156            &upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_MEMORY_BYTES,
1157            &upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_ALLOW_STALL,
1158            &STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES,
1159            &STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_TO_CLUSTER_SIZE_FRACTION,
1160            &STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_DISK_ONLY,
1161            &STORAGE_STATISTICS_INTERVAL,
1162            &STORAGE_STATISTICS_COLLECTION_INTERVAL,
1163            &STORAGE_SHRINK_UPSERT_UNUSED_BUFFERS_BY_RATIO,
1164            &STORAGE_RECORD_SOURCE_SINK_NAMESPACED_ERRORS,
1165            &PERSIST_FAST_PATH_LIMIT,
1166            &METRICS_RETENTION,
1167            &UNSAFE_MOCK_AUDIT_EVENT_TIMESTAMP,
1168            &ENABLE_RBAC_CHECKS,
1169            &PG_SOURCE_CONNECT_TIMEOUT,
1170            &PG_SOURCE_TCP_KEEPALIVES_IDLE,
1171            &PG_SOURCE_TCP_KEEPALIVES_INTERVAL,
1172            &PG_SOURCE_TCP_KEEPALIVES_RETRIES,
1173            &PG_SOURCE_TCP_USER_TIMEOUT,
1174            &PG_SOURCE_TCP_CONFIGURE_SERVER,
1175            &PG_SOURCE_SNAPSHOT_STATEMENT_TIMEOUT,
1176            &PG_SOURCE_WAL_SENDER_TIMEOUT,
1177            &PG_SOURCE_SNAPSHOT_COLLECT_STRICT_COUNT,
1178            &MYSQL_SOURCE_TCP_KEEPALIVE,
1179            &MYSQL_SOURCE_SNAPSHOT_MAX_EXECUTION_TIME,
1180            &MYSQL_SOURCE_SNAPSHOT_LOCK_WAIT_TIMEOUT,
1181            &MYSQL_SOURCE_CONNECT_TIMEOUT,
1182            &SSH_CHECK_INTERVAL,
1183            &SSH_CONNECT_TIMEOUT,
1184            &SSH_KEEPALIVES_IDLE,
1185            &KAFKA_SOCKET_KEEPALIVE,
1186            &KAFKA_SOCKET_TIMEOUT,
1187            &KAFKA_TRANSACTION_TIMEOUT,
1188            &KAFKA_SOCKET_CONNECTION_SETUP_TIMEOUT,
1189            &KAFKA_FETCH_METADATA_TIMEOUT,
1190            &KAFKA_PROGRESS_RECORD_FETCH_TIMEOUT,
1191            &ENABLE_LAUNCHDARKLY,
1192            &MAX_CONNECTIONS,
1193            &NETWORK_POLICY,
1194            &SUPERUSER_RESERVED_CONNECTIONS,
1195            &KEEP_N_SOURCE_STATUS_HISTORY_ENTRIES,
1196            &KEEP_N_SINK_STATUS_HISTORY_ENTRIES,
1197            &KEEP_N_PRIVATELINK_STATUS_HISTORY_ENTRIES,
1198            &REPLICA_STATUS_HISTORY_RETENTION_WINDOW,
1199            &ENABLE_STORAGE_SHARD_FINALIZATION,
1200            &ENABLE_DEFAULT_CONNECTION_VALIDATION,
1201            &DEFAULT_TIMESTAMP_INTERVAL,
1202            &MIN_TIMESTAMP_INTERVAL,
1203            &MAX_TIMESTAMP_INTERVAL,
1204            &LOGGING_FILTER,
1205            &OPENTELEMETRY_FILTER,
1206            &LOGGING_FILTER_DEFAULTS,
1207            &OPENTELEMETRY_FILTER_DEFAULTS,
1208            &SENTRY_FILTERS,
1209            &WEBHOOKS_SECRETS_CACHING_TTL_SECS,
1210            &COORD_SLOW_MESSAGE_WARN_THRESHOLD,
1211            &grpc_client::CONNECT_TIMEOUT,
1212            &grpc_client::HTTP2_KEEP_ALIVE_INTERVAL,
1213            &grpc_client::HTTP2_KEEP_ALIVE_TIMEOUT,
1214            &cluster_scheduling::CLUSTER_MULTI_PROCESS_REPLICA_AZ_AFFINITY_WEIGHT,
1215            &cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY,
1216            &cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY_WEIGHT,
1217            &cluster_scheduling::CLUSTER_ENABLE_TOPOLOGY_SPREAD,
1218            &cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_IGNORE_NON_SINGULAR_SCALE,
1219            &cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_MAX_SKEW,
1220            &cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_MIN_DOMAINS,
1221            &cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_SOFT,
1222            &cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY,
1223            &cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY_WEIGHT,
1224            &cluster_scheduling::CLUSTER_ALTER_CHECK_READY_INTERVAL,
1225            &cluster_scheduling::CLUSTER_CHECK_SCHEDULING_POLICIES_INTERVAL,
1226            &cluster_scheduling::CLUSTER_SECURITY_CONTEXT_ENABLED,
1227            &cluster_scheduling::CLUSTER_REFRESH_MV_COMPACTION_ESTIMATE,
1228            &grpc_client::HTTP2_KEEP_ALIVE_TIMEOUT,
1229            &STATEMENT_LOGGING_MAX_SAMPLE_RATE,
1230            &STATEMENT_LOGGING_DEFAULT_SAMPLE_RATE,
1231            &STATEMENT_LOGGING_TARGET_DATA_RATE,
1232            &STATEMENT_LOGGING_MAX_DATA_CREDIT,
1233            &ENABLE_INTERNAL_STATEMENT_LOGGING,
1234            &OPTIMIZER_STATS_TIMEOUT,
1235            &OPTIMIZER_ONESHOT_STATS_TIMEOUT,
1236            &PRIVATELINK_STATUS_UPDATE_QUOTA_PER_MINUTE,
1237            &WEBHOOK_CONCURRENT_REQUEST_LIMIT,
1238            &PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_SIZE,
1239            &PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_WAIT,
1240            &PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL,
1241            &PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL_STAGGER,
1242            &USER_STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION,
1243            &FORCE_SOURCE_TABLE_SYNTAX,
1244            &OPTIMIZER_E2E_LATENCY_WARNING_THRESHOLD,
1245            &SCRAM_ITERATIONS,
1246        ];
1247
1248        let dyncfgs = mz_dyncfgs::all_dyncfgs();
1249        let dyncfg_vars: Vec<_> = dyncfgs
1250            .entries()
1251            .map(|cfg| {
1252                // Most dyncfgs are internal-only; these two must be visible to
1253                // environment superusers so ALTER SYSTEM SET can reach them.
1254                let user_visible = cfg.name() == OIDC_GROUP_CLAIM.name()
1255                    || cfg.name() == OIDC_GROUP_ROLE_SYNC_STRICT.name();
1256                match cfg.default() {
1257                    ConfigVal::Bool(default) => {
1258                        VarDefinition::new_runtime(cfg.name(), *default, cfg.desc(), user_visible)
1259                    }
1260                    ConfigVal::U32(default) => {
1261                        VarDefinition::new_runtime(cfg.name(), *default, cfg.desc(), user_visible)
1262                    }
1263                    ConfigVal::Usize(default) => {
1264                        VarDefinition::new_runtime(cfg.name(), *default, cfg.desc(), user_visible)
1265                    }
1266                    ConfigVal::OptUsize(default) => {
1267                        VarDefinition::new_runtime(cfg.name(), *default, cfg.desc(), user_visible)
1268                    }
1269                    ConfigVal::F64(default) => {
1270                        VarDefinition::new_runtime(cfg.name(), *default, cfg.desc(), user_visible)
1271                    }
1272                    ConfigVal::String(default) => VarDefinition::new_runtime(
1273                        cfg.name(),
1274                        default.clone(),
1275                        cfg.desc(),
1276                        user_visible,
1277                    ),
1278                    ConfigVal::OptString(default) => VarDefinition::new_runtime(
1279                        cfg.name(),
1280                        default.clone(),
1281                        cfg.desc(),
1282                        user_visible,
1283                    ),
1284                    ConfigVal::Duration(default) => VarDefinition::new_runtime(
1285                        cfg.name(),
1286                        default.clone(),
1287                        cfg.desc(),
1288                        user_visible,
1289                    ),
1290                    ConfigVal::Json(default) => VarDefinition::new_runtime(
1291                        cfg.name(),
1292                        default.clone(),
1293                        cfg.desc(),
1294                        user_visible,
1295                    ),
1296                }
1297            })
1298            .collect();
1299
1300        let vars: BTreeMap<_, _> = system_vars
1301            .into_iter()
1302            // Include all of our feature flags.
1303            .chain(definitions::FEATURE_FLAGS.iter().copied())
1304            // Include the subset of Session variables we allow system defaults for.
1305            .chain(SESSION_SYSTEM_VARS.values().copied())
1306            .cloned()
1307            // Include Persist configs.
1308            .chain(dyncfg_vars)
1309            .map(|var| (var.name, SystemVar::new(var)))
1310            .collect();
1311
1312        let vars = SystemVars {
1313            vars,
1314            callbacks: BTreeMap::new(),
1315            allow_unsafe: false,
1316            dyncfgs,
1317        };
1318
1319        vars
1320    }
1321
1322    pub fn dyncfgs(&self) -> &ConfigSet {
1323        &self.dyncfgs
1324    }
1325
1326    pub fn set_unsafe(mut self, allow_unsafe: bool) -> Self {
1327        self.allow_unsafe = allow_unsafe;
1328        self
1329    }
1330
1331    pub fn allow_unsafe(&self) -> bool {
1332        self.allow_unsafe
1333    }
1334
1335    fn expect_value<V: 'static>(&self, var: &VarDefinition) -> &V {
1336        let val = self
1337            .vars
1338            .get(var.name)
1339            .expect("provided var should be in state");
1340
1341        val.value_dyn()
1342            .as_any()
1343            .downcast_ref::<V>()
1344            .expect("provided var type should matched stored var")
1345    }
1346
1347    fn expect_config_value<V: ConfigType + 'static>(&self, name: &UncasedStr) -> &V {
1348        let val = self
1349            .vars
1350            .get(name)
1351            .unwrap_or_else(|| panic!("provided var {name} should be in state"));
1352
1353        val.value_dyn()
1354            .as_any()
1355            .downcast_ref()
1356            .expect("provided var type should matched stored var")
1357    }
1358
1359    /// Reset all the values to their defaults (preserving
1360    /// defaults from `VarMut::set_default).
1361    pub fn reset_all(&mut self) {
1362        for (_, var) in &mut self.vars {
1363            var.reset();
1364        }
1365    }
1366
1367    /// Returns an iterator over the configuration parameters and their current
1368    /// values on disk.
1369    pub fn iter(&self) -> impl Iterator<Item = &dyn Var> {
1370        self.vars
1371            .values()
1372            .map(|v| v.as_var())
1373            .filter(|v| !SESSION_SYSTEM_VARS.contains_key(UncasedStr::new(v.name())))
1374    }
1375
1376    /// Returns an iterator over the configuration parameters and their current
1377    /// values on disk. Compared to [`SystemVars::iter`], this should omit vars
1378    /// that shouldn't be synced by SystemParameterFrontend.
1379    pub fn iter_synced(&self) -> impl Iterator<Item = &dyn Var> {
1380        self.iter().filter(|v| v.name() != ENABLE_LAUNCHDARKLY.name)
1381    }
1382
1383    /// Returns an iterator over the configuration parameters that can be overriden per-Session.
1384    pub fn iter_session(&self) -> impl Iterator<Item = &dyn Var> {
1385        self.vars
1386            .values()
1387            .map(|v| v.as_var())
1388            .filter(|v| SESSION_SYSTEM_VARS.contains_key(UncasedStr::new(v.name())))
1389    }
1390
1391    /// Returns whether or not this parameter can be modified by a superuser.
1392    pub fn user_modifiable(&self, name: &str) -> bool {
1393        SESSION_SYSTEM_VARS.contains_key(UncasedStr::new(name))
1394            || name == ENABLE_RBAC_CHECKS.name()
1395            || name == NETWORK_POLICY.name()
1396            || name == OIDC_GROUP_CLAIM.name()
1397            || name == OIDC_GROUP_ROLE_SYNC_STRICT.name()
1398    }
1399
1400    /// Returns a [`Var`] representing the configuration parameter with the
1401    /// specified name.
1402    ///
1403    /// Configuration parameters are matched case insensitively. If no such
1404    /// configuration parameter exists, `get` returns an error.
1405    ///
1406    /// Note that:
1407    /// - If `name` is known at compile time, you should instead use the named
1408    /// accessor to access the variable with its true Rust type. For example,
1409    /// `self.get("max_tables").value()` returns the string `"25"` or the
1410    /// current value, while `self.max_tables()` returns an i32.
1411    ///
1412    /// - This function does not check that the access variable should be
1413    /// visible because of other settings or users. Before or after accessing
1414    /// this method, you should call `Var::visible`.
1415    ///
1416    /// # Errors
1417    ///
1418    /// The call will return an error:
1419    /// 1. If `name` does not refer to a valid [`SystemVars`] field.
1420    pub fn get(&self, name: &str) -> Result<&dyn Var, VarError> {
1421        self.vars
1422            .get(UncasedStr::new(name))
1423            .map(|v| v.as_var())
1424            .ok_or_else(|| VarError::UnknownParameter(name.into()))
1425    }
1426
1427    /// Check if the given `values` is the default value for the [`Var`]
1428    /// identified by `name`.
1429    ///
1430    /// Note that this function does not check that the access variable should
1431    /// be visible because of other settings or users. Before or after accessing
1432    /// this method, you should call `Var::visible`.
1433    ///
1434    /// # Errors
1435    ///
1436    /// The call will return an error:
1437    /// 1. If `name` does not refer to a valid [`SystemVars`] field.
1438    /// 2. If `values` does not represent a valid [`SystemVars`] value for
1439    ///    `name`.
1440    pub fn is_default(&self, name: &str, input: VarInput) -> Result<bool, VarError> {
1441        self.vars
1442            .get(UncasedStr::new(name))
1443            .ok_or_else(|| VarError::UnknownParameter(name.into()))
1444            .and_then(|v| v.is_default(input))
1445    }
1446
1447    /// Sets the configuration parameter named `name` to the value represented
1448    /// by `input`.
1449    ///
1450    /// Like with [`SystemVars::get`], configuration parameters are matched case
1451    /// insensitively. If `input` is not valid, as determined by the underlying
1452    /// configuration parameter, or if the named configuration parameter does
1453    /// not exist, an error is returned.
1454    ///
1455    /// Return a `bool` value indicating whether the [`Var`] identified by
1456    /// `name` was modified by this call (it won't be if it already had the
1457    /// given `input`).
1458    ///
1459    /// Note that this function does not check that the access variable should
1460    /// be visible because of other settings or users. Before or after accessing
1461    /// this method, you should call `Var::visible`.
1462    ///
1463    /// # Errors
1464    ///
1465    /// The call will return an error:
1466    /// 1. If `name` does not refer to a valid [`SystemVars`] field.
1467    /// 2. If `input` does not represent a valid [`SystemVars`] value for
1468    ///    `name`.
1469    pub fn set(&mut self, name: &str, input: VarInput) -> Result<bool, VarError> {
1470        let result = self
1471            .vars
1472            .get_mut(UncasedStr::new(name))
1473            .ok_or_else(|| VarError::UnknownParameter(name.into()))
1474            .and_then(|v| v.set(input))?;
1475        self.notify_callbacks(name);
1476        Ok(result)
1477    }
1478
1479    /// Parses the configuration parameter value represented by `input` named
1480    /// `name`.
1481    ///
1482    /// Like with [`SystemVars::get`], configuration parameters are matched case
1483    /// insensitively. If `input` is not valid, as determined by the underlying
1484    /// configuration parameter, or if the named configuration parameter does
1485    /// not exist, an error is returned.
1486    ///
1487    /// Return a `Box<dyn Value>` that is the result of parsing `input`.
1488    ///
1489    /// Note that this function does not check that the access variable should
1490    /// be visible because of other settings or users. Before or after accessing
1491    /// this method, you should call `Var::visible`.
1492    ///
1493    /// # Errors
1494    ///
1495    /// The call will return an error:
1496    /// 1. If `name` does not refer to a valid [`SystemVars`] field.
1497    /// 2. If `input` does not represent a valid [`SystemVars`] value for
1498    ///    `name`.
1499    pub fn parse(&self, name: &str, input: VarInput) -> Result<Box<dyn Value>, VarError> {
1500        self.vars
1501            .get(UncasedStr::new(name))
1502            .ok_or_else(|| VarError::UnknownParameter(name.into()))
1503            .and_then(|v| v.parse(input))
1504    }
1505
1506    /// Set the default for this variable. This is the value this
1507    /// variable will be be `reset` to. If no default is set, the static default in the
1508    /// variable definition is used instead.
1509    ///
1510    /// Note that this function does not check that the access variable should
1511    /// be visible because of other settings or users. Before or after accessing
1512    /// this method, you should call `Var::visible`.
1513    pub fn set_default(&mut self, name: &str, input: VarInput) -> Result<(), VarError> {
1514        self.vars
1515            .get_mut(UncasedStr::new(name))
1516            .ok_or_else(|| VarError::UnknownParameter(name.into()))
1517            .and_then(|v| v.set_default(input))?;
1518        self.notify_callbacks(name);
1519        Ok(())
1520    }
1521
1522    /// Sets the configuration parameter named `name` to its default value.
1523    ///
1524    /// Like with [`SystemVars::get`], configuration parameters are matched case
1525    /// insensitively. If the named configuration parameter does not exist, an
1526    /// error is returned.
1527    ///
1528    /// Return a `bool` value indicating whether the [`Var`] identified by
1529    /// `name` was modified by this call (it won't be if was already reset).
1530    ///
1531    /// Note that this function does not check that the access variable should
1532    /// be visible because of other settings or users. Before or after accessing
1533    /// this method, you should call `Var::visible`.
1534    ///
1535    /// # Errors
1536    ///
1537    /// The call will return an error:
1538    /// 1. If `name` does not refer to a valid [`SystemVars`] field.
1539    pub fn reset(&mut self, name: &str) -> Result<bool, VarError> {
1540        let result = self
1541            .vars
1542            .get_mut(UncasedStr::new(name))
1543            .ok_or_else(|| VarError::UnknownParameter(name.into()))
1544            .map(|v| v.reset())?;
1545        self.notify_callbacks(name);
1546        Ok(result)
1547    }
1548
1549    /// Returns a map from each system parameter's name to its default value.
1550    pub fn defaults(&self) -> BTreeMap<String, String> {
1551        self.vars
1552            .iter()
1553            .map(|(name, var)| {
1554                let default = var
1555                    .dynamic_default
1556                    .as_deref()
1557                    .unwrap_or_else(|| var.definition.default_value());
1558                (name.as_str().to_owned(), default.format())
1559            })
1560            .collect()
1561    }
1562
1563    /// Registers a closure that will get called when the value for the
1564    /// specified [`VarDefinition`] changes.
1565    ///
1566    /// The callback is guaranteed to be called at least once.
1567    pub fn register_callback(
1568        &mut self,
1569        var: &VarDefinition,
1570        callback: Arc<dyn Fn(&SystemVars) + Send + Sync>,
1571    ) {
1572        self.callbacks
1573            .entry(var.name().to_string())
1574            .or_default()
1575            .push(callback);
1576        self.notify_callbacks(var.name());
1577    }
1578
1579    /// Notify any external components interested in this variable.
1580    fn notify_callbacks(&self, name: &str) {
1581        // Get the callbacks interested in this variable.
1582        if let Some(callbacks) = self.callbacks.get(name) {
1583            for callback in callbacks {
1584                (callback)(self);
1585            }
1586        }
1587    }
1588
1589    /// Returns the system default for the [`CLUSTER`] session variable. To know the active cluster
1590    /// for the current session, you must check the [`SessionVars`].
1591    pub fn default_cluster(&self) -> String {
1592        self.expect_value::<String>(&CLUSTER).to_owned()
1593    }
1594
1595    /// Returns the value of the `max_kafka_connections` configuration parameter.
1596    pub fn max_kafka_connections(&self) -> u32 {
1597        *self.expect_value(&MAX_KAFKA_CONNECTIONS)
1598    }
1599
1600    /// Returns the value of the `max_postgres_connections` configuration parameter.
1601    pub fn max_postgres_connections(&self) -> u32 {
1602        *self.expect_value(&MAX_POSTGRES_CONNECTIONS)
1603    }
1604
1605    /// Returns the value of the `max_mysql_connections` configuration parameter.
1606    pub fn max_mysql_connections(&self) -> u32 {
1607        *self.expect_value(&MAX_MYSQL_CONNECTIONS)
1608    }
1609
1610    /// Returns the value of the `max_sql_server_connections` configuration parameter.
1611    pub fn max_sql_server_connections(&self) -> u32 {
1612        *self.expect_value(&MAX_SQL_SERVER_CONNECTIONS)
1613    }
1614
1615    /// Returns the value of the `max_aws_privatelink_connections` configuration parameter.
1616    pub fn max_aws_privatelink_connections(&self) -> u32 {
1617        *self.expect_value(&MAX_AWS_PRIVATELINK_CONNECTIONS)
1618    }
1619
1620    /// Returns the value of the `max_tables` configuration parameter.
1621    pub fn max_tables(&self) -> u32 {
1622        *self.expect_value(&MAX_TABLES)
1623    }
1624
1625    /// Returns the value of the `max_sources` configuration parameter.
1626    pub fn max_sources(&self) -> u32 {
1627        *self.expect_value(&MAX_SOURCES)
1628    }
1629
1630    /// Returns the value of the `max_sinks` configuration parameter.
1631    pub fn max_sinks(&self) -> u32 {
1632        *self.expect_value(&MAX_SINKS)
1633    }
1634
1635    /// Returns the value of the `max_materialized_views` configuration parameter.
1636    pub fn max_materialized_views(&self) -> u32 {
1637        *self.expect_value(&MAX_MATERIALIZED_VIEWS)
1638    }
1639
1640    /// Returns the value of the `max_clusters` configuration parameter.
1641    pub fn max_clusters(&self) -> u32 {
1642        *self.expect_value(&MAX_CLUSTERS)
1643    }
1644
1645    /// Returns the value of the `max_replicas_per_cluster` configuration parameter.
1646    pub fn max_replicas_per_cluster(&self) -> u32 {
1647        *self.expect_value(&MAX_REPLICAS_PER_CLUSTER)
1648    }
1649
1650    /// Returns the value of the `max_credit_consumption_rate` configuration parameter.
1651    pub fn max_credit_consumption_rate(&self) -> Numeric {
1652        *self.expect_value(&MAX_CREDIT_CONSUMPTION_RATE)
1653    }
1654
1655    /// Returns the value of the `max_databases` configuration parameter.
1656    pub fn max_databases(&self) -> u32 {
1657        *self.expect_value(&MAX_DATABASES)
1658    }
1659
1660    /// Returns the value of the `max_schemas_per_database` configuration parameter.
1661    pub fn max_schemas_per_database(&self) -> u32 {
1662        *self.expect_value(&MAX_SCHEMAS_PER_DATABASE)
1663    }
1664
1665    /// Returns the value of the `max_objects_per_schema` configuration parameter.
1666    pub fn max_objects_per_schema(&self) -> u32 {
1667        *self.expect_value(&MAX_OBJECTS_PER_SCHEMA)
1668    }
1669
1670    /// Returns the value of the `max_secrets` configuration parameter.
1671    pub fn max_secrets(&self) -> u32 {
1672        *self.expect_value(&MAX_SECRETS)
1673    }
1674
1675    /// Returns the value of the `max_roles` configuration parameter.
1676    pub fn max_roles(&self) -> u32 {
1677        *self.expect_value(&MAX_ROLES)
1678    }
1679
1680    /// Returns the value of the `max_network_policies` configuration parameter.
1681    pub fn max_network_policies(&self) -> u32 {
1682        *self.expect_value(&MAX_NETWORK_POLICIES)
1683    }
1684
1685    /// Returns the value of the `max_network_policies` configuration parameter.
1686    pub fn max_rules_per_network_policy(&self) -> u32 {
1687        *self.expect_value(&MAX_RULES_PER_NETWORK_POLICY)
1688    }
1689
1690    /// Returns the value of the `max_result_size` configuration parameter.
1691    pub fn max_result_size(&self) -> u64 {
1692        self.expect_value::<ByteSize>(&MAX_RESULT_SIZE).as_bytes()
1693    }
1694
1695    /// Returns the value of the `max_copy_from_row_size` configuration parameter.
1696    pub fn max_copy_from_row_size(&self) -> u64 {
1697        self.expect_value::<ByteSize>(&MAX_COPY_FROM_ROW_SIZE)
1698            .as_bytes()
1699    }
1700
1701    /// Returns the value of the `allowed_cluster_replica_sizes` configuration parameter.
1702    pub fn allowed_cluster_replica_sizes(&self) -> Vec<String> {
1703        self.expect_value::<Vec<Ident>>(&ALLOWED_CLUSTER_REPLICA_SIZES)
1704            .into_iter()
1705            .map(|s| s.as_str().into())
1706            .collect()
1707    }
1708
1709    /// Returns the value of the `default_cluster_replication_factor` configuration parameter.
1710    pub fn default_cluster_replication_factor(&self) -> u32 {
1711        *self.expect_value::<u32>(&DEFAULT_CLUSTER_REPLICATION_FACTOR)
1712    }
1713
1714    pub fn upsert_rocksdb_compaction_style(&self) -> mz_rocksdb_types::config::CompactionStyle {
1715        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_COMPACTION_STYLE)
1716    }
1717
1718    pub fn upsert_rocksdb_optimize_compaction_memtable_budget(&self) -> usize {
1719        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_OPTIMIZE_COMPACTION_MEMTABLE_BUDGET)
1720    }
1721
1722    pub fn upsert_rocksdb_level_compaction_dynamic_level_bytes(&self) -> bool {
1723        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_LEVEL_COMPACTION_DYNAMIC_LEVEL_BYTES)
1724    }
1725
1726    pub fn upsert_rocksdb_universal_compaction_ratio(&self) -> i32 {
1727        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_UNIVERSAL_COMPACTION_RATIO)
1728    }
1729
1730    pub fn upsert_rocksdb_parallelism(&self) -> Option<i32> {
1731        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_PARALLELISM)
1732    }
1733
1734    pub fn upsert_rocksdb_compression_type(&self) -> mz_rocksdb_types::config::CompressionType {
1735        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_COMPRESSION_TYPE)
1736    }
1737
1738    pub fn upsert_rocksdb_bottommost_compression_type(
1739        &self,
1740    ) -> mz_rocksdb_types::config::CompressionType {
1741        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_BOTTOMMOST_COMPRESSION_TYPE)
1742    }
1743
1744    pub fn upsert_rocksdb_batch_size(&self) -> usize {
1745        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_BATCH_SIZE)
1746    }
1747
1748    pub fn upsert_rocksdb_retry_duration(&self) -> Duration {
1749        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_RETRY_DURATION)
1750    }
1751
1752    pub fn upsert_rocksdb_stats_log_interval_seconds(&self) -> u32 {
1753        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_STATS_LOG_INTERVAL_SECONDS)
1754    }
1755
1756    pub fn upsert_rocksdb_stats_persist_interval_seconds(&self) -> u32 {
1757        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_STATS_PERSIST_INTERVAL_SECONDS)
1758    }
1759
1760    pub fn upsert_rocksdb_point_lookup_block_cache_size_mb(&self) -> Option<u32> {
1761        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_POINT_LOOKUP_BLOCK_CACHE_SIZE_MB)
1762    }
1763
1764    pub fn upsert_rocksdb_shrink_allocated_buffers_by_ratio(&self) -> usize {
1765        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_SHRINK_ALLOCATED_BUFFERS_BY_RATIO)
1766    }
1767
1768    pub fn upsert_rocksdb_write_buffer_manager_cluster_memory_fraction(&self) -> Option<Numeric> {
1769        *self.expect_value(
1770            &upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_CLUSTER_MEMORY_FRACTION,
1771        )
1772    }
1773
1774    pub fn upsert_rocksdb_write_buffer_manager_memory_bytes(&self) -> Option<usize> {
1775        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_MEMORY_BYTES)
1776    }
1777
1778    pub fn upsert_rocksdb_write_buffer_manager_allow_stall(&self) -> bool {
1779        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_ALLOW_STALL)
1780    }
1781
1782    pub fn persist_fast_path_limit(&self) -> usize {
1783        *self.expect_value(&PERSIST_FAST_PATH_LIMIT)
1784    }
1785
1786    /// Returns the `pg_source_connect_timeout` configuration parameter.
1787    pub fn pg_source_connect_timeout(&self) -> Duration {
1788        *self.expect_value(&PG_SOURCE_CONNECT_TIMEOUT)
1789    }
1790
1791    /// Returns the `pg_source_tcp_keepalives_retries` configuration parameter.
1792    pub fn pg_source_tcp_keepalives_retries(&self) -> u32 {
1793        *self.expect_value(&PG_SOURCE_TCP_KEEPALIVES_RETRIES)
1794    }
1795
1796    /// Returns the `pg_source_tcp_keepalives_idle` configuration parameter.
1797    pub fn pg_source_tcp_keepalives_idle(&self) -> Duration {
1798        *self.expect_value(&PG_SOURCE_TCP_KEEPALIVES_IDLE)
1799    }
1800
1801    /// Returns the `pg_source_tcp_keepalives_interval` configuration parameter.
1802    pub fn pg_source_tcp_keepalives_interval(&self) -> Duration {
1803        *self.expect_value(&PG_SOURCE_TCP_KEEPALIVES_INTERVAL)
1804    }
1805
1806    /// Returns the `pg_source_tcp_user_timeout` configuration parameter.
1807    pub fn pg_source_tcp_user_timeout(&self) -> Duration {
1808        *self.expect_value(&PG_SOURCE_TCP_USER_TIMEOUT)
1809    }
1810
1811    /// Returns the `pg_source_tcp_configure_server` configuration parameter.
1812    pub fn pg_source_tcp_configure_server(&self) -> bool {
1813        *self.expect_value(&PG_SOURCE_TCP_CONFIGURE_SERVER)
1814    }
1815
1816    /// Returns the `pg_source_snapshot_statement_timeout` configuration parameter.
1817    pub fn pg_source_snapshot_statement_timeout(&self) -> Duration {
1818        *self.expect_value(&PG_SOURCE_SNAPSHOT_STATEMENT_TIMEOUT)
1819    }
1820
1821    /// Returns the `pg_source_wal_sender_timeout` configuration parameter.
1822    pub fn pg_source_wal_sender_timeout(&self) -> Option<Duration> {
1823        *self.expect_value(&PG_SOURCE_WAL_SENDER_TIMEOUT)
1824    }
1825
1826    /// Returns the `pg_source_snapshot_collect_strict_count` configuration parameter.
1827    pub fn pg_source_snapshot_collect_strict_count(&self) -> bool {
1828        *self.expect_value(&PG_SOURCE_SNAPSHOT_COLLECT_STRICT_COUNT)
1829    }
1830
1831    /// Returns the `mysql_source_tcp_keepalive` configuration parameter.
1832    pub fn mysql_source_tcp_keepalive(&self) -> Duration {
1833        *self.expect_value(&MYSQL_SOURCE_TCP_KEEPALIVE)
1834    }
1835
1836    /// Returns the `mysql_source_snapshot_max_execution_time` configuration parameter.
1837    pub fn mysql_source_snapshot_max_execution_time(&self) -> Duration {
1838        *self.expect_value(&MYSQL_SOURCE_SNAPSHOT_MAX_EXECUTION_TIME)
1839    }
1840
1841    /// Returns the `mysql_source_snapshot_lock_wait_timeout` configuration parameter.
1842    pub fn mysql_source_snapshot_lock_wait_timeout(&self) -> Duration {
1843        *self.expect_value(&MYSQL_SOURCE_SNAPSHOT_LOCK_WAIT_TIMEOUT)
1844    }
1845
1846    /// Returns the `mysql_source_connect_timeout` configuration parameter.
1847    pub fn mysql_source_connect_timeout(&self) -> Duration {
1848        *self.expect_value(&MYSQL_SOURCE_CONNECT_TIMEOUT)
1849    }
1850
1851    /// Returns the `ssh_check_interval` configuration parameter.
1852    pub fn ssh_check_interval(&self) -> Duration {
1853        *self.expect_value(&SSH_CHECK_INTERVAL)
1854    }
1855
1856    /// Returns the `ssh_connect_timeout` configuration parameter.
1857    pub fn ssh_connect_timeout(&self) -> Duration {
1858        *self.expect_value(&SSH_CONNECT_TIMEOUT)
1859    }
1860
1861    /// Returns the `ssh_keepalives_idle` configuration parameter.
1862    pub fn ssh_keepalives_idle(&self) -> Duration {
1863        *self.expect_value(&SSH_KEEPALIVES_IDLE)
1864    }
1865
1866    /// Returns the `kafka_socket_keepalive` configuration parameter.
1867    pub fn kafka_socket_keepalive(&self) -> bool {
1868        *self.expect_value(&KAFKA_SOCKET_KEEPALIVE)
1869    }
1870
1871    /// Returns the `kafka_socket_timeout` configuration parameter.
1872    pub fn kafka_socket_timeout(&self) -> Option<Duration> {
1873        *self.expect_value(&KAFKA_SOCKET_TIMEOUT)
1874    }
1875
1876    /// Returns the `kafka_transaction_timeout` configuration parameter.
1877    pub fn kafka_transaction_timeout(&self) -> Duration {
1878        *self.expect_value(&KAFKA_TRANSACTION_TIMEOUT)
1879    }
1880
1881    /// Returns the `kafka_socket_connection_setup_timeout` configuration parameter.
1882    pub fn kafka_socket_connection_setup_timeout(&self) -> Duration {
1883        *self.expect_value(&KAFKA_SOCKET_CONNECTION_SETUP_TIMEOUT)
1884    }
1885
1886    /// Returns the `kafka_fetch_metadata_timeout` configuration parameter.
1887    pub fn kafka_fetch_metadata_timeout(&self) -> Duration {
1888        *self.expect_value(&KAFKA_FETCH_METADATA_TIMEOUT)
1889    }
1890
1891    /// Returns the `kafka_progress_record_fetch_timeout` configuration parameter.
1892    pub fn kafka_progress_record_fetch_timeout(&self) -> Option<Duration> {
1893        *self.expect_value(&KAFKA_PROGRESS_RECORD_FETCH_TIMEOUT)
1894    }
1895
1896    /// Returns the `crdb_connect_timeout` configuration parameter.
1897    pub fn crdb_connect_timeout(&self) -> Duration {
1898        *self.expect_config_value(UncasedStr::new(
1899            mz_persist_client::cfg::CRDB_CONNECT_TIMEOUT.name(),
1900        ))
1901    }
1902
1903    /// Returns the `crdb_tcp_user_timeout` configuration parameter.
1904    pub fn crdb_tcp_user_timeout(&self) -> Duration {
1905        *self.expect_config_value(UncasedStr::new(
1906            mz_persist_client::cfg::CRDB_TCP_USER_TIMEOUT.name(),
1907        ))
1908    }
1909
1910    /// Returns the `crdb_keepalives_idle` configuration parameter.
1911    pub fn crdb_keepalives_idle(&self) -> Duration {
1912        *self.expect_config_value(UncasedStr::new(
1913            mz_persist_client::cfg::CRDB_KEEPALIVES_IDLE.name(),
1914        ))
1915    }
1916
1917    /// Returns the `crdb_keepalives_interval` configuration parameter.
1918    pub fn crdb_keepalives_interval(&self) -> Duration {
1919        *self.expect_config_value(UncasedStr::new(
1920            mz_persist_client::cfg::CRDB_KEEPALIVES_INTERVAL.name(),
1921        ))
1922    }
1923
1924    /// Returns the `crdb_keepalives_retries` configuration parameter.
1925    pub fn crdb_keepalives_retries(&self) -> u32 {
1926        *self.expect_config_value(UncasedStr::new(
1927            mz_persist_client::cfg::CRDB_KEEPALIVES_RETRIES.name(),
1928        ))
1929    }
1930
1931    /// Returns the `storage_dataflow_max_inflight_bytes` configuration parameter.
1932    pub fn storage_dataflow_max_inflight_bytes(&self) -> Option<usize> {
1933        *self.expect_value(&STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES)
1934    }
1935
1936    /// Returns the `storage_dataflow_max_inflight_bytes_to_cluster_size_fraction` configuration parameter.
1937    pub fn storage_dataflow_max_inflight_bytes_to_cluster_size_fraction(&self) -> Option<Numeric> {
1938        *self.expect_value(&STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_TO_CLUSTER_SIZE_FRACTION)
1939    }
1940
1941    /// Returns the `storage_shrink_upsert_unused_buffers_by_ratio` configuration parameter.
1942    pub fn storage_shrink_upsert_unused_buffers_by_ratio(&self) -> usize {
1943        *self.expect_value(&STORAGE_SHRINK_UPSERT_UNUSED_BUFFERS_BY_RATIO)
1944    }
1945
1946    /// Returns the `storage_dataflow_max_inflight_bytes_disk_only` configuration parameter.
1947    pub fn storage_dataflow_max_inflight_bytes_disk_only(&self) -> bool {
1948        *self.expect_value(&STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_DISK_ONLY)
1949    }
1950
1951    /// Returns the `storage_statistics_interval` configuration parameter.
1952    pub fn storage_statistics_interval(&self) -> Duration {
1953        *self.expect_value(&STORAGE_STATISTICS_INTERVAL)
1954    }
1955
1956    /// Returns the `storage_statistics_collection_interval` configuration parameter.
1957    pub fn storage_statistics_collection_interval(&self) -> Duration {
1958        *self.expect_value(&STORAGE_STATISTICS_COLLECTION_INTERVAL)
1959    }
1960
1961    /// Returns the `storage_record_source_sink_namespaced_errors` configuration parameter.
1962    pub fn storage_record_source_sink_namespaced_errors(&self) -> bool {
1963        *self.expect_value(&STORAGE_RECORD_SOURCE_SINK_NAMESPACED_ERRORS)
1964    }
1965
1966    /// Returns the `persist_stats_filter_enabled` configuration parameter.
1967    pub fn persist_stats_filter_enabled(&self) -> bool {
1968        *self.expect_config_value(UncasedStr::new(
1969            mz_persist_client::stats::STATS_FILTER_ENABLED.name(),
1970        ))
1971    }
1972
1973    pub fn scram_iterations(&self) -> NonZeroU32 {
1974        *self.expect_value(&SCRAM_ITERATIONS)
1975    }
1976
1977    pub fn dyncfg_updates(&self) -> ConfigUpdates {
1978        let mut updates = ConfigUpdates::default();
1979        for entry in self.dyncfgs.entries() {
1980            let name = UncasedStr::new(entry.name());
1981            let val = match entry.val() {
1982                ConfigVal::Bool(_) => ConfigVal::from(*self.expect_config_value::<bool>(name)),
1983                ConfigVal::U32(_) => ConfigVal::from(*self.expect_config_value::<u32>(name)),
1984                ConfigVal::Usize(_) => ConfigVal::from(*self.expect_config_value::<usize>(name)),
1985                ConfigVal::OptUsize(_) => {
1986                    ConfigVal::from(*self.expect_config_value::<Option<usize>>(name))
1987                }
1988                ConfigVal::F64(_) => ConfigVal::from(*self.expect_config_value::<f64>(name)),
1989                ConfigVal::String(_) => {
1990                    ConfigVal::from(self.expect_config_value::<String>(name).clone())
1991                }
1992                ConfigVal::OptString(_) => {
1993                    ConfigVal::from(self.expect_config_value::<Option<String>>(name).clone())
1994                }
1995                ConfigVal::Duration(_) => {
1996                    ConfigVal::from(*self.expect_config_value::<Duration>(name))
1997                }
1998                ConfigVal::Json(_) => {
1999                    ConfigVal::from(self.expect_config_value::<serde_json::Value>(name).clone())
2000                }
2001            };
2002            updates.add_dynamic(entry.name(), val);
2003        }
2004        updates.apply(&self.dyncfgs);
2005        updates
2006    }
2007
2008    /// Returns the `metrics_retention` configuration parameter.
2009    pub fn metrics_retention(&self) -> Duration {
2010        *self.expect_value(&METRICS_RETENTION)
2011    }
2012
2013    /// Returns the `unsafe_mock_audit_event_timestamp` configuration parameter.
2014    pub fn unsafe_mock_audit_event_timestamp(&self) -> Option<mz_repr::Timestamp> {
2015        *self.expect_value(&UNSAFE_MOCK_AUDIT_EVENT_TIMESTAMP)
2016    }
2017
2018    /// Returns the `enable_rbac_checks` configuration parameter.
2019    pub fn enable_rbac_checks(&self) -> bool {
2020        *self.expect_value(&ENABLE_RBAC_CHECKS)
2021    }
2022
2023    /// Returns the `max_connections` configuration parameter.
2024    pub fn max_connections(&self) -> u32 {
2025        *self.expect_value(&MAX_CONNECTIONS)
2026    }
2027
2028    pub fn default_network_policy_name(&self) -> String {
2029        self.expect_value::<String>(&NETWORK_POLICY).clone()
2030    }
2031
2032    /// Returns the `superuser_reserved_connections` configuration parameter.
2033    pub fn superuser_reserved_connections(&self) -> u32 {
2034        *self.expect_value(&SUPERUSER_RESERVED_CONNECTIONS)
2035    }
2036
2037    pub fn keep_n_source_status_history_entries(&self) -> usize {
2038        *self.expect_value(&KEEP_N_SOURCE_STATUS_HISTORY_ENTRIES)
2039    }
2040
2041    pub fn keep_n_sink_status_history_entries(&self) -> usize {
2042        *self.expect_value(&KEEP_N_SINK_STATUS_HISTORY_ENTRIES)
2043    }
2044
2045    pub fn keep_n_privatelink_status_history_entries(&self) -> usize {
2046        *self.expect_value(&KEEP_N_PRIVATELINK_STATUS_HISTORY_ENTRIES)
2047    }
2048
2049    pub fn replica_status_history_retention_window(&self) -> Duration {
2050        *self.expect_value(&REPLICA_STATUS_HISTORY_RETENTION_WINDOW)
2051    }
2052
2053    /// Returns the `enable_storage_shard_finalization` configuration parameter.
2054    pub fn enable_storage_shard_finalization(&self) -> bool {
2055        *self.expect_value(&ENABLE_STORAGE_SHARD_FINALIZATION)
2056    }
2057
2058    /// Returns the `enable_default_connection_validation` configuration parameter.
2059    pub fn enable_default_connection_validation(&self) -> bool {
2060        *self.expect_value(&ENABLE_DEFAULT_CONNECTION_VALIDATION)
2061    }
2062
2063    /// Returns the `default_timestamp_interval` configuration parameter.
2064    pub fn default_timestamp_interval(&self) -> Duration {
2065        *self.expect_value(&DEFAULT_TIMESTAMP_INTERVAL)
2066    }
2067
2068    /// Returns the `min_timestamp_interval` configuration parameter.
2069    pub fn min_timestamp_interval(&self) -> Duration {
2070        *self.expect_value(&MIN_TIMESTAMP_INTERVAL)
2071    }
2072    /// Returns the `max_timestamp_interval` configuration parameter.
2073    pub fn max_timestamp_interval(&self) -> Duration {
2074        *self.expect_value(&MAX_TIMESTAMP_INTERVAL)
2075    }
2076
2077    pub fn logging_filter(&self) -> CloneableEnvFilter {
2078        self.expect_value::<CloneableEnvFilter>(&LOGGING_FILTER)
2079            .clone()
2080    }
2081
2082    pub fn opentelemetry_filter(&self) -> CloneableEnvFilter {
2083        self.expect_value::<CloneableEnvFilter>(&OPENTELEMETRY_FILTER)
2084            .clone()
2085    }
2086
2087    pub fn logging_filter_defaults(&self) -> Vec<SerializableDirective> {
2088        self.expect_value::<Vec<SerializableDirective>>(&LOGGING_FILTER_DEFAULTS)
2089            .clone()
2090    }
2091
2092    pub fn opentelemetry_filter_defaults(&self) -> Vec<SerializableDirective> {
2093        self.expect_value::<Vec<SerializableDirective>>(&OPENTELEMETRY_FILTER_DEFAULTS)
2094            .clone()
2095    }
2096
2097    pub fn sentry_filters(&self) -> Vec<SerializableDirective> {
2098        self.expect_value::<Vec<SerializableDirective>>(&SENTRY_FILTERS)
2099            .clone()
2100    }
2101
2102    pub fn webhooks_secrets_caching_ttl_secs(&self) -> usize {
2103        *self.expect_value(&WEBHOOKS_SECRETS_CACHING_TTL_SECS)
2104    }
2105
2106    pub fn coord_slow_message_warn_threshold(&self) -> Duration {
2107        *self.expect_value(&COORD_SLOW_MESSAGE_WARN_THRESHOLD)
2108    }
2109
2110    pub fn grpc_client_http2_keep_alive_interval(&self) -> Duration {
2111        *self.expect_value(&grpc_client::HTTP2_KEEP_ALIVE_INTERVAL)
2112    }
2113
2114    pub fn grpc_client_http2_keep_alive_timeout(&self) -> Duration {
2115        *self.expect_value(&grpc_client::HTTP2_KEEP_ALIVE_TIMEOUT)
2116    }
2117
2118    pub fn grpc_connect_timeout(&self) -> Duration {
2119        *self.expect_value(&grpc_client::CONNECT_TIMEOUT)
2120    }
2121
2122    pub fn cluster_multi_process_replica_az_affinity_weight(&self) -> Option<i32> {
2123        *self.expect_value(&cluster_scheduling::CLUSTER_MULTI_PROCESS_REPLICA_AZ_AFFINITY_WEIGHT)
2124    }
2125
2126    pub fn cluster_soften_replication_anti_affinity(&self) -> bool {
2127        *self.expect_value(&cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY)
2128    }
2129
2130    pub fn cluster_soften_replication_anti_affinity_weight(&self) -> i32 {
2131        *self.expect_value(&cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY_WEIGHT)
2132    }
2133
2134    pub fn cluster_enable_topology_spread(&self) -> bool {
2135        *self.expect_value(&cluster_scheduling::CLUSTER_ENABLE_TOPOLOGY_SPREAD)
2136    }
2137
2138    pub fn cluster_topology_spread_ignore_non_singular_scale(&self) -> bool {
2139        *self.expect_value(&cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_IGNORE_NON_SINGULAR_SCALE)
2140    }
2141
2142    pub fn cluster_topology_spread_max_skew(&self) -> i32 {
2143        *self.expect_value(&cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_MAX_SKEW)
2144    }
2145
2146    pub fn cluster_topology_spread_set_min_domains(&self) -> Option<i32> {
2147        *self.expect_value(&cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_MIN_DOMAINS)
2148    }
2149
2150    pub fn cluster_topology_spread_soft(&self) -> bool {
2151        *self.expect_value(&cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_SOFT)
2152    }
2153
2154    pub fn cluster_soften_az_affinity(&self) -> bool {
2155        *self.expect_value(&cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY)
2156    }
2157
2158    pub fn cluster_soften_az_affinity_weight(&self) -> i32 {
2159        *self.expect_value(&cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY_WEIGHT)
2160    }
2161
2162    pub fn cluster_alter_check_ready_interval(&self) -> Duration {
2163        *self.expect_value(&cluster_scheduling::CLUSTER_ALTER_CHECK_READY_INTERVAL)
2164    }
2165
2166    pub fn cluster_check_scheduling_policies_interval(&self) -> Duration {
2167        *self.expect_value(&cluster_scheduling::CLUSTER_CHECK_SCHEDULING_POLICIES_INTERVAL)
2168    }
2169
2170    pub fn cluster_security_context_enabled(&self) -> bool {
2171        *self.expect_value(&cluster_scheduling::CLUSTER_SECURITY_CONTEXT_ENABLED)
2172    }
2173
2174    pub fn cluster_refresh_mv_compaction_estimate(&self) -> Duration {
2175        *self.expect_value(&cluster_scheduling::CLUSTER_REFRESH_MV_COMPACTION_ESTIMATE)
2176    }
2177
2178    /// Returns the `privatelink_status_update_quota_per_minute` configuration parameter.
2179    pub fn privatelink_status_update_quota_per_minute(&self) -> u32 {
2180        *self.expect_value(&PRIVATELINK_STATUS_UPDATE_QUOTA_PER_MINUTE)
2181    }
2182
2183    pub fn statement_logging_target_data_rate(&self) -> Option<usize> {
2184        *self.expect_value(&STATEMENT_LOGGING_TARGET_DATA_RATE)
2185    }
2186
2187    pub fn statement_logging_max_data_credit(&self) -> Option<usize> {
2188        *self.expect_value(&STATEMENT_LOGGING_MAX_DATA_CREDIT)
2189    }
2190
2191    /// Returns the `statement_logging_max_sample_rate` configuration parameter.
2192    pub fn statement_logging_max_sample_rate(&self) -> Numeric {
2193        *self.expect_value(&STATEMENT_LOGGING_MAX_SAMPLE_RATE)
2194    }
2195
2196    /// Returns the `statement_logging_default_sample_rate` configuration parameter.
2197    pub fn statement_logging_default_sample_rate(&self) -> Numeric {
2198        *self.expect_value(&STATEMENT_LOGGING_DEFAULT_SAMPLE_RATE)
2199    }
2200
2201    /// Returns the `enable_internal_statement_logging` configuration parameter.
2202    pub fn enable_internal_statement_logging(&self) -> bool {
2203        *self.expect_value(&ENABLE_INTERNAL_STATEMENT_LOGGING)
2204    }
2205
2206    /// Returns the `optimizer_stats_timeout` configuration parameter.
2207    pub fn optimizer_stats_timeout(&self) -> Duration {
2208        *self.expect_value(&OPTIMIZER_STATS_TIMEOUT)
2209    }
2210
2211    /// Returns the `optimizer_oneshot_stats_timeout` configuration parameter.
2212    pub fn optimizer_oneshot_stats_timeout(&self) -> Duration {
2213        *self.expect_value(&OPTIMIZER_ONESHOT_STATS_TIMEOUT)
2214    }
2215
2216    /// Returns the `webhook_concurrent_request_limit` configuration parameter.
2217    pub fn webhook_concurrent_request_limit(&self) -> usize {
2218        *self.expect_value(&WEBHOOK_CONCURRENT_REQUEST_LIMIT)
2219    }
2220
2221    /// Returns the `pg_timestamp_oracle_connection_pool_max_size` configuration parameter.
2222    pub fn pg_timestamp_oracle_connection_pool_max_size(&self) -> usize {
2223        *self.expect_value(&PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_SIZE)
2224    }
2225
2226    /// Returns the `pg_timestamp_oracle_connection_pool_max_wait` configuration parameter.
2227    pub fn pg_timestamp_oracle_connection_pool_max_wait(&self) -> Option<Duration> {
2228        *self.expect_value(&PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_WAIT)
2229    }
2230
2231    /// Returns the `pg_timestamp_oracle_connection_pool_ttl` configuration parameter.
2232    pub fn pg_timestamp_oracle_connection_pool_ttl(&self) -> Duration {
2233        *self.expect_value(&PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL)
2234    }
2235
2236    /// Returns the `pg_timestamp_oracle_connection_pool_ttl_stagger` configuration parameter.
2237    pub fn pg_timestamp_oracle_connection_pool_ttl_stagger(&self) -> Duration {
2238        *self.expect_value(&PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL_STAGGER)
2239    }
2240
2241    /// Returns the `user_storage_managed_collections_batch_duration` configuration parameter.
2242    pub fn user_storage_managed_collections_batch_duration(&self) -> Duration {
2243        *self.expect_value(&USER_STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION)
2244    }
2245
2246    pub fn force_source_table_syntax(&self) -> bool {
2247        *self.expect_value(&FORCE_SOURCE_TABLE_SYNTAX)
2248    }
2249
2250    pub fn optimizer_e2e_latency_warning_threshold(&self) -> Duration {
2251        *self.expect_value(&OPTIMIZER_E2E_LATENCY_WARNING_THRESHOLD)
2252    }
2253
2254    /// Returns whether the named variable is a controller configuration parameter.
2255    pub fn is_controller_config_var(&self, name: &str) -> bool {
2256        self.is_dyncfg_var(name)
2257    }
2258
2259    /// Returns whether the named variable is a compute configuration parameter
2260    /// (things that go in `ComputeParameters` and are sent to replicas via `UpdateConfiguration`
2261    /// commands).
2262    pub fn is_compute_config_var(&self, name: &str) -> bool {
2263        name == MAX_RESULT_SIZE.name() || self.is_dyncfg_var(name) || is_tracing_var(name)
2264    }
2265
2266    /// Returns whether the named variable is a metrics configuration parameter
2267    pub fn is_metrics_config_var(&self, name: &str) -> bool {
2268        self.is_dyncfg_var(name)
2269    }
2270
2271    /// Returns whether the named variable is a storage configuration parameter.
2272    pub fn is_storage_config_var(&self, name: &str) -> bool {
2273        name == PG_SOURCE_CONNECT_TIMEOUT.name()
2274            || name == PG_SOURCE_TCP_KEEPALIVES_IDLE.name()
2275            || name == PG_SOURCE_TCP_KEEPALIVES_INTERVAL.name()
2276            || name == PG_SOURCE_TCP_KEEPALIVES_RETRIES.name()
2277            || name == PG_SOURCE_TCP_USER_TIMEOUT.name()
2278            || name == PG_SOURCE_TCP_CONFIGURE_SERVER.name()
2279            || name == PG_SOURCE_SNAPSHOT_STATEMENT_TIMEOUT.name()
2280            || name == PG_SOURCE_WAL_SENDER_TIMEOUT.name()
2281            || name == PG_SOURCE_SNAPSHOT_COLLECT_STRICT_COUNT.name()
2282            || name == MYSQL_SOURCE_TCP_KEEPALIVE.name()
2283            || name == MYSQL_SOURCE_SNAPSHOT_MAX_EXECUTION_TIME.name()
2284            || name == MYSQL_SOURCE_SNAPSHOT_LOCK_WAIT_TIMEOUT.name()
2285            || name == MYSQL_SOURCE_CONNECT_TIMEOUT.name()
2286            || name == ENABLE_STORAGE_SHARD_FINALIZATION.name()
2287            || name == SSH_CHECK_INTERVAL.name()
2288            || name == SSH_CONNECT_TIMEOUT.name()
2289            || name == SSH_KEEPALIVES_IDLE.name()
2290            || name == KAFKA_SOCKET_KEEPALIVE.name()
2291            || name == KAFKA_SOCKET_TIMEOUT.name()
2292            || name == KAFKA_TRANSACTION_TIMEOUT.name()
2293            || name == KAFKA_SOCKET_CONNECTION_SETUP_TIMEOUT.name()
2294            || name == KAFKA_FETCH_METADATA_TIMEOUT.name()
2295            || name == KAFKA_PROGRESS_RECORD_FETCH_TIMEOUT.name()
2296            || name == STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES.name()
2297            || name == STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_TO_CLUSTER_SIZE_FRACTION.name()
2298            || name == STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_DISK_ONLY.name()
2299            || name == STORAGE_SHRINK_UPSERT_UNUSED_BUFFERS_BY_RATIO.name()
2300            || name == STORAGE_RECORD_SOURCE_SINK_NAMESPACED_ERRORS.name()
2301            || name == STORAGE_STATISTICS_INTERVAL.name()
2302            || name == STORAGE_STATISTICS_COLLECTION_INTERVAL.name()
2303            || name == USER_STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION.name()
2304            || is_upsert_rocksdb_config_var(name)
2305            || self.is_dyncfg_var(name)
2306            || is_tracing_var(name)
2307    }
2308
2309    /// Returns whether the named variable is a dyncfg configuration parameter.
2310    fn is_dyncfg_var(&self, name: &str) -> bool {
2311        self.dyncfgs.entries().any(|e| name == e.name())
2312    }
2313}
2314
2315pub fn is_tracing_var(name: &str) -> bool {
2316    name == LOGGING_FILTER.name()
2317        || name == LOGGING_FILTER_DEFAULTS.name()
2318        || name == OPENTELEMETRY_FILTER.name()
2319        || name == OPENTELEMETRY_FILTER_DEFAULTS.name()
2320        || name == SENTRY_FILTERS.name()
2321}
2322
2323/// Returns whether the named variable is a caching configuration parameter.
2324pub fn is_secrets_caching_var(name: &str) -> bool {
2325    name == WEBHOOKS_SECRETS_CACHING_TTL_SECS.name()
2326}
2327
2328fn is_upsert_rocksdb_config_var(name: &str) -> bool {
2329    name == upsert_rocksdb::UPSERT_ROCKSDB_COMPACTION_STYLE.name()
2330        || name == upsert_rocksdb::UPSERT_ROCKSDB_OPTIMIZE_COMPACTION_MEMTABLE_BUDGET.name()
2331        || name == upsert_rocksdb::UPSERT_ROCKSDB_LEVEL_COMPACTION_DYNAMIC_LEVEL_BYTES.name()
2332        || name == upsert_rocksdb::UPSERT_ROCKSDB_UNIVERSAL_COMPACTION_RATIO.name()
2333        || name == upsert_rocksdb::UPSERT_ROCKSDB_PARALLELISM.name()
2334        || name == upsert_rocksdb::UPSERT_ROCKSDB_COMPRESSION_TYPE.name()
2335        || name == upsert_rocksdb::UPSERT_ROCKSDB_BOTTOMMOST_COMPRESSION_TYPE.name()
2336        || name == upsert_rocksdb::UPSERT_ROCKSDB_BATCH_SIZE.name()
2337        || name == upsert_rocksdb::UPSERT_ROCKSDB_STATS_LOG_INTERVAL_SECONDS.name()
2338        || name == upsert_rocksdb::UPSERT_ROCKSDB_STATS_PERSIST_INTERVAL_SECONDS.name()
2339        || name == upsert_rocksdb::UPSERT_ROCKSDB_POINT_LOOKUP_BLOCK_CACHE_SIZE_MB.name()
2340        || name == upsert_rocksdb::UPSERT_ROCKSDB_SHRINK_ALLOCATED_BUFFERS_BY_RATIO.name()
2341}
2342
2343/// Returns whether the named variable is a (Postgres/CRDB) timestamp oracle
2344/// configuration parameter.
2345pub fn is_timestamp_oracle_config_var(name: &str) -> bool {
2346    name == PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_SIZE.name()
2347        || name == PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_WAIT.name()
2348        || name == PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL.name()
2349        || name == PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL_STAGGER.name()
2350        || name == CRDB_CONNECT_TIMEOUT.name()
2351        || name == CRDB_TCP_USER_TIMEOUT.name()
2352        || name == CRDB_KEEPALIVES_IDLE.name()
2353        || name == CRDB_KEEPALIVES_INTERVAL.name()
2354        || name == CRDB_KEEPALIVES_RETRIES.name()
2355}
2356
2357/// Returns whether the named variable is a cluster scheduling config
2358pub fn is_cluster_scheduling_var(name: &str) -> bool {
2359    name == cluster_scheduling::CLUSTER_MULTI_PROCESS_REPLICA_AZ_AFFINITY_WEIGHT.name()
2360        || name == cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY.name()
2361        || name == cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY_WEIGHT.name()
2362        || name == cluster_scheduling::CLUSTER_ENABLE_TOPOLOGY_SPREAD.name()
2363        || name == cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_IGNORE_NON_SINGULAR_SCALE.name()
2364        || name == cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_MAX_SKEW.name()
2365        || name == cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_SOFT.name()
2366        || name == cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY.name()
2367        || name == cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY_WEIGHT.name()
2368}
2369
2370/// Returns whether the named variable is an HTTP server related config var.
2371pub fn is_http_config_var(name: &str) -> bool {
2372    name == WEBHOOK_CONCURRENT_REQUEST_LIMIT.name()
2373}
2374
2375/// Set of [`SystemVar`]s that can also get set at a per-Session level.
2376///
2377/// TODO(parkmycar): Instead of a separate list, make this a field on VarDefinition.
2378static SESSION_SYSTEM_VARS: LazyLock<BTreeMap<&'static UncasedStr, &'static VarDefinition>> =
2379    LazyLock::new(|| {
2380        [
2381            &APPLICATION_NAME,
2382            &CLIENT_ENCODING,
2383            &CLIENT_MIN_MESSAGES,
2384            &CLUSTER,
2385            &CLUSTER_REPLICA,
2386            &DEFAULT_CLUSTER_REPLICATION_FACTOR,
2387            &CURRENT_OBJECT_MISSING_WARNINGS,
2388            &DATABASE,
2389            &DATE_STYLE,
2390            &EXTRA_FLOAT_DIGITS,
2391            &INTEGER_DATETIMES,
2392            &INTERVAL_STYLE,
2393            &REAL_TIME_RECENCY_TIMEOUT,
2394            &SEARCH_PATH,
2395            &STANDARD_CONFORMING_STRINGS,
2396            &STATEMENT_TIMEOUT,
2397            &IDLE_IN_TRANSACTION_SESSION_TIMEOUT,
2398            &TIMEZONE,
2399            &TRANSACTION_ISOLATION,
2400            &MAX_QUERY_RESULT_SIZE,
2401        ]
2402        .into_iter()
2403        .map(|var| (UncasedStr::new(var.name()), var))
2404        .collect()
2405    });
2406
/// Wraps a [`VarDefinition`] to express that the variable is meant to be used as a feature
/// flag.
#[derive(Debug)]
pub struct FeatureFlag {
    /// The variable definition backing this flag. [`FeatureFlag::require`] reads its current
    /// value as a `bool`.
    pub flag: &'static VarDefinition,
    /// Static description text for the feature.
    // NOTE(review): presumably used when reporting that the flag is required — confirm usage.
    pub feature_desc: &'static str,
}
2414
2415impl FeatureFlag {
2416    /// Returns an error unless the feature flag is enabled in the provided
2417    /// `system_vars`.
2418    pub fn require(&'static self, system_vars: &SystemVars) -> Result<(), VarError> {
2419        match *system_vars.expect_value::<bool>(self.flag) {
2420            true => Ok(()),
2421            false => Err(VarError::RequiresFeatureFlag { feature_flag: self }),
2422        }
2423    }
2424}
2425
2426impl PartialEq for FeatureFlag {
2427    fn eq(&self, other: &FeatureFlag) -> bool {
2428        self.flag.name() == other.flag.name()
2429    }
2430}
2431
2432impl Eq for FeatureFlag {}
2433
2434impl Var for MzVersion {
2435    fn name(&self) -> &'static str {
2436        MZ_VERSION_NAME.as_str()
2437    }
2438
2439    fn value(&self) -> String {
2440        self.build_info
2441            .human_version(self.helm_chart_version.clone())
2442    }
2443
2444    fn description(&self) -> &'static str {
2445        "Shows the Materialize server version (Materialize)."
2446    }
2447
2448    fn type_name(&self) -> Cow<'static, str> {
2449        String::type_name()
2450    }
2451
2452    fn visible(&self, _: &User, _: &SystemVars) -> Result<(), VarError> {
2453        Ok(())
2454    }
2455}
2456
2457impl Var for User {
2458    fn name(&self) -> &'static str {
2459        IS_SUPERUSER_NAME.as_str()
2460    }
2461
2462    fn value(&self) -> String {
2463        self.is_superuser().format()
2464    }
2465
2466    fn description(&self) -> &'static str {
2467        "Reports whether the current session is a superuser (PostgreSQL)."
2468    }
2469
2470    fn type_name(&self) -> Cow<'static, str> {
2471        bool::type_name()
2472    }
2473
2474    fn visible(&self, _: &User, _: &SystemVars) -> Result<(), VarError> {
2475        Ok(())
2476    }
2477}