mz_sql/session/
vars.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Run-time configuration parameters
11//!
12//! ## Overview
13//! Materialize roughly follows the PostgreSQL configuration model, which works
14//! as follows. There is a global set of named configuration parameters, like
15//! `DateStyle` and `client_encoding`. These parameters can be set in several
16//! places: in an on-disk configuration file (in Postgres, named
17//! postgresql.conf), in command line arguments when the server is started, or
18//! at runtime via the `ALTER SYSTEM` or `SET` statements. Parameters that are
19//! set in a session take precedence over database defaults, which in turn take
20//! precedence over command line arguments, which in turn take precedence over
21//! settings in the on-disk configuration. Note that changing the value of
22//! parameters obeys transaction semantics: if a transaction fails to commit,
23//! any parameters that were changed in that transaction (i.e., via `SET`) will
24//! be rolled back to their previous value.
25//!
26//! The Materialize configuration hierarchy at the moment is much simpler.
27//! Global defaults are hardcoded into the binary, and a select few parameters
28//! can be overridden per session. A select few parameters can be overridden on
29//! disk.
30//!
31//! The set of variables that can be overridden per session and the set of
32//! variables that can be overridden on disk are currently disjoint. The
33//! infrastructure has been designed with an eye towards merging these two sets
34//! and supporting additional layers to the hierarchy, however, should the need
35//! arise.
36//!
37//! The configuration parameters that exist are driven by compatibility with
38//! PostgreSQL drivers that expect them, not because they are particularly
39//! important.
40//!
41//! ## Structure
42//! Thw most meaningful exports from this module are:
43//!
44//! - [`SessionVars`] represent per-session parameters, which each user can
45//!   access independently of one another, and are accessed via `SET`.
46//!
47//!   The fields of [`SessionVars`] are either;
48//!     - `SessionVar`, which is preferable and simply requires full support of
49//!       the `SessionVar` impl for its embedded value type.
50//!     - `ServerVar` for types that do not currently support everything
51//!       required by `SessionVar`, e.g. they are fixed-value parameters.
52//!
53//!   In the fullness of time, all fields in [`SessionVars`] should be
54//!   `SessionVar`.
55//!
56//! - [`SystemVars`] represent system-wide configuration settings and are
57//!   accessed via `ALTER SYSTEM SET`.
58//!
59//!   All elements of [`SystemVars`] are `SystemVar`.
60//!
61//! Some [`VarDefinition`] are also marked as a [`FeatureFlag`]; this is just a
62//! wrapper to make working with a set of [`VarDefinition`] easier, primarily from
63//! within SQL planning, where we might want to check if a feature is enabled
64//! before planning it.
65
66use std::borrow::Cow;
67use std::clone::Clone;
68use std::collections::BTreeMap;
69use std::fmt::Debug;
70use std::net::IpAddr;
71use std::string::ToString;
72use std::sync::{Arc, LazyLock};
73use std::time::Duration;
74
75use chrono::{DateTime, Utc};
76use derivative::Derivative;
77use im::OrdMap;
78use mz_build_info::BuildInfo;
79use mz_dyncfg::{ConfigSet, ConfigType, ConfigUpdates, ConfigVal};
80use mz_persist_client::cfg::{CRDB_CONNECT_TIMEOUT, CRDB_TCP_USER_TIMEOUT};
81use mz_repr::adt::numeric::Numeric;
82use mz_repr::adt::timestamp::CheckedTimestamp;
83use mz_repr::bytes::ByteSize;
84use mz_repr::user::ExternalUserMetadata;
85use mz_tracing::{CloneableEnvFilter, SerializableDirective};
86use serde::Serialize;
87use thiserror::Error;
88use tracing::error;
89use uncased::UncasedStr;
90
91use crate::ast::Ident;
92use crate::session::user::User;
93
94pub(crate) mod constraints;
95pub(crate) mod definitions;
96pub(crate) mod errors;
97pub(crate) mod polyfill;
98pub(crate) mod value;
99
100pub use definitions::*;
101pub use errors::*;
102pub use value::*;
103
104/// The action to take during end_transaction.
105///
106/// This enum lives here because of convenience: it's more of an adapter
107/// concept but [`SessionVars::end_transaction`] takes it.
108#[derive(Debug, Clone, Copy, PartialEq, Eq)]
109pub enum EndTransactionAction {
110    /// Commit the transaction.
111    Commit,
112    /// Rollback the transaction.
113    Rollback,
114}
115
116/// Represents the input to a variable.
117///
118/// Each variable has different rules for how it handles each style of input.
119/// This type allows us to defer interpretation of the input until the
120/// variable-specific interpretation can be applied.
121#[derive(Debug, Clone, Copy)]
122pub enum VarInput<'a> {
123    /// The input has been flattened into a single string.
124    Flat(&'a str),
125    /// The input comes from a SQL `SET` statement and is jumbled across
126    /// multiple components.
127    SqlSet(&'a [String]),
128}
129
130impl<'a> VarInput<'a> {
131    /// Converts the variable input to an owned vector of strings.
132    pub fn to_vec(&self) -> Vec<String> {
133        match self {
134            VarInput::Flat(v) => vec![v.to_string()],
135            VarInput::SqlSet(values) => values.into_iter().map(|v| v.to_string()).collect(),
136        }
137    }
138}
139
140/// An owned version of [`VarInput`].
141#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize)]
142pub enum OwnedVarInput {
143    /// See [`VarInput::Flat`].
144    Flat(String),
145    /// See [`VarInput::SqlSet`].
146    SqlSet(Vec<String>),
147}
148
149impl OwnedVarInput {
150    /// Converts this owned variable input as a [`VarInput`].
151    pub fn borrow(&self) -> VarInput {
152        match self {
153            OwnedVarInput::Flat(v) => VarInput::Flat(v),
154            OwnedVarInput::SqlSet(v) => VarInput::SqlSet(v),
155        }
156    }
157}
158
159/// A `Var` represents a configuration parameter of an arbitrary type.
160pub trait Var: Debug {
161    /// Returns the name of the configuration parameter.
162    fn name(&self) -> &'static str;
163
164    /// Constructs a flattened string representation of the current value of the
165    /// configuration parameter.
166    ///
167    /// The resulting string is guaranteed to be parsable if provided to
168    /// `Value::parse` as a [`VarInput::Flat`].
169    fn value(&self) -> String;
170
171    /// Returns a short sentence describing the purpose of the configuration
172    /// parameter.
173    fn description(&self) -> &'static str;
174
175    /// Returns the name of the type of this variable.
176    fn type_name(&self) -> Cow<'static, str>;
177
178    /// Indicates wither the [`Var`] is visible as a function of the `user` and `system_vars`.
179    /// "Invisible" parameters return `VarErrors`.
180    ///
181    /// Variables marked as `internal` are only visible for the system user.
182    fn visible(&self, user: &User, system_vars: &SystemVars) -> Result<(), VarError>;
183
184    /// Reports whether the variable is only visible in unsafe mode.
185    fn is_unsafe(&self) -> bool {
186        self.name().starts_with("unsafe_")
187    }
188
189    /// Upcast `self` to a `dyn Var`, useful when working with multiple different implementors of
190    /// [`Var`].
191    fn as_var(&self) -> &dyn Var
192    where
193        Self: Sized,
194    {
195        self
196    }
197}
198
199/// A `SessionVar` is the session value for a configuration parameter. If unset,
200/// the server default is used instead.
201///
202/// Note: even though all of the different `*_value` fields are `Box<dyn Value>` they are enforced
203/// to be the same type because we use the `definition`s `parse(...)` method. This is guaranteed to
204/// return the same type as the compiled in default.
205#[derive(Debug)]
206pub struct SessionVar {
207    definition: VarDefinition,
208    /// System or Role default value.
209    default_value: Option<Box<dyn Value>>,
210    /// Value `LOCAL` to a transaction, will be unset at the completion of the transaction.
211    local_value: Option<Box<dyn Value>>,
212    /// Value set during a transaction, will be set if the transaction is committed.
213    staged_value: Option<Box<dyn Value>>,
214    /// Value that overrides the default.
215    session_value: Option<Box<dyn Value>>,
216}
217
218impl Clone for SessionVar {
219    fn clone(&self) -> Self {
220        SessionVar {
221            definition: self.definition.clone(),
222            default_value: self.default_value.as_ref().map(|v| v.box_clone()),
223            local_value: self.local_value.as_ref().map(|v| v.box_clone()),
224            staged_value: self.staged_value.as_ref().map(|v| v.box_clone()),
225            session_value: self.session_value.as_ref().map(|v| v.box_clone()),
226        }
227    }
228}
229
230impl SessionVar {
231    pub const fn new(var: VarDefinition) -> Self {
232        SessionVar {
233            definition: var,
234            default_value: None,
235            local_value: None,
236            staged_value: None,
237            session_value: None,
238        }
239    }
240
241    /// Checks if the provided [`VarInput`] is valid for the current session variable, returning
242    /// the formatted output if it's valid.
243    pub fn check(&self, input: VarInput) -> Result<String, VarError> {
244        let v = self.definition.parse(input)?;
245        self.validate_constraints(v.as_ref())?;
246
247        Ok(v.format())
248    }
249
250    /// Parse the input and update the stored value to match.
251    pub fn set(&mut self, input: VarInput, local: bool) -> Result<(), VarError> {
252        let v = self.definition.parse(input)?;
253
254        // Validate our parsed value.
255        self.validate_constraints(v.as_ref())?;
256
257        if local {
258            self.local_value = Some(v);
259        } else {
260            self.local_value = None;
261            self.staged_value = Some(v);
262        }
263        Ok(())
264    }
265
266    /// Sets the default value for the variable.
267    pub fn set_default(&mut self, input: VarInput) -> Result<(), VarError> {
268        let v = self.definition.parse(input)?;
269        self.validate_constraints(v.as_ref())?;
270        self.default_value = Some(v);
271        Ok(())
272    }
273
274    /// Reset the stored value to the default.
275    pub fn reset(&mut self, local: bool) {
276        let value = self
277            .default_value
278            .as_ref()
279            .map(|v| v.as_ref())
280            .unwrap_or_else(|| self.definition.value.value());
281        if local {
282            self.local_value = Some(value.box_clone());
283        } else {
284            self.local_value = None;
285            self.staged_value = Some(value.box_clone());
286        }
287    }
288
289    /// Returns a possibly new SessionVar if this needs to mutate at transaction end.
290    #[must_use]
291    pub fn end_transaction(&self, action: EndTransactionAction) -> Option<Self> {
292        if !self.is_mutating() {
293            return None;
294        }
295        let mut next: Self = self.clone();
296        next.local_value = None;
297        match action {
298            EndTransactionAction::Commit if next.staged_value.is_some() => {
299                next.session_value = next.staged_value.take()
300            }
301            _ => next.staged_value = None,
302        }
303        Some(next)
304    }
305
306    /// Whether this Var needs to mutate at the end of a transaction.
307    pub fn is_mutating(&self) -> bool {
308        self.local_value.is_some() || self.staged_value.is_some()
309    }
310
311    pub fn value_dyn(&self) -> &dyn Value {
312        self.local_value
313            .as_deref()
314            .or(self.staged_value.as_deref())
315            .or(self.session_value.as_deref())
316            .or(self.default_value.as_deref())
317            .unwrap_or_else(|| self.definition.value.value())
318    }
319
320    /// Returns the [`Value`] that is currently stored as the `session_value`.
321    ///
322    /// Note: This should __only__ be used for inspection, if you want to determine the current
323    /// value of this [`SessionVar`] you should use [`SessionVar::value`].
324    pub fn inspect_session_value(&self) -> Option<&dyn Value> {
325        self.session_value.as_deref()
326    }
327
328    fn validate_constraints(&self, val: &dyn Value) -> Result<(), VarError> {
329        if let Some(constraint) = &self.definition.constraint {
330            constraint.check_constraint(self, self.value_dyn(), val)
331        } else {
332            Ok(())
333        }
334    }
335}
336
337impl Var for SessionVar {
338    fn name(&self) -> &'static str {
339        self.definition.name.as_str()
340    }
341
342    fn value(&self) -> String {
343        self.value_dyn().format()
344    }
345
346    fn description(&self) -> &'static str {
347        self.definition.description
348    }
349
350    fn type_name(&self) -> Cow<'static, str> {
351        self.definition.type_name()
352    }
353
354    fn visible(
355        &self,
356        user: &User,
357        system_vars: &super::vars::SystemVars,
358    ) -> Result<(), super::vars::VarError> {
359        self.definition.visible(user, system_vars)
360    }
361}
362
363#[derive(Debug, Clone, PartialEq, Eq)]
364pub struct MzVersion {
365    /// Inputs to computed variables.
366    build_info: &'static BuildInfo,
367    /// Helm chart version
368    helm_chart_version: Option<String>,
369}
370
371impl MzVersion {
372    pub fn new(build_info: &'static BuildInfo, helm_chart_version: Option<String>) -> Self {
373        MzVersion {
374            build_info,
375            helm_chart_version,
376        }
377    }
378}
379
380/// Session variables.
381///
382/// See the [`crate::session::vars`] module documentation for more details on the
383/// Materialize configuration model.
384#[derive(Debug, Clone)]
385pub struct SessionVars {
386    /// The set of all session variables.
387    vars: OrdMap<&'static UncasedStr, SessionVar>,
388    /// Inputs to computed variables.
389    mz_version: MzVersion,
390    /// Information about the user associated with this Session.
391    user: User,
392}
393
394impl SessionVars {
395    /// Creates a new [`SessionVars`] without considering the System or Role defaults.
396    pub fn new_unchecked(
397        build_info: &'static BuildInfo,
398        user: User,
399        helm_chart_version: Option<String>,
400    ) -> SessionVars {
401        use definitions::*;
402
403        let vars = [
404            &FAILPOINTS,
405            &SERVER_VERSION,
406            &SERVER_VERSION_NUM,
407            &SQL_SAFE_UPDATES,
408            &REAL_TIME_RECENCY,
409            &EMIT_PLAN_INSIGHTS_NOTICE,
410            &EMIT_TIMESTAMP_NOTICE,
411            &EMIT_TRACE_ID_NOTICE,
412            &AUTO_ROUTE_CATALOG_QUERIES,
413            &ENABLE_SESSION_RBAC_CHECKS,
414            &ENABLE_SESSION_CARDINALITY_ESTIMATES,
415            &MAX_IDENTIFIER_LENGTH,
416            &STATEMENT_LOGGING_SAMPLE_RATE,
417            &EMIT_INTROSPECTION_QUERY_NOTICE,
418            &UNSAFE_NEW_TRANSACTION_WALL_TIME,
419            &WELCOME_MESSAGE,
420        ]
421        .into_iter()
422        .chain(SESSION_SYSTEM_VARS.iter().map(|(_name, var)| *var))
423        .map(|var| (var.name, SessionVar::new(var.clone())))
424        .collect();
425
426        SessionVars {
427            vars,
428            mz_version: MzVersion::new(build_info, helm_chart_version),
429            user,
430        }
431    }
432
433    fn expect_value<V: Value>(&self, var: &VarDefinition) -> &V {
434        let var = self
435            .vars
436            .get(var.name)
437            .expect("provided var should be in state");
438        let val = var.value_dyn();
439        val.as_any().downcast_ref::<V>().expect("success")
440    }
441
442    /// Returns an iterator over the configuration parameters and their current
443    /// values for this session.
444    ///
445    /// Note that this function does not check that the access variable should
446    /// be visible because of other settings or users. Before or after accessing
447    /// this method, you should call `Var::visible`.
448    pub fn iter(&self) -> impl Iterator<Item = &dyn Var> {
449        #[allow(clippy::as_conversions)]
450        self.vars
451            .values()
452            .map(|v| v.as_var())
453            .chain([&self.mz_version as &dyn Var, &self.user])
454    }
455
456    /// Returns an iterator over configuration parameters (and their current
457    /// values for this session) that are expected to be sent to the client when
458    /// a new connection is established or when their value changes.
459    pub fn notify_set(&self) -> impl Iterator<Item = &dyn Var> {
460        // WARNING: variables in this set are not checked for visibility, and
461        // are assumed to be visible for all sessions.
462        //
463        // This is fixible with some elbow grease, but at the moment it seems
464        // unlikely that we'll have a variable in the notify set that shouldn't
465        // be visible to all sessions.
466        [
467            &APPLICATION_NAME,
468            &CLIENT_ENCODING,
469            &DATE_STYLE,
470            &INTEGER_DATETIMES,
471            &SERVER_VERSION,
472            &STANDARD_CONFORMING_STRINGS,
473            &TIMEZONE,
474            &INTERVAL_STYLE,
475            // Including `cluster`, `cluster_replica`, `database`, and `search_path` in the notify
476            // set is a Materialize extension. Doing so allows users to more easily identify where
477            // their queries will be executing, which is important to know when you consider the
478            // size of a cluster, what indexes are present, etc.
479            &CLUSTER,
480            &CLUSTER_REPLICA,
481            &DEFAULT_CLUSTER_REPLICATION_FACTOR,
482            &DATABASE,
483            &SEARCH_PATH,
484        ]
485        .into_iter()
486        .map(|v| self.vars[v.name].as_var())
487        // Including `mz_version` in the notify set is a Materialize
488        // extension. Doing so allows applications to detect whether they
489        // are talking to Materialize or PostgreSQL without an additional
490        // network roundtrip. This is known to be safe because CockroachDB
491        // has an analogous extension [0].
492        // [0]: https://github.com/cockroachdb/cockroach/blob/369c4057a/pkg/sql/pgwire/conn.go#L1840
493        .chain(std::iter::once(self.mz_version.as_var()))
494    }
495
496    /// Resets all variables to their default value.
497    pub fn reset_all(&mut self) {
498        let names: Vec<_> = self.vars.keys().copied().collect();
499        for name in names {
500            self.vars[name].reset(false);
501        }
502    }
503
504    /// Returns a [`Var`] representing the configuration parameter with the
505    /// specified name.
506    ///
507    /// Configuration parameters are matched case insensitively. If no such
508    /// configuration parameter exists, `get` returns an error.
509    ///
510    /// Note that if `name` is known at compile time, you should instead use the
511    /// named accessor to access the variable with its true Rust type. For
512    /// example, `self.get("sql_safe_updates").value()` returns the string
513    /// `"true"` or `"false"`, while `self.sql_safe_updates()` returns a bool.
514    pub fn get(&self, system_vars: &SystemVars, name: &str) -> Result<&dyn Var, VarError> {
515        let name = compat_translate_name(name);
516
517        let name = UncasedStr::new(name);
518        if name == MZ_VERSION_NAME {
519            Ok(&self.mz_version)
520        } else if name == IS_SUPERUSER_NAME {
521            Ok(&self.user)
522        } else {
523            self.vars
524                .get(name)
525                .map(|v| {
526                    v.visible(&self.user, system_vars)?;
527                    Ok(v.as_var())
528                })
529                .transpose()?
530                .ok_or_else(|| VarError::UnknownParameter(name.to_string()))
531        }
532    }
533
534    /// Returns a [`SessionVar`] for inspection.
535    ///
536    /// Note: If you're trying to determine the value of the variable with `name` you should
537    /// instead use the named accessor, or [`SessionVars::get`].
538    pub fn inspect(&self, name: &str) -> Result<&SessionVar, VarError> {
539        let name = compat_translate_name(name);
540
541        self.vars
542            .get(UncasedStr::new(name))
543            .ok_or_else(|| VarError::UnknownParameter(name.to_string()))
544    }
545
546    /// Sets the configuration parameter named `name` to the value represented
547    /// by `value`.
548    ///
549    /// The new value may be either committed or rolled back by the next call to
550    /// [`SessionVars::end_transaction`]. If `local` is true, the new value is always
551    /// discarded by the next call to [`SessionVars::end_transaction`], even if the
552    /// transaction is marked to commit.
553    ///
554    /// Like with [`SessionVars::get`], configuration parameters are matched case
555    /// insensitively. If `value` is not valid, as determined by the underlying
556    /// configuration parameter, or if the named configuration parameter does
557    /// not exist, an error is returned.
558    pub fn set(
559        &mut self,
560        system_vars: &SystemVars,
561        name: &str,
562        input: VarInput,
563        local: bool,
564    ) -> Result<(), VarError> {
565        let (name, input) = compat_translate(name, input);
566
567        let name = UncasedStr::new(name);
568        self.check_read_only(name)?;
569
570        self.vars
571            .get_mut(name)
572            .map(|v| {
573                v.visible(&self.user, system_vars)?;
574                v.set(input, local)
575            })
576            .transpose()?
577            .ok_or_else(|| VarError::UnknownParameter(name.to_string()))
578    }
579
580    /// Sets the default value for the parameter named `name` to the value
581    /// represented by `value`.
582    pub fn set_default(&mut self, name: &str, input: VarInput) -> Result<(), VarError> {
583        let (name, input) = compat_translate(name, input);
584
585        let name = UncasedStr::new(name);
586        self.check_read_only(name)?;
587
588        self.vars
589            .get_mut(name)
590            // Note: visibility is checked when persisting a role default.
591            .map(|v| v.set_default(input))
592            .transpose()?
593            .ok_or_else(|| VarError::UnknownParameter(name.to_string()))
594    }
595
596    /// Sets the configuration parameter named `name` to its default value.
597    ///
598    /// The new value may be either committed or rolled back by the next call to
599    /// [`SessionVars::end_transaction`]. If `local` is true, the new value is
600    /// always discarded by the next call to [`SessionVars::end_transaction`],
601    /// even if the transaction is marked to commit.
602    ///
603    /// Like with [`SessionVars::get`], configuration parameters are matched
604    /// case insensitively. If the named configuration parameter does not exist,
605    /// an error is returned.
606    ///
607    /// If the variable does not exist or the user does not have the visibility
608    /// requires, this function returns an error.
609    pub fn reset(
610        &mut self,
611        system_vars: &SystemVars,
612        name: &str,
613        local: bool,
614    ) -> Result<(), VarError> {
615        let name = compat_translate_name(name);
616
617        let name = UncasedStr::new(name);
618        self.check_read_only(name)?;
619
620        self.vars
621            .get_mut(name)
622            .map(|v| {
623                v.visible(&self.user, system_vars)?;
624                v.reset(local);
625                Ok(())
626            })
627            .transpose()?
628            .ok_or_else(|| VarError::UnknownParameter(name.to_string()))
629    }
630
631    /// Returns an error if the variable corresponding to `name` is read only.
632    fn check_read_only(&self, name: &UncasedStr) -> Result<(), VarError> {
633        if name == MZ_VERSION_NAME {
634            Err(VarError::ReadOnlyParameter(MZ_VERSION_NAME.as_str()))
635        } else if name == IS_SUPERUSER_NAME {
636            Err(VarError::ReadOnlyParameter(IS_SUPERUSER_NAME.as_str()))
637        } else if name == MAX_IDENTIFIER_LENGTH.name {
638            Err(VarError::ReadOnlyParameter(
639                MAX_IDENTIFIER_LENGTH.name.as_str(),
640            ))
641        } else {
642            Ok(())
643        }
644    }
645
646    /// Commits or rolls back configuration parameter updates made via
647    /// [`SessionVars::set`] since the last call to `end_transaction`.
648    ///
649    /// Returns any session parameters that changed because the transaction ended.
650    #[mz_ore::instrument(level = "debug")]
651    pub fn end_transaction(
652        &mut self,
653        action: EndTransactionAction,
654    ) -> BTreeMap<&'static str, String> {
655        let mut changed = BTreeMap::new();
656        let mut updates = Vec::new();
657        for (name, var) in self.vars.iter() {
658            if !var.is_mutating() {
659                continue;
660            }
661            let before = var.value();
662            let next = var.end_transaction(action).expect("must mutate");
663            let after = next.value();
664            updates.push((*name, next));
665
666            // Report the new value of the parameter.
667            if before != after {
668                changed.insert(var.name(), after);
669            }
670        }
671        self.vars.extend(updates);
672        changed
673    }
674
675    /// Returns the value of the `application_name` configuration parameter.
676    pub fn application_name(&self) -> &str {
677        self.expect_value::<String>(&APPLICATION_NAME).as_str()
678    }
679
680    /// Returns the build info.
681    pub fn build_info(&self) -> &'static BuildInfo {
682        self.mz_version.build_info
683    }
684
685    /// Returns the value of the `client_encoding` configuration parameter.
686    pub fn client_encoding(&self) -> &ClientEncoding {
687        self.expect_value(&CLIENT_ENCODING)
688    }
689
690    /// Returns the value of the `client_min_messages` configuration parameter.
691    pub fn client_min_messages(&self) -> &ClientSeverity {
692        self.expect_value(&CLIENT_MIN_MESSAGES)
693    }
694
695    /// Returns the value of the `cluster` configuration parameter.
696    pub fn cluster(&self) -> &str {
697        self.expect_value::<String>(&CLUSTER).as_str()
698    }
699
700    /// Returns the value of the `cluster_replica` configuration parameter.
701    pub fn cluster_replica(&self) -> Option<&str> {
702        self.expect_value::<Option<String>>(&CLUSTER_REPLICA)
703            .as_deref()
704    }
705
706    /// Returns the value of the `current_object_missing_warnings` configuration
707    /// parameter.
708    pub fn current_object_missing_warnings(&self) -> bool {
709        *self.expect_value::<bool>(&CURRENT_OBJECT_MISSING_WARNINGS)
710    }
711
712    /// Returns the value of the `DateStyle` configuration parameter.
713    pub fn date_style(&self) -> &[&str] {
714        &self.expect_value::<DateStyle>(&DATE_STYLE).0
715    }
716
717    /// Returns the value of the `database` configuration parameter.
718    pub fn database(&self) -> &str {
719        self.expect_value::<String>(&DATABASE).as_str()
720    }
721
722    /// Returns the value of the `extra_float_digits` configuration parameter.
723    pub fn extra_float_digits(&self) -> i32 {
724        *self.expect_value(&EXTRA_FLOAT_DIGITS)
725    }
726
727    /// Returns the value of the `integer_datetimes` configuration parameter.
728    pub fn integer_datetimes(&self) -> bool {
729        *self.expect_value(&INTEGER_DATETIMES)
730    }
731
732    /// Returns the value of the `intervalstyle` configuration parameter.
733    pub fn intervalstyle(&self) -> &IntervalStyle {
734        self.expect_value(&INTERVAL_STYLE)
735    }
736
737    /// Returns the value of the `mz_version` configuration parameter.
738    pub fn mz_version(&self) -> String {
739        self.mz_version.value()
740    }
741
742    /// Returns the value of the `search_path` configuration parameter.
743    pub fn search_path(&self) -> &[Ident] {
744        self.expect_value::<Vec<Ident>>(&SEARCH_PATH).as_slice()
745    }
746
747    /// Returns the value of the `server_version` configuration parameter.
748    pub fn server_version(&self) -> &str {
749        self.expect_value::<String>(&SERVER_VERSION).as_str()
750    }
751
752    /// Returns the value of the `server_version_num` configuration parameter.
753    pub fn server_version_num(&self) -> i32 {
754        *self.expect_value(&SERVER_VERSION_NUM)
755    }
756
757    /// Returns the value of the `sql_safe_updates` configuration parameter.
758    pub fn sql_safe_updates(&self) -> bool {
759        *self.expect_value(&SQL_SAFE_UPDATES)
760    }
761
762    /// Returns the value of the `standard_conforming_strings` configuration
763    /// parameter.
764    pub fn standard_conforming_strings(&self) -> bool {
765        *self.expect_value(&STANDARD_CONFORMING_STRINGS)
766    }
767
768    /// Returns the value of the `statement_timeout` configuration parameter.
769    pub fn statement_timeout(&self) -> &Duration {
770        self.expect_value(&STATEMENT_TIMEOUT)
771    }
772
773    /// Returns the value of the `idle_in_transaction_session_timeout` configuration parameter.
774    pub fn idle_in_transaction_session_timeout(&self) -> &Duration {
775        self.expect_value(&IDLE_IN_TRANSACTION_SESSION_TIMEOUT)
776    }
777
778    /// Returns the value of the `timezone` configuration parameter.
779    pub fn timezone(&self) -> &TimeZone {
780        self.expect_value(&TIMEZONE)
781    }
782
783    /// Returns the value of the `transaction_isolation` configuration
784    /// parameter.
785    pub fn transaction_isolation(&self) -> &IsolationLevel {
786        self.expect_value(&TRANSACTION_ISOLATION)
787    }
788
789    /// Returns the value of `real_time_recency` configuration parameter.
790    pub fn real_time_recency(&self) -> bool {
791        *self.expect_value(&REAL_TIME_RECENCY)
792    }
793
794    /// Returns the value of the `real_time_recency_timeout` configuration parameter.
795    pub fn real_time_recency_timeout(&self) -> &Duration {
796        self.expect_value(&REAL_TIME_RECENCY_TIMEOUT)
797    }
798
799    /// Returns the value of `emit_plan_insights_notice` configuration parameter.
800    pub fn emit_plan_insights_notice(&self) -> bool {
801        *self.expect_value(&EMIT_PLAN_INSIGHTS_NOTICE)
802    }
803
804    /// Returns the value of `emit_timestamp_notice` configuration parameter.
805    pub fn emit_timestamp_notice(&self) -> bool {
806        *self.expect_value(&EMIT_TIMESTAMP_NOTICE)
807    }
808
809    /// Returns the value of `emit_trace_id_notice` configuration parameter.
810    pub fn emit_trace_id_notice(&self) -> bool {
811        *self.expect_value(&EMIT_TRACE_ID_NOTICE)
812    }
813
814    /// Returns the value of `auto_route_catalog_queries` configuration parameter.
815    pub fn auto_route_catalog_queries(&self) -> bool {
816        *self.expect_value(&AUTO_ROUTE_CATALOG_QUERIES)
817    }
818
819    /// Returns the value of `enable_session_rbac_checks` configuration parameter.
820    pub fn enable_session_rbac_checks(&self) -> bool {
821        *self.expect_value(&ENABLE_SESSION_RBAC_CHECKS)
822    }
823
824    /// Returns the value of `enable_session_cardinality_estimates` configuration parameter.
825    pub fn enable_session_cardinality_estimates(&self) -> bool {
826        *self.expect_value(&ENABLE_SESSION_CARDINALITY_ESTIMATES)
827    }
828
829    /// Returns the value of `is_superuser` configuration parameter.
830    pub fn is_superuser(&self) -> bool {
831        self.user.is_superuser()
832    }
833
834    /// Returns the user associated with this `SessionVars` instance.
835    pub fn user(&self) -> &User {
836        &self.user
837    }
838
839    /// Returns the value of the `max_query_result_size` configuration parameter.
840    pub fn max_query_result_size(&self) -> u64 {
841        self.expect_value::<ByteSize>(&MAX_QUERY_RESULT_SIZE)
842            .as_bytes()
843    }
844
845    /// Sets the external metadata associated with the user.
846    pub fn set_external_user_metadata(&mut self, metadata: ExternalUserMetadata) {
847        self.user.external_metadata = Some(metadata);
848    }
849
850    pub fn set_cluster(&mut self, cluster: String) {
851        let var = self
852            .vars
853            .get_mut(UncasedStr::new(CLUSTER.name()))
854            .expect("cluster variable must exist");
855        var.set(VarInput::Flat(&cluster), false)
856            .expect("setting cluster must succeed");
857    }
858
859    pub fn set_local_transaction_isolation(&mut self, transaction_isolation: IsolationLevel) {
860        let var = self
861            .vars
862            .get_mut(UncasedStr::new(TRANSACTION_ISOLATION.name()))
863            .expect("transaction_isolation variable must exist");
864        var.set(VarInput::Flat(transaction_isolation.as_str()), true)
865            .expect("setting transaction isolation must succeed");
866    }
867
868    pub fn get_statement_logging_sample_rate(&self) -> Numeric {
869        *self.expect_value(&STATEMENT_LOGGING_SAMPLE_RATE)
870    }
871
872    /// Returns the value of the `emit_introspection_query_notice` configuration parameter.
873    pub fn emit_introspection_query_notice(&self) -> bool {
874        *self.expect_value(&EMIT_INTROSPECTION_QUERY_NOTICE)
875    }
876
877    pub fn unsafe_new_transaction_wall_time(&self) -> Option<CheckedTimestamp<DateTime<Utc>>> {
878        *self.expect_value(&UNSAFE_NEW_TRANSACTION_WALL_TIME)
879    }
880
881    /// Returns the value of the `welcome_message` configuration parameter.
882    pub fn welcome_message(&self) -> bool {
883        *self.expect_value(&WELCOME_MESSAGE)
884    }
885}
886
887// TODO(database-issues#8069) remove together with `compat_translate`
888pub const OLD_CATALOG_SERVER_CLUSTER: &str = "mz_introspection";
889pub const OLD_AUTO_ROUTE_CATALOG_QUERIES: &str = "auto_route_introspection_queries";
890
891/// If the given variable name and/or input is deprecated, return a corresponding updated value,
892/// otherwise return the original.
893///
894/// This method was introduced to gracefully handle the rename of the `mz_introspection` cluster to
895/// `mz_cluster_server`. The plan is to remove it once all users have migrated to the new name. The
896/// debug logs will be helpful for checking this in production.
897// TODO(database-issues#8069) remove this after sufficient time has passed
898fn compat_translate<'a, 'b>(name: &'a str, input: VarInput<'b>) -> (&'a str, VarInput<'b>) {
899    if name == CLUSTER.name() {
900        if let Ok(value) = CLUSTER.parse(input) {
901            if value.format() == OLD_CATALOG_SERVER_CLUSTER {
902                tracing::debug!(
903                    github_27285 = true,
904                    "encountered deprecated `cluster` variable value: {}",
905                    OLD_CATALOG_SERVER_CLUSTER,
906                );
907                return (name, VarInput::Flat("mz_catalog_server"));
908            }
909        }
910    }
911
912    if name == OLD_AUTO_ROUTE_CATALOG_QUERIES {
913        tracing::debug!(
914            github_27285 = true,
915            "encountered deprecated `{}` variable name",
916            OLD_AUTO_ROUTE_CATALOG_QUERIES,
917        );
918        return (AUTO_ROUTE_CATALOG_QUERIES.name(), input);
919    }
920
921    (name, input)
922}
923
924fn compat_translate_name(name: &str) -> &str {
925    let (name, _) = compat_translate(name, VarInput::Flat(""));
926    name
927}
928
929/// A `SystemVar` is persisted on disk value for a configuration parameter. If unset,
930/// the server default is used instead.
931#[derive(Debug)]
932pub struct SystemVar {
933    definition: VarDefinition,
934    /// Value currently persisted to disk.
935    persisted_value: Option<Box<dyn Value>>,
936    /// Current default, not persisted to disk.
937    dynamic_default: Option<Box<dyn Value>>,
938}
939
940impl Clone for SystemVar {
941    fn clone(&self) -> Self {
942        SystemVar {
943            definition: self.definition.clone(),
944            persisted_value: self.persisted_value.as_ref().map(|v| v.box_clone()),
945            dynamic_default: self.dynamic_default.as_ref().map(|v| v.box_clone()),
946        }
947    }
948}
949
950impl SystemVar {
951    pub fn new(definition: VarDefinition) -> Self {
952        SystemVar {
953            definition,
954            persisted_value: None,
955            dynamic_default: None,
956        }
957    }
958
959    fn is_default(&self, input: VarInput) -> Result<bool, VarError> {
960        let v = self.definition.parse(input)?;
961        Ok(self.definition.default_value() == v.as_ref())
962    }
963
964    pub fn value_dyn(&self) -> &dyn Value {
965        self.persisted_value
966            .as_deref()
967            .or(self.dynamic_default.as_deref())
968            .unwrap_or_else(|| self.definition.default_value())
969    }
970
971    pub fn value<V: 'static>(&self) -> &V {
972        let val = self.value_dyn();
973        val.as_any().downcast_ref::<V>().expect("success")
974    }
975
976    fn parse(&self, input: VarInput) -> Result<Box<dyn Value>, VarError> {
977        let v = self.definition.parse(input)?;
978        // Validate our parsed value.
979        self.validate_constraints(v.as_ref())?;
980        Ok(v)
981    }
982
983    fn set(&mut self, input: VarInput) -> Result<bool, VarError> {
984        let v = self.parse(input)?;
985
986        if self.persisted_value.as_ref() != Some(&v) {
987            self.persisted_value = Some(v);
988            Ok(true)
989        } else {
990            Ok(false)
991        }
992    }
993
994    fn reset(&mut self) -> bool {
995        if self.persisted_value.is_some() {
996            self.persisted_value = None;
997            true
998        } else {
999            false
1000        }
1001    }
1002
1003    fn set_default(&mut self, input: VarInput) -> Result<(), VarError> {
1004        let v = self.definition.parse(input)?;
1005        self.dynamic_default = Some(v);
1006        Ok(())
1007    }
1008
1009    fn validate_constraints(&self, val: &dyn Value) -> Result<(), VarError> {
1010        if let Some(constraint) = &self.definition.constraint {
1011            constraint.check_constraint(self, self.value_dyn(), val)
1012        } else {
1013            Ok(())
1014        }
1015    }
1016}
1017
1018impl Var for SystemVar {
1019    fn name(&self) -> &'static str {
1020        self.definition.name.as_str()
1021    }
1022
1023    fn value(&self) -> String {
1024        self.value_dyn().format()
1025    }
1026
1027    fn description(&self) -> &'static str {
1028        self.definition.description
1029    }
1030
1031    fn type_name(&self) -> Cow<'static, str> {
1032        self.definition.type_name()
1033    }
1034
1035    fn visible(&self, user: &User, system_vars: &SystemVars) -> Result<(), VarError> {
1036        self.definition.visible(user, system_vars)
1037    }
1038}
1039
1040#[derive(Debug, Error)]
1041pub enum NetworkPolicyError {
1042    #[error("Access denied for address {0}")]
1043    AddressDenied(IpAddr),
1044}
1045
1046/// On disk variables.
1047///
1048/// See the [`crate::session::vars`] module documentation for more details on the
1049/// Materialize configuration model.
1050#[derive(Derivative, Clone)]
1051#[derivative(Debug)]
1052pub struct SystemVars {
1053    /// Allows "unsafe" parameters to be set.
1054    allow_unsafe: bool,
1055    /// Set of all [`SystemVar`]s.
1056    vars: BTreeMap<&'static UncasedStr, SystemVar>,
1057    /// External components interested in when a [`SystemVar`] gets updated.
1058    #[derivative(Debug = "ignore")]
1059    callbacks: BTreeMap<String, Vec<Arc<dyn Fn(&SystemVars) + Send + Sync>>>,
1060
1061    /// NB: This is intentionally disconnected from the one that is plumbed around to persist and
1062    /// the controllers. This is so we can explicitly control and reason about when changes to config
1063    /// values are propagated to the rest of the system.
1064    dyncfgs: ConfigSet,
1065}
1066
1067impl Default for SystemVars {
1068    fn default() -> Self {
1069        Self::new()
1070    }
1071}
1072
1073impl SystemVars {
1074    pub fn new() -> Self {
1075        let system_vars = vec![
1076            &MAX_KAFKA_CONNECTIONS,
1077            &MAX_POSTGRES_CONNECTIONS,
1078            &MAX_MYSQL_CONNECTIONS,
1079            &MAX_SQL_SERVER_CONNECTIONS,
1080            &MAX_AWS_PRIVATELINK_CONNECTIONS,
1081            &MAX_TABLES,
1082            &MAX_SOURCES,
1083            &MAX_SINKS,
1084            &MAX_MATERIALIZED_VIEWS,
1085            &MAX_CLUSTERS,
1086            &MAX_REPLICAS_PER_CLUSTER,
1087            &MAX_CREDIT_CONSUMPTION_RATE,
1088            &MAX_DATABASES,
1089            &MAX_SCHEMAS_PER_DATABASE,
1090            &MAX_OBJECTS_PER_SCHEMA,
1091            &MAX_SECRETS,
1092            &MAX_ROLES,
1093            &MAX_CONTINUAL_TASKS,
1094            &MAX_NETWORK_POLICIES,
1095            &MAX_RULES_PER_NETWORK_POLICY,
1096            &MAX_RESULT_SIZE,
1097            &MAX_COPY_FROM_SIZE,
1098            &ALLOWED_CLUSTER_REPLICA_SIZES,
1099            &DISK_CLUSTER_REPLICAS_DEFAULT,
1100            &upsert_rocksdb::UPSERT_ROCKSDB_AUTO_SPILL_TO_DISK,
1101            &upsert_rocksdb::UPSERT_ROCKSDB_AUTO_SPILL_THRESHOLD_BYTES,
1102            &upsert_rocksdb::UPSERT_ROCKSDB_COMPACTION_STYLE,
1103            &upsert_rocksdb::UPSERT_ROCKSDB_OPTIMIZE_COMPACTION_MEMTABLE_BUDGET,
1104            &upsert_rocksdb::UPSERT_ROCKSDB_LEVEL_COMPACTION_DYNAMIC_LEVEL_BYTES,
1105            &upsert_rocksdb::UPSERT_ROCKSDB_UNIVERSAL_COMPACTION_RATIO,
1106            &upsert_rocksdb::UPSERT_ROCKSDB_PARALLELISM,
1107            &upsert_rocksdb::UPSERT_ROCKSDB_COMPRESSION_TYPE,
1108            &upsert_rocksdb::UPSERT_ROCKSDB_BOTTOMMOST_COMPRESSION_TYPE,
1109            &upsert_rocksdb::UPSERT_ROCKSDB_BATCH_SIZE,
1110            &upsert_rocksdb::UPSERT_ROCKSDB_RETRY_DURATION,
1111            &upsert_rocksdb::UPSERT_ROCKSDB_STATS_LOG_INTERVAL_SECONDS,
1112            &upsert_rocksdb::UPSERT_ROCKSDB_STATS_PERSIST_INTERVAL_SECONDS,
1113            &upsert_rocksdb::UPSERT_ROCKSDB_POINT_LOOKUP_BLOCK_CACHE_SIZE_MB,
1114            &upsert_rocksdb::UPSERT_ROCKSDB_SHRINK_ALLOCATED_BUFFERS_BY_RATIO,
1115            &upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_CLUSTER_MEMORY_FRACTION,
1116            &upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_MEMORY_BYTES,
1117            &upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_ALLOW_STALL,
1118            &STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES,
1119            &STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_TO_CLUSTER_SIZE_FRACTION,
1120            &STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_DISK_ONLY,
1121            &STORAGE_STATISTICS_INTERVAL,
1122            &STORAGE_STATISTICS_COLLECTION_INTERVAL,
1123            &STORAGE_SHRINK_UPSERT_UNUSED_BUFFERS_BY_RATIO,
1124            &STORAGE_RECORD_SOURCE_SINK_NAMESPACED_ERRORS,
1125            &PERSIST_FAST_PATH_LIMIT,
1126            &METRICS_RETENTION,
1127            &UNSAFE_MOCK_AUDIT_EVENT_TIMESTAMP,
1128            &ENABLE_RBAC_CHECKS,
1129            &PG_SOURCE_CONNECT_TIMEOUT,
1130            &PG_SOURCE_TCP_KEEPALIVES_IDLE,
1131            &PG_SOURCE_TCP_KEEPALIVES_INTERVAL,
1132            &PG_SOURCE_TCP_KEEPALIVES_RETRIES,
1133            &PG_SOURCE_TCP_USER_TIMEOUT,
1134            &PG_SOURCE_TCP_CONFIGURE_SERVER,
1135            &PG_SOURCE_SNAPSHOT_STATEMENT_TIMEOUT,
1136            &PG_SOURCE_WAL_SENDER_TIMEOUT,
1137            &PG_SOURCE_SNAPSHOT_COLLECT_STRICT_COUNT,
1138            &PG_SOURCE_SNAPSHOT_FALLBACK_TO_STRICT_COUNT,
1139            &PG_SOURCE_SNAPSHOT_WAIT_FOR_COUNT,
1140            &MYSQL_SOURCE_TCP_KEEPALIVE,
1141            &MYSQL_SOURCE_SNAPSHOT_MAX_EXECUTION_TIME,
1142            &MYSQL_SOURCE_SNAPSHOT_LOCK_WAIT_TIMEOUT,
1143            &MYSQL_SOURCE_CONNECT_TIMEOUT,
1144            &SSH_CHECK_INTERVAL,
1145            &SSH_CONNECT_TIMEOUT,
1146            &SSH_KEEPALIVES_IDLE,
1147            &KAFKA_SOCKET_KEEPALIVE,
1148            &KAFKA_SOCKET_TIMEOUT,
1149            &KAFKA_TRANSACTION_TIMEOUT,
1150            &KAFKA_SOCKET_CONNECTION_SETUP_TIMEOUT,
1151            &KAFKA_FETCH_METADATA_TIMEOUT,
1152            &KAFKA_PROGRESS_RECORD_FETCH_TIMEOUT,
1153            &ENABLE_LAUNCHDARKLY,
1154            &MAX_CONNECTIONS,
1155            &NETWORK_POLICY,
1156            &SUPERUSER_RESERVED_CONNECTIONS,
1157            &KEEP_N_SOURCE_STATUS_HISTORY_ENTRIES,
1158            &KEEP_N_SINK_STATUS_HISTORY_ENTRIES,
1159            &KEEP_N_PRIVATELINK_STATUS_HISTORY_ENTRIES,
1160            &REPLICA_STATUS_HISTORY_RETENTION_WINDOW,
1161            &ARRANGEMENT_EXERT_PROPORTIONALITY,
1162            &ENABLE_STORAGE_SHARD_FINALIZATION,
1163            &ENABLE_CONSOLIDATE_AFTER_UNION_NEGATE,
1164            &ENABLE_DEFAULT_CONNECTION_VALIDATION,
1165            &ENABLE_REDUCE_REDUCTION,
1166            &MIN_TIMESTAMP_INTERVAL,
1167            &MAX_TIMESTAMP_INTERVAL,
1168            &LOGGING_FILTER,
1169            &OPENTELEMETRY_FILTER,
1170            &LOGGING_FILTER_DEFAULTS,
1171            &OPENTELEMETRY_FILTER_DEFAULTS,
1172            &SENTRY_FILTERS,
1173            &WEBHOOKS_SECRETS_CACHING_TTL_SECS,
1174            &COORD_SLOW_MESSAGE_WARN_THRESHOLD,
1175            &grpc_client::CONNECT_TIMEOUT,
1176            &grpc_client::HTTP2_KEEP_ALIVE_INTERVAL,
1177            &grpc_client::HTTP2_KEEP_ALIVE_TIMEOUT,
1178            &cluster_scheduling::CLUSTER_MULTI_PROCESS_REPLICA_AZ_AFFINITY_WEIGHT,
1179            &cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY,
1180            &cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY_WEIGHT,
1181            &cluster_scheduling::CLUSTER_ENABLE_TOPOLOGY_SPREAD,
1182            &cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_IGNORE_NON_SINGULAR_SCALE,
1183            &cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_MAX_SKEW,
1184            &cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_SOFT,
1185            &cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY,
1186            &cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY_WEIGHT,
1187            &cluster_scheduling::CLUSTER_ALWAYS_USE_DISK,
1188            &cluster_scheduling::CLUSTER_ALTER_CHECK_READY_INTERVAL,
1189            &cluster_scheduling::CLUSTER_CHECK_SCHEDULING_POLICIES_INTERVAL,
1190            &cluster_scheduling::CLUSTER_SECURITY_CONTEXT_ENABLED,
1191            &cluster_scheduling::CLUSTER_REFRESH_MV_COMPACTION_ESTIMATE,
1192            &grpc_client::HTTP2_KEEP_ALIVE_TIMEOUT,
1193            &STATEMENT_LOGGING_MAX_SAMPLE_RATE,
1194            &STATEMENT_LOGGING_DEFAULT_SAMPLE_RATE,
1195            &STATEMENT_LOGGING_TARGET_DATA_RATE,
1196            &STATEMENT_LOGGING_MAX_DATA_CREDIT,
1197            &ENABLE_INTERNAL_STATEMENT_LOGGING,
1198            &OPTIMIZER_STATS_TIMEOUT,
1199            &OPTIMIZER_ONESHOT_STATS_TIMEOUT,
1200            &PRIVATELINK_STATUS_UPDATE_QUOTA_PER_MINUTE,
1201            &WEBHOOK_CONCURRENT_REQUEST_LIMIT,
1202            &PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_SIZE,
1203            &PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_WAIT,
1204            &PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL,
1205            &PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL_STAGGER,
1206            &USER_STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION,
1207            &FORCE_SOURCE_TABLE_SYNTAX,
1208            &OPTIMIZER_E2E_LATENCY_WARNING_THRESHOLD,
1209        ];
1210
1211        let dyncfgs = mz_dyncfgs::all_dyncfgs();
1212        let dyncfg_vars: Vec<_> = dyncfgs
1213            .entries()
1214            .map(|cfg| match cfg.default() {
1215                ConfigVal::Bool(default) => {
1216                    VarDefinition::new_runtime(cfg.name(), *default, cfg.desc(), false)
1217                }
1218                ConfigVal::U32(default) => {
1219                    VarDefinition::new_runtime(cfg.name(), *default, cfg.desc(), false)
1220                }
1221                ConfigVal::Usize(default) => {
1222                    VarDefinition::new_runtime(cfg.name(), *default, cfg.desc(), false)
1223                }
1224                ConfigVal::OptUsize(default) => {
1225                    VarDefinition::new_runtime(cfg.name(), *default, cfg.desc(), false)
1226                }
1227                ConfigVal::F64(default) => {
1228                    VarDefinition::new_runtime(cfg.name(), *default, cfg.desc(), false)
1229                }
1230                ConfigVal::String(default) => {
1231                    VarDefinition::new_runtime(cfg.name(), default.clone(), cfg.desc(), false)
1232                }
1233                ConfigVal::Duration(default) => {
1234                    VarDefinition::new_runtime(cfg.name(), default.clone(), cfg.desc(), false)
1235                }
1236                ConfigVal::Json(default) => {
1237                    VarDefinition::new_runtime(cfg.name(), default.clone(), cfg.desc(), false)
1238                }
1239            })
1240            .collect();
1241
1242        let vars: BTreeMap<_, _> = system_vars
1243            .into_iter()
1244            // Include all of our feature flags.
1245            .chain(definitions::FEATURE_FLAGS.iter().copied())
1246            // Include the subset of Session variables we allow system defaults for.
1247            .chain(SESSION_SYSTEM_VARS.values().copied())
1248            .cloned()
1249            // Include Persist configs.
1250            .chain(dyncfg_vars)
1251            .map(|var| (var.name, SystemVar::new(var)))
1252            .collect();
1253
1254        let vars = SystemVars {
1255            vars,
1256            callbacks: BTreeMap::new(),
1257            allow_unsafe: false,
1258            dyncfgs,
1259        };
1260
1261        vars
1262    }
1263
1264    pub fn dyncfgs(&self) -> &ConfigSet {
1265        &self.dyncfgs
1266    }
1267
1268    pub fn set_unsafe(mut self, allow_unsafe: bool) -> Self {
1269        self.allow_unsafe = allow_unsafe;
1270        self
1271    }
1272
1273    pub fn allow_unsafe(&self) -> bool {
1274        self.allow_unsafe
1275    }
1276
1277    fn expect_value<V: 'static>(&self, var: &VarDefinition) -> &V {
1278        let val = self
1279            .vars
1280            .get(var.name)
1281            .expect("provided var should be in state");
1282
1283        val.value_dyn()
1284            .as_any()
1285            .downcast_ref::<V>()
1286            .expect("provided var type should matched stored var")
1287    }
1288
1289    fn expect_config_value<V: ConfigType + 'static>(&self, name: &UncasedStr) -> &V {
1290        let val = self
1291            .vars
1292            .get(name)
1293            .unwrap_or_else(|| panic!("provided var {name} should be in state"));
1294
1295        val.value_dyn()
1296            .as_any()
1297            .downcast_ref()
1298            .expect("provided var type should matched stored var")
1299    }
1300
1301    /// Reset all the values to their defaults (preserving
1302    /// defaults from `VarMut::set_default).
1303    pub fn reset_all(&mut self) {
1304        for (_, var) in &mut self.vars {
1305            var.reset();
1306        }
1307    }
1308
1309    /// Returns an iterator over the configuration parameters and their current
1310    /// values on disk.
1311    pub fn iter(&self) -> impl Iterator<Item = &dyn Var> {
1312        self.vars
1313            .values()
1314            .map(|v| v.as_var())
1315            .filter(|v| !SESSION_SYSTEM_VARS.contains_key(UncasedStr::new(v.name())))
1316    }
1317
1318    /// Returns an iterator over the configuration parameters and their current
1319    /// values on disk. Compared to [`SystemVars::iter`], this should omit vars
1320    /// that shouldn't be synced by SystemParameterFrontend.
1321    pub fn iter_synced(&self) -> impl Iterator<Item = &dyn Var> {
1322        self.iter().filter(|v| v.name() != ENABLE_LAUNCHDARKLY.name)
1323    }
1324
1325    /// Returns an iterator over the configuration parameters that can be overriden per-Session.
1326    pub fn iter_session(&self) -> impl Iterator<Item = &dyn Var> {
1327        self.vars
1328            .values()
1329            .map(|v| v.as_var())
1330            .filter(|v| SESSION_SYSTEM_VARS.contains_key(UncasedStr::new(v.name())))
1331    }
1332
1333    /// Returns whether or not this parameter can be modified by a superuser.
1334    pub fn user_modifiable(&self, name: &str) -> bool {
1335        SESSION_SYSTEM_VARS.contains_key(UncasedStr::new(name))
1336            || name == ENABLE_RBAC_CHECKS.name()
1337            || name == NETWORK_POLICY.name()
1338    }
1339
1340    /// Returns a [`Var`] representing the configuration parameter with the
1341    /// specified name.
1342    ///
1343    /// Configuration parameters are matched case insensitively. If no such
1344    /// configuration parameter exists, `get` returns an error.
1345    ///
1346    /// Note that:
1347    /// - If `name` is known at compile time, you should instead use the named
1348    /// accessor to access the variable with its true Rust type. For example,
1349    /// `self.get("max_tables").value()` returns the string `"25"` or the
1350    /// current value, while `self.max_tables()` returns an i32.
1351    ///
1352    /// - This function does not check that the access variable should be
1353    /// visible because of other settings or users. Before or after accessing
1354    /// this method, you should call `Var::visible`.
1355    ///
1356    /// # Errors
1357    ///
1358    /// The call will return an error:
1359    /// 1. If `name` does not refer to a valid [`SystemVars`] field.
1360    pub fn get(&self, name: &str) -> Result<&dyn Var, VarError> {
1361        self.vars
1362            .get(UncasedStr::new(name))
1363            .map(|v| v.as_var())
1364            .ok_or_else(|| VarError::UnknownParameter(name.into()))
1365    }
1366
1367    /// Check if the given `values` is the default value for the [`Var`]
1368    /// identified by `name`.
1369    ///
1370    /// Note that this function does not check that the access variable should
1371    /// be visible because of other settings or users. Before or after accessing
1372    /// this method, you should call `Var::visible`.
1373    ///
1374    /// # Errors
1375    ///
1376    /// The call will return an error:
1377    /// 1. If `name` does not refer to a valid [`SystemVars`] field.
1378    /// 2. If `values` does not represent a valid [`SystemVars`] value for
1379    ///    `name`.
1380    pub fn is_default(&self, name: &str, input: VarInput) -> Result<bool, VarError> {
1381        self.vars
1382            .get(UncasedStr::new(name))
1383            .ok_or_else(|| VarError::UnknownParameter(name.into()))
1384            .and_then(|v| v.is_default(input))
1385    }
1386
1387    /// Sets the configuration parameter named `name` to the value represented
1388    /// by `input`.
1389    ///
1390    /// Like with [`SystemVars::get`], configuration parameters are matched case
1391    /// insensitively. If `input` is not valid, as determined by the underlying
1392    /// configuration parameter, or if the named configuration parameter does
1393    /// not exist, an error is returned.
1394    ///
1395    /// Return a `bool` value indicating whether the [`Var`] identified by
1396    /// `name` was modified by this call (it won't be if it already had the
1397    /// given `input`).
1398    ///
1399    /// Note that this function does not check that the access variable should
1400    /// be visible because of other settings or users. Before or after accessing
1401    /// this method, you should call `Var::visible`.
1402    ///
1403    /// # Errors
1404    ///
1405    /// The call will return an error:
1406    /// 1. If `name` does not refer to a valid [`SystemVars`] field.
1407    /// 2. If `input` does not represent a valid [`SystemVars`] value for
1408    ///    `name`.
1409    pub fn set(&mut self, name: &str, input: VarInput) -> Result<bool, VarError> {
1410        let result = self
1411            .vars
1412            .get_mut(UncasedStr::new(name))
1413            .ok_or_else(|| VarError::UnknownParameter(name.into()))
1414            .and_then(|v| v.set(input))?;
1415        self.notify_callbacks(name);
1416        Ok(result)
1417    }
1418
1419    /// Parses the configuration parameter value represented by `input` named
1420    /// `name`.
1421    ///
1422    /// Like with [`SystemVars::get`], configuration parameters are matched case
1423    /// insensitively. If `input` is not valid, as determined by the underlying
1424    /// configuration parameter, or if the named configuration parameter does
1425    /// not exist, an error is returned.
1426    ///
1427    /// Return a `Box<dyn Value>` that is the result of parsing `input`.
1428    ///
1429    /// Note that this function does not check that the access variable should
1430    /// be visible because of other settings or users. Before or after accessing
1431    /// this method, you should call `Var::visible`.
1432    ///
1433    /// # Errors
1434    ///
1435    /// The call will return an error:
1436    /// 1. If `name` does not refer to a valid [`SystemVars`] field.
1437    /// 2. If `input` does not represent a valid [`SystemVars`] value for
1438    ///    `name`.
1439    pub fn parse(&self, name: &str, input: VarInput) -> Result<Box<dyn Value>, VarError> {
1440        self.vars
1441            .get(UncasedStr::new(name))
1442            .ok_or_else(|| VarError::UnknownParameter(name.into()))
1443            .and_then(|v| v.parse(input))
1444    }
1445
1446    /// Set the default for this variable. This is the value this
1447    /// variable will be be `reset` to. If no default is set, the static default in the
1448    /// variable definition is used instead.
1449    ///
1450    /// Note that this function does not check that the access variable should
1451    /// be visible because of other settings or users. Before or after accessing
1452    /// this method, you should call `Var::visible`.
1453    pub fn set_default(&mut self, name: &str, input: VarInput) -> Result<(), VarError> {
1454        let result = self
1455            .vars
1456            .get_mut(UncasedStr::new(name))
1457            .ok_or_else(|| VarError::UnknownParameter(name.into()))
1458            .and_then(|v| v.set_default(input))?;
1459        self.notify_callbacks(name);
1460        Ok(result)
1461    }
1462
1463    /// Sets the configuration parameter named `name` to its default value.
1464    ///
1465    /// Like with [`SystemVars::get`], configuration parameters are matched case
1466    /// insensitively. If the named configuration parameter does not exist, an
1467    /// error is returned.
1468    ///
1469    /// Return a `bool` value indicating whether the [`Var`] identified by
1470    /// `name` was modified by this call (it won't be if was already reset).
1471    ///
1472    /// Note that this function does not check that the access variable should
1473    /// be visible because of other settings or users. Before or after accessing
1474    /// this method, you should call `Var::visible`.
1475    ///
1476    /// # Errors
1477    ///
1478    /// The call will return an error:
1479    /// 1. If `name` does not refer to a valid [`SystemVars`] field.
1480    pub fn reset(&mut self, name: &str) -> Result<bool, VarError> {
1481        let result = self
1482            .vars
1483            .get_mut(UncasedStr::new(name))
1484            .ok_or_else(|| VarError::UnknownParameter(name.into()))
1485            .map(|v| v.reset())?;
1486        self.notify_callbacks(name);
1487        Ok(result)
1488    }
1489
1490    /// Returns a map from each system parameter's name to its default value.
1491    pub fn defaults(&self) -> BTreeMap<String, String> {
1492        self.vars
1493            .iter()
1494            .map(|(name, var)| {
1495                let default = var
1496                    .dynamic_default
1497                    .as_deref()
1498                    .unwrap_or_else(|| var.definition.default_value());
1499                (name.as_str().to_owned(), default.format())
1500            })
1501            .collect()
1502    }
1503
1504    /// Registers a closure that will get called when the value for the
1505    /// specified [`VarDefinition`] changes.
1506    ///
1507    /// The callback is guaranteed to be called at least once.
1508    pub fn register_callback(
1509        &mut self,
1510        var: &VarDefinition,
1511        callback: Arc<dyn Fn(&SystemVars) + Send + Sync>,
1512    ) {
1513        self.callbacks
1514            .entry(var.name().to_string())
1515            .or_default()
1516            .push(callback);
1517        self.notify_callbacks(var.name());
1518    }
1519
1520    /// Notify any external components interested in this variable.
1521    fn notify_callbacks(&self, name: &str) {
1522        // Get the callbacks interested in this variable.
1523        if let Some(callbacks) = self.callbacks.get(name) {
1524            for callback in callbacks {
1525                (callback)(self);
1526            }
1527        }
1528    }
1529
1530    /// Returns the system default for the [`CLUSTER`] session variable. To know the active cluster
1531    /// for the current session, you must check the [`SessionVars`].
1532    pub fn default_cluster(&self) -> String {
1533        self.expect_value::<String>(&CLUSTER).to_owned()
1534    }
1535
1536    /// Returns the value of the `max_kafka_connections` configuration parameter.
1537    pub fn max_kafka_connections(&self) -> u32 {
1538        *self.expect_value(&MAX_KAFKA_CONNECTIONS)
1539    }
1540
1541    /// Returns the value of the `max_postgres_connections` configuration parameter.
1542    pub fn max_postgres_connections(&self) -> u32 {
1543        *self.expect_value(&MAX_POSTGRES_CONNECTIONS)
1544    }
1545
1546    /// Returns the value of the `max_mysql_connections` configuration parameter.
1547    pub fn max_mysql_connections(&self) -> u32 {
1548        *self.expect_value(&MAX_MYSQL_CONNECTIONS)
1549    }
1550
1551    /// Returns the value of the `max_sql_server_connections` configuration parameter.
1552    pub fn max_sql_server_connections(&self) -> u32 {
1553        *self.expect_value(&MAX_SQL_SERVER_CONNECTIONS)
1554    }
1555
1556    /// Returns the value of the `max_aws_privatelink_connections` configuration parameter.
1557    pub fn max_aws_privatelink_connections(&self) -> u32 {
1558        *self.expect_value(&MAX_AWS_PRIVATELINK_CONNECTIONS)
1559    }
1560
1561    /// Returns the value of the `max_tables` configuration parameter.
1562    pub fn max_tables(&self) -> u32 {
1563        *self.expect_value(&MAX_TABLES)
1564    }
1565
1566    /// Returns the value of the `max_sources` configuration parameter.
1567    pub fn max_sources(&self) -> u32 {
1568        *self.expect_value(&MAX_SOURCES)
1569    }
1570
1571    /// Returns the value of the `max_sinks` configuration parameter.
1572    pub fn max_sinks(&self) -> u32 {
1573        *self.expect_value(&MAX_SINKS)
1574    }
1575
1576    /// Returns the value of the `max_materialized_views` configuration parameter.
1577    pub fn max_materialized_views(&self) -> u32 {
1578        *self.expect_value(&MAX_MATERIALIZED_VIEWS)
1579    }
1580
1581    /// Returns the value of the `max_clusters` configuration parameter.
1582    pub fn max_clusters(&self) -> u32 {
1583        *self.expect_value(&MAX_CLUSTERS)
1584    }
1585
1586    /// Returns the value of the `max_replicas_per_cluster` configuration parameter.
1587    pub fn max_replicas_per_cluster(&self) -> u32 {
1588        *self.expect_value(&MAX_REPLICAS_PER_CLUSTER)
1589    }
1590
1591    /// Returns the value of the `max_credit_consumption_rate` configuration parameter.
1592    pub fn max_credit_consumption_rate(&self) -> Numeric {
1593        *self.expect_value(&MAX_CREDIT_CONSUMPTION_RATE)
1594    }
1595
1596    /// Returns the value of the `max_databases` configuration parameter.
1597    pub fn max_databases(&self) -> u32 {
1598        *self.expect_value(&MAX_DATABASES)
1599    }
1600
1601    /// Returns the value of the `max_schemas_per_database` configuration parameter.
1602    pub fn max_schemas_per_database(&self) -> u32 {
1603        *self.expect_value(&MAX_SCHEMAS_PER_DATABASE)
1604    }
1605
1606    /// Returns the value of the `max_objects_per_schema` configuration parameter.
1607    pub fn max_objects_per_schema(&self) -> u32 {
1608        *self.expect_value(&MAX_OBJECTS_PER_SCHEMA)
1609    }
1610
1611    /// Returns the value of the `max_secrets` configuration parameter.
1612    pub fn max_secrets(&self) -> u32 {
1613        *self.expect_value(&MAX_SECRETS)
1614    }
1615
1616    /// Returns the value of the `max_roles` configuration parameter.
1617    pub fn max_roles(&self) -> u32 {
1618        *self.expect_value(&MAX_ROLES)
1619    }
1620
1621    /// Returns the value of the `max_continual_tasks` configuration parameter.
1622    pub fn max_continual_tasks(&self) -> u32 {
1623        *self.expect_value(&MAX_CONTINUAL_TASKS)
1624    }
1625
1626    /// Returns the value of the `max_network_policies` configuration parameter.
1627    pub fn max_network_policies(&self) -> u32 {
1628        *self.expect_value(&MAX_NETWORK_POLICIES)
1629    }
1630
1631    /// Returns the value of the `max_network_policies` configuration parameter.
1632    pub fn max_rules_per_network_policy(&self) -> u32 {
1633        *self.expect_value(&MAX_RULES_PER_NETWORK_POLICY)
1634    }
1635
1636    /// Returns the value of the `max_result_size` configuration parameter.
1637    pub fn max_result_size(&self) -> u64 {
1638        self.expect_value::<ByteSize>(&MAX_RESULT_SIZE).as_bytes()
1639    }
1640
1641    /// Returns the value of the `max_copy_from_size` configuration parameter.
1642    pub fn max_copy_from_size(&self) -> u32 {
1643        *self.expect_value(&MAX_COPY_FROM_SIZE)
1644    }
1645
1646    /// Returns the value of the `allowed_cluster_replica_sizes` configuration parameter.
1647    pub fn allowed_cluster_replica_sizes(&self) -> Vec<String> {
1648        self.expect_value::<Vec<Ident>>(&ALLOWED_CLUSTER_REPLICA_SIZES)
1649            .into_iter()
1650            .map(|s| s.as_str().into())
1651            .collect()
1652    }
1653
1654    /// Returns the value of the `default_cluster_replication_factor` configuration parameter.
1655    pub fn default_cluster_replication_factor(&self) -> u32 {
1656        *self.expect_value::<u32>(&DEFAULT_CLUSTER_REPLICATION_FACTOR)
1657    }
1658
1659    /// Returns the `disk_cluster_replicas_default` configuration parameter.
1660    pub fn disk_cluster_replicas_default(&self) -> bool {
1661        *self.expect_value(&DISK_CLUSTER_REPLICAS_DEFAULT)
1662    }
1663
1664    pub fn upsert_rocksdb_auto_spill_to_disk(&self) -> bool {
1665        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_AUTO_SPILL_TO_DISK)
1666    }
1667
1668    pub fn upsert_rocksdb_auto_spill_threshold_bytes(&self) -> usize {
1669        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_AUTO_SPILL_THRESHOLD_BYTES)
1670    }
1671
1672    pub fn upsert_rocksdb_compaction_style(&self) -> mz_rocksdb_types::config::CompactionStyle {
1673        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_COMPACTION_STYLE)
1674    }
1675
1676    pub fn upsert_rocksdb_optimize_compaction_memtable_budget(&self) -> usize {
1677        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_OPTIMIZE_COMPACTION_MEMTABLE_BUDGET)
1678    }
1679
1680    pub fn upsert_rocksdb_level_compaction_dynamic_level_bytes(&self) -> bool {
1681        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_LEVEL_COMPACTION_DYNAMIC_LEVEL_BYTES)
1682    }
1683
1684    pub fn upsert_rocksdb_universal_compaction_ratio(&self) -> i32 {
1685        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_UNIVERSAL_COMPACTION_RATIO)
1686    }
1687
1688    pub fn upsert_rocksdb_parallelism(&self) -> Option<i32> {
1689        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_PARALLELISM)
1690    }
1691
1692    pub fn upsert_rocksdb_compression_type(&self) -> mz_rocksdb_types::config::CompressionType {
1693        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_COMPRESSION_TYPE)
1694    }
1695
1696    pub fn upsert_rocksdb_bottommost_compression_type(
1697        &self,
1698    ) -> mz_rocksdb_types::config::CompressionType {
1699        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_BOTTOMMOST_COMPRESSION_TYPE)
1700    }
1701
1702    pub fn upsert_rocksdb_batch_size(&self) -> usize {
1703        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_BATCH_SIZE)
1704    }
1705
1706    pub fn upsert_rocksdb_retry_duration(&self) -> Duration {
1707        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_RETRY_DURATION)
1708    }
1709
1710    pub fn upsert_rocksdb_stats_log_interval_seconds(&self) -> u32 {
1711        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_STATS_LOG_INTERVAL_SECONDS)
1712    }
1713
1714    pub fn upsert_rocksdb_stats_persist_interval_seconds(&self) -> u32 {
1715        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_STATS_PERSIST_INTERVAL_SECONDS)
1716    }
1717
1718    pub fn upsert_rocksdb_point_lookup_block_cache_size_mb(&self) -> Option<u32> {
1719        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_POINT_LOOKUP_BLOCK_CACHE_SIZE_MB)
1720    }
1721
1722    pub fn upsert_rocksdb_shrink_allocated_buffers_by_ratio(&self) -> usize {
1723        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_SHRINK_ALLOCATED_BUFFERS_BY_RATIO)
1724    }
1725
1726    pub fn upsert_rocksdb_write_buffer_manager_cluster_memory_fraction(&self) -> Option<Numeric> {
1727        *self.expect_value(
1728            &upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_CLUSTER_MEMORY_FRACTION,
1729        )
1730    }
1731
1732    pub fn upsert_rocksdb_write_buffer_manager_memory_bytes(&self) -> Option<usize> {
1733        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_MEMORY_BYTES)
1734    }
1735
1736    pub fn upsert_rocksdb_write_buffer_manager_allow_stall(&self) -> bool {
1737        *self.expect_value(&upsert_rocksdb::UPSERT_ROCKSDB_WRITE_BUFFER_MANAGER_ALLOW_STALL)
1738    }
1739
1740    pub fn persist_fast_path_limit(&self) -> usize {
1741        *self.expect_value(&PERSIST_FAST_PATH_LIMIT)
1742    }
1743
1744    /// Returns the `pg_source_connect_timeout` configuration parameter.
1745    pub fn pg_source_connect_timeout(&self) -> Duration {
1746        *self.expect_value(&PG_SOURCE_CONNECT_TIMEOUT)
1747    }
1748
1749    /// Returns the `pg_source_tcp_keepalives_retries` configuration parameter.
1750    pub fn pg_source_tcp_keepalives_retries(&self) -> u32 {
1751        *self.expect_value(&PG_SOURCE_TCP_KEEPALIVES_RETRIES)
1752    }
1753
1754    /// Returns the `pg_source_tcp_keepalives_idle` configuration parameter.
1755    pub fn pg_source_tcp_keepalives_idle(&self) -> Duration {
1756        *self.expect_value(&PG_SOURCE_TCP_KEEPALIVES_IDLE)
1757    }
1758
1759    /// Returns the `pg_source_tcp_keepalives_interval` configuration parameter.
1760    pub fn pg_source_tcp_keepalives_interval(&self) -> Duration {
1761        *self.expect_value(&PG_SOURCE_TCP_KEEPALIVES_INTERVAL)
1762    }
1763
1764    /// Returns the `pg_source_tcp_user_timeout` configuration parameter.
1765    pub fn pg_source_tcp_user_timeout(&self) -> Duration {
1766        *self.expect_value(&PG_SOURCE_TCP_USER_TIMEOUT)
1767    }
1768
1769    /// Returns the `pg_source_tcp_configure_server` configuration parameter.
1770    pub fn pg_source_tcp_configure_server(&self) -> bool {
1771        *self.expect_value(&PG_SOURCE_TCP_CONFIGURE_SERVER)
1772    }
1773
1774    /// Returns the `pg_source_snapshot_statement_timeout` configuration parameter.
1775    pub fn pg_source_snapshot_statement_timeout(&self) -> Duration {
1776        *self.expect_value(&PG_SOURCE_SNAPSHOT_STATEMENT_TIMEOUT)
1777    }
1778
1779    /// Returns the `pg_source_wal_sender_timeout` configuration parameter.
1780    pub fn pg_source_wal_sender_timeout(&self) -> Option<Duration> {
1781        *self.expect_value(&PG_SOURCE_WAL_SENDER_TIMEOUT)
1782    }
1783
1784    /// Returns the `pg_source_snapshot_collect_strict_count` configuration parameter.
1785    pub fn pg_source_snapshot_collect_strict_count(&self) -> bool {
1786        *self.expect_value(&PG_SOURCE_SNAPSHOT_COLLECT_STRICT_COUNT)
1787    }
1788    /// Returns the `pg_source_snapshot_fallback_to_strict_count` configuration parameter.
1789    pub fn pg_source_snapshot_fallback_to_strict_count(&self) -> bool {
1790        *self.expect_value(&PG_SOURCE_SNAPSHOT_FALLBACK_TO_STRICT_COUNT)
1791    }
1792    /// Returns the `pg_source_snapshot_collect_strict_count` configuration parameter.
1793    pub fn pg_source_snapshot_wait_for_count(&self) -> bool {
1794        *self.expect_value(&PG_SOURCE_SNAPSHOT_WAIT_FOR_COUNT)
1795    }
1796
1797    /// Returns the `mysql_source_tcp_keepalive` configuration parameter.
1798    pub fn mysql_source_tcp_keepalive(&self) -> Duration {
1799        *self.expect_value(&MYSQL_SOURCE_TCP_KEEPALIVE)
1800    }
1801
1802    /// Returns the `mysql_source_snapshot_max_execution_time` configuration parameter.
1803    pub fn mysql_source_snapshot_max_execution_time(&self) -> Duration {
1804        *self.expect_value(&MYSQL_SOURCE_SNAPSHOT_MAX_EXECUTION_TIME)
1805    }
1806
1807    /// Returns the `mysql_source_snapshot_lock_wait_timeout` configuration parameter.
1808    pub fn mysql_source_snapshot_lock_wait_timeout(&self) -> Duration {
1809        *self.expect_value(&MYSQL_SOURCE_SNAPSHOT_LOCK_WAIT_TIMEOUT)
1810    }
1811
1812    /// Returns the `mysql_source_connect_timeout` configuration parameter.
1813    pub fn mysql_source_connect_timeout(&self) -> Duration {
1814        *self.expect_value(&MYSQL_SOURCE_CONNECT_TIMEOUT)
1815    }
1816
1817    /// Returns the `ssh_check_interval` configuration parameter.
1818    pub fn ssh_check_interval(&self) -> Duration {
1819        *self.expect_value(&SSH_CHECK_INTERVAL)
1820    }
1821
1822    /// Returns the `ssh_connect_timeout` configuration parameter.
1823    pub fn ssh_connect_timeout(&self) -> Duration {
1824        *self.expect_value(&SSH_CONNECT_TIMEOUT)
1825    }
1826
1827    /// Returns the `ssh_keepalives_idle` configuration parameter.
1828    pub fn ssh_keepalives_idle(&self) -> Duration {
1829        *self.expect_value(&SSH_KEEPALIVES_IDLE)
1830    }
1831
1832    /// Returns the `kafka_socket_keepalive` configuration parameter.
1833    pub fn kafka_socket_keepalive(&self) -> bool {
1834        *self.expect_value(&KAFKA_SOCKET_KEEPALIVE)
1835    }
1836
1837    /// Returns the `kafka_socket_timeout` configuration parameter.
1838    pub fn kafka_socket_timeout(&self) -> Option<Duration> {
1839        *self.expect_value(&KAFKA_SOCKET_TIMEOUT)
1840    }
1841
1842    /// Returns the `kafka_transaction_timeout` configuration parameter.
1843    pub fn kafka_transaction_timeout(&self) -> Duration {
1844        *self.expect_value(&KAFKA_TRANSACTION_TIMEOUT)
1845    }
1846
1847    /// Returns the `kafka_socket_connection_setup_timeout` configuration parameter.
1848    pub fn kafka_socket_connection_setup_timeout(&self) -> Duration {
1849        *self.expect_value(&KAFKA_SOCKET_CONNECTION_SETUP_TIMEOUT)
1850    }
1851
1852    /// Returns the `kafka_fetch_metadata_timeout` configuration parameter.
1853    pub fn kafka_fetch_metadata_timeout(&self) -> Duration {
1854        *self.expect_value(&KAFKA_FETCH_METADATA_TIMEOUT)
1855    }
1856
1857    /// Returns the `kafka_progress_record_fetch_timeout` configuration parameter.
1858    pub fn kafka_progress_record_fetch_timeout(&self) -> Option<Duration> {
1859        *self.expect_value(&KAFKA_PROGRESS_RECORD_FETCH_TIMEOUT)
1860    }
1861
1862    /// Returns the `crdb_connect_timeout` configuration parameter.
1863    pub fn crdb_connect_timeout(&self) -> Duration {
1864        *self.expect_config_value(UncasedStr::new(
1865            mz_persist_client::cfg::CRDB_CONNECT_TIMEOUT.name(),
1866        ))
1867    }
1868
1869    /// Returns the `crdb_tcp_user_timeout` configuration parameter.
1870    pub fn crdb_tcp_user_timeout(&self) -> Duration {
1871        *self.expect_config_value(UncasedStr::new(
1872            mz_persist_client::cfg::CRDB_TCP_USER_TIMEOUT.name(),
1873        ))
1874    }
1875
1876    /// Returns the `storage_dataflow_max_inflight_bytes` configuration parameter.
1877    pub fn storage_dataflow_max_inflight_bytes(&self) -> Option<usize> {
1878        *self.expect_value(&STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES)
1879    }
1880
1881    /// Returns the `storage_dataflow_max_inflight_bytes_to_cluster_size_fraction` configuration parameter.
1882    pub fn storage_dataflow_max_inflight_bytes_to_cluster_size_fraction(&self) -> Option<Numeric> {
1883        *self.expect_value(&STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_TO_CLUSTER_SIZE_FRACTION)
1884    }
1885
1886    /// Returns the `storage_shrink_upsert_unused_buffers_by_ratio` configuration parameter.
1887    pub fn storage_shrink_upsert_unused_buffers_by_ratio(&self) -> usize {
1888        *self.expect_value(&STORAGE_SHRINK_UPSERT_UNUSED_BUFFERS_BY_RATIO)
1889    }
1890
1891    /// Returns the `storage_dataflow_max_inflight_bytes_disk_only` configuration parameter.
1892    pub fn storage_dataflow_max_inflight_bytes_disk_only(&self) -> bool {
1893        *self.expect_value(&STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_DISK_ONLY)
1894    }
1895
1896    /// Returns the `storage_statistics_interval` configuration parameter.
1897    pub fn storage_statistics_interval(&self) -> Duration {
1898        *self.expect_value(&STORAGE_STATISTICS_INTERVAL)
1899    }
1900
1901    /// Returns the `storage_statistics_collection_interval` configuration parameter.
1902    pub fn storage_statistics_collection_interval(&self) -> Duration {
1903        *self.expect_value(&STORAGE_STATISTICS_COLLECTION_INTERVAL)
1904    }
1905
1906    /// Returns the `storage_record_source_sink_namespaced_errors` configuration parameter.
1907    pub fn storage_record_source_sink_namespaced_errors(&self) -> bool {
1908        *self.expect_value(&STORAGE_RECORD_SOURCE_SINK_NAMESPACED_ERRORS)
1909    }
1910
1911    /// Returns the `persist_stats_filter_enabled` configuration parameter.
1912    pub fn persist_stats_filter_enabled(&self) -> bool {
1913        *self.expect_config_value(UncasedStr::new(
1914            mz_persist_client::stats::STATS_FILTER_ENABLED.name(),
1915        ))
1916    }
1917
1918    pub fn dyncfg_updates(&self) -> ConfigUpdates {
1919        let mut updates = ConfigUpdates::default();
1920        for entry in self.dyncfgs.entries() {
1921            let name = UncasedStr::new(entry.name());
1922            let val = match entry.val() {
1923                ConfigVal::Bool(_) => ConfigVal::from(*self.expect_config_value::<bool>(name)),
1924                ConfigVal::U32(_) => ConfigVal::from(*self.expect_config_value::<u32>(name)),
1925                ConfigVal::Usize(_) => ConfigVal::from(*self.expect_config_value::<usize>(name)),
1926                ConfigVal::OptUsize(_) => {
1927                    ConfigVal::from(*self.expect_config_value::<Option<usize>>(name))
1928                }
1929                ConfigVal::F64(_) => ConfigVal::from(*self.expect_config_value::<f64>(name)),
1930                ConfigVal::String(_) => {
1931                    ConfigVal::from(self.expect_config_value::<String>(name).clone())
1932                }
1933                ConfigVal::Duration(_) => {
1934                    ConfigVal::from(*self.expect_config_value::<Duration>(name))
1935                }
1936                ConfigVal::Json(_) => {
1937                    ConfigVal::from(self.expect_config_value::<serde_json::Value>(name).clone())
1938                }
1939            };
1940            updates.add_dynamic(entry.name(), val);
1941        }
1942        updates.apply(&self.dyncfgs);
1943        updates
1944    }
1945
1946    /// Returns the `metrics_retention` configuration parameter.
1947    pub fn metrics_retention(&self) -> Duration {
1948        *self.expect_value(&METRICS_RETENTION)
1949    }
1950
1951    /// Returns the `unsafe_mock_audit_event_timestamp` configuration parameter.
1952    pub fn unsafe_mock_audit_event_timestamp(&self) -> Option<mz_repr::Timestamp> {
1953        *self.expect_value(&UNSAFE_MOCK_AUDIT_EVENT_TIMESTAMP)
1954    }
1955
1956    /// Returns the `enable_rbac_checks` configuration parameter.
1957    pub fn enable_rbac_checks(&self) -> bool {
1958        *self.expect_value(&ENABLE_RBAC_CHECKS)
1959    }
1960
1961    /// Returns the `max_connections` configuration parameter.
1962    pub fn max_connections(&self) -> u32 {
1963        *self.expect_value(&MAX_CONNECTIONS)
1964    }
1965
1966    pub fn default_network_policy_name(&self) -> String {
1967        self.expect_value::<String>(&NETWORK_POLICY).clone()
1968    }
1969
1970    /// Returns the `superuser_reserved_connections` configuration parameter.
1971    pub fn superuser_reserved_connections(&self) -> u32 {
1972        *self.expect_value(&SUPERUSER_RESERVED_CONNECTIONS)
1973    }
1974
1975    pub fn keep_n_source_status_history_entries(&self) -> usize {
1976        *self.expect_value(&KEEP_N_SOURCE_STATUS_HISTORY_ENTRIES)
1977    }
1978
1979    pub fn keep_n_sink_status_history_entries(&self) -> usize {
1980        *self.expect_value(&KEEP_N_SINK_STATUS_HISTORY_ENTRIES)
1981    }
1982
1983    pub fn keep_n_privatelink_status_history_entries(&self) -> usize {
1984        *self.expect_value(&KEEP_N_PRIVATELINK_STATUS_HISTORY_ENTRIES)
1985    }
1986
1987    pub fn replica_status_history_retention_window(&self) -> Duration {
1988        *self.expect_value(&REPLICA_STATUS_HISTORY_RETENTION_WINDOW)
1989    }
1990
1991    /// Returns the `arrangement_exert_proportionality` configuration parameter.
1992    pub fn arrangement_exert_proportionality(&self) -> u32 {
1993        *self.expect_value(&ARRANGEMENT_EXERT_PROPORTIONALITY)
1994    }
1995
1996    /// Returns the `enable_storage_shard_finalization` configuration parameter.
1997    pub fn enable_storage_shard_finalization(&self) -> bool {
1998        *self.expect_value(&ENABLE_STORAGE_SHARD_FINALIZATION)
1999    }
2000
2001    pub fn enable_consolidate_after_union_negate(&self) -> bool {
2002        *self.expect_value(&ENABLE_CONSOLIDATE_AFTER_UNION_NEGATE)
2003    }
2004
2005    pub fn enable_reduce_reduction(&self) -> bool {
2006        *self.expect_value(&ENABLE_REDUCE_REDUCTION)
2007    }
2008
2009    /// Returns the `enable_default_connection_validation` configuration parameter.
2010    pub fn enable_default_connection_validation(&self) -> bool {
2011        *self.expect_value(&ENABLE_DEFAULT_CONNECTION_VALIDATION)
2012    }
2013
2014    /// Returns the `min_timestamp_interval` configuration parameter.
2015    pub fn min_timestamp_interval(&self) -> Duration {
2016        *self.expect_value(&MIN_TIMESTAMP_INTERVAL)
2017    }
2018    /// Returns the `max_timestamp_interval` configuration parameter.
2019    pub fn max_timestamp_interval(&self) -> Duration {
2020        *self.expect_value(&MAX_TIMESTAMP_INTERVAL)
2021    }
2022
2023    pub fn logging_filter(&self) -> CloneableEnvFilter {
2024        self.expect_value::<CloneableEnvFilter>(&LOGGING_FILTER)
2025            .clone()
2026    }
2027
2028    pub fn opentelemetry_filter(&self) -> CloneableEnvFilter {
2029        self.expect_value::<CloneableEnvFilter>(&OPENTELEMETRY_FILTER)
2030            .clone()
2031    }
2032
2033    pub fn logging_filter_defaults(&self) -> Vec<SerializableDirective> {
2034        self.expect_value::<Vec<SerializableDirective>>(&LOGGING_FILTER_DEFAULTS)
2035            .clone()
2036    }
2037
2038    pub fn opentelemetry_filter_defaults(&self) -> Vec<SerializableDirective> {
2039        self.expect_value::<Vec<SerializableDirective>>(&OPENTELEMETRY_FILTER_DEFAULTS)
2040            .clone()
2041    }
2042
2043    pub fn sentry_filters(&self) -> Vec<SerializableDirective> {
2044        self.expect_value::<Vec<SerializableDirective>>(&SENTRY_FILTERS)
2045            .clone()
2046    }
2047
2048    pub fn webhooks_secrets_caching_ttl_secs(&self) -> usize {
2049        *self.expect_value(&WEBHOOKS_SECRETS_CACHING_TTL_SECS)
2050    }
2051
2052    pub fn coord_slow_message_warn_threshold(&self) -> Duration {
2053        *self.expect_value(&COORD_SLOW_MESSAGE_WARN_THRESHOLD)
2054    }
2055
2056    pub fn grpc_client_http2_keep_alive_interval(&self) -> Duration {
2057        *self.expect_value(&grpc_client::HTTP2_KEEP_ALIVE_INTERVAL)
2058    }
2059
2060    pub fn grpc_client_http2_keep_alive_timeout(&self) -> Duration {
2061        *self.expect_value(&grpc_client::HTTP2_KEEP_ALIVE_TIMEOUT)
2062    }
2063
2064    pub fn grpc_connect_timeout(&self) -> Duration {
2065        *self.expect_value(&grpc_client::CONNECT_TIMEOUT)
2066    }
2067
2068    pub fn cluster_multi_process_replica_az_affinity_weight(&self) -> Option<i32> {
2069        *self.expect_value(&cluster_scheduling::CLUSTER_MULTI_PROCESS_REPLICA_AZ_AFFINITY_WEIGHT)
2070    }
2071
2072    pub fn cluster_soften_replication_anti_affinity(&self) -> bool {
2073        *self.expect_value(&cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY)
2074    }
2075
2076    pub fn cluster_soften_replication_anti_affinity_weight(&self) -> i32 {
2077        *self.expect_value(&cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY_WEIGHT)
2078    }
2079
2080    pub fn cluster_enable_topology_spread(&self) -> bool {
2081        *self.expect_value(&cluster_scheduling::CLUSTER_ENABLE_TOPOLOGY_SPREAD)
2082    }
2083
2084    pub fn cluster_topology_spread_ignore_non_singular_scale(&self) -> bool {
2085        *self.expect_value(&cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_IGNORE_NON_SINGULAR_SCALE)
2086    }
2087
2088    pub fn cluster_topology_spread_max_skew(&self) -> i32 {
2089        *self.expect_value(&cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_MAX_SKEW)
2090    }
2091
2092    pub fn cluster_topology_spread_soft(&self) -> bool {
2093        *self.expect_value(&cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_SOFT)
2094    }
2095
2096    pub fn cluster_soften_az_affinity(&self) -> bool {
2097        *self.expect_value(&cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY)
2098    }
2099
2100    pub fn cluster_soften_az_affinity_weight(&self) -> i32 {
2101        *self.expect_value(&cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY_WEIGHT)
2102    }
2103
2104    pub fn cluster_always_use_disk(&self) -> bool {
2105        *self.expect_value(&cluster_scheduling::CLUSTER_ALWAYS_USE_DISK)
2106    }
2107
2108    pub fn cluster_alter_check_ready_interval(&self) -> Duration {
2109        *self.expect_value(&cluster_scheduling::CLUSTER_ALTER_CHECK_READY_INTERVAL)
2110    }
2111
2112    pub fn cluster_check_scheduling_policies_interval(&self) -> Duration {
2113        *self.expect_value(&cluster_scheduling::CLUSTER_CHECK_SCHEDULING_POLICIES_INTERVAL)
2114    }
2115
2116    pub fn cluster_security_context_enabled(&self) -> bool {
2117        *self.expect_value(&cluster_scheduling::CLUSTER_SECURITY_CONTEXT_ENABLED)
2118    }
2119
2120    pub fn cluster_refresh_mv_compaction_estimate(&self) -> Duration {
2121        *self.expect_value(&cluster_scheduling::CLUSTER_REFRESH_MV_COMPACTION_ESTIMATE)
2122    }
2123
2124    /// Returns the `privatelink_status_update_quota_per_minute` configuration parameter.
2125    pub fn privatelink_status_update_quota_per_minute(&self) -> u32 {
2126        *self.expect_value(&PRIVATELINK_STATUS_UPDATE_QUOTA_PER_MINUTE)
2127    }
2128
2129    pub fn statement_logging_target_data_rate(&self) -> Option<usize> {
2130        *self.expect_value(&STATEMENT_LOGGING_TARGET_DATA_RATE)
2131    }
2132
2133    pub fn statement_logging_max_data_credit(&self) -> Option<usize> {
2134        *self.expect_value(&STATEMENT_LOGGING_MAX_DATA_CREDIT)
2135    }
2136
2137    /// Returns the `statement_logging_max_sample_rate` configuration parameter.
2138    pub fn statement_logging_max_sample_rate(&self) -> Numeric {
2139        *self.expect_value(&STATEMENT_LOGGING_MAX_SAMPLE_RATE)
2140    }
2141
2142    /// Returns the `statement_logging_default_sample_rate` configuration parameter.
2143    pub fn statement_logging_default_sample_rate(&self) -> Numeric {
2144        *self.expect_value(&STATEMENT_LOGGING_DEFAULT_SAMPLE_RATE)
2145    }
2146
2147    /// Returns the `enable_internal_statement_logging` configuration parameter.
2148    pub fn enable_internal_statement_logging(&self) -> bool {
2149        *self.expect_value(&ENABLE_INTERNAL_STATEMENT_LOGGING)
2150    }
2151
2152    /// Returns the `optimizer_stats_timeout` configuration parameter.
2153    pub fn optimizer_stats_timeout(&self) -> Duration {
2154        *self.expect_value(&OPTIMIZER_STATS_TIMEOUT)
2155    }
2156
2157    /// Returns the `optimizer_oneshot_stats_timeout` configuration parameter.
2158    pub fn optimizer_oneshot_stats_timeout(&self) -> Duration {
2159        *self.expect_value(&OPTIMIZER_ONESHOT_STATS_TIMEOUT)
2160    }
2161
2162    /// Returns the `webhook_concurrent_request_limit` configuration parameter.
2163    pub fn webhook_concurrent_request_limit(&self) -> usize {
2164        *self.expect_value(&WEBHOOK_CONCURRENT_REQUEST_LIMIT)
2165    }
2166
2167    /// Returns the `pg_timestamp_oracle_connection_pool_max_size` configuration parameter.
2168    pub fn pg_timestamp_oracle_connection_pool_max_size(&self) -> usize {
2169        *self.expect_value(&PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_SIZE)
2170    }
2171
2172    /// Returns the `pg_timestamp_oracle_connection_pool_max_wait` configuration parameter.
2173    pub fn pg_timestamp_oracle_connection_pool_max_wait(&self) -> Option<Duration> {
2174        *self.expect_value(&PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_WAIT)
2175    }
2176
2177    /// Returns the `pg_timestamp_oracle_connection_pool_ttl` configuration parameter.
2178    pub fn pg_timestamp_oracle_connection_pool_ttl(&self) -> Duration {
2179        *self.expect_value(&PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL)
2180    }
2181
2182    /// Returns the `pg_timestamp_oracle_connection_pool_ttl_stagger` configuration parameter.
2183    pub fn pg_timestamp_oracle_connection_pool_ttl_stagger(&self) -> Duration {
2184        *self.expect_value(&PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL_STAGGER)
2185    }
2186
2187    /// Returns the `user_storage_managed_collections_batch_duration` configuration parameter.
2188    pub fn user_storage_managed_collections_batch_duration(&self) -> Duration {
2189        *self.expect_value(&USER_STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION)
2190    }
2191
2192    pub fn force_source_table_syntax(&self) -> bool {
2193        *self.expect_value(&FORCE_SOURCE_TABLE_SYNTAX)
2194    }
2195
2196    pub fn optimizer_e2e_latency_warning_threshold(&self) -> Duration {
2197        *self.expect_value(&OPTIMIZER_E2E_LATENCY_WARNING_THRESHOLD)
2198    }
2199
2200    /// Returns whether the named variable is a compute configuration parameter
2201    /// (things that go in `ComputeParameters` and are sent to replicas via `UpdateConfiguration`
2202    /// commands).
2203    pub fn is_compute_config_var(&self, name: &str) -> bool {
2204        name == MAX_RESULT_SIZE.name() || self.is_dyncfg_var(name) || is_tracing_var(name)
2205    }
2206
2207    /// Returns whether the named variable is a metrics configuration parameter
2208    pub fn is_metrics_config_var(&self, name: &str) -> bool {
2209        self.is_dyncfg_var(name)
2210    }
2211
2212    /// Returns whether the named variable is a storage configuration parameter.
2213    pub fn is_storage_config_var(&self, name: &str) -> bool {
2214        name == PG_SOURCE_CONNECT_TIMEOUT.name()
2215            || name == PG_SOURCE_TCP_KEEPALIVES_IDLE.name()
2216            || name == PG_SOURCE_TCP_KEEPALIVES_INTERVAL.name()
2217            || name == PG_SOURCE_TCP_KEEPALIVES_RETRIES.name()
2218            || name == PG_SOURCE_TCP_USER_TIMEOUT.name()
2219            || name == PG_SOURCE_TCP_CONFIGURE_SERVER.name()
2220            || name == PG_SOURCE_SNAPSHOT_STATEMENT_TIMEOUT.name()
2221            || name == PG_SOURCE_WAL_SENDER_TIMEOUT.name()
2222            || name == PG_SOURCE_SNAPSHOT_COLLECT_STRICT_COUNT.name()
2223            || name == PG_SOURCE_SNAPSHOT_FALLBACK_TO_STRICT_COUNT.name()
2224            || name == PG_SOURCE_SNAPSHOT_WAIT_FOR_COUNT.name()
2225            || name == MYSQL_SOURCE_TCP_KEEPALIVE.name()
2226            || name == MYSQL_SOURCE_SNAPSHOT_MAX_EXECUTION_TIME.name()
2227            || name == MYSQL_SOURCE_SNAPSHOT_LOCK_WAIT_TIMEOUT.name()
2228            || name == MYSQL_SOURCE_CONNECT_TIMEOUT.name()
2229            || name == ENABLE_STORAGE_SHARD_FINALIZATION.name()
2230            || name == SSH_CHECK_INTERVAL.name()
2231            || name == SSH_CONNECT_TIMEOUT.name()
2232            || name == SSH_KEEPALIVES_IDLE.name()
2233            || name == KAFKA_SOCKET_KEEPALIVE.name()
2234            || name == KAFKA_SOCKET_TIMEOUT.name()
2235            || name == KAFKA_TRANSACTION_TIMEOUT.name()
2236            || name == KAFKA_SOCKET_CONNECTION_SETUP_TIMEOUT.name()
2237            || name == KAFKA_FETCH_METADATA_TIMEOUT.name()
2238            || name == KAFKA_PROGRESS_RECORD_FETCH_TIMEOUT.name()
2239            || name == STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES.name()
2240            || name == STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_TO_CLUSTER_SIZE_FRACTION.name()
2241            || name == STORAGE_DATAFLOW_MAX_INFLIGHT_BYTES_DISK_ONLY.name()
2242            || name == STORAGE_SHRINK_UPSERT_UNUSED_BUFFERS_BY_RATIO.name()
2243            || name == STORAGE_RECORD_SOURCE_SINK_NAMESPACED_ERRORS.name()
2244            || name == STORAGE_STATISTICS_INTERVAL.name()
2245            || name == STORAGE_STATISTICS_COLLECTION_INTERVAL.name()
2246            || name == USER_STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION.name()
2247            || is_upsert_rocksdb_config_var(name)
2248            || self.is_dyncfg_var(name)
2249            || is_tracing_var(name)
2250    }
2251
2252    /// Returns whether the named variable is a dyncfg configuration parameter.
2253    fn is_dyncfg_var(&self, name: &str) -> bool {
2254        self.dyncfgs.entries().any(|e| name == e.name())
2255    }
2256}
2257
2258pub fn is_tracing_var(name: &str) -> bool {
2259    name == LOGGING_FILTER.name()
2260        || name == LOGGING_FILTER_DEFAULTS.name()
2261        || name == OPENTELEMETRY_FILTER.name()
2262        || name == OPENTELEMETRY_FILTER_DEFAULTS.name()
2263        || name == SENTRY_FILTERS.name()
2264}
2265
2266/// Returns whether the named variable is a caching configuration parameter.
2267pub fn is_secrets_caching_var(name: &str) -> bool {
2268    name == WEBHOOKS_SECRETS_CACHING_TTL_SECS.name()
2269}
2270
2271fn is_upsert_rocksdb_config_var(name: &str) -> bool {
2272    name == upsert_rocksdb::UPSERT_ROCKSDB_COMPACTION_STYLE.name()
2273        || name == upsert_rocksdb::UPSERT_ROCKSDB_OPTIMIZE_COMPACTION_MEMTABLE_BUDGET.name()
2274        || name == upsert_rocksdb::UPSERT_ROCKSDB_LEVEL_COMPACTION_DYNAMIC_LEVEL_BYTES.name()
2275        || name == upsert_rocksdb::UPSERT_ROCKSDB_UNIVERSAL_COMPACTION_RATIO.name()
2276        || name == upsert_rocksdb::UPSERT_ROCKSDB_PARALLELISM.name()
2277        || name == upsert_rocksdb::UPSERT_ROCKSDB_COMPRESSION_TYPE.name()
2278        || name == upsert_rocksdb::UPSERT_ROCKSDB_BOTTOMMOST_COMPRESSION_TYPE.name()
2279        || name == upsert_rocksdb::UPSERT_ROCKSDB_BATCH_SIZE.name()
2280        || name == upsert_rocksdb::UPSERT_ROCKSDB_STATS_LOG_INTERVAL_SECONDS.name()
2281        || name == upsert_rocksdb::UPSERT_ROCKSDB_STATS_PERSIST_INTERVAL_SECONDS.name()
2282        || name == upsert_rocksdb::UPSERT_ROCKSDB_POINT_LOOKUP_BLOCK_CACHE_SIZE_MB.name()
2283        || name == upsert_rocksdb::UPSERT_ROCKSDB_SHRINK_ALLOCATED_BUFFERS_BY_RATIO.name()
2284}
2285
2286/// Returns whether the named variable is a Postgres/CRDB timestamp oracle
2287/// configuration parameter.
2288pub fn is_pg_timestamp_oracle_config_var(name: &str) -> bool {
2289    name == PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_SIZE.name()
2290        || name == PG_TIMESTAMP_ORACLE_CONNECTION_POOL_MAX_WAIT.name()
2291        || name == PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL.name()
2292        || name == PG_TIMESTAMP_ORACLE_CONNECTION_POOL_TTL_STAGGER.name()
2293        || name == CRDB_CONNECT_TIMEOUT.name()
2294        || name == CRDB_TCP_USER_TIMEOUT.name()
2295}
2296
2297/// Returns whether the named variable is a cluster scheduling config
2298pub fn is_cluster_scheduling_var(name: &str) -> bool {
2299    name == cluster_scheduling::CLUSTER_MULTI_PROCESS_REPLICA_AZ_AFFINITY_WEIGHT.name()
2300        || name == cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY.name()
2301        || name == cluster_scheduling::CLUSTER_SOFTEN_REPLICATION_ANTI_AFFINITY_WEIGHT.name()
2302        || name == cluster_scheduling::CLUSTER_ENABLE_TOPOLOGY_SPREAD.name()
2303        || name == cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_IGNORE_NON_SINGULAR_SCALE.name()
2304        || name == cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_MAX_SKEW.name()
2305        || name == cluster_scheduling::CLUSTER_TOPOLOGY_SPREAD_SOFT.name()
2306        || name == cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY.name()
2307        || name == cluster_scheduling::CLUSTER_SOFTEN_AZ_AFFINITY_WEIGHT.name()
2308        || name == cluster_scheduling::CLUSTER_ALWAYS_USE_DISK.name()
2309}
2310
2311/// Returns whether the named variable is an HTTP server related config var.
2312pub fn is_http_config_var(name: &str) -> bool {
2313    name == WEBHOOK_CONCURRENT_REQUEST_LIMIT.name()
2314}
2315
2316/// Set of [`SystemVar`]s that can also get set at a per-Session level.
2317///
2318/// TODO(parkmycar): Instead of a separate list, make this a field on VarDefinition.
2319static SESSION_SYSTEM_VARS: LazyLock<BTreeMap<&'static UncasedStr, &'static VarDefinition>> =
2320    LazyLock::new(|| {
2321        [
2322            &APPLICATION_NAME,
2323            &CLIENT_ENCODING,
2324            &CLIENT_MIN_MESSAGES,
2325            &CLUSTER,
2326            &CLUSTER_REPLICA,
2327            &DEFAULT_CLUSTER_REPLICATION_FACTOR,
2328            &CURRENT_OBJECT_MISSING_WARNINGS,
2329            &DATABASE,
2330            &DATE_STYLE,
2331            &EXTRA_FLOAT_DIGITS,
2332            &INTEGER_DATETIMES,
2333            &INTERVAL_STYLE,
2334            &REAL_TIME_RECENCY_TIMEOUT,
2335            &SEARCH_PATH,
2336            &STANDARD_CONFORMING_STRINGS,
2337            &STATEMENT_TIMEOUT,
2338            &IDLE_IN_TRANSACTION_SESSION_TIMEOUT,
2339            &TIMEZONE,
2340            &TRANSACTION_ISOLATION,
2341            &MAX_QUERY_RESULT_SIZE,
2342        ]
2343        .into_iter()
2344        .map(|var| (UncasedStr::new(var.name()), var))
2345        .collect()
2346    });
2347
2348// Provides a wrapper to express that a particular `ServerVar` is meant to be used as a feature
2349/// flag.
2350#[derive(Debug)]
2351pub struct FeatureFlag {
2352    pub flag: &'static VarDefinition,
2353    pub feature_desc: &'static str,
2354}
2355
2356impl FeatureFlag {
2357    /// Returns an error unless the feature flag is enabled in the provided
2358    /// `system_vars`.
2359    pub fn require(&'static self, system_vars: &SystemVars) -> Result<(), VarError> {
2360        match *system_vars.expect_value::<bool>(self.flag) {
2361            true => Ok(()),
2362            false => Err(VarError::RequiresFeatureFlag { feature_flag: self }),
2363        }
2364    }
2365}
2366
2367impl PartialEq for FeatureFlag {
2368    fn eq(&self, other: &FeatureFlag) -> bool {
2369        self.flag.name() == other.flag.name()
2370    }
2371}
2372
2373impl Eq for FeatureFlag {}
2374
2375impl Var for MzVersion {
2376    fn name(&self) -> &'static str {
2377        MZ_VERSION_NAME.as_str()
2378    }
2379
2380    fn value(&self) -> String {
2381        self.build_info
2382            .human_version(self.helm_chart_version.clone())
2383    }
2384
2385    fn description(&self) -> &'static str {
2386        "Shows the Materialize server version (Materialize)."
2387    }
2388
2389    fn type_name(&self) -> Cow<'static, str> {
2390        String::type_name()
2391    }
2392
2393    fn visible(&self, _: &User, _: &SystemVars) -> Result<(), VarError> {
2394        Ok(())
2395    }
2396}
2397
2398impl Var for User {
2399    fn name(&self) -> &'static str {
2400        IS_SUPERUSER_NAME.as_str()
2401    }
2402
2403    fn value(&self) -> String {
2404        self.is_superuser().format()
2405    }
2406
2407    fn description(&self) -> &'static str {
2408        "Reports whether the current session is a superuser (PostgreSQL)."
2409    }
2410
2411    fn type_name(&self) -> Cow<'static, str> {
2412        bool::type_name()
2413    }
2414
2415    fn visible(&self, _: &User, _: &SystemVars) -> Result<(), VarError> {
2416        Ok(())
2417    }
2418}